From ca0c0764468a4770548ca863c7418ab4159e2175 Mon Sep 17 00:00:00 2001 From: aitbc Date: Sat, 9 May 2026 12:34:53 +0200 Subject: [PATCH] refactor: add API versioning and security headers utilities - Create APIVersion enum and api_version decorator - Implement APIVersionRouter for version routing - Add deprecation warnings and sunset dates for deprecated APIs - Create SecurityHeaders dataclass for security headers - Implement SecurityHeadersMiddleware for applying headers - Create CORSMiddleware for CORS policy management - Add production and development security header presets - Implement strict and permissive CORS configurations --- aitbc/api_versioning.py | 136 ++++++++++++++++++++++ aitbc/security_headers.py | 234 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 370 insertions(+) create mode 100644 aitbc/api_versioning.py create mode 100644 aitbc/security_headers.py diff --git a/aitbc/api_versioning.py b/aitbc/api_versioning.py new file mode 100644 index 00000000..f172ba60 --- /dev/null +++ b/aitbc/api_versioning.py @@ -0,0 +1,136 @@ +""" +API versioning utilities for AITBC +Provides API versioning for backward compatibility +""" + +from typing import Optional, Dict, Any, Callable +from functools import wraps +from enum import Enum +from datetime import datetime + +from .aitbc_logging import get_logger + +logger = get_logger(__name__) + + +class APIVersion(Enum): + """API version enumeration""" + V1 = "v1" + V2 = "v2" + LATEST = "latest" + + +class DeprecatedAPIError(Exception): + """Exception raised when deprecated API is called""" + pass + + +def api_version( + version: APIVersion = APIVersion.V1, + deprecated: bool = False, + deprecation_date: Optional[datetime] = None, + sunset_date: Optional[datetime] = None +): + """ + Decorator to mark API endpoint with version information + + Args: + version: API version + deprecated: Whether the endpoint is deprecated + deprecation_date: Date when endpoint was deprecated + sunset_date: Date when endpoint will be removed + """ + def decorator(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args, **kwargs): + if deprecated: + warning_msg = f"API endpoint {func.__name__} is deprecated" + if sunset_date: + warning_msg += f" and will be removed on {sunset_date.isoformat()}" + logger.warning(warning_msg) + + # Add version info to response if applicable + result = func(*args, **kwargs) + + if isinstance(result, dict): + result["_meta"] = result.get("_meta", {}) + result["_meta"]["api_version"] = version.value + if deprecated: + result["_meta"]["deprecated"] = True + if deprecation_date: + result["_meta"]["deprecated_since"] = deprecation_date.isoformat() + if sunset_date: + result["_meta"]["sunset_date"] = sunset_date.isoformat() + + return result + + wrapper._api_version = version.value + wrapper._deprecated = deprecated + wrapper._deprecation_date = deprecation_date + wrapper._sunset_date = sunset_date + + return wrapper + + return decorator + + +class APIVersionRouter: + """ + API version router for handling multiple API versions. + Routes requests to appropriate version handlers. + """ + + def __init__(self): + """Initialize API version router""" + self._version_handlers: Dict[str, Callable] = {} + self._default_version = APIVersion.V1.value + + def register_handler(self, version: str, handler: Callable) -> None: + """ + Register a handler for a specific API version + + Args: + version: API version string + handler: Handler function + """ + self._version_handlers[version] = handler + logger.info(f"Registered handler for API version {version}") + + def set_default_version(self, version: str) -> None: + """ + Set default API version + + Args: + version: Default version string + """ + self._default_version = version + logger.info(f"Set default API version to {version}") + + def route(self, version: Optional[str] = None) -> Callable: + """ + Route request to appropriate version handler + + Args: + version: Requested version (uses default if None) + + Returns: + Handler function + + Raises: + ValueError: If version is not supported + """ + target_version = version or self._default_version + + if target_version not in self._version_handlers: + raise ValueError(f"Unsupported API version: {target_version}") + + return self._version_handlers[target_version] + + def get_supported_versions(self) -> list: + """ + Get list of supported API versions + + Returns: + List of supported version strings + """ + return list(self._version_handlers.keys()) diff --git a/aitbc/security_headers.py b/aitbc/security_headers.py new file mode 100644 index 00000000..1d26bbf0 --- /dev/null +++ b/aitbc/security_headers.py @@ -0,0 +1,234 @@ +""" +Security headers and CORS utilities for AITBC web services +Provides security headers and CORS policies configuration +""" + +from typing import Dict, List, Optional, Set +from dataclasses import dataclass + +from .aitbc_logging import get_logger + +logger = get_logger(__name__) + + +@dataclass +class SecurityHeaders: + """Security headers configuration""" + X_Content_Type_Options: str = "nosniff" + X_Frame_Options: str = "DENY" + X_XSS_Protection: str = "1; mode=block" + Strict_Transport_Security: str = "max-age=31536000; includeSubDomains" + Content_Security_Policy: str = "default-src 'self'" + Referrer_Policy: str = "strict-origin-when-cross-origin" + Permissions_Policy: str = "" + Cache_Control: str = "no-cache, no-store, must-revalidate" + Pragma: str = "no-cache" + + +@dataclass +class CORSConfig: + """CORS configuration""" + allow_origins: List[str] + allow_methods: List[str] + allow_headers: List[str] + allow_credentials: bool = False + expose_headers: List[str] = None + max_age: int = 3600 + + +class SecurityHeadersMiddleware: + """ + Security headers middleware for web services. + Adds security headers to HTTP responses. + """ + + def __init__(self, headers: Optional[SecurityHeaders] = None): + """ + Initialize security headers middleware + + Args: + headers: Security headers configuration + """ + self.headers = headers or SecurityHeaders() + + def get_headers(self) -> Dict[str, str]: + """ + Get security headers dictionary + + Returns: + Dictionary of security headers + """ + return { + "X-Content-Type-Options": self.headers.X_Content_Type_Options, + "X-Frame-Options": self.headers.X_Frame_Options, + "X-XSS-Protection": self.headers.X_XSS_Protection, + "Strict-Transport-Security": self.headers.Strict_Transport_Security, + "Content-Security-Policy": self.headers.Content_Security_Policy, + "Referrer-Policy": self.headers.Referrer_Policy, + "Permissions-Policy": self.headers.Permissions_Policy, + "Cache-Control": self.headers.Cache_Control, + "Pragma": self.headers.Pragma + } + + def apply_to_response(self, response_headers: Dict[str, str]) -> Dict[str, str]: + """ + Apply security headers to response + + Args: + response_headers: Existing response headers + + Returns: + Response headers with security headers added + """ + security_headers = self.get_headers() + response_headers.update(security_headers) + return response_headers + + +class CORSMiddleware: + """ + CORS middleware for web services. + Handles Cross-Origin Resource Sharing policies. + """ + + def __init__(self, config: CORSConfig): + """ + Initialize CORS middleware + + Args: + config: CORS configuration + """ + self.config = config + + def get_cors_headers(self, origin: str) -> Dict[str, str]: + """ + Get CORS headers for a request + + Args: + origin: Request origin + + Returns: + Dictionary of CORS headers + """ + headers = {} + + # Check if origin is allowed + if self._is_origin_allowed(origin): + headers["Access-Control-Allow-Origin"] = origin + + if self.config.allow_credentials: + headers["Access-Control-Allow-Credentials"] = "true" + + headers["Access-Control-Allow-Methods"] = ", ".join(self.config.allow_methods) + headers["Access-Control-Allow-Headers"] = ", ".join(self.config.allow_headers) + + if self.config.expose_headers: + headers["Access-Control-Expose-Headers"] = ", ".join(self.config.expose_headers) + + headers["Access-Control-Max-Age"] = str(self.config.max_age) + + return headers + + def _is_origin_allowed(self, origin: str) -> bool: + """ + Check if origin is allowed based on CORS policy + + Args: + origin: Request origin + + Returns: + True if origin is allowed, False otherwise + """ + if "*" in self.config.allow_origins: + return True + + return origin in self.config.allow_origins + + def is_preflight_request(self, method: str) -> bool: + """ + Check if request is a CORS preflight request + + Args: + method: HTTP method + + Returns: + True if preflight request, False otherwise + """ + return method.upper() == "OPTIONS" + + +def create_production_security_headers() -> SecurityHeaders: + """ + Create security headers configuration for production + + Returns: + SecurityHeaders configured for production + """ + return SecurityHeaders( + X_Content_Type_Options="nosniff", + X_Frame_Options="DENY", + X_XSS_Protection="1; mode=block", + Strict_Transport_Security="max-age=31536000; includeSubDomains; preload", + Content_Security_Policy="default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval'; style-src 'self' 'unsafe-inline'", + Referrer_Policy="strict-origin-when-cross-origin", + Permissions_Policy="geolocation=(), microphone=(), camera=()", + Cache_Control="no-cache, no-store, must-revalidate", + Pragma="no-cache" + ) + + +def create_development_security_headers() -> SecurityHeaders: + """ + Create security headers configuration for development + + Returns: + SecurityHeaders configured for development + """ + return SecurityHeaders( + X_Content_Type_Options="nosniff", + X_Frame_Options="SAMEORIGIN", + X_XSS_Protection="1; mode=block", + Strict_Transport_Security="max-age=3600", + Content_Security_Policy="default-src 'self' 'unsafe-inline' 'unsafe-eval'", + Referrer_Policy="strict-origin-when-cross-origin", + Permissions_Policy="", + Cache_Control="no-cache", + Pragma="no-cache" + ) + + +def create_strict_cors_config(allowed_origins: List[str]) -> CORSConfig: + """ + Create strict CORS configuration + + Args: + allowed_origins: List of allowed origins + + Returns: + CORSConfig with strict settings + """ + return CORSConfig( + allow_origins=allowed_origins, + allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"], + allow_headers=["Content-Type", "Authorization", "X-API-Key"], + allow_credentials=True, + expose_headers=["X-Request-ID"], + max_age=3600 + ) + + +def create_permissive_cors_config() -> CORSConfig: + """ + Create permissive CORS configuration (for development) + + Returns: + CORSConfig with permissive settings + """ + return CORSConfig( + allow_origins=["*"], + allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"], + allow_headers=["*"], + allow_credentials=False, + expose_headers=["*"], + max_age=86400 + )