Merge branch 'main' of http://gitea.bubuit.net:3000/oib/aitbc
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Documentation Validation / validate-docs (push) Has been cancelled
Documentation Validation / validate-policies-strict (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Node Failover Simulation / failover-test (push) Has been cancelled

This commit is contained in:
aitbc
2026-05-07 21:09:16 +02:00
2 changed files with 77 additions and 34 deletions

View File

@@ -1,5 +1,6 @@
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
import json
from aitbc import get_logger from aitbc import get_logger
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, Response from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, Response
@@ -60,34 +61,36 @@ async def send_message(request: MessageRequest):
payload=request.payload payload=request.payload
) )
# Send message with specified protocol # Send message
success = await state.communication_manager.send_message(protocol, message) # Store message in Redis first (always)
message_data = {
if success: "sender_id": message.sender_id,
# Store message in Redis for history "receiver_id": message.receiver_id,
if state.message_storage: "message_type": message.message_type.value,
message_data = { "priority": message.priority.value,
"message_id": message.id, "payload": json.dumps(message.payload),
"sender_id": message.sender_id, "protocol": protocol,
"receiver_id": message.receiver_id, "timestamp": datetime.now(timezone.utc).isoformat()
"message_type": message.message_type.value, }
"priority": message.priority.value,
"payload": json.dumps(message.payload), if state.message_storage:
"protocol": protocol, await state.message_storage.store_message(message.id, message_data)
"timestamp": datetime.now(timezone.utc).isoformat()
} # Try to send via protocol (optional, for real-time notification)
await state.message_storage.store_message(message.id, message_data) if state.communication_manager:
try:
return { await state.communication_manager.send_message(protocol, message)
"status": "success", except:
"message": "Message sent successfully", pass # Protocol send is optional
"message_id": message.id,
"receiver_id": request.receiver_id, return {
"protocol": protocol, "status": "success",
"sent_at": datetime.now(timezone.utc).isoformat() "message": "Message sent successfully",
} "message_id": message.id,
else: "receiver_id": request.receiver_id,
raise HTTPException(status_code=500, detail="Failed to send message") "protocol": protocol,
"sent_at": datetime.now(timezone.utc).isoformat()
}
except HTTPException: except HTTPException:
raise raise
@@ -149,11 +152,29 @@ async def broadcast_message(request: BroadcastRequest):
priority=priority, priority=priority,
payload=request.payload payload=request.payload
) )
success = await state.communication_manager.send_message("broadcast", message) # Store in Redis first (always)
if success: message_data = {
"sender_id": message.sender_id,
"receiver_id": message.receiver_id,
"message_type": message.message_type.value,
"priority": message.priority.value,
"payload": json.dumps(message.payload),
"protocol": "broadcast",
"timestamp": datetime.now(timezone.utc).isoformat()
}
if state.message_storage:
await state.message_storage.store_message(message.id, message_data)
recipients.append(agent.agent_id) recipients.append(agent.agent_id)
# Optionally try to send via protocol (for real-time notification)
if state.communication_manager:
try:
await state.communication_manager.send_message("broadcast", message)
except:
pass # Protocol send is optional
return { return {
"status": "success", "status": "success",
"message": f"Broadcast sent to {len(recipients)} agents", "message": f"Broadcast sent to {len(recipients)} agents",
@@ -188,10 +209,16 @@ async def get_message_history(
else: else:
messages = await state.message_storage.get_all_messages(limit, offset) messages = await state.message_storage.get_all_messages(limit, offset)
# Get total count
total = 0
if state.message_storage:
total = await state.message_storage.get_message_count()
return { return {
"status": "success", "status": "success",
"messages": messages, "messages": messages,
"count": len(messages), "count": len(messages),
"total": total,
"limit": limit, "limit": limit,
"offset": offset, "offset": offset,
"timestamp": datetime.now(timezone.utc).isoformat() "timestamp": datetime.now(timezone.utc).isoformat()

View File

@@ -49,8 +49,16 @@ class MessageStorage:
await self.redis.sadd(f"messages:receiver:{receiver_id}", message_id) await self.redis.sadd(f"messages:receiver:{receiver_id}", message_id)
# Index by timestamp (for time-based queries) # Index by timestamp (for time-based queries)
timestamp = message_data.get("timestamp", datetime.now(timezone.utc).isoformat()) timestamp_str = message_data.get("timestamp", datetime.now(timezone.utc).isoformat())
await self.redis.zadd(f"messages:timestamp", {message_id: timestamp}) # Convert to float for sorted set
try:
# Try to parse ISO format
dt = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
timestamp_float = dt.timestamp()
except:
# Already a float or int
timestamp_float = float(timestamp_str)
await self.redis.zadd(f"messages:timestamp", {message_id: timestamp_float})
logger.debug(f"Stored message {message_id} in Redis") logger.debug(f"Stored message {message_id} in Redis")
return True return True
@@ -59,6 +67,14 @@ class MessageStorage:
logger.error(f"Error storing message {message_id}: {e}") logger.error(f"Error storing message {message_id}: {e}")
return False return False
async def get_message_count(self) -> int:
"""Get total count of messages"""
try:
return await self.redis.zcard("messages:timestamp")
except Exception as e:
logger.error(f"Error getting message count: {e}")
return 0
async def get_message(self, message_id: str) -> Optional[Dict[str, Any]]: async def get_message(self, message_id: str) -> Optional[Dict[str, Any]]:
"""Retrieve a specific message by ID""" """Retrieve a specific message by ID"""
try: try: