refactor: improve gossip broker cleanup and enhance CLI wallet transaction handling
Some checks failed
AITBC CI/CD Pipeline / lint-and-test (3.11) (push) Has been cancelled
AITBC CI/CD Pipeline / lint-and-test (3.12) (push) Has been cancelled
AITBC CI/CD Pipeline / lint-and-test (3.13) (push) Has been cancelled
AITBC CI/CD Pipeline / test-cli (push) Has been cancelled
AITBC CI/CD Pipeline / test-services (push) Has been cancelled
AITBC CI/CD Pipeline / test-production-services (push) Has been cancelled
AITBC CI/CD Pipeline / security-scan (push) Has been cancelled
AITBC CI/CD Pipeline / build (push) Has been cancelled
AITBC CI/CD Pipeline / deploy-staging (push) Has been cancelled
AITBC CI/CD Pipeline / deploy-production (push) Has been cancelled
AITBC CI/CD Pipeline / performance-test (push) Has been cancelled
AITBC CI/CD Pipeline / docs (push) Has been cancelled
AITBC CI/CD Pipeline / release (push) Has been cancelled
AITBC CI/CD Pipeline / notify (push) Has been cancelled
Security Scanning / Bandit Security Scan (apps/coordinator-api/src) (push) Has been cancelled
Security Scanning / Bandit Security Scan (cli/aitbc_cli) (push) Has been cancelled
Security Scanning / Bandit Security Scan (packages/py/aitbc-core/src) (push) Has been cancelled
Security Scanning / Bandit Security Scan (packages/py/aitbc-crypto/src) (push) Has been cancelled
Security Scanning / Bandit Security Scan (packages/py/aitbc-sdk/src) (push) Has been cancelled
Security Scanning / Bandit Security Scan (tests) (push) Has been cancelled
Security Scanning / CodeQL Security Analysis (javascript) (push) Has been cancelled
Security Scanning / CodeQL Security Analysis (python) (push) Has been cancelled
Security Scanning / Dependency Security Scan (push) Has been cancelled
Security Scanning / Container Security Scan (push) Has been cancelled
Security Scanning / OSSF Scorecard (push) Has been cancelled
Security Scanning / Security Summary Report (push) Has been cancelled
AITBC CLI Level 1 Commands Test / test-cli-level1 (3.11) (push) Has been cancelled
AITBC CLI Level 1 Commands Test / test-cli-level1 (3.12) (push) Has been cancelled
AITBC CLI Level 1 Commands Test / test-cli-level1 (3.13) (push) Has been cancelled
AITBC CLI Level 1 Commands Test / test-summary (push) Has been cancelled
Some checks failed
AITBC CI/CD Pipeline / lint-and-test (3.11) (push) Has been cancelled
AITBC CI/CD Pipeline / lint-and-test (3.12) (push) Has been cancelled
AITBC CI/CD Pipeline / lint-and-test (3.13) (push) Has been cancelled
AITBC CI/CD Pipeline / test-cli (push) Has been cancelled
AITBC CI/CD Pipeline / test-services (push) Has been cancelled
AITBC CI/CD Pipeline / test-production-services (push) Has been cancelled
AITBC CI/CD Pipeline / security-scan (push) Has been cancelled
AITBC CI/CD Pipeline / build (push) Has been cancelled
AITBC CI/CD Pipeline / deploy-staging (push) Has been cancelled
AITBC CI/CD Pipeline / deploy-production (push) Has been cancelled
AITBC CI/CD Pipeline / performance-test (push) Has been cancelled
AITBC CI/CD Pipeline / docs (push) Has been cancelled
AITBC CI/CD Pipeline / release (push) Has been cancelled
AITBC CI/CD Pipeline / notify (push) Has been cancelled
Security Scanning / Bandit Security Scan (apps/coordinator-api/src) (push) Has been cancelled
Security Scanning / Bandit Security Scan (cli/aitbc_cli) (push) Has been cancelled
Security Scanning / Bandit Security Scan (packages/py/aitbc-core/src) (push) Has been cancelled
Security Scanning / Bandit Security Scan (packages/py/aitbc-crypto/src) (push) Has been cancelled
Security Scanning / Bandit Security Scan (packages/py/aitbc-sdk/src) (push) Has been cancelled
Security Scanning / Bandit Security Scan (tests) (push) Has been cancelled
Security Scanning / CodeQL Security Analysis (javascript) (push) Has been cancelled
Security Scanning / CodeQL Security Analysis (python) (push) Has been cancelled
Security Scanning / Dependency Security Scan (push) Has been cancelled
Security Scanning / Container Security Scan (push) Has been cancelled
Security Scanning / OSSF Scorecard (push) Has been cancelled
Security Scanning / Security Summary Report (push) Has been cancelled
AITBC CLI Level 1 Commands Test / test-cli-level1 (3.11) (push) Has been cancelled
AITBC CLI Level 1 Commands Test / test-cli-level1 (3.12) (push) Has been cancelled
AITBC CLI Level 1 Commands Test / test-cli-level1 (3.13) (push) Has been cancelled
AITBC CLI Level 1 Commands Test / test-summary (push) Has been cancelled
- Refactor _InProcessBroadcast.subscribe to use asynccontextmanager for cleaner resource cleanup - Remove manual release callback in _InProcessSubscriber - Update file-based wallet send to submit transactions via blockchain RPC - Fetch balance and nonce from chain before sending transactions - Add tx_hash and status tracking to local transaction history - Enhance run_subprocess to support additional kwargs and return Comp
This commit is contained in:
50
apps/blockchain-node/fix_tx_metadata2.py
Normal file
50
apps/blockchain-node/fix_tx_metadata2.py
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
import sqlite3
|
||||||
|
|
||||||
|
def fix_db():
|
||||||
|
print("Fixing transaction table on aitbc node...")
|
||||||
|
|
||||||
|
conn = sqlite3.connect('/opt/aitbc/data/ait-mainnet/chain.db')
|
||||||
|
cursor = conn.cursor()
|
||||||
|
|
||||||
|
try:
|
||||||
|
cursor.execute('ALTER TABLE "transaction" ADD COLUMN nonce INTEGER DEFAULT 0;')
|
||||||
|
print("Added nonce column")
|
||||||
|
except sqlite3.OperationalError as e:
|
||||||
|
print(f"Error adding nonce: {e}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
cursor.execute('ALTER TABLE "transaction" ADD COLUMN value INTEGER DEFAULT 0;')
|
||||||
|
print("Added value column")
|
||||||
|
except sqlite3.OperationalError as e:
|
||||||
|
print(f"Error adding value: {e}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
cursor.execute('ALTER TABLE "transaction" ADD COLUMN fee INTEGER DEFAULT 0;')
|
||||||
|
print("Added fee column")
|
||||||
|
except sqlite3.OperationalError as e:
|
||||||
|
print(f"Error adding fee: {e}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
cursor.execute('ALTER TABLE "transaction" ADD COLUMN status TEXT DEFAULT "pending";')
|
||||||
|
print("Added status column")
|
||||||
|
except sqlite3.OperationalError as e:
|
||||||
|
print(f"Error adding status: {e}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
cursor.execute('ALTER TABLE "transaction" ADD COLUMN tx_metadata TEXT;')
|
||||||
|
print("Added tx_metadata column")
|
||||||
|
except sqlite3.OperationalError as e:
|
||||||
|
print(f"Error adding tx_metadata: {e}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
cursor.execute('ALTER TABLE "transaction" ADD COLUMN timestamp TEXT;')
|
||||||
|
print("Added timestamp column")
|
||||||
|
except sqlite3.OperationalError as e:
|
||||||
|
print(f"Error adding timestamp: {e}")
|
||||||
|
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
print("Done fixing transaction table.")
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
fix_db()
|
||||||
@@ -4,7 +4,7 @@ import asyncio
|
|||||||
import json
|
import json
|
||||||
import warnings
|
import warnings
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from contextlib import suppress
|
from contextlib import suppress, asynccontextmanager
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Any, Callable, Dict, List, Optional, Set
|
from typing import Any, Callable, Dict, List, Optional, Set
|
||||||
|
|
||||||
@@ -225,25 +225,15 @@ class GossipBroker:
|
|||||||
|
|
||||||
|
|
||||||
class _InProcessSubscriber:
|
class _InProcessSubscriber:
|
||||||
def __init__(self, queue: "asyncio.Queue[Any]", release: Callable[[], None]):
|
def __init__(self, queue: "asyncio.Queue[Any]"):
|
||||||
self._queue = queue
|
self._queue = queue
|
||||||
self._release = release
|
|
||||||
|
|
||||||
async def __aenter__(self):
|
|
||||||
return self
|
|
||||||
|
|
||||||
async def __aexit__(self, exc_type, exc, tb):
|
|
||||||
self._release()
|
|
||||||
|
|
||||||
def __aiter__(self): # type: ignore[override]
|
def __aiter__(self): # type: ignore[override]
|
||||||
return self._iterator()
|
return self._iterator()
|
||||||
|
|
||||||
async def _iterator(self):
|
async def _iterator(self):
|
||||||
try:
|
while True:
|
||||||
while True:
|
yield await self._queue.get()
|
||||||
yield await self._queue.get()
|
|
||||||
finally:
|
|
||||||
self._release()
|
|
||||||
|
|
||||||
|
|
||||||
class _InProcessBroadcast:
|
class _InProcessBroadcast:
|
||||||
@@ -262,23 +252,19 @@ class _InProcessBroadcast:
|
|||||||
self._topics.clear()
|
self._topics.clear()
|
||||||
self._running = False
|
self._running = False
|
||||||
|
|
||||||
async def subscribe(self, topic: str) -> _InProcessSubscriber:
|
@asynccontextmanager
|
||||||
|
async def subscribe(self, topic: str):
|
||||||
queue: "asyncio.Queue[Any]" = asyncio.Queue()
|
queue: "asyncio.Queue[Any]" = asyncio.Queue()
|
||||||
async with self._lock:
|
async with self._lock:
|
||||||
self._topics[topic].append(queue)
|
self._topics[topic].append(queue)
|
||||||
|
|
||||||
def release() -> None:
|
try:
|
||||||
async def _remove() -> None:
|
yield _InProcessSubscriber(queue)
|
||||||
async with self._lock:
|
finally:
|
||||||
queues = self._topics.get(topic)
|
async with self._lock:
|
||||||
if queues and queue in queues:
|
queues = self._topics.get(topic)
|
||||||
queues.remove(queue)
|
if queues and queue in queues:
|
||||||
if not queues:
|
queues.remove(queue)
|
||||||
self._topics.pop(topic, None)
|
|
||||||
|
|
||||||
asyncio.create_task(_remove())
|
|
||||||
|
|
||||||
return _InProcessSubscriber(queue, release)
|
|
||||||
|
|
||||||
async def publish(self, topic: str, message: Any) -> None:
|
async def publish(self, topic: str, message: Any) -> None:
|
||||||
if not self._running:
|
if not self._running:
|
||||||
|
|||||||
307
apps/blockchain-node/src/aitbc_chain/gossip/broker.py.orig
Executable file
307
apps/blockchain-node/src/aitbc_chain/gossip/broker.py.orig
Executable file
@@ -0,0 +1,307 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import warnings
|
||||||
|
from collections import defaultdict
|
||||||
|
from contextlib import suppress
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any, Callable, Dict, List, Optional, Set
|
||||||
|
|
||||||
|
warnings.filterwarnings("ignore", message="coroutine.* was never awaited", category=RuntimeWarning)
|
||||||
|
|
||||||
|
try:
|
||||||
|
from broadcaster import Broadcast
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
Broadcast = None # type: ignore[assignment]
|
||||||
|
|
||||||
|
from ..metrics import metrics_registry
|
||||||
|
|
||||||
|
|
||||||
|
def _increment_publication(metric_prefix: str, topic: str) -> None:
|
||||||
|
metrics_registry.increment(f"{metric_prefix}_total")
|
||||||
|
metrics_registry.increment(f"{metric_prefix}_topic_{topic}")
|
||||||
|
|
||||||
|
|
||||||
|
def _set_queue_gauge(topic: str, size: int) -> None:
|
||||||
|
metrics_registry.set_gauge(f"gossip_queue_size_{topic}", float(size))
|
||||||
|
|
||||||
|
|
||||||
|
def _update_subscriber_metrics(topics: Dict[str, List["asyncio.Queue[Any]"]]) -> None:
|
||||||
|
for topic, queues in topics.items():
|
||||||
|
metrics_registry.set_gauge(f"gossip_subscribers_topic_{topic}", float(len(queues)))
|
||||||
|
total = sum(len(queues) for queues in topics.values())
|
||||||
|
metrics_registry.set_gauge("gossip_subscribers_total", float(total))
|
||||||
|
|
||||||
|
|
||||||
|
def _clear_topic_metrics(topic: str) -> None:
|
||||||
|
metrics_registry.set_gauge(f"gossip_subscribers_topic_{topic}", 0.0)
|
||||||
|
_set_queue_gauge(topic, 0)
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TopicSubscription:
|
||||||
|
topic: str
|
||||||
|
queue: "asyncio.Queue[Any]"
|
||||||
|
_unsubscribe: Callable[[], None]
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
self._unsubscribe()
|
||||||
|
|
||||||
|
async def get(self) -> Any:
|
||||||
|
return await self.queue.get()
|
||||||
|
|
||||||
|
async def __aiter__(self): # type: ignore[override]
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
yield await self.queue.get()
|
||||||
|
finally:
|
||||||
|
self.close()
|
||||||
|
|
||||||
|
|
||||||
|
class GossipBackend:
|
||||||
|
async def start(self) -> None: # pragma: no cover - overridden as needed
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def publish(self, topic: str, message: Any) -> None:
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
async def subscribe(self, topic: str, max_queue_size: int = 100) -> TopicSubscription:
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
async def shutdown(self) -> None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class InMemoryGossipBackend(GossipBackend):
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self._topics: Dict[str, List["asyncio.Queue[Any]"]] = defaultdict(list)
|
||||||
|
self._lock = asyncio.Lock()
|
||||||
|
|
||||||
|
async def publish(self, topic: str, message: Any) -> None:
|
||||||
|
async with self._lock:
|
||||||
|
queues = list(self._topics.get(topic, []))
|
||||||
|
for queue in queues:
|
||||||
|
await queue.put(message)
|
||||||
|
_set_queue_gauge(topic, queue.qsize())
|
||||||
|
_increment_publication("gossip_publications", topic)
|
||||||
|
|
||||||
|
async def subscribe(self, topic: str, max_queue_size: int = 100) -> TopicSubscription:
|
||||||
|
queue: "asyncio.Queue[Any]" = asyncio.Queue(maxsize=max_queue_size)
|
||||||
|
|
||||||
|
async with self._lock:
|
||||||
|
self._topics[topic].append(queue)
|
||||||
|
_update_subscriber_metrics(self._topics)
|
||||||
|
|
||||||
|
_set_queue_gauge(topic, queue.qsize())
|
||||||
|
|
||||||
|
def _unsubscribe() -> None:
|
||||||
|
async def _remove() -> None:
|
||||||
|
async with self._lock:
|
||||||
|
queues = self._topics.get(topic)
|
||||||
|
if queues is None:
|
||||||
|
return
|
||||||
|
if queue in queues:
|
||||||
|
queues.remove(queue)
|
||||||
|
if not queues:
|
||||||
|
self._topics.pop(topic, None)
|
||||||
|
_clear_topic_metrics(topic)
|
||||||
|
_update_subscriber_metrics(self._topics)
|
||||||
|
|
||||||
|
asyncio.create_task(_remove())
|
||||||
|
|
||||||
|
return TopicSubscription(topic=topic, queue=queue, _unsubscribe=_unsubscribe)
|
||||||
|
|
||||||
|
async def shutdown(self) -> None:
|
||||||
|
async with self._lock:
|
||||||
|
topics = list(self._topics.keys())
|
||||||
|
self._topics.clear()
|
||||||
|
for topic in topics:
|
||||||
|
_clear_topic_metrics(topic)
|
||||||
|
_update_subscriber_metrics(self._topics)
|
||||||
|
|
||||||
|
|
||||||
|
class BroadcastGossipBackend(GossipBackend):
|
||||||
|
def __init__(self, url: str) -> None:
|
||||||
|
if Broadcast is None: # provide in-process fallback when Broadcast is missing
|
||||||
|
self._broadcast = _InProcessBroadcast()
|
||||||
|
else:
|
||||||
|
self._broadcast = Broadcast(url) # type: ignore[arg-type]
|
||||||
|
self._tasks: Set[asyncio.Task[None]] = set()
|
||||||
|
self._lock = asyncio.Lock()
|
||||||
|
self._running = False
|
||||||
|
|
||||||
|
async def start(self) -> None:
|
||||||
|
if not self._running:
|
||||||
|
await self._broadcast.connect() # type: ignore[union-attr]
|
||||||
|
self._running = True
|
||||||
|
|
||||||
|
async def publish(self, topic: str, message: Any) -> None:
|
||||||
|
if not self._running:
|
||||||
|
raise RuntimeError("Broadcast backend not started")
|
||||||
|
payload = _encode_message(message)
|
||||||
|
await self._broadcast.publish(topic, payload) # type: ignore[union-attr]
|
||||||
|
_increment_publication("gossip_broadcast_publications", topic)
|
||||||
|
|
||||||
|
async def subscribe(self, topic: str, max_queue_size: int = 100) -> TopicSubscription:
|
||||||
|
if not self._running:
|
||||||
|
raise RuntimeError("Broadcast backend not started")
|
||||||
|
|
||||||
|
queue: "asyncio.Queue[Any]" = asyncio.Queue(maxsize=max_queue_size)
|
||||||
|
stop_event = asyncio.Event()
|
||||||
|
|
||||||
|
async def _run_subscription() -> None:
|
||||||
|
async with self._broadcast.subscribe(topic) as subscriber: # type: ignore[attr-defined,union-attr]
|
||||||
|
async for event in subscriber: # type: ignore[union-attr]
|
||||||
|
if stop_event.is_set():
|
||||||
|
break
|
||||||
|
data = _decode_message(getattr(event, "message", event))
|
||||||
|
try:
|
||||||
|
await queue.put(data)
|
||||||
|
_set_queue_gauge(topic, queue.qsize())
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
break
|
||||||
|
|
||||||
|
task = asyncio.create_task(_run_subscription(), name=f"broadcast-sub:{topic}")
|
||||||
|
async with self._lock:
|
||||||
|
self._tasks.add(task)
|
||||||
|
metrics_registry.set_gauge("gossip_broadcast_subscribers_total", float(len(self._tasks)))
|
||||||
|
|
||||||
|
def _unsubscribe() -> None:
|
||||||
|
async def _stop() -> None:
|
||||||
|
stop_event.set()
|
||||||
|
task.cancel()
|
||||||
|
with suppress(asyncio.CancelledError):
|
||||||
|
await task
|
||||||
|
async with self._lock:
|
||||||
|
self._tasks.discard(task)
|
||||||
|
metrics_registry.set_gauge("gossip_broadcast_subscribers_total", float(len(self._tasks)))
|
||||||
|
|
||||||
|
asyncio.create_task(_stop())
|
||||||
|
|
||||||
|
return TopicSubscription(topic=topic, queue=queue, _unsubscribe=_unsubscribe)
|
||||||
|
|
||||||
|
async def shutdown(self) -> None:
|
||||||
|
async with self._lock:
|
||||||
|
tasks = list(self._tasks)
|
||||||
|
self._tasks.clear()
|
||||||
|
metrics_registry.set_gauge("gossip_broadcast_subscribers_total", 0.0)
|
||||||
|
for task in tasks:
|
||||||
|
task.cancel()
|
||||||
|
with suppress(asyncio.CancelledError):
|
||||||
|
await task
|
||||||
|
if self._running:
|
||||||
|
await self._broadcast.disconnect() # type: ignore[union-attr]
|
||||||
|
self._running = False
|
||||||
|
|
||||||
|
|
||||||
|
class GossipBroker:
|
||||||
|
def __init__(self, backend: GossipBackend) -> None:
|
||||||
|
self._backend = backend
|
||||||
|
self._lock = asyncio.Lock()
|
||||||
|
self._started = False
|
||||||
|
|
||||||
|
async def publish(self, topic: str, message: Any) -> None:
|
||||||
|
if not self._started:
|
||||||
|
await self._backend.start()
|
||||||
|
self._started = True
|
||||||
|
await self._backend.publish(topic, message)
|
||||||
|
|
||||||
|
async def subscribe(self, topic: str, max_queue_size: int = 100) -> TopicSubscription:
|
||||||
|
if not self._started:
|
||||||
|
await self._backend.start()
|
||||||
|
self._started = True
|
||||||
|
return await self._backend.subscribe(topic, max_queue_size=max_queue_size)
|
||||||
|
|
||||||
|
async def set_backend(self, backend: GossipBackend) -> None:
|
||||||
|
await backend.start()
|
||||||
|
async with self._lock:
|
||||||
|
previous = self._backend
|
||||||
|
self._backend = backend
|
||||||
|
self._started = True
|
||||||
|
await previous.shutdown()
|
||||||
|
|
||||||
|
async def shutdown(self) -> None:
|
||||||
|
await self._backend.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
class _InProcessSubscriber:
|
||||||
|
def __init__(self, queue: "asyncio.Queue[Any]"):
|
||||||
|
self._queue = queue
|
||||||
|
|
||||||
|
def __aiter__(self): # type: ignore[override]
|
||||||
|
return self._iterator()
|
||||||
|
|
||||||
|
async def _iterator(self):
|
||||||
|
while True:
|
||||||
|
yield await self._queue.get()
|
||||||
|
|
||||||
|
|
||||||
|
class _InProcessBroadcast:
|
||||||
|
"""Minimal in-memory broadcast substitute for tests when Starlette Broadcast is absent."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self._topics: Dict[str, List["asyncio.Queue[Any]"]] = defaultdict(list)
|
||||||
|
self._lock = asyncio.Lock()
|
||||||
|
self._running = False
|
||||||
|
|
||||||
|
async def connect(self) -> None:
|
||||||
|
self._running = True
|
||||||
|
|
||||||
|
async def disconnect(self) -> None:
|
||||||
|
async with self._lock:
|
||||||
|
self._topics.clear()
|
||||||
|
self._running = False
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def subscribe(self, topic: str):
|
||||||
|
queue: "asyncio.Queue[Any]" = asyncio.Queue()
|
||||||
|
async with self._lock:
|
||||||
|
self._topics[topic].append(queue)
|
||||||
|
|
||||||
|
try:
|
||||||
|
yield _InProcessSubscriber(queue)
|
||||||
|
finally:
|
||||||
|
async with self._lock:
|
||||||
|
queues = self._topics.get(topic)
|
||||||
|
if queues and queue in queues:
|
||||||
|
queues.remove(queue)
|
||||||
|
|
||||||
|
async def publish(self, topic: str, message: Any) -> None:
|
||||||
|
if not self._running:
|
||||||
|
raise RuntimeError("Broadcast backend not started")
|
||||||
|
async with self._lock:
|
||||||
|
queues = list(self._topics.get(topic, []))
|
||||||
|
for queue in queues:
|
||||||
|
await queue.put(message)
|
||||||
|
|
||||||
|
|
||||||
|
def create_backend(backend_type: str, *, broadcast_url: Optional[str] = None) -> GossipBackend:
|
||||||
|
backend = backend_type.lower()
|
||||||
|
if backend in {"memory", "inmemory", "local"}:
|
||||||
|
return InMemoryGossipBackend()
|
||||||
|
if backend in {"broadcast", "starlette", "redis"}:
|
||||||
|
if not broadcast_url:
|
||||||
|
raise ValueError("Broadcast backend requires a gossip_broadcast_url setting")
|
||||||
|
return BroadcastGossipBackend(broadcast_url)
|
||||||
|
raise ValueError(f"Unsupported gossip backend '{backend_type}'")
|
||||||
|
|
||||||
|
|
||||||
|
def _encode_message(message: Any) -> Any:
|
||||||
|
if isinstance(message, (str, bytes, bytearray)):
|
||||||
|
return message
|
||||||
|
return json.dumps(message, separators=(",", ":"))
|
||||||
|
|
||||||
|
|
||||||
|
def _decode_message(message: Any) -> Any:
|
||||||
|
if isinstance(message, (bytes, bytearray)):
|
||||||
|
message = message.decode("utf-8")
|
||||||
|
if isinstance(message, str):
|
||||||
|
try:
|
||||||
|
return json.loads(message)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
return message
|
||||||
|
return message
|
||||||
|
|
||||||
|
|
||||||
|
gossip_broker = GossipBroker(InMemoryGossipBackend())
|
||||||
|
|
||||||
@@ -290,8 +290,11 @@ class DualModeWalletAdapter:
|
|||||||
|
|
||||||
def _send_transaction_file(self, wallet_name: str, password: str, to_address: str,
|
def _send_transaction_file(self, wallet_name: str, password: str, to_address: str,
|
||||||
amount: float, description: Optional[str]) -> Dict[str, Any]:
|
amount: float, description: Optional[str]) -> Dict[str, Any]:
|
||||||
"""Send transaction using file-based storage"""
|
"""Send transaction using file-based storage and blockchain RPC"""
|
||||||
from .commands.wallet import _load_wallet, _save_wallet
|
from .commands.wallet import _load_wallet, _save_wallet
|
||||||
|
import httpx
|
||||||
|
from .utils import error, success
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
wallet_path = self.wallet_dir / f"{wallet_name}.json"
|
wallet_path = self.wallet_dir / f"{wallet_name}.json"
|
||||||
|
|
||||||
@@ -300,36 +303,79 @@ class DualModeWalletAdapter:
|
|||||||
raise Exception("Wallet not found")
|
raise Exception("Wallet not found")
|
||||||
|
|
||||||
wallet_data = _load_wallet(wallet_path, wallet_name)
|
wallet_data = _load_wallet(wallet_path, wallet_name)
|
||||||
balance = wallet_data.get("balance", 0)
|
# Fetch current balance and nonce from blockchain
|
||||||
|
from_address = wallet_data.get("address")
|
||||||
if balance < amount:
|
if not from_address:
|
||||||
error(f"Insufficient balance. Available: {balance}, Required: {amount}")
|
error("Wallet does not have an address configured")
|
||||||
|
raise Exception("Invalid wallet")
|
||||||
|
|
||||||
|
rpc_url = self.config.blockchain_rpc_url
|
||||||
|
try:
|
||||||
|
resp = httpx.get(f"{rpc_url}/rpc/getBalance/{from_address}?chain_id=ait-mainnet", timeout=5)
|
||||||
|
if resp.status_code == 200:
|
||||||
|
data = resp.json()
|
||||||
|
chain_balance = data.get("balance", 0)
|
||||||
|
nonce = data.get("nonce", 0)
|
||||||
|
else:
|
||||||
|
error(f"Failed to get balance from chain: {resp.text}")
|
||||||
|
raise Exception("Chain error")
|
||||||
|
except Exception as e:
|
||||||
|
error(f"Failed to connect to blockchain RPC: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
if chain_balance < amount:
|
||||||
|
error(f"Insufficient blockchain balance. Available: {chain_balance}, Required: {amount}")
|
||||||
raise Exception("Insufficient balance")
|
raise Exception("Insufficient balance")
|
||||||
|
|
||||||
|
# Construct and send transaction
|
||||||
|
tx_payload = {
|
||||||
|
"type": "TRANSFER",
|
||||||
|
"sender": from_address,
|
||||||
|
"nonce": nonce,
|
||||||
|
"fee": 0,
|
||||||
|
"payload": {"to": to_address, "value": amount},
|
||||||
|
"sig": "mock_signature" # Replace with real signature when implemented
|
||||||
|
}
|
||||||
|
|
||||||
# Add transaction
|
try:
|
||||||
|
resp = httpx.post(f"{rpc_url}/rpc/sendTx", json=tx_payload, timeout=5)
|
||||||
|
if resp.status_code not in (200, 201):
|
||||||
|
error(f"Failed to submit transaction to chain: {resp.text}")
|
||||||
|
raise Exception("Chain submission failed")
|
||||||
|
tx_hash = resp.json().get("tx_hash")
|
||||||
|
except Exception as e:
|
||||||
|
error(f"Failed to send transaction to RPC: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
# Add transaction to local history
|
||||||
transaction = {
|
transaction = {
|
||||||
"type": "send",
|
"type": "send",
|
||||||
"amount": -amount,
|
"amount": -amount,
|
||||||
"to_address": to_address,
|
"to_address": to_address,
|
||||||
"description": description or "",
|
"description": description or "",
|
||||||
"timestamp": datetime.now().isoformat(),
|
"timestamp": datetime.now().isoformat(),
|
||||||
|
"tx_hash": tx_hash,
|
||||||
|
"status": "pending"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if "transactions" not in wallet_data:
|
||||||
|
wallet_data["transactions"] = []
|
||||||
|
|
||||||
wallet_data["transactions"].append(transaction)
|
wallet_data["transactions"].append(transaction)
|
||||||
wallet_data["balance"] = balance - amount
|
wallet_data["balance"] = chain_balance - amount
|
||||||
|
|
||||||
# Save wallet
|
# Save wallet
|
||||||
save_password = password if wallet_data.get("encrypted") else None
|
save_password = password if wallet_data.get("encrypted") else None
|
||||||
_save_wallet(wallet_path, wallet_data, save_password)
|
_save_wallet(wallet_path, wallet_data, save_password)
|
||||||
|
|
||||||
success(f"Sent {amount} AITBC to {to_address}")
|
success(f"Submitted transaction {tx_hash} to send {amount} AITBC to {to_address}")
|
||||||
return {
|
return {
|
||||||
"mode": "file",
|
"mode": "file",
|
||||||
"wallet_name": wallet_name,
|
"wallet_name": wallet_name,
|
||||||
"to_address": to_address,
|
"to_address": to_address,
|
||||||
"amount": amount,
|
"amount": amount,
|
||||||
"description": description,
|
"description": description,
|
||||||
"new_balance": wallet_data["balance"],
|
"tx_hash": tx_hash,
|
||||||
"timestamp": transaction["timestamp"]
|
"timestamp": transaction["timestamp"]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,29 +1,29 @@
|
|||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
from typing import List, Optional, Union
|
from typing import List, Optional, Union, Any
|
||||||
from . import error, output
|
from . import error, output
|
||||||
|
|
||||||
def run_subprocess(cmd: List[str], check: bool = True, capture_output: bool = True, shell: bool = False) -> Optional[str]:
|
def run_subprocess(cmd: List[str], check: bool = True, capture_output: bool = True, shell: bool = False, **kwargs: Any) -> Optional[Union[str, subprocess.CompletedProcess]]:
|
||||||
"""Run a subprocess command safely with logging"""
|
"""Run a subprocess command safely with logging"""
|
||||||
try:
|
try:
|
||||||
if shell:
|
if shell:
|
||||||
# When shell=True, cmd should be a string
|
# When shell=True, cmd should be a string
|
||||||
cmd_str = " ".join(cmd) if isinstance(cmd, list) else cmd
|
cmd_str = " ".join(cmd) if isinstance(cmd, list) else cmd
|
||||||
result = subprocess.run(cmd_str, shell=True, check=check, capture_output=capture_output, text=True)
|
result = subprocess.run(cmd_str, shell=True, check=check, capture_output=capture_output, text=True, **kwargs)
|
||||||
else:
|
else:
|
||||||
result = subprocess.run(cmd, check=check, capture_output=capture_output, text=True)
|
result = subprocess.run(cmd, check=check, capture_output=capture_output, text=True, **kwargs)
|
||||||
|
|
||||||
if capture_output:
|
if capture_output:
|
||||||
return result.stdout.strip()
|
return result.stdout.strip()
|
||||||
return None
|
return result
|
||||||
|
|
||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
error(f"Command failed with exit code {e.returncode}")
|
error(f"Command failed with exit code {e.returncode}")
|
||||||
if capture_output and e.stderr:
|
if capture_output and getattr(e, 'stderr', None):
|
||||||
print(e.stderr, file=sys.stderr)
|
print(e.stderr, file=sys.stderr)
|
||||||
if check:
|
if check:
|
||||||
sys.exit(e.returncode)
|
sys.exit(e.returncode)
|
||||||
return None
|
return getattr(e, 'stdout', None) if capture_output else None
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error(f"Failed to execute command: {e}")
|
error(f"Failed to execute command: {e}")
|
||||||
if check:
|
if check:
|
||||||
|
|||||||
9
fix_gossip.patch
Normal file
9
fix_gossip.patch
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
--- /opt/aitbc/apps/blockchain-node/src/aitbc_chain/gossip/broker.py 2026-03-23 12:40:00.000000000 +0100
|
||||||
|
+++ /opt/aitbc/apps/blockchain-node/src/aitbc_chain/gossip/broker.py.new 2026-03-23 12:40:00.000000000 +0100
|
||||||
|
@@ -6,7 +6,7 @@
|
||||||
|
import warnings
|
||||||
|
from collections import defaultdict
|
||||||
|
-from contextlib import suppress
|
||||||
|
+from contextlib import suppress, asynccontextmanager
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any, Callable, Dict, List, Optional, Set
|
||||||
30
fix_inprocess.py
Normal file
30
fix_inprocess.py
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
import asyncio
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
class _InProcessSubscriber:
|
||||||
|
def __init__(self, queue, release):
|
||||||
|
self._queue = queue
|
||||||
|
self._release = release
|
||||||
|
def __aiter__(self):
|
||||||
|
return self._iterator()
|
||||||
|
async def _iterator(self):
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
yield await self._queue.get()
|
||||||
|
finally:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def subscribe():
|
||||||
|
queue = asyncio.Queue()
|
||||||
|
try:
|
||||||
|
yield _InProcessSubscriber(queue, lambda: None)
|
||||||
|
finally:
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
async with subscribe() as sub:
|
||||||
|
print("Success")
|
||||||
|
|
||||||
|
asyncio.run(main())
|
||||||
2
test_send.sh
Executable file
2
test_send.sh
Executable file
@@ -0,0 +1,2 @@
|
|||||||
|
export AITBC_WALLET="test_wallet"
|
||||||
|
aitbc wallet send aitbc1my-test-wallet_hd 50
|
||||||
22
test_wallet.py
Normal file
22
test_wallet.py
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
wallet_data = {
|
||||||
|
"name": "test_wallet",
|
||||||
|
"type": "hd",
|
||||||
|
"address": "aitbc1genesis",
|
||||||
|
"private_key": "dummy",
|
||||||
|
"public_key": "dummy",
|
||||||
|
"encrypted": False,
|
||||||
|
"transactions": [],
|
||||||
|
"balance": 1000000
|
||||||
|
}
|
||||||
|
|
||||||
|
import os
|
||||||
|
import pathlib
|
||||||
|
|
||||||
|
wallet_dir = pathlib.Path("/root/.aitbc/wallets")
|
||||||
|
wallet_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
wallet_path = wallet_dir / "test_wallet.json"
|
||||||
|
|
||||||
|
with open(wallet_path, "w") as f:
|
||||||
|
json.dump(wallet_data, f)
|
||||||
Reference in New Issue
Block a user