diff --git a/apps/blockchain-node/src/aitbc_chain/rpc/ai_services.py b/apps/blockchain-node/src/aitbc_chain/rpc/ai_services.py new file mode 100644 index 00000000..d03514d5 --- /dev/null +++ b/apps/blockchain-node/src/aitbc_chain/rpc/ai_services.py @@ -0,0 +1,226 @@ +"""AI Services RPC endpoints for AITBC blockchain""" + +from typing import Any, Dict, List, Optional +from fastapi import HTTPException +from pydantic import BaseModel, Field +from datetime import datetime, timedelta +import uuid + +from ..database import session_scope +from ..metrics import metrics_registry +from .router import router + + +class AIJobRequest(BaseModel): + """AI job submission request""" + wallet_address: str = Field(..., description="Client wallet address") + job_type: str = Field(..., description="Type of AI job (text, image, training, etc.)") + prompt: str = Field(..., description="AI prompt or task description") + payment: float = Field(..., ge=0, description="Payment in AIT") + parameters: Optional[Dict[str, Any]] = Field(default=None, description="Additional job parameters") + +class AIJobResponse(BaseModel): + """AI job response""" + job_id: str + status: str + wallet_address: str + job_type: str + payment: float + created_at: datetime + estimated_completion: Optional[datetime] = None + result: Optional[Dict[str, Any]] = None + +# In-memory storage for demo (in production, use database) +_ai_jobs: List[Dict[str, Any]] = [ + { + "job_id": "job_demo_001", + "wallet_address": "ait1demo_client_123...", + "job_type": "text", + "prompt": "Generate a summary of blockchain technology", + "payment": 100.0, + "status": "completed", + "created_at": (datetime.now() - timedelta(hours=1)).isoformat(), + "completed_at": (datetime.now() - timedelta(minutes=30)).isoformat(), + "result": { + "output": "Blockchain is a distributed ledger technology...", + "tokens_used": 150, + "processing_time": "2.5 minutes" + } + }, + { + "job_id": "job_demo_002", + "wallet_address": "ait1demo_client_456...", + "job_type": "image", + "prompt": "Create an image of a futuristic blockchain city", + "payment": 250.0, + "status": "processing", + "created_at": (datetime.now() - timedelta(minutes=15)).isoformat(), + "estimated_completion": (datetime.now() + timedelta(minutes=10)).isoformat() + } +] + +@router.post("/ai/submit", summary="Submit AI job", tags=["ai"]) +async def ai_submit_job(request: AIJobRequest) -> Dict[str, Any]: + """Submit a new AI job for processing""" + try: + metrics_registry.increment("rpc_ai_submit_total") + + # Generate unique job ID + job_id = f"job_{uuid.uuid4().hex[:8]}" + + # Calculate estimated completion time + estimated_completion = datetime.now() + timedelta(minutes=30) + + # Create new job + new_job = { + "job_id": job_id, + "wallet_address": request.wallet_address, + "job_type": request.job_type, + "prompt": request.prompt, + "payment": request.payment, + "parameters": request.parameters or {}, + "status": "queued", + "created_at": datetime.now().isoformat(), + "estimated_completion": estimated_completion.isoformat() + } + + # Add to storage + _ai_jobs.append(new_job) + + return { + "job_id": job_id, + "status": "queued", + "message": "AI job submitted successfully", + "estimated_completion": estimated_completion.isoformat(), + "wallet_address": request.wallet_address, + "payment": request.payment, + "job_type": request.job_type + } + + except Exception as e: + metrics_registry.increment("rpc_ai_submit_errors_total") + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/ai/jobs", summary="List AI jobs", tags=["ai"]) +async def ai_list_jobs(wallet_address: Optional[str] = None, status: Optional[str] = None) -> Dict[str, Any]: + """Get list of AI jobs, optionally filtered by wallet address or status""" + try: + metrics_registry.increment("rpc_ai_list_total") + + # Filter jobs + filtered_jobs = _ai_jobs.copy() + + if wallet_address: + filtered_jobs = [job for job in filtered_jobs if job.get("wallet_address") == wallet_address] + + if status: + filtered_jobs = [job for job in filtered_jobs if job.get("status") == status] + + # Sort by creation time (newest first) + filtered_jobs.sort(key=lambda x: x.get("created_at", ""), reverse=True) + + return { + "jobs": filtered_jobs, + "total": len(filtered_jobs), + "filters": { + "wallet_address": wallet_address, + "status": status + }, + "timestamp": datetime.now().isoformat() + } + + except Exception as e: + metrics_registry.increment("rpc_ai_list_errors_total") + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/ai/job/{job_id}", summary="Get AI job by ID", tags=["ai"]) +async def ai_get_job(job_id: str) -> Dict[str, Any]: + """Get a specific AI job by ID""" + try: + metrics_registry.increment("rpc_ai_get_total") + + # Find job + for job in _ai_jobs: + if job.get("job_id") == job_id: + return { + "job": job, + "found": True + } + + raise HTTPException(status_code=404, detail="Job not found") + + except HTTPException: + raise + except Exception as e: + metrics_registry.increment("rpc_ai_get_errors_total") + raise HTTPException(status_code=500, detail=str(e)) + +@router.post("/ai/job/{job_id}/cancel", summary="Cancel AI job", tags=["ai"]) +async def ai_cancel_job(job_id: str) -> Dict[str, Any]: + """Cancel an AI job""" + try: + metrics_registry.increment("rpc_ai_cancel_total") + + # Find and update job + for job in _ai_jobs: + if job.get("job_id") == job_id: + current_status = job.get("status") + if current_status in ["completed", "cancelled"]: + raise HTTPException( + status_code=400, + detail=f"Cannot cancel job with status: {current_status}" + ) + + job["status"] = "cancelled" + job["cancelled_at"] = datetime.now().isoformat() + + return { + "job_id": job_id, + "status": "cancelled", + "message": "AI job cancelled successfully" + } + + raise HTTPException(status_code=404, detail="Job not found") + + except HTTPException: + raise + except Exception as e: + metrics_registry.increment("rpc_ai_cancel_errors_total") + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/ai/stats", summary="AI service statistics", tags=["ai"]) +async def ai_stats() -> Dict[str, Any]: + """Get AI service statistics""" + try: + metrics_registry.increment("rpc_ai_stats_total") + + total_jobs = len(_ai_jobs) + status_counts = {} + type_counts = {} + total_revenue = 0.0 + + for job in _ai_jobs: + # Count by status + status = job.get("status", "unknown") + status_counts[status] = status_counts.get(status, 0) + 1 + + # Count by type + job_type = job.get("job_type", "unknown") + type_counts[job_type] = type_counts.get(job_type, 0) + 1 + + # Sum revenue for completed jobs + if status == "completed": + total_revenue += job.get("payment", 0.0) + + return { + "total_jobs": total_jobs, + "status_breakdown": status_counts, + "type_breakdown": type_counts, + "total_revenue": total_revenue, + "average_payment": total_revenue / max(1, status_counts.get("completed", 0)), + "timestamp": datetime.now().isoformat() + } + + except Exception as e: + metrics_registry.increment("rpc_ai_stats_errors_total") + raise HTTPException(status_code=500, detail=str(e)) diff --git a/apps/blockchain-node/src/aitbc_chain/rpc/marketplace.py b/apps/blockchain-node/src/aitbc_chain/rpc/marketplace.py new file mode 100644 index 00000000..3f86cb59 --- /dev/null +++ b/apps/blockchain-node/src/aitbc_chain/rpc/marketplace.py @@ -0,0 +1,148 @@ +"""Marketplace RPC endpoints for AITBC blockchain""" + +from typing import Any, Dict, List, Optional +from fastapi import HTTPException +from pydantic import BaseModel, Field +from datetime import datetime + +from ..database import session_scope +from ..metrics import metrics_registry +from .router import router + + +class MarketplaceListing(BaseModel): + """Marketplace listing model""" + listing_id: Optional[str] = None + seller_address: str = Field(..., description="Seller wallet address") + item_type: str = Field(..., description="Type of item (GPU, compute, etc.)") + price: float = Field(..., ge=0, description="Price in AIT") + description: str = Field(..., description="Item description") + status: str = Field(default="active", description="Listing status") + created_at: Optional[datetime] = None + +class MarketplaceCreateRequest(BaseModel): + """Request to create marketplace listing""" + seller_address: str + item_type: str + price: float + description: str + +# In-memory storage for demo (in production, use database) +_marketplace_listings: List[Dict[str, Any]] = [ + { + "listing_id": "demo_001", + "seller_address": "ait1demo_seller_123...", + "item_type": "GPU", + "price": 1000.0, + "description": "High-performance NVIDIA RTX 4090 for AI training", + "status": "active", + "created_at": datetime.now().isoformat() + }, + { + "listing_id": "demo_002", + "seller_address": "ait1demo_provider_456...", + "item_type": "Compute", + "price": 500.0, + "description": "10 hours of GPU compute time for deep learning", + "status": "active", + "created_at": datetime.now().isoformat() + } +] + +@router.get("/marketplace/listings", summary="List marketplace items", tags=["marketplace"]) +async def marketplace_listings() -> Dict[str, Any]: + """Get all marketplace listings""" + try: + metrics_registry.increment("rpc_marketplace_listings_total") + + # Filter active listings + active_listings = [listing for listing in _marketplace_listings if listing.get("status") == "active"] + + return { + "listings": active_listings, + "total": len(active_listings), + "timestamp": datetime.now().isoformat() + } + except Exception as e: + metrics_registry.increment("rpc_marketplace_listings_errors_total") + raise HTTPException(status_code=500, detail=str(e)) + +@router.post("/marketplace/create", summary="Create marketplace listing", tags=["marketplace"]) +async def marketplace_create(request: MarketplaceCreateRequest) -> Dict[str, Any]: + """Create a new marketplace listing""" + try: + metrics_registry.increment("rpc_marketplace_create_total") + + # Generate unique listing ID + listing_id = f"listing_{len(_marketplace_listings) + 1:03d}" + + # Create new listing + new_listing = { + "listing_id": listing_id, + "seller_address": request.seller_address, + "item_type": request.item_type, + "price": request.price, + "description": request.description, + "status": "active", + "created_at": datetime.now().isoformat() + } + + # Add to storage + _marketplace_listings.append(new_listing) + + return { + "listing_id": listing_id, + "status": "created", + "message": "Marketplace listing created successfully", + "listing": new_listing + } + + except Exception as e: + metrics_registry.increment("rpc_marketplace_create_errors_total") + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/marketplace/listing/{listing_id}", summary="Get marketplace listing by ID", tags=["marketplace"]) +async def marketplace_get_listing(listing_id: str) -> Dict[str, Any]: + """Get a specific marketplace listing""" + try: + metrics_registry.increment("rpc_marketplace_get_total") + + # Find listing + for listing in _marketplace_listings: + if listing.get("listing_id") == listing_id: + return { + "listing": listing, + "found": True + } + + raise HTTPException(status_code=404, detail="Listing not found") + + except HTTPException: + raise + except Exception as e: + metrics_registry.increment("rpc_marketplace_get_errors_total") + raise HTTPException(status_code=500, detail=str(e)) + +@router.delete("/marketplace/listing/{listing_id}", summary="Delete marketplace listing", tags=["marketplace"]) +async def marketplace_delete_listing(listing_id: str) -> Dict[str, Any]: + """Delete a marketplace listing""" + try: + metrics_registry.increment("rpc_marketplace_delete_total") + + # Find and remove listing + for i, listing in enumerate(_marketplace_listings): + if listing.get("listing_id") == listing_id: + _marketplace_listings.pop(i) + return { + "listing_id": listing_id, + "status": "deleted", + "message": "Marketplace listing deleted successfully" + } + + raise HTTPException(status_code=404, detail="Listing not found") + + except HTTPException: + raise + except Exception as e: + metrics_registry.increment("rpc_marketplace_delete_errors_total") + raise HTTPException(status_code=500, detail=str(e)) diff --git a/apps/blockchain-node/src/aitbc_chain/rpc/router.py b/apps/blockchain-node/src/aitbc_chain/rpc/router.py index cdccb11d..fbe7eddf 100755 --- a/apps/blockchain-node/src/aitbc_chain/rpc/router.py +++ b/apps/blockchain-node/src/aitbc_chain/rpc/router.py @@ -4,7 +4,8 @@ from sqlalchemy import func import asyncio import json import time -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, List +from datetime import datetime, timedelta from fastapi import APIRouter, HTTPException, status from pydantic import BaseModel, Field, model_validator @@ -894,3 +895,207 @@ async def get_transactions(chain_id: str = None, limit: int = 20, offset: int = "chain_id": chain_id, "error": str(e) } + + +# MARKETPLACE ENDPOINTS + +class MarketplaceCreateRequest(BaseModel): + """Request to create marketplace listing""" + seller_address: str + item_type: str + price: float + description: str + +# In-memory storage for demo (in production, use database) +_marketplace_listings = [ + { + "listing_id": "demo_001", + "seller_address": "ait1demo_seller_123...", + "item_type": "GPU", + "price": 1000.0, + "description": "High-performance NVIDIA RTX 4090 for AI training", + "status": "active", + "created_at": datetime.now().isoformat() + }, + { + "listing_id": "demo_002", + "seller_address": "ait1demo_provider_456...", + "item_type": "Compute", + "price": 500.0, + "description": "10 hours of GPU compute time for deep learning", + "status": "active", + "created_at": datetime.now().isoformat() + } +] + +@router.get("/marketplace/listings", summary="List marketplace items", tags=["marketplace"]) +async def marketplace_listings() -> Dict[str, Any]: + """Get all marketplace listings""" + try: + metrics_registry.increment("rpc_marketplace_listings_total") + + # Filter active listings + active_listings = [listing for listing in _marketplace_listings if listing.get("status") == "active"] + + return { + "listings": active_listings, + "total": len(active_listings), + "timestamp": datetime.now().isoformat() + } + except Exception as e: + metrics_registry.increment("rpc_marketplace_listings_errors_total") + raise HTTPException(status_code=500, detail=str(e)) + +@router.post("/marketplace/create", summary="Create marketplace listing", tags=["marketplace"]) +async def marketplace_create(request: MarketplaceCreateRequest) -> Dict[str, Any]: + """Create a new marketplace listing""" + try: + metrics_registry.increment("rpc_marketplace_create_total") + + # Generate unique listing ID + listing_id = f"listing_{len(_marketplace_listings) + 1:03d}" + + # Create new listing + new_listing = { + "listing_id": listing_id, + "seller_address": request.seller_address, + "item_type": request.item_type, + "price": request.price, + "description": request.description, + "status": "active", + "created_at": datetime.now().isoformat() + } + + # Add to storage + _marketplace_listings.append(new_listing) + + return { + "listing_id": listing_id, + "status": "created", + "message": "Marketplace listing created successfully", + "listing": new_listing + } + + except Exception as e: + metrics_registry.increment("rpc_marketplace_create_errors_total") + raise HTTPException(status_code=500, detail=str(e)) + + +# AI SERVICES ENDPOINTS + +class AIJobRequest(BaseModel): + """AI job submission request""" + wallet_address: str = Field(..., description="Client wallet address") + job_type: str = Field(..., description="Type of AI job (text, image, training, etc.)") + prompt: str = Field(..., description="AI prompt or task description") + payment: float = Field(..., ge=0, description="Payment in AIT") + parameters: Optional[Dict[str, Any]] = Field(default=None, description="Additional job parameters") + +# In-memory storage for demo (in production, use database) +_ai_jobs = [ + { + "job_id": "job_demo_001", + "wallet_address": "ait1demo_client_123...", + "job_type": "text", + "prompt": "Generate a summary of blockchain technology", + "payment": 100.0, + "status": "completed", + "created_at": (datetime.now() - timedelta(hours=1)).isoformat(), + "completed_at": (datetime.now() - timedelta(minutes=30)).isoformat(), + "result": { + "output": "Blockchain is a distributed ledger technology...", + "tokens_used": 150, + "processing_time": "2.5 minutes" + } + }, + { + "job_id": "job_demo_002", + "wallet_address": "ait1demo_client_456...", + "job_type": "image", + "prompt": "Create an image of a futuristic blockchain city", + "payment": 250.0, + "status": "processing", + "created_at": (datetime.now() - timedelta(minutes=15)).isoformat(), + "estimated_completion": (datetime.now() + timedelta(minutes=10)).isoformat() + } +] + +@router.post("/ai/submit", summary="Submit AI job", tags=["ai"]) +async def ai_submit_job(request: AIJobRequest) -> Dict[str, Any]: + """Submit a new AI job for processing""" + try: + metrics_registry.increment("rpc_ai_submit_total") + + # Generate unique job ID + import uuid + job_id = f"job_{uuid.uuid4().hex[:8]}" + + # Calculate estimated completion time + estimated_completion = datetime.now() + timedelta(minutes=30) + + # Create new job + new_job = { + "job_id": job_id, + "wallet_address": request.wallet_address, + "job_type": request.job_type, + "prompt": request.prompt, + "payment": request.payment, + "parameters": request.parameters or {}, + "status": "queued", + "created_at": datetime.now().isoformat(), + "estimated_completion": estimated_completion.isoformat() + } + + # Add to storage + _ai_jobs.append(new_job) + + return { + "job_id": job_id, + "status": "queued", + "message": "AI job submitted successfully", + "estimated_completion": estimated_completion.isoformat(), + "wallet_address": request.wallet_address, + "payment": request.payment, + "job_type": request.job_type + } + + except Exception as e: + metrics_registry.increment("rpc_ai_submit_errors_total") + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/ai/stats", summary="AI service statistics", tags=["ai"]) +async def ai_stats() -> Dict[str, Any]: + """Get AI service statistics""" + try: + metrics_registry.increment("rpc_ai_stats_total") + + total_jobs = len(_ai_jobs) + status_counts = {} + type_counts = {} + total_revenue = 0.0 + + for job in _ai_jobs: + # Count by status + status = job.get("status", "unknown") + status_counts[status] = status_counts.get(status, 0) + 1 + + # Count by type + job_type = job.get("job_type", "unknown") + type_counts[job_type] = type_counts.get(job_type, 0) + 1 + + # Sum revenue for completed jobs + if status == "completed": + total_revenue += job.get("payment", 0.0) + + return { + "total_jobs": total_jobs, + "status_breakdown": status_counts, + "type_breakdown": type_counts, + "total_revenue": total_revenue, + "average_payment": total_revenue / max(1, status_counts.get("completed", 0)), + "timestamp": datetime.now().isoformat() + } + + except Exception as e: + metrics_registry.increment("rpc_ai_stats_errors_total") + raise HTTPException(status_code=500, detail=str(e)) diff --git a/templates/node_setup_template.sh b/templates/node_setup_template.sh new file mode 100755 index 00000000..03602348 --- /dev/null +++ b/templates/node_setup_template.sh @@ -0,0 +1,44 @@ +#!/bin/bash +# AITBC Node Setup Template +# Usage: ./node_setup_template.sh + +NODE_NAME=$1 +ROLE=$2 +GENESIS_AUTHORITY=$3 + +echo "Setting up AITBC node: $NODE_NAME" +echo "Role: $ROLE" +echo "Genesis Authority: $GENESIS_AUTHORITY" + +# Install dependencies +apt update && apt install -y python3 python3-venv redis-server + +# Setup directories +mkdir -p /var/lib/aitbc/{data,keystore,logs} +mkdir -p /etc/aitbc + +# Copy configuration from genesis authority +scp $GENESIS_AUTHORITY:/etc/aitbc/blockchain.env /etc/aitbc/ + +# Pull code +cd /opt/aitbc +git clone http://gitea.bubuit.net:oib/aitbc.git +cd aitbc + +# Setup virtual environment +python3 -m venv venv +source venv/bin/activate +pip install -r requirements.txt + +# Configure node role +if [ "$ROLE" = "follower" ]; then + sed -i 's/enable_block_production=true/enable_block_production=false/g' /etc/aitbc/blockchain.env + sed -i "s/proposer_id=.*/proposer_id=follower-node-$NODE_NAME/g" /etc/aitbc/blockchain.env +fi + +# Setup systemd services +cp systemd/*.service /etc/systemd/system/ +systemctl daemon-reload +systemctl enable aitbc-blockchain-node aitbc-blockchain-rpc + +echo "Node $NODE_NAME setup complete!"