refactor: improve gossip broker cleanup and enhance CLI wallet transaction handling
Some checks failed
AITBC CI/CD Pipeline / lint-and-test (3.11) (push) Has been cancelled
AITBC CI/CD Pipeline / lint-and-test (3.12) (push) Has been cancelled
AITBC CI/CD Pipeline / lint-and-test (3.13) (push) Has been cancelled
AITBC CI/CD Pipeline / test-cli (push) Has been cancelled
AITBC CI/CD Pipeline / test-services (push) Has been cancelled
AITBC CI/CD Pipeline / test-production-services (push) Has been cancelled
AITBC CI/CD Pipeline / security-scan (push) Has been cancelled
AITBC CI/CD Pipeline / build (push) Has been cancelled
AITBC CI/CD Pipeline / deploy-staging (push) Has been cancelled
AITBC CI/CD Pipeline / deploy-production (push) Has been cancelled
AITBC CI/CD Pipeline / performance-test (push) Has been cancelled
AITBC CI/CD Pipeline / docs (push) Has been cancelled
AITBC CI/CD Pipeline / release (push) Has been cancelled
AITBC CI/CD Pipeline / notify (push) Has been cancelled
Security Scanning / Bandit Security Scan (apps/coordinator-api/src) (push) Has been cancelled
Security Scanning / Bandit Security Scan (cli/aitbc_cli) (push) Has been cancelled
Security Scanning / Bandit Security Scan (packages/py/aitbc-core/src) (push) Has been cancelled
Security Scanning / Bandit Security Scan (packages/py/aitbc-crypto/src) (push) Has been cancelled
Security Scanning / Bandit Security Scan (packages/py/aitbc-sdk/src) (push) Has been cancelled
Security Scanning / Bandit Security Scan (tests) (push) Has been cancelled
Security Scanning / CodeQL Security Analysis (javascript) (push) Has been cancelled
Security Scanning / CodeQL Security Analysis (python) (push) Has been cancelled
Security Scanning / Dependency Security Scan (push) Has been cancelled
Security Scanning / Container Security Scan (push) Has been cancelled
Security Scanning / OSSF Scorecard (push) Has been cancelled
Security Scanning / Security Summary Report (push) Has been cancelled
AITBC CLI Level 1 Commands Test / test-cli-level1 (3.11) (push) Has been cancelled
AITBC CLI Level 1 Commands Test / test-cli-level1 (3.12) (push) Has been cancelled
AITBC CLI Level 1 Commands Test / test-cli-level1 (3.13) (push) Has been cancelled
AITBC CLI Level 1 Commands Test / test-summary (push) Has been cancelled

- Refactor _InProcessBroadcast.subscribe to use asynccontextmanager for cleaner resource cleanup
- Remove manual release callback in _InProcessSubscriber
- Update file-based wallet send to submit transactions via blockchain RPC
- Fetch balance and nonce from chain before sending transactions
- Add tx_hash and status tracking to local transaction history
- Enhance run_subprocess to support additional kwargs and return Comp
This commit is contained in:
2026-03-23 14:06:17 +01:00
parent 5dccaffbf9
commit bbe67239a1
9 changed files with 496 additions and 44 deletions

View File

