#!/usr/bin/env python3
"""
AITBC Blockchain Explorer - Enhanced Version
Advanced web interface with search, analytics, and export capabilities
"""
import asyncio
import httpx
import json
import csv
import io
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Union
from fastapi import FastAPI, Request, HTTPException, Query, Response
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
import uvicorn
app = FastAPI(title="AITBC Blockchain Explorer", version="0.1.0")
@app.get("/api/chains")
def list_chains():
    """Return the static catalogue of chains known to the explorer.

    Returns:
        A dict with a single ``"chains"`` key holding one entry per chain,
        each with ``id``, ``name`` and ``status`` fields.
    """
    # (id, display name, status) triples; the list is hard-coded, not probed.
    catalogue = (
        ("ait-devnet", "AIT Development Network", "active"),
        ("ait-testnet", "AIT Test Network", "inactive"),
        ("ait-mainnet", "AIT Main Network", "coming_soon"),
    )
    return {
        "chains": [
            {"id": chain_id, "name": label, "status": state}
            for chain_id, label, state in catalogue
        ]
    }
# Configuration - multi-chain support.
# Chain used whenever a caller does not name one (or names an unknown one).
DEFAULT_CHAIN = "ait-devnet"

# Address of the externally reachable RPC node.
EXTERNAL_RPC_URL = "http://aitbc.keisanki.net:8082"  # External access

# Base RPC URL for each supported chain id.
BLOCKCHAIN_RPC_URLS = {
    "ait-devnet": "http://localhost:8025",
    "ait-testnet": "http://localhost:8026",
    "ait-mainnet": "http://aitbc.keisanki.net:8082",
}
# Pydantic models for API
class TransactionSearch(BaseModel):
    """Filter and paging options for a transaction search.

    All filters are optional. ``limit`` must be within 1..1000 and ``offset``
    must be non-negative.
    """

    # --- filters ---
    address: Optional[str] = None
    amount_min: Optional[float] = None
    amount_max: Optional[float] = None
    tx_type: Optional[str] = None
    # Time-window bounds, kept as plain strings (format is not validated here).
    since: Optional[str] = None
    until: Optional[str] = None

    # --- paging ---
    limit: int = Field(50, ge=1, le=1000)
    offset: int = Field(0, ge=0)
class BlockSearch(BaseModel):
    """Filter and paging options for a block search.

    All filters are optional. ``limit`` must be within 1..1000 and ``offset``
    must be non-negative.
    """

    # --- filters ---
    validator: Optional[str] = None
    # Time-window bounds, kept as plain strings (format is not validated here).
    since: Optional[str] = None
    until: Optional[str] = None
    min_tx: Optional[int] = None

    # --- paging ---
    limit: int = Field(50, ge=1, le=1000)
    offset: int = Field(0, ge=0)
class AnalyticsRequest(BaseModel):
    """Parameters describing an analytics query.

    ``period`` is restricted by pattern to one of ``1h``, ``24h``, ``7d`` or
    ``30d`` and defaults to ``24h``.
    """

    period: str = Field("24h", pattern="^(1h|24h|7d|30d)$")
    granularity: Optional[str] = None
    # A fresh empty list is created per instance.
    metrics: List[str] = Field(default_factory=list)
# HTML template for the explorer page.
# The UI routes return this string after replacing the literal token
# "{node_url}" with the default chain's RPC URL. It is a raw string, so
# backslashes in the markup are kept verbatim.
# NOTE(review): no "{node_url}" token (and no markup) is visible in this copy
# of the template - confirm against the deployed file.
HTML_TEMPLATE = r"""
AITBC Blockchain Explorer
AITBC Blockchain Explorer
Advanced Search
Analytics Dashboard
Transaction Volume Over Time
Network Activity
| Height |
Hash |
Timestamp |
Transactions |
Actions |
|
Loading blocks...
|
"""
async def get_chain_head(chain_id: str = DEFAULT_CHAIN) -> Dict[str, Any]:
    """Fetch the current head of a chain from its RPC node.

    Args:
        chain_id: Chain to query. An id without a configured RPC URL is sent
            to the default chain's node, still carrying the requested id as
            the ``chain_id`` query parameter.

    Returns:
        The node's decoded JSON body on HTTP 200; an empty dict on any other
        status or on any exception (which is printed, not raised).
    """
    fallback_url = BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN]
    try:
        base_url = BLOCKCHAIN_RPC_URLS.get(chain_id, fallback_url)
        endpoint = f"{base_url}/rpc/head"
        async with httpx.AsyncClient() as http:
            reply = await http.get(endpoint, params={"chain_id": chain_id})
            if reply.status_code != 200:
                return {}
            return reply.json()
    except Exception as e:
        print(f"Error getting chain head for {chain_id}: {e}")
    return {}
