Files
aitbc/apps/aitbc-edge/src/aitbc_edge/clients/blockchain_rpc.py
aitbc bd08848e0d
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Documentation Validation / validate-docs (push) Has been cancelled
Documentation Validation / validate-policies-strict (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Integration Tests / test-service-integration (push) Failing after 3s
Python Tests / test-python (push) Failing after 32s
Security Scanning / security-scan (push) Successful in 32s
refactor: rename packages to shorter names
- Renamed aitbc-ai-service to aitbc-ai
- Renamed aitbc-edge-api to aitbc-edge
- Updated pyproject.toml files with new package names
- Renamed directories and package modules
- Updated references in documentation and scripts
- Updated systemd service references
2026-05-20 08:30:51 +02:00

66 lines
2.3 KiB
Python

"""Blockchain RPC client for Edge API Service"""
from typing import Dict, Optional, Any
from urllib.parse import quote

import httpx

from ..config import settings
class BlockchainRPCClient:
    """Async HTTP client for the blockchain node's island RPC endpoints.

    The base URL is built from ``settings.blockchain_rpc_host`` and
    ``settings.blockchain_rpc_port``. All requests share one
    ``httpx.AsyncClient`` configured with a 30-second timeout. Release it
    with :meth:`close`, or use the instance in an ``async with`` block.

    Unless noted otherwise, each request method calls
    ``response.raise_for_status()`` and therefore raises
    ``httpx.HTTPStatusError`` on a 4xx/5xx response, and returns the
    decoded JSON body on success.
    """

    def __init__(self):
        self.base_url = f"http://{settings.blockchain_rpc_host}:{settings.blockchain_rpc_port}"
        self.client = httpx.AsyncClient(timeout=30.0)

    async def __aenter__(self) -> "BlockchainRPCClient":
        """Return the client itself so it can be used with ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the HTTP client when leaving the ``async with`` block."""
        await self.close()

    async def close(self):
        """Close the HTTP client"""
        await self.client.aclose()

    def _island_url(self, island_id: str) -> str:
        """Build the URL of a single island, escaping the id as one path segment."""
        # safe="" also encodes "/", so an id such as "a/b" or "../join"
        # cannot address a different endpoint, and "?" / "#" cannot cut the
        # path short. str() keeps non-string ids working as they did with
        # plain f-string interpolation.
        segment = quote(str(island_id), safe="")
        return f"{self.base_url}/rpc/islands/{segment}"

    async def join_island(
        self,
        island_id: str,
        island_name: str,
        chain_id: str,
        role: str = "compute-provider",
        is_hub: bool = False,
    ) -> Dict[str, Any]:
        """Join island via blockchain RPC.

        POSTs the island id, island name, chain id, role and hub flag as a
        JSON body to ``/rpc/islands/join``.
        """
        response = await self.client.post(
            f"{self.base_url}/rpc/islands/join",
            json={
                "island_id": island_id,
                "island_name": island_name,
                "chain_id": chain_id,
                "role": role,
                "is_hub": is_hub,
            },
        )
        response.raise_for_status()
        return response.json()

    async def leave_island(self, island_id: str) -> Dict[str, Any]:
        """Leave island via blockchain RPC.

        POSTs ``{"island_id": island_id}`` to ``/rpc/islands/leave``.
        """
        response = await self.client.post(
            f"{self.base_url}/rpc/islands/leave",
            json={"island_id": island_id},
        )
        response.raise_for_status()
        return response.json()

    async def get_island_info(self, island_id: str) -> Optional[Dict[str, Any]]:
        """Get island info via blockchain RPC.

        Returns ``None`` when the node answers 404; any other error status
        raises ``httpx.HTTPStatusError``.
        """
        response = await self.client.get(self._island_url(island_id))
        if response.status_code == 404:
            return None
        response.raise_for_status()
        return response.json()

    async def list_islands(self) -> Dict[str, Any]:
        """List all islands via blockchain RPC (GET ``/rpc/islands``)."""
        response = await self.client.get(f"{self.base_url}/rpc/islands")
        response.raise_for_status()
        return response.json()

    async def request_bridge(self, target_island_id: str) -> Dict[str, Any]:
        """Request bridge via blockchain RPC.

        POSTs ``{"target_island_id": target_island_id}`` to
        ``/rpc/islands/bridge``.
        """
        response = await self.client.post(
            f"{self.base_url}/rpc/islands/bridge",
            json={"target_island_id": target_island_id},
        )
        response.raise_for_status()
        return response.json()