@@ -0,0 +1,50 @@
import sqlite3
def fix_db():
print("Fixing transaction table on aitbc node...")
conn = sqlite3.connect('/opt/aitbc/data/ait-mainnet/chain.db')
cursor = conn.cursor()
try:
cursor.execute('ALTER TABLE "transaction" ADD COLUMN nonce INTEGER DEFAULT 0;')
print("Added nonce column")
except sqlite3.OperationalError as e:
print(f"Error adding nonce: {e}")
try:
cursor.execute('ALTER TABLE "transaction" ADD COLUMN value INTEGER DEFAULT 0;')
print("Added value column")
except sqlite3.OperationalError as e:
print(f"Error adding value: {e}")
try:
cursor.execute('ALTER TABLE "transaction" ADD COLUMN fee INTEGER DEFAULT 0;')
print("Added fee column")
except sqlite3.OperationalError as e:
print(f"Error adding fee: {e}")
try:
cursor.execute('ALTER TABLE "transaction" ADD COLUMN status TEXT DEFAULT "pending";')
print("Added status column")
except sqlite3.OperationalError as e:
print(f"Error adding status: {e}")
try:
cursor.execute('ALTER TABLE "transaction" ADD COLUMN tx_metadata TEXT;')
print("Added tx_metadata column")
except sqlite3.OperationalError as e:
print(f"Error adding tx_metadata: {e}")
try:
cursor.execute('ALTER TABLE "transaction" ADD COLUMN timestamp TEXT;')
print("Added timestamp column")
except sqlite3.OperationalError as e:
print(f"Error adding timestamp: {e}")
conn.commit()
conn.close()
print("Done fixing transaction table.")
if __name__ == '__main__':
fix_db()

View File

@@ -4,7 +4,7 @@ import asyncio
import json import json
import warnings import warnings
from collections import defaultdict from collections import defaultdict
from contextlib import suppress from contextlib import suppress, asynccontextmanager
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set from typing import Any, Callable, Dict, List, Optional, Set
@@ -225,25 +225,15 @@ class GossipBroker:
class _InProcessSubscriber: class _InProcessSubscriber:
def __init__(self, queue: "asyncio.Queue[Any]", release: Callable[[], None]): def __init__(self, queue: "asyncio.Queue[Any]"):
self._queue = queue self._queue = queue
self._release = release
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
self._release()
def __aiter__(self): # type: ignore[override] def __aiter__(self): # type: ignore[override]
return self._iterator() return self._iterator()
async def _iterator(self): async def _iterator(self):
try: while True:
while True: yield await self._queue.get()
yield await self._queue.get()
finally:
self._release()
class _InProcessBroadcast: class _InProcessBroadcast:
@@ -262,23 +252,19 @@ class _InProcessBroadcast:
self._topics.clear() self._topics.clear()
self._running = False self._running = False
async def subscribe(self, topic: str) -> _InProcessSubscriber: @asynccontextmanager
async def subscribe(self, topic: str):
queue: "asyncio.Queue[Any]" = asyncio.Queue() queue: "asyncio.Queue[Any]" = asyncio.Queue()
async with self._lock: async with self._lock:
self._topics[topic].append(queue) self._topics[topic].append(queue)
def release() -> None: try:
async def _remove() -> None: yield _InProcessSubscriber(queue)
async with self._lock: finally:
queues = self._topics.get(topic) async with self._lock:
if queues and queue in queues: queues = self._topics.get(topic)
queues.remove(queue) if queues and queue in queues:
if not queues: queues.remove(queue)
self._topics.pop(topic, None)
asyncio.create_task(_remove())
return _InProcessSubscriber(queue, release)
async def publish(self, topic: str, message: Any) -> None: async def publish(self, topic: str, message: Any) -> None:
if not self._running: if not self._running:

View File

