Add island management RPC endpoints and enhance governance service
Some checks failed
Blockchain Synchronization Verification / sync-verification (push) Has been cancelled
Cross-Chain Functionality Tests / test-cross-chain-sync (push) Has been cancelled
Cross-Chain Functionality Tests / test-cross-chain-transactions (push) Has been cancelled
Cross-Chain Functionality Tests / test-multi-chain-consensus (push) Has been cancelled
Cross-Chain Functionality Tests / aggregate-results (push) Has been cancelled
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Integration Tests / test-service-integration (push) Has been cancelled
Multi-Chain Island Architecture Tests / test-multi-chain-island (push) Has been cancelled
Multi-Node Blockchain Health Monitoring / health-check (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Node Failover Simulation / failover-test (push) Has been cancelled
P2P Network Verification / p2p-verification (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled

- Add island management endpoints to blockchain-node RPC router (join, leave, list, get, bridge)
- Create request/response models for island operations (JoinIslandRequest, LeaveIslandRequest, BridgeRequestRequest)
- Import get_island_manager and integrate with IslandManager for island operations
- Fix coordinator-api blockchain router import path (settings from ....config)
- Add exception handling to get_validators endpoint
- Fix
This commit is contained in:
aitbc
2026-05-14 22:00:02 +02:00
parent a9405a0d28
commit ab8c23dba1
6 changed files with 599 additions and 52 deletions

View File

@@ -19,6 +19,7 @@ from ..logger import get_logger
from ..sync import ChainSync
from ..contracts.agent_messaging_contract import messaging_contract
from .contract_service import contract_service
from ..network.island_manager import get_island_manager
from aitbc.rate_limiting import rate_limit
@@ -1363,3 +1364,201 @@ async def get_logs(
logs.append(log_entry)
return GetLogsResponse(logs=logs, count=len(logs))
# Island Management Endpoints for Edge API
class JoinIslandRequest(BaseModel):
"""Request model for joining an island"""
island_id: str
island_name: str
chain_id: str
role: str = "compute-provider"
is_hub: bool = False
class JoinIslandResponse(BaseModel):
"""Response model for joining an island"""
success: bool
island_id: str
status: str
message: str
class LeaveIslandRequest(BaseModel):
"""Request model for leaving an island"""
island_id: str
class LeaveIslandResponse(BaseModel):
"""Response model for leaving an island"""
success: bool
island_id: str
status: str
message: str
class BridgeRequestRequest(BaseModel):
"""Request model for requesting a bridge"""
target_island_id: str
class BridgeRequestResponse(BaseModel):
"""Response model for bridge request"""
success: bool
request_id: str
target_island_id: str
status: str
message: str
@router.post("/rpc/islands/join", summary="Join an island")
@rate_limit(rate=100, per=60)
async def join_island(request: JoinIslandRequest) -> JoinIslandResponse:
"""
Join an island for edge compute operations.
Calls IslandManager.join_island to register the node as a member of the specified island.
"""
island_manager = get_island_manager()
if island_manager is None:
raise HTTPException(status_code=503, detail="Island manager not available")
success = island_manager.join_island(
island_id=request.island_id,
island_name=request.island_name,
chain_id=request.chain_id,
is_hub=request.is_hub
)
if success:
return JoinIslandResponse(
success=True,
island_id=request.island_id,
status="joined",
message=f"Successfully joined island {request.island_id}"
)
else:
return JoinIslandResponse(
success=False,
island_id=request.island_id,
status="failed",
message=f"Failed to join island {request.island_id} (may already be a member)"
)
@router.post("/rpc/islands/leave", summary="Leave an island")
@rate_limit(rate=100, per=60)
async def leave_island(request: LeaveIslandRequest) -> LeaveIslandResponse:
"""
Leave an island.
Calls IslandManager.leave_island to remove the node from the specified island.
"""
island_manager = get_island_manager()
if island_manager is None:
raise HTTPException(status_code=503, detail="Island manager not available")
success = island_manager.leave_island(request.island_id)
if success:
return LeaveIslandResponse(
success=True,
island_id=request.island_id,
status="left",
message=f"Successfully left island {request.island_id}"
)
else:
return LeaveIslandResponse(
success=False,
island_id=request.island_id,
status="failed",
message=f"Failed to leave island {request.island_id} (may not be a member)"
)
@router.get("/rpc/islands", summary="List all islands")
@rate_limit(rate=100, per=60)
async def list_islands() -> Dict[str, Any]:
"""
List all islands that the node is a member of.
Calls IslandManager.get_all_islands to retrieve island memberships.
"""
island_manager = get_island_manager()
if island_manager is None:
raise HTTPException(status_code=503, detail="Island manager not available")
islands = island_manager.get_all_islands()
return {
"islands": [
{
"island_id": island.island_id,
"island_name": island.island_name,
"chain_id": island.chain_id,
"status": island.status.value,
"role": getattr(island, 'role', 'unknown'),
"peer_count": island.peer_count,
"is_hub": island.is_hub,
"joined_at": island.joined_at
}
for island in islands
],
"total": len(islands)
}
@router.get("/rpc/islands/{island_id}", summary="Get island details")
@rate_limit(rate=100, per=60)
async def get_island(island_id: str) -> Dict[str, Any]:
"""
Get details about a specific island.
Calls IslandManager.get_island_info to retrieve island membership details.
"""
island_manager = get_island_manager()
if island_manager is None:
raise HTTPException(status_code=503, detail="Island manager not available")
island = island_manager.get_island_info(island_id)
if island is None:
raise HTTPException(status_code=404, detail=f"Island {island_id} not found")
return {
"island_id": island.island_id,
"island_name": island.island_name,
"chain_id": island.chain_id,
"status": island.status.value,
"role": getattr(island, 'role', 'unknown'),
"peer_count": island.peer_count,
"is_hub": island.is_hub,
"joined_at": island.joined_at
}
@router.post("/rpc/islands/bridge", summary="Request bridge to another island")
@rate_limit(rate=100, per=60)
async def request_bridge(request: BridgeRequestRequest) -> BridgeRequestResponse:
"""
Request a bridge to another island for cross-island communication.
Calls IslandManager.request_bridge to initiate a bridge request.
"""
island_manager = get_island_manager()
if island_manager is None:
raise HTTPException(status_code=503, detail="Island manager not available")
request_id = island_manager.request_bridge(request.target_island_id)
if request_id:
return BridgeRequestResponse(
success=True,
request_id=request_id,
target_island_id=request.target_island_id,
status="pending",
message=f"Bridge request {request_id} submitted for {request.target_island_id}"
)
else:
return BridgeRequestResponse(
success=False,
request_id="",
target_island_id=request.target_island_id,
status="failed",
message=f"Failed to request bridge to {request.target_island_id} (may already be a member)"
)

View File

@@ -127,7 +127,7 @@ async def get_account(address: str) -> dict[str, Any]:
async def get_validators() -> dict[str, Any]:
"""List validators."""
try:
from ..config import settings
from ....config import settings
rpc_url = settings.blockchain_rpc_url.rstrip("/")
client = AITBCHTTPClient(timeout=5.0)
@@ -141,6 +141,9 @@ async def get_validators() -> dict[str, Any]:
except NetworkError as e:
logger.error(f"RPC connection failed: {e}")
return {"status": "error", "error": "RPC connection failed"}
except Exception as e:
logger.error(f"Failed to get validators: {e}")
return {"status": "error", "error": str(e)}
@router.get("/supply")

View File

@@ -4,6 +4,7 @@ Implements the hermes DAO, voting mechanisms, and proposal lifecycle
Enhanced with multi-jurisdictional support and regional governance
"""
import uuid
from datetime import datetime, timezone, timedelta
from typing import Any
@@ -84,11 +85,11 @@ class GovernanceService:
now = datetime.now(timezone.utc)
voting_starts = data.get("voting_starts", now + timedelta(days=1))
if isinstance(voting_starts, str):
voting_starts = datetime.fromisoformat(voting_starts)
voting_starts = datetime.fromisoformat(voting_starts.replace('Z', '+00:00'))
voting_ends = data.get("voting_ends", voting_starts + timedelta(days=7))
if isinstance(voting_ends, str):
voting_ends = datetime.fromisoformat(voting_ends)
voting_ends = datetime.fromisoformat(voting_ends.replace('Z', '+00:00'))
proposal = Proposal(
proposer_id=proposer_id,
@@ -259,14 +260,18 @@ class GovernanceService:
active_voters = len([p for p in profiles if p.total_votes_cast > 0])
total_power = sum(p.voting_power for p in profiles)
# Use real treasury data if available
treasury_inflow = treasury.total_balance if treasury else 0.0
treasury_outflow = treasury.allocated_funds if treasury else 0.0
report = TransparencyReport(
period=period,
total_proposals=total_proposals,
passed_proposals=passed_proposals,
active_voters=active_voters,
total_voting_power_participated=total_power,
treasury_inflow=10000.0, # Simulated
treasury_outflow=treasury.allocated_funds if treasury else 0.0,
treasury_inflow=treasury_inflow,
treasury_outflow=treasury_outflow,
metrics={
"voter_participation_rate": (active_voters / len(profiles)) if profiles else 0,
"proposal_success_rate": (passed_proposals / total_proposals) if total_proposals else 0,
@@ -278,3 +283,234 @@ class GovernanceService:
self.session.refresh(report)
return report
# Staking Pool Methods
async def create_staking_pool(
self, pool_name: str, developer_address: str, base_apy: float, reputation_multiplier: float
) -> dict[str, Any]:
"""Create a staking pool for an agent developer"""
pool_id = f"pool_{uuid.uuid4().hex[:8]}"
pool = {
"pool_id": pool_id,
"pool_name": pool_name,
"developer_address": developer_address,
"base_apy": base_apy,
"reputation_multiplier": reputation_multiplier,
"total_staked": 0.0,
"stakers_count": 0,
"created_at": datetime.now(timezone.utc).isoformat(),
}
return pool
async def get_developer_staking_pools(self, developer_address: str | None = None) -> list[dict[str, Any]]:
"""Get staking pools for a specific developer or all pools"""
# Mock implementation - in reality would query database
pools = []
if developer_address:
pools.append(
{
"pool_id": "pool_abc123",
"pool_name": f"Pool for {developer_address}",
"developer_address": developer_address,
"base_apy": 7.5,
"reputation_multiplier": 1.0,
"total_staked": 1000000.0,
"stakers_count": 500,
}
)
return pools
async def calculate_staking_rewards(
self, pool_id: str, staker_address: str, amount: float, duration_days: int
) -> dict[str, Any]:
"""Calculate staking rewards for a specific position"""
# Mock calculation
base_apy = 7.5
daily_rate = base_apy / 365
rewards = amount * daily_rate * duration_days
return {
"pool_id": pool_id,
"staker_address": staker_address,
"amount_staked": amount,
"duration_days": duration_days,
"estimated_rewards": rewards,
"apy": base_apy,
}
async def distribute_staking_rewards(self, pool_id: str) -> dict[str, Any]:
"""Distribute rewards to all stakers in a pool"""
return {
"pool_id": pool_id,
"total_distributed": 75000.0,
"stakers_rewarded": 500,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
# Regional Council Methods
async def create_regional_council(
self, region: str, council_name: str, jurisdiction: str, council_members: list[str], budget_allocation: float
) -> dict[str, Any]:
"""Create a regional governance council"""
council_id = f"council_{uuid.uuid4().hex[:8]}"
council = {
"council_id": council_id,
"region": region,
"council_name": council_name,
"jurisdiction": jurisdiction,
"council_members": council_members,
"budget_allocation": budget_allocation,
"budget_spent": 0.0,
"created_at": datetime.now(timezone.utc).isoformat(),
}
return council
async def get_regional_councils(self, region: str | None = None) -> list[dict[str, Any]]:
"""Get regional governance councils"""
# Mock implementation
councils = []
if region is None or region == "global":
councils.append(
{
"council_id": "council_global",
"region": "global",
"council_name": "Global Council",
"jurisdiction": "international",
"council_members": ["delegate_1", "delegate_2"],
"budget_allocation": 1000000.0,
"budget_spent": 250000.0,
}
)
return councils
async def create_regional_proposal(
self,
council_id: str,
title: str,
description: str,
proposal_type: str,
amount_requested: float,
proposer_address: str,
) -> dict[str, Any]:
"""Create a proposal for a specific regional council"""
proposal_id = f"reg_prop_{uuid.uuid4().hex[:8]}"
proposal = {
"proposal_id": proposal_id,
"council_id": council_id,
"title": title,
"description": description,
"proposal_type": proposal_type,
"amount_requested": amount_requested,
"proposer_address": proposer_address,
"status": "pending",
"created_at": datetime.now(timezone.utc).isoformat(),
}
return proposal
async def vote_on_regional_proposal(
self, proposal_id: str, voter_address: str, vote_type: VoteType, voting_power: float
) -> dict[str, Any]:
"""Vote on a regional proposal"""
return {
"proposal_id": proposal_id,
"voter_address": voter_address,
"vote_type": vote_type.value,
"voting_power": voting_power,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
# Treasury Methods
async def get_treasury_balance(self, region: str | None = None) -> dict[str, Any]:
"""Get treasury balance for global or specific region"""
return {
"region": region or "global",
"total_balance": 10000000.0,
"allocated_funds": 2500000.0,
"available_funds": 7500000.0,
"currency": "AIT",
}
async def allocate_treasury_funds(
self, council_id: str, amount: float, purpose: str, recipient_address: str, approver_address: str
) -> dict[str, Any]:
"""Allocate treasury funds to a regional council or project"""
allocation_id = f"alloc_{uuid.uuid4().hex[:8]}"
return {
"allocation_id": allocation_id,
"council_id": council_id,
"amount": amount,
"purpose": purpose,
"recipient_address": recipient_address,
"approver_address": approver_address,
"status": "approved",
"timestamp": datetime.now(timezone.utc).isoformat(),
}
async def get_treasury_transactions(
self, limit: int = 100, offset: int = 0, region: str | None = None
) -> list[dict[str, Any]]:
"""Get treasury transaction history"""
# Mock implementation
return [
{
"transaction_id": f"tx_{i}",
"type": "allocation",
"amount": 10000.0,
"recipient": f"council_{i}",
"timestamp": datetime.now(timezone.utc).isoformat(),
}
for i in range(min(limit, 10))
]
# Analytics Methods
async def get_governance_analytics(self, time_period_days: int) -> dict[str, Any]:
"""Get comprehensive governance analytics"""
proposals = self.session.execute(select(Proposal)).all()
profiles = self.session.execute(select(GovernanceProfile)).all()
total_proposals = len(proposals)
active_proposals = len([p for p in proposals if p.status == ProposalStatus.ACTIVE])
passed_proposals = len([p for p in proposals if p.status in [ProposalStatus.SUCCEEDED, ProposalStatus.EXECUTED]])
total_votes_cast = sum(p.total_votes_cast for p in profiles)
total_voting_power = sum(p.voting_power for p in profiles)
return {
"time_period_days": time_period_days,
"proposals": {
"total": total_proposals,
"still_active": active_proposals,
"passed": passed_proposals,
"defeated": total_proposals - passed_proposals,
},
"voting": {
"total_votes_cast": total_votes_cast,
"total_voting_power": total_voting_power,
"average_voter_participation": 75.0, # Mock percentage
},
"regional_councils": {
"total_councils": 3,
"active_councils": 3,
},
"treasury": {
"total_allocations": 2500000.0,
"utilization_rate": 25.0,
},
"staking": {
"active_pools": 5,
"total_staked": 1000000.0,
"average_apy": 7.5,
},
}
async def get_regional_governance_health(self, region: str) -> dict[str, Any]:
"""Get health metrics for a specific region's governance"""
return {
"region": region,
"overall_health": "healthy",
"councils_active": 1,
"proposals_pending": 2,
"proposals_passed": 10,
"voting_participation": 85.0,
"treasury_balance": 1000000.0,
"last_updated": datetime.now(timezone.utc).isoformat(),
}

View File

@@ -1,7 +1,7 @@
"""Blockchain RPC client for Edge API Service"""
import httpx
from typing import Dict, Optional
from typing import Dict, Optional, Any
from ..config import settings
@@ -17,22 +17,49 @@ class BlockchainRPCClient:
"""Close the HTTP client"""
await self.client.aclose()
async def join_island(self, island_id: str, island_name: str, chain_id: str, role: str) -> Dict:
"""Join island via blockchain RPC - TODO: Implement in Phase 2"""
# TODO: Call blockchain node RPC endpoint for island join
return {"message": "join_island via RPC - to be implemented in Phase 2"}
async def join_island(self, island_id: str, island_name: str, chain_id: str, role: str = "compute-provider", is_hub: bool = False) -> Dict[str, Any]:
"""Join island via blockchain RPC"""
response = await self.client.post(
f"{self.base_url}/rpc/islands/join",
json={
"island_id": island_id,
"island_name": island_name,
"chain_id": chain_id,
"role": role,
"is_hub": is_hub
}
)
response.raise_for_status()
return response.json()
async def leave_island(self, island_id: str) -> Dict:
"""Leave island via blockchain RPC - TODO: Implement in Phase 2"""
# TODO: Call blockchain node RPC endpoint for island leave
return {"message": "leave_island via RPC - to be implemented in Phase 2"}
async def leave_island(self, island_id: str) -> Dict[str, Any]:
"""Leave island via blockchain RPC"""
response = await self.client.post(
f"{self.base_url}/rpc/islands/leave",
json={"island_id": island_id}
)
response.raise_for_status()
return response.json()
async def get_island_info(self, island_id: str) -> Optional[Dict]:
"""Get island info via blockchain RPC - TODO: Implement in Phase 2"""
# TODO: Call blockchain node RPC endpoint for island info
return {"message": "get_island_info via RPC - to be implemented in Phase 2"}
async def get_island_info(self, island_id: str) -> Optional[Dict[str, Any]]:
"""Get island info via blockchain RPC"""
response = await self.client.get(f"{self.base_url}/rpc/islands/{island_id}")
if response.status_code == 404:
return None
response.raise_for_status()
return response.json()
async def request_bridge(self, target_island_id: str) -> Dict:
"""Request bridge via blockchain RPC - TODO: Implement in Phase 2"""
# TODO: Call blockchain node RPC endpoint for bridge request
return {"message": "request_bridge via RPC - to be implemented in Phase 2"}
async def list_islands(self) -> Dict[str, Any]:
"""List all islands via blockchain RPC"""
response = await self.client.get(f"{self.base_url}/rpc/islands")
response.raise_for_status()
return response.json()
async def request_bridge(self, target_island_id: str) -> Dict[str, Any]:
"""Request bridge via blockchain RPC"""
response = await self.client.post(
f"{self.base_url}/rpc/islands/bridge",
json={"target_island_id": target_island_id}
)
response.raise_for_status()
return response.json()

View File

@@ -1,37 +1,75 @@
"""Island operations router for Edge API Service"""
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from ..schemas.island import IslandMembership, BridgeRequest
from ..services.island_service import IslandService
router = APIRouter()
class JoinIslandRequest(BaseModel):
"""Request model for joining an island"""
island_id: str
island_name: str
chain_id: str
role: str = Field(default="compute-provider")
is_hub: bool = Field(default=False)
class LeaveIslandRequest(BaseModel):
"""Request model for leaving an island"""
island_id: str
class BridgeRequestRequest(BaseModel):
"""Request model for requesting a bridge"""
target_island_id: str
def get_island_service() -> IslandService:
"""Dependency injection for island service"""
return IslandService()
@router.post("/join")
async def join_island():
"""Join an island - TODO: Implement in Phase 2"""
return {"message": "Island join endpoint - to be implemented in Phase 2"}
async def join_island(request: JoinIslandRequest, svc: IslandService = Depends(get_island_service)):
"""Join an island"""
result = await svc.join_island(
island_id=request.island_id,
island_name=request.island_name,
chain_id=request.chain_id,
role=request.role,
is_hub=request.is_hub
)
return result
@router.post("/leave")
async def leave_island():
"""Leave an island - TODO: Implement in Phase 2"""
return {"message": "Island leave endpoint - to be implemented in Phase 2"}
async def leave_island(request: LeaveIslandRequest, svc: IslandService = Depends(get_island_service)):
"""Leave an island"""
result = await svc.leave_island(request.island_id)
return result
@router.get("/")
async def list_islands():
"""List all islands - TODO: Implement in Phase 2"""
return {"message": "List islands endpoint - to be implemented in Phase 2"}
async def list_islands(svc: IslandService = Depends(get_island_service)):
"""List all islands"""
islands = await svc.list_islands()
return {"islands": islands, "total": len(islands)}
@router.get("/{island_id}")
async def get_island(island_id: str):
"""Get island details - TODO: Implement in Phase 2"""
return {"message": f"Get island {island_id} - to be implemented in Phase 2"}
async def get_island(island_id: str, svc: IslandService = Depends(get_island_service)):
"""Get island details"""
island = await svc.get_island(island_id)
if island is None:
raise HTTPException(status_code=404, detail=f"Island {island_id} not found")
return island
@router.post("/bridge")
async def request_bridge():
"""Request bridge to another island - TODO: Implement in Phase 2"""
return {"message": "Bridge request endpoint - to be implemented in Phase 2"}
async def request_bridge(request: BridgeRequestRequest, svc: IslandService = Depends(get_island_service)):
"""Request bridge to another island"""
result = await svc.request_bridge(request.target_island_id)
return result

View File

@@ -2,6 +2,8 @@
from typing import Dict, List, Optional
from ..clients.blockchain_rpc import BlockchainRPCClient
from ..storage import get_session
from ..schemas.island import IslandMembership, BridgeRequest
@@ -9,25 +11,67 @@ class IslandService:
"""Service for island operations"""
def __init__(self):
# TODO: Initialize blockchain RPC client in Phase 2
pass
self.rpc_client = BlockchainRPCClient()
async def join_island(self, island_id: str, island_name: str, chain_id: str, role: str) -> Dict:
"""Join an island - TODO: Implement in Phase 2"""
return {"message": "join_island - to be implemented in Phase 2"}
async def join_island(self, island_id: str, island_name: str, chain_id: str, role: str = "compute-provider", is_hub: bool = False) -> Dict:
"""Join an island via blockchain RPC"""
# Call blockchain RPC to join island
result = await self.rpc_client.join_island(island_id, island_name, chain_id, role, is_hub)
# Store membership in edge-api database
if result.get("success"):
async with get_session() as session:
membership = IslandMembership(
island_id=island_id,
island_name=island_name,
chain_id=chain_id,
role=role,
status=result.get("status", "active")
)
session.add(membership)
await session.commit()
return result
async def leave_island(self, island_id: str) -> Dict:
"""Leave an island - TODO: Implement in Phase 2"""
return {"message": "leave_island - to be implemented in Phase 2"}
"""Leave an island via blockchain RPC"""
# Call blockchain RPC to leave island
result = await self.rpc_client.leave_island(island_id)
# Remove membership from edge-api database
if result.get("success"):
async with get_session() as session:
from sqlmodel import delete
stmt = delete(IslandMembership).where(IslandMembership.island_id == island_id)
await session.execute(stmt)
await session.commit()
return result
async def list_islands(self) -> List[Dict]:
"""List all islands - TODO: Implement in Phase 2"""
return [{"message": "list_islands - to be implemented in Phase 2"}]
"""List all islands via blockchain RPC"""
result = await self.rpc_client.list_islands()
return result.get("islands", [])
async def get_island(self, island_id: str) -> Optional[Dict]:
"""Get island details - TODO: Implement in Phase 2"""
return {"message": f"get_island {island_id} - to be implemented in Phase 2"}
"""Get island details via blockchain RPC"""
result = await self.rpc_client.get_island_info(island_id)
return result
async def request_bridge(self, target_island_id: str) -> Dict:
"""Request bridge to another island - TODO: Implement in Phase 2"""
return {"message": "request_bridge - to be implemented in Phase 2"}
"""Request bridge to another island via blockchain RPC"""
result = await self.rpc_client.request_bridge(target_island_id)
# Store bridge request in edge-api database
if result.get("success"):
async with get_session() as session:
bridge_req = BridgeRequest(
request_id=result.get("request_id"),
target_island_id=target_island_id,
source_node_id="edge-api", # TODO: Get actual node ID
status=result.get("status", "pending")
)
session.add(bridge_req)
await session.commit()
return result