BEFORE:
/opt/aitbc/cli/
├── aitbc_cli/          # Python package (box in a box)
│   ├── commands/
│   ├── main.py
│   └── ...
├── setup.py

AFTER:
/opt/aitbc/cli/         # Flat structure
├── commands/           # Direct access
├── main.py             # Direct access
├── auth/
├── config/
├── core/
├── models/
├── utils/
├── plugins.py
└── setup.py

CHANGES MADE:
- Moved all files from aitbc_cli/ to the cli/ root
- Fixed all relative imports (from . to absolute imports)
- Updated setup.py entry point: aitbc_cli.main → main
- Added the CLI directory to the Python path in the entry script
- Simplified deployment.py to remove the dependency on the deleted core.deployment
- Fixed import paths in all command files
- Recreated the virtual environment with the new structure

BENEFITS:
- Eliminated 'box in a box' nesting
- Simpler directory structure
- Direct access to all modules
- Cleaner imports
- Easier maintenance and development
- CLI works with both 'python main.py' and 'aitbc' commands
306 lines
10 KiB
Python
Executable File
306 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
KYC/AML Provider Integration - Simplified for CLI
|
|
Basic HTTP client for compliance verification
|
|
"""
|
|
|
|
import json
|
|
import hashlib
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Any
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
import logging
|
|
import httpx
|
|
|
|
# Logging: configure the root logger at INFO level on import, then create the
# module-scoped logger used by every class and function below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
class KYCProvider(str, Enum):
    """Identifiers of the supported KYC service providers.

    Members are also ``str`` instances, so they compare equal to their
    lowercase value and can be built from it, e.g. ``KYCProvider("sumsub")``.
    """

    CHAINALYSIS = "chainalysis"
    SUMSUB = "sumsub"
    ONFIDO = "onfido"
    JUMIO = "jumio"
    VERIFF = "veriff"
|
|
|
|
class KYCStatus(str, Enum):
    """Possible states of a KYC verification.

    Members are also ``str`` instances, so they compare equal to their
    lowercase value.
    """

    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    FAILED = "failed"
    EXPIRED = "expired"
|
|
|
|
class AMLRiskLevel(str, Enum):
    """Risk classification assigned by an AML screening.

    Members are also ``str`` instances, so they compare equal to their
    lowercase value.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
|
|
|
|
@dataclass
class KYCRequest:
    """KYC verification request handed to a KYC provider client.

    Attributes:
        user_id: Identifier of the user being verified.
        provider: Which KYC provider should handle the request.
        customer_data: Free-form customer details (schema is not enforced
            here).
        documents: Optional list of document descriptors; ``None`` when the
            caller supplies none.
        verification_level: Requested verification tier; defaults to
            ``"standard"``.
    """

    user_id: str
    provider: KYCProvider
    customer_data: Dict[str, Any]
    # The default is None, so the annotation must be Optional; the runtime
    # default is deliberately left as None to stay backward-compatible.
    documents: Optional[List[Dict[str, Any]]] = None
    verification_level: str = "standard"
|
|
|
|
@dataclass
class KYCResponse:
    """Result of a KYC submission or status check.

    Attributes:
        request_id: Identifier of the verification request.
        user_id: Identifier of the user the request belongs to.
        provider: KYC provider that produced this response.
        status: Current verification status.
        risk_score: Numeric risk score reported with the status.
        verification_data: Provider-specific details about the verification.
        created_at: When the verification was created.
        expires_at: When the verification expires, if known.
        rejection_reason: Explanation text when one is available, else
            ``None``.
    """

    # Required fields (positional order is part of the constructor interface).
    request_id: str
    user_id: str
    provider: KYCProvider
    status: KYCStatus
    risk_score: float
    verification_data: Dict[str, Any]
    created_at: datetime

    # Optional fields.
    expires_at: Optional[datetime] = None
    rejection_reason: Optional[str] = None