@@ -0,0 +1,307 @@
from __future__ import annotations
import asyncio
import json
import warnings
from collections import defaultdict
from contextlib import suppress
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set
warnings.filterwarnings("ignore", message="coroutine.* was never awaited", category=RuntimeWarning)
try:
from broadcaster import Broadcast
except ImportError: # pragma: no cover
Broadcast = None # type: ignore[assignment]
from ..metrics import metrics_registry
def _increment_publication(metric_prefix: str, topic: str) -> None:
metrics_registry.increment(f"{metric_prefix}_total")
metrics_registry.increment(f"{metric_prefix}_topic_{topic}")
def _set_queue_gauge(topic: str, size: int) -> None:
metrics_registry.set_gauge(f"gossip_queue_size_{topic}", float(size))
def _update_subscriber_metrics(topics: Dict[str, List["asyncio.Queue[Any]"]]) -> None:
for topic, queues in topics.items():
metrics_registry.set_gauge(f"gossip_subscribers_topic_{topic}", float(len(queues)))
total = sum(len(queues) for queues in topics.values())
metrics_registry.set_gauge("gossip_subscribers_total", float(total))
def _clear_topic_metrics(topic: str) -> None:
metrics_registry.set_gauge(f"gossip_subscribers_topic_{topic}", 0.0)
_set_queue_gauge(topic, 0)
@dataclass
class TopicSubscription:
topic: str
queue: "asyncio.Queue[Any]"
_unsubscribe: Callable[[], None]
def close(self) -> None:
self._unsubscribe()
async def get(self) -> Any:
return await self.queue.get()
async def __aiter__(self): # type: ignore[override]
try:
while True:
yield await self.queue.get()
finally:
self.close()
class GossipBackend:
async def start(self) -> None: # pragma: no cover - overridden as needed
return None
async def publish(self, topic: str, message: Any) -> None:
raise NotImplementedError
async def subscribe(self, topic: str, max_queue_size: int = 100) -> TopicSubscription:
raise NotImplementedError
async def shutdown(self) -> None:
return None
class InMemoryGossipBackend(GossipBackend):
def __init__(self) -> None:
self._topics: Dict[str, List["asyncio.Queue[Any]"]] = defaultdict(list)
self._lock = asyncio.Lock()
async def publish(self, topic: str, message: Any) -> None:
async with self._lock:
queues = list(self._topics.get(topic, []))
for queue in queues:
await queue.put(message)
_set_queue_gauge(topic, queue.qsize())
_increment_publication("gossip_publications", topic)
async def subscribe(self, topic: str, max_queue_size: int = 100) -> TopicSubscription:
queue: "asyncio.Queue[Any]" = asyncio.Queue(maxsize=max_queue_size)
async with self._lock:
self._topics[topic].append(queue)
_update_subscriber_metrics(self._topics)
_set_queue_gauge(topic, queue.qsize())
def _unsubscribe() -> None:
async def _remove() -> None:
async with self._lock:
queues = self._topics.get(topic)
if queues is None:
return
if queue in queues:
queues.remove(queue)
if not queues:
self._topics.pop(topic, None)
_clear_topic_metrics(topic)
_update_subscriber_metrics(self._topics)
asyncio.create_task(_remove())
return TopicSubscription(topic=topic, queue=queue, _unsubscribe=_unsubscribe)
async def shutdown(self) -> None:
async with self._lock:
topics = list(self._topics.keys())
self._topics.clear()
for topic in topics:
_clear_topic_metrics(topic)
_update_subscriber_metrics(self._topics)
class BroadcastGossipBackend(GossipBackend):
def __init__(self, url: str) -> None:
if Broadcast is None: # provide in-process fallback when Broadcast is missing
self._broadcast = _InProcessBroadcast()
else:
self._broadcast = Broadcast(url) # type: ignore[arg-type]
self._tasks: Set[asyncio.Task[None]] = set()
self._lock = asyncio.Lock()
self._running = False
async def start(self) -> None:
if not self._running:
await self._broadcast.connect() # type: ignore[union-attr]
self._running = True
async def publish(self, topic: str, message: Any) -> None:
if not self._running:
raise RuntimeError("Broadcast backend not started")
payload = _encode_message(message)
await self._broadcast.publish(topic, payload) # type: ignore[union-attr]
_increment_publication("gossip_broadcast_publications", topic)
async def subscribe(self, topic: str, max_queue_size: int = 100) -> TopicSubscription:
if not self._running:
raise RuntimeError("Broadcast backend not started")
queue: "asyncio.Queue[Any]" = asyncio.Queue(maxsize=max_queue_size)
stop_event = asyncio.Event()
async def _run_subscription() -> None:
async with self._broadcast.subscribe(topic) as subscriber: # type: ignore[attr-defined,union-attr]
async for event in subscriber: # type: ignore[union-attr]
if stop_event.is_set():
break
data = _decode_message(getattr(event, "message", event))
try:
await queue.put(data)
_set_queue_gauge(topic, queue.qsize())
except asyncio.CancelledError:
break
task = asyncio.create_task(_run_subscription(), name=f"broadcast-sub:{topic}")
async with self._lock:
self._tasks.add(task)
metrics_registry.set_gauge("gossip_broadcast_subscribers_total", float(len(self._tasks)))
def _unsubscribe() -> None:
async def _stop() -> None:
stop_event.set()
task.cancel()
with suppress(asyncio.CancelledError):
await task
async with self._lock:
self._tasks.discard(task)
metrics_registry.set_gauge("gossip_broadcast_subscribers_total", float(len(self._tasks)))
asyncio.create_task(_stop())
return TopicSubscription(topic=topic, queue=queue, _unsubscribe=_unsubscribe)
async def shutdown(self) -> None:
async with self._lock:
tasks = list(self._tasks)
self._tasks.clear()
metrics_registry.set_gauge("gossip_broadcast_subscribers_total", 0.0)
for task in tasks:
task.cancel()
with suppress(asyncio.CancelledError):
await task
if self._running:
await self._broadcast.disconnect() # type: ignore[union-attr]
self._running = False
class GossipBroker:
def __init__(self, backend: GossipBackend) -> None:
self._backend = backend
self._lock = asyncio.Lock()
self._started = False
async def publish(self, topic: str, message: Any) -> None:
if not self._started:
await self._backend.start()
self._started = True
await self._backend.publish(topic, message)
async def subscribe(self, topic: str, max_queue_size: int = 100) -> TopicSubscription:
if not self._started:
await self._backend.start()
self._started = True
return await self._backend.subscribe(topic, max_queue_size=max_queue_size)
async def set_backend(self, backend: GossipBackend) -> None:
await backend.start()
async with self._lock:
previous = self._backend
self._backend = backend
self._started = True
await previous.shutdown()
async def shutdown(self) -> None:
await self._backend.shutdown()
class _InProcessSubscriber:
def __init__(self, queue: "asyncio.Queue[Any]"):
self._queue = queue
def __aiter__(self): # type: ignore[override]
return self._iterator()
async def _iterator(self):
while True:
yield await self._queue.get()
class _InProcessBroadcast:
"""Minimal in-memory broadcast substitute for tests when Starlette Broadcast is absent."""
def __init__(self) -> None:
self._topics: Dict[str, List["asyncio.Queue[Any]"]] = defaultdict(list)
self._lock = asyncio.Lock()
self._running = False
async def connect(self) -> None:
self._running = True
async def disconnect(self) -> None:
async with self._lock:
self._topics.clear()
self._running = False
@asynccontextmanager
async def subscribe(self, topic: str):
queue: "asyncio.Queue[Any]" = asyncio.Queue()
async with self._lock:
self._topics[topic].append(queue)
try:
yield _InProcessSubscriber(queue)
finally:
async with self._lock:
queues = self._topics.get(topic)
if queues and queue in queues:
queues.remove(queue)
async def publish(self, topic: str, message: Any) -> None:
if not self._running:
raise RuntimeError("Broadcast backend not started")
async with self._lock:
queues = list(self._topics.get(topic, []))
for queue in queues:
await queue.put(message)
def create_backend(backend_type: str, *, broadcast_url: Optional[str] = None) -> GossipBackend:
backend = backend_type.lower()
if backend in {"memory", "inmemory", "local"}:
return InMemoryGossipBackend()
if backend in {"broadcast", "starlette", "redis"}:
if not broadcast_url:
raise ValueError("Broadcast backend requires a gossip_broadcast_url setting")
return BroadcastGossipBackend(broadcast_url)
raise ValueError(f"Unsupported gossip backend '{backend_type}'")
def _encode_message(message: Any) -> Any:
if isinstance(message, (str, bytes, bytearray)):
return message
return json.dumps(message, separators=(",", ":"))
def _decode_message(message: Any) -> Any:
if isinstance(message, (bytes, bytearray)):
message = message.decode("utf-8")
if isinstance(message, str):
try:
return json.loads(message)
except json.JSONDecodeError:
return message
return message
gossip_broker = GossipBroker(InMemoryGossipBackend())

