Files
aitbc/ecosystem-analytics/analytics_service.py
oib c8be9d7414 feat: add marketplace metrics, privacy features, and service registry endpoints
- Add Prometheus metrics for marketplace API throughput and error rates with new dashboard panels
- Implement confidential transaction models with encryption support and access control
- Add key management system with registration, rotation, and audit logging
- Create services and registry routers for service discovery and management
- Integrate ZK proof generation for privacy-preserving receipts
- Add metrics instru
2025-12-22 10:33:23 +01:00

629 lines
23 KiB
Python

"""
Ecosystem Analytics Service for AITBC
Tracks and analyzes ecosystem metrics including:
- Hackathon participation and outcomes
- Grant program effectiveness
- Extension adoption and usage
- Developer engagement
- Network effects and cross-chain activity
"""
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from decimal import Decimal
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from sqlalchemy import create_engine, select, func
from sqlalchemy.orm import sessionmaker
# Configuration - in production, this would come from environment variables or config file
class Settings:
DATABASE_URL = "postgresql://user:pass@localhost/aitbc"
settings = Settings()
@dataclass
class EcosystemMetric:
"""Base class for ecosystem metrics"""
timestamp: datetime
metric_name: str
value: float
unit: str
dimensions: Dict[str, Any]
source: str
@dataclass
class HackathonMetric:
"""Hackathon-specific metrics"""
event_id: str
event_name: str
start_date: datetime
end_date: datetime
participants: int
submissions: int
winners: int
projects_deployed: int
github_stars: int
community_engagement: float
technical_score: float
innovation_score: float
@dataclass
class GrantMetric:
"""Grant program metrics"""
grant_id: str
project_name: str
amount_awarded: Decimal
amount_disbursed: Decimal
milestones_completed: int
total_milestones: int
users_acquired: int
github_contributors: int
code_commits: int
documentation_score: float
community_score: float
@dataclass
class ExtensionMetric:
"""Extension/connector metrics"""
extension_id: str
extension_name: str
downloads: int
active_installations: int
api_calls: int
error_rate: float
avg_response_time: float
user_satisfaction: float
integration_count: int
revenue_generated: Decimal
class EcosystemAnalyticsService:
"""Main analytics service for ecosystem metrics"""
def __init__(self):
self.engine = create_engine(settings.DATABASE_URL)
self.Session = sessionmaker(bind=self.engine)
self.logger = __import__('logging').getLogger(__name__)
async def collect_hackathon_metrics(self, event_id: str) -> HackathonMetric:
"""Collect metrics for a specific hackathon"""
with self.Session() as db:
# Get event details
event = self._get_hackathon_event(db, event_id)
if not event:
raise ValueError(f"Hackathon {event_id} not found")
# Collect participant metrics
participants = await self._count_participants(event_id)
submissions = await self._count_submissions(event_id)
# Get project metrics
projects = await self._get_hackathon_projects(event_id)
projects_deployed = sum(1 for p in projects if p.get('deployed', False))
# Calculate engagement scores
community_engagement = await self._calculate_community_engagement(event_id)
technical_scores = [p.get('technical_score', 0) for p in projects]
innovation_scores = [p.get('innovation_score', 0) for p in projects]
# Get GitHub metrics
github_stars = sum(p.get('github_stars', 0) for p in projects)
metric = HackathonMetric(
event_id=event_id,
event_name=event['name'],
start_date=event['start_date'],
end_date=event['end_date'],
participants=participants,
submissions=submissions,
winners=len([p for p in projects if p.get('winner', False)]),
projects_deployed=projects_deployed,
github_stars=github_stars,
community_engagement=community_engagement,
technical_score=sum(technical_scores) / len(technical_scores) if technical_scores else 0,
innovation_score=sum(innovation_scores) / len(innovation_scores) if innovation_scores else 0
)
# Store metrics
await self._store_metric(metric)
return metric
async def collect_grant_metrics(self, grant_id: str) -> GrantMetric:
"""Collect metrics for a specific grant"""
with self.Session() as db:
# Get grant details
grant = self._get_grant_details(db, grant_id)
if not grant:
raise ValueError(f"Grant {grant_id} not found")
# Get project metrics
project = await self._get_grant_project(grant_id)
# Calculate completion metrics
milestones_completed = await self._count_completed_milestones(grant_id)
total_milestones = grant.get('total_milestones', 1)
# Get adoption metrics
users_acquired = await self._count_project_users(grant_id)
github_contributors = await self._count_github_contributors(project.get('repo_url'))
code_commits = await self._count_code_commits(project.get('repo_url'))
# Calculate quality scores
documentation_score = await self._evaluate_documentation(project.get('docs_url'))
community_score = await self._evaluate_community_health(project.get('repo_url'))
metric = GrantMetric(
grant_id=grant_id,
project_name=grant['project_name'],
amount_awarded=Decimal(str(grant.get('amount_awarded', 0))),
amount_disbursed=Decimal(str(grant.get('amount_disbursed', 0))),
milestones_completed=milestones_completed,
total_milestones=total_milestones,
users_acquired=users_acquired,
github_contributors=github_contributors,
code_commits=code_commits,
documentation_score=documentation_score,
community_score=community_score
)
# Store metrics
await self._store_metric(metric)
return metric
async def collect_extension_metrics(self, extension_id: str) -> ExtensionMetric:
"""Collect metrics for a specific extension"""
with self.Session() as db:
# Get extension details
extension = self._get_extension_details(db, extension_id)
if not extension:
raise ValueError(f"Extension {extension_id} not found")
# Get usage metrics
downloads = await self._count_downloads(extension_id)
active_installations = await self._count_active_installations(extension_id)
# Get performance metrics
api_calls = await self._count_api_calls(extension_id, days=30)
error_rate = await self._calculate_error_rate(extension_id, days=30)
avg_response_time = await self._calculate_avg_response_time(extension_id, days=30)
# Get quality metrics
user_satisfaction = await self._calculate_user_satisfaction(extension_id)
integration_count = await self._count_integrations(extension_id)
# Get business metrics
revenue_generated = await self._calculate_revenue(extension_id, days=30)
metric = ExtensionMetric(
extension_id=extension_id,
extension_name=extension['name'],
downloads=downloads,
active_installations=active_installations,
api_calls=api_calls,
error_rate=error_rate,
avg_response_time=avg_response_time,
user_satisfaction=user_satisfaction,
integration_count=integration_count,
revenue_generated=Decimal(str(revenue_generated))
)
# Store metrics
await self._store_metric(metric)
return metric
async def generate_ecosystem_dashboard(self, days: int = 30) -> Dict[str, Any]:
"""Generate comprehensive ecosystem dashboard"""
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=days)
dashboard = {
"period": {
"start": start_date.isoformat(),
"end": end_date.isoformat(),
"days": days
},
"summary": await self._generate_summary_metrics(start_date, end_date),
"hackathons": await self._generate_hackathon_section(start_date, end_date),
"grants": await self._generate_grant_section(start_date, end_date),
"extensions": await self._generate_extension_section(start_date, end_date),
"network_effects": await self._generate_network_effects(start_date, end_date)
}
return dashboard
async def generate_hackathon_report(self, event_id: str) -> Dict[str, Any]:
"""Generate detailed hackathon report"""
metric = await self.collect_hackathon_metrics(event_id)
# Generate visualizations
figures = {}
# Participation funnel
fig_funnel = go.Figure(go.Funnel(
y=["Registrations", "Active Participants", "Submissions", "Deployed Projects", "Winners"],
x=[
metric.participants * 1.5, # Estimated registrations
metric.participants,
metric.submissions,
metric.projects_deployed,
metric.winners
]
))
fig_funnel.update_layout(title="Hackathon Participation Funnel")
figures['funnel'] = fig_funnel.to_json()
# Score distribution
fig_scores = go.Figure()
fig_scores.add_trace(go.Scatter(
x=list(range(metric.submissions)),
y=[{'technical_score': 75, 'innovation_score': 85}] * metric.submissions, # Sample data
mode='markers',
name='Projects'
))
fig_scores.update_layout(title="Project Scores Distribution")
figures['scores'] = fig_scores.to_json()
# Project categories
categories = ['DeFi', 'Enterprise', 'Developer Tools', 'Analytics', 'Other']
counts = [15, 20, 10, 8, 12] # Sample data
fig_categories = px.pie(
values=counts,
names=categories,
title="Project Categories"
)
figures['categories'] = fig_categories.to_json()
report = {
"event": asdict(metric),
"figures": figures,
"insights": await self._generate_hackathon_insights(metric),
"recommendations": await self._generate_hackathon_recommendations(metric)
}
return report
async def generate_grant_impact_report(self, grant_id: str) -> Dict[str, Any]:
"""Generate grant impact report"""
metric = await self.collect_grant_metrics(grant_id)
# Generate ROI analysis
roi_analysis = await self._calculate_grant_roi(metric)
# Generate adoption curve
adoption_data = await self._get_adoption_curve(grant_id)
fig_adoption = px.line(
x=[d['date'] for d in adoption_data],
y=[d['users'] for d in adoption_data],
title="User Adoption Over Time"
)
report = {
"grant": asdict(metric),
"roi_analysis": roi_analysis,
"adoption_chart": fig_adoption.to_json(),
"milestone_progress": {
"completed": metric.milestones_completed,
"total": metric.total_milestones,
"percentage": (metric.milestones_completed / metric.total_milestones * 100) if metric.total_milestones > 0 else 0
},
"quality_metrics": {
"documentation": metric.documentation_score,
"community": metric.community_score,
"overall": (metric.documentation_score + metric.community_score) / 2
}
}
return report
async def export_metrics(self, metric_type: str, format: str = "csv") -> bytes:
"""Export metrics in specified format"""
# Get metrics data
if metric_type == "hackathons":
data = await self._get_all_hackathon_metrics()
elif metric_type == "grants":
data = await self._get_all_grant_metrics()
elif metric_type == "extensions":
data = await self._get_all_extension_metrics()
else:
raise ValueError(f"Unknown metric type: {metric_type}")
# Convert to DataFrame
df = pd.DataFrame([asdict(m) for m in data])
# Export in requested format
if format == "csv":
return df.to_csv(index=False).encode('utf-8')
elif format == "json":
return df.to_json(orient='records', indent=2).encode('utf-8')
elif format == "excel":
return df.to_excel(index=False).encode('utf-8')
else:
raise ValueError(f"Unsupported format: {format}")
# Private helper methods
async def _store_metric(self, metric: Any):
"""Store metric in database"""
# Implementation would store in metrics table
pass
async def _count_participants(self, event_id: str) -> int:
"""Count hackathon participants"""
# Implementation would query participant data
return 150 # Sample
async def _count_submissions(self, event_id: str) -> int:
"""Count hackathon submissions"""
return 45 # Sample
async def _get_hackathon_projects(self, event_id: str) -> List[Dict]:
"""Get all projects from hackathon"""
# Implementation would query project data
return [] # Sample
async def _calculate_community_engagement(self, event_id: str) -> float:
"""Calculate community engagement score"""
return 85.5 # Sample
async def _count_completed_milestones(self, grant_id: str) -> int:
"""Count completed grant milestones"""
return 3 # Sample
async def _count_project_users(self, grant_id: str) -> int:
"""Count users of grant project"""
return 500 # Sample
async def _count_github_contributors(self, repo_url: str) -> int:
"""Count GitHub contributors"""
return 12 # Sample
async def _count_code_commits(self, repo_url: str) -> int:
"""Count code commits"""
return 234 # Sample
async def _evaluate_documentation(self, docs_url: str) -> float:
"""Evaluate documentation quality"""
return 90.0 # Sample
async def _evaluate_community_health(self, repo_url: str) -> float:
"""Evaluate community health"""
return 75.5 # Sample
async def _count_downloads(self, extension_id: str) -> int:
"""Count extension downloads"""
return 1250 # Sample
async def _count_active_installations(self, extension_id: str) -> int:
"""Count active installations"""
return 350 # Sample
async def _count_api_calls(self, extension_id: str, days: int) -> int:
"""Count API calls to extension"""
return 15000 # Sample
async def _calculate_error_rate(self, extension_id: str, days: int) -> float:
"""Calculate error rate"""
return 0.02 # Sample
async def _calculate_avg_response_time(self, extension_id: str, days: int) -> float:
"""Calculate average response time"""
return 125.5 # Sample
async def _calculate_user_satisfaction(self, extension_id: str) -> float:
"""Calculate user satisfaction score"""
return 4.5 # Sample
async def _count_integrations(self, extension_id: str) -> int:
"""Count integrations using extension"""
return 25 # Sample
async def _calculate_revenue(self, extension_id: str, days: int) -> float:
"""Calculate revenue generated"""
return 5000.0 # Sample
async def _generate_summary_metrics(self, start_date: datetime, end_date: datetime) -> Dict:
"""Generate summary metrics for dashboard"""
return {
"total_hackathons": 4,
"total_participants": 600,
"total_grants_awarded": 12,
"total_grant_amount": 500000,
"active_extensions": 25,
"total_downloads": 50000,
"github_stars": 2500,
"community_members": 1500
}
async def _generate_hackathon_section(self, start_date: datetime, end_date: datetime) -> Dict:
"""Generate hackathon section of dashboard"""
return {
"upcoming": [],
"recent": [],
"top_projects": [],
"participation_trend": []
}
async def _generate_grant_section(self, start_date: datetime, end_date: datetime) -> Dict:
"""Generate grant section of dashboard"""
return {
"active_grants": 8,
"completed_grants": 4,
"total_disbursed": 350000,
"roi_average": 2.5,
"success_rate": 0.85
}
async def _generate_extension_section(self, start_date: datetime, end_date: datetime) -> Dict:
"""Generate extension section of dashboard"""
return {
"total_extensions": 25,
"new_extensions": 3,
"most_popular": [],
"growth_rate": 0.15
}
async def _generate_network_effects(self, start_date: datetime, end_date: datetime) -> Dict:
"""Generate network effects analysis"""
return {
"cross_chain_volume": 1000000,
"interoperability_score": 85.5,
"network_value": 25000000,
"metcalfe_coefficient": 1.2
}
async def _generate_hackathon_insights(self, metric: HackathonMetric) -> List[str]:
"""Generate insights from hackathon metrics"""
insights = []
if metric.projects_deployed / metric.submissions > 0.5:
insights.append("High deployment rate indicates strong technical execution")
if metric.community_engagement > 80:
insights.append("Excellent community engagement and participation")
if metric.github_stars > 100:
insights.append("Strong GitHub community interest")
return insights
async def _generate_hackathon_recommendations(self, metric: HackathonMetric) -> List[str]:
"""Generate recommendations for improvement"""
recommendations = []
if metric.projects_deployed / metric.submissions < 0.3:
recommendations.append("Provide more deployment support and infrastructure")
if metric.technical_score < 70:
recommendations.append("Offer technical workshops and mentorship")
if metric.innovation_score < 70:
recommendations.append("Encourage more innovative and ambitious projects")
return recommendations
async def _calculate_grant_roi(self, metric: GrantMetric) -> Dict:
"""Calculate grant ROI"""
if metric.amount_disbursed == 0:
return {"roi": 0, "payback_period": None}
# Simplified ROI calculation
estimated_value = metric.users_acquired * 100 # $100 per user
roi = (estimated_value - float(metric.amount_disbursed)) / float(metric.amount_disbursed)
return {
"roi": roi,
"payback_period": "12 months" if roi > 0 else None,
"value_created": estimated_value
}
async def _get_adoption_curve(self, grant_id: str) -> List[Dict]:
"""Get user adoption over time"""
# Sample data
return [
{"date": "2024-01-01", "users": 50},
{"date": "2024-02-01", "users": 120},
{"date": "2024-03-01", "users": 200},
{"date": "2024-04-01", "users": 350},
{"date": "2024-05-01", "users": 500}
]
def _get_hackathon_event(self, db, event_id: str) -> Optional[Dict]:
"""Get hackathon event details"""
# Implementation would query database
return {
"name": "DeFi Innovation Hackathon",
"start_date": datetime(2024, 1, 15),
"end_date": datetime(2024, 1, 22)
}
def _get_grant_details(self, db, grant_id: str) -> Optional[Dict]:
"""Get grant details"""
# Implementation would query database
return {
"project_name": "Advanced Analytics Platform",
"amount_awarded": 50000,
"amount_disbursed": 25000,
"total_milestones": 4
}
def _get_extension_details(self, db, extension_id: str) -> Optional[Dict]:
"""Get extension details"""
# Implementation would query database
return {
"name": "SAP ERP Connector"
}
async def _get_grant_project(self, grant_id: str) -> Dict:
"""Get grant project details"""
return {
"repo_url": "https://github.com/example/project",
"docs_url": "https://docs.example.com"
}
async def _get_all_hackathon_metrics(self) -> List[HackathonMetric]:
"""Get all hackathon metrics"""
# Implementation would query database
return []
async def _get_all_grant_metrics(self) -> List[GrantMetric]:
"""Get all grant metrics"""
# Implementation would query database
return []
async def _get_all_extension_metrics(self) -> List[ExtensionMetric]:
"""Get all extension metrics"""
# Implementation would query database
return []
# CLI interface for analytics service
async def main():
"""CLI entry point"""
import argparse
parser = argparse.ArgumentParser(description="AITBC Ecosystem Analytics")
parser.add_argument("--dashboard", action="store_true", help="Generate ecosystem dashboard")
parser.add_argument("--hackathon", help="Generate hackathon report for event ID")
parser.add_argument("--grant", help="Generate grant impact report for grant ID")
parser.add_argument("--export", choices=["hackathons", "grants", "extensions"], help="Export metrics")
parser.add_argument("--format", choices=["csv", "json", "excel"], default="json", help="Export format")
parser.add_argument("--days", type=int, default=30, help="Number of days for dashboard")
args = parser.parse_args()
service = EcosystemAnalyticsService()
if args.dashboard:
dashboard = await service.generate_ecosystem_dashboard(args.days)
print(json.dumps(dashboard, indent=2, default=str))
elif args.hackathon:
report = await service.generate_hackathon_report(args.hackathon)
print(json.dumps(report, indent=2, default=str))
elif args.grant:
report = await service.generate_grant_impact_report(args.grant)
print(json.dumps(report, indent=2, default=str))
elif args.export:
data = await service.export_metrics(args.export, args.format)
print(data.decode())
else:
parser.print_help()
if __name__ == "__main__":
asyncio.run(main())