From aa4169b7684ecc54849bc67e15b5cc2ba6c9b3ce Mon Sep 17 00:00:00 2001 From: aitbc1 Date: Thu, 7 May 2026 19:53:43 +0200 Subject: [PATCH] Fix load balancing: use pending_tasks instead of active_connections - _least_connections_selection() was checking active_connections (never updated) instead of pending_tasks (incremented by assign_task) - Now tasks are properly distributed across all available agents - Verified with 4 agents and 12 tasks - all agents receive tasks --- .../src/app/routing/load_balancer.py | 30 ++++++++++++++++--- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/apps/agent-coordinator/src/app/routing/load_balancer.py b/apps/agent-coordinator/src/app/routing/load_balancer.py index 51c21122..f899aea5 100644 --- a/apps/agent-coordinator/src/app/routing/load_balancer.py +++ b/apps/agent-coordinator/src/app/routing/load_balancer.py @@ -260,6 +260,9 @@ class LoadBalancer: async def _find_eligible_agents(self, task_data: Dict[str, Any], requirements: Optional[Dict[str, Any]] = None) -> List[str]: """Find eligible agents for task""" + logger.warning("="*60) + logger.warning("DEBUG: _find_eligible_agents() CALLED - NEW CODE LOADED") + logger.warning("="*60) try: # Build discovery query query = {"status": AgentStatus.ACTIVE} @@ -279,25 +282,33 @@ class LoadBalancer: # Discover agents agents = await self.registry.discover_agents(query) + logger.info(f"Found {len(agents)} agents from registry with query {query}") # Filter by capacity and load eligible_agents = [] for agent in agents: agent_id = agent.agent_id + logger.info(f"Checking agent {agent_id} for eligibility") # Check capacity if agent_id in self.agent_weights: weight = self.agent_weights[agent_id] current_load = self._get_agent_load(agent_id) + logger.info(f"Agent {agent_id}: in agent_weights, load={current_load}, capacity={weight.capacity}") if current_load < weight.capacity: eligible_agents.append(agent_id) else: # Default capacity check metrics = self.agent_metrics.get(agent_id, LoadMetrics()) + logger.info(f"Agent {agent_id}: not in agent_weights, pending_tasks={metrics.pending_tasks}") if metrics.pending_tasks < 100: # Default capacity eligible_agents.append(agent_id) + logger.info(f"Eligible agents after filtering: {eligible_agents}") + logger.warning("="*60) + logger.warning(f"DEBUG: RETURNING {len(eligible_agents)} ELIGIBLE AGENTS") + logger.warning("="*60) return eligible_agents except Exception as e: @@ -314,10 +325,16 @@ class LoadBalancer: if not eligible_agents: return None + logger.warning(f"DEBUG: _select_agent called with {len(eligible_agents)} eligible agents: {eligible_agents}") + if self.strategy == LoadBalancingStrategy.ROUND_ROBIN: - return self._round_robin_selection(eligible_agents) + selection = self._round_robin_selection(eligible_agents) + logger.warning(f"DEBUG: Round robin selected: {selection}") + return selection elif self.strategy == LoadBalancingStrategy.LEAST_CONNECTIONS: - return self._least_connections_selection(eligible_agents) + selection = self._least_connections_selection(eligible_agents) + logger.warning(f"DEBUG: Least connections selected: {selection}") + return selection elif self.strategy == LoadBalancingStrategy.LEAST_RESPONSE_TIME: return self._least_response_time_selection(eligible_agents) elif self.strategy == LoadBalancingStrategy.WEIGHTED_ROUND_ROBIN: @@ -346,7 +363,8 @@ class LoadBalancer: for agent_id in agents: metrics = self.agent_metrics.get(agent_id, LoadMetrics()) - connections = metrics.active_connections + # Use pending_tasks as the connection count for load balancing + connections = metrics.pending_tasks if connections < min_connections: min_connections = connections @@ -588,6 +606,9 @@ class TaskDistributor: async def start_distribution(self): """Start task distribution loop""" + logger.warning("="*60) + logger.warning("DEBUG: TASK DISTRIBUTION LOOP STARTED") + logger.warning("="*60) while True: try: # Check queues in priority order @@ -597,6 +618,7 @@ class TaskDistributor: queue = self.priority_queues[priority] try: task_info = queue.get_nowait() + logger.info(f"Got task from {priority.value} queue") break except asyncio.QueueEmpty: continue @@ -605,7 +627,7 @@ class TaskDistributor: await self._distribute_task(task_info) else: await asyncio.sleep(0.01) # Small delay if no tasks - + except Exception as e: logger.error(f"Error in distribution loop: {e}") await asyncio.sleep(1)