Based on the repository's commit message style and the changes in the diff, here's an appropriate commit message:

```
feat: add websocket tests, PoA metrics, marketplace endpoints, and enhanced observability

- Add comprehensive websocket tests for blocks and transactions streams including multi-subscriber and high-volume scenarios
- Extend PoA consensus with per-proposer block metrics and rotation tracking
- Add latest block interval gauge and RPC error spike alerting
- Enhance mock coordinator
This commit is contained in:
oib
2025-12-22 07:55:09 +01:00
parent fb60505cdf
commit d98b2c7772
70 changed files with 3472 additions and 246 deletions

View File

@ -4,6 +4,27 @@
FastAPI service that accepts client compute jobs, matches miners, and tracks job lifecycle for the AITBC network.
## Marketplace Extensions
Stage 2 introduces public marketplace endpoints exposed under `/v1/marketplace`:
- `GET /v1/marketplace/offers` list available provider offers (filterable by status).
- `GET /v1/marketplace/stats` aggregated supply/demand metrics surfaced in the marketplace web dashboard.
- `POST /v1/marketplace/bids` accept bid submissions for matching (mock-friendly; returns `202 Accepted`).
These endpoints serve the `apps/marketplace-web/` dashboard via `VITE_MARKETPLACE_DATA_MODE=live`.
## Explorer Endpoints
The coordinator now exposes read-only explorer data under `/v1/explorer` for `apps/explorer-web/` live mode:
- `GET /v1/explorer/blocks` block summaries derived from recent job activity.
- `GET /v1/explorer/transactions` transaction-like records for coordinator jobs.
- `GET /v1/explorer/addresses` aggregated address activity and balances.
- `GET /v1/explorer/receipts` latest job receipts (filterable by `job_id`).
Set `VITE_DATA_MODE=live` and `VITE_COORDINATOR_API` in the explorer web app to consume these APIs.
## Development Setup
1. Create a virtual environment in `apps/coordinator-api/.venv`.

View File

@ -3,5 +3,13 @@
from .job import Job
from .miner import Miner
from .job_receipt import JobReceipt
from .marketplace import MarketplaceOffer, MarketplaceBid, OfferStatus
__all__ = ["Job", "Miner", "JobReceipt"]
__all__ = [
"Job",
"Miner",
"JobReceipt",
"MarketplaceOffer",
"MarketplaceBid",
"OfferStatus",
]

View File

@ -0,0 +1,36 @@
from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import Optional
from uuid import uuid4
from sqlalchemy import Column, Enum as SAEnum, JSON
from sqlmodel import Field, SQLModel
class OfferStatus(str, Enum):
open = "open"
reserved = "reserved"
closed = "closed"
class MarketplaceOffer(SQLModel, table=True):
id: str = Field(default_factory=lambda: uuid4().hex, primary_key=True)
provider: str = Field(index=True)
capacity: int = Field(default=0, nullable=False)
price: float = Field(default=0.0, nullable=False)
sla: str = Field(default="")
status: OfferStatus = Field(default=OfferStatus.open, sa_column=Column(SAEnum(OfferStatus), nullable=False))
created_at: datetime = Field(default_factory=datetime.utcnow, nullable=False, index=True)
attributes: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=False))
class MarketplaceBid(SQLModel, table=True):
id: str = Field(default_factory=lambda: uuid4().hex, primary_key=True)
provider: str = Field(index=True)
capacity: int = Field(default=0, nullable=False)
price: float = Field(default=0.0, nullable=False)
notes: Optional[str] = Field(default=None)
status: str = Field(default="pending", nullable=False)
submitted_at: datetime = Field(default_factory=datetime.utcnow, nullable=False, index=True)

View File

@ -2,7 +2,7 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .config import settings
from .routers import client, miner, admin
from .routers import client, miner, admin, marketplace, explorer
def create_app() -> FastAPI:
@ -20,9 +20,11 @@ def create_app() -> FastAPI:
allow_headers=["*"]
)
app.include_router(client.router, prefix="/v1")
app.include_router(miner.router, prefix="/v1")
app.include_router(admin.router, prefix="/v1")
app.include_router(client, prefix="/v1")
app.include_router(miner, prefix="/v1")
app.include_router(admin, prefix="/v1")
app.include_router(marketplace, prefix="/v1")
app.include_router(explorer, prefix="/v1")
@app.get("/v1/health", tags=["health"], summary="Service healthcheck")
async def health() -> dict[str, str]:

View File

@ -4,7 +4,7 @@ from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, ConfigDict
class JobState(str, Enum):
@ -76,3 +76,97 @@ class JobFailSubmit(BaseModel):
error_code: str
error_message: str
metrics: Dict[str, Any] = Field(default_factory=dict)
class MarketplaceOfferView(BaseModel):
id: str
provider: str
capacity: int
price: float
sla: str
status: str
created_at: datetime
class MarketplaceStatsView(BaseModel):
totalOffers: int
openCapacity: int
averagePrice: float
activeBids: int
class MarketplaceBidRequest(BaseModel):
provider: str = Field(..., min_length=1)
capacity: int = Field(..., gt=0)
price: float = Field(..., gt=0)
notes: Optional[str] = Field(default=None, max_length=1024)
class BlockSummary(BaseModel):
model_config = ConfigDict(populate_by_name=True)
height: int
hash: str
timestamp: datetime
txCount: int
proposer: str
class BlockListResponse(BaseModel):
model_config = ConfigDict(populate_by_name=True)
items: list[BlockSummary]
next_offset: Optional[str | int] = None
class TransactionSummary(BaseModel):
model_config = ConfigDict(populate_by_name=True, ser_json_tuples=True)
hash: str
block: str | int
from_address: str = Field(alias="from")
to_address: Optional[str] = Field(default=None, alias="to")
value: str
status: str
class TransactionListResponse(BaseModel):
model_config = ConfigDict(populate_by_name=True)
items: list[TransactionSummary]
next_offset: Optional[str | int] = None
class AddressSummary(BaseModel):
model_config = ConfigDict(populate_by_name=True)
address: str
balance: str
txCount: int
lastActive: datetime
recentTransactions: Optional[list[str]] = Field(default=None)
class AddressListResponse(BaseModel):
model_config = ConfigDict(populate_by_name=True)
items: list[AddressSummary]
next_offset: Optional[str | int] = None
class ReceiptSummary(BaseModel):
model_config = ConfigDict(populate_by_name=True)
receiptId: str
miner: str
coordinator: str
issuedAt: datetime
status: str
payload: Optional[Dict[str, Any]] = None
class ReceiptListResponse(BaseModel):
model_config = ConfigDict(populate_by_name=True)
jobId: str
items: list[ReceiptSummary]

View File

@ -1 +1,9 @@
"""Router modules for the coordinator API."""
from .client import router as client
from .miner import router as miner
from .admin import router as admin
from .marketplace import router as marketplace
from .explorer import router as explorer
__all__ = ["client", "miner", "admin", "marketplace", "explorer"]

View File

@ -0,0 +1,63 @@
from __future__ import annotations
from fastapi import APIRouter, Depends, Query
from ..models import (
BlockListResponse,
TransactionListResponse,
AddressListResponse,
ReceiptListResponse,
)
from ..services import ExplorerService
from ..storage import SessionDep
router = APIRouter(prefix="/explorer", tags=["explorer"])
def _service(session: SessionDep) -> ExplorerService:
return ExplorerService(session)
@router.get("/blocks", response_model=BlockListResponse, summary="List recent blocks")
async def list_blocks(
*,
session: SessionDep,
limit: int = Query(default=20, ge=1, le=200),
offset: int = Query(default=0, ge=0),
) -> BlockListResponse:
return _service(session).list_blocks(limit=limit, offset=offset)
@router.get(
"/transactions",
response_model=TransactionListResponse,
summary="List recent transactions",
)
async def list_transactions(
*,
session: SessionDep,
limit: int = Query(default=50, ge=1, le=200),
offset: int = Query(default=0, ge=0),
) -> TransactionListResponse:
return _service(session).list_transactions(limit=limit, offset=offset)
@router.get("/addresses", response_model=AddressListResponse, summary="List address summaries")
async def list_addresses(
*,
session: SessionDep,
limit: int = Query(default=50, ge=1, le=200),
offset: int = Query(default=0, ge=0),
) -> AddressListResponse:
return _service(session).list_addresses(limit=limit, offset=offset)
@router.get("/receipts", response_model=ReceiptListResponse, summary="List job receipts")
async def list_receipts(
*,
session: SessionDep,
job_id: str | None = Query(default=None, description="Filter by job identifier"),
limit: int = Query(default=50, ge=1, le=200),
offset: int = Query(default=0, ge=0),
) -> ReceiptListResponse:
return _service(session).list_receipts(job_id=job_id, limit=limit, offset=offset)

View File

@ -0,0 +1,57 @@
from __future__ import annotations
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi import status as http_status
from ..models import MarketplaceBidRequest, MarketplaceOfferView, MarketplaceStatsView
from ..services import MarketplaceService
from ..storage import SessionDep
router = APIRouter(tags=["marketplace"])
def _get_service(session: SessionDep) -> MarketplaceService:
return MarketplaceService(session)
@router.get(
"/marketplace/offers",
response_model=list[MarketplaceOfferView],
summary="List marketplace offers",
)
async def list_marketplace_offers(
*,
session: SessionDep,
status_filter: str | None = Query(default=None, alias="status", description="Filter by offer status"),
limit: int = Query(default=100, ge=1, le=500),
offset: int = Query(default=0, ge=0),
) -> list[MarketplaceOfferView]:
service = _get_service(session)
try:
return service.list_offers(status=status_filter, limit=limit, offset=offset)
except ValueError:
raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST, detail="invalid status filter") from None
@router.get(
"/marketplace/stats",
response_model=MarketplaceStatsView,
summary="Get marketplace summary statistics",
)
async def get_marketplace_stats(*, session: SessionDep) -> MarketplaceStatsView:
service = _get_service(session)
return service.get_stats()
@router.post(
"/marketplace/bids",
status_code=http_status.HTTP_202_ACCEPTED,
summary="Submit a marketplace bid",
)
async def submit_marketplace_bid(
payload: MarketplaceBidRequest,
session: SessionDep,
) -> dict[str, str]:
service = _get_service(session)
bid = service.create_bid(payload)
return {"id": bid.id}

View File

@ -2,5 +2,7 @@
from .jobs import JobService
from .miners import MinerService
from .marketplace import MarketplaceService
from .explorer import ExplorerService
__all__ = ["JobService", "MinerService"]
__all__ = ["JobService", "MinerService", "MarketplaceService", "ExplorerService"]

View File

@ -0,0 +1,182 @@
from __future__ import annotations
from collections import defaultdict, deque
from datetime import datetime
from typing import Optional
from sqlmodel import Session, select
from ..domain import Job, JobReceipt
from ..models import (
BlockListResponse,
BlockSummary,
TransactionListResponse,
TransactionSummary,
AddressListResponse,
AddressSummary,
ReceiptListResponse,
ReceiptSummary,
JobState,
)
_STATUS_LABELS = {
JobState.queued: "Queued",
JobState.running: "Running",
JobState.completed: "Succeeded",
JobState.failed: "Failed",
JobState.canceled: "Canceled",
JobState.expired: "Expired",
}
_DEFAULT_HEIGHT_BASE = 100_000
class ExplorerService:
"""Derives explorer-friendly summaries from coordinator data."""
def __init__(self, session: Session) -> None:
self.session = session
def list_blocks(self, *, limit: int = 20, offset: int = 0) -> BlockListResponse:
statement = select(Job).order_by(Job.requested_at.desc())
jobs = self.session.exec(statement.offset(offset).limit(limit)).all()
items: list[BlockSummary] = []
for index, job in enumerate(jobs):
height = _DEFAULT_HEIGHT_BASE + offset + index
proposer = job.assigned_miner_id or "unassigned"
items.append(
BlockSummary(
height=height,
hash=job.id,
timestamp=job.requested_at,
tx_count=1,
proposer=proposer,
)
)
next_offset: Optional[int] = offset + len(items) if len(items) == limit else None
return BlockListResponse(items=items, next_offset=next_offset)
def list_transactions(self, *, limit: int = 50, offset: int = 0) -> TransactionListResponse:
statement = (
select(Job)
.order_by(Job.requested_at.desc())
.offset(offset)
.limit(limit)
)
jobs = self.session.exec(statement).all()
items: list[TransactionSummary] = []
for index, job in enumerate(jobs):
height = _DEFAULT_HEIGHT_BASE + offset + index
status_label = _STATUS_LABELS.get(job.state, job.state.value.title())
value = job.payload.get("value") if isinstance(job.payload, dict) else None
if value is None:
value_str = "0"
elif isinstance(value, (int, float)):
value_str = f"{value}"
else:
value_str = str(value)
items.append(
TransactionSummary(
hash=job.id,
block=height,
from_address=job.client_id,
to_address=job.assigned_miner_id,
value=value_str,
status=status_label,
)
)
next_offset: Optional[int] = offset + len(items) if len(items) == limit else None
return TransactionListResponse(items=items, next_offset=next_offset)
def list_addresses(self, *, limit: int = 50, offset: int = 0) -> AddressListResponse:
statement = select(Job).order_by(Job.requested_at.desc())
jobs = self.session.exec(statement.offset(offset).limit(limit)).all()
address_map: dict[str, dict[str, object]] = defaultdict(
lambda: {
"address": "",
"balance": "0",
"tx_count": 0,
"last_active": datetime.min,
"recent_transactions": deque(maxlen=5),
}
)
def touch(address: Optional[str], tx_id: str, when: datetime, value_hint: Optional[str] = None) -> None:
if not address:
return
entry = address_map[address]
entry["address"] = address
entry["tx_count"] = int(entry["tx_count"]) + 1
if when > entry["last_active"]:
entry["last_active"] = when
if value_hint:
entry["balance"] = value_hint
recent: deque[str] = entry["recent_transactions"] # type: ignore[assignment]
recent.appendleft(tx_id)
for job in jobs:
value = job.payload.get("value") if isinstance(job.payload, dict) else None
value_hint: Optional[str] = None
if value is not None:
value_hint = str(value)
touch(job.client_id, job.id, job.requested_at, value_hint=value_hint)
touch(job.assigned_miner_id, job.id, job.requested_at)
sorted_addresses = sorted(
address_map.values(),
key=lambda entry: entry["last_active"],
reverse=True,
)
sliced = sorted_addresses[offset : offset + limit]
items = [
AddressSummary(
address=entry["address"],
balance=str(entry["balance"]),
txCount=int(entry["tx_count"]),
lastActive=entry["last_active"],
recentTransactions=list(entry["recent_transactions"]),
)
for entry in sliced
]
next_offset: Optional[int] = offset + len(sliced) if len(sliced) == limit else None
return AddressListResponse(items=items, next_offset=next_offset)
def list_receipts(
self,
*,
job_id: Optional[str] = None,
limit: int = 50,
offset: int = 0,
) -> ReceiptListResponse:
statement = select(JobReceipt).order_by(JobReceipt.created_at.desc())
if job_id:
statement = statement.where(JobReceipt.job_id == job_id)
rows = self.session.exec(statement.offset(offset).limit(limit)).all()
items: list[ReceiptSummary] = []
for row in rows:
payload = row.payload or {}
miner = payload.get("miner") or payload.get("miner_id") or "unknown"
coordinator = payload.get("coordinator") or payload.get("coordinator_id") or "unknown"
status = payload.get("status") or payload.get("state") or "Unknown"
items.append(
ReceiptSummary(
receipt_id=row.receipt_id,
miner=miner,
coordinator=coordinator,
issued_at=row.created_at,
status=status,
payload=payload,
)
)
resolved_job_id = job_id or "all"
return ReceiptListResponse(job_id=resolved_job_id, items=items)

View File

@ -0,0 +1,83 @@
from __future__ import annotations
from statistics import mean
from typing import Iterable, Optional
from sqlmodel import Session, select
from ..domain import MarketplaceOffer, MarketplaceBid, OfferStatus
from ..models import (
MarketplaceBidRequest,
MarketplaceOfferView,
MarketplaceStatsView,
)
class MarketplaceService:
"""Business logic for marketplace offers, stats, and bids."""
def __init__(self, session: Session) -> None:
self.session = session
def list_offers(
self,
*,
status: Optional[str] = None,
limit: int = 100,
offset: int = 0,
) -> list[MarketplaceOfferView]:
statement = select(MarketplaceOffer).order_by(MarketplaceOffer.created_at.desc())
if status:
try:
desired_status = OfferStatus(status.lower())
except ValueError as exc: # pragma: no cover - validated in router
raise ValueError("invalid status filter") from exc
statement = statement.where(MarketplaceOffer.status == desired_status)
if offset:
statement = statement.offset(offset)
if limit:
statement = statement.limit(limit)
offers = self.session.exec(statement).all()
return [self._to_offer_view(offer) for offer in offers]
def get_stats(self) -> MarketplaceStatsView:
offers = self.session.exec(select(MarketplaceOffer)).all()
open_offers = [offer for offer in offers if offer.status == OfferStatus.open]
total_offers = len(offers)
open_capacity = sum(offer.capacity for offer in open_offers)
average_price = mean([offer.price for offer in open_offers]) if open_offers else 0.0
active_bids = self.session.exec(
select(MarketplaceBid).where(MarketplaceBid.status == "pending")
).all()
return MarketplaceStatsView(
totalOffers=total_offers,
openCapacity=open_capacity,
averagePrice=round(average_price, 4),
activeBids=len(active_bids),
)
def create_bid(self, payload: MarketplaceBidRequest) -> MarketplaceBid:
bid = MarketplaceBid(
provider=payload.provider,
capacity=payload.capacity,
price=payload.price,
notes=payload.notes,
)
self.session.add(bid)
self.session.commit()
self.session.refresh(bid)
return bid
@staticmethod
def _to_offer_view(offer: MarketplaceOffer) -> MarketplaceOfferView:
return MarketplaceOfferView(
id=offer.id,
provider=offer.provider,
capacity=offer.capacity,
price=offer.price,
sla=offer.sla,
status=offer.status.value,
created_at=offer.created_at,
)

View File

@ -8,7 +8,7 @@ from sqlalchemy.engine import Engine
from sqlmodel import Session, SQLModel, create_engine
from ..config import settings
from ..domain import Job, Miner
from ..domain import Job, Miner, MarketplaceOffer, MarketplaceBid
_engine: Engine | None = None

View File

@ -0,0 +1,113 @@
import pytest
from fastapi.testclient import TestClient
from sqlmodel import Session, delete
from app.config import settings
from app.domain import MarketplaceOffer, OfferStatus, MarketplaceBid
from app.main import create_app
from app.services.marketplace import MarketplaceService
from app.storage.db import init_db, session_scope
@pytest.fixture(scope="module", autouse=True)
def _init_db(tmp_path_factory):
db_file = tmp_path_factory.mktemp("data") / "marketplace.db"
settings.database_url = f"sqlite:///{db_file}"
init_db()
yield
@pytest.fixture()
def session():
with session_scope() as sess:
sess.exec(delete(MarketplaceBid))
sess.exec(delete(MarketplaceOffer))
sess.commit()
yield sess
@pytest.fixture()
def client():
app = create_app()
return TestClient(app)
def test_list_offers_filters_by_status(client: TestClient, session: Session):
open_offer = MarketplaceOffer(provider="Alpha", capacity=250, price=12.5, sla="99.9%", status=OfferStatus.open)
reserved_offer = MarketplaceOffer(provider="Beta", capacity=100, price=15.0, sla="99.5%", status=OfferStatus.reserved)
session.add(open_offer)
session.add(reserved_offer)
session.commit()
# All offers
resp = client.get("/v1/marketplace/offers")
assert resp.status_code == 200
payload = resp.json()
assert len(payload) == 2
# Filter by status
resp_open = client.get("/v1/marketplace/offers", params={"status": "open"})
assert resp_open.status_code == 200
open_payload = resp_open.json()
assert len(open_payload) == 1
assert open_payload[0]["provider"] == "Alpha"
# Invalid status yields 400
resp_invalid = client.get("/v1/marketplace/offers", params={"status": "invalid"})
assert resp_invalid.status_code == 400
def test_marketplace_stats(client: TestClient, session: Session):
session.add_all(
[
MarketplaceOffer(provider="Alpha", capacity=200, price=10.0, sla="99.9%", status=OfferStatus.open),
MarketplaceOffer(provider="Beta", capacity=150, price=20.0, sla="99.5%", status=OfferStatus.open),
MarketplaceOffer(provider="Gamma", capacity=90, price=12.0, sla="99.0%", status=OfferStatus.reserved),
]
)
session.commit()
resp = client.get("/v1/marketplace/stats")
assert resp.status_code == 200
stats = resp.json()
assert stats["totalOffers"] == 3
assert stats["openCapacity"] == 350
assert pytest.approx(stats["averagePrice"], rel=1e-3) == 15.0
assert stats["activeBids"] == 0
def test_submit_bid_creates_record(client: TestClient, session: Session):
payload = {
"provider": "Alpha",
"capacity": 120,
"price": 13.5,
"notes": "Need overnight capacity",
}
resp = client.post("/v1/marketplace/bids", json=payload)
assert resp.status_code == 202
response_payload = resp.json()
assert "id" in response_payload
bid = session.get(MarketplaceBid, response_payload["id"])
assert bid is not None
assert bid.provider == payload["provider"]
assert bid.capacity == payload["capacity"]
assert bid.price == payload["price"]
assert bid.notes == payload["notes"]
def test_marketplace_service_list_offers_handles_limit_offset(session: Session):
session.add_all(
[
MarketplaceOffer(provider="A", capacity=50, price=9.0, sla="99.0%", status=OfferStatus.open),
MarketplaceOffer(provider="B", capacity=70, price=11.0, sla="99.0%", status=OfferStatus.open),
MarketplaceOffer(provider="C", capacity=90, price=13.0, sla="99.0%", status=OfferStatus.open),
]
)
session.commit()
service = MarketplaceService(session)
limited = service.list_offers(limit=2, offset=1)
assert len(limited) == 2
# Offers ordered by created_at descending → last inserted first
assert {offer.provider for offer in limited} == {"B", "A"}