#!/usr/bin/env python3 """ AITBC Blockchain Explorer - Enhanced Version Advanced web interface with search, analytics, and export capabilities """ import httpx import json import csv import io import re from datetime import datetime, timedelta from typing import Optional, List import os from fastapi import FastAPI, HTTPException, Query from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field import uvicorn # Import data layer for toggle between mock and real data try: from aitbc import get_data_layer USE_DATA_LAYER = True except ImportError: USE_DATA_LAYER = False app = FastAPI(title="AITBC Blockchain Explorer", version="0.1.0") # Validation patterns for user inputs to prevent SSRF TX_HASH_PATTERN = re.compile(r'^[a-fA-F0-9]{64}$') # 64-character hex string for transaction hash CHAIN_ID_PATTERN = re.compile(r'^[a-zA-Z0-9_-]{3,30}$') # Chain ID pattern def validate_tx_hash(tx_hash: str) -> bool: """Validate transaction hash to prevent SSRF""" if not tx_hash: return False # Check for path traversal or URL manipulation if any(char in tx_hash for char in ['/', '\\', '..', '\n', '\r', '\t', '?', '&']): return False # Validate against hash pattern return bool(TX_HASH_PATTERN.match(tx_hash)) def validate_chain_id(chain_id: str) -> bool: """Validate chain ID to prevent SSRF""" if not chain_id: return False # Check for path traversal or URL manipulation if any(char in chain_id for char in ['/', '\\', '..', '\n', '\r', '\t', '?', '&']): return False # Validate against chain ID pattern return bool(CHAIN_ID_PATTERN.match(chain_id)) @app.get("/api/chains") def list_chains(): """List all supported chains""" return { "chains": [ {"id": "ait-devnet", "name": "AIT Development Network", "status": "active"}, {"id": "ait-testnet", "name": "AIT Test Network", "status": "inactive"}, {"id": "ait-mainnet", "name": "AIT Main Network", "status": "coming_soon"} ] } # Configuration - Multi-chain support BLOCKCHAIN_RPC_URLS = { "ait-devnet": "http://localhost:8025", "ait-testnet": "http://localhost:8026", "ait-mainnet": "http://aitbc.keisanki.net:8082" } DEFAULT_CHAIN = "ait-devnet" EXTERNAL_RPC_URL = "http://aitbc.keisanki.net:8082" # External access # Pydantic models for API class TransactionSearch(BaseModel): address: Optional[str] = None amount_min: Optional[float] = None amount_max: Optional[float] = None tx_type: Optional[str] = None since: Optional[str] = None until: Optional[str] = None limit: int = Field(default=50, ge=1, le=1000) offset: int = Field(default=0, ge=0) class BlockSearch(BaseModel): validator: Optional[str] = None since: Optional[str] = None until: Optional[str] = None min_tx: Optional[int] = None limit: int = Field(default=50, ge=1, le=1000) offset: int = Field(default=0, ge=0) class AnalyticsRequest(BaseModel): period: str = Field(default="24h", pattern="^(1h|24h|7d|30d)$") granularity: Optional[str] = None metrics: List[str] = Field(default_factory=list) # HTML Template HTML_TEMPLATE = r""" AITBC Blockchain Explorer

AITBC Blockchain Explorer

Current Height

-

Latest Block

-

Node Status

-

Advanced Search

Analytics Dashboard

Total Transactions

-

Transaction Volume

-

Active Addresses

-

Avg Block Time

-

Transaction Volume Over Time

Network Activity

Latest Blocks

