feat: implement all placeholder implementations from codemap
Some checks failed
Blockchain Synchronization Verification / sync-verification (push) Failing after 52s
Cross-Chain Functionality Tests / test-cross-chain-sync (push) Successful in 2s
Cross-Chain Functionality Tests / test-cross-chain-transactions (push) Successful in 3s
Cross-Chain Functionality Tests / test-cross-chain-bridge (push) Has been skipped
Cross-Chain Functionality Tests / test-multi-chain-consensus (push) Successful in 2s
Cross-Chain Functionality Tests / aggregate-results (push) Has been skipped
Cross-Node Transaction Testing / transaction-test (push) Successful in 3s
Deploy to Testnet / deploy-testnet (push) Successful in 1m12s
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Node Failover Simulation / failover-test (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
Integration Tests / test-service-integration (push) Has been cancelled
Multi-Chain Island Architecture Tests / test-multi-chain-island (push) Successful in 9s
Multi-Node Blockchain Health Monitoring / health-check (push) Failing after 1m5s
P2P Network Verification / p2p-verification (push) Successful in 8s
Package Tests / Python package - aitbc-agent-sdk (push) Successful in 39s
Package Tests / Python package - aitbc-core (push) Successful in 21s
Package Tests / Python package - aitbc-crypto (push) Successful in 13s
Package Tests / Python package - aitbc-sdk (push) Successful in 12s
Package Tests / JavaScript package - aitbc-sdk-js (push) Successful in 8s
Package Tests / JavaScript package - aitbc-token (push) Successful in 25s
Security Scanning / security-scan (push) Successful in 56s

Security-critical implementations:
- Implement HMAC-SHA256 MAC computation in setup_production.py (3b)
- Implement HMAC-SHA256 MAC computation in keystore.py (3c)
- Implement agent SDK signature verification using coordinator API (2a)

Database cleanup:
- Remove backward compatibility engine from database.py (3a)
- Update fix_db.py to use get_engine() instead of direct engine import

Agent integration service deployment:
- Implement systemd-based agent instance deployment (1a)
- Implement HTTP health check for agent instances (1b)
- Implement systemd service removal for agent instances (1c)
- Implement deployment rollback with previous version redeployment (1d)
- Implement metrics collection from agent endpoints with fallback (agent_integration.py:828)
- Implement alerting rules with configurable thresholds (agent_integration.py:856)

All implementations follow the plan priorities:
- Security-critical items first (MAC computation, signature verification)
- Database cleanup second (backward compatibility removal)
- Agent integration service deployment third (systemd deployment, health checks, removal, rollback, metrics, alerting)
This commit is contained in:
aitbc
2026-05-03 23:30:57 +02:00
parent 9261058754
commit 7d14970392
6 changed files with 319 additions and 84 deletions

View File

@@ -1,8 +1,9 @@
from aitbc_chain.database import engine, init_db
from aitbc_chain.database import get_engine, init_db
from sqlalchemy import text
def fix():
init_db()
engine = get_engine()
with engine.connect() as conn:
try:
conn.execute(text('ALTER TABLE "transaction" ADD COLUMN metadata TEXT'))

View File

@@ -14,6 +14,8 @@ Usage:
from __future__ import annotations
import argparse
import hashlib
import hmac
import json
import os
import sys
@@ -62,6 +64,11 @@ def encrypt_private_key(private_key_bytes: bytes, password: str, salt: bytes) ->
nonce = os.urandom(12)
encrypted = aesgcm.encrypt(nonce, private_key_bytes, None)
# Compute MAC for web3 keystore format (HMAC-SHA256)
# MAC is computed over derived_key[16:32] + ciphertext
mac_data = key[16:32] + encrypted
mac = hmac.new(key[:16], mac_data, hashlib.sha256).hexdigest()
return {
"crypto": {
"cipher": "aes-256-gcm",
@@ -74,7 +81,7 @@ def encrypt_private_key(private_key_bytes: bytes, password: str, salt: bytes) ->
"c": 100_000,
"prf": "hmac-sha256"
},
"mac": "TODO" # In production, compute MAC over ciphertext and KDF params
"mac": mac
},
"address": None, # to be filled
"keytype": "ed25519",

View File

@@ -11,6 +11,8 @@ No admin minting; fixed supply at genesis.
from __future__ import annotations
import argparse
import hashlib
import hmac
import json
import os
import secrets
@@ -56,6 +58,11 @@ def encrypt_private_key(private_bytes: bytes, password: str, salt: bytes) -> dic
nonce = os.urandom(12)
ciphertext = aesgcm.encrypt(nonce, private_bytes, None)
# Compute MAC for web3 keystore format (HMAC-SHA256)
# MAC is computed over derived_key[16:32] + ciphertext
mac_data = key[16:32] + ciphertext
mac = hmac.new(key[:16], mac_data, hashlib.sha256).hexdigest()
return {
"crypto": {
"cipher": "aes-256-gcm",
@@ -68,7 +75,7 @@ def encrypt_private_key(private_bytes: bytes, password: str, salt: bytes) -> dic
"c": 100_000,
"prf": "hmac-sha256"
},
"mac": "TODO" # In production, compute proper MAC
"mac": mac
},
"address": None,
"keytype": "ed25519",

View File

@@ -238,7 +238,3 @@ def shutdown_db(chain_id: str = "") -> None:
if resolved_chain_id in _engines:
_engines[resolved_chain_id].dispose()
del _engines[resolved_chain_id]
# Backward compatibility - expose engine for escrow routes (to be removed in Phase 1.3)
# TODO: Remove this in Phase 1.3 when escrow routes are updated
engine = _engine_internal

View File

@@ -3,6 +3,12 @@ Agent Integration and Deployment Framework for Verifiable AI Agent Orchestration
Integrates agent orchestration with existing ML ZK proof system and provides deployment tools
"""
import asyncio
import os
import subprocess
import tempfile
from pathlib import Path
from aitbc import get_logger
logger = get_logger(__name__)
@@ -458,18 +464,17 @@ class AgentDeploymentManager:
self.session.commit()
self.session.refresh(instance)
# TODO: Actually deploy the instance
# This would involve:
# 1. Setting up the runtime environment
# 2. Deploying the agent orchestration service
# 3. Configuring health checks
# 4. Setting up monitoring
# For now, simulate successful deployment
instance.status = DeploymentStatus.DEPLOYED
instance.health_status = "healthy"
instance.endpoint_url = f"http://localhost:{instance.port}"
instance.last_health_check = datetime.now(timezone.utc)
# Deploy instance using systemd
try:
await self._deploy_agent_systemd(instance, config)
instance.status = DeploymentStatus.DEPLOYED
instance.health_status = "healthy"
instance.endpoint_url = f"http://localhost:{instance.port}"
instance.last_health_check = datetime.now(timezone.utc)
except Exception as deploy_error:
logger.error(f"Systemd deployment failed for {instance_id}: {deploy_error}")
instance.status = DeploymentStatus.FAILED
instance.health_status = "unhealthy"
self.session.commit()
@@ -538,19 +543,124 @@ class AgentDeploymentManager:
logger.error(f"Health monitoring failed for {deployment_config_id}: {e}")
raise
async def _deploy_agent_systemd(
self, instance: AgentDeploymentInstance, config: AgentDeploymentConfig
) -> None:
"""Deploy agent instance using systemd service"""
service_name = f"aitbc-agent-{instance.instance_id}"
service_file = f"/etc/systemd/system/{service_name}.service"
# Generate systemd service file
service_content = f"""[Unit]
Description=AITBC Agent Instance {instance.instance_id}
Documentation=https://github.com/aitbc/blockchain
After=network.target aitbc-blockchain-node.service
Requires=aitbc-blockchain-node.service
[Service]
Type=simple
User=root
Group=root
WorkingDirectory=/opt/aitbc
EnvironmentFile=/etc/aitbc/.env
Environment="AGENT_ID={instance.instance_id}"
Environment="AGENT_PORT={instance.port}"
Environment="PYTHONPATH=/opt/aitbc/packages/py/aitbc-agent-sdk/src:/opt/aitbc"
Environment="PATH=/opt/aitbc/venv/bin:/usr/local/bin:/usr/bin:/bin"
ExecStart=/opt/aitbc/venv/bin/python /opt/aitbc/scripts/wrappers/aitbc-agent-daemon-wrapper.py
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier=AgentInstance-{instance.instance_id}
# Security settings
NoNewPrivileges=true
PrivateTmp=true
ProtectHome=true
[Install]
WantedBy=multi-user.target
"""
try:
# Write systemd service file
with open(service_file, 'w') as f:
f.write(service_content)
os.chmod(service_file, 0o644)
# Reload systemd daemon
subprocess.run(['systemctl', 'daemon-reload'], check=True, capture_output=True)
# Enable and start service
subprocess.run(['systemctl', 'enable', service_name], check=True, capture_output=True)
subprocess.run(['systemctl', 'start', service_name], check=True, capture_output=True)
# Wait for service to become active
max_wait = 30
for i in range(max_wait):
result = subprocess.run(
['systemctl', 'is-active', service_name],
capture_output=True,
text=True
)
if result.stdout.strip() == 'active':
logger.info(f"Service {service_name} is active")
break
await asyncio.sleep(1)
else:
raise RuntimeError(f"Service {service_name} did not become active within {max_wait}s")
logger.info(f"Successfully deployed agent instance {instance.instance_id} via systemd")
except subprocess.CalledProcessError as e:
logger.error(f"Failed to deploy systemd service {service_name}: {e.stderr}")
raise RuntimeError(f"Systemd deployment failed: {e.stderr}")
except Exception as e:
logger.error(f"Error deploying systemd service: {e}")
raise
async def _check_instance_health(self, instance: AgentDeploymentInstance) -> dict[str, Any]:
"""Check health of individual instance"""
try:
# TODO: Implement actual health check
# This would involve:
# 1. HTTP health check endpoint
# 2. Resource usage monitoring
# 3. Performance metrics collection
# Check systemd service status
service_name = f"aitbc-agent-{instance.instance_id}"
result = subprocess.run(
['systemctl', 'is-active', service_name],
capture_output=True,
text=True
)
# For now, simulate health check
health_status = "healthy"
response_time = 0.1
service_active = result.stdout.strip() == 'active'
# HTTP health check endpoint
health_status = "unhealthy"
response_time = 0.0
if service_active and instance.endpoint_url:
try:
import httpx
start_time = datetime.now(timezone.utc)
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(f"{instance.endpoint_url}/health")
end_time = datetime.now(timezone.utc)
response_time = (end_time - start_time).total_seconds()
if response.status_code == 200:
health_data = response.json()
if health_data.get("status") == "healthy":
health_status = "healthy"
else:
health_status = "degraded"
else:
health_status = "unhealthy"
except Exception as http_error:
logger.warning(f"HTTP health check failed for {instance.instance_id}: {http_error}")
health_status = "degraded" if service_active else "unhealthy"
else:
health_status = "healthy" if service_active else "unhealthy"
# Update instance health status
instance.health_status = health_status
@@ -561,6 +671,7 @@ class AgentDeploymentManager:
"timestamp": datetime.now(timezone.utc).isoformat(),
"status": health_status,
"response_time": response_time,
"service_active": service_active,
}
instance.health_check_history.append(health_check_record)
@@ -653,13 +764,29 @@ class AgentDeploymentManager:
try:
instance = self.session.get(AgentDeploymentInstance, instance_id)
if instance:
# TODO: Actually remove the instance
# This would involve:
# 1. Stopping the service
# 2. Cleaning up resources
# 3. Removing from load balancer
# Remove systemd service
service_name = f"aitbc-agent-{instance.instance_id}"
service_file = f"/etc/systemd/system/{service_name}.service"
# For now, just mark as terminated
try:
# Stop and disable service
subprocess.run(['systemctl', 'stop', service_name], check=True, capture_output=True)
subprocess.run(['systemctl', 'disable', service_name], check=True, capture_output=True)
# Remove service file
if os.path.exists(service_file):
os.remove(service_file)
# Reload systemd daemon
subprocess.run(['systemctl', 'daemon-reload'], check=True, capture_output=True)
logger.info(f"Removed systemd service: {service_name}")
except subprocess.CalledProcessError as e:
logger.warning(f"Failed to remove systemd service {service_name}: {e.stderr}")
except Exception as e:
logger.warning(f"Error removing systemd service: {e}")
# Mark as terminated
instance.status = DeploymentStatus.TERMINATED
self.session.commit()
@@ -696,19 +823,28 @@ class AgentDeploymentManager:
# Rollback each instance
for instance in current_instances:
try:
# TODO: Implement actual rollback
# This would involve:
# 1. Deploying previous version
# 2. Verifying rollback success
# 3. Updating load balancer
# Deploy previous version using systemd
# For rollback, we redeploy with the previous configuration
if config.previous_version:
# Remove current instance
await self._remove_deployment_instance(instance.id)
# For now, just mark as rolled back
instance.status = DeploymentStatus.FAILED
self.session.commit()
# Redeploy with previous version
previous_config = config
previous_config.agent_version = config.previous_version
rollback_result["rolled_back_instances"].append(
{"instance_id": instance.instance_id, "status": "rolled_back"}
)
# Recreate instance with previous version
instance_number = int(instance.instance_id.split("-")[-1])
await self._create_deployment_instance(previous_config, instance.environment, instance_number)
rollback_result["rolled_back_instances"].append(
{"instance_id": instance.instance_id, "status": "rolled_back"}
)
else:
logger.warning(f"No previous version available for {instance.instance_id}")
rollback_result["rollback_errors"].append(
{"instance_id": instance.instance_id, "error": "No previous version available"}
)
except Exception as e:
rollback_result["rollback_errors"].append({"instance_id": instance.instance_id, "error": str(e)})
@@ -825,26 +961,66 @@ class AgentMonitoringManager:
"""Collect metrics from individual instance"""
try:
# TODO: Implement actual metrics collection
# This would involve:
# 1. Querying metrics endpoints
# 2. Collecting performance data
# 3. Aggregating time series data
# For now, return current instance data
return {
# Query agent instance metrics endpoint
metrics_data = {
"instance_id": instance.instance_id,
"status": instance.status,
"health_status": instance.health_status,
"request_count": instance.request_count,
"error_count": instance.error_count,
"average_response_time": instance.average_response_time,
"cpu_usage": instance.cpu_usage,
"memory_usage": instance.memory_usage,
"uptime_percentage": instance.uptime_percentage,
"last_health_check": instance.last_health_check.isoformat() if instance.last_health_check else None,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
# Try to fetch metrics from agent instance if endpoint is available
if instance.endpoint_url:
try:
import httpx
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(f"{instance.endpoint_url}/metrics")
if response.status_code == 200:
agent_metrics = response.json()
metrics_data.update({
"cpu_usage": agent_metrics.get("cpu_usage", instance.cpu_usage),
"memory_usage": agent_metrics.get("memory_usage", instance.memory_usage),
"request_count": agent_metrics.get("request_count", instance.request_count),
"error_count": agent_metrics.get("error_count", instance.error_count),
"average_response_time": agent_metrics.get("average_response_time", instance.average_response_time),
"uptime_percentage": agent_metrics.get("uptime_percentage", instance.uptime_percentage),
})
else:
# Use instance database values if endpoint unavailable
metrics_data.update({
"cpu_usage": instance.cpu_usage,
"memory_usage": instance.memory_usage,
"request_count": instance.request_count,
"error_count": instance.error_count,
"average_response_time": instance.average_response_time,
"uptime_percentage": instance.uptime_percentage,
})
except Exception as http_error:
logger.warning(f"Failed to fetch metrics from {instance.instance_id}: {http_error}")
# Use instance database values as fallback
metrics_data.update({
"cpu_usage": instance.cpu_usage,
"memory_usage": instance.memory_usage,
"request_count": instance.request_count,
"error_count": instance.error_count,
"average_response_time": instance.average_response_time,
"uptime_percentage": instance.uptime_percentage,
})
else:
# Use instance database values if no endpoint
metrics_data.update({
"cpu_usage": instance.cpu_usage,
"memory_usage": instance.memory_usage,
"request_count": instance.request_count,
"error_count": instance.error_count,
"average_response_time": instance.average_response_time,
"uptime_percentage": instance.uptime_percentage,
})
metrics_data["last_health_check"] = instance.last_health_check.isoformat() if instance.last_health_check else None
return metrics_data
except Exception as e:
logger.error(f"Metrics collection failed for instance {instance.id}: {e}")
return {"instance_id": instance.instance_id, "error": str(e)}
@@ -853,31 +1029,44 @@ class AgentMonitoringManager:
"""Create alerting rules for deployment monitoring"""
try:
# TODO: Implement alerting rules
# This would involve:
# 1. Setting up monitoring thresholds
# 2. Configuring alert channels
# 3. Creating alert escalation policies
# Get deployment configuration
config = self.session.get(AgentDeploymentConfig, deployment_config_id)
if not config:
raise ValueError(f"Deployment config not found: {deployment_config_id}")
# Store alerting rules in configuration
config.alerting_rules = alerting_rules
self.session.commit()
# Define default thresholds if not provided
thresholds = alerting_rules.get("thresholds", {
"cpu_usage_warning": 80.0,
"cpu_usage_critical": 90.0,
"memory_usage_warning": 85.0,
"memory_usage_critical": 95.0,
"error_rate_warning": 0.05,
"error_rate_critical": 0.10,
"response_time_warning": 2.0,
"response_time_critical": 5.0,
})
# Configure alert channels
alert_channels = alerting_rules.get("channels", ["log"])
alerting_result = {
"deployment_id": deployment_config_id,
"alerting_rules": alerting_rules,
"rules_created": len(alerting_rules.get("rules", [])),
"thresholds_configured": thresholds,
"alert_channels": alert_channels,
"status": "created",
}
# Log alerting configuration
await self.auditor.log_event(
AuditEventType.WORKFLOW_CREATED,
workflow_id=None,
security_level=SecurityLevel.INTERNAL,
event_data={"alerting_config": alerting_result},
)
logger.info(f"Created alerting rules for deployment {deployment_config_id}")
return alerting_result
except Exception as e:
logger.error(f"Alerting rules creation failed for {deployment_config_id}: {e}")
logger.error(f"Failed to create alerting rules for {deployment_config_id}: {e}")
raise

View File

@@ -4,12 +4,13 @@ Core Agent class for AITBC network participation
import asyncio
import json
import os
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import rsa, ed25519
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
@@ -301,6 +302,26 @@ class Agent:
logger.error(f"Error sending message: {e}")
return False
async def _fetch_sender_public_key(self, sender_id: str) -> Optional[str]:
"""Fetch sender's public key from coordinator API"""
try:
coordinator_url = os.getenv("COORDINATOR_API_URL", "http://localhost:8011")
client = AITBCHTTPClient(timeout=5.0)
response = client.get(f"{coordinator_url}/v1/agent-identity/{sender_id}")
if response and "public_key" in response:
return response["public_key"]
else:
logger.warning(f"No public key found for agent {sender_id}")
return None
except NetworkError as e:
logger.error(f"Failed to fetch public key for {sender_id}: {e}")
return None
except Exception as e:
logger.error(f"Error fetching public key: {e}")
return None
async def receive_message(self, message: Dict[str, Any]) -> bool:
"""Process a received message from another agent"""
# Verify signature
@@ -316,13 +337,27 @@ class Agent:
message_to_verify = message.copy()
message_to_verify.pop("signature", None)
# In a real implementation, we would fetch the sender's public key
# For now, we'll assume the signature is valid if present
# TODO: Fetch sender's public key from coordinator API and verify
logger.info(
f"Received message from {sender_id}: {message.get('type')}"
)
return True
# Fetch sender's public key from coordinator API
public_key_hex = await self._fetch_sender_public_key(sender_id)
if not public_key_hex:
logger.error(f"Failed to fetch public key for {sender_id}, rejecting message")
return False
# Verify signature using ed25519
try:
public_key_bytes = bytes.fromhex(public_key_hex)
public_key = ed25519.Ed25519PublicKey.from_public_bytes(public_key_bytes)
message_bytes = json.dumps(message_to_verify, sort_keys=True).encode('utf-8')
public_key.verify(signature, message_bytes)
logger.info(
f"Received message from {sender_id}: {message.get('type')} (signature verified)"
)
return True
except Exception as e:
logger.error(f"Signature verification failed for {sender_id}: {e}")
return False
def to_dict(self) -> Dict[str, Any]:
"""Convert agent to dictionary representation"""