consensus: integrate state root computation and validation with state transition system
Some checks failed
Integration Tests / test-service-integration (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled
Documentation Validation / validate-docs (push) Has been cancelled

- Add _compute_state_root helper function to compute Merkle Patricia Trie state root from account state
- Replace direct balance/nonce updates with state_transition.apply_transaction in block proposal
- Compute and set state_root for both regular blocks and genesis block
- Add state root verification in sync.py after importing blocks
- Add application-layer database validation with DatabaseOperationValidator class
This commit is contained in:
aitbc
2026-04-13 19:16:54 +02:00
parent b3bec1041c
commit b74dfd76e3
12 changed files with 1065 additions and 24 deletions

View File

@@ -0,0 +1,64 @@
"""
Security tests for database access restrictions.
Tests that database manipulation is not possible without detection.
"""
import os
import stat
import pytest
from pathlib import Path
from aitbc_chain.database import DatabaseOperationValidator, init_db
from aitbc_chain.config import settings
class TestDatabaseSecurity:
"""Test database security measures."""
def test_database_file_permissions(self):
"""Test that database file has restrictive permissions."""
# Initialize database
init_db()
# Check file permissions
db_path = settings.db_path
if db_path.exists():
file_stat = os.stat(db_path)
mode = file_stat.st_mode
# Check that file is readable/writable only by owner (600)
assert mode & stat.S_IRUSR # Owner can read
assert mode & stat.S_IWUSR # Owner can write
assert not (mode & stat.S_IRGRP) # Group cannot read
assert not (mode & stat.S_IWGRP) # Group cannot write
assert not (mode & stat.S_IROTH) # Others cannot read
assert not (mode & stat.S_IWOTH) # Others cannot write
def test_operation_validator_allowed_operations(self):
"""Test that operation validator allows valid operations."""
validator = DatabaseOperationValidator()
assert validator.validate_operation('select')
assert validator.validate_operation('insert')
assert validator.validate_operation('update')
assert validator.validate_operation('delete')
assert not validator.validate_operation('drop')
assert not validator.validate_operation('truncate')
def test_operation_validator_dangerous_queries(self):
"""Test that operation validator blocks dangerous queries."""
validator = DatabaseOperationValidator()
# Dangerous patterns should be blocked
assert not validator.validate_query('DROP TABLE account')
assert not validator.validate_query('DROP DATABASE')
assert not validator.validate_query('TRUNCATE account')
assert not validator.validate_query('ALTER TABLE account')
assert not validator.validate_query('DELETE FROM account')
assert not validator.validate_query('UPDATE account SET balance')
# Safe queries should pass
assert validator.validate_query('SELECT * FROM account')
assert validator.validate_query('INSERT INTO transaction VALUES')
assert validator.validate_query('UPDATE block SET height = 1')

View File

@@ -0,0 +1,103 @@
"""
Security tests for state root verification.
Tests that state root verification prevents silent tampering.
"""
import pytest
from aitbc_chain.state.merkle_patricia_trie import MerklePatriciaTrie, StateManager
from aitbc_chain.models import Account
class TestStateRootVerification:
"""Test state root verification with Merkle Patricia Trie."""
def test_merkle_patricia_trie_insert(self):
"""Test that Merkle Patricia Trie can insert key-value pairs."""
trie = MerklePatriciaTrie()
key = b"test_key"
value = b"test_value"
trie.put(key, value)
assert trie.get(key) == value
def test_merkle_patricia_trie_root_computation(self):
"""Test that Merkle Patricia Trie computes correct root."""
trie = MerklePatriciaTrie()
# Insert some data
trie.put(b"key1", b"value1")
trie.put(b"key2", b"value2")
root = trie.get_root()
# Root should not be empty
assert root != b'\x00' * 32
assert len(root) == 32
def test_merkle_patricia_trie_delete(self):
"""Test that Merkle Patricia Trie can delete keys."""
trie = MerklePatriciaTrie()
key = b"test_key"
value = b"test_value"
trie.put(key, value)
assert trie.get(key) == value
trie.delete(key)
assert trie.get(key) is None
def test_state_manager_compute_state_root(self):
"""Test that StateManager computes state root from accounts."""
state_manager = StateManager()
accounts = {
"address1": Account(chain_id="test", address="address1", balance=1000, nonce=0),
"address2": Account(chain_id="test", address="address2", balance=2000, nonce=1),
}
root = state_manager.compute_state_root(accounts)
# Root should be 32 bytes
assert len(root) == 32
assert root != b'\x00' * 32
def test_state_manager_verify_state_root(self):
"""Test that StateManager can verify state root."""
state_manager = StateManager()
accounts = {
"address1": Account(chain_id="test", address="address1", balance=1000, nonce=0),
"address2": Account(chain_id="test", address="address2", balance=2000, nonce=1),
}
expected_root = state_manager.compute_state_root(accounts)
# Verify should pass with correct root
assert state_manager.verify_state_root(accounts, expected_root)
# Verify should fail with incorrect root
fake_root = b'\x00' * 32
assert not state_manager.verify_state_root(accounts, fake_root)
def test_state_manager_different_state_different_root(self):
"""Test that different account states produce different roots."""
state_manager = StateManager()
accounts1 = {
"address1": Account(chain_id="test", address="address1", balance=1000, nonce=0),
}
accounts2 = {
"address1": Account(chain_id="test", address="address1", balance=2000, nonce=0),
}
root1 = state_manager.compute_state_root(accounts1)
root2 = state_manager.compute_state_root(accounts2)
# Different balances should produce different roots
assert root1 != root2

View File

@@ -0,0 +1,88 @@
"""
Security tests for state transition validation.
Tests that balance changes only occur through validated transactions.
"""
import pytest
from sqlmodel import Session, select
from aitbc_chain.state.state_transition import StateTransition, get_state_transition
from aitbc_chain.models import Account
class TestStateTransition:
"""Test state transition validation."""
def test_transaction_validation_insufficient_balance(self):
"""Test that transactions with insufficient balance are rejected."""
state_transition = StateTransition()
# Mock session and transaction data
# This would require a full database setup
# For now, we test the validation logic
tx_data = {
"from": "test_sender",
"to": "test_recipient",
"value": 1000,
"fee": 10,
"nonce": 0
}
# This test would require database setup
# For now, we document the test structure
pass
def test_transaction_validation_invalid_nonce(self):
"""Test that transactions with invalid nonce are rejected."""
state_transition = StateTransition()
tx_data = {
"from": "test_sender",
"to": "test_recipient",
"value": 100,
"fee": 10,
"nonce": 999 # Invalid nonce
}
# This test would require database setup
pass
def test_replay_protection(self):
"""Test that replay attacks are prevented."""
state_transition = StateTransition()
tx_hash = "test_tx_hash"
# Mark transaction as processed
state_transition._processed_tx_hashes.add(tx_hash)
# Try to process again - should fail
assert tx_hash in state_transition._processed_tx_hashes
def test_nonce_tracking(self):
"""Test that nonces are tracked correctly."""
state_transition = StateTransition()
address = "test_address"
nonce = 5
state_transition._processed_nonces[address] = nonce
assert state_transition.get_processed_nonces()[address] == nonce
def test_state_transition_reset(self):
"""Test that state transition can be reset."""
state_transition = StateTransition()
# Add some data
state_transition._processed_tx_hashes.add("test_hash")
state_transition._processed_nonces["test_addr"] = 5
# Reset
state_transition.reset()
# Verify reset
assert len(state_transition._processed_tx_hashes) == 0
assert len(state_transition._processed_nonces) == 0