View File

@@ -290,8 +290,11 @@ class DualModeWalletAdapter:
def _send_transaction_file(self, wallet_name: str, password: str, to_address: str, def _send_transaction_file(self, wallet_name: str, password: str, to_address: str,
amount: float, description: Optional[str]) -> Dict[str, Any]: amount: float, description: Optional[str]) -> Dict[str, Any]:
"""Send transaction using file-based storage""" """Send transaction using file-based storage and blockchain RPC"""
from .commands.wallet import _load_wallet, _save_wallet from .commands.wallet import _load_wallet, _save_wallet
import httpx
from .utils import error, success
from datetime import datetime
wallet_path = self.wallet_dir / f"{wallet_name}.json" wallet_path = self.wallet_dir / f"{wallet_name}.json"
@@ -300,36 +303,79 @@ class DualModeWalletAdapter:
raise Exception("Wallet not found") raise Exception("Wallet not found")
wallet_data = _load_wallet(wallet_path, wallet_name) wallet_data = _load_wallet(wallet_path, wallet_name)
balance = wallet_data.get("balance", 0) # Fetch current balance and nonce from blockchain
from_address = wallet_data.get("address")
if balance < amount: if not from_address:
error(f"Insufficient balance. Available: {balance}, Required: {amount}") error("Wallet does not have an address configured")
raise Exception("Invalid wallet")
rpc_url = self.config.blockchain_rpc_url
try:
resp = httpx.get(f"{rpc_url}/rpc/getBalance/{from_address}?chain_id=ait-mainnet", timeout=5)
if resp.status_code == 200:
data = resp.json()
chain_balance = data.get("balance", 0)
nonce = data.get("nonce", 0)
else:
error(f"Failed to get balance from chain: {resp.text}")
raise Exception("Chain error")
except Exception as e:
error(f"Failed to connect to blockchain RPC: {e}")
raise
if chain_balance < amount:
error(f"Insufficient blockchain balance. Available: {chain_balance}, Required: {amount}")
raise Exception("Insufficient balance") raise Exception("Insufficient balance")
# Construct and send transaction
tx_payload = {
"type": "TRANSFER",
"sender": from_address,
"nonce": nonce,
"fee": 0,
"payload": {"to": to_address, "value": amount},
"sig": "mock_signature" # Replace with real signature when implemented
}
# Add transaction try:
resp = httpx.post(f"{rpc_url}/rpc/sendTx", json=tx_payload, timeout=5)
if resp.status_code not in (200, 201):
error(f"Failed to submit transaction to chain: {resp.text}")
raise Exception("Chain submission failed")
tx_hash = resp.json().get("tx_hash")
except Exception as e:
error(f"Failed to send transaction to RPC: {e}")
raise
# Add transaction to local history
transaction = { transaction = {
"type": "send", "type": "send",
"amount": -amount, "amount": -amount,
"to_address": to_address, "to_address": to_address,
"description": description or "", "description": description or "",
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
"tx_hash": tx_hash,
"status": "pending"
} }
if "transactions" not in wallet_data:
wallet_data["transactions"] = []
wallet_data["transactions"].append(transaction) wallet_data["transactions"].append(transaction)
wallet_data["balance"] = balance - amount wallet_data["balance"] = chain_balance - amount
# Save wallet # Save wallet
save_password = password if wallet_data.get("encrypted") else None save_password = password if wallet_data.get("encrypted") else None
_save_wallet(wallet_path, wallet_data, save_password) _save_wallet(wallet_path, wallet_data, save_password)
success(f"Sent {amount} AITBC to {to_address}") success(f"Submitted transaction {tx_hash} to send {amount} AITBC to {to_address}")
return { return {
"mode": "file", "mode": "file",
"wallet_name": wallet_name, "wallet_name": wallet_name,
"to_address": to_address, "to_address": to_address,
"amount": amount, "amount": amount,
"description": description, "description": description,
"new_balance": wallet_data["balance"], "tx_hash": tx_hash,
"timestamp": transaction["timestamp"] "timestamp": transaction["timestamp"]
} }