|
|
|
|
@dataclass
class AMLCheck:
    """Outcome of a single AML screening.

    Attributes:
        check_id: Identifier of this screening.
        user_id: Identifier of the screened user.
        provider: Name of the AML provider that ran the check.
        risk_level: Risk classification assigned to the user.
        risk_score: Numeric risk score accompanying ``risk_level``.
        sanctions_hits: Matches against sanctions lists (empty if none).
        pep_hits: Matches against politically-exposed-person lists.
        adverse_media: Adverse media findings.
        checked_at: When the screening was performed.
    """

    # Identification
    check_id: str
    user_id: str
    provider: str

    # Risk assessment
    risk_level: AMLRiskLevel
    risk_score: float

    # Findings
    sanctions_hits: List[Dict[str, Any]]
    pep_hits: List[Dict[str, Any]]
    adverse_media: List[Dict[str, Any]]

    checked_at: datetime
|
|
|
|
class SimpleKYCProvider:
    """Simplified KYC provider client.

    No network request is made by this class: submissions and status checks
    are answered with mock data. An API key must still be registered for a
    provider before a verification can be submitted to it.
    """

    def __init__(self):
        """Create a client with no API keys and the built-in provider URLs."""
        # Keys are registered per provider through set_api_key().
        self.api_keys: Dict[KYCProvider, str] = {}
        self.base_urls: Dict[KYCProvider, str] = {
            KYCProvider.CHAINALYSIS: "https://api.chainalysis.com",
            KYCProvider.SUMSUB: "https://api.sumsub.com",
            KYCProvider.ONFIDO: "https://api.onfido.com",
            KYCProvider.JUMIO: "https://api.jumio.com",
            KYCProvider.VERIFF: "https://api.veriff.com",
        }

    def set_api_key(self, provider: KYCProvider, api_key: str):
        """Store (or overwrite) the API key used for ``provider``."""
        self.api_keys[provider] = api_key
        logger.info(f"✅ API key set for {provider}")

    def submit_kyc_verification(self, request: KYCRequest) -> KYCResponse:
        """Submit a KYC verification to the provider named in ``request``.

        Args:
            request: The verification request to submit.

        Returns:
            A mock ``KYCResponse`` in ``PENDING`` status.

        Raises:
            ValueError: If no API key is configured for ``request.provider``.
        """
        try:
            if request.provider not in self.api_keys:
                raise ValueError(f"No API key configured for {request.provider}")

            # Mock API response. A production implementation would POST the
            # user id, customer data and verification level to the provider
            # with a "Bearer <api key>" Authorization header instead.
            return self._mock_kyc_response(request)

        except Exception as e:
            # Log for the operator, then let the caller handle the failure.
            logger.error(f"❌ KYC submission failed: {e}")
            raise

    def check_kyc_status(self, request_id: str, provider: KYCProvider) -> KYCResponse:
        """Return the (mock) verification status for ``request_id``.

        The outcome is derived from an MD5 hash of ``request_id``, so the same
        id always yields the same status and risk score.

        Args:
            request_id: Id as produced by ``submit_kyc_verification``, i.e.
                ``<provider>_<user_id>_<timestamp>``.
            provider: Provider the request was submitted to.

        Returns:
            A ``KYCResponse`` whose ``created_at`` is set to one hour before
            the call; ``expires_at`` is left unset.

        Raises:
            IndexError: If ``request_id`` contains no underscore, so no user
                id can be extracted from it.
        """
        try:
            # Mock status check - in production would call provider API
            hash_val = int(hashlib.md5(request_id.encode()).hexdigest()[:8], 16)
            bucket = hash_val % 4

            # Bound up front so every branch leaves it defined.
            rejection_reason = None
            if bucket == 0:
                status = KYCStatus.APPROVED
                risk_score = 0.05
            elif bucket == 1:
                status = KYCStatus.PENDING
                risk_score = 0.15
            elif bucket == 2:
                status = KYCStatus.REJECTED
                risk_score = 0.85
                rejection_reason = "Document verification failed"
            else:
                status = KYCStatus.FAILED
                risk_score = 0.95
                rejection_reason = "Technical error during verification"

            return KYCResponse(
                request_id=request_id,
                user_id=self._user_id_from_request_id(request_id),
                provider=provider,
                status=status,
                risk_score=risk_score,
                verification_data={"provider": provider.value, "checked": True},
                created_at=datetime.now() - timedelta(hours=1),
                rejection_reason=rejection_reason,
            )

        except Exception as e:
            logger.error(f"❌ KYC status check failed: {e}")
            raise

    @staticmethod
    def _user_id_from_request_id(request_id: str) -> str:
        """Extract the user id from ``<provider>_<user_id>_<timestamp>``.

        Everything between the first and the last underscore-separated field
        is the user id, so user ids that contain underscores themselves are
        returned intact. Ids with only two fields keep the historical
        behaviour of returning the second field; ids without any underscore
        raise ``IndexError``.
        """
        parts = request_id.split("_")
        if len(parts) > 2:
            return "_".join(parts[1:-1])
        return parts[1]

    def _mock_kyc_response(self, request: KYCRequest) -> KYCResponse:
        """Build the mock ``PENDING`` response returned for a submission."""
        # Take one timestamp so the request id, created_at and expires_at all
        # describe the same instant.
        now = datetime.now()
        return KYCResponse(
            request_id=f"{request.provider.value}_{request.user_id}_{int(now.timestamp())}",
            user_id=request.user_id,
            provider=request.provider,
            status=KYCStatus.PENDING,
            risk_score=0.15,
            verification_data={"provider": request.provider.value, "submitted": True},
            created_at=now,
            expires_at=now + timedelta(days=30),
        )
