diff --git a/apps/blockchain-node/.env.example b/apps/blockchain-node/.env.example index b19c0a18..d4277c6f 100644 --- a/apps/blockchain-node/.env.example +++ b/apps/blockchain-node/.env.example @@ -1,9 +1,18 @@ -CHAIN_ID=ait-devnet -DB_PATH=./data/chain.db -RPC_BIND_HOST=127.0.0.1 -RPC_BIND_PORT=8080 -P2P_BIND_HOST=0.0.0.0 -P2P_BIND_PORT=7070 -PROPOSER_KEY=change_me -MINT_PER_UNIT=1000 -COORDINATOR_RATIO=0.05 +# Blockchain Node Configuration +chain_id=ait-devnet +supported_chains=ait-devnet + +rpc_bind_host=0.0.0.0 +rpc_bind_port=8006 + +p2p_bind_host=0.0.0.0 +p2p_bind_port=7070 + +proposer_id=aitbc1-proposer + +# Gossip backend: use broadcast with Redis for cross-node communication +gossip_backend=broadcast +gossip_broadcast_url=redis://localhost:6379 + +# Data +db_path=./data/chain.db diff --git a/apps/blockchain-node/data/devnet/genesis.json b/apps/blockchain-node/data/devnet/genesis.json deleted file mode 100644 index 7bccd0d1..00000000 --- a/apps/blockchain-node/data/devnet/genesis.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "accounts": [ - { - "address": "ait1faucet000000000000000000000000000000000", - "balance": 1000000000, - "nonce": 0 - } - ], - "authorities": [ - { - "address": "ait1devproposer000000000000000000000000000000", - "weight": 1 - } - ], - "chain_id": "ait-devnet", - "params": { - "base_fee": 10, - "coordinator_ratio": 0.05, - "fee_per_byte": 1, - "mint_per_unit": 1000 - }, - "timestamp": 1772895053 -} diff --git a/apps/blockchain-node/pyproject.toml b/apps/blockchain-node/pyproject.toml index c915edc2..b29020c7 100644 --- a/apps/blockchain-node/pyproject.toml +++ b/apps/blockchain-node/pyproject.toml @@ -26,6 +26,8 @@ rich = "^13.7.1" cryptography = "^46.0.5" asyncpg = ">=0.29.0" requests = "^2.32.5" +# Pin starlette to a version with Broadcast (removed in 0.38) +starlette = ">=0.37.2,<0.38.0" [tool.poetry.extras] uvloop = ["uvloop"] diff --git a/apps/blockchain-node/src/aitbc_chain/gossip/broker.py b/apps/blockchain-node/src/aitbc_chain/gossip/broker.py index a9973809..e615e307 100755 --- a/apps/blockchain-node/src/aitbc_chain/gossip/broker.py +++ b/apps/blockchain-node/src/aitbc_chain/gossip/broker.py @@ -2,11 +2,14 @@ from __future__ import annotations import asyncio import json +import warnings from collections import defaultdict from contextlib import suppress from dataclasses import dataclass from typing import Any, Callable, Dict, List, Optional, Set +warnings.filterwarnings("ignore", message="coroutine.* was never awaited", category=RuntimeWarning) + try: from starlette.broadcast import Broadcast except ImportError: # pragma: no cover - Starlette removed Broadcast in recent versions diff --git a/apps/coordinator-api/src/app/routers/marketplace_gpu.py b/apps/coordinator-api/src/app/routers/marketplace_gpu.py index 39fcea3c..1f50359f 100755 --- a/apps/coordinator-api/src/app/routers/marketplace_gpu.py +++ b/apps/coordinator-api/src/app/routers/marketplace_gpu.py @@ -426,6 +426,26 @@ async def send_payment( } +@router.delete("/marketplace/gpu/{gpu_id}") +async def delete_gpu( + gpu_id: str, + session: Annotated[Session, Depends(get_session)], + force: bool = Query(default=False, description="Force delete even if GPU is booked") +) -> Dict[str, Any]: + """Delete (unregister) a GPU from the marketplace.""" + gpu = _get_gpu_or_404(session, gpu_id) + + if gpu.status == "booked" and not force: + raise HTTPException( + status_code=http_status.HTTP_409_CONFLICT, + detail=f"GPU {gpu_id} is currently booked. Use force=true to delete anyway." + ) + + session.delete(gpu) + session.commit() + return {"status": "deleted", "gpu_id": gpu_id} + + @router.get("/marketplace/gpu/{gpu_id}/reviews") async def get_gpu_reviews( gpu_id: str, diff --git a/apps/coordinator-api/src/app/storage/db.py b/apps/coordinator-api/src/app/storage/db.py index 9bba6d63..36594126 100755 --- a/apps/coordinator-api/src/app/storage/db.py +++ b/apps/coordinator-api/src/app/storage/db.py @@ -7,6 +7,7 @@ Provides SQLite and PostgreSQL support with connection pooling. from __future__ import annotations import os +import logging from contextlib import contextmanager from contextlib import asynccontextmanager from typing import Generator, AsyncGenerator @@ -15,9 +16,12 @@ from sqlalchemy import create_engine from sqlalchemy.pool import QueuePool from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.exc import OperationalError from sqlmodel import SQLModel +logger = logging.getLogger(__name__) + from ..config import settings _engine = None @@ -63,7 +67,15 @@ from app.domain import * def init_db() -> Engine: """Initialize database tables and ensure data directory exists.""" engine = get_engine() - + + # DEVELOPMENT ONLY: Try to drop all tables first to ensure clean schema. + # If drop fails due to FK constraints or missing tables, ignore and continue. + if "sqlite" in str(engine.url): + try: + SQLModel.metadata.drop_all(engine) + except Exception as e: + logger.warning(f"Drop all failed (non-fatal): {e}") + # Ensure data directory exists for SQLite (consistent with blockchain-node pattern) if "sqlite" in str(engine.url): db_path = engine.url.database @@ -74,8 +86,18 @@ def init_db() -> Engine: db_path = db_path[2:] # Remove ./ data_dir = Path(db_path).parent data_dir.mkdir(parents=True, exist_ok=True) - - SQLModel.metadata.create_all(engine) + + # DEVELOPMENT: Try create_all; if OperationalError about existing index, ignore + try: + SQLModel.metadata.create_all(engine) + except OperationalError as e: + if "already exists" in str(e): + logger.warning(f"Index already exists during create_all (non-fatal): {e}") + else: + raise + except Exception as e: + logger.error(f"Unexpected error during create_all: {e}") + raise return engine diff --git a/cli/aitbc_cli/commands/ai.py b/cli/aitbc_cli/commands/ai.py new file mode 100644 index 00000000..fac59bac --- /dev/null +++ b/cli/aitbc_cli/commands/ai.py @@ -0,0 +1,159 @@ +import os +import subprocess +import sys +import uuid +import click +import uvicorn +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +import httpx + +@click.group(name='ai') +def ai_group(): + """AI marketplace commands.""" + pass + +@ai_group.command() +@click.option('--port', default=8008, show_default=True, help='Port to listen on') +@click.option('--model', default='qwen3:8b', show_default=True, help='Ollama model name') +@click.option('--wallet', 'provider_wallet', required=True, help='Provider wallet address (for verification)') +@click.option('--marketplace-url', default='http://127.0.0.1:8014', help='Marketplace API base URL') +def serve(port, model, provider_wallet, marketplace_url): + """Start AI provider daemon (FastAPI server).""" + click.echo(f"Starting AI provider on port {port}, model {model}, marketplace {marketplace_url}") + + app = FastAPI(title="AI Provider") + + class JobRequest(BaseModel): + prompt: str + buyer: str # buyer wallet address + amount: int + txid: str | None = None # optional transaction id + + class JobResponse(BaseModel): + result: str + model: str + job_id: str | None = None + + @app.get("/health") + async def health(): + return {"status": "ok", "model": model, "wallet": provider_wallet} + + @app.post("/job") + async def handle_job(req: JobRequest): + click.echo(f"Received job from {req.buyer}: {req.prompt[:50]}...") + # Generate a job_id + job_id = str(uuid.uuid4()) + # Register job with marketplace (optional, best-effort) + try: + async with httpx.AsyncClient() as client: + create_resp = await client.post( + f"{marketplace_url}/v1/jobs", + json={ + "payload": {"prompt": req.prompt, "model": model}, + "constraints": {}, + "payment_amount": req.amount, + "payment_currency": "AITBC" + }, + headers={"X-Api-Key": ""}, # optional API key + timeout=5.0 + ) + if create_resp.status_code in (200, 201): + job_data = create_resp.json() + job_id = job_data.get("job_id", job_id) + click.echo(f"Registered job {job_id} with marketplace") + else: + click.echo(f"Marketplace job registration failed: {create_resp.status_code}", err=True) + except Exception as e: + click.echo(f"Warning: marketplace registration skipped: {e}", err=True) + # Process with Ollama + try: + async with httpx.AsyncClient() as client: + resp = await client.post( + "http://127.0.0.1:11434/api/generate", + json={"model": model, "prompt": req.prompt, "stream": False}, + timeout=60.0 + ) + resp.raise_for_status() + data = resp.json() + result = data.get("response", "") + except httpx.HTTPError as e: + raise HTTPException(status_code=500, detail=f"Ollama error: {e}") + # Update marketplace with result (if registered) + try: + async with httpx.AsyncClient() as client: + patch_resp = await client.patch( + f"{marketplace_url}/v1/jobs/{job_id}", + json={"result": result, "state": "completed"}, + timeout=5.0 + ) + if patch_resp.status_code == 200: + click.echo(f"Updated job {job_id} with result") + except Exception as e: + click.echo(f"Warning: failed to update job in marketplace: {e}", err=True) + return JobResponse(result=result, model=model, job_id=job_id) + + uvicorn.run(app, host="0.0.0.0", port=port) + +@ai_group.command() +@click.option('--to', required=True, help='Provider host (IP)') +@click.option('--port', default=8008, help='Provider port') +@click.option('--prompt', required=True, help='Prompt to send') +@click.option('--buyer-wallet', 'buyer_wallet', required=True, help='Buyer wallet name (in local wallet store)') +@click.option('--provider-wallet', 'provider_wallet', required=True, help='Provider wallet address (recipient)') +@click.option('--amount', default=1, help='Amount to pay in AITBC') +def request(to, port, prompt, buyer_wallet, provider_wallet, amount): + """Send a prompt to an AI provider (buyer side) with on‑chain payment.""" + # Helper to get provider balance + def get_balance(): + res = subprocess.run([ + sys.executable, "-m", "aitbc_cli.main", "blockchain", "balance", + "--address", provider_wallet + ], capture_output=True, text=True, check=True) + for line in res.stdout.splitlines(): + if "Balance:" in line: + parts = line.split(":") + return float(parts[1].strip()) + raise ValueError("Balance not found") + + # Step 1: get initial balance + before = get_balance() + click.echo(f"Provider balance before: {before}") + + # Step 2: send payment via blockchain CLI (use current Python env) + if amount > 0: + click.echo(f"Sending {amount} AITBC from wallet '{buyer_wallet}' to {provider_wallet}...") + try: + subprocess.run([ + sys.executable, "-m", "aitbc_cli.main", "blockchain", "send", + "--from", buyer_wallet, + "--to", provider_wallet, + "--amount", str(amount) + ], check=True, capture_output=True, text=True) + click.echo("Payment sent.") + except subprocess.CalledProcessError as e: + raise click.ClickException(f"Blockchain send failed: {e.stderr}") + + # Step 3: get new balance + after = get_balance() + click.echo(f"Provider balance after: {after}") + delta = after - before + click.echo(f"Balance delta: {delta}") + + # Step 4: call provider + url = f"http://{to}:{port}/job" + payload = { + "prompt": prompt, + "buyer": provider_wallet, + "amount": amount + } + try: + resp = httpx.post(url, json=payload, timeout=30.0) + resp.raise_for_status() + data = resp.json() + click.echo("Result: " + data.get("result", "")) + except httpx.HTTPError as e: + raise click.ClickException(f"Request to provider failed: {e}") + +if __name__ == '__main__': + ai_group() diff --git a/cli/aitbc_cli/commands/marketplace.py b/cli/aitbc_cli/commands/marketplace.py index 93de3831..1f551936 100755 --- a/cli/aitbc_cli/commands/marketplace.py +++ b/cli/aitbc_cli/commands/marketplace.py @@ -300,6 +300,34 @@ def pay(ctx, booking_id: str, amount: float, from_wallet: str, to_wallet: str, t except Exception as e: error(f"Payment failed: {e}") +@gpu.command() +@click.argument("gpu_id") +@click.option("--force", is_flag=True, help="Force delete even if GPU is booked") +@click.pass_context +def unregister(ctx, gpu_id: str, force: bool): + """Unregister (delete) a GPU from marketplace""" + config = ctx.obj['config'] + + try: + with httpx.Client() as client: + response = client.delete( + f"{config.coordinator_url}/v1/marketplace/gpu/{gpu_id}", + params={"force": force}, + headers={"X-Api-Key": config.api_key or ""} + ) + + if response.status_code == 200: + result = response.json() + success(f"GPU {gpu_id} unregistered") + output(result, ctx.obj['output_format']) + else: + error(f"Failed to unregister GPU: {response.status_code}") + if response.text: + error(response.text) + except Exception as e: + error(f"Network error: {e}") + + @gpu.command() @click.argument("gpu_id") @click.pass_context diff --git a/cli/aitbc_cli/main.py b/cli/aitbc_cli/main.py index 247ff290..663f61cf 100755 --- a/cli/aitbc_cli/main.py +++ b/cli/aitbc_cli/main.py @@ -59,6 +59,7 @@ from .commands.ai_trading import ai_trading from .commands.advanced_analytics import advanced_analytics_group from .commands.ai_surveillance import ai_surveillance_group from .commands.enterprise_integration import enterprise_integration_group +from .commands.ai import ai_group from .commands.explorer import explorer from .plugins import plugin, load_plugins diff --git a/genesis_brother_chain_1773403269.yaml b/genesis_brother_chain_1773403269.yaml index 5bf5cca4..90cb20fe 100644 --- a/genesis_brother_chain_1773403269.yaml +++ b/genesis_brother_chain_1773403269.yaml @@ -20,7 +20,7 @@ genesis: - address: aitbc1genesis balance: '2100000000' type: genesis - - address: aitbc1aitbc1_simple + - address: aitbc1aitbc1_simple_simple balance: '500' type: gift metadata: diff --git a/pyproject.toml b/pyproject.toml index 88c0c85b..68b618c9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -162,11 +162,12 @@ requires = ["setuptools>=61.0", "wheel"] build-backend = "setuptools.build_meta" [tool.setuptools.packages.find] -where = ["cli"] -include = ["aitbc_cli*"] +where = ["cli", "apps/coordinator-api"] +include = ["aitbc_cli*", "aitbc*"] [tool.setuptools.package-dir] "aitbc_cli" = "cli/aitbc_cli" +"aitbc" = "apps/coordinator-api/aitbc" [dependency-groups] dev = [