diff --git a/.windsurf/skills/aitbc-training.md b/.windsurf/skills/aitbc-training.md new file mode 100644 index 00000000..c18becc8 --- /dev/null +++ b/.windsurf/skills/aitbc-training.md @@ -0,0 +1,174 @@ +# AITBC Training - Agent Coordinator Architecture + +## Important: Real Coordinator Location + +The actual AITBC Agent Coordinator service is located at: +- **Path:** `/opt/aitbc/apps/agent-coordinator/src/app/` +- **Port:** 9001 +- **Service:** `aitbc-agent-coordinator.service` + +## DO NOT Use + +- **Wrong location:** `/opt/aitbc/apps/agent-services/agent-coordinator/src/coordinator.py` +- This is a different/older implementation and is NOT the active service + +## Key Components + +### Core Files +- `agent_discovery.py` - Redis-backed agent registry with persistence +- `load_balancer.py` - Load balancer with multiple strategies (least_connections, round_robin, etc.) +- `routers/agents.py` - Agent management REST API endpoints +- `routers/tasks.py` - Task submission and distribution API endpoints +- `lifespan.py` - Service initialization and component startup +- `state.py` - Global state management for coordinator components + +### Service Initialization +The service initializes in `lifespan.py`: +1. Creates `AgentRegistry()` with Redis backing +2. Starts registry Redis connection +3. Creates `LoadBalancer(registry)` with least_connections strategy +4. Creates `TaskDistributor(balancer)` with priority queues +5. Starts background task distribution loop + +## Agent Registration + +### API Endpoint +``` +POST /agents/register +``` + +### Example +```bash +curl -X POST http://localhost:9001/agents/register \ + -H "Content-Type: application/json" \ + -d '{ + "agent_id": "hermes-agent", + "agent_type": "worker", + "capabilities": ["data-processing", "analysis", "general"], + "services": ["task-execution", "analysis"], + "endpoints": {"http": "http://localhost:9002"}, + "metadata": {"version": "1.0.0"} + }' +``` + +### Response +```json +{ + "status": "success", + "message": "Agent hermes-agent registered successfully", + "agent_id": "hermes-agent", + "registered_at": "2026-05-07T16:26:55.464178+00:00" +} +``` + +## Task Distribution + +### API Endpoint +``` +POST /tasks/submit +``` + +### Example +```bash +curl -X POST http://localhost:9001/tasks/submit \ + -H "Content-Type: application/json" \ + -d '{ + "task_data": { + "model": "llama2", + "prompt": "test prompt" + }, + "priority": "normal", + "requirements": {} + }' +``` + +### Distribution Flow +1. Task submitted to `TaskDistributor` +2. Distributor finds eligible agents via `AgentRegistry.discover_agents()` +3. Load balancer selects agent using configured strategy (default: least_connections) +4. Task assigned to selected agent +5. Agent metrics updated in Redis + +## Task Status + +### API Endpoint +``` +GET /tasks/status +``` + +### Example +```bash +curl http://localhost:9001/tasks/status +``` + +### Response +```json +{ + "status": "success", + "stats": { + "tasks_distributed": 1, + "tasks_completed": 1, + "tasks_failed": 0, + "load_balancer_stats": { + "strategy": "least_connections", + "active_agents": 1, + "total_assignments": 1, + "avg_agent_load": 1 + } + } +} +``` + +## Agent Discovery + +### API Endpoint +``` +POST /agents/discover +``` + +### Example +```bash +curl -X POST http://localhost:9001/agents/discover \ + -H "Content-Type: application/json" \ + -d '{ + "status": "active", + "agent_type": "worker" + }' +``` + +## Redis Persistence + +The agent registry uses Redis for persistence: +- Agent data stored as hashes: `agent:{agent_id}` +- Active agents indexed in set: `agents:active` +- Load metrics tracked per agent +- Health scores calculated from heartbeats + +## Service Status + +### Health Check +```bash +curl http://localhost:9001/health +``` + +### Service Management +```bash +systemctl status aitbc-agent-coordinator.service +systemctl restart aitbc-agent-coordinator.service +journalctl -u aitbc-agent-coordinator.service -f +``` + +## Cross-Node Distribution + +For multi-node setups, register agents on each node: +```bash +# Register agent on aitbc1 +curl -X POST http://aitbc1:9001/agents/register \ + -d '{"agent_id":"aitbc1-agent", ...}' + +# Submit task on localhost +curl -X POST http://localhost:9001/tasks/submit \ + -d '{"task_data":{...}}' + +# Task will be distributed to any active agent across nodes +``` diff --git a/cli/handlers/ai.py b/cli/handlers/ai.py index 1fd947de..13e7b224 100644 --- a/cli/handlers/ai.py +++ b/cli/handlers/ai.py @@ -203,6 +203,28 @@ def handle_ai_stats(args, default_rpc_url, output_format, render_mapping): sys.exit(1) +def handle_ai_distribution_stats(args, default_coordinator_url, output_format, render_mapping): + """Handle task distribution statistics query from agent coordinator.""" + coordinator_url = getattr(args, 'coordinator_url', None) or default_coordinator_url + + print(f"Getting task distribution statistics from {coordinator_url}...") + try: + response = requests.get(f"{coordinator_url}/tasks/status", timeout=10) + if response.status_code == 200: + stats = response.json() + if output_format(args) == "json": + print(json.dumps(stats, indent=2)) + else: + render_mapping("Task distribution statistics:", stats) + else: + print(f"Query failed: {response.status_code}") + print(f"Error: {response.text}") + sys.exit(1) + except Exception as e: + print(f"Error getting distribution stats: {e}") + sys.exit(1) + + def handle_ai_service_list(args, ai_operations, render_mapping): """Handle AI service list command.""" result = ai_operations("service_list") diff --git a/cli/handlers/system.py b/cli/handlers/system.py index 16f51855..4ec355b0 100644 --- a/cli/handlers/system.py +++ b/cli/handlers/system.py @@ -128,45 +128,145 @@ def handle_agent_sdk_action(args, render_mapping): print(f"Agent SDK created: {name}") render_mapping("Agent SDK:", sdk_data) + elif action == "update-status": + agent_id = getattr(args, "agent_id", None) + status = getattr(args, "status", None) + load_metrics = getattr(args, "load_metrics", {}) + coordinator_url = getattr(args, "coordinator_url", "http://localhost:9001") + + if not agent_id or not status: + print("Error: --agent-id and --status are required") + sys.exit(1) + + status_update_request = { + "status": status, + "load_metrics": load_metrics if isinstance(load_metrics, dict) else {} + } + + print(f"Updating agent {agent_id} status to {status}...") + + try: + import requests + response = requests.put( + f"{coordinator_url}/agents/{agent_id}/status", + json=status_update_request, + timeout=30 + ) + + if response.status_code == 200: + result = response.json() + print(f"Agent status updated successfully") + render_mapping("Status Update:", result) + else: + print(f"Status update failed: {response.status_code}") + print(f"Error: {response.text}") + sys.exit(1) + except Exception as e: + print(f"Error updating agent status: {e}") + sys.exit(1) + elif action == "register": agent_id = getattr(args, "agent_id", None) + agent_type = getattr(args, "type", "worker") + capabilities = getattr(args, "capabilities", []) + services = getattr(args, "services", []) + endpoints = getattr(args, "endpoints", {}) + metadata = getattr(args, "metadata", {}) + coordinator_url = getattr(args, "coordinator_url", "http://localhost:9001") - registration_data = { + # Build registration request + registration_request = { "agent_id": agent_id, - "registered": True, - "coordinator": getattr(args, "coordinator_url", "http://localhost:9001"), - "timestamp": __import__('datetime').datetime.now().isoformat() + "agent_type": agent_type, + "capabilities": capabilities if isinstance(capabilities, list) else (capabilities.split(",") if capabilities else []), + "services": services if isinstance(services, list) else (services.split(",") if services else []), + "endpoints": endpoints if isinstance(endpoints, dict) else (json.loads(endpoints) if endpoints else {}), + "metadata": metadata if isinstance(metadata, dict) else (json.loads(metadata) if metadata else {}) } - print(f"Agent registered: {agent_id}") - render_mapping("Registration:", registration_data) + print(f"Registering agent {agent_id} with coordinator at {coordinator_url}...") + + try: + import requests + response = requests.post( + f"{coordinator_url}/agents/register", + json=registration_request, + timeout=30 + ) + + if response.status_code in (200, 201): + result = response.json() + print(f"Agent registered successfully") + render_mapping("Registration:", result) + else: + print(f"Registration failed: {response.status_code}") + print(f"Error: {response.text}") + sys.exit(1) + except Exception as e: + print(f"Error registering agent: {e}") + sys.exit(1) elif action == "list": - agents_data = { - "agents": [ - {"agent_id": "agent_1", "name": "data-analyzer", "status": "active"}, - {"agent_id": "agent_2", "name": "trading-bot", "status": "completed"}, - {"agent_id": "agent_3", "name": "content-generator", "status": "failed"} - ], - "total_count": 3 - } + # Agent discovery via coordinator + coordinator_url = getattr(args, "coordinator_url", "http://localhost:9001") + status = getattr(args, "status", None) + agent_type = getattr(args, "agent_type", None) - print("Local agents listed") - render_mapping("Agents:", agents_data) + query = {} + if status: + query["status"] = status + if agent_type: + query["agent_type"] = agent_type + + print(f"Discovering agents from coordinator at {coordinator_url}...") + + try: + import requests + response = requests.post( + f"{coordinator_url}/agents/discover", + json=query, + timeout=30 + ) + + if response.status_code == 200: + result = response.json() + print(f"Found {result.get('count', 0)} agents") + render_mapping("Agents:", result) + else: + print(f"Discovery failed: {response.status_code}") + print(f"Error: {response.text}") + sys.exit(1) + except Exception as e: + print(f"Error discovering agents: {e}") + sys.exit(1) elif action == "status": agent_id = getattr(args, "agent_id", None) + coordinator_url = getattr(args, "coordinator_url", "http://localhost:9001") - status_data = { - "agent_id": agent_id, - "status": "active", - "uptime": "2h 30m", - "jobs_completed": 15, - "success_rate": "93%" - } + print(f"Getting agent info for {agent_id} from coordinator at {coordinator_url}...") - print(f"Agent status: {agent_id}") - render_mapping("Status:", status_data) + try: + import requests + response = requests.get( + f"{coordinator_url}/agents/{agent_id}", + timeout=30 + ) + + if response.status_code == 200: + result = response.json() + print(f"Agent info retrieved") + render_mapping("Agent:", result) + elif response.status_code == 404: + print(f"Agent not found: {agent_id}") + sys.exit(1) + else: + print(f"Query failed: {response.status_code}") + print(f"Error: {response.text}") + sys.exit(1) + except Exception as e: + print(f"Error getting agent info: {e}") + sys.exit(1) elif action == "capabilities": caps_data = { diff --git a/cli/parsers/agent.py b/cli/parsers/agent.py index 3063ad72..5b85d1d8 100644 --- a/cli/parsers/agent.py +++ b/cli/parsers/agent.py @@ -72,6 +72,11 @@ def register(subparsers: argparse._SubParsersAction, ctx: ParserContext) -> None # agent sdk register agent_sdk_register_parser = agent_sdk_subparsers.add_parser("register", help="Register agent with coordinator") agent_sdk_register_parser.add_argument("--agent-id", required=True, help="Agent ID") + agent_sdk_register_parser.add_argument("--type", choices=["provider", "consumer", "general", "worker"], default="worker", help="Agent type") + agent_sdk_register_parser.add_argument("--capabilities", help="Comma-separated agent capabilities") + agent_sdk_register_parser.add_argument("--services", help="Comma-separated available services") + agent_sdk_register_parser.add_argument("--endpoints", help="JSON string of service endpoints") + agent_sdk_register_parser.add_argument("--metadata", help="JSON string of metadata") agent_sdk_register_parser.add_argument("--coordinator-url", default="http://localhost:9001", help="Coordinator URL") agent_sdk_register_parser.set_defaults(handler=ctx.handle_agent_sdk_action, agent_sdk_action="register") @@ -83,8 +88,17 @@ def register(subparsers: argparse._SubParsersAction, ctx: ParserContext) -> None # agent sdk status agent_sdk_status_parser = agent_sdk_subparsers.add_parser("status", help="Get agent status") agent_sdk_status_parser.add_argument("--agent-id", required=True, help="Agent ID") + agent_sdk_status_parser.add_argument("--coordinator-url", default="http://localhost:9001", help="Coordinator URL") agent_sdk_status_parser.set_defaults(handler=ctx.handle_agent_sdk_action, agent_sdk_action="status") + # agent sdk update-status + agent_sdk_update_status_parser = agent_sdk_subparsers.add_parser("update-status", help="Update agent status") + agent_sdk_update_status_parser.add_argument("--agent-id", required=True, help="Agent ID") + agent_sdk_update_status_parser.add_argument("--status", required=True, help="New status (active, inactive, busy)") + agent_sdk_update_status_parser.add_argument("--load-metrics", help="JSON string of load metrics") + agent_sdk_update_status_parser.add_argument("--coordinator-url", default="http://localhost:9001", help="Coordinator URL") + agent_sdk_update_status_parser.set_defaults(handler=ctx.handle_agent_sdk_action, agent_sdk_action="update-status") + # agent sdk capabilities agent_sdk_caps_parser = agent_sdk_subparsers.add_parser("capabilities", help="Show system capabilities") agent_sdk_caps_parser.set_defaults(handler=ctx.handle_agent_sdk_action, agent_sdk_action="capabilities") diff --git a/cli/parsers/ai.py b/cli/parsers/ai.py index c3f6f979..80407f82 100644 --- a/cli/parsers/ai.py +++ b/cli/parsers/ai.py @@ -78,3 +78,7 @@ def register(subparsers: argparse._SubParsersAction, ctx: ParserContext) -> None ai_stats_parser.add_argument("--chain-id", help="Chain ID") ai_stats_parser.add_argument("--rpc-url", default=ctx.default_rpc_url) ai_stats_parser.set_defaults(handler=ctx.handle_ai_stats) + + ai_distribution_stats_parser = ai_subparsers.add_parser("distribution-stats", help="Task distribution statistics from agent coordinator") + ai_distribution_stats_parser.add_argument("--coordinator-url", default=ctx.default_coordinator_url) + ai_distribution_stats_parser.set_defaults(handler=ctx.handle_ai_distribution_stats) diff --git a/cli/unified_cli.py b/cli/unified_cli.py index 359fc6e5..634a3a91 100755 --- a/cli/unified_cli.py +++ b/cli/unified_cli.py @@ -538,6 +538,9 @@ def run_cli(argv, core): def handle_ai_stats(args): ai_handlers.handle_ai_stats(args, default_rpc_url, output_format, render_mapping) + def handle_ai_distribution_stats(args): + ai_handlers.handle_ai_distribution_stats(args, default_coordinator_url, output_format, render_mapping) + def handle_ai_service_list(args): ai_handlers.handle_ai_service_list(args, ai_operations, render_mapping) @@ -726,6 +729,7 @@ def run_cli(argv, core): "handle_ai_job": handle_ai_job, "handle_ai_cancel": handle_ai_cancel, "handle_ai_stats": handle_ai_stats, + "handle_ai_distribution_stats": handle_ai_distribution_stats, "handle_ai_service_list": handle_ai_service_list, "handle_ai_service_status": handle_ai_service_status, "handle_ai_service_test": handle_ai_service_test,