|
|
|
|
class SimpleAMLProvider:
    """Simplified AML screening client.

    No network request is made by this class: screenings are answered with
    mock results derived from a hash of the user id and email.
    """

    def __init__(self):
        """Create a client with no API keys registered."""
        self.api_keys: Dict[str, str] = {}

    def set_api_key(self, provider: str, api_key: str):
        """Store (or overwrite) the API key used for ``provider``."""
        self.api_keys[provider] = api_key
        logger.info(f"✅ AML API key set for {provider}")

    def screen_user(self, user_id: str, user_data: Dict[str, Any]) -> AMLCheck:
        """Screen a user for AML compliance and return the mock result.

        Args:
            user_id: Identifier of the user to screen.
            user_data: User details; only the optional ``"email"`` entry is
                read here.

        Returns:
            An ``AMLCheck`` whose risk level depends on the MD5 hash of
            ``"<user_id>_<email>"``. Only the critical bucket carries a
            sanctions hit; PEP hits and adverse media are always empty.
        """
        try:
            # Mock AML screening - in production would call real provider
            fingerprint = f"{user_id}_{user_data.get('email', '')}"
            digest = hashlib.md5(fingerprint.encode()).hexdigest()
            bucket = int(digest[:8], 16) % 5

            # A fresh list per call so results never share mutable state.
            hits: List[Dict[str, Any]] = []
            if bucket == 0:
                level, score = AMLRiskLevel.CRITICAL, 0.95
                hits.append({"list": "OFAC", "name": "Test Sanction", "confidence": 0.9})
            elif bucket == 1:
                level, score = AMLRiskLevel.HIGH, 0.75
            elif bucket == 2:
                level, score = AMLRiskLevel.MEDIUM, 0.45
            else:
                # Buckets 3 and 4 both map to low risk.
                level, score = AMLRiskLevel.LOW, 0.15

            check_id = f"aml_{user_id}_{int(datetime.now().timestamp())}"
            return AMLCheck(
                check_id=check_id,
                user_id=user_id,
                provider="chainalysis_aml",
                risk_level=level,
                risk_score=score,
                sanctions_hits=hits,
                pep_hits=[],  # Politically Exposed Persons
                adverse_media=[],
                checked_at=datetime.now(),
            )

        except Exception as exc:
            # Log for the operator, then let the caller handle the failure.
            logger.error(f"❌ AML screening failed: {exc}")
            raise