async def get_transaction(tx_hash: str, chain_id: str = DEFAULT_CHAIN) -> Dict[str, Any]:
    """Look up a single transaction by hash on the given chain's RPC node.

    Args:
        tx_hash: Hash of the transaction; interpolated into the request path.
        chain_id: Chain to query. An id without a configured RPC URL is sent
            to the default chain's node, still carrying the requested id as
            the ``chain_id`` query parameter.

    Returns:
        The node's decoded JSON body on HTTP 200; an empty dict on any other
        status or on any exception (which is printed, not raised).
    """
    fallback_url = BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN]
    try:
        base_url = BLOCKCHAIN_RPC_URLS.get(chain_id, fallback_url)
        endpoint = f"{base_url}/rpc/tx/{tx_hash}"
        async with httpx.AsyncClient() as http:
            reply = await http.get(endpoint, params={"chain_id": chain_id})
            if reply.status_code != 200:
                return {}
            return reply.json()
    except Exception as e:
        print(f"Error getting transaction {tx_hash} for {chain_id}: {e}")
    return {}
async def get_block(height: int, chain_id: str = DEFAULT_CHAIN) -> Dict[str, Any]:
    """Fetch the block at ``height`` from the given chain's RPC node.

    Args:
        height: Block height; interpolated into the request path.
        chain_id: Chain to query. An id without a configured RPC URL is sent
            to the default chain's node, still carrying the requested id as
            the ``chain_id`` query parameter.

    Returns:
        The node's decoded JSON body on HTTP 200; an empty dict on any other
        status or on any exception (which is printed, not raised).
    """
    fallback_url = BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN]
    try:
        base_url = BLOCKCHAIN_RPC_URLS.get(chain_id, fallback_url)
        endpoint = f"{base_url}/rpc/blocks/{height}"
        async with httpx.AsyncClient() as http:
            reply = await http.get(endpoint, params={"chain_id": chain_id})
            if reply.status_code != 200:
                return {}
            return reply.json()
    except Exception as e:
        print(f"Error getting block {height} for {chain_id}: {e}")
    return {}
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the explorer page as HTML.

    The literal ``{node_url}`` token in the template is replaced with the
    default chain's RPC URL before the page is returned.
    """
    default_node = BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN]
    page = HTML_TEMPLATE.replace("{node_url}", default_node)
    return page
@app.get("/web", response_class=HTMLResponse)
async def web_interface():
    """Serve the explorer page as HTML under ``/web``.

    The literal ``{node_url}`` token in the template is replaced with the
    default chain's RPC URL before the page is returned.
    """
    # response_class=HTMLResponse is required here: with FastAPI's default
    # JSON response the returned str would be sent as a quoted, escaped JSON
    # string with an application/json content type instead of a web page.
    return HTML_TEMPLATE.replace("{node_url}", BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN])
@app.get("/api/chain/head")
async def api_chain_head(chain_id: Optional[str] = DEFAULT_CHAIN):
    """Expose the chain head of ``chain_id`` as JSON.

    Returns whatever ``get_chain_head`` yields for the requested chain.
    """
    head = await get_chain_head(chain_id)
    return head
@app.get("/api/blocks/{height}")
async def api_block(height: int, chain_id: Optional[str] = DEFAULT_CHAIN):
    """Expose the block at ``height`` on ``chain_id`` as JSON.

    Returns whatever ``get_block`` yields for the requested height and chain.
    """
    block = await get_block(height, chain_id)
    return block
@app.get("/api/transactions/{tx_hash}")
async def api_transaction(tx_hash: str, chain_id: Optional[str] = DEFAULT_CHAIN):
    """Return one transaction, normalized to the field names the frontend uses.

    Args:
        tx_hash: Hash of the transaction to look up.
        chain_id: Chain to query; defaults to the default chain.

    Returns:
        A dict with ``hash``, ``block_height``, ``from``, ``to``, ``type``,
        ``amount``, ``fee`` and ``timestamp``. ``type`` defaults to
        ``"transfer"`` and ``amount``/``fee`` to ``0`` when the payload does
        not carry them.

    Raises:
        HTTPException: 404 when the lookup produced no transaction record.
    """
    tx = await get_transaction(tx_hash, chain_id)

    # get_transaction() yields {} when the node answered non-200 or the call
    # failed. Report that as "not found" instead of a 200 whose fields are
    # all null.
    if not tx or not isinstance(tx, dict):
        raise HTTPException(
            status_code=404,
            detail=f"Transaction {tx_hash} not found on chain {chain_id}",
        )

    # "payload" may be present but null (or not an object); treat both as empty.
    payload = tx.get("payload")
    if not isinstance(payload, dict):
        payload = {}

    return {
        "hash": tx.get("tx_hash"),
        "block_height": tx.get("block_height"),
        "from": tx.get("sender"),
        "to": tx.get("recipient"),
        "type": payload.get("type", "transfer"),
        "amount": payload.get("amount", 0),
        "fee": payload.get("fee", 0),
        "timestamp": tx.get("created_at")
    }
# Enhanced API endpoints
@app.get("/api/search/transactions")
async def search_transactions(
    address: Optional[str] = None,
    amount_min: Optional[float] = None,
    amount_max: Optional[float] = None,
    tx_type: Optional[str] = None,
    since: Optional[str] = None,
    until: Optional[str] = None,
    limit: int = 50,
    offset: int = 0,
    chain_id: Optional[str] = DEFAULT_CHAIN
):
    """Search transactions by forwarding the filters to the chain's RPC node.

    Args:
        address: Address filter; omitted when empty.
        amount_min: Lower amount bound; forwarded whenever given, including 0.
        amount_max: Upper amount bound; forwarded whenever given, including 0.
        tx_type: Transaction type, forwarded to the node as ``type``.
        since: Start of the time window, forwarded as given.
        until: End of the time window, forwarded as given.
        limit: Page size, always forwarded.
        offset: Page offset, always forwarded.
        chain_id: Chain to query. An id without a configured RPC URL is sent
            to the default chain's node.

    Returns:
        The node's decoded JSON body on HTTP 200. On any other status a
        single hard-coded demonstration record is returned instead.

    Raises:
        HTTPException: 500 when the request or response decoding fails.
    """
    try:
        # Build query parameters for the blockchain node.
        params: Dict[str, Any] = {}
        if address:
            params["address"] = address
        # Compare against None rather than truthiness: 0 is a valid bound
        # (e.g. amount_max=0) and must not be dropped.
        if amount_min is not None:
            params["amount_min"] = amount_min
        if amount_max is not None:
            params["amount_max"] = amount_max
        if tx_type:
            params["type"] = tx_type
        if since:
            params["since"] = since
        if until:
            params["until"] = until
        params["limit"] = limit
        params["offset"] = offset
        params["chain_id"] = chain_id

        rpc_url = BLOCKCHAIN_RPC_URLS.get(chain_id, BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN])
        async with httpx.AsyncClient() as client:
            response = await client.get(f"{rpc_url}/rpc/search/transactions", params=params)
            if response.status_code == 200:
                return response.json()

        # Return mock data for demonstration when the node has no answer.
        return [
            {
                "hash": "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
                "type": tx_type or "transfer",
                "from": "0xabcdef1234567890abcdef1234567890abcdef1234",
                "to": "0x1234567890abcdef1234567890abcdef12345678",
                "amount": "1.5",
                "fee": "0.001",
                "timestamp": datetime.now().isoformat()
            }
        ]
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")
@app.get("/api/search/blocks")
async def search_blocks(
    validator: Optional[str] = None,
    since: Optional[str] = None,
    until: Optional[str] = None,
    min_tx: Optional[int] = None,
    limit: int = 50,
    offset: int = 0,
    chain_id: Optional[str] = DEFAULT_CHAIN
):
    """Search blocks by forwarding the filters to the chain's RPC node.

    Filters that are falsy (unset, empty or zero) are left out of the
    upstream query; ``limit``, ``offset`` and ``chain_id`` are always sent.

    Returns:
        The node's decoded JSON body on HTTP 200. On any other status a
        single hard-coded demonstration record is returned instead.

    Raises:
        HTTPException: 500 when the request or response decoding fails.
    """
    try:
        # Optional filters, in the order they are sent upstream.
        optional_filters = {
            "validator": validator,
            "since": since,
            "until": until,
            "min_tx": min_tx,
        }
        query = {key: value for key, value in optional_filters.items() if value}
        query.update({"limit": limit, "offset": offset, "chain_id": chain_id})

        base_url = BLOCKCHAIN_RPC_URLS.get(chain_id, BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN])
        async with httpx.AsyncClient() as http:
            reply = await http.get(f"{base_url}/rpc/search/blocks", params=query)
            if reply.status_code == 200:
                return reply.json()

            # Return mock data for demonstration
            demo_block = {
                "height": 12345,
                "hash": "0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890",
                "validator": validator or "0x1234567890abcdef1234567890abcdef12345678",
                "tx_count": min_tx or 5,
                "timestamp": datetime.now().isoformat(),
            }
            return [demo_block]
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")
@app.get("/api/analytics/overview")
async def analytics_overview(period: str = "24h"):
    """Return an analytics overview for the requested period.

    All figures are hard-coded mock data; nothing is read from a node.

    Args:
        period: ``"1h"``, ``"24h"`` or ``"7d"``. Any other value, including
            ``"30d"``, yields the four-week series.

    Returns:
        A dict with summary strings (``total_transactions``,
        ``transaction_volume``, ``active_addresses``, ``avg_block_time``)
        plus ``volume_data`` and ``activity_data``, each holding parallel
        ``labels`` and ``values`` lists.
    """
    if period == "1h":
        # Twelve 5-minute buckets: 00:00, 00:05, ... 00:55. Only the minute
        # field advances, so the labels stay inside the one-hour window.
        labels = [f"00:{minute:02d}" for minute in range(0, 60, 5)]
        volume_values = [10 + i * 2 for i in range(12)]
        activity_values = [5 + i for i in range(12)]
    elif period == "24h":
        # Twelve 2-hour buckets.
        labels = [f"{hour:02d}:00" for hour in range(0, 24, 2)]
        volume_values = [50 + i * 5 for i in range(12)]
        activity_values = [20 + i * 3 for i in range(12)]
    elif period == "7d":
        labels = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
        volume_values = [500, 600, 550, 700, 800, 650, 750]
        activity_values = [200, 250, 220, 300, 350, 280, 320]
    else:  # 30d, and the fallback for unrecognized periods
        labels = [f"Week {i+1}" for i in range(4)]
        volume_values = [3000, 3500, 3200, 3800]
        activity_values = [1200, 1400, 1300, 1500]

    return {
        "total_transactions": "1,234",
        "transaction_volume": "5,678.90 AITBC",
        "active_addresses": "89",
        "avg_block_time": "2.1s",
        "volume_data": {
            "labels": labels,
            "values": volume_values
        },
        "activity_data": {
            "labels": labels,
            "values": activity_values
        }
    }
@app.get("/api/export/search")
async def export_search(
    format: str = "csv",
    type: str = "transactions",
    data: str = ""
):
    """Export previously obtained search results as a CSV or JSON download.

    Args:
        format: ``"csv"`` or ``"json"``. (The name shadows a builtin but is
            the public query-parameter name, so it is kept.)
        type: ``"transactions"`` selects the transaction columns; any other
            value selects the block columns. (Shadows a builtin; kept for the
            same reason.)
        data: JSON-encoded search results supplied by the caller.

    Returns:
        A ``StreamingResponse`` carrying the file as an attachment named
        ``search_results.<format>``.

    Raises:
        HTTPException: 400 when ``data`` is empty or not valid JSON, when
            ``format`` is unsupported, or when CSV export is requested for
            something other than a JSON array of objects; 500 when
            serialization fails unexpectedly.
    """
    # Client errors are validated outside any broad ``except Exception`` so
    # they reach the caller as 400 instead of being rewrapped as 500.
    if not data:
        raise HTTPException(status_code=400, detail="No data to export")
    if format not in ("csv", "json"):
        raise HTTPException(status_code=400, detail="Unsupported format")
    try:
        results = json.loads(data)
    except ValueError as e:  # json.JSONDecodeError is a ValueError subclass
        raise HTTPException(status_code=400, detail=f"Invalid JSON data: {e}") from e

    headers = {"Content-Disposition": f"attachment; filename=search_results.{format}"}

    if format == "json":
        return StreamingResponse(
            io.BytesIO(json.dumps(results, indent=2).encode()),
            media_type="application/json",
            headers=headers
        )

    # CSV rows are built with dict lookups, so the payload must be a list of
    # JSON objects.
    if not isinstance(results, list) or not all(isinstance(item, dict) for item in results):
        raise HTTPException(
            status_code=400,
            detail="CSV export requires a JSON array of objects"
        )

    if type == "transactions":
        header = ["Hash", "Type", "From", "To", "Amount", "Fee", "Timestamp"]
        keys = ["hash", "type", "from", "to", "amount", "fee", "timestamp"]
    else:  # blocks
        header = ["Height", "Hash", "Validator", "Transactions", "Timestamp"]
        keys = ["height", "hash", "validator", "tx_count", "timestamp"]

    try:
        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(header)
        # Missing fields become empty cells.
        writer.writerows([item.get(key, "") for key in keys] for item in results)
        body = output.getvalue().encode()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Export failed: {str(e)}") from e

    return StreamingResponse(
        io.BytesIO(body),
        media_type="text/csv",
        headers=headers
    )
@app.get("/api/export/blocks")
async def export_blocks(format: str = "csv"):
    """Export the 50 latest blocks of the default chain as CSV or JSON.

    Args:
        format: ``"csv"`` or ``"json"``. (The name shadows a builtin but is
            the public query-parameter name, so it is kept.)

    Returns:
        A ``StreamingResponse`` carrying the file as an attachment named
        ``latest_blocks.<format>``.

    Raises:
        HTTPException: 400 when ``format`` is unsupported; 500 when fetching
            or serializing the blocks fails.
    """
    # Reject a bad format before calling the node, and outside the broad
    # ``except Exception`` below so it is not rewrapped as a 500.
    if format not in ("csv", "json"):
        raise HTTPException(status_code=400, detail="Unsupported format")

    headers = {"Content-Disposition": f"attachment; filename=latest_blocks.{format}"}
    try:
        # Get latest blocks
        blocks = await get_latest_blocks(50)

        if format == "json":
            payload = json.dumps(blocks, indent=2).encode()
            media_type = "application/json"
        else:
            output = io.StringIO()
            writer = csv.writer(output)
            writer.writerow(["Height", "Hash", "Validator", "Transactions", "Timestamp"])
            for block in blocks:
                writer.writerow([
                    block.get("height", ""),
                    block.get("hash", ""),
                    block.get("validator", ""),
                    block.get("tx_count", ""),
                    block.get("timestamp", "")
                ])
            payload = output.getvalue().encode()
            media_type = "text/csv"
    except HTTPException:
        # Keep any HTTP error raised further down with its original status.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Export failed: {str(e)}") from e

    return StreamingResponse(
        io.BytesIO(payload),
        media_type=media_type,
        headers=headers
    )
# Helper functions
async def get_latest_blocks(limit: int = 10, chain_id: str = DEFAULT_CHAIN) -> List[Dict]:
    """Fetch the most recent blocks from a chain's RPC node.

    Args:
        limit: Maximum number of blocks to request.
        chain_id: Chain to query. An id without a configured RPC URL is sent
            to the default chain's node.

    Returns:
        The node's decoded JSON body on HTTP 200. On any other status,
        ``limit`` hard-coded demonstration blocks with heights counting down
        from ``limit`` to 1. On any exception, an empty list (the error is
        printed, not raised).
    """
    try:
        rpc_url = BLOCKCHAIN_RPC_URLS.get(chain_id, BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN])
        async with httpx.AsyncClient() as client:
            # Send every query parameter through ``params``. A query string
            # embedded in the URL is not reliably kept when ``params`` is also
            # given (httpx releases differ on merge vs. replace), which could
            # silently drop ``limit``.
            response = await client.get(
                f"{rpc_url}/rpc/blocks",
                params={"limit": limit, "chain_id": chain_id},
            )
            if response.status_code == 200:
                return response.json()

        # Return mock data
        return [
            {
                "height": i,
                "hash": f"0x{'1234567890abcdef' * 4}",
                "validator": "0x1234567890abcdef1234567890abcdef12345678",
                "tx_count": i % 10,
                "timestamp": datetime.now().isoformat()
            }
            for i in range(limit, 0, -1)
        ]
    except Exception as e:
        # Stay best-effort like the other RPC helpers, but leave a trace.
        print(f"Error getting latest blocks for {chain_id}: {e}")
        return []
@app.get("/health")
async def health():
    """Report service health based on reachability of the default chain node.

    Returns:
        A dict whose ``node_status`` is ``"ok"`` when the node's ``/rpc/head``
        answers HTTP 200 within the 5-second timeout and ``"error"``
        otherwise; ``status`` is ``"ok"`` or ``"degraded"`` accordingly.
        ``version`` and ``features`` are fixed values.
    """
    probe_url = f"{BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN]}/rpc/head"
    try:
        # Test blockchain node connectivity
        async with httpx.AsyncClient() as http:
            reply = await http.get(probe_url, timeout=5.0)
            if reply.status_code == 200:
                node_status = "ok"
            else:
                node_status = "error"
    except Exception:
        node_status = "error"

    overall = "degraded" if node_status != "ok" else "ok"
    return {
        "status": overall,
        "node_status": node_status,
        "version": "2.0.0",
        "features": ["advanced_search", "analytics", "export", "real_time"],
    }
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 8004.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8004,
    )