Some checks failed
audit / audit (push) Has been skipped
ci-cd / build (push) Has been skipped
ci / build (push) Has been skipped
autofix / fix (push) Has been skipped
python-tests / test (push) Failing after 5s
python-tests / test-specific (push) Has been skipped
security-scanning / audit (push) Has been skipped
test / test (push) Has been skipped
ci-cd / deploy (push) Has been skipped
ci / deploy (push) Has been skipped
CODEBASE FIXES: Resolve real import and dependency issues

Fixed Issues:
1. Missing aitbc.logging module - created aitbc/ package with logging.py
2. Missing src.message_protocol - created agent-protocols/src/message_protocol.py
3. Missing src.task_manager - created agent-protocols/src/task_manager.py
4. SQLAlchemy metadata conflicts - added extend_existing=True to Block model
5. Missing dependencies - added slowapi>=0.1.0 and pynacl>=1.5.0

New Modules Created:
- aitbc/__init__.py - AITBC package initialization
- aitbc/logging.py - Centralized logging utilities with get_logger()
- apps/agent-protocols/src/__init__.py - Agent protocols package
- apps/agent-protocols/src/message_protocol.py - MessageProtocol, MessageTypes, AgentMessageClient
- apps/agent-protocols/src/task_manager.py - TaskManager, TaskStatus, TaskPriority, Task

Database Fixes:
- apps/blockchain-node/src/aitbc_chain/models.py - Added extend_existing=True to resolve metadata conflicts

Dependencies Added:
- slowapi>=0.1.0 - For slowapi.errors import
- pynacl>=1.5.0 - For nacl.signing import

Expected Results:
- aitbc.logging imports should work
- src.message_protocol imports should work
- src.task_manager imports should work
- SQLAlchemy metadata conflicts resolved
- Missing dependency imports resolved
- More tests should collect and run successfully