|
|
|
|
# Module-level singletons shared by the CLI interface functions below.
kyc_provider = SimpleKYCProvider()
aml_provider = SimpleAMLProvider()
|
|
|
|
# CLI Interface Functions
|
|
def submit_kyc_verification(user_id: str, provider: str, customer_data: Dict[str, Any]) -> Dict[str, Any]:
    """Submit a KYC verification through the shared ``kyc_provider``.

    Args:
        user_id: Identifier of the user to verify.
        provider: Provider name; must be a valid ``KYCProvider`` value.
        customer_data: Customer details forwarded with the request.

    Returns:
        A JSON-friendly dict with ``request_id``, ``user_id``, ``provider``,
        ``status``, ``risk_score`` and ISO-formatted ``created_at``.

    Raises:
        ValueError: If ``provider`` is not a known ``KYCProvider`` value.
    """
    kyc = KYCProvider(provider)

    # Fall back to the demo key only when nothing is configured, so a key
    # registered earlier via kyc_provider.set_api_key() is not overwritten.
    if kyc not in kyc_provider.api_keys:
        kyc_provider.set_api_key(kyc, "demo_api_key")

    request = KYCRequest(
        user_id=user_id,
        provider=kyc,
        customer_data=customer_data,
    )

    response = kyc_provider.submit_kyc_verification(request)

    return {
        "request_id": response.request_id,
        "user_id": response.user_id,
        "provider": response.provider.value,
        "status": response.status.value,
        "risk_score": response.risk_score,
        "created_at": response.created_at.isoformat(),
    }
|
|
|
|
def check_kyc_status(request_id: str, provider: str) -> Dict[str, Any]:
    """Look up a KYC verification through the shared ``kyc_provider``.

    ``provider`` is converted to a ``KYCProvider`` (an unknown value raises
    ``ValueError``) and the resulting response is flattened into a
    JSON-friendly dict with the enum fields as plain strings and
    ``created_at`` in ISO format.
    """
    provider_enum = KYCProvider(provider)
    result = kyc_provider.check_kyc_status(request_id, provider_enum)

    summary: Dict[str, Any] = {}
    summary["request_id"] = result.request_id
    summary["user_id"] = result.user_id
    summary["provider"] = result.provider.value
    summary["status"] = result.status.value
    summary["risk_score"] = result.risk_score
    summary["rejection_reason"] = result.rejection_reason
    summary["created_at"] = result.created_at.isoformat()
    return summary
|
|
|
|
def perform_aml_screening(user_id: str, user_data: Dict[str, Any]) -> Dict[str, Any]:
    """Run an AML screening through the shared ``aml_provider``.

    Args:
        user_id: Identifier of the user to screen.
        user_data: User details forwarded to the screening.

    Returns:
        A JSON-friendly dict with ``check_id``, ``user_id``, ``provider``,
        ``risk_level``, ``risk_score``, ``sanctions_hits`` and ISO-formatted
        ``checked_at``.
    """
    # Fall back to the demo key only when nothing is configured, so a key
    # registered earlier via aml_provider.set_api_key() is not overwritten.
    if "chainalysis_aml" not in aml_provider.api_keys:
        aml_provider.set_api_key("chainalysis_aml", "demo_api_key")

    check = aml_provider.screen_user(user_id, user_data)

    return {
        "check_id": check.check_id,
        "user_id": check.user_id,
        "provider": check.provider,
        "risk_level": check.risk_level.value,
        "risk_score": check.risk_score,
        "sanctions_hits": check.sanctions_hits,
        "checked_at": check.checked_at.isoformat(),
    }
|
|
|
|
# Test function
|
|
def test_kyc_aml_integration():
    """Smoke-test the CLI functions end to end and print each result.

    Submits a KYC verification for a sample customer, checks its status using
    the returned request id, then runs an AML screening for the same user.
    """
    print("🧪 Testing KYC/AML Integration...")

    sample_user = "user123"
    sample_provider = "chainalysis"
    customer_data = dict(
        first_name="John",
        last_name="Doe",
        email="john.doe@example.com",
        date_of_birth="1990-01-01",
    )

    # Step 1: KYC submission
    submission = submit_kyc_verification(sample_user, sample_provider, customer_data)
    print(f"✅ KYC Submitted: {submission}")

    # Step 2: KYC status check, keyed by the id returned from step 1
    status_info = check_kyc_status(submission["request_id"], sample_provider)
    print(f"📋 KYC Status: {status_info}")

    # Step 3: AML screening
    screening = perform_aml_screening(sample_user, customer_data)
    print(f"🔍 AML Screening: {screening}")

    print("🎉 KYC/AML integration test complete!")
|
|
|
|
# Run the integration smoke test when this file is executed as a script.
if __name__ == "__main__":
    test_kyc_aml_integration()
|