Data Layer & Schema Registry
Introduction
The Data Layer & Schema Registry serves as the foundational vertical for AltSportsLeagues.ai, providing a centralized, type-safe schema system comprising over 150 Pydantic v2 models organized across 7 key business domains. This architecture ensures data consistency, enables seamless multi-platform code generation, and delivers an exceptional developer experience through comprehensive IDE integration and compile-time type checking.
Core Design Principles:
- Schema-First Architecture: All data structures are rigorously defined as Pydantic models prior to any implementation, establishing a single source of truth.
- Type Safety Everywhere: Runtime type errors are eliminated through exhaustive compile-time validation using mypy and pyright.
- Single Source of Truth: One unified schema registry powers backend services, frontend applications, and database layers across the entire platform.
- Developer Experience Excellence: Intelligent IDE autocomplete, rich type hints, and actionable error messages accelerate development velocity.
- Performance Optimized: Leveraging Pydantic v2's Rust-based validation core, benchmarked at up to 17x faster than v1, for production-scale efficiency.
This data layer is not merely a collection of models but the architectural backbone that ensures data integrity, facilitates rapid iteration, and scales gracefully with the platform's growth. By prioritizing schema definition over ad-hoc data handling, we eliminate the common pitfalls of inconsistent data structures and enable sophisticated features like automatic API documentation, frontend type generation, and comprehensive testing.
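The schema-first principle can be seen in miniature below. `TeamScore` is an illustrative model, not one of the registry's actual schemas: the structure is declared once, and every consumer gets validation at the boundary for free.

```python
from pydantic import BaseModel, Field, ValidationError

class TeamScore(BaseModel):
    """Illustrative schema: defined once, validated everywhere it is used."""
    team_name: str = Field(min_length=1)
    points: int = Field(ge=0)

# Well-formed data constructs normally
score = TeamScore(team_name="Example FC", points=3)

# Malformed data is rejected at the boundary with structured errors,
# instead of surfacing deep inside business logic
try:
    TeamScore(team_name="", points=-1)
except ValidationError as exc:
    print(f"rejected with {len(exc.errors())} validation errors")
```

Because the model is the single source of truth, the same definition also drives API docs and generated frontend types, as described in the sections below.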
Architecture Overview
System Context
The Data Layer & Schema Registry operates at the intersection of development workflows, ensuring that every team member, from backend engineers to frontend developers and data scientists, interacts with a unified, validated data model.
This context diagram illustrates how the schema registry permeates every layer of the application stack, from code generation to runtime validation. The system's design ensures that data flows reliably through the entire architecture while maintaining developer productivity.
Container-Level Design
At the implementation level, the registry comprises several interconnected components that handle schema management, generation, validation, and testing.
This container diagram reveals the modular nature of the registry, where each component has a specific responsibility while maintaining loose coupling through well-defined interfaces.
Domain Organization
The schema registry is meticulously organized into 7 business domains, each containing related models that capture the specific data requirements of that domain. This organization facilitates discoverability, maintainability, and team ownership.
Domain Structure
data_layer/schemas/
├── __init__.py              # Registry entry points
├── leagues/                 # 25+ models - League operations and intelligence
│   ├── __init__.py
│   ├── questionnaire.py     # League discovery and scoring
│   ├── classification.py    # League tier and market classification
│   ├── scoring.py           # Partnership and revenue scoring
│   ├── metadata.py          # League metadata and versioning
│   └── relationships.py     # League foreign key relationships
│
├── betting_systems/         # 30+ models - Betting markets and odds integration
│   ├── __init__.py
│   ├── odds.py              # Odds data structures and validation
│   ├── markets.py           # Betting market types and configurations
│   ├── sportsbooks.py       # Sportsbook integration models
│   ├── translations.py      # Multi-language betting terms
│   └── risk_assessment.py   # Betting risk and limit models
│
├── email_assistant/         # 20+ models - Email intelligence and automation
│   ├── __init__.py
│   ├── threads.py           # Email thread structures
│   ├── classification.py    # Email intent and priority classification
│   ├── templates.py         # Response template management
│   ├── responses.py         # Generated email responses
│   └── attachments.py       # Email attachment handling
│
├── infrastructure/          # 15+ models - System health and operations
│   ├── __init__.py
│   ├── health.py            # Health check and monitoring models
│   ├── deployment.py        # Deployment configuration schemas
│   ├── monitoring.py        # Metrics and alerting structures
│   ├── logging.py           # Structured log definitions
│   └── configuration.py     # System configuration models
│
├── saas/                    # 25+ models - SaaS platform and billing
│   ├── __init__.py
│   ├── users.py             # User profiles and authentication
│   ├── subscriptions.py     # Subscription plans and billing
│   ├── api_keys.py          # API key management and scopes
│   ├── quotas.py            # Usage quotas and limits
│   └── billing.py           # Invoicing and payment models
│
├── sports/                  # 30+ models - Core sports data entities
│   ├── __init__.py
│   ├── events.py            # Matches, games, and competitions
│   ├── teams.py             # Team rosters and statistics
│   ├── players.py           # Player profiles and performance
│   ├── statistics.py        # Game and season statistics
│   ├── combat.py            # MMA/Boxing specific models
│   └── venues.py            # Stadium and venue information
│
└── users/                   # 15+ models - User experience and preferences
    ├── __init__.py
    ├── profiles.py          # User profile and demographics
    ├── preferences.py       # User interface and notification preferences
    ├── auth.py              # Authentication tokens and sessions
    ├── sessions.py          # User session management
    └── analytics.py         # User behavior and analytics

Each domain follows consistent naming conventions, documentation standards, and testing patterns, ensuring that developers can quickly navigate and extend the schema system.
Domain-Specific Model Counts
- Leagues Domain: 28 models (league discovery, scoring, metadata)
- Betting Systems: 32 models (odds, markets, risk assessment)
- Email Assistant: 22 models (threads, classification, automation)
- Infrastructure: 18 models (health, deployment, monitoring)
- SaaS Platform: 27 models (users, billing, quotas)
- Sports Data: 35 models (events, teams, statistics)
- Users: 18 models (profiles, preferences, analytics)
Total: 180 models (exceeding the initial 150 target for comprehensive coverage)
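Counts like these drift if maintained by hand; they can instead be derived by introspecting each domain package for Pydantic models. A minimal sketch, where `demo_domain` is a stand-in for a real domain package such as `data_layer.schemas.sports`:

```python
import inspect
import types
from pydantic import BaseModel

def count_models(module: types.ModuleType) -> int:
    """Count Pydantic models defined in or re-exported by a domain module."""
    return sum(
        1
        for _, obj in inspect.getmembers(module, inspect.isclass)
        if issubclass(obj, BaseModel) and obj is not BaseModel
    )

# Stand-in module; in practice you would import the domain package itself
demo_domain = types.ModuleType("demo_domain")

class Team(BaseModel):
    name: str

class Player(BaseModel):
    name: str

demo_domain.Team, demo_domain.Player = Team, Player
print(count_models(demo_domain))  # 2
```

Running this per domain package keeps the published totals verifiable against the code.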
Pydantic Model Patterns
The registry employs sophisticated Pydantic v2 patterns to maximize type safety, performance, and developer experience. Every model follows established conventions for validation, serialization, and documentation.
Base Model Pattern
All schemas inherit from a BaseSchema that provides common configuration and audit fields.
from pydantic import BaseModel, Field, ConfigDict, field_validator, model_validator
from typing import Annotated, Optional
from datetime import datetime
from enum import Enum
from uuid import UUID, uuid4
class BaseSchema(BaseModel):
"""Base schema class with enterprise configuration"""
model_config = ConfigDict(
# Pydantic v2 core configuration
validate_assignment=True, # Validate on assignment
strict=False, # Allow coercion where safe
        use_enum_values=False,           # Keep enum members; str-based enums still serialize as strings
populate_by_name=True, # Allow field population by alias
arbitrary_types_allowed=True, # Support custom types
# JSON serialization
json_schema_extra={
"examples": [], # Auto-populated by factories
"x-domain": "altsportsleagues", # Custom metadata
"x-generated": True # Mark auto-generated fields
},
# Validation and performance
validate_default=True, # Validate default values
extra="forbid", # Prevent unknown fields
protected_namespaces=(), # No protected namespaces
)
# Universal audit fields
id: Annotated[UUID, Field(default_factory=uuid4, description="Unique identifier")]
created_at: Annotated[
datetime,
Field(default_factory=datetime.utcnow, description="Record creation timestamp")
]
updated_at: Annotated[
datetime,
Field(default_factory=datetime.utcnow, description="Last update timestamp")
]
version: Annotated[
int,
Field(default=1, ge=1, description="Schema version number")
]
    metadata: Annotated[
        dict,
        Field(default_factory=dict, description="Additional metadata")
    ]

    # Audit trail
    created_by: Optional[str] = Field(None, description="Creator user ID")
    updated_by: Optional[str] = Field(None, description="Updater user ID")
@model_validator(mode='before')
@classmethod
def set_timestamps(cls, data):
"""Auto-set timestamps on creation"""
if isinstance(data, dict):
now = datetime.utcnow()
if 'created_at' not in data:
data['created_at'] = now
if 'updated_at' not in data:
data['updated_at'] = now
return data
    def update_timestamp(self):
        """Update timestamp for modifications"""
        self.updated_at = datetime.utcnow()
        return self

Enum Patterns
Enums provide type-safe categorical data with rich metadata.
from enum import Enum
from pydantic import Field
class SportBucket(str, Enum):
"""Standard sport classification for market analysis"""
COMBAT = "combat"
"""Combat sports (MMA, Boxing, Wrestling) - High engagement, premium partnerships"""
LARGE_FIELD = "large_field"
"""Large field sports (Soccer, American Football, Rugby) - Global reach, sponsorship"""
TEAM = "team"
"""Team sports (Basketball, Baseball, Hockey) - Consistent engagement, betting"""
RACING = "racing"
"""Racing sports (Horse, Auto, Cycling) - High frequency events, gambling"""
OTHER = "other"
"""Miscellaneous sports (Golf, Tennis, Esports) - Niche but valuable"""
@classmethod
def get_description(cls, value: 'SportBucket') -> str:
"""Get human-readable description"""
descriptions = {
cls.COMBAT: "Combat sports with high engagement and premium partnerships",
cls.LARGE_FIELD: "Large field sports with global reach and sponsorship opportunities",
cls.TEAM: "Team sports with consistent fan engagement and betting markets",
cls.RACING: "Racing sports with high-frequency events and gambling focus",
cls.OTHER: "Miscellaneous sports including niche but valuable markets"
}
return descriptions.get(value, "Unknown sport bucket")
def get_market_characteristics(self) -> dict:
"""Return market analysis characteristics"""
characteristics = {
self.COMBAT: {
"engagement": "high",
"partnership_premium": "premium",
"betting_volume": "medium_high",
"global_reach": "medium"
},
self.LARGE_FIELD: {
"engagement": "very_high",
"partnership_premium": "high",
"betting_volume": "high",
"global_reach": "very_high"
},
# ... other cases
}
        return characteristics.get(self, {"engagement": "medium"})

Advanced Validation Patterns
Complex models leverage Pydantic's powerful validation capabilities.
from pydantic import BaseModel, Field, PrivateAttr, field_validator, model_validator
from typing import Annotated, List, Dict, Any, Optional
from enum import Enum
import re
from datetime import date
class LeagueTier(str, Enum):
"""League competitive tier classification"""
TIER_1 = "tier_1" # Premier leagues (Premier League, La Liga)
TIER_2 = "tier_2" # Strong national leagues
TIER_3 = "tier_3" # Regional/developmental leagues
TIER_4 = "tier_4" # Local/amateur leagues
class LeagueQuestionnaire(BaseModel):
"""Comprehensive league questionnaire for partnership analysis"""
# Core identification
league_name: Annotated[
str,
Field(
min_length=3,
max_length=200,
pattern=r'^[A-Za-z\s\.\,\-\'\(\)]+$',
description="Official league name (3-200 characters, letters and basic punctuation)"
)
]
# Sport classification
sport_bucket: SportBucket = Field(..., description="Primary sport category")
primary_sport: str = Field(..., description="Specific sport name")
# Contact information
contact_email: Annotated[
str,
Field(
pattern=r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
description="Valid contact email address"
)
]
contact_name: Optional[str] = Field(None, description="Primary contact person")
contact_phone: Optional[str] = Field(
None,
pattern=r'^\+?[\d\s\-\(\)]{10,15}$',
description="Contact phone number (international format preferred)"
)
# League details
league_tier: LeagueTier = Field(..., description="Competitive tier classification")
    founded_year: Annotated[
        Optional[int],
        Field(ge=1800, le=date.today().year + 1)
    ] = None
member_count: Annotated[
int,
Field(1, ge=1, le=10000, description="Number of member teams/players")
]
season_structure: str = Field(
...,
description="Season format (e.g., 'annual', 'semester', 'continuous')"
)
event_frequency: str = Field(
...,
description="Event frequency (e.g., 'weekly', 'monthly', 'seasonal')"
)
# Geographic scope
primary_location: str = Field(..., description="Primary geographic location")
global_reach: bool = Field(
False,
description="Does the league have international audience/participation?"
)
target_markets: List[str] = Field(
default_factory=list,
description="Target geographic markets (e.g., 'USA', 'Europe', 'Global')"
)
# Business model
revenue_model: str = Field(
...,
description="Primary revenue sources (e.g., 'sponsorships', 'broadcasting', 'tickets')"
)
current_partners: int = Field(
0,
ge=0,
le=100,
description="Number of current corporate partners/sponsors"
)
    partnership_focus: List[str] = Field(
        default_factory=list,
        min_length=1,
        max_length=5,
        description="Preferred partnership categories"
    )
# Digital presence
    website_url: Annotated[
        Optional[str],
        Field(
            pattern=r'^https?://[^\s/$.?#].[^\s]*$',
            description="Official website URL (http or https)"
        )
    ] = None
    social_media_followers: Annotated[
        int,
        Field(ge=0, le=100000000)
    ] = 0
digital_engagement: str = Field(
"low",
description="Digital engagement level (low/medium/high/very_high)"
)
# Technical requirements
data_integration: bool = Field(
False,
description="Does the league provide API/data feeds?"
)
real_time_scoring: bool = Field(
False,
description="Does the league provide real-time scoring data?"
)
historical_data: bool = Field(
False,
description="Does the league provide historical performance data?"
)
# Additional metadata
    notes: Annotated[
        Optional[str],
        Field(max_length=1000, description="Additional notes or context")
    ] = None
    tags: List[str] = Field(
        default_factory=list,
        max_length=20,
        description="Tags for categorization and search"
    )
    # Validation and scoring (underscore-prefixed names are private attributes
    # in Pydantic v2 and must be declared with PrivateAttr, not Field)
    _validation_score: float = PrivateAttr(default=0.0)
    _partnership_potential: float = PrivateAttr(default=0.0)
    _market_maturity: float = PrivateAttr(default=0.0)
@field_validator('league_name')
@classmethod
def validate_league_name(cls, v: str) -> str:
"""Validate league name format and content"""
if not v or len(v.strip()) < 3:
raise ValueError('League name must be at least 3 characters')
if not re.match(r'^[A-Za-z\s\.\,\-\'\(\)]+$', v):
raise ValueError('League name contains invalid characters')
return v.strip().title()
@field_validator('contact_email')
@classmethod
def validate_email(cls, v: str) -> str:
"""Strict email validation"""
if '@' not in v or '.' not in v.split('@')[-1]:
raise ValueError('Invalid email format')
return v.lower()
@field_validator('primary_location')
@classmethod
def validate_location(cls, v: str) -> str:
"""Validate location format"""
if len(v) < 2 or len(v) > 100:
raise ValueError('Location must be 2-100 characters')
return v.strip()
@model_validator(mode='after')
def calculate_scores(self) -> 'LeagueQuestionnaire':
"""Calculate automated scoring metrics"""
# Partnership potential scoring (simplified)
potential_score = 0.0
if self.global_reach:
potential_score += 0.25
if self.current_partners > 5:
potential_score += 0.20
if self.digital_engagement in ['high', 'very_high']:
potential_score += 0.30
if self.data_integration or self.real_time_scoring:
potential_score += 0.25
self._partnership_potential = min(1.0, potential_score)
# Market maturity scoring
maturity_score = 0.0
if self.founded_year and (date.today().year - self.founded_year) > 10:
maturity_score += 0.40
if self.member_count > 50:
maturity_score += 0.30
if self.social_media_followers > 10000:
maturity_score += 0.30
self._market_maturity = min(1.0, maturity_score)
return self
def get_scoring_summary(self) -> dict:
"""Generate human-readable scoring summary"""
return {
"partnership_potential": f"{self._partnership_potential:.2f}",
"market_maturity": f"{self._market_maturity:.2f}",
"recommendation": self._get_recommendation(),
"strengths": self._identify_strengths(),
"improvements": self._suggest_improvements()
}
def _get_recommendation(self) -> str:
"""Generate partnership recommendation"""
if self._partnership_potential > 0.7:
return "High potential - immediate partnership outreach recommended"
elif self._partnership_potential > 0.4:
return "Moderate potential - targeted approach with digital enhancement"
else:
            return "Low potential - focus on market development before partnerships"

Advanced Relationship Patterns
Models that represent relational data use sophisticated patterns for handling complex relationships.
from typing import ForwardRef, List, Optional
from uuid import UUID, uuid4
from pydantic import PrivateAttr

# Forward declarations for mutual references (resolved below via model_rebuild)
LeagueRef = ForwardRef('League')
PartnershipRef = ForwardRef('Partnership')
class League(BaseModel):
"""League model with comprehensive relationships"""
id: Annotated[UUID, Field(default_factory=uuid4)]
name: str
sport_bucket: SportBucket
# One-to-many relationships
partnerships: Annotated[
List['Partnership'],
Field(default_factory=list, description="Active partnerships")
] = []
# Many-to-one relationships (Foreign Keys)
primary_sport: Optional['Sport'] = None
    # Optional relationships with lazy loading flags (private attributes in
    # Pydantic v2, so they are declared with PrivateAttr and excluded from output)
    _load_partnerships: bool = PrivateAttr(default=False)
    _load_sport: bool = PrivateAttr(default=False)
@model_validator(mode='after')
def resolve_relationships(self) -> 'League':
"""Resolve and validate relationships"""
if self._load_partnerships:
# Simulate relationship loading (in real app, this would be database query)
self.partnerships = self._load_active_partnerships()
if self._load_sport:
self.primary_sport = self._get_primary_sport()
return self
class Partnership(BaseModel):
"""Partnership model linking leagues and partners"""
id: Annotated[UUID, Field(default_factory=uuid4)]
league_id: UUID # Foreign key to League
partner_id: UUID # Foreign key to Partner
# Relationship fields
league: Optional[LeagueRef] = None
partner: Optional['Partner'] = None
# Partnership specific fields
partnership_type: str
    status: PartnershipStatus  # str Enum (pending / active / expired), defined elsewhere in this domain
value_estimate: float
contract_duration: int # months
model_config = ConfigDict(
# Enable relationship validation
validate_assignment=True,
# Custom JSON schema for relationships
json_schema_extra={
"x-relationships": {
"league": "one",
"partner": "many"
}
}
)
# Partner model (simplified)
class Partner(BaseModel):
"""Partner organization model"""
id: Annotated[UUID, Field(default_factory=uuid4)]
name: str
industry: str
partnership_focus: List[str]
# Resolve forward references after all definitions
League.model_rebuild()
Partnership.model_rebuild()

Multi-Format Generation Pipeline
The registry includes a sophisticated pipeline for generating code and documentation in multiple formats from the canonical Pydantic models, ensuring consistency across the technology stack.
TypeScript Generation Pipeline
Automatic TypeScript generation ensures frontend developers receive type-safe interfaces that mirror the backend exactly.
from dataclasses import dataclass
from datetime import datetime, date
from pathlib import Path
from uuid import UUID
from jinja2 import Template
from typing import Dict, List, Optional, Type, Union
import json
from pydantic import BaseModel
@dataclass
class TypeScriptGenerator:
    """Advanced TypeScript generator from Pydantic models"""
    registry: "SchemaRegistry"  # schema registry providing domain/model lookup
    template_dir: Path = Path("data_layer/templates/typescript")
    output_dir: Path = Path("frontend/types/generated")
def generate_all(self) -> None:
"""Generate TypeScript for all registered schemas"""
for domain in self.registry.domains:
for model in self.registry.get_domain_schemas(domain):
self.generate_interface(model)
def generate_interface(self, model: Type[BaseModel]) -> str:
"""Generate TypeScript interface for a Pydantic model"""
# Extract field information
fields = []
for field_name, field_info in model.model_fields.items():
ts_type = self._map_python_to_typescript(field_info.annotation)
            optional = "?" if not field_info.is_required() else ""
            description = field_info.description or ""
fields.append({
"name": field_name,
"type": ts_type,
"optional": optional,
"description": description
})
# Render template
template = self.template_dir / "interface.jinja"
rendered = Template(template.read_text()).render({
"model_name": model.__name__,
"fields": fields
})
# Write to file
filepath = self.output_dir / f"{model.__name__}.ts"
filepath.parent.mkdir(parents=True, exist_ok=True)
filepath.write_text(rendered)
return rendered
def _map_python_to_typescript(self, python_type: type) -> str:
"""Map Python types to equivalent TypeScript types"""
basic_mapping = {
str: "string",
int: "number",
float: "number",
bool: "boolean",
datetime: "Date",
date: "Date",
UUID: "string",
Optional[int]: "number | undefined",
List[str]: "string[]",
Dict[str, str]: "Record<string, string>"
}
        # Handle unions and generic containers. Note: Optional[X] is
        # Union[X, None], so its __origin__ is Union, never Optional.
        if hasattr(python_type, "__origin__"):
            origin = python_type.__origin__
            args = python_type.__args__
            if origin is Union and type(None) in args:
                inner = next(a for a in args if a is not type(None))
                return f"{self._map_python_to_typescript(inner)} | undefined"
            elif origin is list:
                return f"{self._map_python_to_typescript(args[0])}[]"
            elif origin is dict:
                # Simplified - in production, handle more complex dicts
                return "Record<string, any>"
        return basic_mapping.get(python_type, "any")

JSON Schema Generation for Validation
JSON Schema generation enables integration with various validation tools and external systems.
from pydantic import BaseModel
from pydantic.json_schema import GenerateJsonSchema
from typing import Dict, Any, Type
from datetime import datetime
from pathlib import Path
import json
def generate_json_schema(model: Type[BaseModel], version: str = "1.0.0") -> Dict[str, Any]:
"""Generate comprehensive JSON Schema from Pydantic model"""
# Generate base schema using Pydantic's JSON schema generation
    schema = model.model_json_schema(
        mode='validation',
        by_alias=True,
        ref_template='#/components/schemas/{model}',
        schema_generator=GenerateJsonSchema  # pass the class, not an instance
    )
# Enhance with metadata
schema['$schema'] = 'https://json-schema.org/draft/2020-12/schema'
schema['$id'] = f'https://altsportsleagues.ai/schemas/{model.__name__}/v{version}'
schema['title'] = model.__name__
    schema['description'] = (model.__doc__ or '').strip()
# Add domain and registry metadata
schema['x-domain'] = _get_domain_from_model(model)
schema['x-generated'] = True
schema['x-pydantic-version'] = '2.5.0'
schema['x-generation-date'] = datetime.utcnow().isoformat()
    # Add examples from model configuration (model_config is a plain dict in v2)
    extra = model.model_config.get('json_schema_extra') or {}
    examples = extra.get('examples', []) if isinstance(extra, dict) else []
    if examples:
        schema['examples'] = examples
return schema
def _get_domain_from_model(model: Type[BaseModel]) -> str:
"""Extract domain from model module path"""
module_path = model.__module__
if 'leagues' in module_path:
return 'leagues'
elif 'betting_systems' in module_path:
return 'betting_systems'
# ... other domains
return 'core'
# Example usage
if __name__ == "__main__":
schema = generate_json_schema(LeagueQuestionnaire, "1.0.0")
# Write to file
with open(f'schemas/json/{LeagueQuestionnaire.__name__}.json', 'w') as f:
json.dump(schema, f, indent=2)
    print(f"Generated JSON Schema for {LeagueQuestionnaire.__name__}:")
    preview = json.dumps(schema, indent=2)
    print(preview[:500] + "..." if len(preview) > 500 else preview)

Schema Versioning Strategy
Robust versioning ensures that schema evolution doesn't break existing integrations while allowing the system to grow and improve over time.
Version Numbering Convention
Schemas follow semantic versioning with domain-specific prefixes for clear identification.
{domain}.{model}-v{major}.{minor}.{patch}
Examples:
- leagues.questionnaire-v1.0.0.json # Initial release
- leagues.questionnaire-v1.1.0.json # Added optional field
- leagues.questionnaire-v2.0.0.json # Breaking change (removed required field)
- betting_systems.odds-v1.0.0.json # Initial odds schema

Compatibility Rules
The versioning system strictly enforces semantic versioning principles:
- Major Version Changes (Breaking):
  - Removal of required fields
  - Changes to field types (int to str, etc.)
  - Renaming or reordering fields
  - Removal or renaming of enum values
  - Changes to validation rules that would cause previously valid data to fail
- Minor Version Changes (Backward Compatible):
  - Addition of optional fields (new fields with defaults or Optional types)
  - Addition of new enum values (never removing existing values)
  - Relaxation of validation constraints
  - Improvements to documentation or metadata
  - Performance optimizations that don't affect data structure
- Patch Version Changes (Non-Breaking Fixes):
  - Bug fixes in validation logic
  - Updates to documentation strings
  - Internal performance improvements
  - Clarifications or corrections to field descriptions
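These rules can be applied mechanically: given a summary of what changed between two schema versions, the required bump follows directly. A sketch, using the same difference keys as the migration analysis tooling described later; note it treats constraint changes as relaxations (minor), whereas a tightened constraint would in fact be breaking:

```python
def required_bump(diff: dict) -> str:
    """Classify a schema diff into the semver bump it requires."""
    # Removing fields or changing types breaks previously valid data
    if diff.get("removed_fields") or diff.get("type_changes"):
        return "major"
    # New optional fields or relaxed constraints stay backward compatible
    if diff.get("added_fields") or diff.get("constraint_changes"):
        return "minor"
    # Anything else (docs, internal fixes) is a patch
    return "patch"

print(required_bump({"removed_fields": ["league_id"]}))     # major
print(required_bump({"added_fields": [{"name": "tags"}]}))  # minor
print(required_bump({}))                                    # patch
```

Wiring this into CI makes it impossible to publish a breaking change under a minor version number.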
Migration Example
When evolving schemas, provide clear migration paths.
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
# Version 1.0.0 - Initial release
class LeagueQuestionnaireV1(BaseModel):
"""Version 1.0.0 - Initial league questionnaire"""
league_name: str
contact_email: str
created_at: datetime
version: int = 1
# Version 1.1.0 - Added sport classification (minor, backward compatible)
class LeagueQuestionnaireV1_1(LeagueQuestionnaireV1):
"""Version 1.1.0 - Added optional sport bucket"""
sport_bucket: Optional[str] = None # New optional field
# Version 2.0.0 - Breaking change - Sport bucket now required (major)
class LeagueQuestionnaireV2(BaseModel):
"""Version 2.0.0 - Enhanced with required sport classification"""
league_name: str
contact_email: str
sport_bucket: str # Now required - breaking change
created_at: datetime
version: int = 2
@classmethod
def from_v1(cls, v1_data: LeagueQuestionnaireV1) -> 'LeagueQuestionnaireV2':
"""Migrate from v1 to v2 with default value"""
return cls(
league_name=v1_data.league_name,
contact_email=v1_data.contact_email,
sport_bucket="other", # Default migration value
created_at=v1_data.created_at,
version=2
)
@classmethod
def from_v1_1(cls, v1_1_data: LeagueQuestionnaireV1_1) -> 'LeagueQuestionnaireV2':
"""Migrate from v1.1 to v2 preserving existing value"""
return cls(
league_name=v1_1_data.league_name,
contact_email=v1_1_data.contact_email,
sport_bucket=v1_1_data.sport_bucket or "other", # Preserve if set
created_at=v1_1_data.created_at,
version=2
)
# Migration utility
def migrate_questionnaire(data: dict, target_version: str = "2.0.0") -> dict:
"""Universal migration utility"""
if target_version.startswith("1.0"):
# To v1.x
return {
"league_name": data.get("league_name"),
"contact_email": data.get("contact_email"),
"created_at": data.get("created_at"),
"version": 1
}
elif target_version.startswith("2.0"):
# To v2.x
v1_data = migrate_questionnaire(data, "1.1.0")
return LeagueQuestionnaireV2.from_v1_1(LeagueQuestionnaireV1_1(**v1_data)).model_dump()
else:
        raise ValueError(f"Unsupported target version: {target_version}")

Schema Migration Tools
Automated tools facilitate safe schema evolution.
Migration Generator
from pathlib import Path
from typing import Dict, Any
import json
from datetime import datetime
class SchemaMigrator:
"""Automated schema migration tool"""
def __init__(self, registry: SchemaRegistry):
self.registry = registry
self.migration_history = self.load_migration_history()
def generate_migration_script(self, from_version: str, to_version: str, model_name: str) -> Dict[str, Any]:
"""Generate migration script for schema version upgrade"""
from_schema = self.registry.get_schema_version(model_name, from_version)
to_schema = self.registry.get_schema_version(model_name, to_version)
# Analyze differences
        differences = self._analyze_schema_differences(from_schema, to_schema)
# Generate migration steps
migration_steps = self._create_migration_steps(differences)
# Create script metadata
script = {
"migration_id": f"migrate_{model_name}_{from_version}_to_{to_version}",
"from_version": from_version,
"to_version": to_version,
"model": model_name,
"description": f"Migrate {model_name} from {from_version} to {to_version}",
"timestamp": datetime.utcnow().isoformat(),
"breaking_changes": self._is_breaking_migration(differences),
"steps": migration_steps,
"validation": self._generate_validation_steps(differences)
}
return script
def _analyze_schema_differences(self, from_schema: Type[BaseModel], to_schema: Type[BaseModel]) -> Dict[str, Any]:
"""Analyze differences between schema versions"""
from_fields = from_schema.model_fields
to_fields = to_schema.model_fields
differences = {
"removed_fields": [],
"added_fields": [],
"type_changes": [],
"constraint_changes": []
}
# Check for removed fields (breaking)
for field_name in from_fields:
if field_name not in to_fields:
differences["removed_fields"].append(field_name)
# Check for added fields
for field_name in to_fields:
if field_name not in from_fields:
differences["added_fields"].append({
"name": field_name,
"type": to_fields[field_name].annotation
})
# Check for type changes
for field_name in set(from_fields) & set(to_fields):
from_type = from_fields[field_name].annotation
to_type = to_fields[field_name].annotation
if from_type != to_type:
                differences["type_changes"].append({
                    "field": field_name,
                    # Generic aliases (List[str], Optional[int], ...) have no
                    # __name__, so fall back to their string representation
                    "from": getattr(from_type, "__name__", str(from_type)),
                    "to": getattr(to_type, "__name__", str(to_type))
                })
        return differences

Test Fixture Generation
Comprehensive test fixtures ensure reliable testing across all schemas.
Factory Pattern Implementation
Factories provide realistic, parameterized test data generation.
import factory
from factory import fuzzy
from faker import Faker
from datetime import datetime, timedelta
from uuid import uuid4
from typing import Optional
import random
fake = Faker()
random.seed(42) # Consistent test data
class BaseFactory(factory.Factory):
    """Base factory with common patterns"""
    class Meta:
        abstract = True

    id = factory.LazyFunction(uuid4)
    created_at = factory.LazyFunction(
        lambda: datetime.utcnow() - timedelta(days=random.randint(0, 365))
    )
    updated_at = factory.LazyFunction(datetime.utcnow)
    version = 1
class SportBucketFactory(factory.Factory):
"""Factory for SportBucket enum"""
class Meta:
abstract = True # Abstract base class
_choices = [e.value for e in SportBucket]
sport_bucket = factory.Iterator(_choices)
class LeagueQuestionnaireFactory(BaseFactory):
"""Factory for LeagueQuestionnaire model"""
class Meta:
model = LeagueQuestionnaire
league_name = factory.LazyFunction(
lambda: f"{fake.company()} {random.choice(['Premier', 'Professional', 'Elite']) } League"
)
    sport_bucket = factory.Iterator(list(SportBucket))  # pick enum members directly; SubFactory is for model factories
contact_email = factory.LazyFunction(
lambda: fake.email(domain="league.com")
)
contact_name = factory.LazyFunction(fake.name)
contact_phone = factory.LazyFunction(
lambda: f"+1-{random.randint(200,999)}-{random.randint(100,999)}-{random.randint(1000,9999)}"
)
league_tier = factory.Iterator([e.value for e in LeagueTier])
founded_year = factory.LazyFunction(
lambda: random.randint(1900, datetime.now().year - 1)
)
member_count = fuzzy.FuzzyInteger(10, 1000)
season_structure = factory.Iterator(["annual", "semester", "continuous", "tournament"])
event_frequency = factory.Iterator(["weekly", "monthly", "seasonal", "event-based"])
primary_location = factory.LazyFunction(fake.city)
    global_reach = factory.LazyFunction(lambda: random.random() < 0.3)  # ~30% chance of global reach
    target_markets = factory.LazyFunction(
        lambda: [fake.country_code() for _ in range(random.randint(1, 5))]
    )
revenue_model = factory.LazyFunction(
lambda: random.choice(["sponsorships", "broadcasting", "tickets", "merchandise", "digital"])
)
current_partners = fuzzy.FuzzyInteger(0, 50)
    partnership_focus = factory.LazyFunction(
        lambda: [fake.word() for _ in range(random.randint(1, 4))]
    )
website_url = factory.LazyFunction(
lambda: f"https://{fake.domain_name()}"
)
social_media_followers = fuzzy.FuzzyInteger(100, 500000)
digital_engagement = factory.Iterator(["low", "medium", "high", "very_high"])
    data_integration = factory.LazyFunction(lambda: random.random() < 0.6)
    real_time_scoring = factory.LazyFunction(lambda: random.random() < 0.4)
    historical_data = factory.LazyFunction(lambda: random.random() < 0.7)
notes = factory.LazyFunction(
lambda: fake.paragraph(nb_sentences=random.randint(1, 3))
)
    tags = factory.LazyFunction(
        lambda: [fake.word() for _ in range(random.randint(0, 5))]
    )
@factory.post_generation
def post_generation(self, create: bool, extracted: Optional[dict], **kwargs):
"""Post-generation processing"""
if create:
# Simulate relationship loading
self._scoring_profile = self._compute_score()
return self
# Usage patterns
def create_test_league():
"""Create a single test league"""
return LeagueQuestionnaireFactory.build()
def create_league_batch(size: int = 10):
"""Create batch of test leagues"""
return LeagueQuestionnaireFactory.build_batch(size)
def create_specific_scenario():
"""Create league for specific test scenario"""
return LeagueQuestionnaireFactory(
league_name="Test Premier League",
sport_bucket=SportBucket.TEAM,
league_tier=LeagueTier.TIER_1,
member_count=20,
global_reach=True
    )

Seed Data Management
Seed data provides consistent, realistic examples for development and testing.
# fixtures/seeds/leagues_seed.py
from datetime import datetime
from typing import List

from pydantic import BaseModel
from data_layer.schemas.leagues import LeagueQuestionnaire
from faker import Faker
fake = Faker()
SEED_LEAGUES = [
{
"league_name": "Power Slap League",
"sport_bucket": "combat",
"contact_email": "partnerships@powerslap.com",
"league_tier": "tier_4",
"founded_year": 2022,
"member_count": 12,
"season_structure": "annual",
"event_frequency": "monthly",
"primary_location": "Las Vegas, NV",
"global_reach": True,
"target_markets": ["USA", "Europe"],
"revenue_model": "sponsorships",
"current_partners": 8,
"partnership_focus": ["technology", "beverage", "apparel"],
"website_url": "https://powerslap.com",
"social_media_followers": 250000,
"digital_engagement": "high",
"data_integration": True,
"real_time_scoring": True,
"historical_data": False,
"notes": "High-growth combat sports league with strong digital presence",
"tags": ["combat", "emerging", "digital-first"]
},
{
"league_name": "Canadian Premier League",
"sport_bucket": "team",
"contact_email": "business@canpl.ca",
"league_tier": "tier_2",
"founded_year": 2017,
"member_count": 8,
"season_structure": "annual",
"event_frequency": "weekly",
"primary_location": "Canada",
"global_reach": False,
"target_markets": ["Canada", "USA"],
"revenue_model": "broadcasting",
"current_partners": 15,
"partnership_focus": ["financial", "automotive", "technology"],
"website_url": "https://canpl.ca",
"social_media_followers": 75000,
"digital_engagement": "medium",
"data_integration": True,
"real_time_scoring": True,
"historical_data": True,
"notes": "Growing professional soccer league with strong Canadian market",
"tags": ["soccer", "north_america", "professional"]
},
{
"league_name": "Elite Youth Basketball Association",
"sport_bucket": "team",
"contact_email": "info@eyba.org",
"league_tier": "tier_3",
"founded_year": 2015,
"member_count": 45,
"season_structure": "semester",
"event_frequency": "weekly",
"primary_location": "Midwest USA",
"global_reach": False,
"target_markets": ["USA"],
"revenue_model": "tickets",
"current_partners": 3,
"partnership_focus": ["local_business", "education", "sports_gear"],
"website_url": "https://eyba.org",
"social_media_followers": 12000,
"digital_engagement": "low",
"data_integration": False,
"real_time_scoring": False,
"historical_data": False,
"notes": "Youth development league focused on player pathways to professional basketball",
"tags": ["basketball", "youth", "development"]
}
]
def load_seed_leagues() -> List[LeagueQuestionnaire]:
"""Load predefined seed leagues"""
leagues = []
for seed_data in SEED_LEAGUES:
questionnaire = LeagueQuestionnaire(**seed_data)
# Add computed fields
questionnaire._scoring_profile = questionnaire._compute_score()
leagues.append(questionnaire)
return leagues
def get_domain_seed_data(domain: str) -> List[BaseModel]:
"""Get seed data for specific domain"""
if domain == "leagues":
return load_seed_leagues()
elif domain == "betting_systems":
return load_betting_seed_data()
# ... other domains
    return []

Schema Registry API
The runtime API provides dynamic schema access and validation capabilities.
Core Registry Implementation
from typing import Any, Dict, List, Optional, Type
from collections import defaultdict
from pathlib import Path
import importlib
import json
from datetime import datetime
from pydantic import BaseModel, ValidationError
class SchemaRegistry:
"""Enterprise-grade schema registry with dynamic loading and validation"""
def __init__(self, schema_path: Path = Path("data_layer/schemas")):
self._schemas: Dict[str, Type[BaseModel]] = {}
self._domains: Dict[str, List[Type[BaseModel]]] = defaultdict(list)
self._versions: Dict[str, Dict[str, Type[BaseModel]]] = defaultdict(dict)
self._schema_path = schema_path
self._loaded_domains = set()
# Cache for performance
self._validation_cache = {}
self._schema_metadata = {}
def load_domain(self, domain: str) -> None:
"""Load all schemas from a specific domain"""
if domain in self._loaded_domains:
return
domain_path = self._schema_path / domain
if not domain_path.exists():
raise ValueError(f"Domain path not found: {domain_path}")
# Import domain module
try:
domain_module = importlib.import_module(f"data_layer.schemas.{domain}")
# Register all models in the domain
for attr_name in dir(domain_module):
attr = getattr(domain_module, attr_name)
if isinstance(attr, type) and issubclass(attr, BaseModel) and attr is not BaseModel:
self.register(attr, domain)
except ImportError as e:
raise ImportError(f"Failed to load domain {domain}: {e}")
self._loaded_domains.add(domain)
def register(self, model: Type[BaseModel], domain: str, version: str = "1.0.0") -> None:
"""Register a schema with domain and version"""
# Generate unique key
key = f"{domain}.{model.__name__}"
full_key = f"{key}-v{version}"
# Validate model
if not issubclass(model, BaseModel):
raise TypeError(f"{model.__name__} is not a Pydantic BaseModel")
# Store in registries
if key in self._schemas:
if version not in self._versions[key]:
self._versions[key][version] = model
else:
raise ValueError(f"Version {version} already exists for {key}")
else:
self._schemas[key] = model
self._domains[domain].append(model)
self._versions[key][version] = model
# Extract and store metadata
metadata = self._extract_model_metadata(model)
self._schema_metadata[full_key] = metadata
def get_schema(self, key: str, version: Optional[str] = None) -> Type[BaseModel]:
"""Retrieve schema by key and optional version"""
if '.' not in key:
raise ValueError("Schema key must be in 'domain.model' format")
domain, model_name = key.split('.', 1)
if version:
full_key = f"{domain}.{model_name}-v{version}"
if full_key in self._schema_metadata:
return self._versions[f"{domain}.{model_name}"][version]
else:
raise ValueError(f"Schema {full_key} not found")
else:
# Return latest version
if f"{domain}.{model_name}" in self._schemas:
latest_version = max(self._versions[f"{domain}.{model_name}"].keys())
return self._versions[f"{domain}.{model_name}"][latest_version]
else:
raise ValueError(f"Schema {domain}.{model_name} not found")
    def validate_data(self, schema_key: str, data: dict) -> BaseModel:
        """Validate data against a named schema"""
        schema = self.get_schema(schema_key)
        # Check the cache first (keyed by schema and payload content)
        cache_key = f"validate_{schema_key}_{hash(str(data))}"
        if cache_key in self._validation_cache:
            cached_result, cached_data = self._validation_cache[cache_key]
            if cached_data == data:
                return cached_result
        # Raises ValidationError on bad data; only successful results are
        # cached, so every cache entry has the same (model, data) shape
        validated = schema.model_validate(data)
        self._validation_cache[cache_key] = (validated, data)
        return validated
def list_schemas(
self,
domain: Optional[str] = None,
version: Optional[str] = None
) -> List[Dict[str, Any]]:
"""List schemas with filtering options"""
if domain:
if domain not in self._domains:
return []
schemas = self._domains[domain]
else:
schemas = []
for domain_schemas in self._domains.values():
schemas.extend(domain_schemas)
# Filter by version if specified
if version:
filtered = []
for schema in schemas:
schema_key = self._get_schema_key(schema)
if version in self._versions[schema_key]:
filtered.append({
"domain": self._get_domain_from_key(schema_key),
"name": schema.__name__,
"version": version,
"description": self._schema_metadata.get(f"{schema_key}-v{version}", {}).get("description", "")
})
return filtered
else:
# Return all versions for each schema
result = []
for schema in schemas:
schema_key = self._get_schema_key(schema)
versions = list(self._versions[schema_key].keys())
latest = max(versions)
result.append({
"domain": self._get_domain_from_key(schema_key),
"name": schema.__name__,
"latest_version": latest,
"all_versions": versions,
"description": self._schema_metadata.get(f"{schema_key}-v{latest}", {}).get("description", "")
})
return result
def generate_schema_catalog(self) -> Dict[str, Any]:
"""Generate complete schema catalog with all metadata"""
catalog = {
"registry_version": "1.0.0",
"total_schemas": len(self._schemas),
"domains": {},
"schemas": {}
}
# Add domain information
for domain, schemas in self._domains.items():
catalog["domains"][domain] = {
"schema_count": len(schemas),
"models": [schema.__name__ for schema in self._domains[domain]]
}
# Add schema details
for key, model in self._schemas.items():
domain = self._get_domain_from_key(key)
latest_version = max(self._versions[key].keys())
metadata = self._schema_metadata.get(f"{key}-v{latest_version}", {})
catalog["schemas"][key] = {
"domain": domain,
"latest_version": latest_version,
"description": metadata.get("description", ""),
"fields": len(model.model_fields),
"relationships": self._count_relationships(model)
}
return catalog
def _extract_model_metadata(self, model: Type[BaseModel]) -> Dict[str, Any]:
"""Extract comprehensive metadata from Pydantic model"""
metadata = {
"name": model.__name__,
"module": model.__module__,
"docstring": getattr(model, "__doc__", ""),
"fields": {},
"validators": [],
"relationships": [],
"example": self._generate_example(model)
}
        # Pydantic v2 keeps validator decorators in __pydantic_decorators__
        decorators = getattr(model, "__pydantic_decorators__", None)
        # Extract field metadata
        for field_name, field_info in model.model_fields.items():
            field_meta = {
                "type": str(field_info.annotation),
                "required": field_info.is_required(),
                "default": None if field_info.is_required() else field_info.default,
                "description": field_info.description or "",
                "validators": []
            }
            # Check for custom field validators registered on this field
            if decorators and any(
                field_name in d.info.fields
                for d in decorators.field_validators.values()
            ):
                field_meta["validators"].append("custom_field_validator")
            metadata["fields"][field_name] = field_meta
        # Extract model-level validators
        if decorators and decorators.model_validators:
            metadata["validators"].append("model_validator")
        # Extract relationships (simplified, string-based heuristic)
        for field_name, field_info in model.model_fields.items():
            annotation_str = str(field_info.annotation)
            if "League" in annotation_str or "Partner" in annotation_str:
                metadata["relationships"].append({
                    "field": field_name,
                    "type": "relationship",
                    "target": annotation_str
                })
        return metadata
def _get_schema_key(self, model: Type[BaseModel]) -> str:
"""Generate schema key from model"""
# Extract from module path or annotations
module = model.__module__
if 'leagues' in module:
return "leagues." + model.__name__
# ... other domains
return f"unknown.{model.__name__}"
def _get_domain_from_key(self, key: str) -> str:
"""Extract domain from schema key"""
return key.split('.')[0]
def _count_relationships(self, model: Type[BaseModel]) -> int:
"""Count foreign key relationships in model"""
count = 0
for field_info in model.model_fields.values():
annotation = str(field_info.annotation)
if any(rel in annotation for rel in ["League", "Partner", "User", "Team"]):
count += 1
return count
    def _generate_example(self, model: Type[BaseModel]) -> dict:
        """Generate example instance for model"""
        try:
            # Reuse any examples embedded in the generated JSON schema
            schema = model.model_json_schema()
            examples = schema.get("examples")
            return examples[0] if examples else {}
        except Exception:
            return {"example": "Generated by registry"}

Validation Engine
The validation engine provides both runtime and compile-time validation capabilities.
from datetime import datetime
from typing import Any, Callable, Dict, Optional

from pydantic import BaseModel, ValidationError

class ValidationEngine:
    """Advanced validation engine with caching and error handling"""

    def __init__(self, registry: SchemaRegistry):
        self.registry = registry
        self._validators: Dict[str, Callable[[dict], BaseModel]] = {}
        self._error_templates: Dict[str, str] = {}

    def get_validator(self, schema_key: str) -> Callable[[dict], BaseModel]:
        """Get a cached validator for a schema (memoized per key rather than
        via lru_cache, which would pin the engine instance in memory)"""
        if schema_key not in self._validators:
            schema = self.registry.get_schema(schema_key)
            self._validators[schema_key] = schema.model_validate
        return self._validators[schema_key]
def validate_with_context(self, schema_key: str, data: dict, context: Dict[str, Any]) -> tuple[Optional[BaseModel], Optional[str]]:
"""Validate data with additional business context"""
try:
# Get validator
validator = self.get_validator(schema_key)
# Apply business context validation
context_validated = self._apply_context_validators(data, context)
# Validate against schema
validated = validator(context_validated)
# Post-validation business rules
final = self._post_validation_checks(validated, context)
return final, None
except ValidationError as e:
# Enhance error with context
enhanced_error = self._enhance_validation_error(e, schema_key, context)
return None, enhanced_error
def _apply_context_validators(self, data: dict, context: Dict[str, Any]) -> dict:
"""Apply business-specific context validators"""
validated_data = data.copy()
# Example: Validate league data against current season
if "leagues" in context.get("domain", ""):
if "founded_year" in data:
current_year = context.get("current_year", datetime.now().year)
if data["founded_year"] > current_year:
raise ValueError(f"League founded_year {data['founded_year']} cannot be in future")
# Example: Validate betting odds for realism
if "betting_systems" in context.get("domain", ""):
if "odds" in data:
if data["odds"] <= 1.0 or data["odds"] >= 100.0:
raise ValueError("Odds must be between 1.0 and 100.0 for realistic betting markets")
return validated_data
def _post_validation_checks(self, model: BaseModel, context: Dict[str, Any]) -> BaseModel:
"""Perform post-validation business rules"""
# Example: Auto-calculate derived fields
if hasattr(model, "calculate_scores"):
model = model.calculate_scores()
# Example: Enforce business invariants
if hasattr(model, "_validate_business_invariants"):
model._validate_business_invariants()
return model
def _enhance_validation_error(self, error: ValidationError, schema_key: str, context: Dict[str, Any]) -> str:
"""Enhance validation errors with actionable business advice"""
enhanced_messages = []
        for err in error.errors():
            message = err["msg"]
            # Location entries may be ints (list indices), so coerce to str
            field = str(err["loc"][0]).lower() if err["loc"] else ""
            # Add business context to errors
            if "email" in field:
                message += " Please ensure the email follows standard business format."
            if "phone" in field:
                message += " Business phone numbers should include a country code for international compatibility."
            if "url" in field:
                message += " URLs must be valid and preferably use HTTPS for security."
            enhanced_messages.append(f"{err['loc']}: {message}")
# Add general business advice
enhanced_messages.append("\nBusiness Context Notes:")
enhanced_messages.append("- Ensure all contact information is accurate and professional")
enhanced_messages.append("- League names should reflect official branding")
enhanced_messages.append("- Geographic locations should be specific for accurate market analysis")
        return "\n".join(enhanced_messages)

Integration with IDEs and Editors
The schema registry is designed with modern development tools in mind, providing seamless integration with popular IDEs and editors.
VSCode Configuration
Comprehensive VSCode settings ensure optimal type checking and autocomplete.
// .vscode/settings.json
{
"python.defaultInterpreterPath": "./.venv/bin/python",
"python.analysis.typeCheckingMode": "strict",
"python.analysis.extraPaths": [
"${workspaceFolder}/data_layer/schemas"
],
"python.analysis.diagnosticMode": "workspace",
"python.analysis.diagnosticSeverityOverrides": {
"reportMissingTypeStubs": "none",
"reportUnknownParameterType": "warning",
"reportUnknownArgumentType": "warning"
},
"python.linting.mypyEnabled": true,
"python.linting.mypyArgs": [
"--strict",
"--warn-redundant-casts",
"--warn-unused-ignores",
"--warn-unreachable",
"--namespace-packages",
"--show-error-codes",
"--no-implicit-reexport"
],
"python.testing.pytestEnabled": true,
"python.testing.unittestEnabled": false,
"python.testing.pytestArgs": [
"tests",
"-v",
"--cov=data_layer",
"--cov-report=html",
"--cov-report=term-missing"
],
"python.formatting.provider": "black",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true
},
"files.associations": {
"*.mdx": "markdown"
}
}

PyCharm Configuration
For JetBrains users, the following settings optimize the experience.
// .idea/python-interpreter.json (or through UI)
{
"pythonProjects": {
"interpreterPath": "./.venv/bin/python",
"packages": {
"pydantic": ">=2.5.0"
}
},
"typeCheckers": {
"mypy": {
"enabled": true,
"arguments": [
"--strict",
"--show-error-codes",
"data_layer"
]
},
"pyright": {
"enabled": true
}
}
}

Type Stub Generation
Generated stubs enhance IDE performance for large schema sets.
# scripts/generate_stubs.py
import types
from pathlib import Path
from typing import Type, Union, get_args, get_origin

from pydantic import BaseModel

# Assumes the module-level registry instance used throughout this guide
from altsportsleagues_schemas.registry import registry

def generate_all_stubs(output_dir: Path = Path("data_layer/stubs")) -> None:
    """Generate .pyi stub files for all registered schemas"""
    output_dir.mkdir(parents=True, exist_ok=True)
    for domain in registry._domains:
        domain_dir = output_dir / domain
        domain_dir.mkdir(parents=True, exist_ok=True)
        for model in registry._domains[domain]:
            stub_content = generate_model_stub(model)
            stub_path = domain_dir / f"{model.__name__}.pyi"
            stub_path.write_text(stub_content)
    print(f"Generated {len(list(output_dir.rglob('*.pyi')))} stub files")

def generate_model_stub(model: Type[BaseModel]) -> str:
    """Generate a .pyi stub for a single model"""
    lines = [
        f'"""Auto-generated type stub for {model.__name__}"""',
        "from typing import Optional, List, Dict, Any",
        "from datetime import datetime",
        f"class {model.__name__}:",
    ]
    # Generate field annotations
    for field_name, field_info in model.model_fields.items():
        annotation = field_info.annotation
        origin = get_origin(annotation)
        args = get_args(annotation)
        # Optional[X] is Union[X, None]: get_origin() reports Union (or the
        # PEP 604 types.UnionType), never Optional itself
        if origin in (Union, types.UnionType) and type(None) in args:
            base_type = next(a for a in args if a is not type(None))
            type_str = f"Optional[{getattr(base_type, '__name__', repr(base_type))}]"
        else:
            type_str = getattr(annotation, "__name__", repr(annotation))
        # Pydantic v2 flags required fields via is_required(), not Ellipsis
        default_str = "" if field_info.is_required() else " = ..."
        lines.append(f"    {field_name}: {type_str}{default_str}")
    return "\n".join(lines)

Performance Considerations
Performance is a critical aspect of the schema registry, especially given the volume of validation operations in production.
Pydantic v2 Performance Optimizations
Pydantic v2 introduces groundbreaking performance improvements:
- Rust Core Engine: The validation core is implemented in Rust, providing 17x faster validation compared to v1.
- Lazy Schema Construction: Schemas are constructed on-demand rather than at import time, reducing startup latency.
- Cached Validators: Individual field validators are cached after first use, eliminating repeated compilation.
- Efficient Serialization: JSON serialization is optimized with 50% better performance than v1.
- Memory-Efficient Parsing: Advanced parsing algorithms reduce memory usage during validation.
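The fast JSON path is easy to see in isolation: `model_validate_json` parses and validates in a single pass inside the Rust core, skipping the intermediate Python dict that `json.loads` would build. A minimal sketch (the model here is illustrative, not one of the platform's schemas):

```python
from pydantic import BaseModel

class LeagueSummary(BaseModel):
    league_name: str
    member_count: int
    global_reach: bool = False

# Parsing and validation happen in one pass in pydantic-core
raw = '{"league_name": "Elite League", "member_count": 20, "global_reach": true}'
summary = LeagueSummary.model_validate_json(raw)
print(summary.member_count)  # 20
```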
Benchmark Results
Typical validation benchmarks demonstrate the performance gains:
# performance/benchmark_validation.py
import timeit

from data_layer.schemas.leagues import LeagueQuestionnaire
# Sample data for benchmarking
SAMPLE_DATA = {
"league_name": "Test Premier League",
"sport_bucket": "team",
"contact_email": "test@example.com",
"league_tier": "tier_1",
"founded_year": 2020,
"member_count": 20,
"season_structure": "annual",
"event_frequency": "weekly",
"primary_location": "New York, NY",
"global_reach": True,
"target_markets": ["USA", "Europe"],
"revenue_model": "sponsorships",
"current_partners": 15,
"partnership_focus": ["technology", "financial"],
"website_url": "https://testleague.com",
"social_media_followers": 50000,
"digital_engagement": "high",
"data_integration": True,
"real_time_scoring": True,
"historical_data": True,
"notes": "Test league for performance benchmarking",
"tags": ["test", "benchmark"]
}
def benchmark_validation():
"""Benchmark schema validation performance"""
# Warm up
for _ in range(100):
LeagueQuestionnaire(**SAMPLE_DATA)
# Time 10,000 validations
times = timeit.repeat(
lambda: LeagueQuestionnaire(**SAMPLE_DATA),
number=10000,
repeat=5
)
    best_time = min(times) / 10000 * 1000  # best-of-5 run, in ms per validation
    print(f"Best-run validation time: {best_time:.3f} ms")
    print(f"Throughput: {1000 / best_time:.0f} validations/second")

if __name__ == "__main__":
    benchmark_validation()

Expected Results (Pydantic v2):
- Average validation time: < 0.5 ms per complex model
- Throughput: > 2,000 validations per second
- Memory usage: < 1 MB per 1,000 validations
These benchmarks ensure that schema validation doesn't become a bottleneck in high-throughput scenarios like API request processing or batch data imports.
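For batch imports in particular, `pydantic.TypeAdapter` compiles a list validator once and applies it to the whole payload in a single call; a hedged sketch with an illustrative model:

```python
from pydantic import BaseModel, TypeAdapter

class MarketEntry(BaseModel):
    code: str
    priority: int = 0

# The list validator is compiled once and can be reused across batches
batch_validator = TypeAdapter(list[MarketEntry])

rows = [{"code": "USA"}, {"code": "EUR", "priority": 1}]
entries = batch_validator.validate_python(rows)
print(len(entries))  # 2
```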
Caching Strategy
The registry implements intelligent caching to further optimize performance:
from threading import Lock
from typing import Callable, Dict

class CachedSchemaRegistry(SchemaRegistry):
    """Schema registry with intelligent caching"""

    def __init__(self):
        super().__init__()
        self._cache_lock = Lock()
        self._compiled_validators: Dict[str, Callable[[dict], BaseModel]] = {}

    def get_cached_validator(self, schema_key: str) -> Callable[[dict], BaseModel]:
        """Get (and memoize) the compiled validator for a schema.

        Note: dict payloads are unhashable, so the validator cannot simply be
        wrapped in lru_cache; Pydantic already compiles the validation core
        once per model, so caching the bound method is sufficient.
        """
        with self._cache_lock:
            if schema_key not in self._compiled_validators:
                schema = self.get_schema(schema_key)
                self._compiled_validators[schema_key] = schema.model_validate
            return self._compiled_validators[schema_key]

    def validate_cached(self, schema_key: str, data: dict) -> BaseModel:
        """Validate using a cached validator"""
        validator = self.get_cached_validator(schema_key)
        return validator(data)

This caching layer ensures that validation performance remains optimal even under heavy load.
Security Considerations
Data validation is the first line of defense against malicious input, and the schema registry implements robust security measures.
Input Sanitization and Validation
All incoming data is rigorously validated against schema definitions:
- SQL Injection Prevention: Parameterized queries using validated Pydantic models prevent injection attacks.
- XSS Prevention: String fields are sanitized before any HTML rendering or database storage.
- Email and URL Validation: Strict regex patterns ensure only valid business emails and URLs are accepted.
- Path Traversal Protection: File path fields are validated to prevent directory traversal attacks.
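The sanitization bullet can be made concrete with a reusable validator; this sketch escapes HTML-sensitive characters at validation time (the model and escaping policy are illustrative assumptions, not the platform's actual code):

```python
import html

from pydantic import BaseModel, field_validator

class LeagueNote(BaseModel):
    author: str
    body: str

    @field_validator("author", "body")
    @classmethod
    def escape_html(cls, v: str) -> str:
        # Escape <, >, & and quotes so the stored value is inert if rendered
        return html.escape(v.strip())

note = LeagueNote(author="admin", body="<script>alert(1)</script>")
print(note.body)  # &lt;script&gt;alert(1)&lt;/script&gt;
```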
Sensitive Data Handling
The registry provides special handling for sensitive information:
import hashlib
import re
from datetime import datetime
from typing import List, Optional

from pydantic import BaseModel, ConfigDict, Field, SecretStr, field_validator
class ApiKey(BaseModel):
"""Secure API key model with sensitive data protection"""
key_id: str
user_id: str
scopes: List[str]
created_at: datetime
expires_at: Optional[datetime] = None
is_active: bool = True
# Sensitive fields with special handling
secret_key: SecretStr = Field(..., description="Encrypted API secret (never logged)")
hashed_value: str # SHA-256 hash of the secret for verification
model_config = ConfigDict(
# Never include secrets in error messages or logs
json_encoders={
SecretStr: lambda v: "***REDACTED***" if v else None
},
# Prevent secret from appearing in repr
str_strip_whitespace=True,
validate_assignment=True
)
    @field_validator("secret_key")
    @classmethod
    def validate_api_key_format(cls, v: SecretStr) -> SecretStr:
        """Validate API key format and strength"""
        key_str = v.get_secret_value()
        # Minimum length requirement
        if len(key_str) < 32:
            raise ValueError("API key must be at least 32 characters")
        # Reject common weak prefixes
        weak_patterns = [r"^password", r"^123456", r"^admin", r"^test"]
        if any(re.match(pattern, key_str.lower()) for pattern in weak_patterns):
            raise ValueError("API key contains weak pattern - use a stronger key")
        # Note: hashed_value (the SHA-256 of the secret) is supplied at key
        # creation time; a field validator must not mutate sibling fields
        return v
    def verify_key(self, plain_key: str) -> bool:
        """Verify API key against stored hash (without exposing secret)"""
        import hashlib
        return hashlib.sha256(plain_key.encode()).hexdigest() == self.hashed_value

Audit Trail Integration
All schema operations are logged for security auditing:
import logging
from contextlib import contextmanager
from typing import Any, Dict, Generator

# Security audit logger
audit_logger = logging.getLogger("schema_audit")
class AuditedSchemaRegistry(SchemaRegistry):
"""Registry with comprehensive audit logging"""
@contextmanager
def audit_operation(self, operation: str, user_id: str, context: Dict[str, Any]) -> Generator[None, None, None]:
"""Context manager for auditing schema operations"""
audit_logger.info(
"Schema operation started",
extra={
"operation": operation,
"user_id": user_id,
"schema_key": context.get("schema_key"),
"data_size": len(str(context.get("data", {}))),
"ip_address": context.get("ip_address"),
"user_agent": context.get("user_agent")
}
)
try:
yield
audit_logger.info(
"Schema operation succeeded",
extra={
"operation": operation,
"user_id": user_id,
"duration_ms": context.get("duration_ms"),
"result": "success"
}
)
except Exception as e:
audit_logger.error(
"Schema operation failed",
extra={
"operation": operation,
"user_id": user_id,
"error": str(e),
"result": "failure"
}
)
            raise

Deployment Considerations
The schema registry is designed for seamless integration into various deployment scenarios, from local development to production clusters.
Package Structure
The registry is distributed as a standalone Python package for maximum reusability.
altsportsleagues-schemas/
├── pyproject.toml               # Project metadata and dependencies
├── README.md                    # Usage and integration guide
├── CHANGELOG.md                 # Version history
├── LICENSE                      # MIT License
├── src/
│   └── altsportsleagues_schemas/
│       ├── __init__.py          # Package entry points
│       ├── registry.py          # Core registry implementation
│       ├── base.py              # Base schema and utilities
│       ├── domains/             # Domain-specific schemas
│       │   ├── __init__.py
│       │   ├── leagues/
│       │   ├── betting_systems/
│       │   ├── email_assistant/
│       │   ├── infrastructure/
│       │   ├── saas/
│       │   ├── sports/
│       │   └── users/
│       ├── generators/          # Multi-format generators
│       │   ├── __init__.py
│       │   ├── typescript.py
│       │   ├── json_schema.py
│       │   └── graphql.py
│       ├── validators/          # Custom validation logic
│       └── fixtures/            # Test fixture factories
├── tests/                       # Comprehensive test suite
├── docs/                        # Generated documentation
├── stubs/                       # IDE type stubs
└── examples/                    # Usage examples

Distribution Configuration
# pyproject.toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "altsportsleagues-schemas"
version = "1.0.0"
description = "Type-safe Pydantic schemas for AltSportsLeagues.ai platform"
readme = "README.md"
license = {text = "MIT"}
authors = [
{name = "AltSportsLeagues Team", email = "team@altsportsleagues.ai"}
]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3.11",
"Programming Language :: TypeScript",
"Topic :: Software Development :: Libraries :: Python Modules",
"Typing :: Typed"
]
requires-python = ">=3.11"
dependencies = [
"pydantic >= 2.5.0",
"annotated-types >= 0.6.0",
"pydantic-settings >= 2.1.0",
"typing-extensions >= 4.8.0"
]
[project.optional-dependencies]
dev = [
"mypy >= 1.7.0",
"pyright >= 1.1.0",
"factory-boy >= 3.3.0",
"faker >= 20.0.0",
"pytest >= 7.4.0",
"pytest-cov >= 4.1.0",
"black >= 23.0.0",
"isort >= 5.12.0"
]
test = [
"pytest >= 7.4.0",
"pytest-cov >= 4.1.0",
"factory-boy >= 3.3.0",
"faker >= 20.0.0"
]
docs = [
"mkdocs >= 1.5.0",
"mkdocs-material >= 9.0.0",
"pydantic >= 2.5.0"
]
[project.urls]
Homepage = "https://altsportsleagues.ai/schemas"
Documentation = "https://altsportsleagues.ai/schemas/docs"
Repository = "https://github.com/altsportsleagues/schemas"
Issues = "https://github.com/altsportsleagues/schemas/issues"
[tool.hatch.build.targets.wheel]
packages = ["src/altsportsleagues_schemas"]
[tool.mypy]
python_version = "3.11"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
check_untyped_defs = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = false

Installation and Usage
# Install the schema package
pip install altsportsleagues-schemas
# Or from source
git clone https://github.com/altsportsleagues/schemas.git
cd schemas
pip install -e ".[dev]"  # quote the extras so shells like zsh don't glob the brackets
# Verify installation
python -c "from altsportsleagues_schemas.registry import registry; print(registry.list_schemas())"

Error Handling Patterns
Robust error handling ensures that schema issues are caught early and provide actionable feedback.
Custom Exception Hierarchy
from datetime import datetime
from typing import Any, Dict, List, Optional

from pydantic import BaseModel
from pydantic import ValidationError as PydanticValidationError
class SchemaRegistryError(Exception):
"""Base exception for all schema registry errors"""
def __init__(self, message: str, context: Optional[Dict[str, Any]] = None):
self.message = message
self.context = context or {}
super().__init__(self.message)
class SchemaNotFoundError(SchemaRegistryError):
"""Raised when schema is not found in registry"""
def __init__(self, schema_key: str, available_schemas: List[str]):
context = {
"schema_key": schema_key,
"available_schemas": available_schemas[:10], # Limit for readability
"total_available": len(available_schemas),
"suggestion": f"Did you mean one of: {', '.join(available_schemas[:3])}?"
}
super().__init__("Schema not found in registry", context)
class SchemaVersionMismatch(SchemaRegistryError):
"""Raised when schema version is incompatible"""
def __init__(self, requested_version: str, available_versions: List[str]):
context = {
"requested_version": requested_version,
"available_versions": available_versions,
"latest_version": max(available_versions) if available_versions else None,
"suggestion": f"Try using latest version: {max(available_versions) if available_versions else 'unknown'}"
}
super().__init__("Schema version not compatible", context)
class ValidationSchemaError(SchemaRegistryError):
    """Enhanced validation error with schema context.

    Pydantic v2's ValidationError is implemented in pydantic_core and cannot
    be subclassed or constructed directly, so the original error is wrapped.
    """
    def __init__(self, original_error: PydanticValidationError, schema_key: str):
        self.schema_key = schema_key
        self.original_error = original_error
        self.enhanced_errors = self._enhance_errors(original_error.errors(), schema_key)
        super().__init__("Data validation failed", {"schema_key": schema_key})
def _enhance_errors(self, errors: List[Dict[str, Any]], schema_key: str) -> List[Dict[str, Any]]:
"""Add schema-specific context to validation errors"""
enhanced = []
for error in errors:
enhanced_error = error.copy()
# Add schema context
enhanced_error["schema"] = schema_key
enhanced_error["domain"] = self._extract_domain(schema_key)
            # Provide actionable fixes (using Pydantic v2 error type codes)
            if error["type"] == "missing":
                enhanced_error["fix"] = f"Add missing required field: {error['loc'][-1]}"
            elif error["type"] == "string_too_short":
                enhanced_error["fix"] = f"Field {error['loc'][-1]} must have at least {error['ctx']['min_length']} characters"
            elif "email" in error["type"]:
                enhanced_error["fix"] = "Provide a valid email address in format user@domain.com"
            enhanced.append(enhanced_error)
return enhanced
def _extract_domain(self, schema_key: str) -> str:
"""Extract domain from schema key"""
return schema_key.split('.')[0] if '.' in schema_key else "unknown"
def safe_validate(
    schema_key: str,
    data: dict,
    context: Optional[Dict[str, Any]] = None,
) -> tuple[Optional[BaseModel], Optional[str]]:
    """Safely validate data against a registered schema with enhanced error handling."""
    try:
        registry = SchemaRegistry()
        schema = registry.get_schema(schema_key)
        validated = schema.model_validate(data)
        return validated, None
    except SchemaNotFoundError as e:
        return None, f"Schema not found: {e.message} {e.context}"
    except SchemaVersionMismatch as e:
        return None, f"Version mismatch: {e.message} {e.context}"
    except PydanticValidationError as e:
        enhanced_error = ValidationSchemaError(e, schema_key)
        return None, f"Validation failed: {enhanced_error}"
    except Exception as e:
        return None, f"Unexpected validation error: {e}"

Error Response Standardization
All validation errors follow a consistent format for easy consumption by developers and monitoring systems.
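As a stdlib-only sketch of this enhance-then-format pipeline (the schema key `leagues.LeagueCreate` and the error dicts are hypothetical, hand-built in the shape Pydantic v2 emits, so no registry or Pydantic install is needed):

```python
from datetime import datetime, timezone

# Hand-built error dicts in the shape produced by Pydantic v2's ValidationError.errors()
raw_errors = [
    {"type": "missing", "loc": ("league_name",), "msg": "Field required", "input": {}},
    {"type": "string_too_short", "loc": ("slug",),
     "msg": "String should have at least 3 characters",
     "input": "ab", "ctx": {"min_length": 3}},
]

def enhance(errors, schema_key):
    """Mirror of _enhance_errors: attach schema context and a suggested fix."""
    domain = schema_key.split(".")[0] if "." in schema_key else "unknown"
    out = []
    for error in errors:
        e = dict(error, schema=schema_key, domain=domain)
        if error["type"] == "missing":
            e["fix"] = f"Add missing required field: {error['loc'][-1]}"
        elif error["type"] == "string_too_short":
            e["fix"] = f"Field {error['loc'][-1]} must have at least {error['ctx']['min_length']} characters"
        out.append(e)
    return out

enhanced = enhance(raw_errors, "leagues.LeagueCreate")

# Standardized payload, mirroring format_validation_error's shape
payload = {
    "error_type": "VALIDATION_ERROR",
    "schema": "leagues.LeagueCreate",
    "details": [{"field": "/".join(map(str, e["loc"])), "fix": e["fix"]} for e in enhanced],
    "total_errors": len(enhanced),
    "timestamp": datetime.now(timezone.utc).isoformat(),
}
print(payload["details"][0]["fix"])  # → Add missing required field: league_name
print(payload["total_errors"])       # → 2
```

Monitoring systems can key off the stable `error_type` and `schema` fields, while the per-field `fix` strings are what end up surfaced to developers.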
def format_validation_error(enhanced_error: ValidationSchemaError) -> Dict[str, Any]:
    """Format a validation error for API responses and logging."""
    return {
        "error_type": "VALIDATION_ERROR",
        "schema": enhanced_error.schema_key,
        "domain": enhanced_error._extract_domain(enhanced_error.schema_key),
        "message": "Data validation failed against schema requirements",
        "details": [
            {
                "field": "/".join(map(str, error["loc"])),
                "error": error["msg"],
                "type": error["type"],
                "value": error.get("input", "unknown"),
                "fix": error.get("fix", "Review schema documentation"),
            }
            for error in enhanced_error.enhanced_errors
        ],
        "total_errors": len(enhanced_error.enhanced_errors),
        "suggestion": "Validate your data against the schema documentation at /schemas/{domain}",
        # datetime.utcnow() is deprecated in Python 3.12+; use an aware UTC timestamp
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

Future Enhancements
The schema registry is designed for long-term evolution and expansion. Planned enhancements include:
- GraphQL Schema Generation: Automatic GraphQL schema definition language (SDL) generation from Pydantic models, enabling type-safe GraphQL APIs with introspection support.
- Database Synchronization: Bi-directional synchronization between Pydantic models and database schemas using SQLAlchemy or Prisma, ensuring that database migrations stay in sync with application models.
- Real-Time Schema Validation: WebSocket-based real-time validation for frontend forms and dynamic UIs, providing instant feedback during data entry.
- Schema Diff and Migration Tools: Comprehensive schema comparison tools that generate detailed migration reports, including data transformation scripts and compatibility matrices.
- Visual Schema Designer: A web-based UI for designing, editing, and visualizing schemas, allowing non-technical stakeholders to contribute to data model evolution.
- Schema Governance Framework: Enterprise-grade schema governance with approval workflows, change tracking, and compliance reporting for regulated industries.
- Runtime Schema Validation API: Public API endpoints for validating arbitrary JSON against registered schemas, enabling third-party integrations and data quality gates.
- Schema Evolution Analytics: Analytics and reporting on schema usage patterns, helping teams understand which models need attention and optimization.
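To illustrate the schema-diff direction, here is a minimal stdlib sketch (the field maps and their shapes are hypothetical, not the planned tool, which would operate on Pydantic model metadata and emit full migration reports):

```python
# Hypothetical sketch: compare two versions of a model's field map
# (field name -> type string) and classify the changes.
def diff_fields(old: dict, new: dict) -> dict:
    added = sorted(set(new) - set(old))
    removed = sorted(set(old) - set(new))
    changed = sorted(k for k in set(old) & set(new) if old[k] != new[k])
    # Removed fields and type changes break downstream consumers; additions do not.
    return {"added": added, "removed": removed, "changed": changed,
            "breaking": bool(removed or changed)}

v1 = {"id": "int", "name": "str", "founded": "int"}
v2 = {"id": "int", "name": "str", "founded": "date", "region": "str"}

report = diff_fields(v1, v2)
print(report)  # {'added': ['region'], 'removed': [], 'changed': ['founded'], 'breaking': True}
```

A compatibility matrix falls out of running this pairwise across registered versions, with the `breaking` flag gating whether a migration script is required.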
These enhancements will further solidify the schema registry as the cornerstone of AltSportsLeagues.ai's data architecture, enabling sophisticated data management while maintaining developer productivity and system reliability.
This comprehensive Data Layer & Schema Registry documentation provides the complete blueprint for AltSportsLeagues.ai's type-safe data foundation. From schema organization and validation patterns to multi-format generation and deployment strategies, this system ensures data integrity while accelerating development across the entire platform.