Security & Compliance Platform
The Security & Compliance Platform provides a defense-in-depth security architecture across all system layers. It implements multi-method authentication (SSO, SAML, OAuth2, JWT, MFA), fine-grained RBAC, comprehensive audit logging, data encryption, secrets management, GDPR/CCPA/SOC 2 compliance automation, real-time threat detection, and security incident response. The platform is intended to keep AltSportsLeagues.ai at enterprise-grade security while meeting regulatory requirements for data protection and compliance.
Key Design Principles:
- Defense-in-Depth: Multiple overlapping security controls at every layer
- Zero Trust Architecture: Verify every access request regardless of network location
- Least Privilege: Grant minimum permissions required for each role
- Security by Default: Secure configurations out of the box, requiring no user action
- Compliance Automation: Automated compliance controls and evidence collection
Executive Summary
The Security & Compliance Platform addresses the critical need for robust security in a multi-tenant, AI-powered platform handling sensitive sports league and partnership data. By implementing comprehensive authentication, authorization, encryption, and compliance controls, the platform ensures data protection, regulatory compliance, and operational integrity.
Core Components:
- Authentication Service: Multi-method auth with MFA and SSO support
- RBAC Engine: Fine-grained role-based access control
- Audit Logger: Immutable forensic logging for compliance
- Encryption Layer: Data protection at rest and in transit
- Secrets Manager: Secure credential and key management
- Threat Detector: Real-time anomaly detection and monitoring
- Compliance Engine: Automated GDPR, CCPA, SOC 2 compliance
Architecture Overview
System Context
Container Diagram
Detailed Requirements
REQ-SC-001: Multi-Method Authentication System
User Story: As a security administrator, I want support for multiple authentication methods (SSO, SAML, OAuth2, JWT, API keys), so that users can authenticate using their organization's preferred method.
Acceptance Criteria
- WHEN users authenticate with SSO, THE Authentication_System SHALL support SAML 2.0 and OpenID Connect protocols
- WHEN OAuth2 is used, THE Authentication_System SHALL implement authorization code flow with PKCE
- WHEN JWT tokens are issued, THE Authentication_System SHALL include claims (user_id, roles, permissions, expiry)
- WHEN API keys are used, THE Authentication_System SHALL validate keys using constant-time comparison
- WHEN authentication fails, THE Authentication_System SHALL log failed attempts and implement lockout after 5 failures
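The PKCE criterion above can be sketched with the standard library alone. This is a minimal illustration of the S256 method from RFC 7636, not the platform's implementation; the function names `make_code_verifier`, `make_code_challenge`, and `verify_pkce` are hypothetical.

```python
import base64
import hashlib
import secrets


def make_code_verifier(num_bytes: int = 32) -> str:
    """Return a random URL-safe verifier (43 characters for 32 random bytes)."""
    raw = secrets.token_bytes(num_bytes)
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def make_code_challenge(code_verifier: str) -> str:
    """Derive the S256 challenge: BASE64URL(SHA256(verifier)) without padding."""
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


def verify_pkce(code_verifier: str, expected_challenge: str) -> bool:
    """Check the verifier sent at token exchange against the stored challenge."""
    # Constant-time comparison, in line with the API key criterion above.
    return secrets.compare_digest(make_code_challenge(code_verifier), expected_challenge)
```

The client sends the challenge with the authorization request and the verifier with the token request; the server stores the challenge with the authorization code and runs `verify_pkce` before issuing tokens.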
Implementation Example
# security/authentication/auth_service.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, EmailStr, Field
from typing import Optional, List
import jwt
import bcrypt
import pyotp
from datetime import datetime, timedelta
from enum import Enum
class AuthMethod(str, Enum):
"""Supported authentication methods"""
PASSWORD = "password"
OAUTH2 = "oauth2"
SAML = "saml"
API_KEY = "api_key"
SSO = "sso"
class UserCredentials(BaseModel):
"""User authentication credentials"""
email: EmailStr
password: str = Field(min_length=12)
mfa_code: Optional[str] = Field(None, min_length=6, max_length=6)
auth_method: AuthMethod = AuthMethod.PASSWORD
class TokenPair(BaseModel):
"""JWT token pair"""
access_token: str
refresh_token: str
token_type: str = "bearer"
expires_in: int
class AuthService:
"""Multi-method authentication service"""
def __init__(
self,
secret_key: str,
access_token_expire_minutes: int = 30,
refresh_token_expire_days: int = 30
):
self.secret_key = secret_key
self.access_token_expire_minutes = access_token_expire_minutes
self.refresh_token_expire_days = refresh_token_expire_days
self.failed_login_attempts = {} # Track failed attempts
self.lockout_threshold = 5
async def authenticate_password(
self,
email: str,
password: str,
mfa_code: Optional[str] = None
) -> TokenPair:
"""Authenticate user with password + optional MFA"""
# Check lockout status
if self._is_locked_out(email):
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Account locked due to too many failed attempts"
)
# Retrieve user from database
user = await self._get_user_by_email(email)
if not user:
self._record_failed_login(email)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials"
)
# Verify password using constant-time comparison
if not self._verify_password(password, user.hashed_password):
self._record_failed_login(email)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials"
)
# Check MFA if enabled
if user.mfa_enabled:
if not mfa_code:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="MFA code required"
)
if not self._verify_mfa(user.mfa_secret, mfa_code):
self._record_failed_login(email)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid MFA code"
)
# Clear failed login attempts
self._clear_failed_logins(email)
# Generate token pair
return await self._generate_token_pair(user)
async def authenticate_oauth2(
self,
provider: str,
authorization_code: str,
redirect_uri: str
) -> TokenPair:
"""Authenticate using OAuth2 authorization code flow"""
# Validate provider
if provider not in ["google", "github", "microsoft"]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Unsupported OAuth2 provider"
)
# Exchange authorization code for access token
provider_config = self._get_oauth2_config(provider)
token_response = await self._exchange_authorization_code(
provider_config,
authorization_code,
redirect_uri
)
# Get user info from provider
user_info = await self._get_oauth2_user_info(
provider,
token_response["access_token"]
)
# Find or create user
user = await self._find_or_create_oauth_user(provider, user_info)
# Generate token pair
return await self._generate_token_pair(user)
async def authenticate_saml(self, saml_response: str) -> TokenPair:
"""Authenticate using SAML 2.0"""
from onelogin.saml2.auth import OneLogin_Saml2_Auth
from onelogin.saml2.settings import OneLogin_Saml2_Settings
# Parse and validate SAML response
saml_settings = self._get_saml_settings()
auth = OneLogin_Saml2_Auth(saml_response, saml_settings)
auth.process_response()
if not auth.is_authenticated():
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="SAML authentication failed"
)
# Extract user attributes
attributes = auth.get_attributes()
email = attributes.get('email', [None])[0]
if not email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email not provided in SAML response"
)
# Find or create user
user = await self._find_or_create_saml_user(email, attributes)
# Generate token pair
return await self._generate_token_pair(user)
async def authenticate_api_key(self, api_key: str) -> dict:
"""Authenticate using API key"""
# Validate API key format
if not api_key.startswith("als_"):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API key format"
)
# Look up API key using constant-time comparison
key_record = await self._get_api_key(api_key)
if not key_record or not key_record.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or inactive API key"
)
# Check expiration
if key_record.expires_at and key_record.expires_at < datetime.utcnow():
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="API key expired"
)
# Update last used timestamp
await self._update_api_key_last_used(api_key)
return {
"user_id": key_record.user_id,
"scopes": key_record.scopes,
"rate_limit": key_record.rate_limit
}
async def setup_mfa(self, user_id: str) -> dict:
"""Setup MFA for user"""
# Generate MFA secret
secret = pyotp.random_base32()
# Generate QR code URI
totp = pyotp.TOTP(secret)
user = await self._get_user_by_id(user_id)
qr_uri = totp.provisioning_uri(
name=user.email,
issuer_name="AltSportsLeagues.ai"
)
# Generate backup codes
backup_codes = [self._generate_backup_code() for _ in range(10)]
# Store MFA secret (not activated until verified)
await self._store_mfa_setup(user_id, secret, backup_codes)
return {
"secret": secret,
"qr_uri": qr_uri,
"backup_codes": backup_codes
}
async def verify_mfa_setup(
self,
user_id: str,
verification_code: str
) -> bool:
"""Verify and activate MFA setup"""
setup = await self._get_mfa_setup(user_id)
if not setup:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No MFA setup found"
)
# Verify code
totp = pyotp.TOTP(setup.secret)
if not totp.verify(verification_code):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid verification code"
)
# Activate MFA
await self._activate_mfa(user_id, setup.secret, setup.backup_codes)
return True
def _verify_password(self, password: str, hashed: str) -> bool:
"""Verify password using bcrypt"""
return bcrypt.checkpw(
password.encode('utf-8'),
hashed.encode('utf-8')
)
def _verify_mfa(self, secret: str, code: str) -> bool:
"""Verify TOTP code"""
totp = pyotp.TOTP(secret)
return totp.verify(code, valid_window=1)
async def _generate_token_pair(self, user) -> TokenPair:
"""Generate JWT access and refresh tokens"""
# Access token claims
access_payload = {
"sub": user.user_id,
"email": user.email,
"roles": user.roles,
"permissions": await self._get_user_permissions(user.user_id),
"exp": datetime.utcnow() + timedelta(minutes=self.access_token_expire_minutes),
"iat": datetime.utcnow(),
"type": "access"
}
# Refresh token claims
refresh_payload = {
"sub": user.user_id,
"exp": datetime.utcnow() + timedelta(days=self.refresh_token_expire_days),
"iat": datetime.utcnow(),
"type": "refresh"
}
# Generate tokens
access_token = jwt.encode(access_payload, self.secret_key, algorithm="HS256")
refresh_token = jwt.encode(refresh_payload, self.secret_key, algorithm="HS256")
# Store refresh token
await self._store_refresh_token(user.user_id, refresh_token)
return TokenPair(
access_token=access_token,
refresh_token=refresh_token,
expires_in=self.access_token_expire_minutes * 60
)
def _is_locked_out(self, email: str) -> bool:
"""Check if account is locked out"""
if email not in self.failed_login_attempts:
return False
attempts = self.failed_login_attempts[email]
return attempts['count'] >= self.lockout_threshold
def _record_failed_login(self, email: str):
"""Record failed login attempt"""
if email not in self.failed_login_attempts:
self.failed_login_attempts[email] = {
'count': 0,
'first_attempt': datetime.utcnow()
}
self.failed_login_attempts[email]['count'] += 1
def _clear_failed_logins(self, email: str):
"""Clear failed login attempts"""
if email in self.failed_login_attempts:
del self.failed_login_attempts[email]
def _generate_backup_code(self) -> str:
"""Generate backup recovery code"""
import secrets
return f"{secrets.randbelow(10**4):04d}-{secrets.randbelow(10**4):04d}"
# FastAPI integration
security = HTTPBearer()
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    auth_service: AuthService = Depends()
):
    """Dependency to get current authenticated user"""
    token = credentials.credentials
    try:
        # jwt.decode verifies the signature and the "exp" claim. An expired token
        # raises jwt.ExpiredSignatureError (a subclass of InvalidTokenError), so a
        # manual comparison of "exp" against the clock is not needed.
        payload = jwt.decode(token, auth_service.secret_key, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token expired"
        )
    except jwt.InvalidTokenError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid token: {str(e)}"
        )
    # Verify token type so a refresh token cannot be used as an access token
    if payload.get("type") != "access":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token type"
        )
    return payload
REQ-SC-002: Multi-Factor Authentication (MFA)
User Story: As a compliance officer, I want mandatory MFA for privileged accounts and optional MFA for regular users, so that account compromise risk is minimized.
Acceptance Criteria
- WHEN MFA is enabled, THE Authentication_System SHALL support TOTP (Time-based One-Time Password) via authenticator apps
- WHEN SMS backup is requested, THE Authentication_System SHALL send verification codes via Twilio or equivalent
- WHEN admin accounts authenticate, THE Authentication_System SHALL require MFA before granting access
- WHEN MFA setup occurs, THE Authentication_System SHALL display QR codes and backup recovery codes
- WHEN recovery codes are used, THE Authentication_System SHALL invalidate used codes and log the recovery event
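To make the TOTP criterion concrete, the following sketch computes and verifies codes with the standard library, following RFC 4226 (HOTP) and RFC 6238 (TOTP). It is an illustration only: it takes the raw secret bytes, whereas authenticator apps and pyotp exchange the secret in base32, and the function names are hypothetical.

```python
import hashlib
import hmac
import struct
import time
from typing import Optional


def hotp(secret: bytes, counter: int, digits: int = 6) -> str:
    """HMAC-SHA1 one-time password for a given counter (RFC 4226)."""
    mac = hmac.new(secret, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = mac[-1] & 0x0F  # dynamic truncation offset
    value = struct.unpack(">I", mac[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(value % 10 ** digits).zfill(digits)


def totp(secret: bytes, at: Optional[float] = None, step: int = 30, digits: int = 6) -> str:
    """Time-based code: HOTP over the number of elapsed time steps."""
    now = time.time() if at is None else at
    return hotp(secret, int(now // step), digits)


def verify_totp(secret: bytes, code: str, at: Optional[float] = None,
                step: int = 30, window: int = 1) -> bool:
    """Accept codes from the current step and `window` steps on either side."""
    now = time.time() if at is None else at
    counter = int(now // step)
    for delta in range(-window, window + 1):
        if counter + delta < 0:
            continue
        expected = hotp(secret, counter + delta)
        if hmac.compare_digest(expected.encode(), code.encode()):
            return True
    return False
```

The `window` parameter plays the same role as `valid_window=1` in the pyotp calls used elsewhere in this document: it tolerates small clock drift between the server and the authenticator app.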
Implementation Example
# security/authentication/mfa_service.py
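import pyotp
from datetime import datetime, timedelta
from typing import Optional, List
from pydantic import BaseModel
class MFASetupResponse(BaseModel):
    """Data shown once to the user when MFA setup starts"""
    qr_uri: str
    secret: str
    backup_codes: List[str]
    instructions: str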
class MFAService:
"""Multi-Factor Authentication service"""
def __init__(self, backup_code_length: int = 10):
self.backup_code_length = backup_code_length
self.used_backup_codes = set() # In production, store in database
async def setup_mfa(self, user_id: str, email: str) -> MFASetupResponse:
"""Initialize MFA setup for user"""
# Generate TOTP secret
secret = pyotp.random_base32()
# Generate QR code URI
totp = pyotp.TOTP(secret)
qr_uri = totp.provisioning_uri(
name=email,
issuer_name="AltSportsLeagues.ai"
)
# Generate backup codes
backup_codes = []
for _ in range(self.backup_code_length):
code = self._generate_backup_code()
backup_codes.append({
"code": code,
"used": False,
"created_at": datetime.utcnow()
})
# Store secret and backup codes (encrypted)
await self._store_mfa_secret(user_id, secret)
await self._store_backup_codes(user_id, backup_codes)
return MFASetupResponse(
qr_uri=qr_uri,
secret=secret, # Show once during setup
backup_codes=[bc["code"] for bc in backup_codes],
instructions="Scan QR code with authenticator app or use backup codes"
)
async def verify_totp(self, user_id: str, code: str) -> bool:
"""Verify TOTP code"""
# Retrieve secret
secret = await self._get_mfa_secret(user_id)
if not secret:
raise ValueError("MFA not set up")
totp = pyotp.TOTP(secret)
is_valid = totp.verify(code, valid_window=1)
if is_valid:
# Log successful verification
await self._log_mfa_verification(user_id, "totp", success=True)
return is_valid
    async def verify_backup_code(self, user_id: str, code: str) -> bool:
        """Verify backup code and mark as used"""
        import hmac
        backup_codes = await self._get_backup_codes(user_id)
        for backup in backup_codes:
            # Compare in constant time so response timing does not reveal
            # how much of a guessed code matched
            matches = hmac.compare_digest(backup["code"].encode(), code.encode())
            if matches and not backup["used"]:
                # Mark as used so the code cannot be replayed
                backup["used"] = True
                backup["used_at"] = datetime.utcnow()
                await self._update_backup_code(user_id, backup)
                # Log usage
                await self._log_mfa_verification(user_id, "backup_code", success=True)
                return True
        # Log failed verification
        await self._log_mfa_verification(user_id, "backup_code", success=False)
        return False
def _generate_backup_code(self) -> str:
"""Generate secure backup code"""
import secrets
return f"{secrets.randbelow(10**4):04d}-{secrets.randbelow(10**4):04d}"
async def generate_new_backup_codes(self, user_id: str) -> List[str]:
"""Generate new backup codes"""
# Invalidate existing codes
await self._invalidate_backup_codes(user_id)
# Generate new codes
backup_codes = []
for _ in range(self.backup_code_length):
code = self._generate_backup_code()
backup_codes.append({
"code": code,
"used": False,
"created_at": datetime.utcnow()
})
await self._store_backup_codes(user_id, backup_codes)
return [bc["code"] for bc in backup_codes]
REQ-SC-003: Fine-Grained RBAC (Role-Based Access Control)
User Story: As a system architect, I want fine-grained RBAC with roles, permissions, and resource-level access control, so that users only access data they're authorized to see.
Acceptance Criteria
- WHEN roles are defined, THE RBAC_Engine SHALL support hierarchical roles (admin > manager > user > guest)
- WHEN permissions are checked, THE RBAC_Engine SHALL evaluate resource-level permissions (can_edit_league:league_123)
- WHEN access is denied, THE RBAC_Engine SHALL log authorization failures with user context
- WHEN roles change, THE RBAC_Engine SHALL invalidate cached permissions immediately
- WHEN permission inheritance is used, THE RBAC_Engine SHALL resolve inherited permissions from parent roles
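The inheritance and resource-level criteria above can be sketched as follows. This is a simplified illustration under stated assumptions: the `ROLES` table, the permission strings, and the `permission:resource_id` grant format are hypothetical stand-ins, modeled on the `can_edit_league:league_123` example in the criteria.

```python
from typing import Dict, Optional, Set, Tuple

# Hypothetical role table: role -> (parent role, directly granted permissions).
ROLES: Dict[str, Tuple[Optional[str], Set[str]]] = {
    "guest": (None, {"league:read"}),
    "user": ("guest", {"league:write"}),
    "manager": ("user", {"league:delete"}),
    "admin": ("manager", {"system:admin"}),
}


def resolve_permissions(role: str, roles: Dict[str, Tuple[Optional[str], Set[str]]] = ROLES) -> Set[str]:
    """Collect a role's permissions plus everything inherited from its ancestors."""
    permissions: Set[str] = set()
    seen: Set[str] = set()
    current: Optional[str] = role
    # The `seen` set stops the walk if the role table accidentally contains a cycle.
    while current is not None and current not in seen:
        seen.add(current)
        parent, granted = roles[current]
        permissions |= granted
        current = parent
    return permissions


def is_allowed(role: str, user_grants: Set[str], permission: str,
               resource_id: Optional[str] = None) -> bool:
    """Allow if the role grants the permission globally or the user holds a resource-scoped grant."""
    if permission in resolve_permissions(role):
        return True
    return resource_id is not None and f"{permission}:{resource_id}" in user_grants
```

Keeping resource-scoped grants per user, rather than per resource, means a grant issued to one user can never authorize a different user on the same resource.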
Implementation Example
# security/rbac/rbac_engine.py
from typing import List, Dict, Set, Optional
from pydantic import BaseModel
from enum import Enum
from fastapi import HTTPException, status  # used by require_permission
class Permission(str, Enum):
"""System permissions"""
# League permissions
LEAGUE_READ = "league:read"
LEAGUE_WRITE = "league:write"
LEAGUE_DELETE = "league:delete"
LEAGUE_ADMIN = "league:admin"
# User permissions
USER_READ = "user:read"
USER_WRITE = "user:write"
USER_DELETE = "user:delete"
USER_ADMIN = "user:admin"
# System permissions
SYSTEM_ADMIN = "system:admin"
AUDIT_READ = "audit:read"
SETTINGS_WRITE = "settings:write"
class Role(BaseModel):
"""Role definition"""
name: str
permissions: Set[Permission]
parent_role: Optional[str] = None
description: str
class ResourcePermission(BaseModel):
"""Resource-level permission"""
resource_type: str # e.g., "league", "user"
resource_id: str
permission: Permission
class RBACEngine:
"""Fine-grained Role-Based Access Control"""
def __init__(self):
self.roles: Dict[str, Role] = {}
self.user_roles: Dict[str, Set[str]] = {}
self.resource_permissions: Dict[str, List[ResourcePermission]] = {}
# Initialize default roles
self._init_default_roles()
def _init_default_roles(self):
"""Initialize default system roles"""
# Guest role
self.register_role(Role(
name="guest",
permissions={Permission.LEAGUE_READ},
description="Guest user with read-only access"
))
# User role
self.register_role(Role(
name="user",
permissions={
Permission.LEAGUE_READ,
Permission.LEAGUE_WRITE,
Permission.USER_READ
},
parent_role="guest",
description="Regular user"
))
# Manager role
self.register_role(Role(
name="manager",
permissions={
Permission.LEAGUE_READ,
Permission.LEAGUE_WRITE,
Permission.LEAGUE_DELETE,
Permission.USER_READ,
Permission.USER_WRITE,
Permission.AUDIT_READ
},
parent_role="user",
description="Manager with elevated privileges"
))
# Admin role
self.register_role(Role(
name="admin",
permissions={
Permission.LEAGUE_ADMIN,
Permission.USER_ADMIN,
Permission.SYSTEM_ADMIN,
Permission.AUDIT_READ,
Permission.SETTINGS_WRITE
},
parent_role="manager",
description="Full system administrator"
))
def register_role(self, role: Role):
"""Register a new role"""
self.roles[role.name] = role
def assign_role(self, user_id: str, role_name: str):
"""Assign role to user"""
if role_name not in self.roles:
raise ValueError(f"Role {role_name} not found")
if user_id not in self.user_roles:
self.user_roles[user_id] = set()
self.user_roles[user_id].add(role_name)
def revoke_role(self, user_id: str, role_name: str):
"""Revoke role from user"""
if user_id in self.user_roles:
self.user_roles[user_id].discard(role_name)
def get_user_permissions(self, user_id: str) -> Set[Permission]:
"""Get all permissions for user including inherited"""
permissions = set()
if user_id not in self.user_roles:
return permissions
for role_name in self.user_roles[user_id]:
role = self.roles.get(role_name)
if role:
# Add role permissions
permissions.update(role.permissions)
# Add inherited permissions from parent roles
permissions.update(self._get_inherited_permissions(role))
return permissions
def _get_inherited_permissions(self, role: Role) -> Set[Permission]:
"""Recursively get inherited permissions"""
permissions = set()
if role.parent_role:
parent = self.roles.get(role.parent_role)
if parent:
permissions.update(parent.permissions)
permissions.update(self._get_inherited_permissions(parent))
return permissions
    def has_permission(
        self,
        user_id: str,
        permission: Permission,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None
    ) -> bool:
        """Check if user has permission"""
        # Check global permissions
        user_permissions = self.get_user_permissions(user_id)
        if permission in user_permissions:
            return True
        # Check resource-level permissions granted to this specific user
        if resource_type and resource_id:
            resource_key = f"{user_id}:{resource_type}:{resource_id}"
            for rp in self.resource_permissions.get(resource_key, []):
                if rp.permission == permission:
                    return True
        return False
    def grant_resource_permission(
        self,
        user_id: str,
        resource_type: str,
        resource_id: str,
        permission: Permission
    ):
        """Grant resource-level permission to a single user"""
        # Key grants by user as well as resource; keying by resource alone would
        # let a grant made to one user authorize every other user on that resource
        resource_key = f"{user_id}:{resource_type}:{resource_id}"
        self.resource_permissions.setdefault(resource_key, []).append(
            ResourcePermission(
                resource_type=resource_type,
                resource_id=resource_id,
                permission=permission
            )
        )
def require_permission(self, permission: Permission):
"""Decorator to require permission"""
def decorator(func):
async def wrapper(*args, current_user: dict = None, **kwargs):
if not current_user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required"
)
user_id = current_user.get("sub")
if not self.has_permission(user_id, permission):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Permission {permission} required"
)
return await func(*args, current_user=current_user, **kwargs)
return wrapper
return decorator
REQ-SC-004: Session Management & Security
User Story: As a security engineer, I want secure session management with timeout, refresh, and revocation capabilities, so that inactive sessions are terminated and compromised sessions can be revoked.
Acceptance Criteria
- WHEN sessions are created, THE Session_Manager SHALL set secure, httpOnly, sameSite cookies
- WHEN sessions are idle for 30 minutes, THE Session_Manager SHALL terminate the session
- WHEN refresh tokens are used, THE Session_Manager SHALL rotate refresh tokens on each use
- WHEN session revocation is requested, THE Session_Manager SHALL immediately invalidate all user sessions
- WHEN suspicious activity is detected, THE Session_Manager SHALL force re-authentication
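The refresh token rotation criterion can be sketched with an in-memory store. This is an illustration under assumptions, not the platform's design: `RefreshTokenStore` is a hypothetical name, a production system would back it with Redis or a database, and revoking everything on token reuse is one common policy rather than a stated requirement.

```python
import secrets
from typing import Dict, Optional


class RefreshTokenStore:
    """In-memory refresh token rotation with reuse detection."""

    def __init__(self) -> None:
        self._active: Dict[str, str] = {}   # current token -> user_id
        self._retired: Dict[str, str] = {}  # already-used token -> user_id

    def issue(self, user_id: str) -> str:
        """Create a fresh opaque refresh token for the user."""
        token = secrets.token_urlsafe(32)
        self._active[token] = user_id
        return token

    def rotate(self, token: str) -> Optional[str]:
        """Exchange a valid token for a new one; each token works exactly once."""
        user_id = self._active.pop(token, None)
        if user_id is not None:
            self._retired[token] = user_id
            return self.issue(user_id)
        # A retired token being presented again suggests it was stolen,
        # so every active token for that user is revoked.
        owner = self._retired.get(token)
        if owner is not None:
            self.revoke_all(owner)
        return None

    def revoke_all(self, user_id: str) -> int:
        """Invalidate all active tokens for the user and return how many were removed."""
        doomed = [t for t, u in self._active.items() if u == user_id]
        for t in doomed:
            del self._active[t]
        return len(doomed)
```

A caller treats `None` from `rotate` as a signal to force re-authentication, which also covers the revocation criterion above.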
Implementation Example
# security/session/session_manager.py
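from datetime import datetime, timedelta
from typing import Dict, List, Optional
import json
import uuid
import jwt
from fastapi import HTTPException, status
from pydantic import BaseModel
from redis.asyncio import Redis
# Assumes the package layout implied by the file path comments in this document
from security.authentication.auth_service import TokenPair
class Session(BaseModel):
    """Result of creating a session"""
    session_id: str
    cookie_value: str
    expires_at: datetime
class SessionManager:
    """Secure session management"""
    def __init__(self, redis_client: Redis, secret_key: str, session_timeout_minutes: int = 30):
        self.redis_client = redis_client
        self.secret_key = secret_key  # signs the session cookie in create_session
        self.session_timeout_minutes = session_timeout_minutes
        self.session_prefix = "session:"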
async def create_session(self, user_id: str, user_roles: List[str]) -> Session:
"""Create new session"""
session_id = str(uuid.uuid4())
expires_at = datetime.utcnow() + timedelta(minutes=self.session_timeout_minutes)
session_data = {
"user_id": user_id,
"roles": user_roles,
"created_at": datetime.utcnow().isoformat(),
"expires_at": expires_at.isoformat(),
"last_activity": datetime.utcnow().isoformat()
}
# Store session in Redis
await self.redis_client.setex(
f"{self.session_prefix}{session_id}",
self.session_timeout_minutes * 60, # Convert to seconds
json.dumps(session_data)
)
# Set secure cookie
cookie_value = f"{session_id}.{jwt.encode({'session_id': session_id}, self.secret_key, algorithm='HS256')}"
return Session(
session_id=session_id,
cookie_value=cookie_value,
expires_at=expires_at
)
    async def validate_session(self, session_cookie: str) -> Optional[dict]:
        """Validate session and update activity"""
        try:
            # Cookie format is "<session_id>.<signed JWT>"; check the signature
            # before trusting the session ID
            session_id, _, signed = session_cookie.partition('.')
            claims = jwt.decode(signed, self.secret_key, algorithms=["HS256"])
            if claims.get("session_id") != session_id:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Invalid session"
                )
            key = f"{self.session_prefix}{session_id}"
            session_data = await self.redis_client.get(key)
            if not session_data:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Session expired or invalid"
                )
            session = json.loads(session_data)
            now = datetime.utcnow()
            # Check expiration
            if datetime.fromisoformat(session["expires_at"]) < now:
                await self.redis_client.delete(key)
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Session expired"
                )
            # Slide the idle window: extend expires_at together with the Redis TTL,
            # otherwise the session would end 30 minutes after creation even if active
            expires_at = now + timedelta(minutes=self.session_timeout_minutes)
            session["last_activity"] = now.isoformat()
            session["expires_at"] = expires_at.isoformat()
            await self.redis_client.setex(
                key,
                self.session_timeout_minutes * 60,
                json.dumps(session)
            )
            return {
                "session_id": session_id,
                "user_id": session["user_id"],
                "roles": session["roles"],
                "expires_at": expires_at
            }
        except HTTPException:
            # Keep the specific 401 detail raised above
            raise
        except Exception:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid session"
            )
async def refresh_session(self, session_id: str) -> TokenPair:
"""Refresh session with new tokens"""
session_data = await self.redis_client.get(f"{self.session_prefix}{session_id}")
if not session_data:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Session not found"
)
session = json.loads(session_data)
user_id = session["user_id"]
# Generate new tokens
tokens = await self._generate_token_pair(user_id)
# Update session with new refresh token
session["refresh_token"] = tokens.refresh_token
session["last_activity"] = datetime.utcnow().isoformat()
await self.redis_client.setex(
f"{self.session_prefix}{session_id}",
self.session_timeout_minutes * 60,
json.dumps(session)
)
return tokens
async def revoke_session(self, session_id: str) -> bool:
"""Revoke specific session"""
deleted = await self.redis_client.delete(f"{self.session_prefix}{session_id}")
return deleted > 0
    async def revoke_all_sessions(self, user_id: str) -> int:
        """Revoke all sessions for user"""
        # SCAN walks the keyspace incrementally; KEYS would block Redis while it
        # scans every key in one call
        pattern = f"{self.session_prefix}*"
        count = 0
        async for key in self.redis_client.scan_iter(match=pattern):
            session_data = await self.redis_client.get(key)
            if session_data:
                session = json.loads(session_data)
                if session["user_id"] == user_id:
                    await self.redis_client.delete(key)
                    count += 1
        return count
async def detect_suspicious_activity(self, user_id: str, activity: Dict) -> bool:
"""Detect suspicious activity and force re-authentication"""
# Example: Multiple failed logins
failed_attempts = await self._get_failed_login_attempts(user_id)
if failed_attempts >= 3:
# Revoke all sessions
await self.revoke_all_sessions(user_id)
# Log security event
await self._log_security_event(user_id, "suspicious_activity", activity)
return True
# Example: Unusual login location
current_ip = activity.get("ip_address")
previous_ips = await self._get_recent_ips(user_id)
if current_ip and current_ip not in previous_ips:
# Log and monitor
await self._log_security_event(user_id, "new_location", activity)
# Force re-authentication after monitoring
return False
REQ-SC-005: Comprehensive Audit Logging
User Story: As a compliance auditor, I want detailed audit logs of all security-relevant events, so that I can perform forensic analysis and demonstrate compliance.
Acceptance Criteria
- WHEN security events occur, THE Audit_Logger SHALL log timestamp, user_id, action, resource, IP, user_agent
- WHEN sensitive data is accessed, THE Audit_Logger SHALL log data access with query parameters
- WHEN logs are stored, THE Audit_Logger SHALL write to immutable append-only storage (GCS, S3)
- WHEN logs are queried, THE Audit_Logger SHALL support filtering by user, action, resource, time range
- WHEN compliance reports are needed, THE Audit_Logger SHALL export logs in JSON, CSV, and SIEM formats
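The filtering and export criteria can be illustrated with plain dictionaries. This sketch assumes each event is a dict with the six fields named in the first criterion and an ISO 8601 `timestamp` string; the function names are hypothetical, and SIEM-specific formats are not covered.

```python
import csv
import io
import json
from datetime import datetime
from typing import Dict, Iterable, List, Optional

FIELDS = ["timestamp", "user_id", "action", "resource", "ip", "user_agent"]


def filter_events(events: Iterable[Dict[str, str]], start: datetime, end: datetime,
                  user_id: Optional[str] = None, action: Optional[str] = None,
                  resource: Optional[str] = None) -> List[Dict[str, str]]:
    """Return events with start <= timestamp < end that match every given filter."""
    selected = []
    for event in events:
        ts = datetime.fromisoformat(event["timestamp"])
        if not (start <= ts < end):
            continue
        if user_id is not None and event.get("user_id") != user_id:
            continue
        if action is not None and event.get("action") != action:
            continue
        if resource is not None and event.get("resource") != resource:
            continue
        selected.append(event)
    return selected


def to_json_lines(events: Iterable[Dict[str, str]]) -> str:
    """Serialize one JSON object per line."""
    return "\n".join(json.dumps(event, sort_keys=True) for event in events)


def to_csv(events: Iterable[Dict[str, str]]) -> str:
    """Serialize to CSV with a fixed header row; extra keys are ignored."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=FIELDS, extrasaction="ignore",
                            lineterminator="\n")
    writer.writeheader()
    writer.writerows(events)
    return buffer.getvalue()
```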
Implementation Example
# security/audit/audit_logger.py
from typing import Optional, Dict, Any, List
from pydantic import BaseModel
from datetime import datetime
from enum import Enum
import json
class AuditEventType(str, Enum):
"""Audit event types"""
AUTH_LOGIN = "auth.login"
AUTH_LOGOUT = "auth.logout"
AUTH_FAILED = "auth.failed"
AUTH_MFA_ENABLED = "auth.mfa_enabled"
ACCESS_GRANTED = "access.granted"
ACCESS_DENIED = "access.denied"
DATA_READ = "data.read"
DATA_WRITE = "data.write"
DATA_DELETE = "data.delete"
USER_CREATED = "user.created"
USER_UPDATED = "user.updated"
USER_DELETED = "user.deleted"
PERMISSION_GRANTED = "permission.granted"
PERMISSION_REVOKED = "permission.revoked"
SECURITY_THREAT = "security.threat"
SECURITY_INCIDENT = "security.incident"
class AuditEvent(BaseModel):
"""Audit log entry"""
event_id: str
timestamp: datetime
event_type: AuditEventType
user_id: Optional[str]
user_email: Optional[str]
ip_address: str
user_agent: str
resource_type: Optional[str]
resource_id: Optional[str]
action: str
result: str # "success" or "failure"
details: Dict[str, Any]
session_id: Optional[str]
request_id: Optional[str]
class AuditLogger:
"""Comprehensive audit logging system"""
def __init__(self, storage_backend: str = "postgresql"):
self.storage_backend = storage_backend
async def log_event(
self,
event_type: AuditEventType,
action: str,
result: str,
user_id: Optional[str] = None,
user_email: Optional[str] = None,
ip_address: str = "unknown",
user_agent: str = "unknown",
resource_type: Optional[str] = None,
resource_id: Optional[str] = None,
details: Optional[Dict[str, Any]] = None,
session_id: Optional[str] = None,
request_id: Optional[str] = None
):
"""Log audit event"""
import uuid
event = AuditEvent(
event_id=str(uuid.uuid4()),
timestamp=datetime.utcnow(),
event_type=event_type,
user_id=user_id,
user_email=user_email,
ip_address=ip_address,
user_agent=user_agent,
resource_type=resource_type,
resource_id=resource_id,
action=action,
result=result,
details=details or {},
session_id=session_id,
request_id=request_id
)
# Store event
await self._store_event(event)
# Send to SIEM if configured
await self._send_to_siem(event)
async def _store_event(self, event: AuditEvent):
"""Store event in immutable storage"""
if self.storage_backend == "postgresql":
# Store in PostgreSQL with append-only table
pass
elif self.storage_backend == "gcs":
# Store in Google Cloud Storage
pass
async def _send_to_siem(self, event: AuditEvent):
"""Send event to SIEM system"""
# Implement SIEM integration (Splunk, Datadog, etc.)
pass
async def query_events(
self,
start_time: datetime,
end_time: datetime,
event_types: Optional[List[AuditEventType]] = None,
user_id: Optional[str] = None,
resource_type: Optional[str] = None
) -> List[AuditEvent]:
"""Query audit events"""
# Implement query logic
pass
REQ-SC-006: Data Encryption (At Rest & In Transit)
User Story: As a security architect, I want all data encrypted at rest and in transit using industry-standard algorithms, so that data breaches don't expose sensitive information.
Acceptance Criteria
- WHEN data is stored, THE Encryption_Layer SHALL encrypt using AES-256-GCM with unique keys per dataset
- WHEN data is transmitted, THE Encryption_Layer SHALL enforce TLS 1.3 with perfect forward secrecy
- WHEN encryption keys are managed, THE Encryption_Layer SHALL use GCP KMS or AWS KMS for key rotation
- WHEN PII is stored, THE Encryption_Layer SHALL use field-level encryption for sensitive fields
- WHEN backups are created, THE Encryption_Layer SHALL encrypt backups with separate keys
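For the in-transit criterion, a client-side sketch using Python's standard `ssl` module is shown here. It only illustrates how a minimum protocol version can be pinned; the helper name `make_tls13_client_context` is hypothetical, and server-side termination (for example at a load balancer) would be configured separately.

```python
import ssl


def make_tls13_client_context() -> ssl.SSLContext:
    """Build a client context that refuses anything older than TLS 1.3."""
    # create_default_context enables certificate and hostname verification.
    context = ssl.create_default_context()
    # TLS 1.3 removed static RSA and static Diffie-Hellman key exchange,
    # so pinning the minimum version also rules out those non-forward-secret modes.
    context.minimum_version = ssl.TLSVersion.TLSv1_3
    return context
```

The returned context can be passed to any standard-library or third-party client that accepts an `ssl.SSLContext`.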
Implementation Example
# security/encryption/encryption_layer.py
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from google.cloud import kms_v1
import os
import base64
class EncryptionLayer:
    """Data encryption at rest and in transit"""
    def __init__(self, kms_project_id: str, kms_location: str, kms_keyring: str, wrapped_dek: bytes):
        self.kms_client = kms_v1.KeyManagementServiceClient()
        self.kms_key_name = self.kms_client.crypto_key_path(
            kms_project_id,
            kms_location,
            kms_keyring,
            "data-encryption-key"
        )
        # Assumption: wrapped_dek is a random 32-byte data encryption key that was
        # encrypted earlier with the KMS key above and is stored next to the data
        self.wrapped_dek = wrapped_dek
    def encrypt_field(self, plaintext: str) -> str:
        """Encrypt field using AES-256-GCM"""
        # Generate a fresh 96-bit nonce for every encryption
        nonce = os.urandom(12)
        # Get data encryption key from KMS
        dek = self._get_data_encryption_key()
        # Encrypt using AES-GCM
        aesgcm = AESGCM(dek)
        ciphertext = aesgcm.encrypt(nonce, plaintext.encode(), None)
        # Return base64-encoded nonce + ciphertext
        return base64.b64encode(nonce + ciphertext).decode()
    def decrypt_field(self, ciphertext_b64: str) -> str:
        """Decrypt field"""
        # Decode base64
        data = base64.b64decode(ciphertext_b64)
        # Extract nonce and ciphertext
        nonce = data[:12]
        ciphertext = data[12:]
        # Get data encryption key from KMS
        dek = self._get_data_encryption_key()
        # Decrypt using AES-GCM
        aesgcm = AESGCM(dek)
        plaintext = aesgcm.decrypt(nonce, ciphertext, None)
        return plaintext.decode()
    def _get_data_encryption_key(self) -> bytes:
        """Unwrap the data encryption key with GCP KMS (envelope encryption)"""
        # KMS encryption is randomized, so encrypting a constant and slicing the
        # ciphertext would yield a different key on every call and make
        # decrypt_field fail. Unwrapping a stored key is deterministic.
        response = self.kms_client.decrypt(
            request={"name": self.kms_key_name, "ciphertext": self.wrapped_dek}
        )
        if len(response.plaintext) != 32:
            raise ValueError("Data encryption key must be 256 bits")
        return response.plaintext
REQ-SC-007: Secrets Management
User Story: As a DevOps engineer, I want secure secrets management with automatic rotation, so that API keys and passwords are never hardcoded or exposed.
Acceptance Criteria
- WHEN secrets are stored, THE Secrets_Manager SHALL use GCP Secret Manager or HashiCorp Vault
- WHEN secrets are accessed, THE Secrets_Manager SHALL authenticate services using service accounts
- WHEN secrets are rotated, THE Secrets_Manager SHALL automatically rotate keys every 90 days
- WHEN secrets are audited, THE Secrets_Manager SHALL log all secret access attempts
- WHEN secrets are compromised, THE Secrets_Manager SHALL support emergency revocation and rotation
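The 90-day rotation criterion reduces to a date comparison once the creation time of the active secret version is known. The following is a minimal sketch under that assumption; `is_rotation_due` and `ROTATION_PERIOD` are illustrative names, and in practice the creation time would come from the secret store's version metadata.

```python
from datetime import datetime, timedelta, timezone

# Rotation period taken from the acceptance criteria above.
ROTATION_PERIOD = timedelta(days=90)


def is_rotation_due(created_at: datetime, now: datetime,
                    period: timedelta = ROTATION_PERIOD) -> bool:
    """Return True once the secret version is at least `period` old."""
    return now - created_at >= period


created = datetime(2024, 1, 1, tzinfo=timezone.utc)
due_on_day_89 = is_rotation_due(created, created + timedelta(days=89))
due_on_day_90 = is_rotation_due(created, created + timedelta(days=90))
```

Passing `now` explicitly keeps the check deterministic, which makes it easy to test and to run from a scheduler.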
Implementation Example
# security/secrets/secrets_manager.py
from google.cloud import secretmanager_v1
from typing import Dict, Any, Optional
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class SecretsManager:
"""Secure secrets management"""
def __init__(self, project_id: str):
self.client = secretmanager_v1.SecretManagerServiceClient()
self.project_name = f"projects/{project_id}"
async def store_secret(self, secret_name: str, secret_value: str, labels: Optional[Dict[str, str]] = None) -> str:
"""Store a secret in GCP Secret Manager"""
parent = self.project_name
name = f"{parent}/secrets/{secret_name}"
# Create secret if it doesn't exist
try:
self.client.create_secret(
request={
"parent": parent,
"secret_id": secret_name,
"secret": {
"replication": {
"automatic": {}
},
"labels": labels or {},
},
}
)
logger.info(f"Created secret: {name}")
except Exception as e:
if "already exists" not in str(e):
raise
logger.info(f"Secret already exists: {name}")
# Add secret version
parent = name
payload = secret_value.encode("UTF-8")
response = self.client.add_secret_version(
request={
"parent": parent,
# SecretPayload carries the raw bytes; it has no mime_type field
"payload": {
"data": payload,
},
}
)
# Log access
logger.info(f"Stored secret: {name}, version: {response.name}")
return response.name
async def get_secret(self, secret_name: str, version: str = "latest") -> str:
"""Retrieve a secret"""
name = f"{self.project_name}/secrets/{secret_name}/versions/{version}"
# Get the secret version
response = self.client.access_secret_version(
request={"name": name},
)
# Log access attempt
logger.info(f"Accessed secret: {name}, version: {version}")
# Convert to string
return response.payload.data.decode("UTF-8")
async def rotate_secret(self, secret_name: str, rotation_period_days: int = 90) -> None:
"""Rotate secret by generating new version"""
# Generate new secret value (in practice, this would be a new key)
import secrets
new_value = secrets.token_urlsafe(32)
# Store new version
await self.store_secret(secret_name, new_value)
# Update rotation schedule (in practice, use Cloud Scheduler)
logger.info(f"Rotated secret: {secret_name}")
async def audit_secret_access(self, secret_name: str, user_id: str, action: str = "read") -> None:
"""Audit secret access"""
audit_event = {
"timestamp": datetime.utcnow().isoformat(),
"secret_name": secret_name,
"user_id": user_id,
"action": action,
"ip_address": "TODO: Extract from context",
"user_agent": "TODO: Extract from context"
}
# Log to audit system
await self._log_audit_event(audit_event)
# Declared async so that the await in audit_secret_access is valid
async def _log_audit_event(self, event: Dict) -> None:
"""Log audit event"""
logger.info("Secret access audit", extra=event)
REQ-SC-008: API Security & Rate Limiting
User Story: As a platform operator, I want comprehensive API security with rate limiting, request signing, and threat protection, so that APIs are protected from abuse and attacks.
Acceptance Criteria
- WHEN API requests are received, THE API_Security SHALL validate JWT tokens and API keys
- WHEN rate limits are exceeded, THE API_Security SHALL return 429 with retry-after headers
- WHEN suspicious patterns are detected, THE API_Security SHALL block IPs and require CAPTCHA
- WHEN webhook requests are received, THE API_Security SHALL verify HMAC signatures
- WHEN CORS is configured, THE API_Security SHALL enforce strict origin policies
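The 429 criterion asks for a retry-after header, which is most useful when it reflects how long the client actually has to wait. The sketch below is one way to compute that value with a sliding window; `SlidingWindowLimiter` is a hypothetical, framework-independent helper that keeps state in process memory, so it is a starting point rather than a multi-instance solution.

```python
import math
from collections import defaultdict, deque
from typing import Deque, Dict, Tuple


class SlidingWindowLimiter:
    """Allow at most `limit` requests per client within `window_seconds`."""

    def __init__(self, limit: int, window_seconds: float = 60.0):
        self.limit = limit
        self.window = window_seconds
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def check(self, client_id: str, now: float) -> Tuple[bool, int]:
        """Return (allowed, retry_after_seconds) for a request arriving at `now`."""
        hits = self._hits[client_id]
        # Evict timestamps that have left the window.
        while hits and hits[0] <= now - self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            # The oldest recorded hit leaves the window at hits[0] + window.
            retry_after = max(1, math.ceil(hits[0] + self.window - now))
            return False, retry_after
        hits.append(now)
        return True, 0


limiter = SlidingWindowLimiter(limit=2, window_seconds=60)
first = limiter.check("client-a", now=0.0)
second = limiter.check("client-a", now=10.0)
third = limiter.check("client-a", now=20.0)
```

The second element of the returned tuple can be written directly into the `Retry-After` header of the 429 response.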
Implementation Example
# security/middleware/api_security.py
import hashlib
import hmac
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional

import jwt  # PyJWT
from fastapi import Request, status
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware


def _error(status_code: int, detail: str, headers: Optional[Dict[str, str]] = None) -> JSONResponse:
    """Build an error response.

    An HTTPException raised inside BaseHTTPMiddleware is not routed through
    FastAPI's exception handlers, so the middleware returns responses directly.
    """
    return JSONResponse(status_code=status_code, content={"detail": detail}, headers=headers)


class RateLimitMiddleware(BaseHTTPMiddleware):
    """Rate limiting middleware (in-memory, per process)"""

    def __init__(self, app, requests_per_minute: int = 60):
        super().__init__(app)
        self.requests_per_minute = requests_per_minute
        self.requests = defaultdict(list)

    async def dispatch(self, request: Request, call_next):
        # Get client identifier (IP); request.client can be None
        client_id = request.client.host if request.client else "unknown"
        now = time.time()
        minute_ago = now - 60
        # Clean old requests
        self.requests[client_id] = [
            req_time for req_time in self.requests[client_id]
            if req_time > minute_ago
        ]
        # Check limit
        if len(self.requests[client_id]) >= self.requests_per_minute:
            return _error(
                status.HTTP_429_TOO_MANY_REQUESTS,
                "Rate limit exceeded",
                headers={"Retry-After": "60"}
            )
        # Record request
        self.requests[client_id].append(now)
        # Process request
        response = await call_next(request)
        # Add rate limit headers
        response.headers["X-RateLimit-Limit"] = str(self.requests_per_minute)
        response.headers["X-RateLimit-Remaining"] = str(
            self.requests_per_minute - len(self.requests[client_id])
        )
        return response


class JWTValidationMiddleware(BaseHTTPMiddleware):
    """JWT token validation middleware"""

    PUBLIC_PREFIXES = ("/auth/login", "/auth/register")

    def __init__(self, app, secret_key: str):
        super().__init__(app)
        self.secret_key = secret_key

    async def dispatch(self, request: Request, call_next):
        # Skip validation for public endpoints
        if request.url.path.startswith(self.PUBLIC_PREFIXES):
            return await call_next(request)
        # Extract token from Authorization header
        auth_header = request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            return _error(status.HTTP_401_UNAUTHORIZED, "Invalid authentication header")
        token = auth_header[len("Bearer "):]
        try:
            # Pinning the algorithm list prevents algorithm-confusion attacks
            payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
            user_id = payload["sub"]
        except (jwt.InvalidTokenError, KeyError):
            return _error(status.HTTP_401_UNAUTHORIZED, "Invalid token")
        # Add user info to request state
        request.state.user_id = user_id
        request.state.user_roles = payload.get("roles", [])
        request.state.user_permissions = payload.get("permissions", [])
        return await call_next(request)


class HMACValidationMiddleware(BaseHTTPMiddleware):
    """HMAC signature validation for webhooks"""

    MAX_SKEW = timedelta(minutes=5)

    def __init__(self, app, secret_key: str):
        super().__init__(app)
        self.secret_key = secret_key.encode()

    async def dispatch(self, request: Request, call_next):
        # Only validate webhook endpoints
        if not request.url.path.startswith("/webhooks/"):
            return await call_next(request)
        # The timestamp is part of the signed message, so both headers are required
        signature = request.headers.get("X-HMAC-Signature")
        timestamp = request.headers.get("X-HMAC-Timestamp")
        if not signature or not timestamp:
            return _error(status.HTTP_401_UNAUTHORIZED, "Missing HMAC signature or timestamp")
        # Validate timestamp (within 5 minutes) to limit replay attacks
        try:
            timestamp_dt = datetime.fromisoformat(timestamp)
        except ValueError:
            return _error(status.HTTP_400_BAD_REQUEST, "Invalid request timestamp")
        if timestamp_dt.tzinfo is None:
            # Naive timestamps are interpreted as UTC
            timestamp_dt = timestamp_dt.replace(tzinfo=timezone.utc)
        if abs(datetime.now(timezone.utc) - timestamp_dt) > self.MAX_SKEW:
            return _error(status.HTTP_400_BAD_REQUEST, "Request timestamp outside allowed window")
        # Calculate HMAC-SHA256 over "<timestamp>:<raw body>"
        body = await request.body()
        expected_signature = hmac.new(
            self.secret_key,
            f"{timestamp}:".encode() + body,
            hashlib.sha256
        ).hexdigest()
        # Constant-time comparison
        if not hmac.compare_digest(signature, expected_signature):
            return _error(status.HTTP_401_UNAUTHORIZED, "Invalid HMAC signature")
        return await call_next(request)
REQ-SC-009: GDPR Compliance
User Story: As a data protection officer, I want GDPR compliance automation including data mapping, consent management, and right-to-deletion, so that we comply with EU data protection laws.
Acceptance Criteria
- WHEN users request data export, THE Compliance_Framework SHALL export all personal data in JSON format within 30 days
- WHEN users request deletion, THE Compliance_Framework SHALL anonymize or delete data within 30 days
- WHEN consent is required, THE Compliance_Framework SHALL track granular consent (analytics, marketing, etc.)
- WHEN data is processed, THE Compliance_Framework SHALL maintain processing records (Article 30)
- WHEN breaches occur, THE Compliance_Framework SHALL support breach notification within 72 hours
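The 30-day and 72-hour limits in the criteria above are easiest to enforce when each request or incident carries an explicit deadline. The sketch below shows one way to derive and check those deadlines; the function and constant names are illustrative and do not appear elsewhere in this document.

```python
from datetime import datetime, timedelta, timezone

# Time limits taken from the acceptance criteria above.
SUBJECT_REQUEST_WINDOW = timedelta(days=30)
BREACH_NOTIFICATION_WINDOW = timedelta(hours=72)


def subject_request_deadline(requested_at: datetime) -> datetime:
    """Latest completion time for an export or deletion request."""
    return requested_at + SUBJECT_REQUEST_WINDOW


def breach_notification_deadline(discovered_at: datetime) -> datetime:
    """Latest time to notify the supervisory authority about a breach."""
    return discovered_at + BREACH_NOTIFICATION_WINDOW


def is_overdue(deadline: datetime, now: datetime) -> bool:
    """Return True when `now` is past the deadline."""
    return now > deadline


discovered = datetime(2024, 3, 1, 9, 0, tzinfo=timezone.utc)
breach_deadline = breach_notification_deadline(discovered)
request_deadline = subject_request_deadline(discovered)
```

Storing the deadline alongside the request record would let a periodic job alert the compliance team before a limit is missed.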
Implementation Example
# compliance/gdpr/compliance_service.py
from typing import Dict, Any, List
from datetime import datetime
from enum import Enum
class ConsentCategory(str, Enum):
"""GDPR consent categories"""
ANALYTICS = "analytics"
MARKETING = "marketing"
NEWSLETTER = "newsletter"
PARTNERSHIP_DATA = "partnership_data"
PERSONALIZATION = "personalization"
class GDPRComplianceService:
"""GDPR compliance automation"""
async def request_data_export(self, user_id: str) -> str:
"""Handle GDPR data export request"""
# Create export request
export_request = GDPRExportRequest(
user_id=user_id,
request_type="export",
status="pending",
requested_at=datetime.utcnow(),
completed_at=None
)
# Store request
await self._store_export_request(export_request)
# Queue data export task
await self._queue_data_export(user_id)
# Notify user
await self.notification_service.send_gdpr_notification(
user_id=user_id,
notification_type="data_export_requested",
details={
"request_id": export_request.id,
"estimated_completion": "30 days",
"contact_support": "For urgent requests, contact support@altsportsleagues.ai"
}
)
return export_request.id
async def request_data_deletion(self, user_id: str) -> str:
"""Handle GDPR right to deletion request"""
# Create deletion request
deletion_request = GDPRExportRequest(
user_id=user_id,
request_type="deletion",
status="pending",
requested_at=datetime.utcnow(),
completed_at=None
)
# Store request
await self._store_export_request(deletion_request)
# Queue data deletion task
await self._queue_data_deletion(user_id)
# Notify user
await self.notification_service.send_gdpr_notification(
user_id=user_id,
notification_type="data_deletion_requested",
details={
"request_id": deletion_request.id,
"estimated_completion": "30 days",
"legal_exceptions": "Some data may be retained for legal/compliance reasons"
}
)
return deletion_request.id
async def update_consent_preferences(self, user_id: str, consents: Dict[ConsentCategory, bool]) -> None:
"""Update user consent preferences"""
# Validate consents
for category, granted in consents.items():
if category not in ConsentCategory:
raise ValueError(f"Invalid consent category: {category}")
# Store consents
await self._store_user_consents(user_id, consents)
# Log consent change
await self.audit_logger.log_event(
event_type=AuditEventType.USER_UPDATED,
action="consent_updated",
user_id=user_id,
details={
"consents": consents,
"changed_at": datetime.utcnow().isoformat()
}
)
# Notify compliance team if significant changes
if any(not granted for granted in consents.values()):
await self.notification_service.send_compliance_alert(
alert_type="consent_withdrawal",
user_id=user_id,
consents=consents
)
async def generate_processing_records(self, user_id: str) -> List[Dict]:
"""Generate Article 30 processing records for user"""
# Query all processing activities for user
processing_records = await self._query_processing_activities(user_id)
# Format for compliance report
formatted_records = []
for record in processing_records:
formatted = {
"controller": "AltSportsLeagues.ai",
"purpose": record.purpose,
"categories": record.data_categories,
"retention_period": record.retention_period,
"security_measures": record.security_measures,
"recipient_countries": record.recipient_countries,
"processing_start": record.start_date,
"processing_end": record.end_date,
"data_protection_officer": "dpo@altsportsleagues.ai"
}
formatted_records.append(formatted)
return formatted_records
async def handle_breach_notification(self, breach_data: Dict) -> None:
"""Handle data breach notification (within 72 hours)"""
# Validate breach data
if not self._validate_breach_data(breach_data):
raise ValueError("Invalid breach data")
# Notify affected users
affected_users = await self._get_affected_users(breach_data)
for user_id in affected_users:
await self.notification_service.send_breach_notification(
user_id=user_id,
breach_details=breach_data,
notification_time=datetime.utcnow()
)
# Notify supervisory authority (within 72 hours)
await self.notification_service.send_regulatory_notification(
authority="DPA", # Data Protection Authority
breach_details=breach_data,
notification_time=datetime.utcnow()
)
# Log breach event
await self.audit_logger.log_event(
event_type=AuditEventType.SECURITY_INCIDENT,
action="data_breach",
details=breach_data
)
def _validate_breach_data(self, breach_data: Dict) -> bool:
"""Validate breach notification data"""
required_fields = ["breach_type", "affected_data", "discovery_time", "mitigation_steps"]
return all(field in breach_data for field in required_fields)
async def _queue_data_export(self, user_id: str):
"""Queue data export task"""
# Create export task
export_task = {
"user_id": user_id,
"task_type": "data_export",
"status": "pending",
"priority": "high",
"created_at": datetime.utcnow().isoformat()
}
# Add to queue (Celery, etc.)
await self.task_queue.enqueue(export_task)
async def _queue_data_deletion(self, user_id: str):
"""Queue data deletion task"""
# Create deletion task
deletion_task = {
"user_id": user_id,
"task_type": "data_deletion",
"status": "pending",
"priority": "critical",
"created_at": datetime.utcnow().isoformat()
}
# Add to queue
await self.task_queue.enqueue(deletion_task)
REQ-SC-010: CCPA Compliance
User Story: As a legal counsel, I want CCPA compliance including California consumer rights and Do Not Sell directives, so that we comply with California privacy laws.
Acceptance Criteria
- WHEN California users access the site, THE Compliance_Framework SHALL display "Do Not Sell My Personal Information" link
- WHEN users opt out of selling, THE Compliance_Framework SHALL honor opt-outs and suppress third-party sharing
- WHEN users request disclosure, THE Compliance_Framework SHALL disclose data categories collected and sold
- WHEN users request deletion, THE Compliance_Framework SHALL delete data with legal exceptions documented
- WHEN users request portability, THE Compliance_Framework SHALL export data in machine-readable format
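Honoring an opt-out means filtering outbound data before it reaches any third party. The sketch below illustrates that suppression step in isolation; `ShareRecord` and `suppress_opted_out` are hypothetical names, and the set of opted-out user IDs is assumed to come from stored user preferences.

```python
from dataclasses import dataclass
from typing import Iterable, List, Set


@dataclass(frozen=True)
class ShareRecord:
    """One piece of user data queued for third-party sharing."""
    user_id: str
    category: str


def suppress_opted_out(records: Iterable[ShareRecord],
                       opted_out_user_ids: Set[str]) -> List[ShareRecord]:
    """Drop every record that belongs to a user who opted out of selling."""
    return [r for r in records if r.user_id not in opted_out_user_ids]


queued = [
    ShareRecord("u1", "email"),
    ShareRecord("u2", "email"),
    ShareRecord("u1", "league_data"),
]
shareable = suppress_opted_out(queued, opted_out_user_ids={"u1"})
```

Applying the filter at the last step before export keeps the opt-out effective even when upstream jobs were queued before the user's request.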
Implementation Example
# compliance/ccpa/compliance_service.py
from typing import Dict, Any, List
from datetime import datetime
class CCPAComplianceService:
"""CCPA compliance automation"""
async def handle_do_not_sell_request(self, user_id: str) -> str:
"""Handle 'Do Not Sell' opt-out request"""
# Update user preferences to suppress data selling
await self._update_user_opt_out(user_id, opt_out=True)
# Log opt-out event
await self.audit_logger.log_event(
event_type=AuditEventType.USER_UPDATED,
action="ccpa_do_not_sell",
user_id=user_id,
details={
"opt_out": True,
"timestamp": datetime.utcnow().isoformat()
}
)
# Notify compliance team
await self.notification_service.send_compliance_alert(
alert_type="ccpa_opt_out",
user_id=user_id,
action="do_not_sell"
)
return "Your 'Do Not Sell My Personal Information' request has been processed. We will not sell your data to third parties."
async def get_data_disclosure(self, user_id: str) -> DataDisclosureReport:
"""Provide CCPA data disclosure report"""
# Gather data categories
data_categories = await self._get_collected_data_categories(user_id)
# Check if data has been sold
sold_categories = await self._get_sold_data_categories(user_id)
report = DataDisclosureReport(
collected_categories=data_categories,
sold_categories=sold_categories,
last_updated=datetime.utcnow().isoformat(),
disclosure_date=datetime.utcnow().isoformat()
)
# Log disclosure request
await self.audit_logger.log_event(
event_type=AuditEventType.DATA_READ,
action="ccpa_disclosure_request",
user_id=user_id,
resource_type="personal_data",
details=report.dict()
)
return report
async def handle_data_deletion_request(self, user_id: str) -> str:
"""Handle CCPA data deletion request"""
# Create deletion request
deletion_request = CCPADeletionRequest(
user_id=user_id,
request_type="deletion",
status="pending",
requested_at=datetime.utcnow(),
completed_at=None
)
# Store request
await self._store_deletion_request(deletion_request)
# Queue deletion task with legal exceptions
await self._queue_ccpa_deletion(user_id, legal_exceptions=True)
return "Your CCPA data deletion request has been received and will be processed within 45 days."
async def _get_collected_data_categories(self, user_id: str) -> List[str]:
"""Get data categories collected from user"""
# Query user data collection history
collected = [
"name",
"email",
"phone_number",
"address",
"league_data",
"partnership_preferences",
"compliance_status"
]
return collected
async def _get_sold_data_categories(self, user_id: str) -> List[str]:
"""Get data categories sold about user"""
# In this system, we don't sell personal data, so empty list
return []
async def _update_user_opt_out(self, user_id: str, opt_out: bool):
"""Update user opt-out status"""
# Update user preferences
await self.user_repository.update_preferences(
user_id=user_id,
do_not_sell=opt_out
)
# Suppress data selling only when the user has opted out
if opt_out:
await self.data_selling_suppressor.add_user(user_id)
async def _queue_ccpa_deletion(self, user_id: str, legal_exceptions: bool = False):
"""Queue CCPA data deletion with legal exceptions"""
# Create deletion task
deletion_task = {
"user_id": user_id,
"task_type": "ccpa_deletion",
"status": "pending",
"priority": "high",
"legal_exceptions": legal_exceptions,
"created_at": datetime.utcnow().isoformat()
}
# Add to queue
await self.task_queue.enqueue(deletion_task)
REQ-SC-011: SOC 2 Type II Compliance
User Story: As a security auditor, I want SOC 2 compliance controls and evidence collection, so that we can achieve SOC 2 Type II certification.
Acceptance Criteria
- WHEN security controls are implemented, THE Compliance_Framework SHALL map controls to SOC 2 Trust Service Criteria
- WHEN evidence is collected, THE Compliance_Framework SHALL automatically collect evidence (logs, screenshots, configs)
- WHEN access is reviewed, THE Compliance_Framework SHALL perform quarterly access reviews
- WHEN incidents occur, THE Compliance_Framework SHALL document incident response with timelines
- WHEN audits occur, THE Compliance_Framework SHALL generate compliance reports with evidence packages
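Evidence packages are easier to defend in an audit when each artifact is recorded with a content hash, so later tampering is detectable. The sketch below builds such a manifest with the standard library only; `build_evidence_manifest` and the artifact names are illustrative assumptions, not part of the platform.

```python
import hashlib
from typing import Dict, List


def build_evidence_manifest(control_id: str, artifacts: Dict[str, bytes]) -> List[dict]:
    """Describe each evidence artifact with its SHA-256 digest and size."""
    return [
        {
            "control_id": control_id,
            "artifact": name,
            "sha256": hashlib.sha256(data).hexdigest(),
            "size_bytes": len(data),
        }
        # Sorting by name makes the manifest order reproducible.
        for name, data in sorted(artifacts.items())
    ]


manifest = build_evidence_manifest(
    "CC6.0",
    {"rbac_config.json": b'{"roles": []}', "access_log.txt": b""},
)
```

An auditor can recompute the digests from the delivered files and compare them with the manifest entries.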
Implementation Example
# compliance/soc2/compliance_engine.py
from typing import Dict, Any, List, Set
from datetime import datetime
from enum import Enum
class SOCT2Control(str, Enum):
"""SOC 2 Trust Service Criteria"""
CC1_0 = "CC1.0 - Control Environment"
CC2_0 = "CC2.0 - Communication and Information"
CC3_0 = "CC3.0 - Risk Assessment"
CC4_0 = "CC4.0 - Monitoring Activities"
CC5_0 = "CC5.0 - Control Activities"
CC6_0 = "CC6.0 - Logical and Physical Access"
CC7_0 = "CC7.0 - System Operations"
CC8_0 = "CC8.0 - Change Management"
CC9_0 = "CC9.0 - Risk Mitigation"
class SOC2ComplianceEngine:
"""SOC 2 Type II compliance automation"""
def __init__(self):
self.controls_mapping = self._load_controls_mapping()
self.evidence_collector = EvidenceCollector()
def _load_controls_mapping(self) -> Dict[str, List[str]]:
"""Load SOC 2 controls mapping"""
# Map system controls to SOC 2 criteria
return {
SOCT2Control.CC6_0: [
"Multi-factor authentication",
"Role-based access control",
"Session timeout enforcement",
"IP whitelisting"
],
SOCT2Control.CC7_0: [
"Automated backups",
"Disaster recovery procedures",
"System monitoring and alerting",
"Change management processes"
],
SOCT2Control.CC8_0: [
"Code review requirements",
"Automated testing",
"Deployment approval workflow",
"Version control policies"
],
# Add other mappings...
}
async def generate_compliance_report(self, period_start: datetime, period_end: datetime) -> SOC2Report:
"""Generate SOC 2 compliance report"""
report = SOC2Report(
period_start=period_start,
period_end=period_end,
report_date=datetime.utcnow(),
controls_status={},
evidence_summary={},
exceptions=[]
)
# Evaluate each control
for control, system_controls in self.controls_mapping.items():
control_status = await self._evaluate_control(control, system_controls, period_start, period_end)
report.controls_status[control.value] = control_status
# Collect evidence
evidence = await self.evidence_collector.collect_evidence(control, period_start, period_end)
report.evidence_summary[control.value] = evidence
# Check for exceptions
exceptions = await self._identify_exceptions(report.controls_status)
report.exceptions = exceptions
# Generate executive summary
report.executive_summary = self._generate_executive_summary(report)
return report
async def _evaluate_control(
self,
control: SOCT2Control,
system_controls: List[str],
start_date: datetime,
end_date: datetime
) -> ControlEvaluation:
"""Evaluate specific SOC 2 control"""
evaluation = ControlEvaluation(
control_id=control.value,
status="compliant",
evidence_count=0,
issues=[]
)
for system_control in system_controls:
# Check control implementation
implementation_status = await self._check_control_implementation(system_control, start_date, end_date)
if implementation_status == "non-compliant":
evaluation.status = "non-compliant"
evaluation.issues.append({
"control": system_control,
"status": implementation_status,
"remediation": await self._generate_remediation(system_control)
})
# Collect evidence
evidence = await self.evidence_collector.collect_for_control(system_control, start_date, end_date)
evaluation.evidence_count += len(evidence)
return evaluation
async def perform_quarterly_access_review(self, quarter: str) -> AccessReviewReport:
"""Perform quarterly access review"""
# Get all active users
users = await self.user_repository.get_active_users()
review_report = AccessReviewReport(
quarter=quarter,
review_date=datetime.utcnow(),
users_reviewed=0,
high_risk_access=[],
recommendations=[]
)
for user in users:
# Check role assignments
roles = await self.rbac_engine.get_user_roles(user.id)
permissions = await self.rbac_engine.get_user_permissions(user.id)
# Identify high-risk access patterns
if await self._is_high_risk_access(roles, permissions, user):
review_report.high_risk_access.append({
"user_id": user.id,
"email": user.email,
"roles": list(roles),
"permissions": list(permissions),
"risk_level": self._calculate_risk_score(roles, permissions),
"recommendation": await self._generate_access_recommendation(user)
})
review_report.users_reviewed += 1
# Generate recommendations
review_report.recommendations = await self._generate_review_recommendations(review_report.high_risk_access)
# Log review completion
await self.audit_logger.log_event(
event_type=AuditEventType.SYSTEM_ADMIN,
action="quarterly_access_review",
details={
"quarter": quarter,
"users_reviewed": review_report.users_reviewed,
"high_risk_count": len(review_report.high_risk_access)
}
)
return review_report
# Declared async because it awaits the admin-approval lookup
async def _is_high_risk_access(self, roles: Set[str], permissions: Set[Permission], user: User) -> bool:
"""Determine if user has high-risk access"""
# Check for admin permissions without proper approval
if Permission.SYSTEM_ADMIN in permissions:
# Verify admin approval exists
approval = await self._check_admin_approval(user.id)
if not approval:
return True
# Check for broad data access
data_permissions = [p for p in permissions if p.value.startswith("data:")]
if len(data_permissions) > 5: # Arbitrary threshold
return True
return False
async def _generate_remediation(self, control: str) -> str:
"""Generate remediation steps for non-compliant control"""
prompt = f"""
Generate specific remediation steps for SOC 2 control {control} that is currently non-compliant.
Context: AltSportsLeagues.ai platform with FastAPI backend, NextJS frontend, PostgreSQL database.
Provide step-by-step instructions including:
1. Immediate actions to take
2. Required code changes or configurations
3. Testing to verify compliance
4. Documentation updates needed
5. Timeline for implementation
Format as numbered list.
"""
response = await self.ai_service.generate_text(prompt)
return response
REQ-SC-012: Security Monitoring & Threat Detection
User Story: As a security operations analyst, I want real-time threat detection and security monitoring, so that attacks are detected and mitigated before damage occurs.
Acceptance Criteria
- WHEN anomalies are detected, THE Threat_Detector SHALL identify unusual patterns (login location, API usage)
- WHEN threats are found, THE Threat_Detector SHALL automatically block IPs and require CAPTCHA
- WHEN security events occur, THE Threat_Detector SHALL integrate with SIEM systems (Splunk, Datadog)
- WHEN threat intelligence is available, THE Threat_Detector SHALL block known malicious IPs and domains
- WHEN alerts are triggered, THE Threat_Detector SHALL send notifications via PagerDuty, Slack, or email
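Automatic IP blocks are usually temporary, so the block list needs an expiry per entry. The sketch below is a minimal in-memory version of that idea with an injectable clock value; `ExpiringBlocklist` is a hypothetical helper, and a shared store with native key expiry would be the more likely choice across multiple instances.

```python
from typing import Dict


class ExpiringBlocklist:
    """Track blocked IPs together with the time each block expires."""

    def __init__(self) -> None:
        self._expires_at: Dict[str, float] = {}

    def block(self, ip_address: str, now: float, duration_seconds: float = 3600.0) -> None:
        """Block `ip_address` until `now + duration_seconds`."""
        self._expires_at[ip_address] = now + duration_seconds

    def is_blocked(self, ip_address: str, now: float) -> bool:
        """Return True while the block is active; expired entries are removed."""
        expires_at = self._expires_at.get(ip_address)
        if expires_at is None:
            return False
        if now >= expires_at:
            del self._expires_at[ip_address]
            return False
        return True


blocklist = ExpiringBlocklist()
blocklist.block("203.0.113.7", now=0.0, duration_seconds=60.0)
```

Passing `now` as an argument (for example `time.time()`) keeps the class deterministic under test.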
Implementation Example
# security/threat/threat_detector.py
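from typing import Dict, Any, List
from datetime import datetime, timedelta
import asyncio
from collections import defaultdict
import structlog  # assumed: the keyword-argument log calls below follow structlog's API
logger = structlog.get_logger(__name__)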
class ThreatDetector:
"""Real-time threat detection and monitoring"""
def __init__(self, siem_integration: bool = True):
self.anomaly_patterns = defaultdict(list)
self.blocked_ips = set()
self.threat_intelligence_feed = "https://api.threatintel.com"
self.siem_integration = siem_integration
async def monitor_login_attempts(self, login_events: List[LoginEvent]) -> List[ThreatAlert]:
"""Monitor login attempts for anomalies"""
alerts = []
# Group by IP
ip_attempts = defaultdict(list)
for event in login_events:
ip_attempts[event.ip_address].append(event)
for ip, attempts in ip_attempts.items():
# Check for suspicious patterns
suspicious_patterns = await self._detect_suspicious_patterns(attempts)
if suspicious_patterns:
# Generate alert
alert = ThreatAlert(
ip_address=ip,
threat_type="suspicious_login",
severity="high",
patterns=suspicious_patterns,
timestamp=datetime.utcnow(),
action_taken="ip_blocked"
)
# Block IP
self.blocked_ips.add(ip)
# Send to SIEM
if self.siem_integration:
await self._send_to_siem(alert)
# Notify security team
await self._notify_security_team(alert)
alerts.append(alert)
return alerts
async def _detect_suspicious_patterns(self, attempts: List[LoginEvent]) -> List[str]:
"""Detect suspicious login patterns"""
patterns = []
# Multiple failed logins
failed_count = sum(1 for a in attempts if not a.success)
if failed_count > 3:
patterns.append(f"{failed_count} failed login attempts")
# Unusual locations
locations = list(set(a.location for a in attempts))
if len(locations) > 2:
patterns.append(f"Unusual login locations: {', '.join(locations)}")
# High frequency attempts
time_window = timedelta(minutes=5)
recent_attempts = [a for a in attempts if a.timestamp > datetime.utcnow() - time_window]
if len(recent_attempts) > 10:
patterns.append(f"High frequency attempts: {len(recent_attempts)} in 5 minutes")
# Multiple users from same IP
unique_users = len(set(a.user_id for a in attempts))
if unique_users > 1:
patterns.append(f"Multiple users from same IP: {unique_users}")
return patterns
async def _send_to_siem(self, alert: ThreatAlert):
"""Send alert to SIEM system"""
# Implement SIEM integration (Splunk, Datadog, etc.)
siem_payload = {
"event_type": "security_alert",
"alert": alert.dict(),
"timestamp": datetime.utcnow().isoformat()
}
# Send to SIEM endpoint
# await siem_client.send_event(siem_payload)
logger.info("Sent to SIEM", alert=alert.threat_type)
async def _notify_security_team(self, alert: ThreatAlert):
"""Notify security team via PagerDuty/Slack"""
# Implement notification
notification_message = f"""
Security Alert: {alert.threat_type}
IP: {alert.ip_address}
Severity: {alert.severity}
Patterns: {', '.join(alert.patterns)}
Action: IP blocked
"""
# Send via PagerDuty, Slack, or email
await self.notification_service.send_security_alert(notification_message)
async def update_threat_intelligence(self):
"""Update threat intelligence feed"""
# Fetch latest threat intelligence
# response = await self._fetch_threat_intel()
# self.blocked_ips.update(response.malicious_ips)
logger.info("Threat intelligence updated")
def is_ip_blocked(self, ip_address: str) -> bool:
"""Check if IP is blocked"""
return ip_address in self.blocked_ips
def block_ip(self, ip_address: str, duration_minutes: int = 60):
"""Block IP for specified duration"""
self.blocked_ips.add(ip_address)
# In production, use Redis with TTL
# await self.redis_client.setex(f"blocked_ip:{ip_address}", duration_minutes * 60, "blocked")
def unblock_ip(self, ip_address: str):
"""Unblock IP"""
self.blocked_ips.discard(ip_address)
# await self.redis_client.delete(f"blocked_ip:{ip_address}")
REQ-SC-013: Vulnerability Management & Penetration Testing
User Story: As a security engineer, I want automated vulnerability scanning and regular penetration testing, so that security weaknesses are identified and remediated.
Acceptance Criteria
- WHEN scanning code, THE Vulnerability_Scanner SHALL use SAST tools (Bandit, ESLint security plugins)
- WHEN testing dependencies, THE Vulnerability_Scanner SHALL scan for CVEs using Snyk or Dependabot
- WHEN containers are built, THE Vulnerability_Scanner SHALL scan images for vulnerabilities using Trivy
- WHEN APIs are tested, THE Vulnerability_Scanner SHALL perform OWASP ZAP for dynamic analysis
- WHEN penetration tests are performed, THE Vulnerability_Scanner SHALL schedule quarterly tests with findings remediation
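Scanner output becomes actionable once a pipeline decides which severities block a release. The sketch below shows one possible gate; the severity ranking and the name `blocking_findings` are assumptions for illustration, and severity labels that are not in the table are deliberately not treated as blocking.

```python
from typing import Iterable, List

# Assumed ordering of severity labels, lowest to highest.
SEVERITY_RANK = {"LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}


def blocking_findings(severities: Iterable[str], threshold: str = "HIGH") -> List[str]:
    """Return the severities that meet or exceed the threshold."""
    minimum = SEVERITY_RANK[threshold.upper()]
    # Unknown labels rank 0 and therefore never block.
    return [s for s in severities if SEVERITY_RANK.get(s.upper(), 0) >= minimum]


blocking = blocking_findings(["low", "HIGH", "critical", "medium"])
```

A build step could fail whenever the returned list is non-empty and attach the findings to the remediation ticket.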
Implementation Example
# security/vulnerability/vulnerability_scanner.py
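import subprocess
import json
from typing import Dict, List, Any
from datetime import datetime
import structlog  # assumed: the keyword-argument log calls below follow structlog's API
logger = structlog.get_logger(__name__)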
class VulnerabilityScanner:
"""Automated vulnerability scanning"""
def __init__(self, project_root: str):
self.project_root = project_root
self.scan_results = []
async def scan_code_sast(self, target: str = "all") -> List[SecurityFinding]:
"""Run SAST code scanning"""
findings = []
# Python code (Bandit)
if target in ["all", "python"]:
try:
result = subprocess.run(
["bandit", "-r", f"{self.project_root}/apps/backend", "-f", "json", "-o", "-"],
capture_output=True,
text=True,
check=False  # Bandit exits non-zero when it reports issues; the JSON must still be parsed
)
python_findings = json.loads(result.stdout)
for issue in python_findings.get("results", []):
findings.append(SecurityFinding(
tool="bandit",
severity=issue["issue_severity"],
confidence=issue["issue_confidence"],
description=issue["issue_text"],
location=f"{issue['filename']}:{issue['line_number']}",  # filename already includes the scanned path
timestamp=datetime.utcnow()
))
except subprocess.CalledProcessError as e:
logger.error("Bandit scan failed", error=str(e))
# JavaScript/TypeScript (ESLint with security plugins)
if target in ["all", "frontend"]:
try:
result = subprocess.run(
[
"npx", "eslint", "--ext", ".js,.ts,.tsx",
"--format", "json",
f"{self.project_root}/clients/frontend/src"
],
capture_output=True,
text=True,
check=False  # ESLint exits non-zero when it reports errors; the JSON must still be parsed
)
js_findings = json.loads(result.stdout)
for file_results in js_findings:
for issue in file_results["messages"]:
if issue["ruleId"] in ["security/detect-unsafe-regex", "security/detect-possible-timing-attacks"]:
findings.append(SecurityFinding(
tool="eslint-security",
severity="HIGH" if issue["severity"] == 2 else "MEDIUM",  # ESLint reports 1 (warning) or 2 (error)
confidence="high",
description=issue["message"],
location=f"{file_results['filePath']}:{issue['line']}",  # filePath is already an absolute path
timestamp=datetime.utcnow()
))
except subprocess.CalledProcessError as e:
logger.error("ESLint security scan failed", error=str(e))
return findings
    async def scan_dependencies(self) -> List[SecurityFinding]:
        """Scan dependencies for vulnerabilities"""
        findings = []

        # Python dependencies (Safety)
        try:
            # Safety exits non-zero when vulnerabilities exist, so do not use check=True
            result = subprocess.run(
                ["safety", "check", "--json"],
                capture_output=True,
                text=True,
                cwd=self.project_root,
                check=False
            )
            vuln_data = json.loads(result.stdout)
            # NOTE: the JSON shape differs between Safety versions; the keys below
            # are kept from the original example and should be checked against the
            # installed version.
            for vuln in vuln_data:
                findings.append(SecurityFinding(
                    tool="safety",
                    severity=vuln["package"]["advisories"][0]["advisory"]["severity"],
                    confidence="high",
                    description=f"Dependency vulnerability: {vuln['package']['name']} {vuln['package']['version']} - {vuln['package']['advisories'][0]['advisory']['description']}",
                    location=f"{self.project_root}/pyproject.toml",
                    timestamp=datetime.utcnow()
                ))
        except (OSError, json.JSONDecodeError, KeyError, TypeError, IndexError) as e:
            logger.error("Safety scan failed", error=str(e))

        # JavaScript dependencies (npm audit)
        try:
            # npm audit exits non-zero when vulnerabilities exist
            result = subprocess.run(
                ["npm", "audit", "--json"],
                capture_output=True,
                text=True,
                cwd=f"{self.project_root}/clients/frontend",
                check=False
            )
            audit_data = json.loads(result.stdout)
            # Assumes the npm 7+ report format, where each entry has "name",
            # "severity", "range" and a "via" list of advisories or package names
            for vuln in audit_data.get("vulnerabilities", {}).values():
                if vuln["severity"] in ["high", "critical"]:
                    titles = [v.get("title", "") for v in vuln.get("via", []) if isinstance(v, dict)]
                    summary = "; ".join(t for t in titles if t) or "vulnerable via a dependency"
                    findings.append(SecurityFinding(
                        tool="npm-audit",
                        severity=vuln["severity"].upper(),
                        confidence="high",
                        description=f"Dependency vulnerability: {vuln['name']} ({vuln.get('range', 'unknown range')}) - {summary}",
                        location=f"{self.project_root}/clients/frontend/package.json",
                        timestamp=datetime.utcnow()
                    ))
        except (OSError, json.JSONDecodeError, KeyError) as e:
            logger.error("npm audit failed", error=str(e))

        return findings
    async def scan_containers(self, images: List[str]) -> List[SecurityFinding]:
        """Scan Docker images for vulnerabilities"""
        findings = []

        # `trivy image` needs an image reference, so the caller supplies the list
        # of images to scan (the `images` parameter is an addition to the original)
        for image in images:
            try:
                result = subprocess.run(
                    ["trivy", "image", "--format", "json", "--exit-code", "0", "--no-progress", image],
                    capture_output=True,
                    text=True,
                    cwd=self.project_root,
                    check=True
                )
                trivy_results = json.loads(result.stdout)
                # Renamed loop variable so it no longer shadows the subprocess result
                for target_result in trivy_results.get("Results") or []:
                    for vuln in target_result.get("Vulnerabilities") or []:
                        if vuln["Severity"] in ["CRITICAL", "HIGH"]:
                            findings.append(SecurityFinding(
                                tool="trivy",
                                severity=vuln["Severity"],
                                confidence="high",
                                description=f"Container vulnerability: {vuln['VulnerabilityID']} - {vuln.get('Title', 'no title')}",
                                location=f"Docker image: {target_result['Target']}",
                                timestamp=datetime.utcnow()
                            ))
            except (OSError, subprocess.CalledProcessError, json.JSONDecodeError) as e:
                logger.error("Trivy scan failed", image=image, error=str(e))

        return findings
    async def run_owasp_zap_scan(self, target_url: str) -> List[SecurityFinding]:
        """Run OWASP ZAP dynamic analysis"""
        findings = []

        # Configure and run ZAP scan
        try:
            # Start ZAP in daemon mode (in production, use containerized ZAP)
            zap_config = {
                "target": target_url,
                "ajaxSpider": True,
                "context": "Production API",
                "attackStrength": "MEDIUM",
                "alertThreshold": "HIGH"
            }
            # Execute scan (simplified - in practice, use ZAP API)
            # result = await self._execute_zap_scan(zap_config)
            # Parse results
            # for alert in result.alerts:
            #     if alert.risk in ["High", "Medium"]:
            #         findings.append(SecurityFinding(...))
            pass  # Placeholder for ZAP integration; returns no findings until wired up
        except Exception as e:
            logger.error("OWASP ZAP scan failed", error=str(e))

        return findings
    async def generate_vulnerability_report(self, scan_results: List[SecurityFinding]) -> VulnerabilityReport:
        """Generate comprehensive vulnerability report"""
        # Categorize findings
        critical = [f for f in scan_results if f.severity == "CRITICAL"]
        high = [f for f in scan_results if f.severity == "HIGH"]
        medium = [f for f in scan_results if f.severity == "MEDIUM"]
        low = [f for f in scan_results if f.severity == "LOW"]

        # Generate remediation plan
        remediation_plan = await self._generate_remediation_plan(scan_results)

        return VulnerabilityReport(
            total_findings=len(scan_results),
            critical_count=len(critical),
            high_count=len(high),
            medium_count=len(medium),
            low_count=len(low),
            findings=scan_results,
            remediation_plan=remediation_plan,
            report_date=datetime.utcnow(),
            priority="immediate" if critical else "high"
        )
REQ-SC-014: Password Policies & Credential Security
User Story: As a security administrator, I want strong password policies and secure credential management, so that weak passwords and credential theft are prevented.
Acceptance Criteria
- WHEN passwords are set, THE Authentication_System SHALL enforce minimum 12 characters with complexity requirements
- WHEN passwords are stored, THE Authentication_System SHALL hash using bcrypt or Argon2 with unique salts
- WHEN password breaches are detected, THE Authentication_System SHALL force password reset for compromised accounts
- WHEN passwords are changed, THE Authentication_System SHALL prevent reuse of last 10 passwords
- WHEN failed logins occur, THE Authentication_System SHALL implement progressive delays (1s, 2s, 4s, 8s)
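The progressive-delay criterion can be sketched as an exponential backoff on consecutive failures. This is a minimal illustration, not the platform's implementation: `login_delay_seconds`, `FailedLoginTracker`, and the 8-second cap are hypothetical names and assumptions (the criteria list the sequence only up to 8s), and a real deployment would keep the counters in a shared store rather than process memory.

```python
from dataclasses import dataclass, field
from typing import Dict

BASE_DELAY_SECONDS = 1.0  # first delay in the 1s, 2s, 4s, 8s sequence
MAX_DELAY_SECONDS = 8.0   # assumed cap: the criteria stop at 8s


def login_delay_seconds(failed_attempts: int) -> float:
    """Delay to impose after `failed_attempts` consecutive failed logins."""
    if failed_attempts <= 0:
        return 0.0
    # Cap the exponent so very large counters cannot overflow the float math
    exponent = min(failed_attempts - 1, 16)
    return min(BASE_DELAY_SECONDS * 2 ** exponent, MAX_DELAY_SECONDS)


@dataclass
class FailedLoginTracker:
    """In-memory failure counter keyed by user ID (hypothetical helper)."""
    _failures: Dict[str, int] = field(default_factory=dict)

    def record_failure(self, user_id: str) -> float:
        """Count one more failure and return the delay before the next attempt."""
        self._failures[user_id] = self._failures.get(user_id, 0) + 1
        return login_delay_seconds(self._failures[user_id])

    def record_success(self, user_id: str) -> None:
        """A successful login resets the counter."""
        self._failures.pop(user_id, None)
```

A login handler would typically await `asyncio.sleep(delay)` (or reject requests that arrive before the delay has elapsed) using the value returned by `record_failure`.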
Implementation Example
# security/authentication/password_service.py
import bcrypt
from typing import Optional, List
from datetime import datetime, timedelta

# Database and NotificationService are assumed to be defined elsewhere in the platform
class PasswordPolicy:
    """Password policy enforcement"""
    MIN_LENGTH = 12
    COMPLEXITY_RULES = {
        "uppercase": 1,
        "lowercase": 1,
        "digits": 1,
        "special_chars": 1
    }
    MAX_REUSE_HISTORY = 10

    @classmethod
    def validate_password(cls, password: str) -> bool:
        """Validate password against policy"""
        if len(password) < cls.MIN_LENGTH:
            return False

        # Check complexity
        has_upper = any(c.isupper() for c in password)
        has_lower = any(c.islower() for c in password)
        has_digit = any(c.isdigit() for c in password)
        has_special = any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in password)

        return all([
            has_upper,
            has_lower,
            has_digit,
            has_special
        ])

    @classmethod
    def hash_password(cls, password: str) -> str:
        """Hash password using bcrypt"""
        # bcrypt only considers the first 72 bytes of input; very long
        # passphrases need pre-hashing or a switch to Argon2
        # Generate salt and hash
        salt = bcrypt.gensalt(rounds=12)
        hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
        return hashed.decode('utf-8')

    @classmethod
    def verify_password(cls, password: str, hashed: str) -> bool:
        """Verify password using constant-time comparison"""
        return bcrypt.checkpw(
            password.encode('utf-8'),
            hashed.encode('utf-8')
        )
class PasswordService:
    """Secure password management"""

    def __init__(self, db: Database, notification_service: NotificationService):
        self.db = db
        # Needed by force_password_reset, which sends a security notification
        self.notification_service = notification_service

    async def set_password(self, user_id: str, password: str) -> bool:
        """Set user password with validation"""
        # Validate password
        if not PasswordPolicy.validate_password(password):
            raise ValueError("Password does not meet complexity requirements")

        # Check reuse history
        previous_passwords = await self._get_password_history(user_id, limit=PasswordPolicy.MAX_REUSE_HISTORY)
        if any(PasswordPolicy.verify_password(password, prev_hash) for prev_hash in previous_passwords):
            raise ValueError("Password has been used recently")

        # Hash password
        hashed_password = PasswordPolicy.hash_password(password)

        # Update user password
        await self.db.execute(
            "UPDATE users SET password_hash = $1, password_updated_at = $2 WHERE id = $3",
            (hashed_password, datetime.utcnow(), user_id)
        )

        # Add to history
        await self._add_to_password_history(user_id, hashed_password)
        return True
    async def check_password_breach(self, user_id: str, password: str) -> bool:
        """Check a plaintext password against known breach data"""
        # A stored bcrypt hash cannot be matched against breach corpora, which are
        # keyed by a hash of the plaintext (for example the Have I Been Pwned
        # range API uses SHA-1). The check therefore runs while the plaintext is
        # available, i.e. at login or password change.
        # _check_breach_api is assumed to hash the password and send only a short
        # hash prefix to the breach service.
        if await self._check_breach_api(password):
            # Force password reset
            await self.force_password_reset(user_id)
            return True
        return False

    async def force_password_reset(self, user_id: str) -> bool:
        """Force password reset for compromised account"""
        # Flag account for reset
        await self.db.execute(
            "UPDATE users SET password_reset_required = true WHERE id = $1",
            (user_id,)
        )

        # Notify user
        await self.notification_service.send_security_notification(
            user_id=user_id,
            notification_type="password_compromised",
            details={
                "action_required": "immediate_password_reset",
                "contact_security": "Contact security@altsportsleagues.ai"
            }
        )
        return True
    async def _get_password_history(self, user_id: str, limit: int) -> List[str]:
        """Get recent password hashes"""
        result = await self.db.fetch(
            "SELECT password_hash FROM password_history WHERE user_id = $1 ORDER BY updated_at DESC LIMIT $2",
            (user_id, limit)
        )
        return [row["password_hash"] for row in result]

    async def _add_to_password_history(self, user_id: str, hashed_password: str):
        """Add password to history"""
        await self.db.execute(
            """
            INSERT INTO password_history (user_id, password_hash, updated_at)
            VALUES ($1, $2, $3)
            """,
            (user_id, hashed_password, datetime.utcnow())
        )

        # Cleanup old entries, but always keep the most recent MAX_REUSE_HISTORY
        # hashes so the reuse check still covers the last 10 passwords even when
        # they are more than a year old
        await self.db.execute(
            """
            DELETE FROM password_history
            WHERE user_id = $1
              AND updated_at < $2
              AND password_hash NOT IN (
                  SELECT password_hash FROM password_history
                  WHERE user_id = $1
                  ORDER BY updated_at DESC
                  LIMIT $3
              )
            """,
            (user_id, datetime.utcnow() - timedelta(days=365), PasswordPolicy.MAX_REUSE_HISTORY)
        )
REQ-SC-015: Security Incident Response
User Story: As a security incident responder, I want automated incident detection, response playbooks, and forensic capabilities, so that security incidents are handled efficiently.
Acceptance Criteria
- WHEN incidents are detected, THE Incident_Responder SHALL automatically trigger response playbooks
- WHEN incidents are confirmed, THE Incident_Responder SHALL isolate affected systems and preserve evidence
- WHEN forensics are needed, THE Incident_Responder SHALL capture system state, logs, and network traffic
- WHEN incidents are resolved, THE Incident_Responder SHALL document incident response with timelines
- WHEN stakeholders need updates, THE Incident_Responder SHALL send status updates per notification matrix
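The notification matrix referenced in the last criterion is not defined in this section. One plausible shape, shown here purely as an assumption, is a mapping from incident severity to the audiences that receive status updates. The audience names and the `recipients_for` helper are hypothetical.

```python
from typing import Dict, List

# Hypothetical notification matrix: incident severity -> audiences to update.
# The audience names are placeholders, not values defined by the platform.
NOTIFICATION_MATRIX: Dict[str, List[str]] = {
    "low": ["security_team"],
    "medium": ["security_team", "engineering_lead"],
    "high": ["security_team", "engineering_lead", "ciso"],
    "critical": ["security_team", "engineering_lead", "ciso", "executive_team", "legal"],
}


def recipients_for(severity: str) -> List[str]:
    """Return the audiences for a severity level.

    Unknown severities fall back to the widest audience, so a mislabelled
    incident is over-communicated rather than silently dropped.
    """
    return list(NOTIFICATION_MATRIX.get(severity.lower(), NOTIFICATION_MATRIX["critical"]))
```

Keeping the matrix as data rather than branching logic lets it be reviewed and changed without touching the responder code.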
Implementation Example
# security/incident/incident_responder.py
import uuid
from typing import Dict, Any, List, Optional
from datetime import datetime
from enum import Enum

import structlog  # assumed: the logger calls below pass keyword context, structlog-style

logger = structlog.get_logger(__name__)


class IncidentSeverity(str, Enum):
    """Incident severity levels"""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class IncidentStatus(str, Enum):
    """Incident status tracking"""
    DETECTED = "detected"
    INVESTIGATING = "investigating"
    CONTAINED = "contained"
    RESOLVED = "resolved"
    POST_MORTEM = "post_mortem"
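class IncidentResponder:
    """Security incident response automation"""

    def __init__(
        self,
        notification_service: NotificationService,
        audit_logger,
        threat_detector,
        session_manager,
        evidence_collector
    ):
        self.notification_service = notification_service
        # The collaborators below are used by later methods; their types are
        # assumed to be defined elsewhere in the platform
        self.audit_logger = audit_logger
        self.threat_detector = threat_detector
        self.session_manager = session_manager
        self.evidence_collector = evidence_collector
        self.playbooks = self._load_playbooks()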
    async def handle_detected_incident(self, detection_event: ThreatAlert) -> IncidentRecord:
        """Handle newly detected security incident"""
        # Create incident record
        incident = IncidentRecord(
            incident_id=str(uuid.uuid4()),
            severity=detection_event.severity,
            detection_type=detection_event.threat_type,
            affected_resources=detection_event.affected_resources,
            detected_at=datetime.utcnow(),
            status=IncidentStatus.DETECTED,
            assigned_team="security",
            priority="high"
        )

        # Store incident
        await self._store_incident(incident)

        # Trigger response playbook
        playbook = self._select_playbook(detection_event.threat_type)
        await self._execute_playbook(incident, playbook)

        # Notify stakeholders
        await self._notify_stakeholders(incident, "incident_detected")
        return incident
    async def update_incident_status(
        self,
        incident_id: str,
        new_status: IncidentStatus,
        notes: Optional[str] = None,
        evidence_collected: Optional[Dict] = None
    ) -> IncidentRecord:
        """Update incident status"""
        incident = await self._get_incident(incident_id)
        if not incident:
            raise ValueError("Incident not found")

        # Update status
        incident.status = new_status
        incident.updated_at = datetime.utcnow()

        # Record the entry when either notes or evidence is supplied, so
        # evidence passed without notes is not silently dropped
        if notes or evidence_collected:
            incident.notes.append({
                "timestamp": datetime.utcnow(),
                "status": new_status.value,
                "notes": notes,
                "evidence": evidence_collected
            })

        # Store update
        await self._update_incident(incident)

        # Execute status-specific actions
        if new_status == IncidentStatus.INVESTIGATING:
            await self._start_investigation(incident)
        elif new_status == IncidentStatus.CONTAINED:
            await self._contain_incident(incident)
        elif new_status == IncidentStatus.RESOLVED:
            await self._resolve_incident(incident)
        elif new_status == IncidentStatus.POST_MORTEM:
            await self._start_post_mortem(incident)

        # Notify stakeholders
        await self._notify_stakeholders(incident, f"status_{new_status.value}")
        return incident
    async def _execute_playbook(self, incident: IncidentRecord, playbook: Playbook) -> None:
        """Execute automated response playbook"""
        # Run playbook steps, counting only the ones that complete without error
        steps_executed = 0
        for step in playbook.steps:
            try:
                if step.automated:
                    await self._execute_automated_step(step, incident)
                else:
                    # Manual step - notify team
                    await self.notification_service.send_playbook_step(
                        incident.assigned_team,
                        step,
                        incident
                    )
                steps_executed += 1
            except Exception as e:
                logger.error(f"Playbook step failed: {step.id}", error=str(e))

        # Log playbook execution
        await self.audit_logger.log_event(
            event_type=AuditEventType.SECURITY_INCIDENT,
            action="playbook_executed",
            details={
                "incident_id": incident.incident_id,
                "playbook": playbook.name,
                "steps_executed": steps_executed
            }
        )
    async def _contain_incident(self, incident: IncidentRecord) -> None:
        """Contain incident by isolating affected systems"""
        # Block affected IPs
        if incident.affected_resources.get("ips"):
            for ip in incident.affected_resources["ips"]:
                await self.threat_detector.block_ip(ip, duration_minutes=60)

        # Revoke compromised sessions
        if incident.affected_resources.get("user_ids"):
            for user_id in incident.affected_resources["user_ids"]:
                await self.session_manager.revoke_all_sessions(user_id)

        # Preserve evidence
        evidence = await self.evidence_collector.collect_evidence(
            incident_id=incident.incident_id,
            resource_types=list(incident.affected_resources.keys())
        )

        # Update incident and persist the change; the caller stored the record
        # before containment ran, so these fields would otherwise be lost
        incident.evidence_collected = evidence
        incident.containment_time = datetime.utcnow()
        await self._update_incident(incident)
    async def generate_post_mortem_report(self, incident_id: str) -> PostMortemReport:
        """Generate post-mortem report"""
        incident = await self._get_incident(incident_id)
        if not incident:
            raise ValueError("Incident not found")

        # Collect incident data
        timeline = await self._build_incident_timeline(incident)
        root_cause = await self._analyze_root_cause(incident)
        remediation = await self._generate_remediation_plan(incident)

        report = PostMortemReport(
            incident_id=incident.incident_id,
            timeline=timeline,
            root_cause=root_cause,
            impact_assessment=await self._assess_impact(incident),
            remediation_steps=remediation,
            lessons_learned=await self._extract_lessons(incident),
            report_date=datetime.utcnow()
        )

        # Store report
        await self._store_post_mortem(report)
        return report
    def _select_playbook(self, threat_type: str) -> Playbook:
        """Select appropriate response playbook"""
        playbooks = {
            "suspicious_login": Playbook(
                name="suspicious_login",
                steps=[
                    AutomatedStep(id="block_ip", automated=True, action="block_suspicious_ip"),
                    ManualStep(id="review_logs", automated=False, action="manual_log_review"),
                    AutomatedStep(id="notify_team", automated=True, action="security_notification")
                ]
            ),
            "api_abuse": Playbook(
                name="api_abuse",
                steps=[
                    AutomatedStep(id="rate_limit_ip", automated=True, action="apply_rate_limit"),
                    AutomatedStep(id="block_api_key", automated=True, action="revoke_api_key"),
                    ManualStep(id="investigate_usage", automated=False, action="manual_investigation")
                ]
            ),
            # Add other playbooks...
        }
        return playbooks.get(threat_type, Playbook.default_playbook())
Non-Functional Requirements
Performance Requirements
- Authentication Latency: < 500ms including MFA verification
- Authorization Check: < 50ms for permission validation
- Audit Logging: Async writes to prevent blocking
- Encryption Operations: Hardware-accelerated AES-GCM
- Rate Limiting: In-memory counters with Redis backup
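The in-memory counter half of the rate-limiting requirement can be illustrated with a fixed-window limiter. This is a sketch under stated assumptions: `FixedWindowRateLimiter` is a hypothetical name, the fixed-window algorithm is one possible choice (the requirement does not name one), and the Redis backup is omitted entirely.

```python
import time
from typing import Callable, Dict, Tuple


class FixedWindowRateLimiter:
    """Per-key fixed-window counter held in process memory (hypothetical sketch)."""

    def __init__(
        self,
        limit: int,
        window_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.limit = limit
        self.window_seconds = window_seconds
        self._clock = clock  # injectable so tests can control time
        # key -> (window index, requests counted in that window)
        self._counters: Dict[str, Tuple[int, int]] = {}

    def allow(self, key: str) -> bool:
        """Return True if the request fits in the current window for `key`."""
        window = int(self._clock() // self.window_seconds)
        stored_window, count = self._counters.get(key, (window, 0))
        if stored_window != window:
            count = 0  # a new window has started, so the counter resets
        if count >= self.limit:
            self._counters[key] = (window, count)
            return False
        self._counters[key] = (window, count + 1)
        return True
```

Fixed windows allow short bursts at window boundaries; a sliding window or token bucket would smooth that out at the cost of more state per key.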
Reliability Requirements
- System Availability: 99.9% uptime for authentication and authorization services
- Data Durability: 99.999% for audit logs and compliance records
- Failover Support: Automatic failover to secondary regions
- Recovery Time Objective (RTO): < 4 hours for security services
- Recovery Point Objective (RPO): < 15 minutes for audit logs
Security Requirements
- Key Management: HSM-protected master keys
- Log Integrity: Immutable append-only storage for audit logs
- Access Controls: Least privilege enforcement across all layers
- Data Protection: End-to-end encryption for all sensitive data
- Compliance Monitoring: Continuous automated compliance checks
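One common way to make an append-only audit log tamper-evident is to chain entries by hash, so that altering any stored event invalidates every later link. The sketch below assumes that technique; the document does not say how log integrity is enforced, and `HashChainedLog` is a hypothetical name. It complements write-once storage rather than replacing it.

```python
import hashlib
import json
from typing import Any, Dict, List

GENESIS_HASH = "0" * 64  # placeholder predecessor for the first entry


def _entry_hash(prev_hash: str, event: Dict[str, Any]) -> str:
    """SHA-256 over the previous hash plus a canonical JSON form of the event."""
    payload = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + payload).encode("utf-8")).hexdigest()


class HashChainedLog:
    """Append-only log in which each entry commits to its predecessor."""

    def __init__(self) -> None:
        self._entries: List[Dict[str, Any]] = []

    def append(self, event: Dict[str, Any]) -> str:
        """Store a copy of the event and return its chain hash."""
        prev_hash = self._entries[-1]["hash"] if self._entries else GENESIS_HASH
        entry_hash = _entry_hash(prev_hash, event)
        self._entries.append(
            {"event": dict(event), "prev_hash": prev_hash, "hash": entry_hash}
        )
        return entry_hash

    def verify(self) -> bool:
        """Recompute the chain; any edited or reordered entry breaks it."""
        prev_hash = GENESIS_HASH
        for entry in self._entries:
            if entry["prev_hash"] != prev_hash:
                return False
            if _entry_hash(prev_hash, entry["event"]) != entry["hash"]:
                return False
            prev_hash = entry["hash"]
        return True
```

Periodically publishing the latest hash to a separate system lets an auditor detect truncation as well as modification.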
Maintainability Requirements
- Modular Design: Independent deployment of security services
- API Documentation: Auto-generated OpenAPI specs
- Monitoring Integration: Full observability with Prometheus/Grafana
- Testing Coverage: >95% unit test coverage for security components
- Documentation: Comprehensive security architecture documentation
Success Metrics
- Authentication Security: 100% of admin-account authentication attempts use MFA
- Authorization Effectiveness: Zero unauthorized data access incidents
- Audit Coverage: 100% of security-relevant events logged with complete context
- Encryption Coverage: 100% of PII encrypted at rest and in transit
- Compliance Score: 90%+ compliance with GDPR, CCPA, SOC 2 controls
- Vulnerability Remediation: 95% of high/critical vulnerabilities remediated within 30 days
- Incident Response Time: Mean time to detect (MTTD) < 15 minutes, mean time to respond (MTTR) < 2 hours
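The incident response metric can be computed from three timestamps per incident. The sketch below assumes MTTD is measured from occurrence to detection and MTTR from detection to response; `IncidentTimes` and both function names are hypothetical.

```python
from datetime import datetime, timedelta
from statistics import mean
from typing import List, NamedTuple


class IncidentTimes(NamedTuple):
    """Timestamps needed for the metric (hypothetical record shape)."""
    occurred_at: datetime
    detected_at: datetime
    responded_at: datetime


def mttd_minutes(incidents: List[IncidentTimes]) -> float:
    """Mean time to detect: occurrence to detection, in minutes."""
    seconds = [(i.detected_at - i.occurred_at).total_seconds() for i in incidents]
    return mean(seconds) / 60


def mttr_minutes(incidents: List[IncidentTimes]) -> float:
    """Mean time to respond: detection to response, in minutes."""
    seconds = [(i.responded_at - i.detected_at).total_seconds() for i in incidents]
    return mean(seconds) / 60


def meets_targets(incidents: List[IncidentTimes]) -> bool:
    """Targets from the success metrics: MTTD < 15 minutes, MTTR < 2 hours."""
    return mttd_minutes(incidents) < 15 and mttr_minutes(incidents) < 120
```

Means hide outliers, so a dashboard would usually report a high percentile alongside these values.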
This guide provides a complete overview of the Security & Compliance Platform, detailing each requirement, implementation examples, and integration points to ensure successful deployment and operation of this critical infrastructure.