chore: initialize monorepo with project scaffolding, configs, and CI setup
This commit is contained in:
6
apps/coordinator-api/src/app/services/__init__.py
Normal file
6
apps/coordinator-api/src/app/services/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""Service layer for coordinator business logic."""
|
||||
|
||||
from .jobs import JobService
|
||||
from .miners import MinerService
|
||||
|
||||
# Public API re-exported by the services package. ReceiptService
# (services/receipts.py) is not listed here, so it has to be imported from
# its own module.
__all__ = ["JobService", "MinerService"]
|
||||
156
apps/coordinator-api/src/app/services/jobs.py
Normal file
156
apps/coordinator-api/src/app/services/jobs.py
Normal file
@@ -0,0 +1,156 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional
|
||||
|
||||
from sqlmodel import Session, select
|
||||
|
||||
from ..domain import Job, Miner, JobReceipt
|
||||
from ..models import AssignedJob, Constraints, JobCreate, JobResult, JobState, JobView
|
||||
|
||||
|
||||
class JobService:
    """Job lifecycle operations backed by a SQLModel session.

    Covers creating, fetching, cancelling and assigning jobs, and converting
    ``Job`` rows into the API models (``JobView``, ``JobResult``,
    ``AssignedJob``).
    """

    def __init__(self, session: Session):
        self.session = session

    def create_job(self, client_id: str, req: JobCreate) -> Job:
        """Persist a new job owned by ``client_id`` and return the stored row.

        The TTL is clamped to at least one second so ``expires_at`` always
        lies after ``requested_at``.
        """
        ttl = max(req.ttl_seconds, 1)
        # Naive UTC, matching the comparisons in ``_ensure_not_expired``.
        now = datetime.utcnow()
        job = Job(
            client_id=client_id,
            payload=req.payload,
            constraints=req.constraints.model_dump(exclude_none=True),
            ttl_seconds=ttl,
            requested_at=now,
            expires_at=now + timedelta(seconds=ttl),
        )
        return self._save(job)

    def get_job(self, job_id: str, client_id: Optional[str] = None) -> Job:
        """Return the job with ``job_id``, optionally scoped to one client.

        Args:
            job_id: Identifier of the job to load.
            client_id: When not ``None``, the job must belong to this client.
                An empty string is treated as a real (non-matching) client id
                rather than silently disabling the ownership filter.

        Raises:
            KeyError: If no matching job exists.
        """
        query = select(Job).where(Job.id == job_id)
        if client_id is not None:
            query = query.where(Job.client_id == client_id)
        job = self.session.exec(query).one_or_none()
        if job is None:
            raise KeyError("job not found")
        return self._ensure_not_expired(job)

    def list_receipts(self, job_id: str, client_id: Optional[str] = None) -> list[JobReceipt]:
        """Return the receipts of a job ordered by creation time (oldest first).

        Raises:
            KeyError: If the job does not exist (or is not owned by
                ``client_id`` when one is given).
        """
        job = self.get_job(job_id, client_id=client_id)
        statement = (
            select(JobReceipt)
            .where(JobReceipt.job_id == job.id)
            .order_by(JobReceipt.created_at.asc())
        )
        return list(self.session.exec(statement).all())

    def cancel_job(self, job: Job) -> Job:
        """Cancel a queued or running job; jobs in any other state are returned untouched."""
        if job.state not in {JobState.queued, JobState.running}:
            return job
        job.state = JobState.canceled
        job.error = "canceled by client"
        job.assigned_miner_id = None
        return self._save(job)

    def to_view(self, job: Job) -> JobView:
        """Build the status view (``JobView``) for ``job``."""
        return JobView(
            job_id=job.id,
            state=job.state,
            assigned_miner_id=job.assigned_miner_id,
            requested_at=job.requested_at,
            expires_at=job.expires_at,
            error=job.error,
        )

    def to_result(self, job: Job) -> JobResult:
        """Build the ``JobResult`` carrying the job's result and receipt."""
        return JobResult(result=job.result, receipt=job.receipt)

    def to_assigned(self, job: Job) -> AssignedJob:
        """Build the ``AssignedJob`` handed to a miner.

        Constraints stored as anything other than a dict fall back to a
        default ``Constraints()``.
        """
        constraints = Constraints(**job.constraints) if isinstance(job.constraints, dict) else Constraints()
        return AssignedJob(job_id=job.id, payload=job.payload, constraints=constraints)

    def acquire_next_job(self, miner: Miner) -> Optional[Job]:
        """Assign the oldest queued job that ``miner`` can run.

        Queued jobs are visited in ``requested_at`` order. Expired ones are
        marked as such along the way, and the first job whose constraints the
        miner satisfies is moved to ``running`` and returned.

        Returns:
            The assigned job, or ``None`` when nothing suitable is queued.
        """
        # NOTE(review): no row lock is taken in this method; confirm that
        # concurrent polls cannot be handed the same job.
        statement = (
            select(Job)
            .where(Job.state == JobState.queued)
            .order_by(Job.requested_at.asc())
        )
        for job in self.session.exec(statement).all():
            # Marks the job expired when its deadline has passed, so a job
            # still in ``queued`` after this call is known to be live; no
            # separate expiry comparison is needed.
            job = self._ensure_not_expired(job)
            if job.state != JobState.queued:
                continue
            if not self._satisfies_constraints(job, miner):
                continue
            job.state = JobState.running
            job.assigned_miner_id = miner.id
            return self._save(job)
        return None

    def _save(self, job: Job) -> Job:
        """Add ``job`` to the session, commit, refresh it and return it."""
        self.session.add(job)
        self.session.commit()
        self.session.refresh(job)
        return job

    def _ensure_not_expired(self, job: Job) -> Job:
        """Mark a queued job as expired once its deadline has passed."""
        if job.state == JobState.queued and job.expires_at <= datetime.utcnow():
            job.state = JobState.expired
            job.error = "job expired"
            return self._save(job)
        return job

    def _satisfies_constraints(self, job: Job, miner: Miner) -> bool:
        """Return whether ``miner`` meets every constraint recorded on ``job``.

        A job without constraints matches any miner. Region, GPU name,
        minimum VRAM, CUDA version, model availability and maximum price are
        checked only when the corresponding constraint is set.
        """
        if not job.constraints:
            return True
        constraints = Constraints(**job.constraints)
        capabilities = miner.capabilities or {}

        # Region matching
        if constraints.region and constraints.region != miner.region:
            return False

        gpu_specs = capabilities.get("gpus") or []
        if not self._gpus_match(gpu_specs, constraints.gpu, constraints.min_vram_gb):
            return False

        if constraints.cuda:
            cuda_info = capabilities.get("cuda")
            # Substring match against the stringified capability value.
            if not cuda_info or constraints.cuda not in str(cuda_info):
                return False

        if constraints.models and not self._models_available(
            constraints.models, capabilities.get("models")
        ):
            return False

        if constraints.max_price is not None and not self._price_within(
            capabilities.get("price"), constraints.max_price
        ):
            return False

        return True

    @staticmethod
    def _gpus_match(gpu_specs: list, name: Optional[str], min_vram_gb: Optional[float]) -> bool:
        """Check the GPU name and minimum-VRAM constraints against ``gpu_specs``.

        Each constraint is checked independently, so the name and the VRAM
        requirement may be met by different GPUs in the list.
        """
        if name and name not in [gpu.get("name") for gpu in gpu_specs]:
            return False
        if min_vram_gb:
            required_mb = min_vram_gb * 1024
            if not any((gpu.get("memory_mb") or 0) >= required_mb for gpu in gpu_specs):
                return False
        return True

    @staticmethod
    def _models_available(required, available) -> bool:
        """Return whether every required model is offered; ``None`` offers nothing."""
        # ``available`` comes straight from the miner's capability dict and
        # may be ``None``; treat that like an empty list instead of crashing.
        return set(required).issubset(set(available or ()))

    @staticmethod
    def _price_within(price, max_price) -> bool:
        """Return whether ``price`` is numeric and does not exceed ``max_price``."""
        try:
            price_value = float(price)
        except (TypeError, ValueError):
            # A missing or non-numeric price cannot satisfy a price cap.
            return False
        return not price_value > max_price
