diff --git a/apps/blockchain-node/fix_tx_metadata2.py b/apps/blockchain-node/fix_tx_metadata2.py new file mode 100644 index 00000000..c94cddcb --- /dev/null +++ b/apps/blockchain-node/fix_tx_metadata2.py @@ -0,0 +1,50 @@ +import sqlite3 + +def fix_db(): + print("Fixing transaction table on aitbc node...") + + conn = sqlite3.connect('/opt/aitbc/data/ait-mainnet/chain.db') + cursor = conn.cursor() + + try: + cursor.execute('ALTER TABLE "transaction" ADD COLUMN nonce INTEGER DEFAULT 0;') + print("Added nonce column") + except sqlite3.OperationalError as e: + print(f"Error adding nonce: {e}") + + try: + cursor.execute('ALTER TABLE "transaction" ADD COLUMN value INTEGER DEFAULT 0;') + print("Added value column") + except sqlite3.OperationalError as e: + print(f"Error adding value: {e}") + + try: + cursor.execute('ALTER TABLE "transaction" ADD COLUMN fee INTEGER DEFAULT 0;') + print("Added fee column") + except sqlite3.OperationalError as e: + print(f"Error adding fee: {e}") + + try: + cursor.execute('ALTER TABLE "transaction" ADD COLUMN status TEXT DEFAULT "pending";') + print("Added status column") + except sqlite3.OperationalError as e: + print(f"Error adding status: {e}") + + try: + cursor.execute('ALTER TABLE "transaction" ADD COLUMN tx_metadata TEXT;') + print("Added tx_metadata column") + except sqlite3.OperationalError as e: + print(f"Error adding tx_metadata: {e}") + + try: + cursor.execute('ALTER TABLE "transaction" ADD COLUMN timestamp TEXT;') + print("Added timestamp column") + except sqlite3.OperationalError as e: + print(f"Error adding timestamp: {e}") + + conn.commit() + conn.close() + print("Done fixing transaction table.") + +if __name__ == '__main__': + fix_db() diff --git a/apps/blockchain-node/src/aitbc_chain/gossip/broker.py b/apps/blockchain-node/src/aitbc_chain/gossip/broker.py index 45977dd5..36f982d6 100755 --- a/apps/blockchain-node/src/aitbc_chain/gossip/broker.py +++ b/apps/blockchain-node/src/aitbc_chain/gossip/broker.py @@ -4,7 +4,7 @@ import asyncio import json import warnings from collections import defaultdict -from contextlib import suppress +from contextlib import suppress, asynccontextmanager from dataclasses import dataclass from typing import Any, Callable, Dict, List, Optional, Set @@ -225,25 +225,15 @@ class GossipBroker: class _InProcessSubscriber: - def __init__(self, queue: "asyncio.Queue[Any]", release: Callable[[], None]): + def __init__(self, queue: "asyncio.Queue[Any]"): self._queue = queue - self._release = release - - async def __aenter__(self): - return self - - async def __aexit__(self, exc_type, exc, tb): - self._release() def __aiter__(self): # type: ignore[override] return self._iterator() async def _iterator(self): - try: - while True: - yield await self._queue.get() - finally: - self._release() + while True: + yield await self._queue.get() class _InProcessBroadcast: @@ -262,23 +252,19 @@ class _InProcessBroadcast: self._topics.clear() self._running = False - async def subscribe(self, topic: str) -> _InProcessSubscriber: + @asynccontextmanager + async def subscribe(self, topic: str): queue: "asyncio.Queue[Any]" = asyncio.Queue() async with self._lock: self._topics[topic].append(queue) - - def release() -> None: - async def _remove() -> None: - async with self._lock: - queues = self._topics.get(topic) - if queues and queue in queues: - queues.remove(queue) - if not queues: - self._topics.pop(topic, None) - - asyncio.create_task(_remove()) - - return _InProcessSubscriber(queue, release) + + try: + yield _InProcessSubscriber(queue) + finally: + async with self._lock: + queues = self._topics.get(topic) + if queues and queue in queues: + queues.remove(queue) async def publish(self, topic: str, message: Any) -> None: if not self._running: diff --git a/apps/blockchain-node/src/aitbc_chain/gossip/broker.py.orig b/apps/blockchain-node/src/aitbc_chain/gossip/broker.py.orig new file mode 100755 index 00000000..f7ae9adb --- /dev/null +++ b/apps/blockchain-node/src/aitbc_chain/gossip/broker.py.orig @@ -0,0 +1,307 @@ +from __future__ import annotations + +import asyncio +import json +import warnings +from collections import defaultdict +from contextlib import suppress +from dataclasses import dataclass +from typing import Any, Callable, Dict, List, Optional, Set + +warnings.filterwarnings("ignore", message="coroutine.* was never awaited", category=RuntimeWarning) + +try: + from broadcaster import Broadcast +except ImportError: # pragma: no cover + Broadcast = None # type: ignore[assignment] + +from ..metrics import metrics_registry + + +def _increment_publication(metric_prefix: str, topic: str) -> None: + metrics_registry.increment(f"{metric_prefix}_total") + metrics_registry.increment(f"{metric_prefix}_topic_{topic}") + + +def _set_queue_gauge(topic: str, size: int) -> None: + metrics_registry.set_gauge(f"gossip_queue_size_{topic}", float(size)) + + +def _update_subscriber_metrics(topics: Dict[str, List["asyncio.Queue[Any]"]]) -> None: + for topic, queues in topics.items(): + metrics_registry.set_gauge(f"gossip_subscribers_topic_{topic}", float(len(queues))) + total = sum(len(queues) for queues in topics.values()) + metrics_registry.set_gauge("gossip_subscribers_total", float(total)) + + +def _clear_topic_metrics(topic: str) -> None: + metrics_registry.set_gauge(f"gossip_subscribers_topic_{topic}", 0.0) + _set_queue_gauge(topic, 0) + +@dataclass +class TopicSubscription: + topic: str + queue: "asyncio.Queue[Any]" + _unsubscribe: Callable[[], None] + + def close(self) -> None: + self._unsubscribe() + + async def get(self) -> Any: + return await self.queue.get() + + async def __aiter__(self): # type: ignore[override] + try: + while True: + yield await self.queue.get() + finally: + self.close() + + +class GossipBackend: + async def start(self) -> None: # pragma: no cover - overridden as needed + return None + + async def publish(self, topic: str, message: Any) -> None: + raise NotImplementedError + + async def subscribe(self, topic: str, max_queue_size: int = 100) -> TopicSubscription: + raise NotImplementedError + + async def shutdown(self) -> None: + return None + + +class InMemoryGossipBackend(GossipBackend): + def __init__(self) -> None: + self._topics: Dict[str, List["asyncio.Queue[Any]"]] = defaultdict(list) + self._lock = asyncio.Lock() + + async def publish(self, topic: str, message: Any) -> None: + async with self._lock: + queues = list(self._topics.get(topic, [])) + for queue in queues: + await queue.put(message) + _set_queue_gauge(topic, queue.qsize()) + _increment_publication("gossip_publications", topic) + + async def subscribe(self, topic: str, max_queue_size: int = 100) -> TopicSubscription: + queue: "asyncio.Queue[Any]" = asyncio.Queue(maxsize=max_queue_size) + + async with self._lock: + self._topics[topic].append(queue) + _update_subscriber_metrics(self._topics) + + _set_queue_gauge(topic, queue.qsize()) + + def _unsubscribe() -> None: + async def _remove() -> None: + async with self._lock: + queues = self._topics.get(topic) + if queues is None: + return + if queue in queues: + queues.remove(queue) + if not queues: + self._topics.pop(topic, None) + _clear_topic_metrics(topic) + _update_subscriber_metrics(self._topics) + + asyncio.create_task(_remove()) + + return TopicSubscription(topic=topic, queue=queue, _unsubscribe=_unsubscribe) + + async def shutdown(self) -> None: + async with self._lock: + topics = list(self._topics.keys()) + self._topics.clear() + for topic in topics: + _clear_topic_metrics(topic) + _update_subscriber_metrics(self._topics) + + +class BroadcastGossipBackend(GossipBackend): + def __init__(self, url: str) -> None: + if Broadcast is None: # provide in-process fallback when Broadcast is missing + self._broadcast = _InProcessBroadcast() + else: + self._broadcast = Broadcast(url) # type: ignore[arg-type] + self._tasks: Set[asyncio.Task[None]] = set() + self._lock = asyncio.Lock() + self._running = False + + async def start(self) -> None: + if not self._running: + await self._broadcast.connect() # type: ignore[union-attr] + self._running = True + + async def publish(self, topic: str, message: Any) -> None: + if not self._running: + raise RuntimeError("Broadcast backend not started") + payload = _encode_message(message) + await self._broadcast.publish(topic, payload) # type: ignore[union-attr] + _increment_publication("gossip_broadcast_publications", topic) + + async def subscribe(self, topic: str, max_queue_size: int = 100) -> TopicSubscription: + if not self._running: + raise RuntimeError("Broadcast backend not started") + + queue: "asyncio.Queue[Any]" = asyncio.Queue(maxsize=max_queue_size) + stop_event = asyncio.Event() + + async def _run_subscription() -> None: + async with self._broadcast.subscribe(topic) as subscriber: # type: ignore[attr-defined,union-attr] + async for event in subscriber: # type: ignore[union-attr] + if stop_event.is_set(): + break + data = _decode_message(getattr(event, "message", event)) + try: + await queue.put(data) + _set_queue_gauge(topic, queue.qsize()) + except asyncio.CancelledError: + break + + task = asyncio.create_task(_run_subscription(), name=f"broadcast-sub:{topic}") + async with self._lock: + self._tasks.add(task) + metrics_registry.set_gauge("gossip_broadcast_subscribers_total", float(len(self._tasks))) + + def _unsubscribe() -> None: + async def _stop() -> None: + stop_event.set() + task.cancel() + with suppress(asyncio.CancelledError): + await task + async with self._lock: + self._tasks.discard(task) + metrics_registry.set_gauge("gossip_broadcast_subscribers_total", float(len(self._tasks))) + + asyncio.create_task(_stop()) + + return TopicSubscription(topic=topic, queue=queue, _unsubscribe=_unsubscribe) + + async def shutdown(self) -> None: + async with self._lock: + tasks = list(self._tasks) + self._tasks.clear() + metrics_registry.set_gauge("gossip_broadcast_subscribers_total", 0.0) + for task in tasks: + task.cancel() + with suppress(asyncio.CancelledError): + await task + if self._running: + await self._broadcast.disconnect() # type: ignore[union-attr] + self._running = False + + +class GossipBroker: + def __init__(self, backend: GossipBackend) -> None: + self._backend = backend + self._lock = asyncio.Lock() + self._started = False + + async def publish(self, topic: str, message: Any) -> None: + if not self._started: + await self._backend.start() + self._started = True + await self._backend.publish(topic, message) + + async def subscribe(self, topic: str, max_queue_size: int = 100) -> TopicSubscription: + if not self._started: + await self._backend.start() + self._started = True + return await self._backend.subscribe(topic, max_queue_size=max_queue_size) + + async def set_backend(self, backend: GossipBackend) -> None: + await backend.start() + async with self._lock: + previous = self._backend + self._backend = backend + self._started = True + await previous.shutdown() + + async def shutdown(self) -> None: + await self._backend.shutdown() + + +class _InProcessSubscriber: + def __init__(self, queue: "asyncio.Queue[Any]"): + self._queue = queue + + def __aiter__(self): # type: ignore[override] + return self._iterator() + + async def _iterator(self): + while True: + yield await self._queue.get() + + +class _InProcessBroadcast: + """Minimal in-memory broadcast substitute for tests when Starlette Broadcast is absent.""" + + def __init__(self) -> None: + self._topics: Dict[str, List["asyncio.Queue[Any]"]] = defaultdict(list) + self._lock = asyncio.Lock() + self._running = False + + async def connect(self) -> None: + self._running = True + + async def disconnect(self) -> None: + async with self._lock: + self._topics.clear() + self._running = False + + @asynccontextmanager + async def subscribe(self, topic: str): + queue: "asyncio.Queue[Any]" = asyncio.Queue() + async with self._lock: + self._topics[topic].append(queue) + + try: + yield _InProcessSubscriber(queue) + finally: + async with self._lock: + queues = self._topics.get(topic) + if queues and queue in queues: + queues.remove(queue) + + async def publish(self, topic: str, message: Any) -> None: + if not self._running: + raise RuntimeError("Broadcast backend not started") + async with self._lock: + queues = list(self._topics.get(topic, [])) + for queue in queues: + await queue.put(message) + + +def create_backend(backend_type: str, *, broadcast_url: Optional[str] = None) -> GossipBackend: + backend = backend_type.lower() + if backend in {"memory", "inmemory", "local"}: + return InMemoryGossipBackend() + if backend in {"broadcast", "starlette", "redis"}: + if not broadcast_url: + raise ValueError("Broadcast backend requires a gossip_broadcast_url setting") + return BroadcastGossipBackend(broadcast_url) + raise ValueError(f"Unsupported gossip backend '{backend_type}'") + + +def _encode_message(message: Any) -> Any: + if isinstance(message, (str, bytes, bytearray)): + return message + return json.dumps(message, separators=(",", ":")) + + +def _decode_message(message: Any) -> Any: + if isinstance(message, (bytes, bytearray)): + message = message.decode("utf-8") + if isinstance(message, str): + try: + return json.loads(message) + except json.JSONDecodeError: + return message + return message + + +gossip_broker = GossipBroker(InMemoryGossipBackend()) + diff --git a/cli/aitbc_cli/dual_mode_wallet_adapter.py b/cli/aitbc_cli/dual_mode_wallet_adapter.py index 65ce5c72..e197a28c 100755 --- a/cli/aitbc_cli/dual_mode_wallet_adapter.py +++ b/cli/aitbc_cli/dual_mode_wallet_adapter.py @@ -290,8 +290,11 @@ class DualModeWalletAdapter: def _send_transaction_file(self, wallet_name: str, password: str, to_address: str, amount: float, description: Optional[str]) -> Dict[str, Any]: - """Send transaction using file-based storage""" + """Send transaction using file-based storage and blockchain RPC""" from .commands.wallet import _load_wallet, _save_wallet + import httpx + from .utils import error, success + from datetime import datetime wallet_path = self.wallet_dir / f"{wallet_name}.json" @@ -300,36 +303,79 @@ class DualModeWalletAdapter: raise Exception("Wallet not found") wallet_data = _load_wallet(wallet_path, wallet_name) - balance = wallet_data.get("balance", 0) - - if balance < amount: - error(f"Insufficient balance. Available: {balance}, Required: {amount}") + # Fetch current balance and nonce from blockchain + from_address = wallet_data.get("address") + if not from_address: + error("Wallet does not have an address configured") + raise Exception("Invalid wallet") + + rpc_url = self.config.blockchain_rpc_url + try: + resp = httpx.get(f"{rpc_url}/rpc/getBalance/{from_address}?chain_id=ait-mainnet", timeout=5) + if resp.status_code == 200: + data = resp.json() + chain_balance = data.get("balance", 0) + nonce = data.get("nonce", 0) + else: + error(f"Failed to get balance from chain: {resp.text}") + raise Exception("Chain error") + except Exception as e: + error(f"Failed to connect to blockchain RPC: {e}") + raise + + if chain_balance < amount: + error(f"Insufficient blockchain balance. Available: {chain_balance}, Required: {amount}") raise Exception("Insufficient balance") + + # Construct and send transaction + tx_payload = { + "type": "TRANSFER", + "sender": from_address, + "nonce": nonce, + "fee": 0, + "payload": {"to": to_address, "value": amount}, + "sig": "mock_signature" # Replace with real signature when implemented + } - # Add transaction + try: + resp = httpx.post(f"{rpc_url}/rpc/sendTx", json=tx_payload, timeout=5) + if resp.status_code not in (200, 201): + error(f"Failed to submit transaction to chain: {resp.text}") + raise Exception("Chain submission failed") + tx_hash = resp.json().get("tx_hash") + except Exception as e: + error(f"Failed to send transaction to RPC: {e}") + raise + + # Add transaction to local history transaction = { "type": "send", "amount": -amount, "to_address": to_address, "description": description or "", "timestamp": datetime.now().isoformat(), + "tx_hash": tx_hash, + "status": "pending" } + if "transactions" not in wallet_data: + wallet_data["transactions"] = [] + wallet_data["transactions"].append(transaction) - wallet_data["balance"] = balance - amount + wallet_data["balance"] = chain_balance - amount # Save wallet save_password = password if wallet_data.get("encrypted") else None _save_wallet(wallet_path, wallet_data, save_password) - success(f"Sent {amount} AITBC to {to_address}") + success(f"Submitted transaction {tx_hash} to send {amount} AITBC to {to_address}") return { "mode": "file", "wallet_name": wallet_name, "to_address": to_address, "amount": amount, "description": description, - "new_balance": wallet_data["balance"], + "tx_hash": tx_hash, "timestamp": transaction["timestamp"] } diff --git a/cli/aitbc_cli/utils/subprocess.py b/cli/aitbc_cli/utils/subprocess.py index 9f2e721d..7fb498a5 100644 --- a/cli/aitbc_cli/utils/subprocess.py +++ b/cli/aitbc_cli/utils/subprocess.py @@ -1,29 +1,29 @@ import subprocess import sys -from typing import List, Optional, Union +from typing import List, Optional, Union, Any from . import error, output -def run_subprocess(cmd: List[str], check: bool = True, capture_output: bool = True, shell: bool = False) -> Optional[str]: +def run_subprocess(cmd: List[str], check: bool = True, capture_output: bool = True, shell: bool = False, **kwargs: Any) -> Optional[Union[str, subprocess.CompletedProcess]]: """Run a subprocess command safely with logging""" try: if shell: # When shell=True, cmd should be a string cmd_str = " ".join(cmd) if isinstance(cmd, list) else cmd - result = subprocess.run(cmd_str, shell=True, check=check, capture_output=capture_output, text=True) + result = subprocess.run(cmd_str, shell=True, check=check, capture_output=capture_output, text=True, **kwargs) else: - result = subprocess.run(cmd, check=check, capture_output=capture_output, text=True) + result = subprocess.run(cmd, check=check, capture_output=capture_output, text=True, **kwargs) if capture_output: return result.stdout.strip() - return None + return result except subprocess.CalledProcessError as e: error(f"Command failed with exit code {e.returncode}") - if capture_output and e.stderr: + if capture_output and getattr(e, 'stderr', None): print(e.stderr, file=sys.stderr) if check: sys.exit(e.returncode) - return None + return getattr(e, 'stdout', None) if capture_output else None except Exception as e: error(f"Failed to execute command: {e}") if check: diff --git a/fix_gossip.patch b/fix_gossip.patch new file mode 100644 index 00000000..c5f453a4 --- /dev/null +++ b/fix_gossip.patch @@ -0,0 +1,9 @@ +--- /opt/aitbc/apps/blockchain-node/src/aitbc_chain/gossip/broker.py 2026-03-23 12:40:00.000000000 +0100 ++++ /opt/aitbc/apps/blockchain-node/src/aitbc_chain/gossip/broker.py.new 2026-03-23 12:40:00.000000000 +0100 +@@ -6,7 +6,7 @@ + import warnings + from collections import defaultdict +-from contextlib import suppress ++from contextlib import suppress, asynccontextmanager + from dataclasses import dataclass + from typing import Any, Callable, Dict, List, Optional, Set diff --git a/fix_inprocess.py b/fix_inprocess.py new file mode 100644 index 00000000..73ec5e05 --- /dev/null +++ b/fix_inprocess.py @@ -0,0 +1,30 @@ +import asyncio +from contextlib import asynccontextmanager +from typing import Any + +class _InProcessSubscriber: + def __init__(self, queue, release): + self._queue = queue + self._release = release + def __aiter__(self): + return self._iterator() + async def _iterator(self): + try: + while True: + yield await self._queue.get() + finally: + pass + +@asynccontextmanager +async def subscribe(): + queue = asyncio.Queue() + try: + yield _InProcessSubscriber(queue, lambda: None) + finally: + pass + +async def main(): + async with subscribe() as sub: + print("Success") + +asyncio.run(main()) diff --git a/test_send.sh b/test_send.sh new file mode 100755 index 00000000..709aef9e --- /dev/null +++ b/test_send.sh @@ -0,0 +1,2 @@ +export AITBC_WALLET="test_wallet" +aitbc wallet send aitbc1my-test-wallet_hd 50 diff --git a/test_wallet.py b/test_wallet.py new file mode 100644 index 00000000..666f9e0d --- /dev/null +++ b/test_wallet.py @@ -0,0 +1,22 @@ +import json + +wallet_data = { + "name": "test_wallet", + "type": "hd", + "address": "aitbc1genesis", + "private_key": "dummy", + "public_key": "dummy", + "encrypted": False, + "transactions": [], + "balance": 1000000 +} + +import os +import pathlib + +wallet_dir = pathlib.Path("/root/.aitbc/wallets") +wallet_dir.mkdir(parents=True, exist_ok=True) +wallet_path = wallet_dir / "test_wallet.json" + +with open(wallet_path, "w") as f: + json.dump(wallet_data, f)