View File

@@ -1,29 +1,29 @@
import subprocess import subprocess
import sys import sys
from typing import List, Optional, Union from typing import List, Optional, Union, Any
from . import error, output from . import error, output
def run_subprocess(cmd: List[str], check: bool = True, capture_output: bool = True, shell: bool = False) -> Optional[str]: def run_subprocess(cmd: List[str], check: bool = True, capture_output: bool = True, shell: bool = False, **kwargs: Any) -> Optional[Union[str, subprocess.CompletedProcess]]:
"""Run a subprocess command safely with logging""" """Run a subprocess command safely with logging"""
try: try:
if shell: if shell:
# When shell=True, cmd should be a string # When shell=True, cmd should be a string
cmd_str = " ".join(cmd) if isinstance(cmd, list) else cmd cmd_str = " ".join(cmd) if isinstance(cmd, list) else cmd
result = subprocess.run(cmd_str, shell=True, check=check, capture_output=capture_output, text=True) result = subprocess.run(cmd_str, shell=True, check=check, capture_output=capture_output, text=True, **kwargs)
else: else:
result = subprocess.run(cmd, check=check, capture_output=capture_output, text=True) result = subprocess.run(cmd, check=check, capture_output=capture_output, text=True, **kwargs)
if capture_output: if capture_output:
return result.stdout.strip() return result.stdout.strip()
return None return result
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
error(f"Command failed with exit code {e.returncode}") error(f"Command failed with exit code {e.returncode}")
if capture_output and e.stderr: if capture_output and getattr(e, 'stderr', None):
print(e.stderr, file=sys.stderr) print(e.stderr, file=sys.stderr)
if check: if check:
sys.exit(e.returncode) sys.exit(e.returncode)
return None return getattr(e, 'stdout', None) if capture_output else None
except Exception as e: except Exception as e:
error(f"Failed to execute command: {e}") error(f"Failed to execute command: {e}")
if check: if check:

