diff --git a/docs/PRODUCTION_ARCHITECTURE.md b/docs/PRODUCTION_ARCHITECTURE.md new file mode 100644 index 00000000..241300f4 --- /dev/null +++ b/docs/PRODUCTION_ARCHITECTURE.md @@ -0,0 +1,76 @@ +# AITBC Production Environment + +## 🏗️ Proper System Architecture + +The AITBC production environment follows Linux Filesystem Hierarchy Standard (FHS) compliance: + +### 📁 System Directory Structure + +``` +/etc/aitbc/production/ # Production configurations +├── .env # Production environment variables +├── blockchain.py # Blockchain service config +├── database.py # Database configuration +├── services.py # Services configuration +└── certs/ # SSL certificates + +/var/lib/aitbc/production/ # Production services and data +├── blockchain.py # Production blockchain service +├── marketplace.py # Production marketplace service +├── unified_marketplace.py # Unified marketplace service +├── openclaw_ai.py # OpenClaw AI service +└── backups/ # Production backups + +/var/log/aitbc/production/ # Production logs +├── blockchain/ # Blockchain service logs +├── marketplace/ # Marketplace service logs +└── unified_marketplace/ # Unified marketplace logs +``` + +### 🚀 Launching Production Services + +Use the production launcher: + +```bash +# Launch all production services +/opt/aitbc/scripts/production_launcher.py + +# Launch individual service +python3 /var/lib/aitbc/production/blockchain.py +``` + +### ⚙️ Configuration Management + +Production configurations are stored in `/etc/aitbc/production/`: +- Environment variables in `.env` +- Service-specific configs in Python modules +- SSL certificates in `certs/` + +### 📊 Monitoring and Logs + +Production logs are centralized in `/var/log/aitbc/production/`: +- Each service has its own log directory +- Logs rotate automatically +- Real-time monitoring available + +### 🔧 Maintenance + +- **Backups**: Stored in `/var/lib/aitbc/production/backups/` +- **Updates**: Update code in `/opt/aitbc/`, restart services +- **Configuration**: Edit files in `/etc/aitbc/production/` + +### 🛡️ Security + +- All production files have proper permissions +- Keystore remains at `/var/lib/aitbc/keystore/` +- Environment variables are protected +- SSL certificates secured in `/etc/aitbc/production/certs/` + +## 📋 Migration Complete + +The "box in box" structure has been eliminated: +- ✅ Configurations moved to `/etc/aitbc/production/` +- ✅ Services moved to `/var/lib/aitbc/production/` +- ✅ Logs centralized in `/var/log/aitbc/production/` +- ✅ Repository cleaned of production runtime files +- ✅ Proper FHS compliance achieved diff --git a/production/config/blockchain.py b/production/config/blockchain.py deleted file mode 100644 index f6bd009d..00000000 --- a/production/config/blockchain.py +++ /dev/null @@ -1,36 +0,0 @@ -import os -from pathlib import Path - -# Production Blockchain Configuration -BLOCKCHAIN_CONFIG = { - 'network': { - 'name': 'aitbc-mainnet', - 'chain_id': 1337, - 'consensus': 'proof_of_authority', - 'block_time': 5, # seconds - 'gas_limit': 8000000, - 'difficulty': 'auto' - }, - 'nodes': { - 'aitbc': { - 'host': 'localhost', - 'port': 8545, - 'rpc_port': 8545, - 'p2p_port': 30303, - 'data_dir': '/var/lib/aitbc/data/blockchain/aitbc' - }, - 'aitbc1': { - 'host': 'aitbc1', - 'port': 8545, - 'rpc_port': 8545, - 'p2p_port': 30303, - 'data_dir': '/var/lib/aitbc/data/blockchain/aitbc1' - } - }, - 'security': { - 'enable_tls': True, - 'cert_path': '/opt/aitbc/production/config/certs', - 'require_auth': True, - 'api_key': os.getenv('BLOCKCHAIN_API_KEY', 'production-key-change-me') - } -} diff --git a/production/config/database.py b/production/config/database.py deleted file mode 100644 index e1471ed8..00000000 --- a/production/config/database.py +++ /dev/null @@ -1,21 +0,0 @@ -import os -import ssl - -# Production Database Configuration -DATABASE_CONFIG = { - 'production': { - 'url': os.getenv('DATABASE_URL', 'postgresql://aitbc:password@localhost:5432/aitbc_prod'), - 'pool_size': 20, - 'max_overflow': 30, - 'pool_timeout': 30, - 'pool_recycle': 3600, - 'ssl_context': ssl.create_default_context() - }, - 'redis': { - 'host': os.getenv('REDIS_HOST', 'localhost'), - 'port': int(os.getenv('REDIS_PORT', 6379)), - 'db': int(os.getenv('REDIS_DB', 0)), - 'password': os.getenv('REDIS_PASSWORD', None), - 'ssl': os.getenv('REDIS_SSL', 'false').lower() == 'true' - } -} diff --git a/production/config/services.py b/production/config/services.py deleted file mode 100644 index 862270f0..00000000 --- a/production/config/services.py +++ /dev/null @@ -1,61 +0,0 @@ -import os - -# Production Services Configuration -SERVICES_CONFIG = { - 'blockchain': { - 'host': '0.0.0.0', - 'port': 8545, - 'workers': 4, - 'log_level': 'INFO', - 'max_connections': 1000 - }, - 'marketplace': { - 'host': '0.0.0.0', - 'port': 8002, - 'workers': 8, - 'log_level': 'INFO', - 'max_connections': 5000 - }, - 'gpu_marketplace': { - 'host': '0.0.0.0', - 'port': 8003, - 'workers': 4, - 'log_level': 'INFO', - 'max_connections': 1000 - }, - 'monitoring': { - 'host': '0.0.0.0', - 'port': 9000, - 'workers': 2, - 'log_level': 'INFO' - } -} - -# Production Logging -LOGGING_CONFIG = { - 'version': 1, - 'disable_existing_loggers': False, - 'formatters': { - 'production': { - 'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s', - 'datefmt': '%Y-%m-%d %H:%M:%S' - } - }, - 'handlers': { - 'file': { - 'class': 'logging.handlers.RotatingFileHandler', - 'filename': '/var/log/aitbc/production/services/aitbc.log', - 'maxBytes': 10485760, # 10MB - 'backupCount': 5, - 'formatter': 'production' - }, - 'console': { - 'class': 'logging.StreamHandler', - 'formatter': 'production' - } - }, - 'root': { - 'level': 'INFO', - 'handlers': ['file', 'console'] - } -} diff --git a/production/services/blockchain.py b/production/services/blockchain.py deleted file mode 100755 index 4ebca95e..00000000 --- a/production/services/blockchain.py +++ /dev/null @@ -1,157 +0,0 @@ -#!/usr/bin/env python3 -""" -Production Blockchain Service -Real blockchain implementation with persistence and consensus -""" - -import os -import sys -import json -import time -import logging -from pathlib import Path -from datetime import datetime - -sys.path.insert(0, '/opt/aitbc/apps/blockchain-node/src') - -from aitbc_chain.consensus.multi_validator_poa import MultiValidatorPoA -from aitbc_chain.blockchain import Blockchain -from aitbc_chain.transaction import Transaction - -# Production logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', - handlers=[ - logging.FileHandler('/var/log/aitbc/production/blockchain/blockchain.log'), - logging.StreamHandler() - ] -) -logger = logging.getLogger(__name__) - -class ProductionBlockchain: - """Production-grade blockchain implementation""" - - def __init__(self, node_id: str): - self.node_id = node_id - self.data_dir = Path(f'/var/lib/aitbc/data/blockchain/{node_id}') - self.data_dir.mkdir(parents=True, exist_ok=True) - - # Initialize blockchain - self.blockchain = Blockchain() - self.consensus = MultiValidatorPoA(chain_id=1337) - - # Add production validators - self._setup_validators() - - # Load existing data if available - self._load_blockchain() - - logger.info(f"Production blockchain initialized for node: {node_id}") - - def _setup_validators(self): - """Setup production validators""" - validators = [ - ('0xvalidator_aitbc', 10000.0), - ('0xvalidator_aitbc1', 10000.0), - ('0xvalidator_prod_1', 5000.0), - ('0xvalidator_prod_2', 5000.0), - ('0xvalidator_prod_3', 5000.0) - ] - - for address, stake in validators: - self.consensus.add_validator(address, stake) - - logger.info(f"Added {len(validators)} validators to consensus") - - def _load_blockchain(self): - """Load existing blockchain data""" - chain_file = self.data_dir / 'blockchain.json' - if chain_file.exists(): - try: - with open(chain_file, 'r') as f: - data = json.load(f) - # Load blockchain state - logger.info(f"Loaded existing blockchain with {len(data.get('blocks', []))} blocks") - except Exception as e: - logger.error(f"Failed to load blockchain: {e}") - - def _save_blockchain(self): - """Save blockchain state""" - chain_file = self.data_dir / 'blockchain.json' - try: - data = { - 'blocks': [block.to_dict() for block in self.blockchain.chain], - 'last_updated': time.time(), - 'node_id': self.node_id - } - with open(chain_file, 'w') as f: - json.dump(data, f, indent=2) - logger.debug(f"Blockchain saved to {chain_file}") - except Exception as e: - logger.error(f"Failed to save blockchain: {e}") - - def create_transaction(self, from_address: str, to_address: str, amount: float, data: dict = None): - """Create and process a transaction""" - try: - transaction = Transaction( - from_address=from_address, - to_address=to_address, - amount=amount, - data=data or {} - ) - - # Sign transaction (simplified for production) - transaction.sign(f"private_key_{from_address}") - - # Add to blockchain - self.blockchain.add_transaction(transaction) - - # Create new block - block = self.blockchain.mine_block() - - # Save state - self._save_blockchain() - - logger.info(f"Transaction processed: {transaction.tx_hash}") - return transaction.tx_hash - - except Exception as e: - logger.error(f"Failed to create transaction: {e}") - raise - - def get_balance(self, address: str) -> float: - """Get balance for address""" - return self.blockchain.get_balance(address) - - def get_blockchain_info(self) -> dict: - """Get blockchain information""" - return { - 'node_id': self.node_id, - 'blocks': len(self.blockchain.chain), - 'validators': len(self.consensus.validators), - 'total_stake': sum(v.stake for v in self.consensus.validators.values()), - 'last_block': self.blockchain.get_latest_block().to_dict() if self.blockchain.chain else None - } - -if __name__ == '__main__': - node_id = os.getenv('NODE_ID', 'aitbc') - blockchain = ProductionBlockchain(node_id) - - # Example transaction - try: - tx_hash = blockchain.create_transaction( - from_address='0xuser1', - to_address='0xuser2', - amount=100.0, - data={'type': 'payment', 'description': 'Production test transaction'} - ) - print(f"Transaction created: {tx_hash}") - - # Print blockchain info - info = blockchain.get_blockchain_info() - print(f"Blockchain info: {info}") - - except Exception as e: - logger.error(f"Production blockchain error: {e}") - sys.exit(1) diff --git a/production/services/blockchain_http_launcher.py b/production/services/blockchain_http_launcher.py deleted file mode 100755 index bdd7f9eb..00000000 --- a/production/services/blockchain_http_launcher.py +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env python3 -""" -Blockchain HTTP Service Launcher -""" - -import os -import sys - -# Add production services to path -sys.path.insert(0, '/opt/aitbc/production/services') - -# Import blockchain manager and create FastAPI app -from mining_blockchain import MultiChainManager -from fastapi import FastAPI - -app = FastAPI(title='AITBC Blockchain HTTP API') - -@app.get('/health') -async def health(): - return {'status': 'ok', 'service': 'blockchain-http', 'port': 8005} - -@app.get('/info') -async def info(): - manager = MultiChainManager() - return manager.get_all_chains_info() - -@app.get('/blocks') -async def blocks(): - manager = MultiChainManager() - return {'blocks': manager.get_all_chains_info()} - -if __name__ == '__main__': - import uvicorn - uvicorn.run( - app, - host='0.0.0.0', - port=int(os.getenv('BLOCKCHAIN_HTTP_PORT', 8005)), - log_level='info' - ) diff --git a/production/services/blockchain_simple.py b/production/services/blockchain_simple.py deleted file mode 100644 index e037d6b0..00000000 --- a/production/services/blockchain_simple.py +++ /dev/null @@ -1,270 +0,0 @@ -#!/usr/bin/env python3 -""" -Production Blockchain Service - Simplified -Working blockchain implementation with persistence -""" - -import os -import sys -import json -import time -import logging -from pathlib import Path -from datetime import datetime -import hashlib - -# Production logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', - handlers=[ - logging.FileHandler('/var/log/aitbc/production/blockchain/blockchain.log'), - logging.StreamHandler() - ] -) -logger = logging.getLogger(__name__) - -class Block: - """Simple block implementation""" - - def __init__(self, index: int, data: dict, previous_hash: str): - self.index = index - self.timestamp = time.time() - self.data = data - self.previous_hash = previous_hash - self.hash = self.calculate_hash() - - def calculate_hash(self) -> str: - """Calculate block hash""" - content = f"{self.index}{self.timestamp}{json.dumps(self.data, sort_keys=True)}{self.previous_hash}" - return hashlib.sha256(content.encode()).hexdigest() - - def to_dict(self) -> dict: - """Convert block to dictionary""" - return { - 'index': self.index, - 'timestamp': self.timestamp, - 'data': self.data, - 'previous_hash': self.previous_hash, - 'hash': self.hash - } - -class Transaction: - """Simple transaction implementation""" - - def __init__(self, from_address: str, to_address: str, amount: float, data: dict = None): - self.from_address = from_address - self.to_address = to_address - self.amount = amount - self.data = data or {} - self.timestamp = time.time() - self.tx_hash = self.calculate_hash() - - def calculate_hash(self) -> str: - """Calculate transaction hash""" - content = f"{self.from_address}{self.to_address}{self.amount}{json.dumps(self.data, sort_keys=True)}{self.timestamp}" - return hashlib.sha256(content.encode()).hexdigest() - - def to_dict(self) -> dict: - """Convert transaction to dictionary""" - return { - 'from_address': self.from_address, - 'to_address': self.to_address, - 'amount': self.amount, - 'data': self.data, - 'timestamp': self.timestamp, - 'tx_hash': self.tx_hash - } - -class ProductionBlockchain: - """Production-grade blockchain implementation""" - - def __init__(self, node_id: str): - self.node_id = node_id - self.data_dir = Path(f'/var/lib/aitbc/data/blockchain/{node_id}') - self.data_dir.mkdir(parents=True, exist_ok=True) - - # Initialize blockchain - self.chain = [] - self.pending_transactions = [] - self.balances = {} - - # Load existing data if available - self._load_blockchain() - - # Create genesis block if empty - if not self.chain: - self._create_genesis_block() - - logger.info(f"Production blockchain initialized for node: {node_id}") - - def _create_genesis_block(self): - """Create genesis block""" - genesis_data = { - 'type': 'genesis', - 'node_id': self.node_id, - 'message': 'AITBC Production Blockchain Genesis Block', - 'timestamp': time.time() - } - - genesis_block = Block(0, genesis_data, '0') - self.chain.append(genesis_block) - self._save_blockchain() - - logger.info("Genesis block created") - - def _load_blockchain(self): - """Load existing blockchain data""" - chain_file = self.data_dir / 'blockchain.json' - balances_file = self.data_dir / 'balances.json' - - try: - if chain_file.exists(): - with open(chain_file, 'r') as f: - data = json.load(f) - - # Load blocks - self.chain = [] - for block_data in data.get('blocks', []): - block = Block( - block_data['index'], - block_data['data'], - block_data['previous_hash'] - ) - block.hash = block_data['hash'] - block.timestamp = block_data['timestamp'] - self.chain.append(block) - - logger.info(f"Loaded {len(self.chain)} blocks") - - if balances_file.exists(): - with open(balances_file, 'r') as f: - self.balances = json.load(f) - logger.info(f"Loaded balances for {len(self.balances)} addresses") - - except Exception as e: - logger.error(f"Failed to load blockchain: {e}") - - def _save_blockchain(self): - """Save blockchain state""" - try: - chain_file = self.data_dir / 'blockchain.json' - balances_file = self.data_dir / 'balances.json' - - # Save blocks - data = { - 'blocks': [block.to_dict() for block in self.chain], - 'last_updated': time.time(), - 'node_id': self.node_id - } - - with open(chain_file, 'w') as f: - json.dump(data, f, indent=2) - - # Save balances - with open(balances_file, 'w') as f: - json.dump(self.balances, f, indent=2) - - logger.debug(f"Blockchain saved to {chain_file}") - - except Exception as e: - logger.error(f"Failed to save blockchain: {e}") - - def create_transaction(self, from_address: str, to_address: str, amount: float, data: dict = None): - """Create and process a transaction""" - try: - transaction = Transaction(from_address, to_address, amount, data) - - # Add to pending transactions - self.pending_transactions.append(transaction) - - # Process transaction (simplified - no validation for demo) - self._process_transaction(transaction) - - # Create new block if we have enough transactions - if len(self.pending_transactions) >= 1: # Create block for each transaction in production - self._create_block() - - logger.info(f"Transaction processed: {transaction.tx_hash}") - return transaction.tx_hash - - except Exception as e: - logger.error(f"Failed to create transaction: {e}") - raise - - def _process_transaction(self, transaction: Transaction): - """Process a transaction""" - # Initialize balances if needed - if transaction.from_address not in self.balances: - self.balances[transaction.from_address] = 10000.0 # Initial balance - if transaction.to_address not in self.balances: - self.balances[transaction.to_address] = 0.0 - - # Check balance (simplified) - if self.balances[transaction.from_address] >= transaction.amount: - self.balances[transaction.from_address] -= transaction.amount - self.balances[transaction.to_address] += transaction.amount - logger.info(f"Transferred {transaction.amount} from {transaction.from_address} to {transaction.to_address}") - else: - logger.warning(f"Insufficient balance for {transaction.from_address}") - - def _create_block(self): - """Create a new block""" - if not self.pending_transactions: - return - - previous_hash = self.chain[-1].hash if self.chain else '0' - - block_data = { - 'transactions': [tx.to_dict() for tx in self.pending_transactions], - 'node_id': self.node_id, - 'block_reward': 10.0 - } - - new_block = Block(len(self.chain), block_data, previous_hash) - self.chain.append(new_block) - - # Clear pending transactions - self.pending_transactions.clear() - - # Save blockchain - self._save_blockchain() - - logger.info(f"Block {new_block.index} created") - - def get_balance(self, address: str) -> float: - """Get balance for address""" - return self.balances.get(address, 0.0) - - def get_blockchain_info(self) -> dict: - """Get blockchain information""" - return { - 'node_id': self.node_id, - 'blocks': len(self.chain), - 'pending_transactions': len(self.pending_transactions), - 'total_addresses': len(self.balances), - 'last_block': self.chain[-1].to_dict() if self.chain else None, - 'total_balance': sum(self.balances.values()) - } - -if __name__ == '__main__': - node_id = os.getenv('NODE_ID', 'aitbc') - blockchain = ProductionBlockchain(node_id) - - # Example transaction - try: - tx_hash = blockchain.create_transaction( - from_address='0xuser1', - to_address='0xuser2', - amount=100.0, - data={'type': 'payment', 'description': 'Production test transaction'} - ) - print(f"Transaction created: {tx_hash}") - - # Print blockchain info - info = blockchain.get_blockchain_info() - print(f"Blockchain info: {info}") - - except Exception as e: - logger.error(f"Production blockchain error: {e}") - sys.exit(1) diff --git a/production/services/gpu_marketplace_launcher.py b/production/services/gpu_marketplace_launcher.py deleted file mode 100755 index 6b759d28..00000000 --- a/production/services/gpu_marketplace_launcher.py +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/env python3 -""" -GPU Marketplace Service Launcher -""" - -import os -import sys - -# Add production services to path -sys.path.insert(0, '/opt/aitbc/production/services') - -# Import and run the marketplace app -from marketplace import app -import uvicorn - -# Run the app -uvicorn.run( - app, - host='0.0.0.0', - port=int(os.getenv('GPU_MARKETPLACE_PORT', 8003)), - log_level='info' -) diff --git a/production/services/marketplace.py b/production/services/marketplace.py deleted file mode 100755 index 0a383566..00000000 --- a/production/services/marketplace.py +++ /dev/null @@ -1,420 +0,0 @@ -#!/usr/bin/env python3 -""" -Production Marketplace Service -Real marketplace with database persistence and API -""" - -import os -import sys -import json -import time -import logging -from pathlib import Path -from datetime import datetime -from typing import Dict, List, Optional - -sys.path.insert(0, '/opt/aitbc/apps/coordinator-api/src') - -from fastapi import FastAPI, HTTPException -from fastapi.middleware.cors import CORSMiddleware -from pydantic import BaseModel -import uvicorn - -# Production logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', - handlers=[ - logging.FileHandler('/var/log/aitbc/production/marketplace/marketplace.log'), - logging.StreamHandler() - ] -) -logger = logging.getLogger(__name__) - -# Pydantic models -class GPUListing(BaseModel): - id: str - provider: str - gpu_type: str - memory_gb: int - price_per_hour: float - status: str - specs: dict - -class Bid(BaseModel): - id: str - gpu_id: str - agent_id: str - bid_price: float - duration_hours: int - total_cost: float - status: str - -class ProductionMarketplace: - """Production-grade marketplace with persistence""" - - def __init__(self): - self.data_dir = Path("/var/lib/aitbc/data/marketplace") - self.data_dir.mkdir(parents=True, exist_ok=True) - - # Load existing data - self._load_data() - - logger.info("Production marketplace initialized") - - def _load_data(self): - """Load marketplace data from disk""" - self.gpu_listings = {} - self.bids = {} - - listings_file = self.data_dir / 'gpu_listings.json' - bids_file = self.data_dir / 'bids.json' - - try: - if listings_file.exists(): - with open(listings_file, 'r') as f: - self.gpu_listings = json.load(f) - - if bids_file.exists(): - with open(bids_file, 'r') as f: - self.bids = json.load(f) - - logger.info(f"Loaded {len(self.gpu_listings)} GPU listings and {len(self.bids)} bids") - - except Exception as e: - logger.error(f"Failed to load marketplace data: {e}") - - def _save_data(self): - """Save marketplace data to disk""" - try: - listings_file = self.data_dir / 'gpu_listings.json' - bids_file = self.data_dir / 'bids.json' - - with open(listings_file, 'w') as f: - json.dump(self.gpu_listings, f, indent=2) - - with open(bids_file, 'w') as f: - json.dump(self.bids, f, indent=2) - - logger.debug("Marketplace data saved") - - except Exception as e: - logger.error(f"Failed to save marketplace data: {e}") - - def add_gpu_listing(self, listing: dict) -> str: - """Add a new GPU listing""" - try: - gpu_id = f"gpu_{int(time.time())}_{len(self.gpu_listings)}" - listing['id'] = gpu_id - listing['created_at'] = time.time() - listing['status'] = 'available' - - self.gpu_listings[gpu_id] = listing - self._save_data() - - logger.info(f"GPU listing added: {gpu_id}") - return gpu_id - - except Exception as e: - logger.error(f"Failed to add GPU listing: {e}") - raise - - def create_bid(self, bid_data: dict) -> str: - """Create a new bid""" - try: - bid_id = f"bid_{int(time.time())}_{len(self.bids)}" - bid_data['id'] = bid_id - bid_data['created_at'] = time.time() - bid_data['status'] = 'pending' - - self.bids[bid_id] = bid_data - self._save_data() - - logger.info(f"Bid created: {bid_id}") - return bid_id - - except Exception as e: - logger.error(f"Failed to create bid: {e}") - raise - - def get_marketplace_stats(self) -> dict: - """Get marketplace statistics""" - return { - 'total_gpus': len(self.gpu_listings), - 'available_gpus': len([g for g in self.gpu_listings.values() if g['status'] == 'available']), - 'total_bids': len(self.bids), - 'pending_bids': len([b for b in self.bids.values() if b['status'] == 'pending']), - 'total_value': sum(b['total_cost'] for b in self.bids.values()) - } - -# Initialize marketplace -marketplace = ProductionMarketplace() - -# FastAPI app -app = FastAPI( - title="AITBC Production Marketplace", - version="1.0.0", - description="Production-grade GPU marketplace" -) - -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["GET", "POST", "PUT", "DELETE"], - allow_headers=["*"], -) - -@app.get("/health") -async def health(): - """Health check endpoint""" - return { - "status": "healthy", - "service": "production-marketplace", - "timestamp": datetime.utcnow().isoformat(), - "stats": marketplace.get_marketplace_stats() - } - -@app.post("/gpu/listings") -async def add_gpu_listing(listing: dict): - """Add a new GPU listing""" - try: - gpu_id = marketplace.add_gpu_listing(listing) - return {"gpu_id": gpu_id, "status": "created"} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - -@app.post("/bids") -async def create_bid(bid: dict): - """Create a new bid""" - try: - bid_id = marketplace.create_bid(bid) - return {"bid_id": bid_id, "status": "created"} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - -@app.get("/stats") -async def get_stats(): - """Get marketplace statistics""" - return marketplace.get_marketplace_stats() - -@app.get("/ai/services") -@app.post("/ai/execute") - - -# AI Marketplace Endpoints -@app.get("/ai/services") -async def get_ai_services(): - """Get AI services including OpenClaw""" - default_services = [ - { - 'id': 'ollama-llama2-7b', - 'name': 'Ollama Llama2 7B', - 'type': 'ollama_inference', - 'capabilities': ['text_generation', 'chat', 'completion'], - 'price_per_task': 3.0, - 'provider': 'Ollama', - 'status': 'available' - }, - { - 'id': 'ollama-llama2-13b', - 'name': 'Ollama Llama2 13B', - 'type': 'ollama_inference', - 'capabilities': ['text_generation', 'chat', 'completion', 'analysis'], - 'price_per_task': 5.0, - 'provider': 'Ollama', - 'status': 'available' - } - ] - - # Add OpenClaw services if available - try: - from openclaw_ai import OpenClawAIService - openclaw_service = OpenClawAIService() - agents = openclaw_service.get_agents_info() - - for agent in agents['agents']: - default_services.append({ - 'id': f"openclaw-{agent['id']}", - 'name': agent['name'], - 'type': 'openclaw_ai', - 'capabilities': agent['capabilities'], - 'price_per_task': agent['price_per_task'], - 'provider': 'OpenClaw AI', - 'status': 'available' - }) - except Exception as e: - print(f"OpenClaw integration failed: {e}") - - return { - 'total_services': len(default_services), - 'services': default_services - } - -@app.post("/ai/execute") -async def execute_ai_task(request: dict): - """Execute AI task""" - service_id = request.get('service_id') - task_data = request.get('task_data', {}) - - try: - # Handle OpenClaw services - if service_id.startswith('openclaw-'): - from openclaw_ai import OpenClawAIService - openclaw_service = OpenClawAIService() - agent_id = service_id.replace('openclaw-', '') - result = openclaw_service.execute_task(agent_id, task_data) - - return { - 'task_id': result.get('task_id'), - 'status': result.get('status'), - 'result': result.get('result'), - 'service_id': service_id, - 'execution_time': result.get('execution_time') - } - - # Handle Ollama services - elif service_id.startswith('ollama-'): - import time - import asyncio - await asyncio.sleep(1) # Simulate processing - - model = service_id.replace('ollama-', '').replace('-', ' ') - prompt = task_data.get('prompt', 'No prompt') - - result = f"Ollama {model} Response: {prompt}" - - return { - 'task_id': f"task_{int(time.time())}", - 'status': 'completed', - 'result': result, - 'service_id': service_id, - 'model': model - } - - else: - return { - 'task_id': f"task_{int(time.time())}", - 'status': 'failed', - 'error': f"Unknown service: {service_id}" - } - - except Exception as e: - return { - 'task_id': f"task_{int(time.time())}", - 'status': 'failed', - 'error': str(e) - } - -@app.get("/unified/stats") -async def get_unified_stats(): - """Get unified marketplace stats""" - gpu_stats = marketplace.get_marketplace_stats() - ai_services = await get_ai_services() - - return { - 'gpu_marketplace': gpu_stats, - 'ai_marketplace': { - 'total_services': ai_services['total_services'], - 'available_services': len([s for s in ai_services['services'] if s['status'] == 'available']) - }, - 'total_listings': gpu_stats['total_gpus'] + ai_services['total_services'] - } - -if __name__ == '__main__': - uvicorn.run( - app, - host="0.0.0.0", - port=int(os.getenv('MARKETPLACE_PORT', 8002)), - workers=int(os.getenv('WORKERS', 4)), - log_level="info" - ) - -# AI Marketplace Extension -try: - sys.path.insert(0, '/opt/aitbc/production/services') - from openclaw_ai import OpenClawAIService - OPENCLAW_AVAILABLE = True -except ImportError: - OPENCLAW_AVAILABLE = False - -# Add AI services to marketplace -async def get_ai_services(): - """Get AI services (simplified for merger)""" - default_services = [ - { - 'id': 'ollama-llama2-7b', - 'name': 'Ollama Llama2 7B', - 'type': 'ollama_inference', - 'capabilities': ['text_generation', 'chat', 'completion'], - 'price_per_task': 3.0, - 'provider': 'Ollama', - 'status': 'available' - }, - { - 'id': 'ollama-llama2-13b', - 'name': 'Ollama Llama2 13B', - 'type': 'ollama_inference', - 'capabilities': ['text_generation', 'chat', 'completion', 'analysis'], - 'price_per_task': 5.0, - 'provider': 'Ollama', - 'status': 'available' - } - ] - - if OPENCLAW_AVAILABLE: - try: - openclaw_service = OpenClawAIService() - agents = openclaw_service.get_agents_info() - for agent in agents['agents']: - default_services.append({ - 'id': f"ai_{agent['id']}", - 'name': agent['name'], - 'type': 'openclaw_ai', - 'capabilities': agent['capabilities'], - 'price_per_task': agent['price_per_task'], - 'provider': 'OpenClaw AI', - 'status': 'available' - }) - except Exception as e: - print(f"OpenClaw integration failed: {e}") - - return { - 'total_services': len(default_services), - 'services': default_services - } - -async def execute_ai_task(request: dict): - """Execute AI task (simplified)""" - service_id = request.get('service_id') - task_data = request.get('task_data', {}) - - # Simulate AI task execution - await asyncio.sleep(2) # Simulate processing - - result = f"AI task executed for service {service_id}. Task data: {task_data.get('prompt', 'No prompt')}" - - return { - 'task_id': f"task_{int(time.time())}", - 'status': 'completed', - 'result': result, - 'service_id': service_id - } - -@app.get("/unified/stats") -async def get_unified_stats(): - """Get unified marketplace stats""" - gpu_stats = marketplace.get_marketplace_stats() - ai_services = await get_ai_services() - - return { - 'gpu_marketplace': gpu_stats, - 'ai_marketplace': { - 'total_services': ai_services['total_services'], - 'available_services': len([s for s in ai_services['services'] if s['status'] == 'available']) - }, - 'total_listings': gpu_stats['total_gpus'] + ai_services['total_services'] - } - -import asyncio -import time diff --git a/production/services/marketplace_original.py b/production/services/marketplace_original.py deleted file mode 100755 index 288ebdc1..00000000 --- a/production/services/marketplace_original.py +++ /dev/null @@ -1,208 +0,0 @@ -#!/usr/bin/env python3 -""" -Production Marketplace Service -Real marketplace with database persistence and API -""" - -import os -import sys -import json -import time -import logging -from pathlib import Path -from datetime import datetime -from typing import Dict, List, Optional - -sys.path.insert(0, '/opt/aitbc/apps/coordinator-api/src') - -from fastapi import FastAPI, HTTPException -from fastapi.middleware.cors import CORSMiddleware -from pydantic import BaseModel -import uvicorn - -# Production logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', - handlers=[ - logging.FileHandler('/var/log/aitbc/production/marketplace/marketplace.log'), - logging.StreamHandler() - ] -) -logger = logging.getLogger(__name__) - -# Pydantic models -class GPUListing(BaseModel): - id: str - provider: str - gpu_type: str - memory_gb: int - price_per_hour: float - status: str - specs: dict - -class Bid(BaseModel): - id: str - gpu_id: str - agent_id: str - bid_price: float - duration_hours: int - total_cost: float - status: str - -class ProductionMarketplace: - """Production-grade marketplace with persistence""" - - def __init__(self): - self.data_dir = Path('/var/lib/aitbc/data/marketplace') - self.data_dir.mkdir(parents=True, exist_ok=True) - - # Load existing data - self._load_data() - - logger.info("Production marketplace initialized") - - def _load_data(self): - """Load marketplace data from disk""" - self.gpu_listings = {} - self.bids = {} - - listings_file = self.data_dir / 'gpu_listings.json' - bids_file = self.data_dir / 'bids.json' - - try: - if listings_file.exists(): - with open(listings_file, 'r') as f: - self.gpu_listings = json.load(f) - - if bids_file.exists(): - with open(bids_file, 'r') as f: - self.bids = json.load(f) - - logger.info(f"Loaded {len(self.gpu_listings)} GPU listings and {len(self.bids)} bids") - - except Exception as e: - logger.error(f"Failed to load marketplace data: {e}") - - def _save_data(self): - """Save marketplace data to disk""" - try: - listings_file = self.data_dir / 'gpu_listings.json' - bids_file = self.data_dir / 'bids.json' - - with open(listings_file, 'w') as f: - json.dump(self.gpu_listings, f, indent=2) - - with open(bids_file, 'w') as f: - json.dump(self.bids, f, indent=2) - - logger.debug("Marketplace data saved") - - except Exception as e: - logger.error(f"Failed to save marketplace data: {e}") - - def add_gpu_listing(self, listing: dict) -> str: - """Add a new GPU listing""" - try: - gpu_id = f"gpu_{int(time.time())}_{len(self.gpu_listings)}" - listing['id'] = gpu_id - listing['created_at'] = time.time() - listing['status'] = 'available' - - self.gpu_listings[gpu_id] = listing - self._save_data() - - logger.info(f"GPU listing added: {gpu_id}") - return gpu_id - - except Exception as e: - logger.error(f"Failed to add GPU listing: {e}") - raise - - def create_bid(self, bid_data: dict) -> str: - """Create a new bid""" - try: - bid_id = f"bid_{int(time.time())}_{len(self.bids)}" - bid_data['id'] = bid_id - bid_data['created_at'] = time.time() - bid_data['status'] = 'pending' - - self.bids[bid_id] = bid_data - self._save_data() - - logger.info(f"Bid created: {bid_id}") - return bid_id - - except Exception as e: - logger.error(f"Failed to create bid: {e}") - raise - - def get_marketplace_stats(self) -> dict: - """Get marketplace statistics""" - return { - 'total_gpus': len(self.gpu_listings), - 'available_gpus': len([g for g in self.gpu_listings.values() if g['status'] == 'available']), - 'total_bids': len(self.bids), - 'pending_bids': len([b for b in self.bids.values() if b['status'] == 'pending']), - 'total_value': sum(b['total_cost'] for b in self.bids.values()) - } - -# Initialize marketplace -marketplace = ProductionMarketplace() - -# FastAPI app -app = FastAPI( - title="AITBC Production Marketplace", - version="1.0.0", - description="Production-grade GPU marketplace" -) - -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["GET", "POST", "PUT", "DELETE"], - allow_headers=["*"], -) - -@app.get("/health") -async def health(): - """Health check endpoint""" - return { - "status": "healthy", - "service": "production-marketplace", - "timestamp": datetime.utcnow().isoformat(), - "stats": marketplace.get_marketplace_stats() - } - -@app.post("/gpu/listings") -async def add_gpu_listing(listing: dict): - """Add a new GPU listing""" - try: - gpu_id = marketplace.add_gpu_listing(listing) - return {"gpu_id": gpu_id, "status": "created"} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - -@app.post("/bids") -async def create_bid(bid: dict): - """Create a new bid""" - try: - bid_id = marketplace.create_bid(bid) - return {"bid_id": bid_id, "status": "created"} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - -@app.get("/stats") -async def get_stats(): - """Get marketplace statistics""" - return marketplace.get_marketplace_stats() - -if __name__ == '__main__': - uvicorn.run( - app, - host="0.0.0.0", - port=int(os.getenv('MARKETPLACE_PORT', 8002)), - workers=int(os.getenv('WORKERS', 4)), - log_level="info" - ) diff --git a/production/services/mining_blockchain.py b/production/services/mining_blockchain.py deleted file mode 100755 index 220da974..00000000 --- a/production/services/mining_blockchain.py +++ /dev/null @@ -1,322 +0,0 @@ -#!/usr/bin/env python3 -""" -Real Blockchain with Mining and Multi-Chain Support -""" - -import os -import sys -import json -import time -import hashlib -import logging -from pathlib import Path -from datetime import datetime -from typing import Dict, List, Optional -import threading - -# Production logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', - handlers=[ - logging.FileHandler('/var/log/aitbc/production/blockchain/mining.log'), - logging.StreamHandler() - ] -) -logger = logging.getLogger(__name__) - -class ProofOfWork: - """Real Proof of Work mining algorithm""" - - def __init__(self, difficulty: int = 4): - self.difficulty = difficulty - self.target = "0" * difficulty - - def mine(self, block_data: dict) -> tuple: - """Mine a block with real proof of work""" - nonce = 0 - start_time = time.time() - - while True: - # Create block hash with nonce - content = f"{json.dumps(block_data, sort_keys=True)}{nonce}" - block_hash = hashlib.sha256(content.encode()).hexdigest() - - # Check if hash meets difficulty - if block_hash.startswith(self.target): - mining_time = time.time() - start_time - logger.info(f"Block mined! Nonce: {nonce}, Hash: {block_hash[:16]}..., Time: {mining_time:.2f}s") - return block_hash, nonce, mining_time - - nonce += 1 - - # Prevent infinite loop - if nonce > 10000000: - raise Exception("Mining failed - nonce too high") - -class MultiChainManager: - """Multi-chain blockchain manager""" - - def __init__(self): - self.chains = {} - self.miners = {} - self.node_id = os.getenv('NODE_ID', 'aitbc') - self.data_dir = Path(f'/var/lib/aitbc/data/blockchain/{self.node_id}') - self.data_dir.mkdir(parents=True, exist_ok=True) - - # Initialize multiple chains - self._initialize_chains() - - logger.info(f"Multi-chain manager initialized for node: {self.node_id}") - - def _initialize_chains(self): - """Initialize multiple blockchain chains""" - chains_config = [ - { - 'name': 'aitbc-main', - 'difficulty': 4, - 'block_reward': 50.0, - 'description': 'Main AITBC blockchain' - }, - { - 'name': 'aitbc-gpu', - 'difficulty': 3, - 'block_reward': 25.0, - 'description': 'GPU computing blockchain' - } - ] - - for chain_config in chains_config: - chain_name = chain_config['name'] - self.chains[chain_name] = { - 'name': chain_name, - 'blocks': [], - 'difficulty': chain_config['difficulty'], - 'block_reward': chain_config['block_reward'], - 'description': chain_config['description'], - 'pending_transactions': [], - 'balances': {}, - 'mining_stats': { - 'blocks_mined': 0, - 'total_mining_time': 0, - 'average_mining_time': 0 - } - } - - # Create miner for this chain - self.miners[chain_name] = ProofOfWork(chain_config['difficulty']) - - # Load existing chain data - self._load_chain(chain_name) - - # Create genesis block if empty - if not self.chains[chain_name]['blocks']: - self._create_genesis_block(chain_name) - - logger.info(f"Chain {chain_name} initialized with {len(self.chains[chain_name]['blocks'])} blocks") - - def _load_chain(self, chain_name: str): - """Load existing chain data""" - chain_file = self.data_dir / f'{chain_name}.json' - - try: - if chain_file.exists(): - with open(chain_file, 'r') as f: - data = json.load(f) - - self.chains[chain_name] = data - logger.info(f"Loaded chain {chain_name} with {len(data.get('blocks', []))} blocks") - - except Exception as e: - logger.error(f"Failed to load chain {chain_name}: {e}") - - def _save_chain(self, chain_name: str): - """Save chain data""" - try: - chain_file = self.data_dir / f'{chain_name}.json' - - with open(chain_file, 'w') as f: - json.dump(self.chains[chain_name], f, indent=2) - - logger.debug(f"Chain {chain_name} saved") - - except Exception as e: - logger.error(f"Failed to save chain {chain_name}: {e}") - - def _create_genesis_block(self, chain_name: str): - """Create genesis block for chain""" - chain = self.chains[chain_name] - - genesis_data = { - 'index': 0, - 'timestamp': time.time(), - 'data': { - 'type': 'genesis', - 'chain': chain_name, - 'node_id': self.node_id, - 'description': chain['description'], - 'block_reward': chain['block_reward'] - }, - 'previous_hash': '0', - 'nonce': 0 - } - - # Mine genesis block - block_hash, nonce, mining_time = self.miners[chain_name].mine(genesis_data) - - genesis_block = { - 'index': 0, - 'timestamp': genesis_data['timestamp'], - 'data': genesis_data['data'], - 'previous_hash': '0', - 'hash': block_hash, - 'nonce': nonce, - 'mining_time': mining_time, - 'miner': self.node_id - } - - chain['blocks'].append(genesis_block) - chain['mining_stats']['blocks_mined'] = 1 - chain['mining_stats']['total_mining_time'] = mining_time - chain['mining_stats']['average_mining_time'] = mining_time - - # Initialize miner balance with block reward - chain['balances'][f'miner_{self.node_id}'] = chain['block_reward'] - - self._save_chain(chain_name) - - logger.info(f"Genesis block created for {chain_name} - Reward: {chain['block_reward']} AITBC") - - def mine_block(self, chain_name: str, transactions: List[dict] = None) -> dict: - """Mine a new block on specified chain""" - if chain_name not in self.chains: - raise Exception(f"Chain {chain_name} not found") - - chain = self.chains[chain_name] - - # Prepare block data - block_data = { - 'index': len(chain['blocks']), - 'timestamp': time.time(), - 'data': { - 'transactions': transactions or [], - 'chain': chain_name, - 'node_id': self.node_id - }, - 'previous_hash': chain['blocks'][-1]['hash'] if chain['blocks'] else '0' - } - - # Mine the block - block_hash, nonce, mining_time = self.miners[chain_name].mine(block_data) - - # Create block - new_block = { - 'index': block_data['index'], - 'timestamp': block_data['timestamp'], - 'data': block_data['data'], - 'previous_hash': block_data['previous_hash'], - 'hash': block_hash, - 'nonce': nonce, - 'mining_time': mining_time, - 'miner': self.node_id, - 'transactions_count': len(transactions or []) - } - - # Add to chain - chain['blocks'].append(new_block) - - # Update mining stats - chain['mining_stats']['blocks_mined'] += 1 - chain['mining_stats']['total_mining_time'] += mining_time - chain['mining_stats']['average_mining_time'] = ( - chain['mining_stats']['total_mining_time'] / chain['mining_stats']['blocks_mined'] - ) - - # Reward miner - miner_address = f'miner_{self.node_id}' - if miner_address not in chain['balances']: - chain['balances'][miner_address] = 0 - chain['balances'][miner_address] += chain['block_reward'] - - # Process transactions - for tx in transactions or []: - self._process_transaction(chain, tx) - - self._save_chain(chain_name) - - logger.info(f"Block mined on {chain_name} - Reward: {chain['block_reward']} AITBC") - - return new_block - - def _process_transaction(self, chain: dict, transaction: dict): - """Process a transaction""" - from_addr = transaction.get('from_address') - to_addr = transaction.get('to_address') - amount = transaction.get('amount', 0) - - # Initialize balances - if from_addr not in chain['balances']: - chain['balances'][from_addr] = 1000.0 # Initial balance - if to_addr not in chain['balances']: - chain['balances'][to_addr] = 0.0 - - # Process transaction - if chain['balances'][from_addr] >= amount: - chain['balances'][from_addr] -= amount - chain['balances'][to_addr] += amount - logger.info(f"Transaction processed: {amount} AITBC from {from_addr} to {to_addr}") - - def get_chain_info(self, chain_name: str) -> dict: - """Get chain information""" - if chain_name not in self.chains: - return {'error': f'Chain {chain_name} not found'} - - chain = self.chains[chain_name] - - return { - 'chain_name': chain_name, - 'blocks': len(chain['blocks']), - 'difficulty': chain['difficulty'], - 'block_reward': chain['block_reward'], - 'description': chain['description'], - 'mining_stats': chain['mining_stats'], - 'total_addresses': len(chain['balances']), - 'total_balance': sum(chain['balances'].values()), - 'latest_block': chain['blocks'][-1] if chain['blocks'] else None - } - - def get_all_chains_info(self) -> dict: - """Get information about all chains""" - return { - 'node_id': self.node_id, - 'total_chains': len(self.chains), - 'chains': {name: self.get_chain_info(name) for name in self.chains.keys()} - } - -if __name__ == '__main__': - # Initialize multi-chain manager - manager = MultiChainManager() - - # Mine blocks on all chains - for chain_name in manager.chains.keys(): - try: - # Create sample transactions - transactions = [ - { - 'from_address': f'user_{manager.node_id}', - 'to_address': f'user_other', - 'amount': 10.0, - 'data': {'type': 'payment'} - } - ] - - # Mine block - block = manager.mine_block(chain_name, transactions) - print(f"Mined block on {chain_name}: {block['hash'][:16]}...") - - except Exception as e: - logger.error(f"Failed to mine block on {chain_name}: {e}") - - # Print chain information - info = manager.get_all_chains_info() - print(f"Multi-chain info: {json.dumps(info, indent=2)}") diff --git a/production/services/openclaw_ai.py b/production/services/openclaw_ai.py deleted file mode 100755 index 3fbfa541..00000000 --- a/production/services/openclaw_ai.py +++ /dev/null @@ -1,357 +0,0 @@ -#!/usr/bin/env python3 -""" -OpenClaw AI Service Integration -Real AI agent system with marketplace integration -""" - -import os -import sys -import json -import time -import logging -import subprocess -from pathlib import Path -from datetime import datetime -from typing import Dict, List, Optional - -# Production logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', - handlers=[ - logging.FileHandler('/var/lib/aitbc/data/logs/openclaw/openclaw.log'), - logging.StreamHandler() - ] -) -logger = logging.getLogger(__name__) - -class OpenClawAIService: - """Real OpenClaw AI service""" - - def __init__(self): - self.node_id = os.getenv('NODE_ID', 'aitbc') - self.data_dir = Path(f'/var/lib/aitbc/data/openclaw/{self.node_id}') - self.data_dir.mkdir(parents=True, exist_ok=True) - - # Initialize OpenClaw agents - self.agents = {} - self.tasks = {} - self.results = {} - - self._initialize_agents() - self._load_data() - - logger.info(f"OpenClaw AI service initialized for node: {self.node_id}") - - def _initialize_agents(self): - """Initialize OpenClaw AI agents""" - agents_config = [ - { - 'id': 'openclaw-text-gen', - 'name': 'OpenClaw Text Generator', - 'capabilities': ['text_generation', 'creative_writing', 'content_creation'], - 'model': 'llama2-7b', - 'price_per_task': 5.0, - 'status': 'active' - }, - { - 'id': 'openclaw-research', - 'name': 'OpenClaw Research Agent', - 'capabilities': ['research', 'analysis', 'data_processing'], - 'model': 'llama2-13b', - 'price_per_task': 10.0, - 'status': 'active' - }, - { - 'id': 'openclaw-trading', - 'name': 'OpenClaw Trading Bot', - 'capabilities': ['trading', 'market_analysis', 'prediction'], - 'model': 'custom-trading', - 'price_per_task': 15.0, - 'status': 'active' - } - ] - - for agent_config in agents_config: - self.agents[agent_config['id']] = { - **agent_config, - 'node_id': self.node_id, - 'created_at': time.time(), - 'tasks_completed': 0, - 'total_earnings': 0.0, - 'rating': 5.0 - } - - def _load_data(self): - """Load existing data""" - try: - # Load agents - agents_file = self.data_dir / 'agents.json' - if agents_file.exists(): - with open(agents_file, 'r') as f: - self.agents = json.load(f) - - # Load tasks - tasks_file = self.data_dir / 'tasks.json' - if tasks_file.exists(): - with open(tasks_file, 'r') as f: - self.tasks = json.load(f) - - # Load results - results_file = self.data_dir / 'results.json' - if results_file.exists(): - with open(results_file, 'r') as f: - self.results = json.load(f) - - logger.info(f"Loaded {len(self.agents)} agents, {len(self.tasks)} tasks, {len(self.results)} results") - - except Exception as e: - logger.error(f"Failed to load data: {e}") - - def _save_data(self): - """Save data""" - try: - with open(self.data_dir / 'agents.json', 'w') as f: - json.dump(self.agents, f, indent=2) - - with open(self.data_dir / 'tasks.json', 'w') as f: - json.dump(self.tasks, f, indent=2) - - with open(self.data_dir / 'results.json', 'w') as f: - json.dump(self.results, f, indent=2) - - logger.debug("OpenClaw data saved") - - except Exception as e: - logger.error(f"Failed to save data: {e}") - - def execute_task(self, agent_id: str, task_data: dict) -> dict: - """Execute a task with OpenClaw agent""" - if agent_id not in self.agents: - raise Exception(f"Agent {agent_id} not found") - - agent = self.agents[agent_id] - - # Create task - task_id = f"task_{int(time.time())}_{len(self.tasks)}" - task = { - 'id': task_id, - 'agent_id': agent_id, - 'agent_name': agent['name'], - 'task_type': task_data.get('type', 'text_generation'), - 'prompt': task_data.get('prompt', ''), - 'parameters': task_data.get('parameters', {}), - 'status': 'executing', - 'created_at': time.time(), - 'node_id': self.node_id - } - - self.tasks[task_id] = task - - # Execute task with OpenClaw - try: - result = self._execute_openclaw_task(agent, task) - - # Update task and agent - task['status'] = 'completed' - task['completed_at'] = time.time() - task['result'] = result - - agent['tasks_completed'] += 1 - agent['total_earnings'] += agent['price_per_task'] - - # Store result - self.results[task_id] = result - - self._save_data() - - logger.info(f"Task {task_id} completed by {agent['name']}") - - return { - 'task_id': task_id, - 'status': 'completed', - 'result': result, - 'agent': agent['name'], - 'execution_time': task['completed_at'] - task['created_at'] - } - - except Exception as e: - task['status'] = 'failed' - task['error'] = str(e) - task['failed_at'] = time.time() - - self._save_data() - - logger.error(f"Task {task_id} failed: {e}") - - return { - 'task_id': task_id, - 'status': 'failed', - 'error': str(e) - } - - def _execute_openclaw_task(self, agent: dict, task: dict) -> dict: - """Execute task with OpenClaw""" - task_type = task['task_type'] - prompt = task['prompt'] - - # Simulate OpenClaw execution - if task_type == 'text_generation': - return self._generate_text(agent, prompt) - elif task_type == 'research': - return self._perform_research(agent, prompt) - elif task_type == 'trading': - return self._analyze_trading(agent, prompt) - else: - raise Exception(f"Unsupported task type: {task_type}") - - def _generate_text(self, agent: dict, prompt: str) -> dict: - """Generate text with OpenClaw""" - # Simulate text generation - time.sleep(2) # Simulate processing time - - result = f""" -OpenClaw {agent['name']} Generated Text: - -{prompt} - -This is a high-quality text generation response from OpenClaw AI agent {agent['name']}. -The agent uses the {agent['model']} model to generate creative and coherent text based on the provided prompt. - -Generated at: {datetime.utcnow().isoformat()} -Node: {self.node_id} - """.strip() - - return { - 'type': 'text_generation', - 'content': result, - 'word_count': len(result.split()), - 'model_used': agent['model'], - 'quality_score': 0.95 - } - - def _perform_research(self, agent: dict, query: str) -> dict: - """Perform research with OpenClaw""" - # Simulate research - time.sleep(3) # Simulate processing time - - result = f""" -OpenClaw {agent['name']} Research Results: - -Query: {query} - -Research Findings: -1. Comprehensive analysis of the query has been completed -2. Multiple relevant sources have been analyzed -3. Key insights and patterns have been identified -4. Recommendations have been formulated based on the research - -The research leverages advanced AI capabilities of the {agent['model']} model to provide accurate and insightful analysis. - -Research completed at: {datetime.utcnow().isoformat()} -Node: {self.node_id} - """.strip() - - return { - 'type': 'research', - 'content': result, - 'sources_analyzed': 15, - 'confidence_score': 0.92, - 'model_used': agent['model'] - } - - def _analyze_trading(self, agent: dict, market_data: str) -> dict: - """Analyze trading with OpenClaw""" - # Simulate trading analysis - time.sleep(4) # Simulate processing time - - result = f""" -OpenClaw {agent['name']} Trading Analysis: - -Market Data: {market_data} - -Trading Analysis: -1. Market trend analysis indicates bullish sentiment -2. Technical indicators suggest upward momentum -3. Risk assessment: Moderate volatility expected -4. Trading recommendation: Consider long position with stop-loss - -The analysis utilizes the specialized {agent['model']} trading model to provide actionable market insights. - -Analysis completed at: {datetime.utcnow().isoformat()} -Node: {self.node_id} - """.strip() - - return { - 'type': 'trading_analysis', - 'content': result, - 'market_sentiment': 'bullish', - 'confidence': 0.88, - 'risk_level': 'moderate', - 'model_used': agent['model'] - } - - def get_agents_info(self) -> dict: - """Get information about all agents""" - return { - 'node_id': self.node_id, - 'total_agents': len(self.agents), - 'active_agents': len([a for a in self.agents.values() if a['status'] == 'active']), - 'total_tasks_completed': sum(a['tasks_completed'] for a in self.agents.values()), - 'total_earnings': sum(a['total_earnings'] for a in self.agents.values()), - 'agents': list(self.agents.values()) - } - - def get_marketplace_listings(self) -> dict: - """Get marketplace listings for OpenClaw agents""" - listings = [] - - for agent in self.agents.values(): - if agent['status'] == 'active': - listings.append({ - 'agent_id': agent['id'], - 'agent_name': agent['name'], - 'capabilities': agent['capabilities'], - 'model': agent['model'], - 'price_per_task': agent['price_per_task'], - 'tasks_completed': agent['tasks_completed'], - 'rating': agent['rating'], - 'node_id': agent['node_id'] - }) - - return { - 'node_id': self.node_id, - 'total_listings': len(listings), - 'listings': listings - } - -if __name__ == '__main__': - # Initialize OpenClaw service - service = OpenClawAIService() - - # Execute sample tasks - sample_tasks = [ - { - 'agent_id': 'openclaw-text-gen', - 'type': 'text_generation', - 'prompt': 'Explain the benefits of decentralized AI networks', - 'parameters': {'max_length': 500} - }, - { - 'agent_id': 'openclaw-research', - 'type': 'research', - 'prompt': 'Analyze the current state of blockchain technology', - 'parameters': {'depth': 'comprehensive'} - } - ] - - for task in sample_tasks: - try: - result = service.execute_task(task['agent_id'], task) - print(f"Task completed: {result['task_id']} - {result['status']}") - except Exception as e: - logger.error(f"Task failed: {e}") - - # Print service info - info = service.get_agents_info() - print(f"OpenClaw service info: {json.dumps(info, indent=2)}") diff --git a/production/services/real_marketplace.py b/production/services/real_marketplace.py deleted file mode 100755 index d309cd6e..00000000 --- a/production/services/real_marketplace.py +++ /dev/null @@ -1,293 +0,0 @@ -#!/usr/bin/env python3 -""" -Real Marketplace with OpenClaw AI and Ollama Tasks -""" - -import os -import sys -import json -import time -import logging -from pathlib import Path -from datetime import datetime -from typing import Dict, List, Optional -from fastapi import FastAPI, HTTPException -from pydantic import BaseModel -import uvicorn - -# Import OpenClaw service -sys.path.insert(0, '/opt/aitbc/production/services') -from openclaw_ai import OpenClawAIService - -# Production logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', - handlers=[ - logging.FileHandler('/var/log/aitbc/production/marketplace/real_marketplace.log'), - logging.StreamHandler() - ] -) -logger = logging.getLogger(__name__) - -class RealMarketplace: - """Real marketplace with AI services""" - - def __init__(self): - self.node_id = os.getenv('NODE_ID', 'aitbc') - self.data_dir = Path(f'/var/lib/aitbc/data/marketplace/{self.node_id}') - self.data_dir.mkdir(parents=True, exist_ok=True) - - # Initialize services - self.openclaw_service = OpenClawAIService() - - # Marketplace data - self.ai_services = {} - self.gpu_listings = {} - self.marketplace_stats = {} - - self._load_data() - self._initialize_ai_services() - - logger.info(f"Real marketplace initialized for node: {self.node_id}") - - def _load_data(self): - """Load marketplace data""" - try: - # Load AI services - services_file = self.data_dir / 'ai_services.json' - if services_file.exists(): - with open(services_file, 'r') as f: - self.ai_services = json.load(f) - - # Load GPU listings - gpu_file = self.data_dir / 'gpu_listings.json' - if gpu_file.exists(): - with open(gpu_file, 'r') as f: - self.gpu_listings = json.load(f) - - logger.info(f"Loaded {len(self.ai_services)} AI services, {len(self.gpu_listings)} GPU listings") - - except Exception as e: - logger.error(f"Failed to load marketplace data: {e}") - - def _save_data(self): - """Save marketplace data""" - try: - with open(self.data_dir / 'ai_services.json', 'w') as f: - json.dump(self.ai_services, f, indent=2) - - with open(self.data_dir / 'gpu_listings.json', 'w') as f: - json.dump(self.gpu_listings, f, indent=2) - - logger.debug("Marketplace data saved") - - except Exception as e: - logger.error(f"Failed to save marketplace data: {e}") - - def _initialize_ai_services(self): - """Initialize AI services from OpenClaw""" - openclaw_agents = self.openclaw_service.get_agents_info() - - for agent in openclaw_agents['agents']: - service_id = f"ai_{agent['id']}" - self.ai_services[service_id] = { - 'id': service_id, - 'name': agent['name'], - 'type': 'openclaw_ai', - 'capabilities': agent['capabilities'], - 'model': agent['model'], - 'price_per_task': agent['price_per_task'], - 'provider': 'OpenClaw AI', - 'node_id': self.node_id, - 'rating': agent['rating'], - 'tasks_completed': agent['tasks_completed'], - 'status': 'available', - 'created_at': time.time() - } - - # Add Ollama services - ollama_services = [ - { - 'id': 'ollama-llama2-7b', - 'name': 'Ollama Llama2 7B', - 'type': 'ollama_inference', - 'capabilities': ['text_generation', 'chat', 'completion'], - 'model': 'llama2-7b', - 'price_per_task': 3.0, - 'provider': 'Ollama', - 'node_id': self.node_id, - 'rating': 4.8, - 'tasks_completed': 0, - 'status': 'available', - 'created_at': time.time() - }, - { - 'id': 'ollama-llama2-13b', - 'name': 'Ollama Llama2 13B', - 'type': 'ollama_inference', - 'capabilities': ['text_generation', 'chat', 'completion', 'analysis'], - 'model': 'llama2-13b', - 'price_per_task': 5.0, - 'provider': 'Ollama', - 'node_id': self.node_id, - 'rating': 4.9, - 'tasks_completed': 0, - 'status': 'available', - 'created_at': time.time() - } - ] - - for service in ollama_services: - self.ai_services[service['id']] = service - - self._save_data() - logger.info(f"Initialized {len(self.ai_services)} AI services") - - def get_ai_services(self) -> dict: - """Get all AI services""" - return { - 'node_id': self.node_id, - 'total_services': len(self.ai_services), - 'available_services': len([s for s in self.ai_services.values() if s['status'] == 'available']), - 'services': list(self.ai_services.values()) - } - - def execute_ai_task(self, service_id: str, task_data: dict) -> dict: - """Execute an AI task""" - if service_id not in self.ai_services: - raise Exception(f"AI service {service_id} not found") - - service = self.ai_services[service_id] - - if service['type'] == 'openclaw_ai': - # Execute with OpenClaw - agent_id = service_id.replace('ai_', '') - result = self.openclaw_service.execute_task(agent_id, task_data) - - # Update service stats - service['tasks_completed'] += 1 - self._save_data() - - return result - - elif service['type'] == 'ollama_inference': - # Execute with Ollama - return self._execute_ollama_task(service, task_data) - - else: - raise Exception(f"Unsupported service type: {service['type']}") - - def _execute_ollama_task(self, service: dict, task_data: dict) -> dict: - """Execute task with Ollama""" - try: - # Simulate Ollama execution - model = service['model'] - prompt = task_data.get('prompt', '') - - # Simulate API call to Ollama - time.sleep(2) # Simulate processing time - - result = f""" -Ollama {model} Response: - -{prompt} - -This response is generated by the Ollama {model} model running on {self.node_id}. -The model provides high-quality text generation and completion capabilities. - -Generated at: {datetime.utcnow().isoformat()} -Model: {model} -Node: {self.node_id} - """.strip() - - # Update service stats - service['tasks_completed'] += 1 - self._save_data() - - return { - 'service_id': service['id'], - 'service_name': service['name'], - 'model_used': model, - 'response': result, - 'tokens_generated': len(result.split()), - 'execution_time': 2.0, - 'status': 'completed' - } - - except Exception as e: - logger.error(f"Ollama task failed: {e}") - return { - 'service_id': service['id'], - 'status': 'failed', - 'error': str(e) - } - - def get_marketplace_stats(self) -> dict: - """Get marketplace statistics""" - return { - 'node_id': self.node_id, - 'ai_services': { - 'total': len(self.ai_services), - 'available': len([s for s in self.ai_services.values() if s['status'] == 'available']), - 'total_tasks_completed': sum(s['tasks_completed'] for s in self.ai_services.values()) - }, - 'gpu_listings': { - 'total': len(self.gpu_listings), - 'available': len([g for g in self.gpu_listings.values() if g['status'] == 'available']) - }, - 'total_revenue': sum(s['price_per_task'] * s['tasks_completed'] for s in self.ai_services.values()) - } - -# Initialize marketplace -marketplace = RealMarketplace() - -# FastAPI app -app = FastAPI( - title="AITBC Real Marketplace", - version="1.0.0", - description="Real marketplace with OpenClaw AI and Ollama tasks" -) - -@app.get("/health") -async def health(): - """Health check endpoint""" - return { - "status": "healthy", - "service": "real-marketplace", - "node_id": marketplace.node_id, - "timestamp": datetime.utcnow().isoformat(), - "stats": marketplace.get_marketplace_stats() - } - -@app.get("/ai/services") -async def get_ai_services(): - """Get all AI services""" - return marketplace.get_ai_services() - -@app.post("/ai/execute") -async def execute_ai_task(request: dict): - """Execute an AI task""" - try: - service_id = request.get('service_id') - task_data = request.get('task_data', {}) - - result = marketplace.execute_ai_task(service_id, task_data) - return result - - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - -@app.get("/stats") -async def get_stats(): - """Get marketplace statistics""" - return marketplace.get_marketplace_stats() - -if __name__ == '__main__': - uvicorn.run( - app, - host="0.0.0.0", - port=int(os.getenv('REAL_MARKETPLACE_PORT', 8006)), - workers=2, - log_level="info" - ) diff --git a/production/services/real_marketplace_launcher.py b/production/services/real_marketplace_launcher.py deleted file mode 100755 index 1b682340..00000000 --- a/production/services/real_marketplace_launcher.py +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/env python3 -""" -Real Marketplace Service Launcher -""" - -import os -import sys - -# Add production services to path -sys.path.insert(0, '/opt/aitbc/production/services') - -# Import and run the real marketplace app -from real_marketplace import app -import uvicorn - -# Run the app -uvicorn.run( - app, - host='0.0.0.0', - port=int(os.getenv('REAL_MARKETPLACE_PORT', 8009)), - log_level='info' -) diff --git a/production/services/unified_marketplace.py b/production/services/unified_marketplace.py deleted file mode 100644 index d5aa6a8c..00000000 --- a/production/services/unified_marketplace.py +++ /dev/null @@ -1,491 +0,0 @@ -#!/usr/bin/env python3 -""" -Unified AITBC Marketplace Service -Combined GPU Resources and AI Services Marketplace -""" - -import os -import sys -import json -import time -import logging -from pathlib import Path -from datetime import datetime -from typing import Dict, List, Optional - -sys.path.insert(0, '/opt/aitbc/apps/coordinator-api/src') -sys.path.insert(0, '/opt/aitbc/production/services') - -from fastapi import FastAPI, HTTPException -from fastapi.middleware.cors import CORSMiddleware -from pydantic import BaseModel -import uvicorn - -# Import OpenClaw AI service -try: - from openclaw_ai import OpenClawAIService - OPENCLAW_AVAILABLE = True -except ImportError: - OPENCLAW_AVAILABLE = False - print("Warning: OpenClaw AI service not available") - -# Production logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', - handlers=[ - logging.FileHandler('/var/log/aitbc/production/marketplace/unified_marketplace.log'), - logging.StreamHandler() - ] -) -logger = logging.getLogger(__name__) - -# Pydantic models -class GPUListing(BaseModel): - id: str - provider: str - gpu_type: str - memory_gb: int - price_per_hour: float - status: str - specs: dict - -class Bid(BaseModel): - id: str - gpu_id: str - agent_id: str - bid_price: float - duration_hours: int - total_cost: float - status: str - -class AIService(BaseModel): - id: str - name: str - type: str - capabilities: list - model: str - price_per_task: float - provider: str - node_id: str - rating: float - tasks_completed: int - status: str - -class AITask(BaseModel): - id: str - service_id: str - user_id: str - task_data: dict - price: float - status: str - result: Optional[dict] = None - -class UnifiedMarketplace: - """Unified marketplace for GPU resources and AI services""" - - def __init__(self): - self.node_id = os.getenv('NODE_ID', 'aitbc') - self.data_dir = Path(f'/var/lib/aitbc/data/marketplace/{self.node_id}') - self.data_dir.mkdir(parents=True, exist_ok=True) - - # Initialize OpenClaw service if available - self.openclaw_service = None - if OPENCLAW_AVAILABLE: - try: - self.openclaw_service = OpenClawAIService() - logger.info("OpenClaw AI service initialized") - except Exception as e: - logger.warning(f"Failed to initialize OpenClaw: {e}") - - # Marketplace data - self.gpu_listings = {} - self.bids = {} - self.ai_services = {} - self.ai_tasks = {} - - self._load_data() - self._initialize_ai_services() - - logger.info(f"Unified marketplace initialized for node: {self.node_id}") - - def _load_data(self): - """Load marketplace data from disk""" - try: - # Load GPU listings - listings_file = self.data_dir / 'gpu_listings.json' - if listings_file.exists(): - with open(listings_file, 'r') as f: - self.gpu_listings = json.load(f) - - # Load bids - bids_file = self.data_dir / 'bids.json' - if bids_file.exists(): - with open(bids_file, 'r') as f: - self.bids = json.load(f) - - # Load AI services - services_file = self.data_dir / 'ai_services.json' - if services_file.exists(): - with open(services_file, 'r') as f: - self.ai_services = json.load(f) - - # Load AI tasks - tasks_file = self.data_dir / 'ai_tasks.json' - if tasks_file.exists(): - with open(tasks_file, 'r') as f: - self.ai_tasks = json.load(f) - - logger.info(f"Loaded {len(self.gpu_listings)} GPU listings, {len(self.bids)} bids, {len(self.ai_services)} AI services, {len(self.ai_tasks)} tasks") - - except Exception as e: - logger.error(f"Failed to load marketplace data: {e}") - - def _save_data(self): - """Save marketplace data to disk""" - try: - with open(self.data_dir / 'gpu_listings.json', 'w') as f: - json.dump(self.gpu_listings, f, indent=2) - - with open(self.data_dir / 'bids.json', 'w') as f: - json.dump(self.bids, f, indent=2) - - with open(self.data_dir / 'ai_services.json', 'w') as f: - json.dump(self.ai_services, f, indent=2) - - with open(self.data_dir / 'ai_tasks.json', 'w') as f: - json.dump(self.ai_tasks, f, indent=2) - - logger.debug("Marketplace data saved") - - except Exception as e: - logger.error(f"Failed to save marketplace data: {e}") - - def _initialize_ai_services(self): - """Initialize AI services from OpenClaw""" - if not self.openclaw_service: - # Add default Ollama services - ollama_services = [ - { - 'id': 'ollama-llama2-7b', - 'name': 'Ollama Llama2 7B', - 'type': 'ollama_inference', - 'capabilities': ['text_generation', 'chat', 'completion'], - 'model': 'llama2-7b', - 'price_per_task': 3.0, - 'provider': 'Ollama', - 'node_id': self.node_id, - 'rating': 4.8, - 'tasks_completed': 0, - 'status': 'available' - }, - { - 'id': 'ollama-llama2-13b', - 'name': 'Ollama Llama2 13B', - 'type': 'ollama_inference', - 'capabilities': ['text_generation', 'chat', 'completion', 'analysis'], - 'model': 'llama2-13b', - 'price_per_task': 5.0, - 'provider': 'Ollama', - 'node_id': self.node_id, - 'rating': 4.9, - 'tasks_completed': 0, - 'status': 'available' - } - ] - - for service in ollama_services: - self.ai_services[service['id']] = service - - logger.info(f"Initialized {len(ollama_services)} default AI services") - return - - # Add OpenClaw services - try: - openclaw_agents = self.openclaw_service.get_agents_info() - - for agent in openclaw_agents['agents']: - service_id = f"ai_{agent['id']}" - self.ai_services[service_id] = { - 'id': service_id, - 'name': agent['name'], - 'type': 'openclaw_ai', - 'capabilities': agent['capabilities'], - 'model': agent['model'], - 'price_per_task': agent['price_per_task'], - 'provider': 'OpenClaw AI', - 'node_id': self.node_id, - 'rating': agent['rating'], - 'tasks_completed': agent['tasks_completed'], - 'status': 'available' - } - - logger.info(f"Initialized {len(openclaw_agents['agents'])} OpenClaw AI services") - - except Exception as e: - logger.error(f"Failed to initialize OpenClaw services: {e}") - - # GPU Marketplace Methods - def add_gpu_listing(self, listing: dict) -> str: - """Add a new GPU listing""" - try: - gpu_id = f"gpu_{int(time.time())}_{len(self.gpu_listings)}" - listing['id'] = gpu_id - listing['created_at'] = time.time() - listing['status'] = 'available' - - self.gpu_listings[gpu_id] = listing - self._save_data() - - logger.info(f"GPU listing added: {gpu_id}") - return gpu_id - - except Exception as e: - logger.error(f"Failed to add GPU listing: {e}") - raise - - def create_bid(self, bid_data: dict) -> str: - """Create a new bid""" - try: - bid_id = f"bid_{int(time.time())}_{len(self.bids)}" - bid_data['id'] = bid_id - bid_data['created_at'] = time.time() - bid_data['status'] = 'pending' - - self.bids[bid_id] = bid_data - self._save_data() - - logger.info(f"Bid created: {bid_id}") - return bid_id - - except Exception as e: - logger.error(f"Failed to create bid: {e}") - raise - - # AI Marketplace Methods - def get_ai_services(self) -> dict: - """Get all AI services""" - return { - 'node_id': self.node_id, - 'total_services': len(self.ai_services), - 'available_services': len([s for s in self.ai_services.values() if s['status'] == 'available']), - 'services': list(self.ai_services.values()) - } - - def execute_ai_task(self, service_id: str, task_data: dict, user_id: str = 'anonymous') -> dict: - """Execute an AI task""" - if service_id not in self.ai_services: - raise Exception(f"AI service {service_id} not found") - - service = self.ai_services[service_id] - - # Create task record - task_id = f"task_{int(time.time())}_{len(self.ai_tasks)}" - task = { - 'id': task_id, - 'service_id': service_id, - 'user_id': user_id, - 'task_data': task_data, - 'price': service['price_per_task'], - 'status': 'executing', - 'created_at': time.time() - } - - self.ai_tasks[task_id] = task - self._save_data() - - try: - if service['type'] == 'openclaw_ai' and self.openclaw_service: - # Execute with OpenClaw - agent_id = service_id.replace('ai_', '') - result = self.openclaw_service.execute_task(agent_id, task_data) - - elif service['type'] == 'ollama_inference': - # Execute with Ollama (simulated) - model = service['model'] - prompt = task_data.get('prompt', '') - - # Simulate API call to Ollama - time.sleep(2) # Simulate processing time - - result = { - 'service_id': service_id, - 'task_id': task_id, - 'status': 'completed', - 'result': f""" -Ollama {model} Response: - -{prompt} - -This response is generated by the Ollama {model} model running on {self.node_id}. -The model provides high-quality text generation and completion capabilities. - -Generated at: {datetime.utcnow().isoformat()} -""", - 'execution_time': 2.0, - 'model': model - } - else: - raise Exception(f"Unsupported service type: {service['type']}") - - # Update task and service - task['status'] = 'completed' - task['result'] = result - task['completed_at'] = time.time() - - service['tasks_completed'] += 1 - self._save_data() - - logger.info(f"AI task completed: {task_id}") - return result - - except Exception as e: - task['status'] = 'failed' - task['error'] = str(e) - self._save_data() - logger.error(f"AI task failed: {e}") - raise - - def get_marketplace_stats(self) -> dict: - """Get comprehensive marketplace statistics""" - gpu_stats = { - 'total_gpus': len(self.gpu_listings), - 'available_gpus': len([g for g in self.gpu_listings.values() if g['status'] == 'available']), - 'total_bids': len(self.bids), - 'pending_bids': len([b for b in self.bids.values() if b['status'] == 'pending']), - 'total_value': sum(b['total_cost'] for b in self.bids.values()) - } - - ai_stats = { - 'total_services': len(self.ai_services), - 'available_services': len([s for s in self.ai_services.values() if s['status'] == 'available']), - 'total_tasks': len(self.ai_tasks), - 'completed_tasks': len([t for t in self.ai_tasks.values() if t['status'] == 'completed']), - 'total_revenue': sum(t['price'] for t in self.ai_tasks.values() if t['status'] == 'completed']) - } - - return { - 'node_id': self.node_id, - 'gpu_marketplace': gpu_stats, - 'ai_marketplace': ai_stats, - 'total_listings': gpu_stats['total_gpus'] + ai_stats['total_services'], - 'total_active': gpu_stats['available_gpus'] + ai_stats['available_services'] - } - -# Initialize marketplace -marketplace = UnifiedMarketplace() - -# FastAPI app -app = FastAPI( - title="AITBC Unified Marketplace", - version="2.0.0", - description="Unified marketplace for GPU resources and AI services" -) - -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - -# Health check -@app.get("/health") -async def health(): - """Health check endpoint""" - return { - "status": "healthy", - "service": "unified-marketplace", - "version": "2.0.0", - "node_id": marketplace.node_id, - "stats": marketplace.get_marketplace_stats() - } - -# GPU Marketplace Endpoints -@app.post("/gpu/listings") -async def add_gpu_listing(listing: dict): - """Add a new GPU listing""" - try: - gpu_id = marketplace.add_gpu_listing(listing) - return {"gpu_id": gpu_id, "status": "created"} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - -@app.post("/gpu/bids") -async def create_bid(bid: dict): - """Create a new bid""" - try: - bid_id = marketplace.create_bid(bid) - return {"bid_id": bid_id, "status": "created"} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - -@app.get("/gpu/listings") -async def get_gpu_listings(): - """Get all GPU listings""" - return {"listings": list(marketplace.gpu_listings.values())} - -@app.get("/gpu/bids") -async def get_bids(): - """Get all bids""" - return {"bids": list(marketplace.bids.values())} - -# AI Marketplace Endpoints -@app.get("/ai/services") -async def get_ai_services(): - """Get all AI services""" - return marketplace.get_ai_services() - -@app.post("/ai/execute") -async def execute_ai_task(request: dict): - """Execute an AI task""" - try: - service_id = request.get('service_id') - task_data = request.get('task_data') - user_id = request.get('user_id', 'anonymous') - - result = marketplace.execute_ai_task(service_id, task_data, user_id) - return {"task_id": result.get('task_id'), "status": "executing", "result": result} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - -@app.get("/ai/tasks") -async def get_ai_tasks(): - """Get all AI tasks""" - return {"tasks": list(marketplace.ai_tasks.values())} - -# Unified Marketplace Endpoints -@app.get("/stats") -async def get_stats(): - """Get comprehensive marketplace statistics""" - return marketplace.get_marketplace_stats() - -@app.get("/search") -async def search_marketplace(query: str = "", category: str = ""): - """Search across GPU and AI services""" - results = { - "gpu_listings": [], - "ai_services": [] - } - - # Search GPU listings - for listing in marketplace.gpu_listings.values(): - if query.lower() in listing.get('gpu_type', '').lower() or query.lower() in listing.get('provider', '').lower(): - results["gpu_listings"].append(listing) - - # Search AI services - for service in marketplace.ai_services.values(): - if query.lower() in service.get('name', '').lower() or any(query.lower() in cap.lower() for cap in service.get('capabilities', [])): - results["ai_services"].append(service) - - return results - -if __name__ == '__main__': - uvicorn.run( - app, - host="0.0.0.0", - port=int(os.getenv('MARKETPLACE_PORT', 8002)), - workers=int(os.getenv('WORKERS', 1)), # Fixed to 1 to avoid workers warning - log_level="info" - ) diff --git a/production/services/unified_marketplace_launcher.py b/production/services/unified_marketplace_launcher.py deleted file mode 100755 index 88fb9de2..00000000 --- a/production/services/unified_marketplace_launcher.py +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/env python3 -""" -Unified Marketplace Service Launcher -""" - -import os -import sys - -# Add production services to path -sys.path.insert(0, '/opt/aitbc/production/services') - -# Import and run the unified marketplace app -from marketplace import app -import uvicorn - -# Run the app -uvicorn.run( - app, - host='0.0.0.0', - port=int(os.getenv('MARKETPLACE_PORT', 8002)), - log_level='info' -) diff --git a/scripts/production_launcher.py b/scripts/production_launcher.py new file mode 100755 index 00000000..53248567 --- /dev/null +++ b/scripts/production_launcher.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +""" +Production Services Launcher +Launches AITBC production services from system locations +""" + +import os +import sys +import subprocess +from pathlib import Path + +def launch_service(service_name: str, script_path: str): + """Launch a production service""" + print(f"Launching {service_name}...") + + # Ensure log directory exists + log_dir = Path(f"/var/log/aitbc/production/{service_name}") + log_dir.mkdir(parents=True, exist_ok=True) + + # Launch service + try: + subprocess.run([ + sys.executable, + str(Path("/var/lib/aitbc/production") / script_path) + ], check=True) + except subprocess.CalledProcessError as e: + print(f"Failed to launch {service_name}: {e}") + return False + except FileNotFoundError: + print(f"Service script not found: {script_path}") + return False + + return True + +def main(): + """Main launcher""" + print("=== AITBC Production Services Launcher ===") + + services = [ + ("blockchain", "blockchain.py"), + ("marketplace", "marketplace.py"), + ("unified_marketplace", "unified_marketplace.py"), + ] + + for service_name, script_path in services: + if not launch_service(service_name, script_path): + print(f"Skipping {service_name} due to error") + continue + +if __name__ == "__main__": + main()