Merge gitea/main, preserving security fixes and current dependency versions

This commit is contained in:
AITBC System
2026-03-24 16:18:25 +01:00
252 changed files with 18525 additions and 1029 deletions

View File

@@ -30,9 +30,7 @@ def create_genesis_accounts(session, accounts: List[Dict[str, Any]], chain_id: s
db_account = Account(
address=account['address'],
balance=int(account['balance']),
chain_id=chain_id,
account_type=account.get('type', 'regular'),
metadata=json.dumps(account.get('metadata', {}))
chain_id=chain_id
)
session.add(db_account)
print(f" ✅ Created account: {account['address']} ({account['balance']} AITBC)")
@@ -45,18 +43,10 @@ def create_genesis_contracts(session, contracts: List[Dict[str, Any]], chain_id:
# Create contract deployment transaction
deployment_tx = Transaction(
chain_id=chain_id,
tx_hash=f"0x{hashlib.sha256(f'contract_{contract["name"]}_{chain_id}'.encode()).hexdigest()}",
tx_hash=f"0x{hashlib.sha256(f'contract_{contract['name']}_{chain_id}'.encode()).hexdigest()}",
sender="aitbc1genesis",
receiver=contract['address'],
amount=0,
gas_used=210000,
gas_price=1000000000,
tx_type="contract_deployment",
metadata=json.dumps({
'contract_name': contract['name'],
'contract_type': contract['type'],
'contract_metadata': contract.get('metadata', {})
})
recipient=contract['address'],
payload={"type": "contract_deployment", "contract_name": contract['name'], "code": contract.get('code', '0x')}
)
session.add(deployment_tx)
print(f" ✅ Deployed contract: {contract['name']} at {contract['address']}")
@@ -154,7 +144,7 @@ def create_enhanced_genesis(config_path: str = None):
tx_count=0,
state_root=None,
chain_id=chain_id,
metadata=json.dumps({
block_metadata=json.dumps({
'chain_type': genesis['chain_type'],
'purpose': genesis['purpose'],
'gas_limit': genesis['gas_limit'],

View File

@@ -0,0 +1,14 @@
from aitbc_chain.database import session_scope, init_db
from aitbc_chain.models import Account
from datetime import datetime
def fix():
init_db()
with session_scope() as session:
acc = Account(chain_id="ait-mainnet", address="aitbc1genesis", balance=10000000, nonce=0, updated_at=datetime.utcnow(), account_type="regular", metadata="{}")
session.merge(acc)
session.commit()
print("Added aitbc1genesis to mainnet")
if __name__ == "__main__":
fix()

View File

@@ -0,0 +1,27 @@
import sqlite3
def fix():
try:
conn = sqlite3.connect('/opt/aitbc/data/ait-mainnet/chain.db')
cur = conn.cursor()
cur.execute('PRAGMA table_info("block")')
columns = [col[1] for col in cur.fetchall()]
if 'metadata' in columns:
print("Renaming metadata column to block_metadata...")
cur.execute('ALTER TABLE "block" RENAME COLUMN metadata TO block_metadata')
conn.commit()
elif 'block_metadata' not in columns:
print("Adding block_metadata column...")
cur.execute('ALTER TABLE "block" ADD COLUMN block_metadata TEXT')
conn.commit()
else:
print("block_metadata column already exists.")
conn.close()
except Exception as e:
print(f"Error modifying database: {e}")
if __name__ == "__main__":
fix()

View File

@@ -0,0 +1,39 @@
import sqlite3
def fix():
try:
conn = sqlite3.connect('/opt/aitbc/data/chain.db')
cur = conn.cursor()
cur.execute('PRAGMA table_info("block")')
columns = [col[1] for col in cur.fetchall()]
if 'metadata' in columns:
print("Renaming metadata column to block_metadata in default db...")
cur.execute('ALTER TABLE "block" RENAME COLUMN metadata TO block_metadata')
conn.commit()
elif 'block_metadata' not in columns:
print("Adding block_metadata column to default db...")
cur.execute('ALTER TABLE "block" ADD COLUMN block_metadata TEXT')
conn.commit()
else:
print("block_metadata column already exists in default db.")
cur.execute('PRAGMA table_info("transaction")')
columns = [col[1] for col in cur.fetchall()]
if 'metadata' in columns:
print("Renaming metadata column to tx_metadata in default db...")
cur.execute('ALTER TABLE "transaction" RENAME COLUMN metadata TO tx_metadata')
conn.commit()
elif 'tx_metadata' not in columns:
print("Adding tx_metadata column to default db...")
cur.execute('ALTER TABLE "transaction" ADD COLUMN tx_metadata TEXT')
conn.commit()
conn.close()
except Exception as e:
print(f"Error modifying database: {e}")
if __name__ == "__main__":
fix()

View File

@@ -0,0 +1,40 @@
from aitbc_chain.database import engine, init_db
from sqlalchemy import text
def fix():
init_db()
with engine.connect() as conn:
try:
conn.execute(text('ALTER TABLE "transaction" ADD COLUMN metadata TEXT'))
print("Added metadata")
except Exception as e:
pass
try:
conn.execute(text('ALTER TABLE "transaction" ADD COLUMN value INTEGER DEFAULT 0'))
print("Added value")
except Exception as e:
pass
try:
conn.execute(text('ALTER TABLE "transaction" ADD COLUMN fee INTEGER DEFAULT 0'))
print("Added fee")
except Exception as e:
pass
try:
conn.execute(text('ALTER TABLE "transaction" ADD COLUMN nonce INTEGER DEFAULT 0'))
print("Added nonce")
except Exception as e:
pass
try:
conn.execute(text('ALTER TABLE "transaction" ADD COLUMN status TEXT DEFAULT "pending"'))
print("Added status")
except Exception as e:
pass
try:
conn.execute(text('ALTER TABLE "transaction" ADD COLUMN timestamp TEXT'))
print("Added timestamp")
except Exception as e:
pass
conn.commit()
if __name__ == "__main__":
fix()

View File

@@ -0,0 +1,5 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
class TestSettings(BaseSettings):
model_config = SettingsConfigDict(env_file="/opt/aitbc/.env", env_file_encoding="utf-8", case_sensitive=False, extra="ignore")
db_path: str = ""
print(TestSettings().db_path)

View File

@@ -0,0 +1,27 @@
import sqlite3
def fix():
try:
conn = sqlite3.connect('/opt/aitbc/data/ait-mainnet/chain.db')
cur = conn.cursor()
cur.execute('PRAGMA table_info("transaction")')
columns = [col[1] for col in cur.fetchall()]
if 'metadata' in columns:
print("Renaming metadata column to tx_metadata...")
cur.execute('ALTER TABLE "transaction" RENAME COLUMN metadata TO tx_metadata')
conn.commit()
elif 'tx_metadata' not in columns:
print("Adding tx_metadata column...")
cur.execute('ALTER TABLE "transaction" ADD COLUMN tx_metadata TEXT')
conn.commit()
else:
print("tx_metadata column already exists.")
conn.close()
except Exception as e:
print(f"Error modifying database: {e}")
if __name__ == "__main__":
fix()

View File

@@ -0,0 +1,50 @@
import sqlite3
def fix_db():
print("Fixing transaction table on aitbc node...")
conn = sqlite3.connect('/opt/aitbc/data/ait-mainnet/chain.db')
cursor = conn.cursor()
try:
cursor.execute('ALTER TABLE "transaction" ADD COLUMN nonce INTEGER DEFAULT 0;')
print("Added nonce column")
except sqlite3.OperationalError as e:
print(f"Error adding nonce: {e}")
try:
cursor.execute('ALTER TABLE "transaction" ADD COLUMN value INTEGER DEFAULT 0;')
print("Added value column")
except sqlite3.OperationalError as e:
print(f"Error adding value: {e}")
try:
cursor.execute('ALTER TABLE "transaction" ADD COLUMN fee INTEGER DEFAULT 0;')
print("Added fee column")
except sqlite3.OperationalError as e:
print(f"Error adding fee: {e}")
try:
cursor.execute('ALTER TABLE "transaction" ADD COLUMN status TEXT DEFAULT "pending";')
print("Added status column")
except sqlite3.OperationalError as e:
print(f"Error adding status: {e}")
try:
cursor.execute('ALTER TABLE "transaction" ADD COLUMN tx_metadata TEXT;')
print("Added tx_metadata column")
except sqlite3.OperationalError as e:
print(f"Error adding tx_metadata: {e}")
try:
cursor.execute('ALTER TABLE "transaction" ADD COLUMN timestamp TEXT;')
print("Added timestamp column")
except sqlite3.OperationalError as e:
print(f"Error adding timestamp: {e}")
conn.commit()
conn.close()
print("Done fixing transaction table.")
if __name__ == '__main__':
fix_db()

View File

@@ -0,0 +1,2 @@
from aitbc_chain.config import settings
print(settings.db_path)

View File

@@ -32,6 +32,9 @@ class RateLimitMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
client_ip = request.client.host if request.client else "unknown"
# Bypass rate limiting for localhost (sync/health internal traffic)
if client_ip in {"127.0.0.1", "::1"}:
return await call_next(request)
now = time.time()
# Clean old entries
self._requests[client_ip] = [
@@ -109,7 +112,8 @@ def create_app() -> FastAPI:
# Middleware (applied in reverse order)
app.add_middleware(RequestLoggingMiddleware)
app.add_middleware(RateLimitMiddleware, max_requests=200, window_seconds=60)
# Allow higher RPC throughput (sync + node traffic)
app.add_middleware(RateLimitMiddleware, max_requests=5000, window_seconds=60)
app.add_middleware(
CORSMiddleware,
allow_origins=[

View File

@@ -0,0 +1,272 @@
#!/usr/bin/env python3
"""
Chain Synchronization Service
Keeps blockchain nodes synchronized by sharing blocks via P2P and Redis gossip
"""
import asyncio
import json
import logging
import time
from typing import Dict, Any, Optional, List
logger = logging.getLogger(__name__)
class ChainSyncService:
def __init__(self, redis_url: str, node_id: str, rpc_port: int = 8006, leader_host: str = None,
source_host: str = "127.0.0.1", source_port: int = None,
import_host: str = "127.0.0.1", import_port: int = None):
self.redis_url = redis_url
self.node_id = node_id
self.rpc_port = rpc_port # kept for backward compat (local poll if source_port None)
self.leader_host = leader_host # Host of the leader node (legacy)
self.source_host = source_host
self.source_port = source_port or rpc_port
self.import_host = import_host
self.import_port = import_port or rpc_port
self._stop_event = asyncio.Event()
self._redis = None
self._receiver_ready = asyncio.Event()
async def start(self):
"""Start chain synchronization service"""
logger.info(f"Starting chain sync service for node {self.node_id}")
try:
import redis.asyncio as redis
self._redis = redis.from_url(self.redis_url)
await self._redis.ping()
logger.info("Connected to Redis for chain sync")
except Exception as e:
logger.error(f"Failed to connect to Redis: {e}")
return
# Start block broadcasting task
# Start block receiving task
receive_task = asyncio.create_task(self._receive_blocks())
# Wait until receiver subscribed so we don't drop the initial burst
await self._receiver_ready.wait()
broadcast_task = asyncio.create_task(self._broadcast_blocks())
try:
await self._stop_event.wait()
finally:
broadcast_task.cancel()
receive_task.cancel()
await asyncio.gather(broadcast_task, receive_task, return_exceptions=True)
if self._redis:
await self._redis.close()
async def stop(self):
"""Stop chain synchronization service"""
logger.info("Stopping chain sync service")
self._stop_event.set()
async def _broadcast_blocks(self):
"""Broadcast local blocks to other nodes"""
import aiohttp
last_broadcast_height = 0
retry_count = 0
max_retries = 5
base_delay = 2
while not self._stop_event.is_set():
try:
# Get current head from local RPC
async with aiohttp.ClientSession() as session:
async with session.get(f"http://{self.source_host}:{self.source_port}/rpc/head") as resp:
if resp.status == 200:
head_data = await resp.json()
current_height = head_data.get('height', 0)
# Reset retry count on successful connection
retry_count = 0
# Broadcast new blocks
if current_height > last_broadcast_height:
for height in range(last_broadcast_height + 1, current_height + 1):
block_data = await self._get_block_by_height(height, session)
if block_data:
await self._broadcast_block(block_data)
last_broadcast_height = current_height
logger.info(f"Broadcasted blocks up to height {current_height}")
elif resp.status == 429:
raise Exception("rate_limit")
else:
raise Exception(f"RPC returned status {resp.status}")
except Exception as e:
retry_count += 1
# If rate-limited, wait longer before retrying
if str(e) == "rate_limit":
delay = base_delay * 30
logger.warning(f"RPC rate limited, retrying in {delay}s")
await asyncio.sleep(delay)
continue
if retry_count <= max_retries:
delay = base_delay * (2 ** (retry_count - 1)) # Exponential backoff
logger.warning(f"RPC connection failed (attempt {retry_count}/{max_retries}), retrying in {delay}s: {e}")
await asyncio.sleep(delay)
continue
else:
logger.error(f"RPC connection failed after {max_retries} attempts, waiting {base_delay * 10}s: {e}")
await asyncio.sleep(base_delay * 10)
retry_count = 0 # Reset retry count after long wait
await asyncio.sleep(base_delay) # Check every 2 seconds when connected
async def _receive_blocks(self):
"""Receive blocks from other nodes via Redis"""
if not self._redis:
return
pubsub = self._redis.pubsub()
await pubsub.subscribe("blocks")
self._receiver_ready.set()
logger.info("Subscribed to block broadcasts")
async for message in pubsub.listen():
if self._stop_event.is_set():
break
if message['type'] == 'message':
try:
block_data = json.loads(message['data'])
await self._import_block(block_data)
except Exception as e:
logger.error(f"Error processing received block: {e}")
async def _get_block_by_height(self, height: int, session) -> Optional[Dict[str, Any]]:
"""Get block data by height from local RPC"""
try:
async with session.get(f"http://{self.source_host}:{self.source_port}/rpc/blocks-range?start={height}&end={height}") as resp:
if resp.status == 200:
blocks_data = await resp.json()
blocks = blocks_data.get('blocks', [])
block = blocks[0] if blocks else None
return block
except Exception as e:
logger.error(f"Error getting block {height}: {e}")
return None
async def _broadcast_block(self, block_data: Dict[str, Any]):
"""Broadcast block to other nodes via Redis"""
if not self._redis:
return
try:
await self._redis.publish("blocks", json.dumps(block_data))
logger.debug(f"Broadcasted block {block_data.get('height')}")
except Exception as e:
logger.error(f"Error broadcasting block: {e}")
async def _import_block(self, block_data: Dict[str, Any]):
"""Import block from another node"""
import aiohttp
try:
# Don't import our own blocks
if block_data.get('proposer') == self.node_id:
return
# Determine target host - if we're a follower, import to leader, else import locally
target_host = self.import_host
target_port = self.import_port
# Retry logic for import
max_retries = 3
base_delay = 1
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.post(
f"http://{target_host}:{target_port}/rpc/importBlock",
json=block_data
) as resp:
if resp.status == 200:
result = await resp.json()
if result.get('accepted'):
logger.info(f"Imported block {block_data.get('height')} from {block_data.get('proposer')}")
else:
logger.debug(f"Rejected block {block_data.get('height')}: {result.get('reason')}")
return
else:
try:
body = await resp.text()
except Exception:
body = "<no body>"
raise Exception(f"HTTP {resp.status}: {body}")
except Exception as e:
if attempt < max_retries - 1:
delay = base_delay * (2 ** attempt)
logger.warning(f"Import failed (attempt {attempt + 1}/{max_retries}), retrying in {delay}s: {e}")
await asyncio.sleep(delay)
else:
logger.error(f"Failed to import block {block_data.get('height')} after {max_retries} attempts: {e}")
return
except Exception as e:
logger.error(f"Error importing block: {e}")
async def run_chain_sync(
redis_url: str,
node_id: str,
rpc_port: int = 8006,
leader_host: str = None,
source_host: str = "127.0.0.1",
source_port: int = None,
import_host: str = "127.0.0.1",
import_port: int = None,
):
"""Run chain synchronization service"""
service = ChainSyncService(
redis_url=redis_url,
node_id=node_id,
rpc_port=rpc_port,
leader_host=leader_host,
source_host=source_host,
source_port=source_port,
import_host=import_host,
import_port=import_port,
)
await service.start()
def main():
import argparse
parser = argparse.ArgumentParser(description="AITBC Chain Synchronization Service")
parser.add_argument("--redis", default="redis://localhost:6379", help="Redis URL")
parser.add_argument("--node-id", required=True, help="Node identifier")
parser.add_argument("--rpc-port", type=int, default=8006, help="RPC port")
parser.add_argument("--leader-host", help="Leader node host (for followers)")
parser.add_argument("--source-host", default="127.0.0.1", help="Host to poll for head/blocks")
parser.add_argument("--source-port", type=int, help="Port to poll for head/blocks")
parser.add_argument("--import-host", default="127.0.0.1", help="Host to import blocks into")
parser.add_argument("--import-port", type=int, help="Port to import blocks into")
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)
try:
asyncio.run(run_chain_sync(
args.redis,
args.node_id,
args.rpc_port,
args.leader_host,
args.source_host,
args.source_port,
args.import_host,
args.import_port,
))
except KeyboardInterrupt:
logger.info("Chain sync service stopped by user")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,94 @@
#!/usr/bin/env python3
"""
Combined blockchain node and P2P service launcher
Runs both the main blockchain node and P2P placeholder service
"""
import asyncio
import logging
import os
import signal
import sys
from pathlib import Path
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from aitbc_chain.main import BlockchainNode, _run as run_node
from aitbc_chain.config import settings
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CombinedService:
def __init__(self):
self._stop_event = asyncio.Event()
self._tasks = []
self._loop = None
def set_stop_event(self):
"""Set the stop event to trigger shutdown"""
if self._stop_event and not self._stop_event.is_set():
self._stop_event.set()
async def start(self):
"""Start both blockchain node and P2P server"""
self._loop = asyncio.get_running_loop()
logger.info("Starting combined blockchain service")
# Start blockchain node in background
node_task = asyncio.create_task(run_node())
self._tasks.append(node_task)
logger.info(f"Combined service started - Node on mainnet")
try:
await self._stop_event.wait()
finally:
await self.stop()
async def stop(self):
"""Stop all services"""
logger.info("Stopping combined blockchain service")
# Cancel all tasks
for task in self._tasks:
task.cancel()
# Wait for tasks to complete
if self._tasks:
await asyncio.gather(*self._tasks, return_exceptions=True)
self._tasks.clear()
logger.info("Combined service stopped")
# Global service instance for signal handler
_service_instance = None
def signal_handler(signum, frame):
"""Handle shutdown signals"""
logger.info(f"Received signal {signum}, initiating shutdown")
global _service_instance
if _service_instance:
_service_instance.set_stop_event()
async def main():
"""Main entry point"""
global _service_instance
service = CombinedService()
_service_instance = service
# Set up signal handlers
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
try:
await service.start()
except KeyboardInterrupt:
logger.info("Received keyboard interrupt")
finally:
await service.stop()
_service_instance = None
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -16,11 +16,11 @@ class ProposerConfig(BaseModel):
max_txs_per_block: int
class ChainSettings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", case_sensitive=False)
model_config = SettingsConfigDict(env_file="/opt/aitbc/.env", env_file_encoding="utf-8", case_sensitive=False, extra="ignore")
chain_id: str = "ait-devnet"
chain_id: str = ""
supported_chains: str = "ait-devnet" # Comma-separated list of supported chain IDs
db_path: Path = Path("./data/chain.db")
db_path: Path = Path("/opt/aitbc/data/chain.db")
rpc_bind_host: str = "127.0.0.1"
rpc_bind_port: int = 8080
@@ -28,7 +28,7 @@ class ChainSettings(BaseSettings):
p2p_bind_host: str = "127.0.0.2"
p2p_bind_port: int = 7070
proposer_id: str = "ait-devnet-proposer"
proposer_id: str = ""
proposer_key: Optional[str] = None
mint_per_unit: int = 0 # No new minting after genesis for production
@@ -36,6 +36,9 @@ class ChainSettings(BaseSettings):
block_time_seconds: int = 2
# Block production toggle (set false on followers)
enable_block_production: bool = True
# Block production limits
max_block_size_bytes: int = 1_000_000 # 1 MB
max_txs_per_block: int = 500

View File

@@ -120,11 +120,11 @@ class PoAProposer:
return
async def _propose_block(self) -> None:
# Check internal mempool
# Check internal mempool and include transactions
from ..mempool import get_mempool
if get_mempool().size(self._config.chain_id) == 0:
return
from ..models import Transaction, Account
mempool = get_mempool()
with self._session_factory() as session:
head = session.exec(select(Block).where(Block.chain_id == self._config.chain_id).order_by(Block.height.desc()).limit(1)).first()
next_height = 0
@@ -136,7 +136,72 @@ class PoAProposer:
interval_seconds = (datetime.utcnow() - head.timestamp).total_seconds()
timestamp = datetime.utcnow()
block_hash = self._compute_block_hash(next_height, parent_hash, timestamp)
# Pull transactions from mempool
max_txs = self._config.max_txs_per_block
max_bytes = self._config.max_block_size_bytes
pending_txs = mempool.drain(max_txs, max_bytes, 'ait-mainnet')
self._logger.info(f"[PROPOSE] drained {len(pending_txs)} txs from mempool, chain={self._config.chain_id}")
# Process transactions and update balances
processed_txs = []
for tx in pending_txs:
try:
# Parse transaction data
tx_data = tx.content
sender = tx_data.get("sender")
recipient = tx_data.get("payload", {}).get("to")
value = tx_data.get("payload", {}).get("value", 0)
fee = tx_data.get("fee", 0)
if not sender or not recipient:
continue
# Get sender account
sender_account = session.get(Account, (self._config.chain_id, sender))
if not sender_account:
continue
# Check sufficient balance
total_cost = value + fee
if sender_account.balance < total_cost:
continue
# Get or create recipient account
recipient_account = session.get(Account, (self._config.chain_id, recipient))
if not recipient_account:
recipient_account = Account(chain_id=self._config.chain_id, address=recipient, balance=0, nonce=0)
session.add(recipient_account)
session.flush()
# Update balances
sender_account.balance -= total_cost
sender_account.nonce += 1
recipient_account.balance += value
# Create transaction record
transaction = Transaction(
chain_id=self._config.chain_id,
tx_hash=tx.tx_hash,
sender=sender,
recipient=recipient,
payload=tx_data,
value=value,
fee=fee,
nonce=sender_account.nonce - 1,
timestamp=timestamp,
block_height=next_height,
status="confirmed"
)
session.add(transaction)
processed_txs.append(tx)
except Exception as e:
self._logger.warning(f"Failed to process transaction {tx.tx_hash}: {e}")
continue
# Compute block hash with transaction data
block_hash = self._compute_block_hash(next_height, parent_hash, timestamp, processed_txs)
block = Block(
chain_id=self._config.chain_id,
@@ -145,7 +210,7 @@ class PoAProposer:
parent_hash=parent_hash,
proposer=self._config.proposer_id,
timestamp=timestamp,
tx_count=0,
tx_count=len(processed_txs),
state_root=None,
)
session.add(block)
@@ -173,6 +238,7 @@ class PoAProposer:
)
# Broadcast the new block
tx_list = [tx.content for tx in processed_txs] if processed_txs else []
await gossip_broker.publish(
"blocks",
{
@@ -184,7 +250,8 @@ class PoAProposer:
"timestamp": block.timestamp.isoformat(),
"tx_count": block.tx_count,
"state_root": block.state_root,
}
"transactions": tx_list,
},
)
async def _ensure_genesis_block(self) -> None:
@@ -258,6 +325,11 @@ class PoAProposer:
with self._session_factory() as session:
return session.exec(select(Block).order_by(Block.height.desc()).limit(1)).first()
def _compute_block_hash(self, height: int, parent_hash: str, timestamp: datetime) -> str:
payload = f"{self._config.chain_id}|{height}|{parent_hash}|{timestamp.isoformat()}".encode()
def _compute_block_hash(self, height: int, parent_hash: str, timestamp: datetime, transactions: list = None) -> str:
# Include transaction hashes in block hash computation
tx_hashes = []
if transactions:
tx_hashes = [tx.tx_hash for tx in transactions]
payload = f"{self._config.chain_id}|{height}|{parent_hash}|{timestamp.isoformat()}|{'|'.join(sorted(tx_hashes))}".encode()
return "0x" + hashlib.sha256(payload).hexdigest()

View File

@@ -4,15 +4,15 @@ import asyncio
import json
import warnings
from collections import defaultdict
from contextlib import suppress
from contextlib import suppress, asynccontextmanager
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set
warnings.filterwarnings("ignore", message="coroutine.* was never awaited", category=RuntimeWarning)
try:
from starlette.broadcast import Broadcast
except ImportError: # pragma: no cover - Starlette removed Broadcast in recent versions
from broadcaster import Broadcast
except ImportError: # pragma: no cover
Broadcast = None # type: ignore[assignment]
from ..metrics import metrics_registry
@@ -225,25 +225,15 @@ class GossipBroker:
class _InProcessSubscriber:
def __init__(self, queue: "asyncio.Queue[Any]", release: Callable[[], None]):
def __init__(self, queue: "asyncio.Queue[Any]"):
self._queue = queue
self._release = release
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
self._release()
def __aiter__(self): # type: ignore[override]
return self._iterator()
async def _iterator(self):
try:
while True:
yield await self._queue.get()
finally:
self._release()
while True:
yield await self._queue.get()
class _InProcessBroadcast:
@@ -262,23 +252,19 @@ class _InProcessBroadcast:
self._topics.clear()
self._running = False
async def subscribe(self, topic: str) -> _InProcessSubscriber:
@asynccontextmanager
async def subscribe(self, topic: str):
queue: "asyncio.Queue[Any]" = asyncio.Queue()
async with self._lock:
self._topics[topic].append(queue)
def release() -> None:
async def _remove() -> None:
async with self._lock:
queues = self._topics.get(topic)
if queues and queue in queues:
queues.remove(queue)
if not queues:
self._topics.pop(topic, None)
asyncio.create_task(_remove())
return _InProcessSubscriber(queue, release)
try:
yield _InProcessSubscriber(queue)
finally:
async with self._lock:
queues = self._topics.get(topic)
if queues and queue in queues:
queues.remove(queue)
async def publish(self, topic: str, message: Any) -> None:
if not self._running:

View File

@@ -0,0 +1,307 @@
from __future__ import annotations
import asyncio
import json
import warnings
from collections import defaultdict
from contextlib import suppress
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set
warnings.filterwarnings("ignore", message="coroutine.* was never awaited", category=RuntimeWarning)
try:
from broadcaster import Broadcast
except ImportError: # pragma: no cover
Broadcast = None # type: ignore[assignment]
from ..metrics import metrics_registry
def _increment_publication(metric_prefix: str, topic: str) -> None:
metrics_registry.increment(f"{metric_prefix}_total")
metrics_registry.increment(f"{metric_prefix}_topic_{topic}")
def _set_queue_gauge(topic: str, size: int) -> None:
metrics_registry.set_gauge(f"gossip_queue_size_{topic}", float(size))
def _update_subscriber_metrics(topics: Dict[str, List["asyncio.Queue[Any]"]]) -> None:
for topic, queues in topics.items():
metrics_registry.set_gauge(f"gossip_subscribers_topic_{topic}", float(len(queues)))
total = sum(len(queues) for queues in topics.values())
metrics_registry.set_gauge("gossip_subscribers_total", float(total))
def _clear_topic_metrics(topic: str) -> None:
metrics_registry.set_gauge(f"gossip_subscribers_topic_{topic}", 0.0)
_set_queue_gauge(topic, 0)
@dataclass
class TopicSubscription:
topic: str
queue: "asyncio.Queue[Any]"
_unsubscribe: Callable[[], None]
def close(self) -> None:
self._unsubscribe()
async def get(self) -> Any:
return await self.queue.get()
async def __aiter__(self): # type: ignore[override]
try:
while True:
yield await self.queue.get()
finally:
self.close()
class GossipBackend:
async def start(self) -> None: # pragma: no cover - overridden as needed
return None
async def publish(self, topic: str, message: Any) -> None:
raise NotImplementedError
async def subscribe(self, topic: str, max_queue_size: int = 100) -> TopicSubscription:
raise NotImplementedError
async def shutdown(self) -> None:
return None
class InMemoryGossipBackend(GossipBackend):
def __init__(self) -> None:
self._topics: Dict[str, List["asyncio.Queue[Any]"]] = defaultdict(list)
self._lock = asyncio.Lock()
async def publish(self, topic: str, message: Any) -> None:
async with self._lock:
queues = list(self._topics.get(topic, []))
for queue in queues:
await queue.put(message)
_set_queue_gauge(topic, queue.qsize())
_increment_publication("gossip_publications", topic)
async def subscribe(self, topic: str, max_queue_size: int = 100) -> TopicSubscription:
queue: "asyncio.Queue[Any]" = asyncio.Queue(maxsize=max_queue_size)
async with self._lock:
self._topics[topic].append(queue)
_update_subscriber_metrics(self._topics)
_set_queue_gauge(topic, queue.qsize())
def _unsubscribe() -> None:
async def _remove() -> None:
async with self._lock:
queues = self._topics.get(topic)
if queues is None:
return
if queue in queues:
queues.remove(queue)
if not queues:
self._topics.pop(topic, None)
_clear_topic_metrics(topic)
_update_subscriber_metrics(self._topics)
asyncio.create_task(_remove())
return TopicSubscription(topic=topic, queue=queue, _unsubscribe=_unsubscribe)
async def shutdown(self) -> None:
async with self._lock:
topics = list(self._topics.keys())
self._topics.clear()
for topic in topics:
_clear_topic_metrics(topic)
_update_subscriber_metrics(self._topics)
class BroadcastGossipBackend(GossipBackend):
def __init__(self, url: str) -> None:
if Broadcast is None: # provide in-process fallback when Broadcast is missing
self._broadcast = _InProcessBroadcast()
else:
self._broadcast = Broadcast(url) # type: ignore[arg-type]
self._tasks: Set[asyncio.Task[None]] = set()
self._lock = asyncio.Lock()
self._running = False
async def start(self) -> None:
if not self._running:
await self._broadcast.connect() # type: ignore[union-attr]
self._running = True
async def publish(self, topic: str, message: Any) -> None:
if not self._running:
raise RuntimeError("Broadcast backend not started")
payload = _encode_message(message)
await self._broadcast.publish(topic, payload) # type: ignore[union-attr]
_increment_publication("gossip_broadcast_publications", topic)
async def subscribe(self, topic: str, max_queue_size: int = 100) -> TopicSubscription:
if not self._running:
raise RuntimeError("Broadcast backend not started")
queue: "asyncio.Queue[Any]" = asyncio.Queue(maxsize=max_queue_size)
stop_event = asyncio.Event()
async def _run_subscription() -> None:
async with self._broadcast.subscribe(topic) as subscriber: # type: ignore[attr-defined,union-attr]
async for event in subscriber: # type: ignore[union-attr]
if stop_event.is_set():
break
data = _decode_message(getattr(event, "message", event))
try:
await queue.put(data)
_set_queue_gauge(topic, queue.qsize())
except asyncio.CancelledError:
break
task = asyncio.create_task(_run_subscription(), name=f"broadcast-sub:{topic}")
async with self._lock:
self._tasks.add(task)
metrics_registry.set_gauge("gossip_broadcast_subscribers_total", float(len(self._tasks)))
def _unsubscribe() -> None:
async def _stop() -> None:
stop_event.set()
task.cancel()
with suppress(asyncio.CancelledError):
await task
async with self._lock:
self._tasks.discard(task)
metrics_registry.set_gauge("gossip_broadcast_subscribers_total", float(len(self._tasks)))
asyncio.create_task(_stop())
return TopicSubscription(topic=topic, queue=queue, _unsubscribe=_unsubscribe)
async def shutdown(self) -> None:
async with self._lock:
tasks = list(self._tasks)
self._tasks.clear()
metrics_registry.set_gauge("gossip_broadcast_subscribers_total", 0.0)
for task in tasks:
task.cancel()
with suppress(asyncio.CancelledError):
await task
if self._running:
await self._broadcast.disconnect() # type: ignore[union-attr]
self._running = False
class GossipBroker:
def __init__(self, backend: GossipBackend) -> None:
self._backend = backend
self._lock = asyncio.Lock()
self._started = False
async def publish(self, topic: str, message: Any) -> None:
if not self._started:
await self._backend.start()
self._started = True
await self._backend.publish(topic, message)
async def subscribe(self, topic: str, max_queue_size: int = 100) -> TopicSubscription:
if not self._started:
await self._backend.start()
self._started = True
return await self._backend.subscribe(topic, max_queue_size=max_queue_size)
async def set_backend(self, backend: GossipBackend) -> None:
await backend.start()
async with self._lock:
previous = self._backend
self._backend = backend
self._started = True
await previous.shutdown()
async def shutdown(self) -> None:
await self._backend.shutdown()
class _InProcessSubscriber:
def __init__(self, queue: "asyncio.Queue[Any]"):
self._queue = queue
def __aiter__(self): # type: ignore[override]
return self._iterator()
async def _iterator(self):
while True:
yield await self._queue.get()
class _InProcessBroadcast:
"""Minimal in-memory broadcast substitute for tests when Starlette Broadcast is absent."""
def __init__(self) -> None:
self._topics: Dict[str, List["asyncio.Queue[Any]"]] = defaultdict(list)
self._lock = asyncio.Lock()
self._running = False
async def connect(self) -> None:
self._running = True
async def disconnect(self) -> None:
async with self._lock:
self._topics.clear()
self._running = False
@asynccontextmanager
async def subscribe(self, topic: str):
queue: "asyncio.Queue[Any]" = asyncio.Queue()
async with self._lock:
self._topics[topic].append(queue)
try:
yield _InProcessSubscriber(queue)
finally:
async with self._lock:
queues = self._topics.get(topic)
if queues and queue in queues:
queues.remove(queue)
async def publish(self, topic: str, message: Any) -> None:
if not self._running:
raise RuntimeError("Broadcast backend not started")
async with self._lock:
queues = list(self._topics.get(topic, []))
for queue in queues:
await queue.put(message)
def create_backend(backend_type: str, *, broadcast_url: Optional[str] = None) -> GossipBackend:
backend = backend_type.lower()
if backend in {"memory", "inmemory", "local"}:
return InMemoryGossipBackend()
if backend in {"broadcast", "starlette", "redis"}:
if not broadcast_url:
raise ValueError("Broadcast backend requires a gossip_broadcast_url setting")
return BroadcastGossipBackend(broadcast_url)
raise ValueError(f"Unsupported gossip backend '{backend_type}'")
def _encode_message(message: Any) -> Any:
if isinstance(message, (str, bytes, bytearray)):
return message
return json.dumps(message, separators=(",", ":"))
def _decode_message(message: Any) -> Any:
if isinstance(message, (bytes, bytearray)):
message = message.decode("utf-8")
if isinstance(message, str):
try:
return json.loads(message)
except json.JSONDecodeError:
return message
return message
gossip_broker = GossipBroker(InMemoryGossipBackend())

View File

@@ -103,7 +103,7 @@ class BlockchainNode:
if isinstance(tx_data, str):
import json
tx_data = json.loads(tx_data)
chain_id = tx_data.get("chain_id", "ait-devnet")
chain_id = tx_data.get("chain_id", settings.chain_id)
mempool.add(tx_data, chain_id=chain_id)
except Exception as exc:
logger.error(f"Error processing transaction from gossip: {exc}")
@@ -121,10 +121,10 @@ class BlockchainNode:
if isinstance(block_data, str):
import json
block_data = json.loads(block_data)
chain_id = block_data.get("chain_id", "ait-devnet")
chain_id = block_data.get("chain_id", settings.chain_id)
logger.info(f"Importing block for chain {chain_id}: {block_data.get('height')}")
sync = ChainSync(session_factory=session_scope, chain_id=chain_id)
res = sync.import_block(block_data)
res = sync.import_block(block_data, transactions=block_data.get("transactions"))
logger.info(f"Import result: accepted={res.accepted}, reason={res.reason}")
except Exception as exc:
logger.error(f"Error processing block from gossip: {exc}")
@@ -148,7 +148,11 @@ class BlockchainNode:
max_size=settings.mempool_max_size,
min_fee=settings.min_fee,
)
self._start_proposers()
# Start proposers only if enabled (followers set enable_block_production=False)
if getattr(settings, "enable_block_production", True):
self._start_proposers()
else:
logger.info("Block production disabled on this node", extra={"proposer_id": settings.proposer_id})
await self._setup_gossip_subscribers()
try:
await self._stop_event.wait()

View File

@@ -38,7 +38,10 @@ class InMemoryMempool:
self._max_size = max_size
self._min_fee = min_fee
def add(self, tx: Dict[str, Any], chain_id: str = "ait-devnet") -> str:
def add(self, tx: Dict[str, Any], chain_id: str = None) -> str:
from .config import settings
if chain_id is None:
chain_id = settings.chain_id
fee = tx.get("fee", 0)
if fee < self._min_fee:
raise ValueError(f"Fee {fee} below minimum {self._min_fee}")
@@ -59,11 +62,17 @@ class InMemoryMempool:
metrics_registry.increment(f"mempool_tx_added_total_{chain_id}")
return tx_hash
def list_transactions(self, chain_id: str = "ait-devnet") -> List[PendingTransaction]:
def list_transactions(self, chain_id: str = None) -> List[PendingTransaction]:
from .config import settings
if chain_id is None:
chain_id = settings.chain_id
with self._lock:
return list(self._transactions.values())
def drain(self, max_count: int, max_bytes: int, chain_id: str = "ait-devnet") -> List[PendingTransaction]:
def drain(self, max_count: int, max_bytes: int, chain_id: str = None) -> List[PendingTransaction]:
from .config import settings
if chain_id is None:
chain_id = settings.chain_id
"""Drain transactions for block inclusion, prioritized by fee (highest first)."""
with self._lock:
sorted_txs = sorted(
@@ -87,14 +96,20 @@ class InMemoryMempool:
metrics_registry.increment(f"mempool_tx_drained_total_{chain_id}", float(len(result)))
return result
def remove(self, tx_hash: str, chain_id: str = "ait-devnet") -> bool:
def remove(self, tx_hash: str, chain_id: str = None) -> bool:
from .config import settings
if chain_id is None:
chain_id = settings.chain_id
with self._lock:
removed = self._transactions.pop(tx_hash, None) is not None
if removed:
metrics_registry.set_gauge("mempool_size", float(len(self._transactions)))
return removed
def size(self, chain_id: str = "ait-devnet") -> int:
def size(self, chain_id: str = None) -> int:
from .config import settings
if chain_id is None:
chain_id = settings.chain_id
with self._lock:
return len(self._transactions)
@@ -135,7 +150,10 @@ class DatabaseMempool:
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_mempool_fee ON mempool(fee DESC)")
self._conn.commit()
def add(self, tx: Dict[str, Any], chain_id: str = "ait-devnet") -> str:
def add(self, tx: Dict[str, Any], chain_id: str = None) -> str:
from .config import settings
if chain_id is None:
chain_id = settings.chain_id
fee = tx.get("fee", 0)
if fee < self._min_fee:
raise ValueError(f"Fee {fee} below minimum {self._min_fee}")
@@ -169,7 +187,10 @@ class DatabaseMempool:
self._update_gauge(chain_id)
return tx_hash
def list_transactions(self, chain_id: str = "ait-devnet") -> List[PendingTransaction]:
def list_transactions(self, chain_id: str = None) -> List[PendingTransaction]:
from .config import settings
if chain_id is None:
chain_id = settings.chain_id
with self._lock:
rows = self._conn.execute(
"SELECT tx_hash, content, fee, size_bytes, received_at FROM mempool WHERE chain_id = ? ORDER BY fee DESC, received_at ASC",
@@ -182,7 +203,10 @@ class DatabaseMempool:
) for r in rows
]
def drain(self, max_count: int, max_bytes: int, chain_id: str = "ait-devnet") -> List[PendingTransaction]:
def drain(self, max_count: int, max_bytes: int, chain_id: str = None) -> List[PendingTransaction]:
from .config import settings
if chain_id is None:
chain_id = settings.chain_id
with self._lock:
rows = self._conn.execute(
"SELECT tx_hash, content, fee, size_bytes, received_at FROM mempool WHERE chain_id = ? ORDER BY fee DESC, received_at ASC",
@@ -214,7 +238,10 @@ class DatabaseMempool:
self._update_gauge(chain_id)
return result
def remove(self, tx_hash: str, chain_id: str = "ait-devnet") -> bool:
def remove(self, tx_hash: str, chain_id: str = None) -> bool:
from .config import settings
if chain_id is None:
chain_id = settings.chain_id
with self._lock:
cursor = self._conn.execute("DELETE FROM mempool WHERE chain_id = ? AND tx_hash = ?", (chain_id, tx_hash))
self._conn.commit()
@@ -223,11 +250,17 @@ class DatabaseMempool:
self._update_gauge(chain_id)
return removed
def size(self, chain_id: str = "ait-devnet") -> int:
def size(self, chain_id: str = None) -> int:
from .config import settings
if chain_id is None:
chain_id = settings.chain_id
with self._lock:
return self._conn.execute("SELECT COUNT(*) FROM mempool WHERE chain_id = ?", (chain_id,)).fetchone()[0]
def _update_gauge(self, chain_id: str = "ait-devnet") -> None:
def _update_gauge(self, chain_id: str = None) -> None:
from .config import settings
if chain_id is None:
chain_id = settings.chain_id
count = self._conn.execute("SELECT COUNT(*) FROM mempool WHERE chain_id = ?", (chain_id,)).fetchone()[0]
metrics_registry.set_gauge(f"mempool_size_{chain_id}", float(count))

View File

@@ -36,6 +36,7 @@ class Block(SQLModel, table=True):
timestamp: datetime = Field(default_factory=datetime.utcnow, index=True)
tx_count: int = 0
state_root: Optional[str] = None
block_metadata: Optional[str] = Field(default=None)
# Relationships - use sa_relationship_kwargs for lazy loading
transactions: List["Transaction"] = Relationship(
@@ -90,6 +91,14 @@ class Transaction(SQLModel, table=True):
)
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
# New fields added to schema
nonce: int = Field(default=0)
value: int = Field(default=0)
fee: int = Field(default=0)
status: str = Field(default="pending")
timestamp: Optional[str] = Field(default=None)
tx_metadata: Optional[str] = Field(default=None)
# Relationship
block: Optional["Block"] = Relationship(
back_populates="transactions",

View File

@@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""
P2P Network Service using Redis Gossip
Handles peer-to-peer communication between blockchain nodes
"""
import asyncio
import json
import logging
import socket
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
class P2PNetworkService:
def __init__(self, host: str, port: int, redis_url: str, node_id: str):
self.host = host
self.port = port
self.redis_url = redis_url
self.node_id = node_id
self._server = None
self._stop_event = asyncio.Event()
async def start(self):
"""Start P2P network service"""
logger.info(f"Starting P2P network service on {self.host}:{self.port}")
# Create TCP server for P2P connections
self._server = await asyncio.start_server(
self._handle_connection,
self.host,
self.port
)
logger.info(f"P2P service listening on {self.host}:{self.port}")
try:
await self._stop_event.wait()
finally:
await self.stop()
async def stop(self):
"""Stop P2P network service"""
logger.info("Stopping P2P network service")
if self._server:
self._server.close()
await self._server.wait_closed()
async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
"""Handle incoming P2P connections"""
addr = writer.get_extra_info('peername')
logger.info(f"P2P connection from {addr}")
try:
while True:
data = await reader.read(1024)
if not data:
break
try:
message = json.loads(data.decode())
logger.info(f"P2P received: {message}")
# Handle different message types
if message.get('type') == 'ping':
response = {'type': 'pong', 'node_id': self.node_id}
writer.write(json.dumps(response).encode() + b'\n')
await writer.drain()
except json.JSONDecodeError:
logger.warning(f"Invalid JSON from {addr}")
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"P2P connection error: {e}")
finally:
writer.close()
await writer.wait_closed()
logger.info(f"P2P connection closed from {addr}")
async def run_p2p_service(host: str, port: int, redis_url: str, node_id: str):
"""Run P2P service"""
service = P2PNetworkService(host, port, redis_url, node_id)
await service.start()
def main():
import argparse
parser = argparse.ArgumentParser(description="AITBC P2P Network Service")
parser.add_argument("--host", default="0.0.0.0", help="Bind host")
parser.add_argument("--port", type=int, default=8005, help="Bind port")
parser.add_argument("--redis", default="redis://localhost:6379", help="Redis URL")
parser.add_argument("--node-id", help="Node identifier")
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)
try:
asyncio.run(run_p2p_service(args.host, args.port, args.redis, args.node_id))
except KeyboardInterrupt:
logger.info("P2P service stopped by user")
if __name__ == "__main__":
main()

View File

@@ -18,6 +18,13 @@ from ..models import Account, Block, Receipt, Transaction
router = APIRouter()
def get_chain_id(chain_id: str = None) -> str:
"""Get chain_id from parameter or use default from settings"""
if chain_id is None:
from ..config import settings
return settings.chain_id
return chain_id
def _serialize_receipt(receipt: Receipt) -> Dict[str, Any]:
return {
@@ -63,7 +70,14 @@ class EstimateFeeRequest(BaseModel):
@router.get("/head", summary="Get current chain head")
async def get_head(chain_id: str = "ait-devnet") -> Dict[str, Any]:
async def get_head(chain_id: str = None) -> Dict[str, Any]:
"""Get current chain head"""
from ..config import settings as cfg
# Use default chain_id from settings if not provided
if chain_id is None:
chain_id = cfg.chain_id
metrics_registry.increment("rpc_get_head_total")
start = time.perf_counter()
with session_scope() as session:
@@ -91,14 +105,24 @@ async def get_block(height: int) -> Dict[str, Any]:
metrics_registry.increment("rpc_get_block_not_found_total")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="block not found")
metrics_registry.increment("rpc_get_block_success_total")
txs = session.exec(select(Transaction).where(Transaction.block_height == height)).all()
tx_list = []
for tx in txs:
t = dict(tx.payload) if tx.payload else {}
t["tx_hash"] = tx.tx_hash
tx_list.append(t)
metrics_registry.observe("rpc_get_block_duration_seconds", time.perf_counter() - start)
return {
"height": block.height,
"hash": block.hash,
"parent_hash": block.parent_hash,
"proposer": block.proposer,
"timestamp": block.timestamp.isoformat(),
"tx_count": block.tx_count,
"state_root": block.state_root,
"transactions": tx_list,
}
@@ -136,13 +160,22 @@ async def get_blocks_range(start: int, end: int) -> Dict[str, Any]:
# Serialize blocks
block_list = []
for block in blocks:
txs = session.exec(select(Transaction).where(Transaction.block_height == block.height)).all()
tx_list = []
for tx in txs:
t = dict(tx.payload) if tx.payload else {}
t["tx_hash"] = tx.tx_hash
tx_list.append(t)
block_list.append({
"height": block.height,
"hash": block.hash,
"parent_hash": block.parent_hash,
"proposer": block.proposer,
"timestamp": block.timestamp.isoformat(),
"tx_count": block.tx_count,
"state_root": block.state_root,
"transactions": tx_list,
})
metrics_registry.increment("rpc_get_blocks_range_success_total")
@@ -157,7 +190,8 @@ async def get_blocks_range(start: int, end: int) -> Dict[str, Any]:
@router.get("/tx/{tx_hash}", summary="Get transaction by hash")
async def get_transaction(tx_hash: str, chain_id: str = "ait-devnet") -> Dict[str, Any]:
async def get_transaction(tx_hash: str, chain_id: str = None) -> Dict[str, Any]:
chain_id = get_chain_id(chain_id)
metrics_registry.increment("rpc_get_transaction_total")
start = time.perf_counter()
with session_scope() as session:
@@ -296,7 +330,8 @@ async def get_receipts(limit: int = 20, offset: int = 0) -> Dict[str, Any]:
@router.get("/getBalance/{address}", summary="Get account balance")
async def get_balance(address: str, chain_id: str = "ait-devnet") -> Dict[str, Any]:
async def get_balance(address: str, chain_id: str = None) -> Dict[str, Any]:
chain_id = get_chain_id(chain_id)
metrics_registry.increment("rpc_get_balance_total")
start = time.perf_counter()
with session_scope() as session:
@@ -442,13 +477,13 @@ async def get_addresses(limit: int = 20, offset: int = 0, min_balance: int = 0)
@router.post("/sendTx", summary="Submit a new transaction")
async def send_transaction(request: TransactionRequest, chain_id: str = "ait-devnet") -> Dict[str, Any]:
async def send_transaction(request: TransactionRequest, chain_id: str = None) -> Dict[str, Any]:
metrics_registry.increment("rpc_send_tx_total")
start = time.perf_counter()
mempool = get_mempool()
tx_dict = request.model_dump()
try:
tx_hash = mempool.add(tx_dict, chain_id=chain_id)
tx_hash = mempool.add(tx_dict, chain_id=chain_id or request.payload.get('chain_id') or 'ait-mainnet')
except ValueError as e:
metrics_registry.increment("rpc_send_tx_rejected_total")
raise HTTPException(status_code=400, detail=str(e))
@@ -481,7 +516,7 @@ async def send_transaction(request: TransactionRequest, chain_id: str = "ait-dev
@router.post("/submitReceipt", summary="Submit receipt claim transaction")
async def submit_receipt(request: ReceiptSubmissionRequest, chain_id: str = "ait-devnet") -> Dict[str, Any]:
async def submit_receipt(request: ReceiptSubmissionRequest, chain_id: str = None) -> Dict[str, Any]:
metrics_registry.increment("rpc_submit_receipt_total")
start = time.perf_counter()
tx_payload = {
@@ -539,7 +574,7 @@ class ImportBlockRequest(BaseModel):
@router.post("/importBlock", summary="Import a block from a remote peer")
async def import_block(request: ImportBlockRequest, chain_id: str = "ait-devnet") -> Dict[str, Any]:
async def import_block(request: ImportBlockRequest, chain_id: str = None) -> Dict[str, Any]:
from ..sync import ChainSync, ProposerSignatureValidator
from ..config import settings as cfg
@@ -578,7 +613,7 @@ async def import_block(request: ImportBlockRequest, chain_id: str = "ait-devnet"
@router.get("/syncStatus", summary="Get chain sync status")
async def sync_status(chain_id: str = "ait-devnet") -> Dict[str, Any]:
async def sync_status(chain_id: str = None) -> Dict[str, Any]:
from ..sync import ChainSync
from ..config import settings as cfg
@@ -588,7 +623,7 @@ async def sync_status(chain_id: str = "ait-devnet") -> Dict[str, Any]:
@router.get("/info", summary="Get blockchain information")
async def get_blockchain_info(chain_id: str = "ait-devnet") -> Dict[str, Any]:
async def get_blockchain_info(chain_id: str = None) -> Dict[str, Any]:
"""Get comprehensive blockchain information"""
from ..config import settings as cfg
@@ -634,31 +669,48 @@ async def get_blockchain_info(chain_id: str = "ait-devnet") -> Dict[str, Any]:
@router.get("/supply", summary="Get token supply information")
async def get_token_supply(chain_id: str = "ait-devnet") -> Dict[str, Any]:
async def get_token_supply(chain_id: str = None) -> Dict[str, Any]:
"""Get token supply information"""
from ..config import settings as cfg
from ..models import Account
chain_id = get_chain_id(chain_id)
metrics_registry.increment("rpc_supply_total")
start = time.perf_counter()
with session_scope() as session:
# Simple implementation for now
response = {
"chain_id": chain_id,
"total_supply": 1000000000, # 1 billion from genesis
"circulating_supply": 0, # No transactions yet
"faucet_balance": 1000000000, # All tokens in faucet
"faucet_address": "ait1faucet000000000000000000000000000000000",
"mint_per_unit": cfg.mint_per_unit,
"total_accounts": 0
}
# Calculate actual values from database
accounts = session.exec(select(Account).where(Account.chain_id == chain_id)).all()
total_balance = sum(account.balance for account in accounts)
total_accounts = len(accounts)
# Production implementation - calculate real circulating supply
if chain_id == "ait-mainnet":
response = {
"chain_id": chain_id,
"total_supply": 1000000000, # 1 billion from genesis
"circulating_supply": total_balance, # Actual tokens in circulation
"mint_per_unit": cfg.mint_per_unit,
"total_accounts": total_accounts # Actual account count
}
else:
# Devnet with faucet - use actual calculations
response = {
"chain_id": chain_id,
"total_supply": 1000000000, # 1 billion from genesis
"circulating_supply": total_balance, # Actual tokens in circulation
"faucet_balance": 1000000000, # All tokens in faucet
"faucet_address": "ait1faucet000000000000000000000000000000000",
"mint_per_unit": cfg.mint_per_unit,
"total_accounts": total_accounts # Actual account count
}
metrics_registry.observe("rpc_supply_duration_seconds", time.perf_counter() - start)
return response
@router.get("/validators", summary="List blockchain validators")
async def get_validators(chain_id: str = "ait-devnet") -> Dict[str, Any]:
async def get_validators(chain_id: str = None) -> Dict[str, Any]:
"""List blockchain validators (authorities)"""
from ..config import settings as cfg
@@ -690,7 +742,7 @@ async def get_validators(chain_id: str = "ait-devnet") -> Dict[str, Any]:
@router.get("/state", summary="Get blockchain state information")
async def get_chain_state(chain_id: str = "ait-devnet"):
async def get_chain_state(chain_id: str = None):
"""Get blockchain state information for a chain"""
start = time.perf_counter()
@@ -710,7 +762,7 @@ async def get_chain_state(chain_id: str = "ait-devnet"):
@router.get("/rpc/getBalance/{address}", summary="Get account balance")
async def get_balance(address: str, chain_id: str = "ait-devnet"):
async def get_balance(address: str, chain_id: str = None):
"""Get account balance for a specific address"""
start = time.perf_counter()
@@ -753,7 +805,7 @@ async def get_balance(address: str, chain_id: str = "ait-devnet"):
@router.get("/rpc/head", summary="Get current chain head")
async def get_head(chain_id: str = "ait-devnet"):
async def get_head(chain_id: str = None):
"""Get current chain head block"""
start = time.perf_counter()
@@ -799,7 +851,7 @@ async def get_head(chain_id: str = "ait-devnet"):
@router.get("/rpc/transactions", summary="Get latest transactions")
async def get_transactions(chain_id: str = "ait-devnet", limit: int = 20, offset: int = 0):
async def get_transactions(chain_id: str = None, limit: int = 20, offset: int = 0):
"""Get latest transactions"""
start = time.perf_counter()

View File

@@ -16,7 +16,7 @@ from sqlmodel import Session, select
from .config import settings
from .logger import get_logger
from .metrics import metrics_registry
from .models import Block, Transaction
from .models import Block, Transaction, Account
logger = get_logger(__name__)
@@ -284,15 +284,45 @@ class ChainSync:
)
session.add(block)
# Import transactions if provided
# Import transactions if provided and apply state changes
if transactions:
for tx_data in transactions:
sender_addr = tx_data.get("sender", "")
payload = tx_data.get("payload", {}) or {}
recipient_addr = payload.get("to") or tx_data.get("recipient", "")
value = int(payload.get("value", 0) or 0)
fee = int(tx_data.get("fee", 0) or 0)
tx_hash = tx_data.get("tx_hash", "")
# Upsert sender/recipient accounts
sender_acct = session.get(Account, (self._chain_id, sender_addr))
if sender_acct is None:
sender_acct = Account(chain_id=self._chain_id, address=sender_addr, balance=0, nonce=0)
session.add(sender_acct)
session.flush()
recipient_acct = session.get(Account, (self._chain_id, recipient_addr))
if recipient_acct is None:
recipient_acct = Account(chain_id=self._chain_id, address=recipient_addr, balance=0, nonce=0)
session.add(recipient_acct)
session.flush()
# Apply balances/nonce; assume block validity already verified on producer
total_cost = value + fee
sender_acct.balance -= total_cost
tx_nonce = tx_data.get("nonce")
if tx_nonce is not None:
sender_acct.nonce = max(sender_acct.nonce, int(tx_nonce) + 1)
else:
sender_acct.nonce += 1
recipient_acct.balance += value
tx = Transaction(
chain_id=self._chain_id,
tx_hash=tx_data.get("tx_hash", ""),
tx_hash=tx_hash,
block_height=block_data["height"],
sender=tx_data.get("sender", ""),
recipient=tx_data.get("recipient", ""),
sender=sender_addr,
recipient=recipient_addr,
payload=tx_data,
)
session.add(tx)

View File

@@ -0,0 +1,5 @@
from aitbc_chain.config import settings
from aitbc_chain.mempool import init_mempool, get_mempool
init_mempool(backend=settings.mempool_backend, db_path=str(settings.db_path.parent / "mempool.db"), max_size=settings.mempool_max_size, min_fee=settings.min_fee)
pool = get_mempool()
print(pool.__class__.__name__)

View File

@@ -0,0 +1,3 @@
from aitbc_chain.config import settings
import sys
print(settings.db_path.parent / "mempool.db")

View File

@@ -0,0 +1,6 @@
from aitbc_chain.database import session_scope
from aitbc_chain.models import Account
with session_scope() as session:
acc = session.get(Account, ("ait-mainnet", "aitbc1genesis"))
print(acc.address, acc.balance)