Height Hash Timestamp Transactions Actions
Loading blocks...
""" async def get_chain_head(chain_id: str = DEFAULT_CHAIN) -> Dict[str, Any]: """Get chain head from specified chain""" try: rpc_url = BLOCKCHAIN_RPC_URLS.get(chain_id, BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN]) async with httpx.AsyncClient() as client: response = await client.get(f"{rpc_url}/rpc/head", params={"chain_id": chain_id}) if response.status_code == 200: return response.json() except Exception as e: print(f"Error getting chain head for {chain_id}: {e}") return {} async def get_transaction(tx_hash: str, chain_id: str = DEFAULT_CHAIN) -> Dict[str, Any]: """Get transaction by hash from specified chain""" if not validate_tx_hash(tx_hash) or not validate_chain_id(chain_id): print(f"Invalid tx_hash or chain_id format") return {} try: rpc_url = BLOCKCHAIN_RPC_URLS.get(chain_id, BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN]) async with httpx.AsyncClient() as client: response = await client.get(f"{rpc_url}/rpc/tx/{tx_hash}", params={"chain_id": chain_id}) if response.status_code == 200: return response.json() except Exception as e: print(f"Error getting transaction {tx_hash} for {chain_id}: {e}") return {} async def get_block(height: int, chain_id: str = DEFAULT_CHAIN) -> Dict[str, Any]: """Get a specific block by height from specified chain""" if not validate_chain_id(chain_id): print(f"Invalid chain_id format") return {} try: rpc_url = BLOCKCHAIN_RPC_URLS.get(chain_id, BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN]) async with httpx.AsyncClient() as client: response = await client.get(f"{rpc_url}/rpc/blocks/{height}", params={"chain_id": chain_id}) if response.status_code == 200: return response.json() except Exception as e: print(f"Error getting block {height} for {chain_id}: {e}") return {} @app.get("/", response_class=HTMLResponse) async def root(): """Serve the explorer UI""" return HTML_TEMPLATE.replace("{node_url}", BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN]) @app.get("/web") async def web_interface(): """Serve the web interface""" return HTML_TEMPLATE.replace("{node_url}", BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN]) @app.get("/api/chain/head") async def api_chain_head(chain_id: Optional[str] = DEFAULT_CHAIN): """API endpoint for chain head""" return await get_chain_head(chain_id) @app.get("/api/blocks/{height}") async def api_block(height: int, chain_id: Optional[str] = DEFAULT_CHAIN): """API endpoint for block data""" return await get_block(height, chain_id) @app.get("/api/transactions/{tx_hash}") async def api_transaction(tx_hash: str, chain_id: Optional[str] = DEFAULT_CHAIN): """API endpoint for transaction data, normalized for frontend""" tx = await get_transaction(tx_hash, chain_id) payload = tx.get("payload", {}) return { "hash": tx.get("tx_hash"), "block_height": tx.get("block_height"), "from": tx.get("sender"), "to": tx.get("recipient"), "type": payload.get("type", "transfer"), "amount": payload.get("amount", 0), "fee": payload.get("fee", 0), "timestamp": tx.get("created_at") } # Enhanced API endpoints @app.get("/api/search/transactions") async def search_transactions( address: Optional[str] = None, amount_min: Optional[float] = None, amount_max: Optional[float] = None, tx_type: Optional[str] = None, since: Optional[str] = None, until: Optional[str] = None, limit: int = 50, offset: int = 0, chain_id: Optional[str] = DEFAULT_CHAIN ): """Advanced transaction search""" try: if USE_DATA_LAYER: # Use data layer with toggle support data_layer = get_data_layer() rpc_url = BLOCKCHAIN_RPC_URLS.get(chain_id, BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN]) return await data_layer.get_transactions( address, amount_min, amount_max, tx_type, since, until, limit, offset, chain_id, rpc_url ) else: # Original implementation without data layer # Build query parameters params = {} if address: params["address"] = address if amount_min: params["amount_min"] = amount_min if amount_max: params["amount_max"] = amount_max if tx_type: params["type"] = tx_type if since: params["since"] = since if until: params["until"] = until params["limit"] = limit params["offset"] = offset params["chain_id"] = chain_id rpc_url = BLOCKCHAIN_RPC_URLS.get(chain_id, BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN]) async with httpx.AsyncClient() as client: response = await client.get(f"{rpc_url}/rpc/search/transactions", params=params) if response.status_code == 200: return response.json() elif response.status_code == 404: return [] else: raise HTTPException( status_code=response.status_code, detail=f"Failed to fetch transactions from blockchain RPC: {response.text}" ) except httpx.RequestError as e: raise HTTPException(status_code=503, detail=f"Blockchain RPC unavailable: {str(e)}") except Exception as e: raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}") @app.get("/api/search/blocks") async def search_blocks( validator: Optional[str] = None, since: Optional[str] = None, until: Optional[str] = None, min_tx: Optional[int] = None, limit: int = 50, offset: int = 0, chain_id: Optional[str] = DEFAULT_CHAIN ): """Advanced block search""" try: if USE_DATA_LAYER: # Use data layer with toggle support data_layer = get_data_layer() rpc_url = BLOCKCHAIN_RPC_URLS.get(chain_id, BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN]) return await data_layer.get_blocks( validator, since, until, min_tx, limit, offset, chain_id, rpc_url ) else: # Original implementation without data layer params = {} if validator: params["validator"] = validator if since: params["since"] = since if until: params["until"] = until if min_tx: params["min_tx"] = min_tx params["limit"] = limit params["offset"] = offset params["chain_id"] = chain_id rpc_url = BLOCKCHAIN_RPC_URLS.get(chain_id, BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN]) async with httpx.AsyncClient() as client: response = await client.get(f"{rpc_url}/rpc/search/blocks", params=params) if response.status_code == 200: return response.json() elif response.status_code == 404: return [] else: raise HTTPException( status_code=response.status_code, detail=f"Failed to fetch blocks from blockchain RPC: {response.text}" ) except httpx.RequestError as e: raise HTTPException(status_code=503, detail=f"Blockchain RPC unavailable: {str(e)}") except Exception as e: raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}") @app.get("/api/analytics/overview") async def analytics_overview(period: str = "24h"): """Get analytics overview from blockchain RPC""" try: if USE_DATA_LAYER: # Use data layer with toggle support data_layer = get_data_layer() rpc_url = BLOCKCHAIN_RPC_URLS.get(DEFAULT_CHAIN) return await data_layer.get_analytics_overview(period, rpc_url) else: # Original implementation without data layer rpc_url = BLOCKCHAIN_RPC_URLS.get(DEFAULT_CHAIN) params = {"period": period} async with httpx.AsyncClient() as client: response = await client.get(f"{rpc_url}/rpc/analytics/overview", params=params) if response.status_code == 200: return response.json() elif response.status_code == 404: raise HTTPException(status_code=501, detail="Analytics endpoint not available on blockchain RPC") else: raise HTTPException( status_code=response.status_code, detail=f"Failed to fetch analytics from blockchain RPC: {response.text}" ) except httpx.RequestError as e: raise HTTPException(status_code=503, detail=f"Blockchain RPC unavailable: {str(e)}") except Exception as e: raise HTTPException(status_code=500, detail=f"Analytics failed: {str(e)}") @app.get("/api/export/search") async def export_search( format: str = "csv", type: str = "transactions", data: str = "" ): """Export search results""" try: if not data: raise HTTPException(status_code=400, detail="No data to export") results = json.loads(data) if format == "csv": output = io.StringIO() if type == "transactions": writer = csv.writer(output) writer.writerow(["Hash", "Type", "From", "To", "Amount", "Fee", "Timestamp"]) for tx in results: writer.writerow([ tx.get("hash", ""), tx.get("type", ""), tx.get("from", ""), tx.get("to", ""), tx.get("amount", ""), tx.get("fee", ""), tx.get("timestamp", "") ]) else: # blocks writer = csv.writer(output) writer.writerow(["Height", "Hash", "Validator", "Transactions", "Timestamp"]) for block in results: writer.writerow([ block.get("height", ""), block.get("hash", ""), block.get("validator", ""), block.get("tx_count", ""), block.get("timestamp", "") ]) output.seek(0) return StreamingResponse( io.BytesIO(output.getvalue().encode()), media_type="text/csv", headers={"Content-Disposition": f"attachment; filename=search_results.{format}"} ) elif format == "json": return StreamingResponse( io.BytesIO(json.dumps(results, indent=2).encode()), media_type="application/json", headers={"Content-Disposition": f"attachment; filename=search_results.{format}"} ) else: raise HTTPException(status_code=400, detail="Unsupported format") except Exception as e: raise HTTPException(status_code=500, detail=f"Export failed: {str(e)}") @app.get("/api/export/blocks") async def export_blocks(format: str = "csv"): """Export latest blocks""" try: # Get latest blocks blocks = await get_latest_blocks(50) if format == "csv": output = io.StringIO() writer = csv.writer(output) writer.writerow(["Height", "Hash", "Validator", "Transactions", "Timestamp"]) for block in blocks: writer.writerow([ block.get("height", ""), block.get("hash", ""), block.get("validator", ""), block.get("tx_count", ""), block.get("timestamp", "") ]) output.seek(0) return StreamingResponse( io.BytesIO(output.getvalue().encode()), media_type="text/csv", headers={"Content-Disposition": f"attachment; filename=latest_blocks.{format}"} ) elif format == "json": return StreamingResponse( io.BytesIO(json.dumps(blocks, indent=2).encode()), media_type="application/json", headers={"Content-Disposition": f"attachment; filename=latest_blocks.{format}"} ) else: raise HTTPException(status_code=400, detail="Unsupported format") except Exception as e: raise HTTPException(status_code=500, detail=f"Export failed: {str(e)}") # Helper functions async def get_latest_blocks(limit: int = 10, chain_id: str = DEFAULT_CHAIN) -> List[Dict]: """Get latest blocks""" try: rpc_url = BLOCKCHAIN_RPC_URLS.get(chain_id, BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN]) async with httpx.AsyncClient() as client: response = await client.get(f"{rpc_url}/rpc/blocks?limit={limit}", params={"chain_id": chain_id}) if response.status_code == 200: return response.json() else: # Return mock data return [ { "height": i, "hash": f"0x{'1234567890abcdef' * 4}", "validator": "0x1234567890abcdef1234567890abcdef12345678", "tx_count": i % 10, "timestamp": datetime.now().isoformat() } for i in range(limit, 0, -1) ] except Exception: return [] @app.get("/health") async def health(): """Health check endpoint""" try: # Test blockchain node connectivity async with httpx.AsyncClient() as client: response = await client.get(f"{BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN]}/rpc/head", timeout=5.0) node_status = "ok" if response.status_code == 200 else "error" except Exception: node_status = "error" return { "status": "ok" if node_status == "ok" else "degraded", "node_status": node_status, "version": "2.0.0", "features": ["advanced_search", "analytics", "export", "real_time"] } if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8004)