|
||||
110
apps/coordinator-api/src/app/services/miners.py
Normal file
110
apps/coordinator-api/src/app/services/miners.py
Normal file
@@ -0,0 +1,110 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
from uuid import uuid4
|
||||
|
||||
from sqlmodel import Session, select
|
||||
|
||||
from ..domain import Miner
|
||||
from ..models import AssignedJob, MinerHeartbeat, MinerRegister
|
||||
from .jobs import JobService
|
||||
|
||||
|
||||
class MinerService:
    """Registration, heartbeat, job polling and statistics for miners."""

    def __init__(self, session: Session):
        self.session = session

    def _require(self, miner_id: str) -> Miner:
        """Load a miner by id, raising ``KeyError`` when it is unknown."""
        found = self.session.get(Miner, miner_id)
        if found is None:
            raise KeyError("miner not registered")
        return found

    def register(self, miner_id: str, payload: MinerRegister) -> Miner:
        """Create or update the miner record and issue a fresh session token.

        The miner is marked ``ONLINE`` and its heartbeat timestamp is reset.
        """
        token = uuid4().hex
        record = self.session.get(Miner, miner_id)
        if record is not None:
            # Known miner: overwrite the advertised properties in place.
            record.capabilities = payload.capabilities
            record.concurrency = payload.concurrency
            record.region = payload.region
            record.session_token = token
        else:
            record = Miner(
                id=miner_id,
                capabilities=payload.capabilities,
                concurrency=payload.concurrency,
                region=payload.region,
                session_token=token,
            )
            self.session.add(record)
        record.last_heartbeat = datetime.utcnow()
        record.status = "ONLINE"
        self.session.commit()
        self.session.refresh(record)
        return record

    def heartbeat(self, miner_id: str, payload: MinerHeartbeat | dict) -> Miner:
        """Record a heartbeat (inflight count, status, metadata) for a miner.

        A plain dict payload is validated into a ``MinerHeartbeat`` first.

        Raises:
            KeyError: If the miner has not been registered.
        """
        beat = payload if isinstance(payload, MinerHeartbeat) else MinerHeartbeat.model_validate(payload)
        record = self._require(miner_id)
        record.inflight = beat.inflight
        record.status = beat.status
        record.extra_metadata = beat.metadata
        record.last_heartbeat = datetime.utcnow()
        self.session.add(record)
        self.session.commit()
        self.session.refresh(record)
        return record

    def poll(self, miner_id: str, max_wait_seconds: int) -> Optional[AssignedJob]:
        """Try to hand the miner its next job.

        ``max_wait_seconds`` is accepted but not used by this method.

        Returns:
            The assigned job, or ``None`` when the miner is already at its
            concurrency limit or no suitable job is queued.

        Raises:
            KeyError: If the miner has not been registered.
        """
        record = self._require(miner_id)
        # A falsy concurrency means no limit is enforced.
        at_capacity = bool(record.concurrency) and record.inflight >= record.concurrency
        if at_capacity:
            return None

        jobs = JobService(self.session)
        assigned = jobs.acquire_next_job(record)
        if not assigned:
            return None

        record.inflight += 1
        record.last_heartbeat = datetime.utcnow()
        record.last_job_at = datetime.utcnow()
        self.session.add(record)
        self.session.commit()
        return jobs.to_assigned(assigned)

    def release(
        self,
        miner_id: str,
        success: bool | None = None,
        duration_ms: int | None = None,
        receipt_id: str | None = None,
    ) -> None:
        """Free one inflight slot and update the miner's job statistics.

        Unknown miners are ignored. ``success`` of ``True`` / ``False``
        increments the completed / failed counter; ``None`` touches neither.
        """
        record = self.session.get(Miner, miner_id)
        if not record:
            return

        # Never let the counter go negative.
        record.inflight = max(record.inflight - 1, 0)

        if success is True:
            record.jobs_completed += 1
            if duration_ms is not None:
                record.total_job_duration_ms += duration_ms
                completed = max(record.jobs_completed, 1)
                record.average_job_duration_ms = record.total_job_duration_ms / completed
        elif success is False:
            record.jobs_failed += 1

        if receipt_id:
            record.last_receipt_id = receipt_id

        self.session.add(record)
        self.session.commit()

    def get(self, miner_id: str) -> Miner:
        """Return the miner record, raising ``KeyError`` if it is not registered."""
        return self._require(miner_id)

    def list_records(self) -> list[Miner]:
        """Return every miner record as a list."""
        rows = self.session.exec(select(Miner)).all()
        return list(rows)

    def online_count(self) -> int:
        """Return how many miners currently have status ``ONLINE``."""
        online = self.session.exec(select(Miner).where(Miner.status == "ONLINE")).all()
        return len(online)
