refactor: add API versioning and security headers utilities
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Node Failover Simulation / failover-test (push) Has been cancelled

- Create APIVersion enum and api_version decorator
- Implement APIVersionRouter for version routing
- Add deprecation warnings and sunset dates for deprecated APIs
- Create SecurityHeaders dataclass for security headers
- Implement SecurityHeadersMiddleware for applying headers
- Create CORSMiddleware for CORS policy management
- Add production and development security header presets
- Implement strict and permissive CORS configurations
This commit is contained in:
aitbc
2026-05-09 12:34:53 +02:00
parent a1791bd0d6
commit ca0c076446
2 changed files with 370 additions and 0 deletions

136
aitbc/api_versioning.py Normal file
View File

@@ -0,0 +1,136 @@
"""
API versioning utilities for AITBC
Provides API versioning for backward compatibility
"""
from typing import Optional, Dict, Any, Callable
from functools import wraps
from enum import Enum
from datetime import datetime
from .aitbc_logging import get_logger
logger = get_logger(__name__)
class APIVersion(Enum):
"""API version enumeration"""
V1 = "v1"
V2 = "v2"
LATEST = "latest"
class DeprecatedAPIError(Exception):
"""Exception raised when deprecated API is called"""
pass
def api_version(
version: APIVersion = APIVersion.V1,
deprecated: bool = False,
deprecation_date: Optional[datetime] = None,
sunset_date: Optional[datetime] = None
):
"""
Decorator to mark API endpoint with version information
Args:
version: API version
deprecated: Whether the endpoint is deprecated
deprecation_date: Date when endpoint was deprecated
sunset_date: Date when endpoint will be removed
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
if deprecated:
warning_msg = f"API endpoint {func.__name__} is deprecated"
if sunset_date:
warning_msg += f" and will be removed on {sunset_date.isoformat()}"
logger.warning(warning_msg)
# Add version info to response if applicable
result = func(*args, **kwargs)
if isinstance(result, dict):
result["_meta"] = result.get("_meta", {})
result["_meta"]["api_version"] = version.value
if deprecated:
result["_meta"]["deprecated"] = True
if deprecation_date:
result["_meta"]["deprecated_since"] = deprecation_date.isoformat()
if sunset_date:
result["_meta"]["sunset_date"] = sunset_date.isoformat()
return result
wrapper._api_version = version.value
wrapper._deprecated = deprecated
wrapper._deprecation_date = deprecation_date
wrapper._sunset_date = sunset_date
return wrapper
return decorator
class APIVersionRouter:
"""
API version router for handling multiple API versions.
Routes requests to appropriate version handlers.
"""
def __init__(self):
"""Initialize API version router"""
self._version_handlers: Dict[str, Callable] = {}
self._default_version = APIVersion.V1.value
def register_handler(self, version: str, handler: Callable) -> None:
"""
Register a handler for a specific API version
Args:
version: API version string
handler: Handler function
"""
self._version_handlers[version] = handler
logger.info(f"Registered handler for API version {version}")
def set_default_version(self, version: str) -> None:
"""
Set default API version
Args:
version: Default version string
"""
self._default_version = version
logger.info(f"Set default API version to {version}")
def route(self, version: Optional[str] = None) -> Callable:
"""
Route request to appropriate version handler
Args:
version: Requested version (uses default if None)
Returns:
Handler function
Raises:
ValueError: If version is not supported
"""
target_version = version or self._default_version
if target_version not in self._version_handlers:
raise ValueError(f"Unsupported API version: {target_version}")
return self._version_handlers[target_version]
def get_supported_versions(self) -> list:
"""
Get list of supported API versions
Returns:
List of supported version strings
"""
return list(self._version_handlers.keys())

234
aitbc/security_headers.py Normal file
View File

@@ -0,0 +1,234 @@
"""
Security headers and CORS utilities for AITBC web services
Provides security headers and CORS policies configuration
"""
from typing import Dict, List, Optional, Set
from dataclasses import dataclass
from .aitbc_logging import get_logger
logger = get_logger(__name__)
@dataclass
class SecurityHeaders:
"""Security headers configuration"""
X_Content_Type_Options: str = "nosniff"
X_Frame_Options: str = "DENY"
X_XSS_Protection: str = "1; mode=block"
Strict_Transport_Security: str = "max-age=31536000; includeSubDomains"
Content_Security_Policy: str = "default-src 'self'"
Referrer_Policy: str = "strict-origin-when-cross-origin"
Permissions_Policy: str = ""
Cache_Control: str = "no-cache, no-store, must-revalidate"
Pragma: str = "no-cache"
@dataclass
class CORSConfig:
"""CORS configuration"""
allow_origins: List[str]
allow_methods: List[str]
allow_headers: List[str]
allow_credentials: bool = False
expose_headers: List[str] = None
max_age: int = 3600
class SecurityHeadersMiddleware:
"""
Security headers middleware for web services.
Adds security headers to HTTP responses.
"""
def __init__(self, headers: Optional[SecurityHeaders] = None):
"""
Initialize security headers middleware
Args:
headers: Security headers configuration
"""
self.headers = headers or SecurityHeaders()
def get_headers(self) -> Dict[str, str]:
"""
Get security headers dictionary
Returns:
Dictionary of security headers
"""
return {
"X-Content-Type-Options": self.headers.X_Content_Type_Options,
"X-Frame-Options": self.headers.X_Frame_Options,
"X-XSS-Protection": self.headers.X_XSS_Protection,
"Strict-Transport-Security": self.headers.Strict_Transport_Security,
"Content-Security-Policy": self.headers.Content_Security_Policy,
"Referrer-Policy": self.headers.Referrer_Policy,
"Permissions-Policy": self.headers.Permissions_Policy,
"Cache-Control": self.headers.Cache_Control,
"Pragma": self.headers.Pragma
}
def apply_to_response(self, response_headers: Dict[str, str]) -> Dict[str, str]:
"""
Apply security headers to response
Args:
response_headers: Existing response headers
Returns:
Response headers with security headers added
"""
security_headers = self.get_headers()
response_headers.update(security_headers)
return response_headers
class CORSMiddleware:
"""
CORS middleware for web services.
Handles Cross-Origin Resource Sharing policies.
"""
def __init__(self, config: CORSConfig):
"""
Initialize CORS middleware
Args:
config: CORS configuration
"""
self.config = config
def get_cors_headers(self, origin: str) -> Dict[str, str]:
"""
Get CORS headers for a request
Args:
origin: Request origin
Returns:
Dictionary of CORS headers
"""
headers = {}
# Check if origin is allowed
if self._is_origin_allowed(origin):
headers["Access-Control-Allow-Origin"] = origin
if self.config.allow_credentials:
headers["Access-Control-Allow-Credentials"] = "true"
headers["Access-Control-Allow-Methods"] = ", ".join(self.config.allow_methods)
headers["Access-Control-Allow-Headers"] = ", ".join(self.config.allow_headers)
if self.config.expose_headers:
headers["Access-Control-Expose-Headers"] = ", ".join(self.config.expose_headers)
headers["Access-Control-Max-Age"] = str(self.config.max_age)
return headers
def _is_origin_allowed(self, origin: str) -> bool:
"""
Check if origin is allowed based on CORS policy
Args:
origin: Request origin
Returns:
True if origin is allowed, False otherwise
"""
if "*" in self.config.allow_origins:
return True
return origin in self.config.allow_origins
def is_preflight_request(self, method: str) -> bool:
"""
Check if request is a CORS preflight request
Args:
method: HTTP method
Returns:
True if preflight request, False otherwise
"""
return method.upper() == "OPTIONS"
def create_production_security_headers() -> SecurityHeaders:
"""
Create security headers configuration for production
Returns:
SecurityHeaders configured for production
"""
return SecurityHeaders(
X_Content_Type_Options="nosniff",
X_Frame_Options="DENY",
X_XSS_Protection="1; mode=block",
Strict_Transport_Security="max-age=31536000; includeSubDomains; preload",
Content_Security_Policy="default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval'; style-src 'self' 'unsafe-inline'",
Referrer_Policy="strict-origin-when-cross-origin",
Permissions_Policy="geolocation=(), microphone=(), camera=()",
Cache_Control="no-cache, no-store, must-revalidate",
Pragma="no-cache"
)
def create_development_security_headers() -> SecurityHeaders:
"""
Create security headers configuration for development
Returns:
SecurityHeaders configured for development
"""
return SecurityHeaders(
X_Content_Type_Options="nosniff",
X_Frame_Options="SAMEORIGIN",
X_XSS_Protection="1; mode=block",
Strict_Transport_Security="max-age=3600",
Content_Security_Policy="default-src 'self' 'unsafe-inline' 'unsafe-eval'",
Referrer_Policy="strict-origin-when-cross-origin",
Permissions_Policy="",
Cache_Control="no-cache",
Pragma="no-cache"
)
def create_strict_cors_config(allowed_origins: List[str]) -> CORSConfig:
"""
Create strict CORS configuration
Args:
allowed_origins: List of allowed origins
Returns:
CORSConfig with strict settings
"""
return CORSConfig(
allow_origins=allowed_origins,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
allow_headers=["Content-Type", "Authorization", "X-API-Key"],
allow_credentials=True,
expose_headers=["X-Request-ID"],
max_age=3600
)
def create_permissive_cors_config() -> CORSConfig:
"""
Create permissive CORS configuration (for development)
Returns:
CORSConfig with permissive settings
"""
return CORSConfig(
allow_origins=["*"],
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"],
allow_headers=["*"],
allow_credentials=False,
expose_headers=["*"],
max_age=86400
)