9
fix_gossip.patch Normal file
View File

@@ -0,0 +1,9 @@
--- /opt/aitbc/apps/blockchain-node/src/aitbc_chain/gossip/broker.py 2026-03-23 12:40:00.000000000 +0100
+++ /opt/aitbc/apps/blockchain-node/src/aitbc_chain/gossip/broker.py.new 2026-03-23 12:40:00.000000000 +0100
@@ -6,7 +6,7 @@
import warnings
from collections import defaultdict
-from contextlib import suppress
+from contextlib import suppress, asynccontextmanager
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set

30
fix_inprocess.py Normal file
View File

@@ -0,0 +1,30 @@
import asyncio
from contextlib import asynccontextmanager
from typing import Any
class _InProcessSubscriber:
def __init__(self, queue, release):
self._queue = queue
self._release = release
def __aiter__(self):
return self._iterator()
async def _iterator(self):
try:
while True:
yield await self._queue.get()
finally:
pass
@asynccontextmanager
async def subscribe():
queue = asyncio.Queue()
try:
yield _InProcessSubscriber(queue, lambda: None)
finally:
pass
async def main():
async with subscribe() as sub:
print("Success")
asyncio.run(main())

2
test_send.sh Executable file
View File

@@ -0,0 +1,2 @@
export AITBC_WALLET="test_wallet"
aitbc wallet send aitbc1my-test-wallet_hd 50

22
test_wallet.py Normal file
View File

@@ -0,0 +1,22 @@
import json
wallet_data = {
"name": "test_wallet",
"type": "hd",
"address": "aitbc1genesis",
"private_key": "dummy",
"public_key": "dummy",
"encrypted": False,
"transactions": [],
"balance": 1000000
}
import os
import pathlib
wallet_dir = pathlib.Path("/root/.aitbc/wallets")
wallet_dir.mkdir(parents=True, exist_ok=True)
wallet_path = wallet_dir / "test_wallet.json"
with open(wallet_path, "w") as f:
json.dump(wallet_data, f)