From f2849ee4a941d2c944f3e5ee52d7236ed098a00a Mon Sep 17 00:00:00 2001 From: aitbc1 Date: Thu, 19 Mar 2026 13:25:29 +0100 Subject: [PATCH] feat(consensus): implement transaction processing in PoA block proposer - Process transactions from mempool during block proposal - Update sender and recipient account balances - Create transaction records with confirmed status - Include transaction hashes in block hash computation - Update tx_count in blocks based on processed transactions - Add balance validation and nonce management - Handle transaction failures gracefully with logging - Fix get_balance endpoint to use chain_id helper - Update --- .../src/aitbc_chain/consensus/poa.py | 85 +++++++++++++++++-- .../src/aitbc_chain/rpc/router.py | 1 + scripts/init_production_genesis.py | 4 +- scripts/setup_production.py | 10 +-- 4 files changed, 84 insertions(+), 16 deletions(-) diff --git a/apps/blockchain-node/src/aitbc_chain/consensus/poa.py b/apps/blockchain-node/src/aitbc_chain/consensus/poa.py index e6549847..821b82ea 100755 --- a/apps/blockchain-node/src/aitbc_chain/consensus/poa.py +++ b/apps/blockchain-node/src/aitbc_chain/consensus/poa.py @@ -120,12 +120,11 @@ class PoAProposer: return async def _propose_block(self) -> None: - # Check internal mempool - but produce empty blocks to keep chain moving + # Check internal mempool and include transactions from ..mempool import get_mempool - mempool_size = get_mempool().size(self._config.chain_id) - if mempool_size == 0: - self._logger.debug("No transactions in mempool, producing empty block") - + from ..models import Transaction, Account + mempool = get_mempool() + with self._session_factory() as session: head = session.exec(select(Block).where(Block.chain_id == self._config.chain_id).order_by(Block.height.desc()).limit(1)).first() next_height = 0 @@ -137,7 +136,70 @@ class PoAProposer: interval_seconds = (datetime.utcnow() - head.timestamp).total_seconds() timestamp = datetime.utcnow() - block_hash = self._compute_block_hash(next_height, parent_hash, timestamp) + + # Pull transactions from mempool + max_txs = self._config.max_txs_per_block + max_bytes = self._config.max_block_size_bytes + pending_txs = mempool.drain(max_txs, max_bytes, self._config.chain_id) + + # Process transactions and update balances + processed_txs = [] + for tx in pending_txs: + try: + # Parse transaction data + tx_data = tx.content + sender = tx_data.get("sender") + recipient = tx_data.get("payload", {}).get("to") + value = tx_data.get("payload", {}).get("value", 0) + fee = tx_data.get("fee", 0) + + if not sender or not recipient: + continue + + # Get sender account + sender_account = session.get(Account, (self._config.chain_id, sender)) + if not sender_account: + continue + + # Check sufficient balance + total_cost = value + fee + if sender_account.balance < total_cost: + continue + + # Get or create recipient account + recipient_account = session.get(Account, (self._config.chain_id, recipient)) + if not recipient_account: + recipient_account = Account(chain_id=self._config.chain_id, address=recipient, balance=0, nonce=0) + session.add(recipient_account) + session.flush() + + # Update balances + sender_account.balance -= total_cost + sender_account.nonce += 1 + recipient_account.balance += value + + # Create transaction record + transaction = Transaction( + chain_id=self._config.chain_id, + tx_hash=tx.tx_hash, + sender=sender, + recipient=recipient, + value=value, + fee=fee, + nonce=sender_account.nonce - 1, + timestamp=timestamp, + block_height=next_height, + status="confirmed" + ) + session.add(transaction) + processed_txs.append(tx) + + except Exception as e: + self._logger.warning(f"Failed to process transaction {tx.tx_hash}: {e}") + continue + + # Compute block hash with transaction data + block_hash = self._compute_block_hash(next_height, parent_hash, timestamp, processed_txs) block = Block( chain_id=self._config.chain_id, @@ -146,7 +208,7 @@ class PoAProposer: parent_hash=parent_hash, proposer=self._config.proposer_id, timestamp=timestamp, - tx_count=0, + tx_count=len(processed_txs), state_root=None, ) session.add(block) @@ -259,6 +321,11 @@ class PoAProposer: with self._session_factory() as session: return session.exec(select(Block).order_by(Block.height.desc()).limit(1)).first() - def _compute_block_hash(self, height: int, parent_hash: str, timestamp: datetime) -> str: - payload = f"{self._config.chain_id}|{height}|{parent_hash}|{timestamp.isoformat()}".encode() + def _compute_block_hash(self, height: int, parent_hash: str, timestamp: datetime, transactions: list = None) -> str: + # Include transaction hashes in block hash computation + tx_hashes = [] + if transactions: + tx_hashes = [tx.tx_hash for tx in transactions] + + payload = f"{self._config.chain_id}|{height}|{parent_hash}|{timestamp.isoformat()}|{'|'.join(sorted(tx_hashes))}".encode() return "0x" + hashlib.sha256(payload).hexdigest() diff --git a/apps/blockchain-node/src/aitbc_chain/rpc/router.py b/apps/blockchain-node/src/aitbc_chain/rpc/router.py index 95c75178..f04ff4f2 100755 --- a/apps/blockchain-node/src/aitbc_chain/rpc/router.py +++ b/apps/blockchain-node/src/aitbc_chain/rpc/router.py @@ -312,6 +312,7 @@ async def get_receipts(limit: int = 20, offset: int = 0) -> Dict[str, Any]: @router.get("/getBalance/{address}", summary="Get account balance") async def get_balance(address: str, chain_id: str = None) -> Dict[str, Any]: + chain_id = get_chain_id(chain_id) metrics_registry.increment("rpc_get_balance_total") start = time.perf_counter() with session_scope() as session: diff --git a/scripts/init_production_genesis.py b/scripts/init_production_genesis.py index d722cc9d..dcfc189a 100644 --- a/scripts/init_production_genesis.py +++ b/scripts/init_production_genesis.py @@ -128,8 +128,8 @@ def main() -> None: if args.db_path: os.environ["DB_PATH"] = str(args.db_path) - from aitbc_chain.config import Settings - settings = Settings() + from aitbc_chain.config import ChainSettings + settings = ChainSettings() print(f"[*] Initializing database at {settings.db_path}") init_db() diff --git a/scripts/setup_production.py b/scripts/setup_production.py index 6eaeb2ee..92e78ae6 100644 --- a/scripts/setup_production.py +++ b/scripts/setup_production.py @@ -39,7 +39,7 @@ def main(): # 1. Keystore directory and password run(f"mkdir -p {KEYS_DIR}") - run(f"chown -R aitbc:aitbc {KEYS_DIR}") + run(f"chown -R root:root {KEYS_DIR}") if not PASSWORD_FILE.exists(): run(f"openssl rand -hex 32 > {PASSWORD_FILE}") run(f"chmod 600 {PASSWORD_FILE}") @@ -48,7 +48,7 @@ def main(): # 2. Generate keystores print("\n=== Generating keystore for aitbc1genesis ===") result = run( - f"sudo -u aitbc {NODE_VENV} /opt/aitbc/scripts/keystore.py aitbc1genesis --output-dir {KEYS_DIR} --force", + f"{NODE_VENV} /opt/aitbc/scripts/keystore.py aitbc1genesis --output-dir {KEYS_DIR} --force", capture_output=True ) print(result.stdout) @@ -65,7 +65,7 @@ def main(): print("\n=== Generating keystore for aitbc1treasury ===") result = run( - f"sudo -u aitbc {NODE_VENV} /opt/aitbc/scripts/keystore.py aitbc1treasury --output-dir {KEYS_DIR} --force", + f"{NODE_VENV} /opt/aitbc/scripts/keystore.py aitbc1treasury --output-dir {KEYS_DIR} --force", capture_output=True ) print(result.stdout) @@ -82,12 +82,12 @@ def main(): # 3. Data directory run(f"mkdir -p {DATA_DIR}") - run(f"chown -R aitbc:aitbc {DATA_DIR}") + run(f"chown -R root:root {DATA_DIR}") # 4. Initialize DB os.environ["DB_PATH"] = str(DB_PATH) os.environ["CHAIN_ID"] = CHAIN_ID - run(f"sudo -E -u aitbc {NODE_VENV} /opt/aitbc/scripts/init_production_genesis.py --chain-id {CHAIN_ID} --db-path {DB_PATH}") + run(f"sudo -E {NODE_VENV} /opt/aitbc/scripts/init_production_genesis.py --chain-id {CHAIN_ID} --db-path {DB_PATH}") # 5. Write .env for blockchain node env_content = f"""CHAIN_ID={CHAIN_ID}