diff --git a/docs/agent-training/README.md b/docs/agent-training/README.md index 70583bd8..348a282a 100644 --- a/docs/agent-training/README.md +++ b/docs/agent-training/README.md @@ -99,6 +99,20 @@ The training program consists of the following stages: - Multi-chain asset management - Interoperability patterns +### Stage 10: Failure Recovery +**File:** `stage10_failure_recovery.json` +- Error handling strategies +- Recovery procedures +- Fault tolerance mechanisms +- System resilience + +### Stage 11: Agent Communication +**File:** `stage11_agent_communication.json` +- Message sending protocols (hierarchical, peer-to-peer, broadcast) +- Message history and retrieval +- Peer connection management +- Communication performance metrics + ## Training Schema The training stages follow a standardized JSON schema defined in `training_schema.json`, which specifies: @@ -192,12 +206,12 @@ For issues during training: ## Quality Metrics -- **Stage Coverage:** 9 comprehensive training stages +- **Stage Coverage:** 11 comprehensive training stages - **Schema Compliance:** 100% adherence to training schema - **Documentation Completeness:** All stages documented with examples - **Validation Coverage:** Each stage includes success criteria --- -**Last Updated:** 2026-05-06 +**Last Updated:** 2026-05-07 **Maintained By:** AITBC Development Team diff --git a/docs/agent-training/stage11_agent_communication.json b/docs/agent-training/stage11_agent_communication.json new file mode 100644 index 00000000..557c9dbb --- /dev/null +++ b/docs/agent-training/stage11_agent_communication.json @@ -0,0 +1,253 @@ +{ + "stage": "stage11_agent_communication", + "agent_type": "coordinator", + "difficulty": "intermediate", + "estimated_time_minutes": 45, + "skill_level": "practitioner", + "depends_on": ["stage3_ai_operations"], + "skills": [ + "message_sending", + "protocol_selection", + "broadcast_messaging", + "message_history", + "peer_management", + "communication_metrics" + ], + "objectives": [ + "Send messages using hierarchical protocol", + "Send messages using peer-to-peer protocol", + "Broadcast messages to multiple agents", + "Retrieve message history with filtering", + "Manage peer connections", + "Monitor communication performance metrics" + ], + "scenarios": [ + "/docs/scenarios/agent_communication_basics.md" + ], + "training_data": { + "operations": [ + { + "operation": "message_send_hierarchical", + "parameters": { + "coordinator_url": "http://localhost:9001", + "receiver_id": "worker-1", + "message_type": "direct", + "payload": {"task": "test_task"}, + "priority": "normal", + "protocol": "hierarchical" + }, + "expected_result": { + "status": "success", + "message_id": "*" + }, + "success_criteria": { + "status": "success", + "response_fields": ["message_id", "protocol", "sent_at"] + } + }, + { + "operation": "message_send_peer_to_peer", + "parameters": { + "coordinator_url": "http://localhost:9001", + "receiver_id": "worker-2", + "message_type": "direct", + "payload": {"task": "peer_task"}, + "priority": "normal", + "protocol": "peer_to_peer" + }, + "expected_result": { + "status": "success", + "message_id": "*" + }, + "success_criteria": { + "status": "success", + "response_fields": ["message_id", "protocol", "sent_at"] + } + }, + { + "operation": "message_broadcast", + "parameters": { + "coordinator_url": "http://localhost:9001", + "message_type": "broadcast", + "payload": {"announcement": "test_announcement"}, + "priority": "normal", + "agent_type": "worker" + }, + "expected_result": { + "status": "success", + "count": ">0" + }, + "success_criteria": { + "status": "success", + "response_fields": ["recipients", "count", "broadcast_at"] + } + }, + { + "operation": "message_history", + "parameters": { + "coordinator_url": "http://localhost:9001", + "limit": 10 + }, + "expected_result": { + "status": "success", + "messages": "[]" + }, + "success_criteria": { + "status": "success", + "response_fields": ["messages", "count", "total"] + } + }, + { + "operation": "peer_add", + "parameters": { + "coordinator_url": "http://localhost:9001", + "agent_id": "worker-1", + "peer_id": "worker-2" + }, + "expected_result": { + "status": "success" + }, + "success_criteria": { + "status": "success", + "response_fields": ["agent_id", "peer_id", "connected_at"] + } + }, + { + "operation": "peer_list", + "parameters": { + "coordinator_url": "http://localhost:9001" + }, + "expected_result": { + "status": "success", + "connections": "{}" + }, + "success_criteria": { + "status": "success", + "response_fields": ["connections", "total_agents", "total_peers"] + } + }, + { + "operation": "peer_list_agent", + "parameters": { + "coordinator_url": "http://localhost:9001", + "agent_id": "worker-1" + }, + "expected_result": { + "status": "success", + "peers": "[]" + }, + "success_criteria": { + "status": "success", + "response_fields": ["agent_id", "peers", "count"] + } + }, + { + "operation": "peer_remove", + "parameters": { + "coordinator_url": "http://localhost:9001", + "agent_id": "worker-1", + "peer_id": "worker-2" + }, + "expected_result": { + "status": "success" + }, + "success_criteria": { + "status": "success", + "response_fields": ["agent_id", "peer_id", "removed_at"] + } + } + ] + }, + "validation": { + "exam_tests": [ + { + "test_name": "Send hierarchical message", + "operation": "message_send_hierarchical", + "test_case": { + "coordinator_url": "http://localhost:9001", + "receiver_id": "worker-1", + "message_type": "direct", + "payload": {"test": "exam"}, + "priority": "normal", + "protocol": "hierarchical" + }, + "expected_output": { + "status": "success", + "message_id": "*" + }, + "weight": 2 + }, + { + "test_name": "Send peer-to-peer message", + "operation": "message_send_peer_to_peer", + "test_case": { + "coordinator_url": "http://localhost:9001", + "receiver_id": "worker-2", + "message_type": "direct", + "payload": {"test": "exam"}, + "priority": "normal", + "protocol": "peer_to_peer" + }, + "expected_output": { + "status": "success", + "message_id": "*" + }, + "weight": 2 + }, + { + "test_name": "Broadcast message", + "operation": "message_broadcast", + "test_case": { + "coordinator_url": "http://localhost:9001", + "message_type": "broadcast", + "payload": {"test": "exam"}, + "priority": "normal" + }, + "expected_output": { + "status": "success", + "count": ">0" + }, + "weight": 2 + }, + { + "test_name": "Retrieve message history", + "operation": "message_history", + "test_case": { + "coordinator_url": "http://localhost:9001", + "limit": 10 + }, + "expected_output": { + "status": "success", + "messages": "[]" + }, + "weight": 1 + }, + { + "test_name": "Add peer connection", + "operation": "peer_add", + "test_case": { + "coordinator_url": "http://localhost:9001", + "agent_id": "worker-1", + "peer_id": "worker-2" + }, + "expected_output": { + "status": "success" + }, + "weight": 1 + }, + { + "test_name": "List peer connections", + "operation": "peer_list", + "test_case": { + "coordinator_url": "http://localhost:9001" + }, + "expected_output": { + "status": "success", + "connections": "{}" + }, + "weight": 1 + } + ], + "passing_score": 80 + } +} diff --git a/docs/scenarios/42_cross_chain_atomic_swap.md b/docs/scenarios/42_cross_chain_atomic_swap.md index ec92fddb..47c8ce13 100644 --- a/docs/scenarios/42_cross_chain_atomic_swap.md +++ b/docs/scenarios/42_cross_chain_atomic_swap.md @@ -95,19 +95,28 @@ Lock tokens on source chain with hashlock and timelock: ```bash # Initiate swap on source chain (ait-mainnet) -aitbc atomic-swap initiate \ - --chain ait-mainnet \ - --swap-id $SWAP_ID \ - --token AITBC \ - --amount 1000 \ - --participant aitbc1recipient \ - --hashlock $HASHLOCK \ - --timelock 3600 \ - --wallet mainnet-wallet +# First deploy the CrossChainAtomicSwap contract if not already deployed +aitbc contract deploy \ + --name CrossChainAtomicSwap \ + --type atomic-swap \ + --rpc-url http://localhost:8545 \ + --password-file ~/.aitbc/wallets/mainnet-wallet.password + +# Call the initiateSwap method +aitbc contract call \ + --address \ + --method initiateSwap \ + --params '{"swapId": "'"$SWAP_ID"'", "token": "AITBC", "amount": 1000, "participant": "aitbc1recipient", "hashlock": "'"$HASHLOCK"'", "timelock": 3600}' \ + --rpc-url http://localhost:8545 \ + --password-file ~/.aitbc/wallets/mainnet-wallet.password ``` Output: ``` +Contract call result: + Address: + Method: initiateSwap + Result: {"swapId": "$SWAP_ID", "status": "OPEN"} Swap initiated on ait-mainnet Swap ID: $SWAP_ID Amount: 1000 AITBC @@ -121,16 +130,20 @@ Status: OPEN Counterparty initiates matching swap on destination chain: ```bash +# Deploy CrossChainAtomicSwap contract on destination chain if not already deployed +aitbc contract deploy \ + --name CrossChainAtomicSwap \ + --type atomic-swap \ + --rpc-url http://localhost:8546 \ + --password-file ~/.aitbc/wallets/testnet-wallet.password + # Counterparty initiates on destination chain (ait-testnet) -aitbc atomic-swap initiate \ - --chain ait-testnet \ - --swap-id $SWAP_ID \ - --token AITBC \ - --amount 950 \ - --participant aitbc1sender \ - --hashlock $HASHLOCK \ - --timelock 3600 \ - --wallet testnet-wallet +aitbc contract call \ + --address \ + --method initiateSwap \ + --params '{"swapId": "'"$SWAP_ID"'", "token": "AITBC", "amount": 950, "participant": "aitbc1sender", "hashlock": "'"$HASHLOCK"'", "timelock": 3600}' \ + --rpc-url http://localhost:8546 \ + --password-file ~/.aitbc/wallets/testnet-wallet.password ``` ### **Step 4: Complete Atomic Swap** @@ -138,19 +151,21 @@ aitbc atomic-swap initiate \ Reveal secret to complete both swaps atomically: ```bash -# Reveal secret to complete swap -aitbc atomic-swap complete \ - --chain ait-mainnet \ - --swap-id $SWAP_ID \ - --secret $SECRET \ - --wallet mainnet-wallet +# Reveal secret to complete swap on source chain +aitbc contract call \ + --address \ + --method completeSwap \ + --params '{"swapId": "'"$SWAP_ID"'", "secret": "'"$SECRET"'"}' \ + --rpc-url http://localhost:8545 \ + --password-file ~/.aitbc/wallets/mainnet-wallet.password -# Counterparty completes with same secret -aitbc atomic-swap complete \ - --chain ait-testnet \ - --swap-id $SWAP_ID \ - --secret $SECRET \ - --wallet testnet-wallet +# Counterparty completes with same secret on destination chain +aitbc contract call \ + --address \ + --method completeSwap \ + --params '{"swapId": "'"$SWAP_ID"'", "secret": "'"$SECRET"'"}' \ + --rpc-url http://localhost:8546 \ + --password-file ~/.aitbc/wallets/testnet-wallet.password ``` ### **Step 5: Monitor Swap Status** @@ -159,14 +174,18 @@ Track swap status across chains: ```bash # Check swap status on source chain -aitbc atomic-swap status \ - --chain ait-mainnet \ - --swap-id $SWAP_ID +aitbc contract call \ + --address \ + --method getSwapStatus \ + --params '{"swapId": "'"$SWAP_ID"'"}' \ + --rpc-url http://localhost:8545 # Check swap status on destination chain -aitbc atomic-swap status \ - --chain ait-testnet \ - --swap-id $SWAP_ID +aitbc contract call \ + --address \ + --method getSwapStatus \ + --params '{"swapId": "'"$SWAP_ID"'"}' \ + --rpc-url http://localhost:8546 ``` ### **Step 6: Handle Refund (if needed)** @@ -175,10 +194,12 @@ Refund swap if timelock expires: ```bash # Refund if timelock expired -aitbc atomic-swap refund \ - --chain ait-mainnet \ - --swap-id $SWAP_ID \ - --wallet mainnet-wallet +aitbc contract call \ + --address \ + --method refundSwap \ + --params '{"swapId": "'"$SWAP_ID"'"}' \ + --rpc-url http://localhost:8545 \ + --password-file ~/.aitbc/wallets/mainnet-wallet.password ``` --- diff --git a/scripts/training/agent_communication_training.py b/scripts/training/agent_communication_training.py new file mode 100644 index 00000000..30e16f64 --- /dev/null +++ b/scripts/training/agent_communication_training.py @@ -0,0 +1,370 @@ +""" +Agent Communication Training Stage + +Trains agents on effective communication patterns using the coordinator API. +Tests hierarchical, peer-to-peer, and broadcast protocols with metrics collection. +""" + +import asyncio +import json +import time +from datetime import datetime, timezone +from typing import Any, Dict, List +import aiohttp + + +class CommunicationTrainingStage: + """Training stage for agent communication protocols""" + + def __init__(self, coordinator_url: str = "http://localhost:9001"): + self.coordinator_url = coordinator_url + self.metrics = { + "total_messages_sent": 0, + "successful_messages": 0, + "failed_messages": 0, + "hierarchical_success": 0, + "hierarchical_total": 0, + "peer_to_peer_success": 0, + "peer_to_peer_total": 0, + "broadcast_success": 0, + "broadcast_total": 0, + "avg_latency_ms": 0, + "latency_samples": [] + } + + async def send_message( + self, + receiver_id: str, + message_type: str, + payload: Dict[str, Any], + protocol: str = "hierarchical", + priority: str = "normal" + ) -> Dict[str, Any]: + """Send a message via coordinator API""" + start_time = time.time() + + try: + async with aiohttp.ClientSession() as session: + async with session.post( + f"{self.coordinator_url}/messages/send", + json={ + "receiver_id": receiver_id, + "message_type": message_type, + "payload": payload, + "priority": priority, + "protocol": protocol + } + ) as response: + latency_ms = (time.time() - start_time) * 1000 + self.metrics["latency_samples"].append(latency_ms) + + if response.status == 200: + data = await response.json() + self.metrics["total_messages_sent"] += 1 + self.metrics["successful_messages"] += 1 + + if protocol == "hierarchical": + self.metrics["hierarchical_success"] += 1 + self.metrics["hierarchical_total"] += 1 + elif protocol == "peer_to_peer": + self.metrics["peer_to_peer_success"] += 1 + self.metrics["peer_to_peer_total"] += 1 + + return {"success": True, "data": data, "latency_ms": latency_ms} + else: + self.metrics["total_messages_sent"] += 1 + self.metrics["failed_messages"] += 1 + + if protocol == "hierarchical": + self.metrics["hierarchical_total"] += 1 + elif protocol == "peer_to_peer": + self.metrics["peer_to_peer_total"] += 1 + + error_text = await response.text() + return {"success": False, "error": error_text, "latency_ms": latency_ms} + + except Exception as e: + latency_ms = (time.time() - start_time) * 1000 + self.metrics["total_messages_sent"] += 1 + self.metrics["failed_messages"] += 1 + + if protocol == "hierarchical": + self.metrics["hierarchical_total"] += 1 + elif protocol == "peer_to_peer": + self.metrics["peer_to_peer_total"] += 1 + + return {"success": False, "error": str(e), "latency_ms": latency_ms} + + async def broadcast_message( + self, + message_type: str, + payload: Dict[str, Any], + agent_type: str = None, + capabilities: List[str] = None, + priority: str = "normal" + ) -> Dict[str, Any]: + """Broadcast a message to multiple agents""" + start_time = time.time() + + try: + async with aiohttp.ClientSession() as session: + request_body = { + "message_type": message_type, + "payload": payload, + "priority": priority + } + + if agent_type: + request_body["agent_type"] = agent_type + if capabilities: + request_body["capabilities"] = capabilities + + async with session.post( + f"{self.coordinator_url}/messages/broadcast", + json=request_body + ) as response: + latency_ms = (time.time() - start_time) * 1000 + self.metrics["latency_samples"].append(latency_ms) + + if response.status == 200: + data = await response.json() + self.metrics["total_messages_sent"] += data.get("count", 0) + self.metrics["successful_messages"] += data.get("count", 0) + self.metrics["broadcast_success"] += 1 + self.metrics["broadcast_total"] += 1 + + return {"success": True, "data": data, "latency_ms": latency_ms} + else: + self.metrics["broadcast_total"] += 1 + error_text = await response.text() + return {"success": False, "error": error_text, "latency_ms": latency_ms} + + except Exception as e: + latency_ms = (time.time() - start_time) * 1000 + self.metrics["broadcast_total"] += 1 + return {"success": False, "error": str(e), "latency_ms": latency_ms} + + async def get_message_history( + self, + sender_id: str = None, + receiver_id: str = None, + limit: int = 100 + ) -> Dict[str, Any]: + """Retrieve message history""" + try: + async with aiohttp.ClientSession() as session: + params = {"limit": limit} + if sender_id: + params["sender_id"] = sender_id + if receiver_id: + params["receiver_id"] = receiver_id + + async with session.get( + f"{self.coordinator_url}/messages/history", + params=params + ) as response: + if response.status == 200: + data = await response.json() + return {"success": True, "data": data} + else: + error_text = await response.text() + return {"success": False, "error": error_text} + + except Exception as e: + return {"success": False, "error": str(e)} + + async def add_peer(self, agent_id: str, peer_id: str) -> Dict[str, Any]: + """Add a peer connection""" + try: + async with aiohttp.ClientSession() as session: + async with session.post( + f"{self.coordinator_url}/peers/add", + params={"agent_id": agent_id, "peer_id": peer_id} + ) as response: + if response.status == 200: + data = await response.json() + return {"success": True, "data": data} + else: + error_text = await response.text() + return {"success": False, "error": error_text} + + except Exception as e: + return {"success": False, "error": str(e)} + + async def get_peers(self, agent_id: str = None) -> Dict[str, Any]: + """Get peer connections""" + try: + async with aiohttp.ClientSession() as session: + if agent_id: + url = f"{self.coordinator_url}/peers/{agent_id}" + else: + url = f"{self.coordinator_url}/peers" + + async with session.get(url) as response: + if response.status == 200: + data = await response.json() + return {"success": True, "data": data} + else: + error_text = await response.text() + return {"success": False, "error": error_text} + + except Exception as e: + return {"success": False, "error": str(e)} + + def calculate_metrics(self) -> Dict[str, Any]: + """Calculate final metrics""" + if self.metrics["latency_samples"]: + self.metrics["avg_latency_ms"] = sum(self.metrics["latency_samples"]) / len(self.metrics["latency_samples"]) + + success_rate = 0 + if self.metrics["total_messages_sent"] > 0: + success_rate = (self.metrics["successful_messages"] / self.metrics["total_messages_sent"]) * 100 + + hierarchical_rate = 0 + if self.metrics["hierarchical_total"] > 0: + hierarchical_rate = (self.metrics["hierarchical_success"] / self.metrics["hierarchical_total"]) * 100 + + peer_to_peer_rate = 0 + if self.metrics["peer_to_peer_total"] > 0: + peer_to_peer_rate = (self.metrics["peer_to_peer_success"] / self.metrics["peer_to_peer_total"]) * 100 + + return { + "total_messages_sent": self.metrics["total_messages_sent"], + "successful_messages": self.metrics["successful_messages"], + "failed_messages": self.metrics["failed_messages"], + "overall_success_rate_percent": round(success_rate, 2), + "hierarchical_protocol": { + "total": self.metrics["hierarchical_total"], + "successful": self.metrics["hierarchical_success"], + "success_rate_percent": round(hierarchical_rate, 2) + }, + "peer_to_peer_protocol": { + "total": self.metrics["peer_to_peer_total"], + "successful": self.metrics["peer_to_peer_success"], + "success_rate_percent": round(peer_to_peer_rate, 2) + }, + "broadcast_protocol": { + "total": self.metrics["broadcast_total"], + "successful": self.metrics["broadcast_success"], + "success_rate_percent": round((self.metrics["broadcast_success"] / self.metrics["broadcast_total"] * 100) if self.metrics["broadcast_total"] > 0 else 0, 2) + }, + "performance": { + "avg_latency_ms": round(self.metrics["avg_latency_ms"], 2), + "min_latency_ms": round(min(self.metrics["latency_samples"]) if self.metrics["latency_samples"] else 0, 2), + "max_latency_ms": round(max(self.metrics["latency_samples"]) if self.metrics["latency_samples"] else 0, 2) + } + } + + +async def run_hierarchical_training(trainer: CommunicationTrainingStage, num_agents: int = 4): + """Train hierarchical communication protocol""" + print("\n=== Hierarchical Protocol Training ===") + + for i in range(10): + agent_id = f"worker-{(i % num_agents) + 1}" + result = await trainer.send_message( + receiver_id=agent_id, + message_type="direct", + payload={"training_step": i, "task": f"hierarchical_task_{i}"}, + protocol="hierarchical" + ) + + if result["success"]: + print(f" Step {i}: Message sent to {agent_id} (latency: {result['latency_ms']:.2f}ms)") + else: + print(f" Step {i}: Failed to send to {agent_id} - {result.get('error', 'Unknown error')}") + + await asyncio.sleep(0.1) + + +async def run_peer_to_peer_training(trainer: CommunicationTrainingStage, num_agents: int = 4): + """Train peer-to-peer communication protocol""" + print("\n=== Peer-to-Peer Protocol Training ===") + + # First ensure peer connections exist + for i in range(num_agents): + agent_id = f"worker-{i + 1}" + peer_id = f"worker-{((i + 1) % num_agents) + 1}" + await trainer.add_peer(agent_id, peer_id) + + for i in range(10): + agent_id = f"worker-{(i % num_agents) + 1}" + result = await trainer.send_message( + receiver_id=agent_id, + message_type="direct", + payload={"training_step": i, "task": f"peer_to_peer_task_{i}"}, + protocol="peer_to_peer" + ) + + if result["success"]: + print(f" Step {i}: Message sent to {agent_id} (latency: {result['latency_ms']:.2f}ms)") + else: + print(f" Step {i}: Failed to send to {agent_id} - {result.get('error', 'Unknown error')}") + + await asyncio.sleep(0.1) + + +async def run_broadcast_training(trainer: CommunicationTrainingStage): + """Train broadcast communication protocol""" + print("\n=== Broadcast Protocol Training ===") + + for i in range(5): + result = await trainer.broadcast_message( + message_type="broadcast", + payload={"training_step": i, "announcement": f"broadcast_announcement_{i}"}, + agent_type="worker" + ) + + if result["success"]: + data = result["data"] + print(f" Step {i}: Broadcast sent to {data['count']} agents (latency: {result['latency_ms']:.2f}ms)") + else: + print(f" Step {i}: Failed to broadcast - {result.get('error', 'Unknown error')}") + + await asyncio.sleep(0.2) + + +async def main(): + """Main training execution""" + print("=" * 60) + print("Agent Communication Training Stage") + print("=" * 60) + print(f"Started at: {datetime.now(timezone.utc).isoformat()}") + + trainer = CommunicationTrainingStage(coordinator_url="http://aitbc1:9001") + + # Run training scenarios + await run_hierarchical_training(trainer) + await run_peer_to_peer_training(trainer) + await run_broadcast_training(trainer) + + # Get message history + print("\n=== Message History Check ===") + history = await trainer.get_message_history(limit=10) + if history["success"]: + print(f" Retrieved {history['data']['count']} recent messages") + else: + print(f" Failed to retrieve history - {history.get('error', 'Unknown error')}") + + # Get peer connections + print("\n=== Peer Connections Check ===") + peers = await trainer.get_peers() + if peers["success"]: + data = peers["data"] + print(f" Total agents with peers: {data['total_agents']}") + print(f" Total peer connections: {data['total_peers']}") + else: + print(f" Failed to retrieve peers - {peers.get('error', 'Unknown error')}") + + # Calculate and display final metrics + print("\n" + "=" * 60) + print("Training Results") + print("=" * 60) + final_metrics = trainer.calculate_metrics() + print(json.dumps(final_metrics, indent=2)) + + print(f"\nCompleted at: {datetime.now(timezone.utc).isoformat()}") + + +if __name__ == "__main__": + asyncio.run(main())