This addresses the root cause issues in the codebase rather than working around them with test filtering.
129 lines
3.5 KiB
Python
129 lines
3.5 KiB
Python
"""
|
|
Task Manager for AITBC Agents
|
|
Handles task creation, assignment, and tracking
|
|
"""
|
|
|
|
import uuid
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Dict, List, Optional
|
|
|
|
class TaskStatus(Enum):
    """Lifecycle states a task can be in.

    Each member's value is the lowercase string form of its name.
    """

    # Newly created tasks start here (see Task.__init__).
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    # COMPLETED and FAILED are the two states that
    # TaskManager.update_task_status treats specially.
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
|
|
|
|
class TaskPriority(Enum):
    """Priority levels that can be attached to a task.

    Values are plain strings, so members carry no numeric ordering;
    the declaration order below runs from least to most pressing.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"
|
|
|
|
class Task:
    """A single unit of work assigned to an agent.

    Attributes:
        task_id: Identifier supplied by the caller.
        title: Short title of the task.
        description: Longer description of the task.
        assigned_to: Identifier of the agent the task is assigned to.
        priority: A ``TaskPriority`` member.
        created_by: Identifier of the creator; falls back to
            ``assigned_to`` when not given (or given as an empty value).
        status: A ``TaskStatus`` member; always ``PENDING`` on creation.
        created_at: Naive UTC ``datetime`` of creation.
        updated_at: Naive UTC ``datetime`` of the last update; equal to
            ``created_at`` on a freshly created task.
        completed_at: ``None`` until set by whoever completes the task.
        result: ``None`` until a result is recorded.
        error: ``None`` until an error is recorded.
    """

    def __init__(
        self,
        task_id: str,
        title: str,
        description: str,
        assigned_to: str,
        priority: TaskPriority = TaskPriority.MEDIUM,
        created_by: Optional[str] = None
    ):
        """Initialise a pending task.

        Args:
            task_id: Identifier for the task.
            title: Short title.
            description: Longer description.
            assigned_to: Agent the task is assigned to.
            priority: Task priority; defaults to ``TaskPriority.MEDIUM``.
            created_by: Creator of the task; defaults to ``assigned_to``.
        """
        # Read the clock once so created_at and updated_at are identical
        # on a new task. datetime.utcnow() is deprecated since Python 3.12;
        # dropping tzinfo keeps the value naive UTC, so it stays comparable
        # with other naive timestamps already used with these objects.
        now = datetime.now(timezone.utc).replace(tzinfo=None)

        self.task_id = task_id
        self.title = title
        self.description = description
        self.assigned_to = assigned_to
        self.priority = priority
        self.created_by = created_by or assigned_to
        self.status = TaskStatus.PENDING
        self.created_at = now
        self.updated_at = now
        self.completed_at: Optional[datetime] = None
        self.result: Optional[Dict[str, Any]] = None
        self.error: Optional[str] = None

    def __repr__(self) -> str:
        """Return a compact debugging representation."""
        return (
            f"{type(self).__name__}(task_id={self.task_id!r}, "
            f"title={self.title!r}, assigned_to={self.assigned_to!r}, "
            f"status={self.status!r}, priority={self.priority!r})"
        )
|
|
|
|
class TaskManager:
    """In-memory registry of tasks, keyed by task ID.

    Attributes:
        tasks: Mapping of task ID to ``Task``.
        task_history: List created empty at construction. No method of
            this class writes to it; it is kept so existing code that
            reads or appends to it keeps working.
    """

    def __init__(self):
        """Create an empty manager."""
        self.tasks: Dict[str, Task] = {}
        self.task_history: List[Any] = []

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Replaces the deprecated ``datetime.utcnow()`` while keeping the
        result naive, so it compares cleanly with the naive timestamps
        stored on tasks.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def create_task(
        self,
        title: str,
        description: str,
        assigned_to: str,
        priority: TaskPriority = TaskPriority.MEDIUM,
        created_by: Optional[str] = None
    ) -> Task:
        """Create a task with a fresh UUID4 ID and register it.

        Args:
            title: Short title.
            description: Longer description.
            assigned_to: Agent the task is assigned to.
            priority: Task priority; defaults to ``TaskPriority.MEDIUM``.
            created_by: Creator of the task; passed through to ``Task``.

        Returns:
            The newly created ``Task``, which is also stored in
            ``self.tasks`` under its ``task_id``.
        """
        task_id = str(uuid.uuid4())
        task = Task(
            task_id=task_id,
            title=title,
            description=description,
            assigned_to=assigned_to,
            priority=priority,
            created_by=created_by,
        )
        self.tasks[task_id] = task
        return task

    def get_task(self, task_id: str) -> Optional[Task]:
        """Return the task registered under *task_id*, or ``None``."""
        return self.tasks.get(task_id)

    def update_task_status(
        self,
        task_id: str,
        status: TaskStatus,
        result: Optional[Dict[str, Any]] = None,
        error: Optional[str] = None
    ) -> bool:
        """Set a task's status and refresh its timestamps.

        Args:
            task_id: ID of the task to update.
            status: New status.
            result: Stored on the task only when *status* is
                ``TaskStatus.COMPLETED``.
            error: Stored on the task only when *status* is
                ``TaskStatus.FAILED``.

        Returns:
            ``True`` if the task exists and was updated, ``False`` if no
            task is registered under *task_id*.
        """
        task = self.get_task(task_id)
        if task is None:
            return False

        # Single clock reading: updated_at and completed_at describe the
        # same transition and must not differ by a few microseconds.
        now = self._utc_now()

        task.status = status
        task.updated_at = now

        if status == TaskStatus.COMPLETED:
            task.completed_at = now
            task.result = result
        elif status == TaskStatus.FAILED:
            task.error = error

        return True

    def get_tasks_by_agent(self, agent_id: str) -> List[Task]:
        """Return all tasks whose ``assigned_to`` equals *agent_id*."""
        return [
            task for task in self.tasks.values()
            if task.assigned_to == agent_id
        ]

    def get_tasks_by_status(self, status: TaskStatus) -> List[Task]:
        """Return all tasks whose ``status`` equals *status*."""
        return [
            task for task in self.tasks.values()
            if task.status == status
        ]

    def get_overdue_tasks(self, hours: int = 24) -> List[Task]:
        """Return unfinished tasks created more than *hours* hours ago.

        A task counts as unfinished while its status is ``PENDING`` or
        ``IN_PROGRESS``.

        Args:
            hours: Age threshold in hours; defaults to 24.
        """
        cutoff_time = self._utc_now() - timedelta(hours=hours)
        open_statuses = (TaskStatus.PENDING, TaskStatus.IN_PROGRESS)
        return [
            task for task in self.tasks.values()
            if task.status in open_statuses and task.created_at < cutoff_time
        ]
|