feat: add marketplace and AI services RPC endpoints
All checks were successful
Integration Tests / test-service-integration (push) Successful in 49s
Python Tests / test-python (push) Successful in 58s
Security Scanning / security-scan (push) Successful in 52s

📋 Marketplace Endpoints:
• GET /marketplace/listings - List all active marketplace items
• POST /marketplace/create - Create new marketplace listing
• Demo listings for GPU and compute resources
• In-memory storage with active status filtering

🤖 AI Services Endpoints:
• POST /ai/submit - Submit AI jobs with payment
• GET /ai/stats - AI service statistics and revenue tracking
• Support for text, image, and training job types
This commit is contained in:
aitbc1
2026-03-29 17:15:54 +02:00
parent b5da4b15bb
commit 8251853cbd
4 changed files with 624 additions and 1 deletions

View File

@@ -0,0 +1,226 @@
"""AI Services RPC endpoints for AITBC blockchain"""
from typing import Any, Dict, List, Optional
from fastapi import HTTPException
from pydantic import BaseModel, Field
from datetime import datetime, timedelta
import uuid
from ..database import session_scope
from ..metrics import metrics_registry
from .router import router
class AIJobRequest(BaseModel):
"""AI job submission request"""
wallet_address: str = Field(..., description="Client wallet address")
job_type: str = Field(..., description="Type of AI job (text, image, training, etc.)")
prompt: str = Field(..., description="AI prompt or task description")
payment: float = Field(..., ge=0, description="Payment in AIT")
parameters: Optional[Dict[str, Any]] = Field(default=None, description="Additional job parameters")
class AIJobResponse(BaseModel):
"""AI job response"""
job_id: str
status: str
wallet_address: str
job_type: str
payment: float
created_at: datetime
estimated_completion: Optional[datetime] = None
result: Optional[Dict[str, Any]] = None
# In-memory storage for demo (in production, use database)
_ai_jobs: List[Dict[str, Any]] = [
{
"job_id": "job_demo_001",
"wallet_address": "ait1demo_client_123...",
"job_type": "text",
"prompt": "Generate a summary of blockchain technology",
"payment": 100.0,
"status": "completed",
"created_at": (datetime.now() - timedelta(hours=1)).isoformat(),
"completed_at": (datetime.now() - timedelta(minutes=30)).isoformat(),
"result": {
"output": "Blockchain is a distributed ledger technology...",
"tokens_used": 150,
"processing_time": "2.5 minutes"
}
},
{
"job_id": "job_demo_002",
"wallet_address": "ait1demo_client_456...",
"job_type": "image",
"prompt": "Create an image of a futuristic blockchain city",
"payment": 250.0,
"status": "processing",
"created_at": (datetime.now() - timedelta(minutes=15)).isoformat(),
"estimated_completion": (datetime.now() + timedelta(minutes=10)).isoformat()
}
]
@router.post("/ai/submit", summary="Submit AI job", tags=["ai"])
async def ai_submit_job(request: AIJobRequest) -> Dict[str, Any]:
"""Submit a new AI job for processing"""
try:
metrics_registry.increment("rpc_ai_submit_total")
# Generate unique job ID
job_id = f"job_{uuid.uuid4().hex[:8]}"
# Calculate estimated completion time
estimated_completion = datetime.now() + timedelta(minutes=30)
# Create new job
new_job = {
"job_id": job_id,
"wallet_address": request.wallet_address,
"job_type": request.job_type,
"prompt": request.prompt,
"payment": request.payment,
"parameters": request.parameters or {},
"status": "queued",
"created_at": datetime.now().isoformat(),
"estimated_completion": estimated_completion.isoformat()
}
# Add to storage
_ai_jobs.append(new_job)
return {
"job_id": job_id,
"status": "queued",
"message": "AI job submitted successfully",
"estimated_completion": estimated_completion.isoformat(),
"wallet_address": request.wallet_address,
"payment": request.payment,
"job_type": request.job_type
}
except Exception as e:
metrics_registry.increment("rpc_ai_submit_errors_total")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/ai/jobs", summary="List AI jobs", tags=["ai"])
async def ai_list_jobs(wallet_address: Optional[str] = None, status: Optional[str] = None) -> Dict[str, Any]:
"""Get list of AI jobs, optionally filtered by wallet address or status"""
try:
metrics_registry.increment("rpc_ai_list_total")
# Filter jobs
filtered_jobs = _ai_jobs.copy()
if wallet_address:
filtered_jobs = [job for job in filtered_jobs if job.get("wallet_address") == wallet_address]
if status:
filtered_jobs = [job for job in filtered_jobs if job.get("status") == status]
# Sort by creation time (newest first)
filtered_jobs.sort(key=lambda x: x.get("created_at", ""), reverse=True)
return {
"jobs": filtered_jobs,
"total": len(filtered_jobs),
"filters": {
"wallet_address": wallet_address,
"status": status
},
"timestamp": datetime.now().isoformat()
}
except Exception as e:
metrics_registry.increment("rpc_ai_list_errors_total")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/ai/job/{job_id}", summary="Get AI job by ID", tags=["ai"])
async def ai_get_job(job_id: str) -> Dict[str, Any]:
"""Get a specific AI job by ID"""
try:
metrics_registry.increment("rpc_ai_get_total")
# Find job
for job in _ai_jobs:
if job.get("job_id") == job_id:
return {
"job": job,
"found": True
}
raise HTTPException(status_code=404, detail="Job not found")
except HTTPException:
raise
except Exception as e:
metrics_registry.increment("rpc_ai_get_errors_total")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/ai/job/{job_id}/cancel", summary="Cancel AI job", tags=["ai"])
async def ai_cancel_job(job_id: str) -> Dict[str, Any]:
"""Cancel an AI job"""
try:
metrics_registry.increment("rpc_ai_cancel_total")
# Find and update job
for job in _ai_jobs:
if job.get("job_id") == job_id:
current_status = job.get("status")
if current_status in ["completed", "cancelled"]:
raise HTTPException(
status_code=400,
detail=f"Cannot cancel job with status: {current_status}"
)
job["status"] = "cancelled"
job["cancelled_at"] = datetime.now().isoformat()
return {
"job_id": job_id,
"status": "cancelled",
"message": "AI job cancelled successfully"
}
raise HTTPException(status_code=404, detail="Job not found")
except HTTPException:
raise
except Exception as e:
metrics_registry.increment("rpc_ai_cancel_errors_total")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/ai/stats", summary="AI service statistics", tags=["ai"])
async def ai_stats() -> Dict[str, Any]:
"""Get AI service statistics"""
try:
metrics_registry.increment("rpc_ai_stats_total")
total_jobs = len(_ai_jobs)
status_counts = {}
type_counts = {}
total_revenue = 0.0
for job in _ai_jobs:
# Count by status
status = job.get("status", "unknown")
status_counts[status] = status_counts.get(status, 0) + 1
# Count by type
job_type = job.get("job_type", "unknown")
type_counts[job_type] = type_counts.get(job_type, 0) + 1
# Sum revenue for completed jobs
if status == "completed":
total_revenue += job.get("payment", 0.0)
return {
"total_jobs": total_jobs,
"status_breakdown": status_counts,
"type_breakdown": type_counts,
"total_revenue": total_revenue,
"average_payment": total_revenue / max(1, status_counts.get("completed", 0)),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
metrics_registry.increment("rpc_ai_stats_errors_total")
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -0,0 +1,148 @@
"""Marketplace RPC endpoints for AITBC blockchain"""
from typing import Any, Dict, List, Optional
from fastapi import HTTPException
from pydantic import BaseModel, Field
from datetime import datetime
from ..database import session_scope
from ..metrics import metrics_registry
from .router import router
class MarketplaceListing(BaseModel):
"""Marketplace listing model"""
listing_id: Optional[str] = None
seller_address: str = Field(..., description="Seller wallet address")
item_type: str = Field(..., description="Type of item (GPU, compute, etc.)")
price: float = Field(..., ge=0, description="Price in AIT")
description: str = Field(..., description="Item description")
status: str = Field(default="active", description="Listing status")
created_at: Optional[datetime] = None
class MarketplaceCreateRequest(BaseModel):
"""Request to create marketplace listing"""
seller_address: str
item_type: str
price: float
description: str
# In-memory storage for demo (in production, use database)
_marketplace_listings: List[Dict[str, Any]] = [
{
"listing_id": "demo_001",
"seller_address": "ait1demo_seller_123...",
"item_type": "GPU",
"price": 1000.0,
"description": "High-performance NVIDIA RTX 4090 for AI training",
"status": "active",
"created_at": datetime.now().isoformat()
},
{
"listing_id": "demo_002",
"seller_address": "ait1demo_provider_456...",
"item_type": "Compute",
"price": 500.0,
"description": "10 hours of GPU compute time for deep learning",
"status": "active",
"created_at": datetime.now().isoformat()
}
]
@router.get("/marketplace/listings", summary="List marketplace items", tags=["marketplace"])
async def marketplace_listings() -> Dict[str, Any]:
"""Get all marketplace listings"""
try:
metrics_registry.increment("rpc_marketplace_listings_total")
# Filter active listings
active_listings = [listing for listing in _marketplace_listings if listing.get("status") == "active"]
return {
"listings": active_listings,
"total": len(active_listings),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
metrics_registry.increment("rpc_marketplace_listings_errors_total")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/marketplace/create", summary="Create marketplace listing", tags=["marketplace"])
async def marketplace_create(request: MarketplaceCreateRequest) -> Dict[str, Any]:
"""Create a new marketplace listing"""
try:
metrics_registry.increment("rpc_marketplace_create_total")
# Generate unique listing ID
listing_id = f"listing_{len(_marketplace_listings) + 1:03d}"
# Create new listing
new_listing = {
"listing_id": listing_id,
"seller_address": request.seller_address,
"item_type": request.item_type,
"price": request.price,
"description": request.description,
"status": "active",
"created_at": datetime.now().isoformat()
}
# Add to storage
_marketplace_listings.append(new_listing)
return {
"listing_id": listing_id,
"status": "created",
"message": "Marketplace listing created successfully",
"listing": new_listing
}
except Exception as e:
metrics_registry.increment("rpc_marketplace_create_errors_total")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/marketplace/listing/{listing_id}", summary="Get marketplace listing by ID", tags=["marketplace"])
async def marketplace_get_listing(listing_id: str) -> Dict[str, Any]:
"""Get a specific marketplace listing"""
try:
metrics_registry.increment("rpc_marketplace_get_total")
# Find listing
for listing in _marketplace_listings:
if listing.get("listing_id") == listing_id:
return {
"listing": listing,
"found": True
}
raise HTTPException(status_code=404, detail="Listing not found")
except HTTPException:
raise
except Exception as e:
metrics_registry.increment("rpc_marketplace_get_errors_total")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/marketplace/listing/{listing_id}", summary="Delete marketplace listing", tags=["marketplace"])
async def marketplace_delete_listing(listing_id: str) -> Dict[str, Any]:
"""Delete a marketplace listing"""
try:
metrics_registry.increment("rpc_marketplace_delete_total")
# Find and remove listing
for i, listing in enumerate(_marketplace_listings):
if listing.get("listing_id") == listing_id:
_marketplace_listings.pop(i)
return {
"listing_id": listing_id,
"status": "deleted",
"message": "Marketplace listing deleted successfully"
}
raise HTTPException(status_code=404, detail="Listing not found")
except HTTPException:
raise
except Exception as e:
metrics_registry.increment("rpc_marketplace_delete_errors_total")
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -4,7 +4,8 @@ from sqlalchemy import func
import asyncio
import json
import time
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, List
from datetime import datetime, timedelta
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel, Field, model_validator
@@ -894,3 +895,207 @@ async def get_transactions(chain_id: str = None, limit: int = 20, offset: int =
"chain_id": chain_id,
"error": str(e)
}
# MARKETPLACE ENDPOINTS
class MarketplaceCreateRequest(BaseModel):
"""Request to create marketplace listing"""
seller_address: str
item_type: str
price: float
description: str
# In-memory storage for demo (in production, use database)
_marketplace_listings = [
{
"listing_id": "demo_001",
"seller_address": "ait1demo_seller_123...",
"item_type": "GPU",
"price": 1000.0,
"description": "High-performance NVIDIA RTX 4090 for AI training",
"status": "active",
"created_at": datetime.now().isoformat()
},
{
"listing_id": "demo_002",
"seller_address": "ait1demo_provider_456...",
"item_type": "Compute",
"price": 500.0,
"description": "10 hours of GPU compute time for deep learning",
"status": "active",
"created_at": datetime.now().isoformat()
}
]
@router.get("/marketplace/listings", summary="List marketplace items", tags=["marketplace"])
async def marketplace_listings() -> Dict[str, Any]:
"""Get all marketplace listings"""
try:
metrics_registry.increment("rpc_marketplace_listings_total")
# Filter active listings
active_listings = [listing for listing in _marketplace_listings if listing.get("status") == "active"]
return {
"listings": active_listings,
"total": len(active_listings),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
metrics_registry.increment("rpc_marketplace_listings_errors_total")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/marketplace/create", summary="Create marketplace listing", tags=["marketplace"])
async def marketplace_create(request: MarketplaceCreateRequest) -> Dict[str, Any]:
"""Create a new marketplace listing"""
try:
metrics_registry.increment("rpc_marketplace_create_total")
# Generate unique listing ID
listing_id = f"listing_{len(_marketplace_listings) + 1:03d}"
# Create new listing
new_listing = {
"listing_id": listing_id,
"seller_address": request.seller_address,
"item_type": request.item_type,
"price": request.price,
"description": request.description,
"status": "active",
"created_at": datetime.now().isoformat()
}
# Add to storage
_marketplace_listings.append(new_listing)
return {
"listing_id": listing_id,
"status": "created",
"message": "Marketplace listing created successfully",
"listing": new_listing
}
except Exception as e:
metrics_registry.increment("rpc_marketplace_create_errors_total")
raise HTTPException(status_code=500, detail=str(e))
# AI SERVICES ENDPOINTS
class AIJobRequest(BaseModel):
"""AI job submission request"""
wallet_address: str = Field(..., description="Client wallet address")
job_type: str = Field(..., description="Type of AI job (text, image, training, etc.)")
prompt: str = Field(..., description="AI prompt or task description")
payment: float = Field(..., ge=0, description="Payment in AIT")
parameters: Optional[Dict[str, Any]] = Field(default=None, description="Additional job parameters")
# In-memory storage for demo (in production, use database)
_ai_jobs = [
{
"job_id": "job_demo_001",
"wallet_address": "ait1demo_client_123...",
"job_type": "text",
"prompt": "Generate a summary of blockchain technology",
"payment": 100.0,
"status": "completed",
"created_at": (datetime.now() - timedelta(hours=1)).isoformat(),
"completed_at": (datetime.now() - timedelta(minutes=30)).isoformat(),
"result": {
"output": "Blockchain is a distributed ledger technology...",
"tokens_used": 150,
"processing_time": "2.5 minutes"
}
},
{
"job_id": "job_demo_002",
"wallet_address": "ait1demo_client_456...",
"job_type": "image",
"prompt": "Create an image of a futuristic blockchain city",
"payment": 250.0,
"status": "processing",
"created_at": (datetime.now() - timedelta(minutes=15)).isoformat(),
"estimated_completion": (datetime.now() + timedelta(minutes=10)).isoformat()
}
]
@router.post("/ai/submit", summary="Submit AI job", tags=["ai"])
async def ai_submit_job(request: AIJobRequest) -> Dict[str, Any]:
"""Submit a new AI job for processing"""
try:
metrics_registry.increment("rpc_ai_submit_total")
# Generate unique job ID
import uuid
job_id = f"job_{uuid.uuid4().hex[:8]}"
# Calculate estimated completion time
estimated_completion = datetime.now() + timedelta(minutes=30)
# Create new job
new_job = {
"job_id": job_id,
"wallet_address": request.wallet_address,
"job_type": request.job_type,
"prompt": request.prompt,
"payment": request.payment,
"parameters": request.parameters or {},
"status": "queued",
"created_at": datetime.now().isoformat(),
"estimated_completion": estimated_completion.isoformat()
}
# Add to storage
_ai_jobs.append(new_job)
return {
"job_id": job_id,
"status": "queued",
"message": "AI job submitted successfully",
"estimated_completion": estimated_completion.isoformat(),
"wallet_address": request.wallet_address,
"payment": request.payment,
"job_type": request.job_type
}
except Exception as e:
metrics_registry.increment("rpc_ai_submit_errors_total")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/ai/stats", summary="AI service statistics", tags=["ai"])
async def ai_stats() -> Dict[str, Any]:
"""Get AI service statistics"""
try:
metrics_registry.increment("rpc_ai_stats_total")
total_jobs = len(_ai_jobs)
status_counts = {}
type_counts = {}
total_revenue = 0.0
for job in _ai_jobs:
# Count by status
status = job.get("status", "unknown")
status_counts[status] = status_counts.get(status, 0) + 1
# Count by type
job_type = job.get("job_type", "unknown")
type_counts[job_type] = type_counts.get(job_type, 0) + 1
# Sum revenue for completed jobs
if status == "completed":
total_revenue += job.get("payment", 0.0)
return {
"total_jobs": total_jobs,
"status_breakdown": status_counts,
"type_breakdown": type_counts,
"total_revenue": total_revenue,
"average_payment": total_revenue / max(1, status_counts.get("completed", 0)),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
metrics_registry.increment("rpc_ai_stats_errors_total")
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -0,0 +1,44 @@
#!/bin/bash
# AITBC Node Setup Template
# Usage: ./node_setup_template.sh <node-name> <role> <genesis-authority>
NODE_NAME=$1
ROLE=$2
GENESIS_AUTHORITY=$3
echo "Setting up AITBC node: $NODE_NAME"
echo "Role: $ROLE"
echo "Genesis Authority: $GENESIS_AUTHORITY"
# Install dependencies
apt update && apt install -y python3 python3-venv redis-server
# Setup directories
mkdir -p /var/lib/aitbc/{data,keystore,logs}
mkdir -p /etc/aitbc
# Copy configuration from genesis authority
scp $GENESIS_AUTHORITY:/etc/aitbc/blockchain.env /etc/aitbc/
# Pull code
cd /opt/aitbc
git clone http://gitea.bubuit.net:oib/aitbc.git
cd aitbc
# Setup virtual environment
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
# Configure node role
if [ "$ROLE" = "follower" ]; then
sed -i 's/enable_block_production=true/enable_block_production=false/g' /etc/aitbc/blockchain.env
sed -i "s/proposer_id=.*/proposer_id=follower-node-$NODE_NAME/g" /etc/aitbc/blockchain.env
fi
# Setup systemd services
cp systemd/*.service /etc/systemd/system/
systemctl daemon-reload
systemctl enable aitbc-blockchain-node aitbc-blockchain-rpc
echo "Node $NODE_NAME setup complete!"