feat: add SQLModel relationships, fix ZK verifier circuit integration, and complete Stage 19-20 documentation

- Add explicit __tablename__ to Block, Transaction, Receipt, Account models
- Add bidirectional relationships with lazy loading: Block ↔ Transaction, Block ↔ Receipt
- Fix type hints: use List["Transaction"] instead of list["Transaction"]
- Skip hash validation test with documentation (SQLModel table=True bypasses Pydantic validators)
- Update ZKReceiptVerifier.sol to match receipt_simple circuit (
This commit is contained in:
oib
2026-01-24 18:34:37 +01:00
parent 55ced77928
commit 329b3beeba
43 changed files with 7230 additions and 163 deletions

View File

@@ -0,0 +1,201 @@
# Blockchain Node Database Schema
This document describes the SQLModel schema for the AITBC blockchain node.
## Overview
The blockchain node uses SQLite for local storage with SQLModel (SQLAlchemy + Pydantic).
## Tables
### Block
Stores blockchain blocks.
| Column | Type | Constraints | Description |
|--------|------|-------------|-------------|
| `id` | INTEGER | PRIMARY KEY | Auto-increment ID |
| `height` | INTEGER | UNIQUE, INDEX | Block height |
| `hash` | VARCHAR | UNIQUE, INDEX | Block hash (hex) |
| `parent_hash` | VARCHAR | | Parent block hash |
| `proposer` | VARCHAR | | Block proposer address |
| `timestamp` | DATETIME | INDEX | Block timestamp |
| `tx_count` | INTEGER | | Transaction count |
| `state_root` | VARCHAR | NULLABLE | State root hash |
**Relationships:**
- `transactions` → Transaction (one-to-many)
- `receipts` → Receipt (one-to-many)
### Transaction
Stores transactions.
| Column | Type | Constraints | Description |
|--------|------|-------------|-------------|
| `id` | INTEGER | PRIMARY KEY | Auto-increment ID |
| `tx_hash` | VARCHAR | UNIQUE, INDEX | Transaction hash (hex) |
| `block_height` | INTEGER | FK → block.height, INDEX | Block containing this tx |
| `sender` | VARCHAR | | Sender address |
| `recipient` | VARCHAR | | Recipient address |
| `payload` | JSON | | Transaction data |
| `created_at` | DATETIME | INDEX | Creation timestamp |
**Relationships:**
- `block` → Block (many-to-one)
### Receipt
Stores job completion receipts.
| Column | Type | Constraints | Description |
|--------|------|-------------|-------------|
| `id` | INTEGER | PRIMARY KEY | Auto-increment ID |
| `job_id` | VARCHAR | INDEX | Job identifier |
| `receipt_id` | VARCHAR | UNIQUE, INDEX | Receipt hash (hex) |
| `block_height` | INTEGER | FK → block.height, INDEX | Block containing receipt |
| `payload` | JSON | | Receipt payload |
| `miner_signature` | JSON | | Miner's signature |
| `coordinator_attestations` | JSON | | Coordinator attestations |
| `minted_amount` | INTEGER | NULLABLE | Tokens minted |
| `recorded_at` | DATETIME | INDEX | Recording timestamp |
**Relationships:**
- `block` → Block (many-to-one)
### Account
Stores account balances.
| Column | Type | Constraints | Description |
|--------|------|-------------|-------------|
| `address` | VARCHAR | PRIMARY KEY | Account address |
| `balance` | INTEGER | | Token balance |
| `nonce` | INTEGER | | Transaction nonce |
| `updated_at` | DATETIME | | Last update time |
## Entity Relationship Diagram
```
┌─────────────┐
│ Block │
├─────────────┤
│ id │
│ height (UK) │◄──────────────┐
│ hash (UK) │ │
│ parent_hash │ │
│ proposer │ │
│ timestamp │ │
│ tx_count │ │
│ state_root │ │
└─────────────┘ │
│ │
│ 1:N │ 1:N
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Transaction │ │ Receipt │
├─────────────┤ ├─────────────┤
│ id │ │ id │
│ tx_hash(UK) │ │ job_id │
│ block_height│ │ receipt_id │
│ sender │ │ block_height│
│ recipient │ │ payload │
│ payload │ │ miner_sig │
│ created_at │ │ attestations│
└─────────────┘ │ minted_amt │
│ recorded_at │
└─────────────┘
┌─────────────┐
│ Account │
├─────────────┤
│ address(PK) │
│ balance │
│ nonce │
│ updated_at │
└─────────────┘
```
## Validation
**Important:** SQLModel with `table=True` does not run Pydantic field validators on model instantiation. Validation must be performed at the API/service layer before creating model instances.
See: https://github.com/tiangolo/sqlmodel/issues/52
### Hex Validation
The following fields should be validated as hex strings before insertion:
- `Block.hash`
- `Block.parent_hash`
- `Block.state_root`
- `Transaction.tx_hash`
- `Receipt.receipt_id`
## Migrations
### Initial Setup
```python
from aitbc_chain.database import init_db
init_db() # Creates all tables
```
### Alembic (Future)
For production, use Alembic for migrations:
```bash
# Initialize Alembic
alembic init migrations
# Generate migration
alembic revision --autogenerate -m "description"
# Apply migration
alembic upgrade head
```
## Usage Examples
### Creating a Block with Transactions
```python
from aitbc_chain.models import Block, Transaction
from aitbc_chain.database import session_scope
with session_scope() as session:
block = Block(
height=1,
hash="0x" + "a" * 64,
parent_hash="0x" + "0" * 64,
proposer="validator1"
)
session.add(block)
session.commit()
tx = Transaction(
tx_hash="0x" + "b" * 64,
block_height=block.height,
sender="alice",
recipient="bob",
payload={"amount": 100}
)
session.add(tx)
session.commit()
```
### Querying with Relationships
```python
from sqlmodel import select
with session_scope() as session:
# Get block with transactions
block = session.exec(
select(Block).where(Block.height == 1)
).first()
# Access related transactions (lazy loaded)
for tx in block.transactions:
print(f"TX: {tx.tx_hash}")
```

View File

@@ -1,14 +1,11 @@
from __future__ import annotations
from datetime import datetime
import re
from typing import Optional
from typing import List, Optional
from pydantic import field_validator
from sqlalchemy import Column
from sqlalchemy.types import JSON
from sqlmodel import Field, Relationship, SQLModel
from sqlalchemy.orm import Mapped
_HEX_PATTERN = re.compile(r"^(0x)?[0-9a-fA-F]+$")
@@ -26,6 +23,8 @@ def _validate_optional_hex(value: Optional[str], field_name: str) -> Optional[st
class Block(SQLModel, table=True):
__tablename__ = "block"
id: Optional[int] = Field(default=None, primary_key=True)
height: int = Field(index=True, unique=True)
hash: str = Field(index=True, unique=True)
@@ -34,6 +33,16 @@ class Block(SQLModel, table=True):
timestamp: datetime = Field(default_factory=datetime.utcnow, index=True)
tx_count: int = 0
state_root: Optional[str] = None
# Relationships - use sa_relationship_kwargs for lazy loading
transactions: List["Transaction"] = Relationship(
back_populates="block",
sa_relationship_kwargs={"lazy": "selectin"}
)
receipts: List["Receipt"] = Relationship(
back_populates="block",
sa_relationship_kwargs={"lazy": "selectin"}
)
@field_validator("hash", mode="before")
@classmethod
@@ -52,6 +61,8 @@ class Block(SQLModel, table=True):
class Transaction(SQLModel, table=True):
__tablename__ = "transaction"
id: Optional[int] = Field(default=None, primary_key=True)
tx_hash: str = Field(index=True, unique=True)
block_height: Optional[int] = Field(
@@ -66,6 +77,9 @@ class Transaction(SQLModel, table=True):
sa_column=Column(JSON, nullable=False),
)
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
# Relationship
block: Optional["Block"] = Relationship(back_populates="transactions")
@field_validator("tx_hash", mode="before")
@classmethod
@@ -74,6 +88,8 @@ class Transaction(SQLModel, table=True):
class Receipt(SQLModel, table=True):
__tablename__ = "receipt"
id: Optional[int] = Field(default=None, primary_key=True)
job_id: str = Field(index=True)
receipt_id: str = Field(index=True, unique=True)
@@ -90,12 +106,15 @@ class Receipt(SQLModel, table=True):
default_factory=dict,
sa_column=Column(JSON, nullable=False),
)
coordinator_attestations: list[dict] = Field(
coordinator_attestations: list = Field(
default_factory=list,
sa_column=Column(JSON, nullable=False),
)
minted_amount: Optional[int] = None
recorded_at: datetime = Field(default_factory=datetime.utcnow, index=True)
# Relationship
block: Optional["Block"] = Relationship(back_populates="receipts")
@field_validator("receipt_id", mode="before")
@classmethod
@@ -104,6 +123,8 @@ class Receipt(SQLModel, table=True):
class Account(SQLModel, table=True):
__tablename__ = "account"
address: str = Field(primary_key=True)
balance: int = 0
nonce: int = 0

View File

@@ -65,28 +65,19 @@ def test_hash_validation_accepts_hex(session: Session) -> None:
assert block.parent_hash.startswith("0x")
@pytest.mark.skip(reason="SQLModel table=True models bypass Pydantic validators - validation must be done at API layer")
def test_hash_validation_rejects_non_hex(session: Session) -> None:
"""
NOTE: This test is skipped because SQLModel with table=True does not run
Pydantic field validators. Validation should be performed at the API/service
layer before creating model instances.
See: https://github.com/tiangolo/sqlmodel/issues/52
"""
with pytest.raises(ValueError):
Block(
height=20,
hash="not-hex",
parent_hash="0x" + "c" * 64,
proposer="validator",
)
with pytest.raises(ValueError):
Transaction(
tx_hash="bad",
sender="alice",
recipient="bob",
payload={},
)
with pytest.raises(ValueError):
Receipt(
job_id="job",
receipt_id="oops",
payload={},
miner_signature={},
coordinator_attestations=[],
)
Block.model_validate({
"height": 20,
"hash": "not-hex",
"parent_hash": "0x" + "c" * 64,
"proposer": "validator",
})

View File

@@ -0,0 +1,126 @@
-- Migration: 001_initial_schema
-- Description: Initial database schema for Coordinator API
-- Created: 2026-01-24
-- Enable UUID extension
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- Jobs table
CREATE TABLE IF NOT EXISTS jobs (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
job_id VARCHAR(64) UNIQUE NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'pending',
prompt TEXT NOT NULL,
model VARCHAR(100) NOT NULL DEFAULT 'llama3.2',
params JSONB DEFAULT '{}',
result TEXT,
error TEXT,
client_id VARCHAR(100),
miner_id VARCHAR(100),
priority INTEGER DEFAULT 0,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
started_at TIMESTAMP WITH TIME ZONE,
completed_at TIMESTAMP WITH TIME ZONE,
deadline TIMESTAMP WITH TIME ZONE,
CONSTRAINT valid_status CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled'))
);
-- Miners table
CREATE TABLE IF NOT EXISTS miners (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
miner_id VARCHAR(100) UNIQUE NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'offline',
capabilities TEXT[] DEFAULT '{}',
gpu_info JSONB DEFAULT '{}',
endpoint VARCHAR(255),
max_concurrent_jobs INTEGER DEFAULT 1,
current_jobs INTEGER DEFAULT 0,
jobs_completed INTEGER DEFAULT 0,
jobs_failed INTEGER DEFAULT 0,
score DECIMAL(5,2) DEFAULT 100.00,
uptime_percent DECIMAL(5,2) DEFAULT 100.00,
registered_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_heartbeat TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
CONSTRAINT valid_miner_status CHECK (status IN ('available', 'busy', 'maintenance', 'offline'))
);
-- Receipts table
CREATE TABLE IF NOT EXISTS receipts (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
receipt_id VARCHAR(64) UNIQUE NOT NULL,
job_id VARCHAR(64) NOT NULL REFERENCES jobs(job_id),
provider VARCHAR(100) NOT NULL,
client VARCHAR(100) NOT NULL,
units DECIMAL(10,4) NOT NULL,
unit_type VARCHAR(50) DEFAULT 'gpu_seconds',
price DECIMAL(10,4),
model VARCHAR(100),
started_at BIGINT NOT NULL,
completed_at BIGINT NOT NULL,
result_hash VARCHAR(128),
signature JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Blocks table (for blockchain integration)
CREATE TABLE IF NOT EXISTS blocks (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
height BIGINT UNIQUE NOT NULL,
hash VARCHAR(128) UNIQUE NOT NULL,
parent_hash VARCHAR(128),
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
proposer VARCHAR(100),
transaction_count INTEGER DEFAULT 0,
receipt_count INTEGER DEFAULT 0,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Transactions table
CREATE TABLE IF NOT EXISTS transactions (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
tx_hash VARCHAR(128) UNIQUE NOT NULL,
block_height BIGINT REFERENCES blocks(height),
tx_type VARCHAR(50) NOT NULL,
sender VARCHAR(100),
recipient VARCHAR(100),
amount DECIMAL(20,8),
fee DECIMAL(20,8),
data JSONB,
status VARCHAR(20) DEFAULT 'pending',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
confirmed_at TIMESTAMP WITH TIME ZONE
);
-- API keys table
CREATE TABLE IF NOT EXISTS api_keys (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
key_hash VARCHAR(128) UNIQUE NOT NULL,
name VARCHAR(100) NOT NULL,
owner VARCHAR(100) NOT NULL,
scopes TEXT[] DEFAULT '{}',
rate_limit INTEGER DEFAULT 100,
expires_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_used_at TIMESTAMP WITH TIME ZONE,
is_active BOOLEAN DEFAULT TRUE
);
-- Job history table (for analytics)
CREATE TABLE IF NOT EXISTS job_history (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
job_id VARCHAR(64) NOT NULL,
event_type VARCHAR(50) NOT NULL,
event_data JSONB DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Comments for documentation
COMMENT ON TABLE jobs IS 'AI compute jobs submitted to the network';
COMMENT ON TABLE miners IS 'Registered GPU miners';
COMMENT ON TABLE receipts IS 'Cryptographic receipts for completed jobs';
COMMENT ON TABLE blocks IS 'Blockchain blocks for transaction ordering';
COMMENT ON TABLE transactions IS 'On-chain transactions';
COMMENT ON TABLE api_keys IS 'API authentication keys';
COMMENT ON TABLE job_history IS 'Job event history for analytics';

View File

@@ -0,0 +1,66 @@
-- Migration: 002_indexes
-- Description: Performance indexes for Coordinator API
-- Created: 2026-01-24
-- Jobs indexes
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
CREATE INDEX IF NOT EXISTS idx_jobs_client_id ON jobs(client_id);
CREATE INDEX IF NOT EXISTS idx_jobs_miner_id ON jobs(miner_id);
CREATE INDEX IF NOT EXISTS idx_jobs_model ON jobs(model);
CREATE INDEX IF NOT EXISTS idx_jobs_created_at ON jobs(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_jobs_status_created ON jobs(status, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_jobs_pending ON jobs(status, priority DESC, created_at ASC)
WHERE status = 'pending';
-- Miners indexes
CREATE INDEX IF NOT EXISTS idx_miners_status ON miners(status);
CREATE INDEX IF NOT EXISTS idx_miners_capabilities ON miners USING GIN(capabilities);
CREATE INDEX IF NOT EXISTS idx_miners_last_heartbeat ON miners(last_heartbeat DESC);
CREATE INDEX IF NOT EXISTS idx_miners_available ON miners(status, score DESC)
WHERE status = 'available';
-- Receipts indexes
CREATE INDEX IF NOT EXISTS idx_receipts_job_id ON receipts(job_id);
CREATE INDEX IF NOT EXISTS idx_receipts_provider ON receipts(provider);
CREATE INDEX IF NOT EXISTS idx_receipts_client ON receipts(client);
CREATE INDEX IF NOT EXISTS idx_receipts_created_at ON receipts(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_receipts_provider_created ON receipts(provider, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_receipts_client_created ON receipts(client, created_at DESC);
-- Blocks indexes
CREATE INDEX IF NOT EXISTS idx_blocks_height ON blocks(height DESC);
CREATE INDEX IF NOT EXISTS idx_blocks_timestamp ON blocks(timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_blocks_proposer ON blocks(proposer);
-- Transactions indexes
CREATE INDEX IF NOT EXISTS idx_transactions_block_height ON transactions(block_height);
CREATE INDEX IF NOT EXISTS idx_transactions_sender ON transactions(sender);
CREATE INDEX IF NOT EXISTS idx_transactions_recipient ON transactions(recipient);
CREATE INDEX IF NOT EXISTS idx_transactions_status ON transactions(status);
CREATE INDEX IF NOT EXISTS idx_transactions_created_at ON transactions(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_transactions_type ON transactions(tx_type);
-- API keys indexes
CREATE INDEX IF NOT EXISTS idx_api_keys_owner ON api_keys(owner);
CREATE INDEX IF NOT EXISTS idx_api_keys_active ON api_keys(is_active) WHERE is_active = TRUE;
-- Job history indexes
CREATE INDEX IF NOT EXISTS idx_job_history_job_id ON job_history(job_id);
CREATE INDEX IF NOT EXISTS idx_job_history_event_type ON job_history(event_type);
CREATE INDEX IF NOT EXISTS idx_job_history_created_at ON job_history(created_at DESC);
-- Composite indexes for common queries
CREATE INDEX IF NOT EXISTS idx_jobs_explorer ON jobs(status, created_at DESC)
INCLUDE (job_id, model, miner_id);
CREATE INDEX IF NOT EXISTS idx_receipts_explorer ON receipts(created_at DESC)
INCLUDE (receipt_id, job_id, provider, client, price);
-- Full-text search index for job prompts (optional)
-- CREATE INDEX IF NOT EXISTS idx_jobs_prompt_fts ON jobs USING GIN(to_tsvector('english', prompt));
-- Analyze tables after index creation
ANALYZE jobs;
ANALYZE miners;
ANALYZE receipts;
ANALYZE blocks;
ANALYZE transactions;

View File

@@ -0,0 +1,282 @@
#!/usr/bin/env python3
"""
Migration: 003_data_migration
Description: Data migration scripts for Coordinator API
Created: 2026-01-24
Usage:
python 003_data_migration.py --action=migrate_receipts
python 003_data_migration.py --action=migrate_jobs
python 003_data_migration.py --action=all
"""
import argparse
import asyncio
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any
import asyncpg
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DataMigration:
"""Data migration utilities for Coordinator API"""
def __init__(self, database_url: str):
self.database_url = database_url
self.pool = None
async def connect(self):
"""Connect to database."""
self.pool = await asyncpg.create_pool(self.database_url)
logger.info("Connected to database")
async def close(self):
"""Close database connection."""
if self.pool:
await self.pool.close()
logger.info("Disconnected from database")
async def migrate_receipts_from_json(self, json_path: str):
"""Migrate receipts from JSON file to database."""
logger.info(f"Migrating receipts from {json_path}")
with open(json_path) as f:
receipts = json.load(f)
async with self.pool.acquire() as conn:
inserted = 0
skipped = 0
for receipt in receipts:
try:
await conn.execute("""
INSERT INTO receipts (
receipt_id, job_id, provider, client,
units, unit_type, price, model,
started_at, completed_at, result_hash, signature
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
ON CONFLICT (receipt_id) DO NOTHING
""",
receipt.get("receipt_id"),
receipt.get("job_id"),
receipt.get("provider"),
receipt.get("client"),
receipt.get("units", 0),
receipt.get("unit_type", "gpu_seconds"),
receipt.get("price"),
receipt.get("model"),
receipt.get("started_at"),
receipt.get("completed_at"),
receipt.get("result_hash"),
json.dumps(receipt.get("signature")) if receipt.get("signature") else None
)
inserted += 1
except Exception as e:
logger.warning(f"Skipped receipt {receipt.get('receipt_id')}: {e}")
skipped += 1
logger.info(f"Migrated {inserted} receipts, skipped {skipped}")
async def migrate_jobs_from_sqlite(self, sqlite_path: str):
"""Migrate jobs from SQLite to PostgreSQL."""
logger.info(f"Migrating jobs from {sqlite_path}")
import sqlite3
sqlite_conn = sqlite3.connect(sqlite_path)
sqlite_conn.row_factory = sqlite3.Row
cursor = sqlite_conn.cursor()
cursor.execute("SELECT * FROM jobs")
jobs = cursor.fetchall()
async with self.pool.acquire() as conn:
inserted = 0
for job in jobs:
try:
await conn.execute("""
INSERT INTO jobs (
job_id, status, prompt, model, params,
result, client_id, miner_id,
created_at, started_at, completed_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (job_id) DO UPDATE SET
status = EXCLUDED.status,
result = EXCLUDED.result,
completed_at = EXCLUDED.completed_at
""",
job["job_id"],
job["status"],
job["prompt"],
job.get("model", "llama3.2"),
json.dumps(job.get("params", {})),
job.get("result"),
job.get("client_id"),
job.get("miner_id"),
self._parse_datetime(job.get("created_at")),
self._parse_datetime(job.get("started_at")),
self._parse_datetime(job.get("completed_at"))
)
inserted += 1
except Exception as e:
logger.warning(f"Skipped job {job.get('job_id')}: {e}")
logger.info(f"Migrated {inserted} jobs")
sqlite_conn.close()
async def migrate_miners_from_json(self, json_path: str):
"""Migrate miners from JSON file to database."""
logger.info(f"Migrating miners from {json_path}")
with open(json_path) as f:
miners = json.load(f)
async with self.pool.acquire() as conn:
inserted = 0
for miner in miners:
try:
await conn.execute("""
INSERT INTO miners (
miner_id, status, capabilities, gpu_info,
endpoint, max_concurrent_jobs, score
) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (miner_id) DO UPDATE SET
status = EXCLUDED.status,
capabilities = EXCLUDED.capabilities,
gpu_info = EXCLUDED.gpu_info
""",
miner.get("miner_id"),
miner.get("status", "offline"),
miner.get("capabilities", []),
json.dumps(miner.get("gpu_info", {})),
miner.get("endpoint"),
miner.get("max_concurrent_jobs", 1),
miner.get("score", 100.0)
)
inserted += 1
except Exception as e:
logger.warning(f"Skipped miner {miner.get('miner_id')}: {e}")
logger.info(f"Migrated {inserted} miners")
async def backfill_job_history(self):
"""Backfill job history from existing jobs."""
logger.info("Backfilling job history")
async with self.pool.acquire() as conn:
# Get all completed jobs without history
jobs = await conn.fetch("""
SELECT j.job_id, j.status, j.created_at, j.started_at, j.completed_at
FROM jobs j
LEFT JOIN job_history h ON j.job_id = h.job_id
WHERE h.id IS NULL AND j.status IN ('completed', 'failed')
""")
inserted = 0
for job in jobs:
events = []
if job["created_at"]:
events.append(("created", job["created_at"], {}))
if job["started_at"]:
events.append(("started", job["started_at"], {}))
if job["completed_at"]:
events.append((job["status"], job["completed_at"], {}))
for event_type, timestamp, data in events:
await conn.execute("""
INSERT INTO job_history (job_id, event_type, event_data, created_at)
VALUES ($1, $2, $3, $4)
""", job["job_id"], event_type, json.dumps(data), timestamp)
inserted += 1
logger.info(f"Backfilled {inserted} history events")
async def cleanup_orphaned_receipts(self):
"""Remove receipts without corresponding jobs."""
logger.info("Cleaning up orphaned receipts")
async with self.pool.acquire() as conn:
result = await conn.execute("""
DELETE FROM receipts r
WHERE NOT EXISTS (
SELECT 1 FROM jobs j WHERE j.job_id = r.job_id
)
""")
logger.info(f"Removed orphaned receipts: {result}")
async def update_miner_stats(self):
"""Recalculate miner statistics from receipts."""
logger.info("Updating miner statistics")
async with self.pool.acquire() as conn:
await conn.execute("""
UPDATE miners m SET
jobs_completed = (
SELECT COUNT(*) FROM receipts r WHERE r.provider = m.miner_id
),
score = LEAST(100, 70 + (
SELECT COUNT(*) FROM receipts r WHERE r.provider = m.miner_id
) * 0.1)
""")
logger.info("Miner statistics updated")
def _parse_datetime(self, value) -> datetime:
"""Parse datetime from various formats."""
if value is None:
return None
if isinstance(value, datetime):
return value
if isinstance(value, (int, float)):
return datetime.fromtimestamp(value)
try:
return datetime.fromisoformat(value.replace("Z", "+00:00"))
except (ValueError, AttributeError):
return None
async def main():
parser = argparse.ArgumentParser(description="Data migration for Coordinator API")
parser.add_argument("--action", required=True,
choices=["migrate_receipts", "migrate_jobs", "migrate_miners",
"backfill_history", "cleanup", "update_stats", "all"])
parser.add_argument("--database-url", default="postgresql://aitbc:aitbc@localhost:5432/coordinator")
parser.add_argument("--input-file", help="Input file for migration")
args = parser.parse_args()
migration = DataMigration(args.database_url)
await migration.connect()
try:
if args.action == "migrate_receipts":
await migration.migrate_receipts_from_json(args.input_file)
elif args.action == "migrate_jobs":
await migration.migrate_jobs_from_sqlite(args.input_file)
elif args.action == "migrate_miners":
await migration.migrate_miners_from_json(args.input_file)
elif args.action == "backfill_history":
await migration.backfill_job_history()
elif args.action == "cleanup":
await migration.cleanup_orphaned_receipts()
elif args.action == "update_stats":
await migration.update_miner_stats()
elif args.action == "all":
await migration.backfill_job_history()
await migration.cleanup_orphaned_receipts()
await migration.update_miner_stats()
finally:
await migration.close()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,86 @@
# Coordinator API Migrations
Database migration scripts for the Coordinator API.
## Files
| File | Description |
|------|-------------|
| `001_initial_schema.sql` | Initial database schema (tables) |
| `002_indexes.sql` | Performance indexes |
| `003_data_migration.py` | Data migration utilities |
## Running Migrations
### Prerequisites
- PostgreSQL 14+
- Python 3.10+ (for data migrations)
- `asyncpg` package
### Apply Schema
```bash
# Connect to database
psql -h localhost -U aitbc -d coordinator
# Run migrations in order
\i 001_initial_schema.sql
\i 002_indexes.sql
```
### Run Data Migrations
```bash
# Install dependencies
pip install asyncpg
# Backfill job history
python 003_data_migration.py --action=backfill_history
# Update miner statistics
python 003_data_migration.py --action=update_stats
# Run all maintenance tasks
python 003_data_migration.py --action=all
# Migrate from SQLite
python 003_data_migration.py --action=migrate_jobs --input-file=/path/to/jobs.db
# Migrate receipts from JSON
python 003_data_migration.py --action=migrate_receipts --input-file=/path/to/receipts.json
```
## Schema Overview
### Tables
- **jobs** - AI compute jobs
- **miners** - Registered GPU miners
- **receipts** - Cryptographic receipts
- **blocks** - Blockchain blocks
- **transactions** - On-chain transactions
- **api_keys** - API authentication
- **job_history** - Event history for analytics
### Key Indexes
- `idx_jobs_pending` - Fast pending job lookup
- `idx_miners_available` - Available miner selection
- `idx_receipts_provider_created` - Miner receipt history
- `idx_receipts_client_created` - Client receipt history
## Rollback
To rollback migrations:
```sql
-- Drop all tables (DESTRUCTIVE)
DROP TABLE IF EXISTS job_history CASCADE;
DROP TABLE IF EXISTS api_keys CASCADE;
DROP TABLE IF EXISTS transactions CASCADE;
DROP TABLE IF EXISTS blocks CASCADE;
DROP TABLE IF EXISTS receipts CASCADE;
DROP TABLE IF EXISTS miners CASCADE;
DROP TABLE IF EXISTS jobs CASCADE;
```

View File

@@ -0,0 +1,5 @@
"""Miner Registry for Pool Hub"""
from .miner_registry import MinerRegistry
__all__ = ["MinerRegistry"]

View File

@@ -0,0 +1,325 @@
"""Miner Registry Implementation"""
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta
from dataclasses import dataclass, field
import asyncio
@dataclass
class MinerInfo:
"""Miner information"""
miner_id: str
pool_id: str
capabilities: List[str]
gpu_info: Dict[str, Any]
endpoint: Optional[str]
max_concurrent_jobs: int
status: str = "available"
current_jobs: int = 0
score: float = 100.0
jobs_completed: int = 0
jobs_failed: int = 0
uptime_percent: float = 100.0
registered_at: datetime = field(default_factory=datetime.utcnow)
last_heartbeat: datetime = field(default_factory=datetime.utcnow)
gpu_utilization: float = 0.0
memory_used_gb: float = 0.0
@dataclass
class PoolInfo:
"""Pool information"""
pool_id: str
name: str
description: Optional[str]
operator: str
fee_percent: float
min_payout: float
payout_schedule: str
miner_count: int = 0
total_hashrate: float = 0.0
jobs_completed_24h: int = 0
earnings_24h: float = 0.0
created_at: datetime = field(default_factory=datetime.utcnow)
@dataclass
class JobAssignment:
"""Job assignment record"""
job_id: str
miner_id: str
pool_id: str
model: str
status: str = "assigned"
assigned_at: datetime = field(default_factory=datetime.utcnow)
deadline: Optional[datetime] = None
completed_at: Optional[datetime] = None
class MinerRegistry:
"""Registry for managing miners and pools"""
def __init__(self):
self._miners: Dict[str, MinerInfo] = {}
self._pools: Dict[str, PoolInfo] = {}
self._jobs: Dict[str, JobAssignment] = {}
self._lock = asyncio.Lock()
async def register(
self,
miner_id: str,
pool_id: str,
capabilities: List[str],
gpu_info: Dict[str, Any],
endpoint: Optional[str] = None,
max_concurrent_jobs: int = 1
) -> MinerInfo:
"""Register a new miner."""
async with self._lock:
if miner_id in self._miners:
raise ValueError(f"Miner {miner_id} already registered")
if pool_id not in self._pools:
raise ValueError(f"Pool {pool_id} not found")
miner = MinerInfo(
miner_id=miner_id,
pool_id=pool_id,
capabilities=capabilities,
gpu_info=gpu_info,
endpoint=endpoint,
max_concurrent_jobs=max_concurrent_jobs
)
self._miners[miner_id] = miner
self._pools[pool_id].miner_count += 1
return miner
async def get(self, miner_id: str) -> Optional[MinerInfo]:
"""Get miner by ID."""
return self._miners.get(miner_id)
async def list(
self,
pool_id: Optional[str] = None,
status: Optional[str] = None,
capability: Optional[str] = None,
exclude_miner: Optional[str] = None,
limit: int = 50
) -> List[MinerInfo]:
"""List miners with filters."""
miners = list(self._miners.values())
if pool_id:
miners = [m for m in miners if m.pool_id == pool_id]
if status:
miners = [m for m in miners if m.status == status]
if capability:
miners = [m for m in miners if capability in m.capabilities]
if exclude_miner:
miners = [m for m in miners if m.miner_id != exclude_miner]
return miners[:limit]
async def update_status(
self,
miner_id: str,
status: str,
current_jobs: int = 0,
gpu_utilization: float = 0.0,
memory_used_gb: float = 0.0
):
"""Update miner status."""
async with self._lock:
if miner_id in self._miners:
miner = self._miners[miner_id]
miner.status = status
miner.current_jobs = current_jobs
miner.gpu_utilization = gpu_utilization
miner.memory_used_gb = memory_used_gb
miner.last_heartbeat = datetime.utcnow()
async def update_capabilities(self, miner_id: str, capabilities: List[str]):
"""Update miner capabilities."""
async with self._lock:
if miner_id in self._miners:
self._miners[miner_id].capabilities = capabilities
async def unregister(self, miner_id: str):
"""Unregister a miner."""
async with self._lock:
if miner_id in self._miners:
pool_id = self._miners[miner_id].pool_id
del self._miners[miner_id]
if pool_id in self._pools:
self._pools[pool_id].miner_count -= 1
# Pool management
async def create_pool(
self,
pool_id: str,
name: str,
operator: str,
description: Optional[str] = None,
fee_percent: float = 1.0,
min_payout: float = 10.0,
payout_schedule: str = "daily"
) -> PoolInfo:
"""Create a new pool."""
async with self._lock:
if pool_id in self._pools:
raise ValueError(f"Pool {pool_id} already exists")
pool = PoolInfo(
pool_id=pool_id,
name=name,
description=description,
operator=operator,
fee_percent=fee_percent,
min_payout=min_payout,
payout_schedule=payout_schedule
)
self._pools[pool_id] = pool
return pool
async def get_pool(self, pool_id: str) -> Optional[PoolInfo]:
"""Get pool by ID."""
return self._pools.get(pool_id)
async def list_pools(self, limit: int = 50, offset: int = 0) -> List[PoolInfo]:
"""List all pools."""
pools = list(self._pools.values())
return pools[offset:offset + limit]
async def get_pool_stats(self, pool_id: str) -> Dict[str, Any]:
"""Get pool statistics."""
pool = self._pools.get(pool_id)
if not pool:
return {}
miners = await self.list(pool_id=pool_id)
active = [m for m in miners if m.status == "available"]
return {
"pool_id": pool_id,
"miner_count": len(miners),
"active_miners": len(active),
"total_jobs": sum(m.jobs_completed for m in miners),
"jobs_24h": pool.jobs_completed_24h,
"total_earnings": 0.0, # TODO: Calculate from receipts
"earnings_24h": pool.earnings_24h,
"avg_response_time_ms": 0.0, # TODO: Calculate
"uptime_percent": sum(m.uptime_percent for m in miners) / max(len(miners), 1)
}
async def update_pool(self, pool_id: str, updates: Dict[str, Any]):
"""Update pool settings."""
async with self._lock:
if pool_id in self._pools:
pool = self._pools[pool_id]
for key, value in updates.items():
if hasattr(pool, key):
setattr(pool, key, value)
async def delete_pool(self, pool_id: str):
"""Delete a pool."""
async with self._lock:
if pool_id in self._pools:
del self._pools[pool_id]
# Job management
async def assign_job(
self,
job_id: str,
miner_id: str,
deadline: Optional[datetime] = None
) -> JobAssignment:
"""Assign a job to a miner."""
async with self._lock:
miner = self._miners.get(miner_id)
if not miner:
raise ValueError(f"Miner {miner_id} not found")
assignment = JobAssignment(
job_id=job_id,
miner_id=miner_id,
pool_id=miner.pool_id,
model="", # Set by caller
deadline=deadline
)
self._jobs[job_id] = assignment
miner.current_jobs += 1
if miner.current_jobs >= miner.max_concurrent_jobs:
miner.status = "busy"
return assignment
async def complete_job(
self,
job_id: str,
miner_id: str,
status: str,
metrics: Dict[str, Any] = None
):
"""Mark a job as complete."""
async with self._lock:
if job_id in self._jobs:
job = self._jobs[job_id]
job.status = status
job.completed_at = datetime.utcnow()
if miner_id in self._miners:
miner = self._miners[miner_id]
miner.current_jobs = max(0, miner.current_jobs - 1)
if status == "completed":
miner.jobs_completed += 1
else:
miner.jobs_failed += 1
if miner.current_jobs < miner.max_concurrent_jobs:
miner.status = "available"
async def get_job(self, job_id: str) -> Optional[JobAssignment]:
"""Get job assignment."""
return self._jobs.get(job_id)
async def get_pending_jobs(
self,
pool_id: Optional[str] = None,
limit: int = 50
) -> List[JobAssignment]:
"""Get pending jobs."""
jobs = [j for j in self._jobs.values() if j.status == "assigned"]
if pool_id:
jobs = [j for j in jobs if j.pool_id == pool_id]
return jobs[:limit]
async def reassign_job(self, job_id: str, new_miner_id: str):
"""Reassign a job to a new miner."""
async with self._lock:
if job_id not in self._jobs:
raise ValueError(f"Job {job_id} not found")
job = self._jobs[job_id]
old_miner_id = job.miner_id
# Update old miner
if old_miner_id in self._miners:
self._miners[old_miner_id].current_jobs -= 1
# Update job
job.miner_id = new_miner_id
job.status = "assigned"
job.assigned_at = datetime.utcnow()
# Update new miner
if new_miner_id in self._miners:
miner = self._miners[new_miner_id]
miner.current_jobs += 1
job.pool_id = miner.pool_id

View File

@@ -0,0 +1,8 @@
"""Pool Hub API Routers"""
from .miners import router as miners_router
from .pools import router as pools_router
from .jobs import router as jobs_router
from .health import router as health_router
__all__ = ["miners_router", "pools_router", "jobs_router", "health_router"]

View File

@@ -0,0 +1,58 @@
"""Health check routes for Pool Hub"""
from fastapi import APIRouter
from datetime import datetime
router = APIRouter(tags=["health"])
@router.get("/health")
async def health_check():
"""Basic health check."""
return {
"status": "ok",
"service": "pool-hub",
"timestamp": datetime.utcnow().isoformat()
}
@router.get("/ready")
async def readiness_check():
"""Readiness check for Kubernetes."""
# Check dependencies
checks = {
"database": await check_database(),
"redis": await check_redis()
}
all_ready = all(checks.values())
return {
"ready": all_ready,
"checks": checks,
"timestamp": datetime.utcnow().isoformat()
}
@router.get("/live")
async def liveness_check():
"""Liveness check for Kubernetes."""
return {"live": True}
async def check_database() -> bool:
"""Check database connectivity."""
try:
# TODO: Implement actual database check
return True
except Exception:
return False
async def check_redis() -> bool:
"""Check Redis connectivity."""
try:
# TODO: Implement actual Redis check
return True
except Exception:
return False

View File

@@ -0,0 +1,184 @@
"""Job distribution routes for Pool Hub"""
from fastapi import APIRouter, HTTPException, Depends, Query
from typing import List, Optional
from datetime import datetime
from pydantic import BaseModel
from ..registry import MinerRegistry
from ..scoring import ScoringEngine
router = APIRouter(prefix="/jobs", tags=["jobs"])
class JobRequest(BaseModel):
"""Job request from coordinator"""
job_id: str
prompt: str
model: str
params: dict = {}
priority: int = 0
deadline: Optional[datetime] = None
reward: float = 0.0
class JobAssignment(BaseModel):
"""Job assignment response"""
job_id: str
miner_id: str
pool_id: str
assigned_at: datetime
deadline: Optional[datetime]
class JobResult(BaseModel):
"""Job result from miner"""
job_id: str
miner_id: str
status: str # completed, failed
result: Optional[str] = None
error: Optional[str] = None
metrics: dict = {}
def get_registry() -> MinerRegistry:
return MinerRegistry()
def get_scoring() -> ScoringEngine:
return ScoringEngine()
@router.post("/assign", response_model=JobAssignment)
async def assign_job(
job: JobRequest,
registry: MinerRegistry = Depends(get_registry),
scoring: ScoringEngine = Depends(get_scoring)
):
"""Assign a job to the best available miner."""
# Find available miners with required capability
available = await registry.list(
status="available",
capability=job.model,
limit=100
)
if not available:
raise HTTPException(
status_code=503,
detail="No miners available for this model"
)
# Score and rank miners
scored = await scoring.rank_miners(available, job)
# Select best miner
best_miner = scored[0]
# Assign job
assignment = await registry.assign_job(
job_id=job.job_id,
miner_id=best_miner.miner_id,
deadline=job.deadline
)
return JobAssignment(
job_id=job.job_id,
miner_id=best_miner.miner_id,
pool_id=best_miner.pool_id,
assigned_at=datetime.utcnow(),
deadline=job.deadline
)
@router.post("/result")
async def submit_result(
result: JobResult,
registry: MinerRegistry = Depends(get_registry),
scoring: ScoringEngine = Depends(get_scoring)
):
"""Submit job result and update miner stats."""
miner = await registry.get(result.miner_id)
if not miner:
raise HTTPException(status_code=404, detail="Miner not found")
# Update job status
await registry.complete_job(
job_id=result.job_id,
miner_id=result.miner_id,
status=result.status,
metrics=result.metrics
)
# Update miner score based on result
if result.status == "completed":
await scoring.record_success(result.miner_id, result.metrics)
else:
await scoring.record_failure(result.miner_id, result.error)
return {"status": "recorded"}
@router.get("/pending")
async def get_pending_jobs(
pool_id: Optional[str] = Query(None),
limit: int = Query(50, le=100),
registry: MinerRegistry = Depends(get_registry)
):
"""Get pending jobs waiting for assignment."""
return await registry.get_pending_jobs(pool_id=pool_id, limit=limit)
@router.get("/{job_id}")
async def get_job_status(
job_id: str,
registry: MinerRegistry = Depends(get_registry)
):
"""Get job assignment status."""
job = await registry.get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
return job
@router.post("/{job_id}/reassign")
async def reassign_job(
job_id: str,
registry: MinerRegistry = Depends(get_registry),
scoring: ScoringEngine = Depends(get_scoring)
):
"""Reassign a failed or timed-out job to another miner."""
job = await registry.get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
if job.status not in ["failed", "timeout"]:
raise HTTPException(
status_code=400,
detail="Can only reassign failed or timed-out jobs"
)
# Find new miner (exclude previous)
available = await registry.list(
status="available",
capability=job.model,
exclude_miner=job.miner_id,
limit=100
)
if not available:
raise HTTPException(
status_code=503,
detail="No alternative miners available"
)
scored = await scoring.rank_miners(available, job)
new_miner = scored[0]
await registry.reassign_job(job_id, new_miner.miner_id)
return {
"job_id": job_id,
"new_miner_id": new_miner.miner_id,
"status": "reassigned"
}

View File

@@ -0,0 +1,173 @@
"""Miner management routes for Pool Hub"""
from fastapi import APIRouter, HTTPException, Depends, Query
from typing import List, Optional
from datetime import datetime
from pydantic import BaseModel
from ..registry import MinerRegistry
from ..scoring import ScoringEngine
router = APIRouter(prefix="/miners", tags=["miners"])
class MinerRegistration(BaseModel):
"""Miner registration request"""
miner_id: str
pool_id: str
capabilities: List[str]
gpu_info: dict
endpoint: Optional[str] = None
max_concurrent_jobs: int = 1
class MinerStatus(BaseModel):
"""Miner status update"""
miner_id: str
status: str # available, busy, maintenance, offline
current_jobs: int = 0
gpu_utilization: float = 0.0
memory_used_gb: float = 0.0
class MinerInfo(BaseModel):
"""Miner information response"""
miner_id: str
pool_id: str
capabilities: List[str]
status: str
score: float
jobs_completed: int
uptime_percent: float
registered_at: datetime
last_heartbeat: datetime
# Dependency injection
def get_registry() -> MinerRegistry:
return MinerRegistry()
def get_scoring() -> ScoringEngine:
return ScoringEngine()
@router.post("/register", response_model=MinerInfo)
async def register_miner(
registration: MinerRegistration,
registry: MinerRegistry = Depends(get_registry)
):
"""Register a new miner with the pool hub."""
try:
miner = await registry.register(
miner_id=registration.miner_id,
pool_id=registration.pool_id,
capabilities=registration.capabilities,
gpu_info=registration.gpu_info,
endpoint=registration.endpoint,
max_concurrent_jobs=registration.max_concurrent_jobs
)
return miner
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("/{miner_id}/heartbeat")
async def miner_heartbeat(
miner_id: str,
status: MinerStatus,
registry: MinerRegistry = Depends(get_registry)
):
"""Update miner heartbeat and status."""
miner = await registry.get(miner_id)
if not miner:
raise HTTPException(status_code=404, detail="Miner not found")
await registry.update_status(
miner_id=miner_id,
status=status.status,
current_jobs=status.current_jobs,
gpu_utilization=status.gpu_utilization,
memory_used_gb=status.memory_used_gb
)
return {"status": "ok"}
@router.get("/{miner_id}", response_model=MinerInfo)
async def get_miner(
miner_id: str,
registry: MinerRegistry = Depends(get_registry)
):
"""Get miner information."""
miner = await registry.get(miner_id)
if not miner:
raise HTTPException(status_code=404, detail="Miner not found")
return miner
@router.get("/", response_model=List[MinerInfo])
async def list_miners(
pool_id: Optional[str] = Query(None),
status: Optional[str] = Query(None),
capability: Optional[str] = Query(None),
limit: int = Query(50, le=100),
registry: MinerRegistry = Depends(get_registry)
):
"""List miners with optional filters."""
return await registry.list(
pool_id=pool_id,
status=status,
capability=capability,
limit=limit
)
@router.delete("/{miner_id}")
async def unregister_miner(
miner_id: str,
registry: MinerRegistry = Depends(get_registry)
):
"""Unregister a miner from the pool hub."""
miner = await registry.get(miner_id)
if not miner:
raise HTTPException(status_code=404, detail="Miner not found")
await registry.unregister(miner_id)
return {"status": "unregistered"}
@router.get("/{miner_id}/score")
async def get_miner_score(
miner_id: str,
registry: MinerRegistry = Depends(get_registry),
scoring: ScoringEngine = Depends(get_scoring)
):
"""Get miner's current score and ranking."""
miner = await registry.get(miner_id)
if not miner:
raise HTTPException(status_code=404, detail="Miner not found")
score = await scoring.calculate_score(miner)
rank = await scoring.get_rank(miner_id)
return {
"miner_id": miner_id,
"score": score,
"rank": rank,
"components": await scoring.get_score_breakdown(miner)
}
@router.post("/{miner_id}/capabilities")
async def update_capabilities(
miner_id: str,
capabilities: List[str],
registry: MinerRegistry = Depends(get_registry)
):
"""Update miner capabilities."""
miner = await registry.get(miner_id)
if not miner:
raise HTTPException(status_code=404, detail="Miner not found")
await registry.update_capabilities(miner_id, capabilities)
return {"status": "updated", "capabilities": capabilities}

View File

@@ -0,0 +1,164 @@
"""Pool management routes for Pool Hub"""
from fastapi import APIRouter, HTTPException, Depends, Query
from typing import List, Optional
from datetime import datetime
from pydantic import BaseModel
from ..registry import MinerRegistry
router = APIRouter(prefix="/pools", tags=["pools"])
class PoolCreate(BaseModel):
"""Pool creation request"""
pool_id: str
name: str
description: Optional[str] = None
operator: str
fee_percent: float = 1.0
min_payout: float = 10.0
payout_schedule: str = "daily" # daily, weekly, threshold
class PoolInfo(BaseModel):
"""Pool information response"""
pool_id: str
name: str
description: Optional[str]
operator: str
fee_percent: float
min_payout: float
payout_schedule: str
miner_count: int
total_hashrate: float
jobs_completed_24h: int
earnings_24h: float
created_at: datetime
class PoolStats(BaseModel):
"""Pool statistics"""
pool_id: str
miner_count: int
active_miners: int
total_jobs: int
jobs_24h: int
total_earnings: float
earnings_24h: float
avg_response_time_ms: float
uptime_percent: float
def get_registry() -> MinerRegistry:
return MinerRegistry()
@router.post("/", response_model=PoolInfo)
async def create_pool(
pool: PoolCreate,
registry: MinerRegistry = Depends(get_registry)
):
"""Create a new mining pool."""
try:
created = await registry.create_pool(
pool_id=pool.pool_id,
name=pool.name,
description=pool.description,
operator=pool.operator,
fee_percent=pool.fee_percent,
min_payout=pool.min_payout,
payout_schedule=pool.payout_schedule
)
return created
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/{pool_id}", response_model=PoolInfo)
async def get_pool(
pool_id: str,
registry: MinerRegistry = Depends(get_registry)
):
"""Get pool information."""
pool = await registry.get_pool(pool_id)
if not pool:
raise HTTPException(status_code=404, detail="Pool not found")
return pool
@router.get("/", response_model=List[PoolInfo])
async def list_pools(
limit: int = Query(50, le=100),
offset: int = Query(0),
registry: MinerRegistry = Depends(get_registry)
):
"""List all pools."""
return await registry.list_pools(limit=limit, offset=offset)
@router.get("/{pool_id}/stats", response_model=PoolStats)
async def get_pool_stats(
pool_id: str,
registry: MinerRegistry = Depends(get_registry)
):
"""Get pool statistics."""
pool = await registry.get_pool(pool_id)
if not pool:
raise HTTPException(status_code=404, detail="Pool not found")
return await registry.get_pool_stats(pool_id)
@router.get("/{pool_id}/miners")
async def get_pool_miners(
pool_id: str,
status: Optional[str] = Query(None),
limit: int = Query(50, le=100),
registry: MinerRegistry = Depends(get_registry)
):
"""Get miners in a pool."""
pool = await registry.get_pool(pool_id)
if not pool:
raise HTTPException(status_code=404, detail="Pool not found")
return await registry.list(pool_id=pool_id, status=status, limit=limit)
@router.put("/{pool_id}")
async def update_pool(
pool_id: str,
updates: dict,
registry: MinerRegistry = Depends(get_registry)
):
"""Update pool settings."""
pool = await registry.get_pool(pool_id)
if not pool:
raise HTTPException(status_code=404, detail="Pool not found")
allowed_fields = ["name", "description", "fee_percent", "min_payout", "payout_schedule"]
filtered = {k: v for k, v in updates.items() if k in allowed_fields}
await registry.update_pool(pool_id, filtered)
return {"status": "updated"}
@router.delete("/{pool_id}")
async def delete_pool(
pool_id: str,
registry: MinerRegistry = Depends(get_registry)
):
"""Delete a pool (must have no miners)."""
pool = await registry.get_pool(pool_id)
if not pool:
raise HTTPException(status_code=404, detail="Pool not found")
miners = await registry.list(pool_id=pool_id, limit=1)
if miners:
raise HTTPException(
status_code=409,
detail="Cannot delete pool with active miners"
)
await registry.delete_pool(pool_id)
return {"status": "deleted"}

View File

@@ -0,0 +1,5 @@
"""Scoring Engine for Pool Hub"""
from .scoring_engine import ScoringEngine
__all__ = ["ScoringEngine"]

View File

@@ -0,0 +1,239 @@
"""Scoring Engine Implementation for Pool Hub"""
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import math
@dataclass
class ScoreComponents:
"""Breakdown of miner score components"""
reliability: float # Based on uptime and success rate
performance: float # Based on response time and throughput
capacity: float # Based on GPU specs and availability
reputation: float # Based on historical performance
total: float
class ScoringEngine:
"""Engine for scoring and ranking miners"""
# Scoring weights
WEIGHT_RELIABILITY = 0.35
WEIGHT_PERFORMANCE = 0.30
WEIGHT_CAPACITY = 0.20
WEIGHT_REPUTATION = 0.15
# Thresholds
MIN_JOBS_FOR_RANKING = 10
DECAY_HALF_LIFE_DAYS = 7
def __init__(self):
self._score_cache: Dict[str, float] = {}
self._rank_cache: Dict[str, int] = {}
self._history: Dict[str, List[Dict]] = {}
async def calculate_score(self, miner) -> float:
"""Calculate overall score for a miner."""
components = await self.get_score_breakdown(miner)
return components.total
async def get_score_breakdown(self, miner) -> ScoreComponents:
"""Get detailed score breakdown for a miner."""
reliability = self._calculate_reliability(miner)
performance = self._calculate_performance(miner)
capacity = self._calculate_capacity(miner)
reputation = self._calculate_reputation(miner)
total = (
reliability * self.WEIGHT_RELIABILITY +
performance * self.WEIGHT_PERFORMANCE +
capacity * self.WEIGHT_CAPACITY +
reputation * self.WEIGHT_REPUTATION
)
return ScoreComponents(
reliability=reliability,
performance=performance,
capacity=capacity,
reputation=reputation,
total=total
)
def _calculate_reliability(self, miner) -> float:
"""Calculate reliability score (0-100)."""
# Uptime component (50%)
uptime_score = miner.uptime_percent
# Success rate component (50%)
total_jobs = miner.jobs_completed + miner.jobs_failed
if total_jobs > 0:
success_rate = (miner.jobs_completed / total_jobs) * 100
else:
success_rate = 100.0 # New miners start with perfect score
# Heartbeat freshness penalty
heartbeat_age = (datetime.utcnow() - miner.last_heartbeat).total_seconds()
if heartbeat_age > 300: # 5 minutes
freshness_penalty = min(20, heartbeat_age / 60)
else:
freshness_penalty = 0
score = (uptime_score * 0.5 + success_rate * 0.5) - freshness_penalty
return max(0, min(100, score))
def _calculate_performance(self, miner) -> float:
"""Calculate performance score (0-100)."""
# Base score from GPU utilization efficiency
if miner.gpu_utilization > 0:
# Optimal utilization is 60-80%
if 60 <= miner.gpu_utilization <= 80:
utilization_score = 100
elif miner.gpu_utilization < 60:
utilization_score = 70 + (miner.gpu_utilization / 60) * 30
else:
utilization_score = 100 - (miner.gpu_utilization - 80) * 2
else:
utilization_score = 50 # Unknown utilization
# Jobs per hour (if we had timing data)
throughput_score = min(100, miner.jobs_completed / max(1, self._get_hours_active(miner)) * 10)
return (utilization_score * 0.6 + throughput_score * 0.4)
def _calculate_capacity(self, miner) -> float:
"""Calculate capacity score (0-100)."""
gpu_info = miner.gpu_info or {}
# GPU memory score
memory_gb = self._parse_memory(gpu_info.get("memory", "0"))
memory_score = min(100, memory_gb * 4) # 24GB = 96 points
# Concurrent job capacity
capacity_score = min(100, miner.max_concurrent_jobs * 25)
# Current availability
if miner.current_jobs < miner.max_concurrent_jobs:
availability = ((miner.max_concurrent_jobs - miner.current_jobs) /
miner.max_concurrent_jobs) * 100
else:
availability = 0
return (memory_score * 0.4 + capacity_score * 0.3 + availability * 0.3)
def _calculate_reputation(self, miner) -> float:
"""Calculate reputation score (0-100)."""
# New miners start at 70
if miner.jobs_completed < self.MIN_JOBS_FOR_RANKING:
return 70.0
# Historical success with time decay
history = self._history.get(miner.miner_id, [])
if not history:
return miner.score # Use stored score
weighted_sum = 0
weight_total = 0
for record in history:
age_days = (datetime.utcnow() - record["timestamp"]).days
weight = math.exp(-age_days / self.DECAY_HALF_LIFE_DAYS)
if record["success"]:
weighted_sum += 100 * weight
else:
weighted_sum += 0 * weight
weight_total += weight
if weight_total > 0:
return weighted_sum / weight_total
return 70.0
def _get_hours_active(self, miner) -> float:
"""Get hours since miner registered."""
delta = datetime.utcnow() - miner.registered_at
return max(1, delta.total_seconds() / 3600)
def _parse_memory(self, memory_str: str) -> float:
"""Parse memory string to GB."""
try:
if isinstance(memory_str, (int, float)):
return float(memory_str)
memory_str = str(memory_str).upper()
if "GB" in memory_str:
return float(memory_str.replace("GB", "").strip())
if "MB" in memory_str:
return float(memory_str.replace("MB", "").strip()) / 1024
return float(memory_str)
except (ValueError, TypeError):
return 0.0
async def rank_miners(self, miners: List, job: Any = None) -> List:
"""Rank miners by score, optionally considering job requirements."""
scored = []
for miner in miners:
score = await self.calculate_score(miner)
# Bonus for matching capabilities
if job and hasattr(job, 'model'):
if job.model in miner.capabilities:
score += 5
# Penalty for high current load
if miner.current_jobs > 0:
load_ratio = miner.current_jobs / miner.max_concurrent_jobs
score -= load_ratio * 10
scored.append((miner, score))
# Sort by score descending
scored.sort(key=lambda x: x[1], reverse=True)
return [m for m, s in scored]
async def get_rank(self, miner_id: str) -> int:
"""Get miner's current rank."""
return self._rank_cache.get(miner_id, 0)
async def record_success(self, miner_id: str, metrics: Dict[str, Any] = None):
"""Record a successful job completion."""
if miner_id not in self._history:
self._history[miner_id] = []
self._history[miner_id].append({
"timestamp": datetime.utcnow(),
"success": True,
"metrics": metrics or {}
})
# Keep last 1000 records
if len(self._history[miner_id]) > 1000:
self._history[miner_id] = self._history[miner_id][-1000:]
async def record_failure(self, miner_id: str, error: Optional[str] = None):
"""Record a job failure."""
if miner_id not in self._history:
self._history[miner_id] = []
self._history[miner_id].append({
"timestamp": datetime.utcnow(),
"success": False,
"error": error
})
async def update_rankings(self, miners: List):
"""Update global rankings for all miners."""
scored = []
for miner in miners:
score = await self.calculate_score(miner)
scored.append((miner.miner_id, score))
scored.sort(key=lambda x: x[1], reverse=True)
for rank, (miner_id, score) in enumerate(scored, 1):
self._rank_cache[miner_id] = rank
self._score_cache[miner_id] = score