|
||||
79
apps/coordinator-api/src/app/services/receipts.py
Normal file
79
apps/coordinator-api/src/app/services/receipts.py
Normal file
@@ -0,0 +1,79 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Dict, Optional
|
||||
from secrets import token_hex
|
||||
from datetime import datetime
|
||||
|
||||
from aitbc_crypto.signing import ReceiptSigner
|
||||
|
||||
from sqlmodel import Session
|
||||
|
||||
from ..config import settings
|
||||
from ..domain import Job, JobReceipt
|
||||
|
||||
|
||||
class ReceiptService:
    """Builds signed job receipts and stages them in the database session.

    Signing is enabled only when ``settings.receipt_signing_key_hex`` is set;
    an additional attestation signature is appended when
    ``settings.receipt_attestation_key_hex`` is set as well.
    """

    def __init__(self, session: Session) -> None:
        self.session = session
        self._signer: Optional[ReceiptSigner] = None
        self._attestation_signer: Optional[ReceiptSigner] = None
        if settings.receipt_signing_key_hex:
            key_bytes = bytes.fromhex(settings.receipt_signing_key_hex)
            self._signer = ReceiptSigner(key_bytes)
        if settings.receipt_attestation_key_hex:
            attest_bytes = bytes.fromhex(settings.receipt_attestation_key_hex)
            self._attestation_signer = ReceiptSigner(attest_bytes)

    @staticmethod
    def _epoch_seconds(moment: Optional[datetime] = None) -> int:
        """Return ``moment`` as whole seconds since the Unix epoch.

        Naive datetimes are interpreted as UTC. Calling ``.timestamp()``
        directly on a naive value would interpret it as local time and shift
        the result by the host's UTC offset. ``None`` means "now".
        """
        # Local import: the module itself only imports ``datetime``.
        from datetime import timezone

        if moment is None:
            moment = datetime.now(timezone.utc)
        elif moment.tzinfo is None:
            moment = moment.replace(tzinfo=timezone.utc)
        return int(moment.timestamp())

    def create_receipt(
        self,
        job: Job,
        miner_id: str,
        job_result: Dict[str, Any] | None,
        result_metrics: Dict[str, Any] | None,
    ) -> Dict[str, Any] | None:
        """Build, sign and stage a receipt for a finished job.

        ``units``, ``unit_type`` and ``price`` are taken from
        ``result_metrics`` first and ``job_result`` second, falling back to
        ``0.0``, ``"gpu_seconds"`` and ``None`` respectively.

        The ``JobReceipt`` row is added to the session but not committed
        here.

        Returns:
            The signed receipt payload, or ``None`` when no signing key is
            configured.
        """
        if self._signer is None:
            return None

        metrics = result_metrics or {}
        result = job_result or {}
        completed_at = self._epoch_seconds()
        # Without a request time, the receipt starts when it completes.
        started_at = self._epoch_seconds(job.requested_at) if job.requested_at else completed_at

        payload = {
            "version": "1.0",
            "receipt_id": token_hex(16),
            "job_id": job.id,
            "provider": miner_id,
            "client": job.client_id,
            "units": _first_present([metrics.get("units"), result.get("units")], default=0.0),
            "unit_type": _first_present(
                [metrics.get("unit_type"), result.get("unit_type")],
                default="gpu_seconds",
            ),
            "price": _first_present([metrics.get("price"), result.get("price")]),
            "started_at": started_at,
            "completed_at": completed_at,
            "metadata": {
                "job_payload": job.payload,
                "job_constraints": job.constraints,
                "result": job_result,
                "metrics": result_metrics,
            },
        }
        payload["signature"] = self._signer.sign(payload)

        if self._attestation_signer:
            # The attestation covers the receipt body only, without the
            # primary signature or any attestation list.
            attestation_payload = {
                key: value
                for key, value in payload.items()
                if key not in ("attestations", "signature")
            }
            payload.setdefault("attestations", []).append(
                self._attestation_signer.sign(attestation_payload)
            )

        receipt_row = JobReceipt(job_id=job.id, receipt_id=payload["receipt_id"], payload=payload)
        self.session.add(receipt_row)
        return payload
|
||||
|
||||
|
||||
def _first_present(values: list[Optional[Any]], default: Optional[Any] = None) -> Optional[Any]:
|
||||
for value in values:
|
||||
if value is not None:
|
||||
return value
|
||||
return default
|
||||
Reference in New Issue
Block a user