chore(gitignore): add development log file patterns - Add patterns for .log, .out, and .err files - Add wget-log and download.log patterns - Organize under new "Development Logs" section ```
915 lines
31 KiB
Bash
Executable File
915 lines
31 KiB
Bash
Executable File
#!/bin/bash
|
|
#
|
|
# AITBC Agent Protocols Implementation - Part 2
|
|
# Complete implementation with integration layer and services
|
|
#
|
|
|
|
set -e
|
|
|
|
# Colors for output
|
|
GREEN='\033[0;32m'
|
|
YELLOW='\033[1;33m'
|
|
BLUE='\033[0;34m'
|
|
NC='\033[0m'
|
|
|
|
print_status() {
|
|
echo -e "${GREEN}[INFO]${NC} $1"
|
|
}
|
|
|
|
print_header() {
|
|
echo -e "${BLUE}=== $1 ===${NC}"
|
|
}
|
|
|
|
# Configuration
|
|
PROJECT_ROOT="/opt/aitbc"
|
|
SERVICES_DIR="$PROJECT_ROOT/apps/agent-services"
|
|
AGENTS_DIR="$PROJECT_ROOT/apps/agents"
|
|
|
|
# Complete implementation
|
|
main() {
|
|
print_header "COMPLETING AGENT PROTOCOLS IMPLEMENTATION"
|
|
|
|
# Step 5: Implement Integration Layer
|
|
print_header "Step 5: Implementing Integration Layer"
|
|
implement_integration_layer
|
|
|
|
# Step 6: Create Agent Services
|
|
print_header "Step 6: Creating Agent Services"
|
|
create_agent_services
|
|
|
|
# Step 7: Set up Testing Framework
|
|
print_header "Step 7: Setting Up Testing Framework"
|
|
setup_testing_framework
|
|
|
|
# Step 8: Configure Deployment
|
|
print_header "Step 8: Configuring Deployment"
|
|
configure_deployment
|
|
|
|
print_header "Agent Protocols Implementation Complete! 🎉"
|
|
}
|
|
|
|
# Implement Integration Layer
|
|
implement_integration_layer() {
|
|
print_status "Implementing integration layer..."
|
|
|
|
cat > "$SERVICES_DIR/agent-bridge/src/integration_layer.py" << 'EOF'
|
|
#!/usr/bin/env python3
|
|
"""
|
|
AITBC Agent Integration Layer
|
|
Connects agent protocols to existing AITBC services
|
|
"""
|
|
|
|
import asyncio
|
|
import aiohttp
|
|
import json
|
|
from typing import Dict, Any, List, Optional
|
|
from datetime import datetime
|
|
|
|
class AITBCServiceIntegration:
|
|
"""Integration layer for AITBC services"""
|
|
|
|
def __init__(self):
|
|
self.service_endpoints = {
|
|
"coordinator_api": "http://localhost:8000",
|
|
"blockchain_rpc": "http://localhost:8006",
|
|
"exchange_service": "http://localhost:8001",
|
|
"marketplace": "http://localhost:8014",
|
|
"agent_registry": "http://localhost:8003"
|
|
}
|
|
self.session = None
|
|
|
|
async def __aenter__(self):
|
|
self.session = aiohttp.ClientSession()
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
|
if self.session:
|
|
await self.session.close()
|
|
|
|
async def get_blockchain_info(self) -> Dict[str, Any]:
|
|
"""Get blockchain information"""
|
|
try:
|
|
async with self.session.get(f"{self.service_endpoints['blockchain_rpc']}/health") as response:
|
|
return await response.json()
|
|
except Exception as e:
|
|
return {"error": str(e), "status": "unavailable"}
|
|
|
|
async def get_exchange_status(self) -> Dict[str, Any]:
|
|
"""Get exchange service status"""
|
|
try:
|
|
async with self.session.get(f"{self.service_endpoints['exchange_service']}/api/health") as response:
|
|
return await response.json()
|
|
except Exception as e:
|
|
return {"error": str(e), "status": "unavailable"}
|
|
|
|
async def get_coordinator_status(self) -> Dict[str, Any]:
|
|
"""Get coordinator API status"""
|
|
try:
|
|
async with self.session.get(f"{self.service_endpoints['coordinator_api']}/health") as response:
|
|
return await response.json()
|
|
except Exception as e:
|
|
return {"error": str(e), "status": "unavailable"}
|
|
|
|
async def submit_transaction(self, transaction_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Submit transaction to blockchain"""
|
|
try:
|
|
async with self.session.post(
|
|
f"{self.service_endpoints['blockchain_rpc']}/rpc/submit",
|
|
json=transaction_data
|
|
) as response:
|
|
return await response.json()
|
|
except Exception as e:
|
|
return {"error": str(e), "status": "failed"}
|
|
|
|
async def get_market_data(self, symbol: str = "AITBC/BTC") -> Dict[str, Any]:
|
|
"""Get market data from exchange"""
|
|
try:
|
|
async with self.session.get(f"{self.service_endpoints['exchange_service']}/api/market/{symbol}") as response:
|
|
return await response.json()
|
|
except Exception as e:
|
|
return {"error": str(e), "status": "failed"}
|
|
|
|
async def register_agent_with_coordinator(self, agent_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Register agent with coordinator"""
|
|
try:
|
|
async with self.session.post(
|
|
f"{self.service_endpoints['coordinator_api']}/api/v1/agents/register",
|
|
json=agent_data
|
|
) as response:
|
|
return await response.json()
|
|
except Exception as e:
|
|
return {"error": str(e), "status": "failed"}
|
|
|
|
class AgentServiceBridge:
|
|
"""Bridge between agents and AITBC services"""
|
|
|
|
def __init__(self):
|
|
self.integration = AITBCServiceIntegration()
|
|
self.active_agents = {}
|
|
|
|
async def start_agent(self, agent_id: str, agent_config: Dict[str, Any]) -> bool:
|
|
"""Start an agent with service integration"""
|
|
try:
|
|
# Register agent with coordinator
|
|
async with self.integration as integration:
|
|
registration_result = await integration.register_agent_with_coordinator({
|
|
"agent_id": agent_id,
|
|
"agent_type": agent_config.get("type", "generic"),
|
|
"capabilities": agent_config.get("capabilities", []),
|
|
"endpoint": agent_config.get("endpoint", f"http://localhost:{8000 + len(self.active_agents) + 10}")
|
|
})
|
|
|
|
if registration_result.get("status") == "ok":
|
|
self.active_agents[agent_id] = {
|
|
"config": agent_config,
|
|
"registration": registration_result,
|
|
"started_at": datetime.utcnow()
|
|
}
|
|
return True
|
|
else:
|
|
return False
|
|
except Exception as e:
|
|
print(f"Failed to start agent {agent_id}: {e}")
|
|
return False
|
|
|
|
async def stop_agent(self, agent_id: str) -> bool:
|
|
"""Stop an agent"""
|
|
if agent_id in self.active_agents:
|
|
del self.active_agents[agent_id]
|
|
return True
|
|
return False
|
|
|
|
async def get_agent_status(self, agent_id: str) -> Dict[str, Any]:
|
|
"""Get agent status with service integration"""
|
|
if agent_id not in self.active_agents:
|
|
return {"status": "not_found"}
|
|
|
|
agent_info = self.active_agents[agent_id]
|
|
|
|
async with self.integration as integration:
|
|
# Get service statuses
|
|
blockchain_status = await integration.get_blockchain_info()
|
|
exchange_status = await integration.get_exchange_status()
|
|
coordinator_status = await integration.get_coordinator_status()
|
|
|
|
return {
|
|
"agent_id": agent_id,
|
|
"status": "active",
|
|
"started_at": agent_info["started_at"].isoformat(),
|
|
"services": {
|
|
"blockchain": blockchain_status,
|
|
"exchange": exchange_status,
|
|
"coordinator": coordinator_status
|
|
}
|
|
}
|
|
|
|
async def execute_agent_task(self, agent_id: str, task_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Execute agent task with service integration"""
|
|
if agent_id not in self.active_agents:
|
|
return {"status": "error", "message": "Agent not found"}
|
|
|
|
task_type = task_data.get("type")
|
|
|
|
if task_type == "market_analysis":
|
|
return await self._execute_market_analysis(task_data)
|
|
elif task_type == "trading":
|
|
return await self._execute_trading_task(task_data)
|
|
elif task_type == "compliance_check":
|
|
return await self._execute_compliance_check(task_data)
|
|
else:
|
|
return {"status": "error", "message": f"Unknown task type: {task_type}"}
|
|
|
|
async def _execute_market_analysis(self, task_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Execute market analysis task"""
|
|
try:
|
|
async with self.integration as integration:
|
|
market_data = await integration.get_market_data(task_data.get("symbol", "AITBC/BTC"))
|
|
|
|
# Perform basic analysis
|
|
analysis_result = {
|
|
"symbol": task_data.get("symbol", "AITBC/BTC"),
|
|
"market_data": market_data,
|
|
"analysis": {
|
|
"trend": "neutral",
|
|
"volatility": "medium",
|
|
"recommendation": "hold"
|
|
},
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
}
|
|
|
|
return {"status": "success", "result": analysis_result}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
async def _execute_trading_task(self, task_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Execute trading task"""
|
|
try:
|
|
# Get market data first
|
|
async with self.integration as integration:
|
|
market_data = await integration.get_market_data(task_data.get("symbol", "AITBC/BTC"))
|
|
|
|
# Create transaction
|
|
transaction = {
|
|
"type": "trade",
|
|
"symbol": task_data.get("symbol", "AITBC/BTC"),
|
|
"side": task_data.get("side", "buy"),
|
|
"amount": task_data.get("amount", 0.1),
|
|
"price": task_data.get("price", market_data.get("price", 0.001))
|
|
}
|
|
|
|
# Submit transaction
|
|
tx_result = await integration.submit_transaction(transaction)
|
|
|
|
return {"status": "success", "transaction": tx_result}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
async def _execute_compliance_check(self, task_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Execute compliance check task"""
|
|
try:
|
|
# Basic compliance check
|
|
compliance_result = {
|
|
"user_id": task_data.get("user_id"),
|
|
"check_type": task_data.get("check_type", "basic"),
|
|
"status": "passed",
|
|
"checks_performed": ["kyc", "aml", "sanctions"],
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
}
|
|
|
|
return {"status": "success", "result": compliance_result}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
EOF
|
|
|
|
print_status "Integration layer implemented"
|
|
}
|
|
|
|
# Create Agent Services
|
|
create_agent_services() {
|
|
print_status "Creating agent services..."
|
|
|
|
# Trading Agent
|
|
cat > "$AGENTS_DIR/trading/src/trading_agent.py" << 'EOF'
|
|
#!/usr/bin/env python3
|
|
"""
|
|
AITBC Trading Agent
|
|
Automated trading agent for AITBC marketplace
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import time
|
|
from typing import Dict, Any, List
|
|
from datetime import datetime
|
|
import sys
|
|
import os
|
|
|
|
# Add parent directory to path
|
|
sys.path.append(os.path.join(os.path.dirname(__file__), '../../../..'))
|
|
|
|
from apps.agent_services.agent_bridge.src.integration_layer import AgentServiceBridge
|
|
|
|
class TradingAgent:
|
|
"""Automated trading agent"""
|
|
|
|
def __init__(self, agent_id: str, config: Dict[str, Any]):
|
|
self.agent_id = agent_id
|
|
self.config = config
|
|
self.bridge = AgentServiceBridge()
|
|
self.is_running = False
|
|
self.trading_strategy = config.get("strategy", "basic")
|
|
self.symbols = config.get("symbols", ["AITBC/BTC"])
|
|
self.trade_interval = config.get("trade_interval", 60) # seconds
|
|
|
|
async def start(self) -> bool:
|
|
"""Start trading agent"""
|
|
try:
|
|
# Register with service bridge
|
|
success = await self.bridge.start_agent(self.agent_id, {
|
|
"type": "trading",
|
|
"capabilities": ["market_analysis", "trading", "risk_management"],
|
|
"endpoint": f"http://localhost:8005"
|
|
})
|
|
|
|
if success:
|
|
self.is_running = True
|
|
print(f"Trading agent {self.agent_id} started successfully")
|
|
return True
|
|
else:
|
|
print(f"Failed to start trading agent {self.agent_id}")
|
|
return False
|
|
except Exception as e:
|
|
print(f"Error starting trading agent: {e}")
|
|
return False
|
|
|
|
async def stop(self) -> bool:
|
|
"""Stop trading agent"""
|
|
self.is_running = False
|
|
success = await self.bridge.stop_agent(self.agent_id)
|
|
if success:
|
|
print(f"Trading agent {self.agent_id} stopped successfully")
|
|
return success
|
|
|
|
async def run_trading_loop(self):
|
|
"""Main trading loop"""
|
|
while self.is_running:
|
|
try:
|
|
for symbol in self.symbols:
|
|
await self._analyze_and_trade(symbol)
|
|
|
|
await asyncio.sleep(self.trade_interval)
|
|
except Exception as e:
|
|
print(f"Error in trading loop: {e}")
|
|
await asyncio.sleep(10) # Wait before retrying
|
|
|
|
async def _analyze_and_trade(self, symbol: str) -> None:
|
|
"""Analyze market and execute trades"""
|
|
try:
|
|
# Perform market analysis
|
|
analysis_task = {
|
|
"type": "market_analysis",
|
|
"symbol": symbol,
|
|
"strategy": self.trading_strategy
|
|
}
|
|
|
|
analysis_result = await self.bridge.execute_agent_task(self.agent_id, analysis_task)
|
|
|
|
if analysis_result.get("status") == "success":
|
|
analysis = analysis_result["result"]["analysis"]
|
|
|
|
# Make trading decision
|
|
if self._should_trade(analysis):
|
|
await self._execute_trade(symbol, analysis)
|
|
else:
|
|
print(f"Market analysis failed for {symbol}: {analysis_result}")
|
|
|
|
except Exception as e:
|
|
print(f"Error in analyze_and_trade for {symbol}: {e}")
|
|
|
|
def _should_trade(self, analysis: Dict[str, Any]) -> bool:
|
|
"""Determine if should execute trade"""
|
|
recommendation = analysis.get("recommendation", "hold")
|
|
return recommendation in ["buy", "sell"]
|
|
|
|
async def _execute_trade(self, symbol: str, analysis: Dict[str, Any]) -> None:
|
|
"""Execute trade based on analysis"""
|
|
try:
|
|
recommendation = analysis.get("recommendation", "hold")
|
|
|
|
if recommendation == "buy":
|
|
trade_task = {
|
|
"type": "trading",
|
|
"symbol": symbol,
|
|
"side": "buy",
|
|
"amount": self.config.get("trade_amount", 0.1),
|
|
"strategy": self.trading_strategy
|
|
}
|
|
elif recommendation == "sell":
|
|
trade_task = {
|
|
"type": "trading",
|
|
"symbol": symbol,
|
|
"side": "sell",
|
|
"amount": self.config.get("trade_amount", 0.1),
|
|
"strategy": self.trading_strategy
|
|
}
|
|
else:
|
|
return
|
|
|
|
trade_result = await self.bridge.execute_agent_task(self.agent_id, trade_task)
|
|
|
|
if trade_result.get("status") == "success":
|
|
print(f"Trade executed successfully: {trade_result}")
|
|
else:
|
|
print(f"Trade execution failed: {trade_result}")
|
|
|
|
except Exception as e:
|
|
print(f"Error executing trade: {e}")
|
|
|
|
async def get_status(self) -> Dict[str, Any]:
|
|
"""Get agent status"""
|
|
return await self.bridge.get_agent_status(self.agent_id)
|
|
|
|
# Main execution
|
|
async def main():
|
|
"""Main trading agent execution"""
|
|
agent_id = "trading-agent-001"
|
|
config = {
|
|
"strategy": "basic",
|
|
"symbols": ["AITBC/BTC"],
|
|
"trade_interval": 30,
|
|
"trade_amount": 0.1
|
|
}
|
|
|
|
agent = TradingAgent(agent_id, config)
|
|
|
|
# Start agent
|
|
if await agent.start():
|
|
try:
|
|
# Run trading loop
|
|
await agent.run_trading_loop()
|
|
except KeyboardInterrupt:
|
|
print("Shutting down trading agent...")
|
|
finally:
|
|
await agent.stop()
|
|
else:
|
|
print("Failed to start trading agent")
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|
|
EOF
|
|
|
|
# Compliance Agent
|
|
cat > "$AGENTS_DIR/compliance/src/compliance_agent.py" << 'EOF'
|
|
#!/usr/bin/env python3
|
|
"""
|
|
AITBC Compliance Agent
|
|
Automated compliance and regulatory monitoring agent
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import time
|
|
from typing import Dict, Any, List
|
|
from datetime import datetime
|
|
import sys
|
|
import os
|
|
|
|
# Add parent directory to path
|
|
sys.path.append(os.path.join(os.path.dirname(__file__), '../../../..'))
|
|
|
|
from apps.agent_services.agent_bridge.src.integration_layer import AgentServiceBridge
|
|
|
|
class ComplianceAgent:
|
|
"""Automated compliance agent"""
|
|
|
|
def __init__(self, agent_id: str, config: Dict[str, Any]):
|
|
self.agent_id = agent_id
|
|
self.config = config
|
|
self.bridge = AgentServiceBridge()
|
|
self.is_running = False
|
|
self.check_interval = config.get("check_interval", 300) # 5 minutes
|
|
self.monitored_entities = config.get("monitored_entities", [])
|
|
|
|
async def start(self) -> bool:
|
|
"""Start compliance agent"""
|
|
try:
|
|
success = await self.bridge.start_agent(self.agent_id, {
|
|
"type": "compliance",
|
|
"capabilities": ["kyc_check", "aml_screening", "regulatory_reporting"],
|
|
"endpoint": f"http://localhost:8006"
|
|
})
|
|
|
|
if success:
|
|
self.is_running = True
|
|
print(f"Compliance agent {self.agent_id} started successfully")
|
|
return True
|
|
else:
|
|
print(f"Failed to start compliance agent {self.agent_id}")
|
|
return False
|
|
except Exception as e:
|
|
print(f"Error starting compliance agent: {e}")
|
|
return False
|
|
|
|
async def stop(self) -> bool:
|
|
"""Stop compliance agent"""
|
|
self.is_running = False
|
|
success = await self.bridge.stop_agent(self.agent_id)
|
|
if success:
|
|
print(f"Compliance agent {self.agent_id} stopped successfully")
|
|
return success
|
|
|
|
async def run_compliance_loop(self):
|
|
"""Main compliance monitoring loop"""
|
|
while self.is_running:
|
|
try:
|
|
for entity in self.monitored_entities:
|
|
await self._perform_compliance_check(entity)
|
|
|
|
await asyncio.sleep(self.check_interval)
|
|
except Exception as e:
|
|
print(f"Error in compliance loop: {e}")
|
|
await asyncio.sleep(30) # Wait before retrying
|
|
|
|
async def _perform_compliance_check(self, entity_id: str) -> None:
|
|
"""Perform compliance check for entity"""
|
|
try:
|
|
compliance_task = {
|
|
"type": "compliance_check",
|
|
"user_id": entity_id,
|
|
"check_type": "full",
|
|
"monitored_activities": ["trading", "transfers", "wallet_creation"]
|
|
}
|
|
|
|
result = await self.bridge.execute_agent_task(self.agent_id, compliance_task)
|
|
|
|
if result.get("status") == "success":
|
|
compliance_result = result["result"]
|
|
await self._handle_compliance_result(entity_id, compliance_result)
|
|
else:
|
|
print(f"Compliance check failed for {entity_id}: {result}")
|
|
|
|
except Exception as e:
|
|
print(f"Error performing compliance check for {entity_id}: {e}")
|
|
|
|
async def _handle_compliance_result(self, entity_id: str, result: Dict[str, Any]) -> None:
|
|
"""Handle compliance check result"""
|
|
status = result.get("status", "unknown")
|
|
|
|
if status == "passed":
|
|
print(f"✅ Compliance check passed for {entity_id}")
|
|
elif status == "failed":
|
|
print(f"❌ Compliance check failed for {entity_id}")
|
|
# Trigger alert or further investigation
|
|
await self._trigger_compliance_alert(entity_id, result)
|
|
else:
|
|
print(f"⚠️ Compliance check inconclusive for {entity_id}")
|
|
|
|
async def _trigger_compliance_alert(self, entity_id: str, result: Dict[str, Any]) -> None:
|
|
"""Trigger compliance alert"""
|
|
alert_data = {
|
|
"entity_id": entity_id,
|
|
"alert_type": "compliance_failure",
|
|
"severity": "high",
|
|
"details": result,
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
}
|
|
|
|
# In a real implementation, this would send to alert system
|
|
print(f"🚨 COMPLIANCE ALERT: {json.dumps(alert_data, indent=2)}")
|
|
|
|
async def get_status(self) -> Dict[str, Any]:
|
|
"""Get agent status"""
|
|
status = await self.bridge.get_agent_status(self.agent_id)
|
|
status["monitored_entities"] = len(self.monitored_entities)
|
|
status["check_interval"] = self.check_interval
|
|
return status
|
|
|
|
# Main execution
|
|
async def main():
|
|
"""Main compliance agent execution"""
|
|
agent_id = "compliance-agent-001"
|
|
config = {
|
|
"check_interval": 60, # 1 minute for testing
|
|
"monitored_entities": ["user001", "user002", "user003"]
|
|
}
|
|
|
|
agent = ComplianceAgent(agent_id, config)
|
|
|
|
# Start agent
|
|
if await agent.start():
|
|
try:
|
|
# Run compliance loop
|
|
await agent.run_compliance_loop()
|
|
except KeyboardInterrupt:
|
|
print("Shutting down compliance agent...")
|
|
finally:
|
|
await agent.stop()
|
|
else:
|
|
print("Failed to start compliance agent")
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|
|
EOF
|
|
|
|
print_status "Agent services created"
|
|
}
|
|
|
|
# Set up Testing Framework
|
|
setup_testing_framework() {
|
|
print_status "Setting up testing framework..."
|
|
|
|
cat > "$PROJECT_ROOT/apps/agent-protocols/tests/test_agent_protocols.py" << 'EOF'
|
|
#!/usr/bin/env python3
|
|
"""
|
|
Test suite for AITBC Agent Protocols
|
|
"""
|
|
|
|
import unittest
|
|
import asyncio
|
|
import json
|
|
import tempfile
|
|
import os
|
|
from datetime import datetime
|
|
|
|
# Add parent directory to path
|
|
import sys
|
|
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
|
|
|
|
from src.message_protocol import MessageProtocol, MessageTypes, AgentMessageClient
|
|
from src.task_manager import TaskManager, TaskStatus, TaskPriority
|
|
|
|
class TestMessageProtocol(unittest.TestCase):
|
|
"""Test message protocol functionality"""
|
|
|
|
def setUp(self):
|
|
self.protocol = MessageProtocol()
|
|
self.sender_id = "agent-001"
|
|
self.receiver_id = "agent-002"
|
|
|
|
def test_message_creation(self):
|
|
"""Test message creation"""
|
|
message = self.protocol.create_message(
|
|
sender_id=self.sender_id,
|
|
receiver_id=self.receiver_id,
|
|
message_type=MessageTypes.TASK_ASSIGNMENT,
|
|
payload={"task": "test_task", "data": "test_data"}
|
|
)
|
|
|
|
self.assertEqual(message["sender_id"], self.sender_id)
|
|
self.assertEqual(message["receiver_id"], self.receiver_id)
|
|
self.assertEqual(message["message_type"], MessageTypes.TASK_ASSIGNMENT)
|
|
self.assertIsNotNone(message["signature"])
|
|
|
|
def test_message_verification(self):
|
|
"""Test message verification"""
|
|
message = self.protocol.create_message(
|
|
sender_id=self.sender_id,
|
|
receiver_id=self.receiver_id,
|
|
message_type=MessageTypes.TASK_ASSIGNMENT,
|
|
payload={"task": "test_task"}
|
|
)
|
|
|
|
# Valid message should verify
|
|
self.assertTrue(self.protocol.verify_message(message))
|
|
|
|
# Tampered message should not verify
|
|
message["payload"] = "tampered"
|
|
self.assertFalse(self.protocol.verify_message(message))
|
|
|
|
def test_message_encryption(self):
|
|
"""Test message encryption/decryption"""
|
|
original_payload = {"sensitive": "data", "numbers": [1, 2, 3]}
|
|
|
|
message = self.protocol.create_message(
|
|
sender_id=self.sender_id,
|
|
receiver_id=self.receiver_id,
|
|
message_type=MessageTypes.DATA_RESPONSE,
|
|
payload=original_payload
|
|
)
|
|
|
|
# Decrypt message
|
|
decrypted = self.protocol.decrypt_message(message)
|
|
|
|
self.assertEqual(decrypted["payload"], original_payload)
|
|
|
|
def test_message_queueing(self):
|
|
"""Test message queuing and delivery"""
|
|
message = self.protocol.create_message(
|
|
sender_id=self.sender_id,
|
|
receiver_id=self.receiver_id,
|
|
message_type=MessageTypes.HEARTBEAT,
|
|
payload={"status": "active"}
|
|
)
|
|
|
|
# Send message
|
|
success = self.protocol.send_message(message)
|
|
self.assertTrue(success)
|
|
|
|
# Receive message
|
|
messages = self.protocol.receive_messages(self.receiver_id)
|
|
self.assertEqual(len(messages), 1)
|
|
self.assertEqual(messages[0]["message_type"], MessageTypes.HEARTBEAT)
|
|
|
|
class TestTaskManager(unittest.TestCase):
|
|
"""Test task manager functionality"""
|
|
|
|
def setUp(self):
|
|
self.temp_db = tempfile.NamedTemporaryFile(delete=False)
|
|
self.temp_db.close()
|
|
self.task_manager = TaskManager(self.temp_db.name)
|
|
|
|
def tearDown(self):
|
|
os.unlink(self.temp_db.name)
|
|
|
|
def test_task_creation(self):
|
|
"""Test task creation"""
|
|
task = self.task_manager.create_task(
|
|
task_type="market_analysis",
|
|
payload={"symbol": "AITBC/BTC"},
|
|
required_capabilities=["market_data", "analysis"],
|
|
priority=TaskPriority.HIGH
|
|
)
|
|
|
|
self.assertIsNotNone(task.id)
|
|
self.assertEqual(task.task_type, "market_analysis")
|
|
self.assertEqual(task.status, TaskStatus.PENDING)
|
|
self.assertEqual(task.priority, TaskPriority.HIGH)
|
|
|
|
def test_task_assignment(self):
|
|
"""Test task assignment"""
|
|
task = self.task_manager.create_task(
|
|
task_type="trading",
|
|
payload={"symbol": "AITBC/BTC", "side": "buy"},
|
|
required_capabilities=["trading", "market_access"]
|
|
)
|
|
|
|
success = self.task_manager.assign_task(task.id, "agent-001")
|
|
self.assertTrue(success)
|
|
|
|
# Verify assignment
|
|
updated_task = self.task_manager.get_agent_tasks("agent-001")[0]
|
|
self.assertEqual(updated_task.id, task.id)
|
|
self.assertEqual(updated_task.assigned_agent_id, "agent-001")
|
|
self.assertEqual(updated_task.status, TaskStatus.ASSIGNED)
|
|
|
|
def test_task_completion(self):
|
|
"""Test task completion"""
|
|
task = self.task_manager.create_task(
|
|
task_type="compliance_check",
|
|
payload={"user_id": "user001"},
|
|
required_capabilities=["compliance"]
|
|
)
|
|
|
|
# Assign and start task
|
|
self.task_manager.assign_task(task.id, "agent-002")
|
|
self.task_manager.start_task(task.id)
|
|
|
|
# Complete task
|
|
result = {"status": "passed", "checks": ["kyc", "aml"]}
|
|
success = self.task_manager.complete_task(task.id, result)
|
|
self.assertTrue(success)
|
|
|
|
# Verify completion
|
|
completed_task = self.task_manager.get_agent_tasks("agent-002")[0]
|
|
self.assertEqual(completed_task.status, TaskStatus.COMPLETED)
|
|
self.assertEqual(completed_task.result, result)
|
|
|
|
def test_task_statistics(self):
|
|
"""Test task statistics"""
|
|
# Create multiple tasks
|
|
for i in range(5):
|
|
self.task_manager.create_task(
|
|
task_type=f"task_{i}",
|
|
payload={"index": i},
|
|
required_capabilities=["basic"]
|
|
)
|
|
|
|
stats = self.task_manager.get_task_statistics()
|
|
|
|
self.assertIn("task_counts", stats)
|
|
self.assertIn("agent_statistics", stats)
|
|
self.assertEqual(stats["task_counts"]["pending"], 5)
|
|
|
|
class TestAgentMessageClient(unittest.TestCase):
|
|
"""Test agent message client"""
|
|
|
|
def setUp(self):
|
|
self.client = AgentMessageClient("agent-001", "http://localhost:8003")
|
|
|
|
def test_task_assignment_message(self):
|
|
"""Test task assignment message creation"""
|
|
task_data = {"task": "test_task", "parameters": {"param1": "value1"}}
|
|
|
|
success = self.client.send_task_assignment("agent-002", task_data)
|
|
self.assertTrue(success)
|
|
|
|
# Check message queue
|
|
messages = self.client.receive_messages()
|
|
self.assertEqual(len(messages), 1)
|
|
self.assertEqual(messages[0]["message_type"], MessageTypes.TASK_ASSIGNMENT)
|
|
|
|
def test_coordination_message(self):
|
|
"""Test coordination message"""
|
|
coordination_data = {"action": "coordinate", "details": {"target": "goal"}}
|
|
|
|
success = self.client.send_coordination_message("agent-003", coordination_data)
|
|
self.assertTrue(success)
|
|
|
|
# Check message queue
|
|
messages = self.client.get_coordination_messages()
|
|
self.assertEqual(len(messages), 1)
|
|
self.assertEqual(messages[0]["message_type"], MessageTypes.COORDINATION)
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|
|
EOF
|
|
|
|
print_status "Testing framework set up"
|
|
}
|
|
|
|
# Configure Deployment
|
|
configure_deployment() {
|
|
print_status "Configuring deployment..."
|
|
|
|
# Create systemd service files
|
|
cat > "/etc/systemd/system/aitbc-agent-registry.service" << 'EOF'
|
|
[Unit]
|
|
Description=AITBC Agent Registry Service
|
|
After=network.target
|
|
|
|
[Service]
|
|
Type=simple
|
|
User=aitbc
|
|
Group=aitbc
|
|
WorkingDirectory=/opt/aitbc/apps/agent-registry/src
|
|
Environment=PYTHONPATH=/opt/aitbc
|
|
ExecStart=/usr/bin/python3 app.py
|
|
Restart=always
|
|
RestartSec=10
|
|
|
|
[Install]
|
|
WantedBy=multi-user.target
|
|
EOF
|
|
|
|
cat > "/etc/systemd/system/aitbc-agent-coordinator.service" << 'EOF'
|
|
[Unit]
|
|
Description=AITBC Agent Coordinator Service
|
|
After=network.target aitbc-agent-registry.service
|
|
|
|
[Service]
|
|
Type=simple
|
|
User=aitbc
|
|
Group=aitbc
|
|
WorkingDirectory=/opt/aitbc/apps/agent-services/agent-coordinator/src
|
|
Environment=PYTHONPATH=/opt/aitbc
|
|
ExecStart=/usr/bin/python3 coordinator.py
|
|
Restart=always
|
|
RestartSec=10
|
|
|
|
[Install]
|
|
WantedBy=multi-user.target
|
|
EOF
|
|
|
|
# Create deployment script
|
|
cat > "$PROJECT_ROOT/scripts/deploy-agent-protocols.sh" << 'EOF'
|
|
#!/bin/bash
|
|
# Deploy AITBC Agent Protocols
|
|
|
|
set -e
|
|
|
|
echo "🚀 Deploying AITBC Agent Protocols..."
|
|
|
|
# Install dependencies
|
|
pip3 install fastapi uvicorn pydantic cryptography aiohttp
|
|
|
|
# Enable and start services
|
|
systemctl daemon-reload
|
|
systemctl enable aitbc-agent-registry
|
|
systemctl enable aitbc-agent-coordinator
|
|
systemctl start aitbc-agent-registry
|
|
systemctl start aitbc-agent-coordinator
|
|
|
|
# Wait for services to start
|
|
sleep 5
|
|
|
|
# Check service status
|
|
echo "Checking service status..."
|
|
systemctl status aitbc-agent-registry --no-pager
|
|
systemctl status aitbc-agent-coordinator --no-pager
|
|
|
|
# Test services
|
|
echo "Testing services..."
|
|
curl -s http://localhost:8003/api/health || echo "Agent Registry not responding"
|
|
curl -s http://localhost:8004/api/health || echo "Agent Coordinator not responding"
|
|
|
|
echo "✅ Agent Protocols deployment complete!"
|
|
EOF
|
|
|
|
chmod +x "$PROJECT_ROOT/scripts/deploy-agent-protocols.sh"
|
|
|
|
print_status "Deployment configured"
|
|
}
|
|
|
|
# Run main function
|
|
main "$@"
|