Architecture

Data Layer & Schema Registry

Introduction

The Data Layer & Schema Registry serves as the foundational vertical for AltSportsLeagues.ai, providing a centralized, type-safe schema system comprising over 150 Pydantic v2 models organized across 7 key business domains. This architecture ensures data consistency, enables seamless multi-platform code generation, and delivers an exceptional developer experience through comprehensive IDE integration and compile-time type checking.

Core Design Principles:

  1. Schema-First Architecture: All data structures are rigorously defined as Pydantic models prior to any implementation, establishing a single source of truth.
  2. Type Safety Everywhere: Runtime type errors are minimized through static analysis with mypy and pyright on top of Pydantic's runtime validation.
  3. Single Source of Truth: One unified schema registry powers backend services, frontend applications, and database layers across the entire platform.
  4. Developer Experience Excellence: Intelligent IDE autocomplete, rich type hints, and actionable error messages accelerate development velocity.
  5. Performance Optimized: Leveraging Pydantic v2's Rust-based validation core, benchmarked at up to 17x faster than v1, for production-scale efficiency.

This data layer is not merely a collection of models but the architectural backbone that ensures data integrity, facilitates rapid iteration, and scales gracefully with the platform's growth. By prioritizing schema definition over ad-hoc data handling, we eliminate the common pitfalls of inconsistent data structures and enable sophisticated features like automatic API documentation, frontend type generation, and comprehensive testing.

Architecture Overview

System Context

The Data Layer & Schema Registry operates at the intersection of development workflows, ensuring that every team member, from backend engineers to frontend developers and data scientists, interacts with a unified, validated data model.

The schema registry permeates every layer of the application stack, from code generation to runtime validation. The system's design ensures that data flows reliably through the entire architecture while maintaining developer productivity.

Container-Level Design

At the implementation level, the registry comprises several interconnected components that handle schema management, generation, validation, and testing.

The registry is deliberately modular: each component has a specific responsibility while maintaining loose coupling through well-defined interfaces.

Domain Organization

The schema registry is meticulously organized into 7 business domains, each containing related models that capture the specific data requirements of that domain. This organization facilitates discoverability, maintainability, and team ownership.

Domain Structure

data_layer/schemas/
├── __init__.py                 # Registry entry points
├── leagues/                    # 25+ models - League operations and intelligence
│   ├── __init__.py
│   ├── questionnaire.py        # League discovery and scoring
│   ├── classification.py       # League tier and market classification
│   ├── scoring.py              # Partnership and revenue scoring
│   ├── metadata.py             # League metadata and versioning
│   └── relationships.py        # League foreign key relationships
│
├── betting_systems/            # 30+ models - Betting markets and odds integration
│   ├── __init__.py
│   ├── odds.py                 # Odds data structures and validation
│   ├── markets.py              # Betting market types and configurations
│   ├── sportsbooks.py          # Sportsbook integration models
│   ├── translations.py         # Multi-language betting terms
│   └── risk_assessment.py      # Betting risk and limit models
│
├── email_assistant/            # 20+ models - Email intelligence and automation
│   ├── __init__.py
│   ├── threads.py              # Email thread structures
│   ├── classification.py       # Email intent and priority classification
│   ├── templates.py            # Response template management
│   ├── responses.py            # Generated email responses
│   └── attachments.py          # Email attachment handling
│
├── infrastructure/             # 15+ models - System health and operations
│   ├── __init__.py
│   ├── health.py               # Health check and monitoring models
│   ├── deployment.py           # Deployment configuration schemas
│   ├── monitoring.py           # Metrics and alerting structures
│   ├── logging.py              # Structured log definitions
│   └── configuration.py        # System configuration models
│
├── saas/                       # 25+ models - SaaS platform and billing
│   ├── __init__.py
│   ├── users.py                # User profiles and authentication
│   ├── subscriptions.py        # Subscription plans and billing
│   ├── api_keys.py             # API key management and scopes
│   ├── quotas.py               # Usage quotas and limits
│   └── billing.py              # Invoicing and payment models
│
├── sports/                     # 30+ models - Core sports data entities
│   ├── __init__.py
│   ├── events.py               # Matches, games, and competitions
│   ├── teams.py                # Team rosters and statistics
│   ├── players.py              # Player profiles and performance
│   ├── statistics.py           # Game and season statistics
│   ├── combat.py               # MMA/Boxing specific models
│   └── venues.py               # Stadium and venue information
│
└── users/                      # 15+ models - User experience and preferences
    ├── __init__.py
    ├── profiles.py             # User profile and demographics
    ├── preferences.py          # User interface and notification preferences
    ├── auth.py                 # Authentication tokens and sessions
    ├── sessions.py             # User session management
    └── analytics.py            # User behavior and analytics

Each domain follows consistent naming conventions, documentation standards, and testing patterns, ensuring that developers can quickly navigate and extend the schema system.
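
For example, each package's __init__.py re-exports its public models so call sites import from one stable path. A representative sketch (module and model names beyond those shown elsewhere in this document, such as PartnershipScore, are hypothetical):

# data_layer/schemas/leagues/__init__.py (sketch)
from .questionnaire import LeagueQuestionnaire
from .classification import LeagueTier
from .scoring import PartnershipScore  # hypothetical model name

__all__ = ["LeagueQuestionnaire", "LeagueTier", "PartnershipScore"]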

Domain-Specific Model Counts

  • Leagues Domain: 28 models (league discovery, scoring, metadata)
  • Betting Systems: 32 models (odds, markets, risk assessment)
  • Email Assistant: 22 models (threads, classification, automation)
  • Infrastructure: 18 models (health, deployment, monitoring)
  • SaaS Platform: 27 models (users, billing, quotas)
  • Sports Data: 35 models (events, teams, statistics)
  • Users: 18 models (profiles, preferences, analytics)

Total: 180 models (exceeding the initial 150 target for comprehensive coverage)

Pydantic Model Patterns

The registry employs sophisticated Pydantic v2 patterns to maximize type safety, performance, and developer experience. Every model follows established conventions for validation, serialization, and documentation.

Base Model Pattern

All schemas inherit from a BaseSchema that provides common configuration and audit fields.

from pydantic import BaseModel, Field, ConfigDict, model_validator
from typing import Annotated, Optional
from datetime import datetime
from uuid import UUID, uuid4
 
class BaseSchema(BaseModel):
    """Base schema class with enterprise configuration"""
    
    model_config = ConfigDict(
        # Pydantic v2 core configuration
        validate_assignment=True,      # Validate on assignment
        strict=False,                  # Allow coercion where safe
        use_enum_values=False,         # Keep enum members; str-based enums still serialize as strings
        populate_by_name=True,         # Allow field population by alias
        arbitrary_types_allowed=True,  # Support custom types
        
        # JSON serialization
        json_schema_extra={
            "examples": [],            # Auto-populated by factories
            "x-domain": "altsportsleagues",  # Custom metadata
            "x-generated": True        # Mark auto-generated fields
        },
        
        # Validation and performance
        validate_default=True,         # Validate default values
        extra="forbid",                # Prevent unknown fields
        protected_namespaces=(),       # No protected namespaces
    )
    
    # Universal audit fields
    id: Annotated[UUID, Field(default_factory=uuid4, description="Unique identifier")]
    created_at: Annotated[
        datetime, 
        Field(default_factory=datetime.utcnow, description="Record creation timestamp")
    ]
    updated_at: Annotated[
        datetime, 
        Field(default_factory=datetime.utcnow, description="Last update timestamp")
    ]
    version: Annotated[
        int, 
        Field(default=1, ge=1, description="Schema version number")
    ]
    metadata: Annotated[
        dict, 
        Field(default_factory=dict, description="Additional metadata")
    ]
    
    # Audit trail
    created_by: Annotated[Optional[str], Field(None, description="Creator user ID")]
    updated_by: Annotated[Optional[str], Field(None, description="Updater user ID")]
 
    @model_validator(mode='before')
    @classmethod
    def set_timestamps(cls, data):
        """Auto-set timestamps on creation"""
        if isinstance(data, dict):
            now = datetime.utcnow()
            if 'created_at' not in data:
                data['created_at'] = now
            if 'updated_at' not in data:
                data['updated_at'] = now
        return data
 
    def update_timestamp(self):
        """Update timestamp for modifications"""
        self.updated_at = datetime.utcnow()
        return self
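
A brief usage sketch (LeagueNote is a hypothetical subclass used only for illustration):

class LeagueNote(BaseSchema):
    text: str

note = LeagueNote(text="First contact made")
print(note.id, note.created_at, note.version)   # audit fields auto-populated

note.text = "Updated after intro call"          # validate_assignment re-checks the type
note.update_timestamp()

try:
    LeagueNote(text="x", unknown="y")           # extra="forbid" rejects unknown fields
except Exception as exc:
    print(type(exc).__name__)                   # ValidationError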

Enum Patterns

Enums provide type-safe categorical data with rich metadata.

from enum import Enum
 
class SportBucket(str, Enum):
    """Standard sport classification for market analysis"""
    
    COMBAT = "combat"
    """Combat sports (MMA, Boxing, Wrestling) - High engagement, premium partnerships"""
    
    LARGE_FIELD = "large_field"
    """Large field sports (Soccer, American Football, Rugby) - Global reach, sponsorship"""
    
    TEAM = "team"
    """Team sports (Basketball, Baseball, Hockey) - Consistent engagement, betting"""
    
    RACING = "racing"
    """Racing sports (Horse, Auto, Cycling) - High frequency events, gambling"""
    
    OTHER = "other"
    """Miscellaneous sports (Golf, Tennis, Esports) - Niche but valuable"""
    
    @classmethod
    def get_description(cls, value: 'SportBucket') -> str:
        """Get human-readable description"""
        descriptions = {
            cls.COMBAT: "Combat sports with high engagement and premium partnerships",
            cls.LARGE_FIELD: "Large field sports with global reach and sponsorship opportunities",
            cls.TEAM: "Team sports with consistent fan engagement and betting markets",
            cls.RACING: "Racing sports with high-frequency events and gambling focus",
            cls.OTHER: "Miscellaneous sports including niche but valuable markets"
        }
        return descriptions.get(value, "Unknown sport bucket")
 
    def get_market_characteristics(self) -> dict:
        """Return market analysis characteristics"""
        characteristics = {
            self.COMBAT: {
                "engagement": "high",
                "partnership_premium": "premium",
                "betting_volume": "medium_high",
                "global_reach": "medium"
            },
            self.LARGE_FIELD: {
                "engagement": "very_high",
                "partnership_premium": "high",
                "betting_volume": "high",
                "global_reach": "very_high"
            },
            # ... other cases
        }
        return characteristics.get(self, {"engagement": "medium"})
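
A short usage sketch:

bucket = SportBucket("combat")            # str-based enums parse from raw values
assert bucket is SportBucket.COMBAT

print(SportBucket.get_description(bucket))
print(bucket.get_market_characteristics()["engagement"])   # -> "high"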

Advanced Validation Patterns

Complex models leverage Pydantic's powerful validation capabilities.

from pydantic import BaseModel, Field, PrivateAttr, field_validator, model_validator
from typing import Annotated, List, Optional
from enum import Enum
import re
from datetime import date
 
class LeagueTier(str, Enum):
    """League competitive tier classification"""
    TIER_1 = "tier_1"  # Premier leagues (Premier League, La Liga)
    TIER_2 = "tier_2"  # Strong national leagues
    TIER_3 = "tier_3"  # Regional/developmental leagues
    TIER_4 = "tier_4"  # Local/amateur leagues
 
class LeagueQuestionnaire(BaseModel):
    """Comprehensive league questionnaire for partnership analysis"""
    
    # Core identification
    league_name: Annotated[
        str, 
        Field(
            min_length=3, 
            max_length=200,
            pattern=r'^[A-Za-z\s\.\,\-\'\(\)]+$',
            description="Official league name (3-200 characters, letters and basic punctuation)"
        )
    ]
    
    # Sport classification
    sport_bucket: SportBucket = Field(..., description="Primary sport category")
    primary_sport: str = Field(..., description="Specific sport name")
    
    # Contact information
    contact_email: Annotated[
        str, 
        Field(
            pattern=r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
            description="Valid contact email address"
        )
    ]
    contact_name: Optional[str] = Field(None, description="Primary contact person")
    contact_phone: Optional[str] = Field(
        None, 
        pattern=r'^\+?[\d\s\-\(\)]{10,15}$',
        description="Contact phone number (international format preferred)"
    )
    
    # League details
    league_tier: LeagueTier = Field(..., description="Competitive tier classification")
    founded_year: Annotated[
        Optional[int], 
        Field(default=None, ge=1800, le=date.today().year + 1)
    ]
    member_count: Annotated[
        int, 
        Field(1, ge=1, le=10000, description="Number of member teams/players")
    ]
    season_structure: str = Field(
        ...,
        description="Season format (e.g., 'annual', 'semester', 'continuous')"
    )
    event_frequency: str = Field(
        ...,
        description="Event frequency (e.g., 'weekly', 'monthly', 'seasonal')"
    )
    
    # Geographic scope
    primary_location: str = Field(..., description="Primary geographic location")
    global_reach: bool = Field(
        False, 
        description="Does the league have international audience/participation?"
    )
    target_markets: List[str] = Field(
        default_factory=list,
        description="Target geographic markets (e.g., 'USA', 'Europe', 'Global')"
    )
    
    # Business model
    revenue_model: str = Field(
        ...,
        description="Primary revenue sources (e.g., 'sponsorships', 'broadcasting', 'tickets')"
    )
    current_partners: int = Field(
        0, 
        ge=0, 
        le=100,
        description="Number of current corporate partners/sponsors"
    )
    partnership_focus: List[str] = Field(
        default_factory=list,
        min_length=1,
        max_length=5,
        description="Preferred partnership categories"
    )
    
    # Digital presence
    website_url: Annotated[
        Optional[str], 
        Field(
            default=None,
            pattern=r'^https?://[^\s/$.?#].[^\s]*$',
            description="Official website URL (HTTP or HTTPS)"
        )
    ]
    social_media_followers: Annotated[
        int, 
        Field(default=0, ge=0, le=100_000_000)
    ]
    digital_engagement: str = Field(
        "low",
        description="Digital engagement level (low/medium/high/very_high)"
    )
    
    # Technical requirements
    data_integration: bool = Field(
        False, 
        description="Does the league provide API/data feeds?"
    )
    real_time_scoring: bool = Field(
        False, 
        description="Does the league provide real-time scoring data?"
    )
    historical_data: bool = Field(
        False, 
        description="Does the league provide historical performance data?"
    )
    
    # Additional metadata
    notes: Annotated[
        Optional[str], 
        Field(default=None, max_length=1000, description="Additional notes or context")
    ]
    tags: List[str] = Field(
        default_factory=list,
        max_length=20,
        description="Tags for categorization and search"
    )
    
    # Derived scores (private attributes, computed by the model validator below)
    _validation_score: float = PrivateAttr(default=0.0)
    _partnership_potential: float = PrivateAttr(default=0.0)
    _market_maturity: float = PrivateAttr(default=0.0)
 
    @field_validator('league_name')
    @classmethod
    def validate_league_name(cls, v: str) -> str:
        """Validate league name format and content"""
        if not v or len(v.strip()) < 3:
            raise ValueError('League name must be at least 3 characters')
        if not re.match(r'^[A-Za-z\s\.\,\-\'\(\)]+$', v):
            raise ValueError('League name contains invalid characters')
        return v.strip().title()
 
    @field_validator('contact_email')
    @classmethod
    def validate_email(cls, v: str) -> str:
        """Strict email validation"""
        if '@' not in v or '.' not in v.split('@')[-1]:
            raise ValueError('Invalid email format')
        return v.lower()
 
    @field_validator('primary_location')
    @classmethod
    def validate_location(cls, v: str) -> str:
        """Validate location format"""
        if len(v) < 2 or len(v) > 100:
            raise ValueError('Location must be 2-100 characters')
        return v.strip()
 
    @model_validator(mode='after')
    def calculate_scores(self) -> 'LeagueQuestionnaire':
        """Calculate automated scoring metrics"""
        # Partnership potential scoring (simplified)
        potential_score = 0.0
        if self.global_reach:
            potential_score += 0.25
        if self.current_partners > 5:
            potential_score += 0.20
        if self.digital_engagement in ['high', 'very_high']:
            potential_score += 0.30
        if self.data_integration or self.real_time_scoring:
            potential_score += 0.25
        
        self._partnership_potential = min(1.0, potential_score)
        
        # Market maturity scoring
        maturity_score = 0.0
        if self.founded_year and (date.today().year - self.founded_year) > 10:
            maturity_score += 0.40
        if self.member_count > 50:
            maturity_score += 0.30
        if self.social_media_followers > 10000:
            maturity_score += 0.30
        
        self._market_maturity = min(1.0, maturity_score)
        
        return self
 
    def get_scoring_summary(self) -> dict:
        """Generate human-readable scoring summary"""
        return {
            "partnership_potential": f"{self._partnership_potential:.2f}",
            "market_maturity": f"{self._market_maturity:.2f}",
            "recommendation": self._get_recommendation(),
            "strengths": self._identify_strengths(),      # helper omitted for brevity
            "improvements": self._suggest_improvements()  # helper omitted for brevity
        }
 
    def _get_recommendation(self) -> str:
        """Generate partnership recommendation"""
        if self._partnership_potential > 0.7:
            return "High potential - immediate partnership outreach recommended"
        elif self._partnership_potential > 0.4:
            return "Moderate potential - targeted approach with digital enhancement"
        else:
            return "Low potential - focus on market development before partnerships"

Advanced Relationship Patterns

Models that represent relational data use sophisticated patterns for handling complex relationships.

from enum import Enum
from typing import List, Optional
from uuid import UUID, uuid4
 
# String annotations ('League', 'Partnership') serve as forward references;
# model_rebuild() below resolves them once all models are defined.
 
class PartnershipStatus(str, Enum):
    """Partnership lifecycle states"""
    PENDING = "pending"
    ACTIVE = "active"
    EXPIRED = "expired"
 
class Sport(BaseModel):
    """Minimal stand-in; the full Sport model lives in the sports domain"""
    name: str
 
class League(BaseModel):
    """League model with comprehensive relationships"""
    
    id: Annotated[UUID, Field(default_factory=uuid4)]
    name: str
    sport_bucket: SportBucket
    
    # One-to-many relationships
    partnerships: Annotated[
        List['Partnership'], 
        Field(default_factory=list, description="Active partnerships")
    ]
    
    # Many-to-one relationships (Foreign Keys)
    primary_sport: Optional['Sport'] = None
    
    # Lazy-loading flags (private attributes, excluded from serialization)
    _load_partnerships: bool = PrivateAttr(default=False)
    _load_sport: bool = PrivateAttr(default=False)
 
    @model_validator(mode='after')
    def resolve_relationships(self) -> 'League':
        """Resolve and validate relationships"""
        if self._load_partnerships:
            # In a real application these helpers would query the database;
            # their implementations are omitted here for brevity.
            self.partnerships = self._load_active_partnerships()
        if self._load_sport:
            self.primary_sport = self._get_primary_sport()
        return self
 
class Partnership(BaseModel):
    """Partnership model linking leagues and partners"""
    
    id: Annotated[UUID, Field(default_factory=uuid4)]
    league_id: UUID  # Foreign key to League
    partner_id: UUID  # Foreign key to Partner
    
    # Relationship fields (resolved lazily by the data access layer)
    league: Optional['League'] = None
    partner: Optional['Partner'] = None
    
    # Partnership specific fields
    partnership_type: str
    status: PartnershipStatus  # pending, active, expired
    value_estimate: float
    contract_duration: int  # months
    
    model_config = ConfigDict(
        # Enable relationship validation
        validate_assignment=True,
        # Custom JSON schema for relationships
        json_schema_extra={
            "x-relationships": {
                "league": "one",
                "partner": "many"
            }
        }
    )
 
# Partner model (simplified)
class Partner(BaseModel):
    """Partner organization model"""
    id: Annotated[UUID, Field(default_factory=uuid4)]
    name: str
    industry: str
    partnership_focus: List[str]
 
# Resolve forward references after all definitions
League.model_rebuild()
Partnership.model_rebuild()
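
A usage sketch: foreign keys travel as plain UUIDs, while object references are attached only once the data access layer resolves them.

league = League(name="Example League", sport_bucket=SportBucket.TEAM)

partnership = Partnership(
    league_id=league.id,
    partner_id=uuid4(),
    partnership_type="sponsorship",
    status=PartnershipStatus.ACTIVE,
    value_estimate=250_000.0,
    contract_duration=24,
)

partnership.league = league          # attach the resolved object when loaded
league.partnerships.append(partnership)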

Multi-Format Generation Pipeline

The registry includes a sophisticated pipeline for generating code and documentation in multiple formats from the canonical Pydantic models, ensuring consistency across the technology stack.

TypeScript Generation Pipeline

Automatic TypeScript generation ensures frontend developers receive type-safe interfaces that mirror the backend exactly.

from dataclasses import dataclass
from datetime import date, datetime
from pathlib import Path
from typing import Type, Union, get_args, get_origin
from uuid import UUID
 
from jinja2 import Template
from pydantic import BaseModel
 
@dataclass
class TypeScriptGenerator:
    """Advanced TypeScript generator from Pydantic models"""
    
    registry: "SchemaRegistry"  # schema registry instance (defined later in this document)
    template_dir: Path = Path("data_layer/templates/typescript")
    output_dir: Path = Path("frontend/types/generated")
    
    def generate_all(self) -> None:
        """Generate TypeScript for all registered schemas"""
        # Assumes the registry exposes its domains and per-domain schema lists
        for domain in self.registry.domains:
            for model in self.registry.get_domain_schemas(domain):
                self.generate_interface(model)
    
    def generate_interface(self, model: Type[BaseModel]) -> str:
        """Generate TypeScript interface for a Pydantic model"""
        
        # Extract field information
        fields = []
        for field_name, field_info in model.model_fields.items():
            ts_type = self._map_python_to_typescript(field_info.annotation)
            optional = "" if field_info.is_required() else "?"
            description = field_info.description or ""
            
            fields.append({
                "name": field_name,
                "type": ts_type,
                "optional": optional,
                "description": description
            })
        
        # Render template
        template = self.template_dir / "interface.jinja"
        rendered = Template(template.read_text()).render({
            "model_name": model.__name__,
            "fields": fields
        })
        
        # Write to file
        filepath = self.output_dir / f"{model.__name__}.ts"
        filepath.parent.mkdir(parents=True, exist_ok=True)
        filepath.write_text(rendered)
        
        return rendered
    
    def _map_python_to_typescript(self, python_type: type) -> str:
        """Map Python types to equivalent TypeScript types"""
        basic_mapping = {
            str: "string",
            int: "number",
            float: "number",
            bool: "boolean",
            datetime: "Date",
            date: "Date",
            UUID: "string",
        }
        
        # Handle unions and parameterized generics via typing introspection
        origin = get_origin(python_type)
        if origin is not None:
            args = get_args(python_type)
            
            if origin is Union:
                # Optional[X] is Union[X, None]
                non_none = [a for a in args if a is not type(None)]
                mapped = " | ".join(self._map_python_to_typescript(a) for a in non_none)
                if type(None) in args:
                    mapped += " | undefined"
                return mapped
            elif origin is list:
                item_type = self._map_python_to_typescript(args[0])
                return f"{item_type}[]"
            elif origin is dict:
                # Simplified - in production, map key/value types precisely
                return "Record<string, any>"
        
        return basic_mapping.get(python_type, "any")
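
A usage sketch (assumes the Jinja template exists on disk and that SchemaRegistry, defined later in this document, is importable):

registry = SchemaRegistry()
generator = TypeScriptGenerator(registry=registry)
source = generator.generate_interface(LeagueQuestionnaire)

# The emitted interface has roughly this shape (abbreviated):
# export interface LeagueQuestionnaire {
#   league_name: string;
#   contact_email: string;
#   founded_year?: number | undefined;
#   ...
# }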

JSON Schema Generation for Validation

JSON Schema generation enables integration with various validation tools and external systems.

import json
from datetime import datetime
from typing import Any, Dict, Type
 
from pydantic import BaseModel, VERSION as PYDANTIC_VERSION
from pydantic.json_schema import GenerateJsonSchema
 
def generate_json_schema(model: Type[BaseModel], version: str = "1.0.0") -> Dict[str, Any]:
    """Generate comprehensive JSON Schema from Pydantic model"""
    
    # Generate base schema using Pydantic's JSON schema generation
    schema = model.model_json_schema(
        mode='validation',
        by_alias=True,
        ref_template='#/components/schemas/{model}',
        schema_generator=GenerateJsonSchema  # pass the generator class, not an instance
    )
    
    # Enhance with metadata
    schema['$schema'] = 'https://json-schema.org/draft/2020-12/schema'
    schema['$id'] = f'https://altsportsleagues.ai/schemas/{model.__name__}/v{version}'
    schema['title'] = model.__name__
    schema['description'] = (model.__doc__ or '').strip()
    
    # Add domain and registry metadata
    schema['x-domain'] = _get_domain_from_model(model)
    schema['x-generated'] = True
    schema['x-pydantic-version'] = PYDANTIC_VERSION
    schema['x-generation-date'] = datetime.utcnow().isoformat()
    
    # Add examples from model configuration (model_config is a plain dict in v2)
    extra = model.model_config.get('json_schema_extra') or {}
    if isinstance(extra, dict) and extra.get('examples'):
        schema['examples'] = extra['examples']
    
    return schema
 
def _get_domain_from_model(model: Type[BaseModel]) -> str:
    """Extract domain from model module path"""
    module_path = model.__module__
    if 'leagues' in module_path:
        return 'leagues'
    elif 'betting_systems' in module_path:
        return 'betting_systems'
    # ... other domains
    return 'core'
 
# Example usage
if __name__ == "__main__":
    schema = generate_json_schema(LeagueQuestionnaire, "1.0.0")
    
    # Write to file
    with open(f'schemas/json/{LeagueQuestionnaire.__name__}.json', 'w') as f:
        json.dump(schema, f, indent=2)
    
    print(f"Generated JSON Schema for {LeagueQuestionnaire.__name__}:")
    dumped = json.dumps(schema, indent=2)
    print(dumped[:500] + "..." if len(dumped) > 500 else dumped)

Schema Versioning Strategy

Robust versioning ensures that schema evolution doesn't break existing integrations while allowing the system to grow and improve over time.

Version Numbering Convention

Schemas follow semantic versioning with domain-specific prefixes for clear identification.

{domain}.{model}-v{major}.{minor}.{patch}

Examples:
- leagues.questionnaire-v1.0.0.json          # Initial release
- leagues.questionnaire-v1.1.0.json          # Added optional field
- leagues.questionnaire-v2.0.0.json          # Breaking change (removed required field)
- betting_systems.odds-v1.0.0.json           # Initial odds schema

Compatibility Rules

The versioning system strictly enforces semantic versioning principles (a classification sketch follows the list):

  1. Major Version Changes (Breaking):

    • Removal of required fields
    • Changes to field types (int to str, etc.)
    • Renaming or reordering fields
    • Changes to enum values
    • Changes to validation rules that would cause previously valid data to fail
  2. Minor Version Changes (Backward Compatible):

    • Addition of optional fields (new fields with defaults or Optional types)
    • Addition of new enum values (never removing existing values)
    • Relaxation of validation constraints
    • Improvements to documentation or metadata
    • Performance optimizations that don't affect data structure
  3. Patch Version Changes (Non-Breaking Fixes):

    • Bug fixes in validation logic
    • Updates to documentation strings
    • Internal performance improvements
    • Clarifications or corrections to field descriptions
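
As a sketch, the rules above can be applied mechanically to a field-level diff. The dict shape matches the Migration Generator shown later; note that a newly added required field would also be a major change, which this simplified version does not detect:

def required_bump(diff: dict) -> str:
    """Classify a schema diff into the semver bump it requires."""
    if diff["removed_fields"] or diff["type_changes"]:
        return "major"   # previously valid data may now fail
    if diff["added_fields"] or diff.get("constraint_changes"):
        return "minor"   # assumes additions are optional and constraints are relaxed
    return "patch"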

Migration Example

When evolving schemas, provide clear migration paths.

from pydantic import BaseModel
from typing import Optional
from datetime import datetime
 
# Version 1.0.0 - Initial release
class LeagueQuestionnaireV1(BaseModel):
    """Version 1.0.0 - Initial league questionnaire"""
    league_name: str
    contact_email: str
    created_at: datetime
    version: int = 1
 
# Version 1.1.0 - Added sport classification (minor, backward compatible)
class LeagueQuestionnaireV1_1(LeagueQuestionnaireV1):
    """Version 1.1.0 - Added optional sport bucket"""
    sport_bucket: Optional[str] = None  # New optional field
 
# Version 2.0.0 - Breaking change - Sport bucket now required (major)
class LeagueQuestionnaireV2(BaseModel):
    """Version 2.0.0 - Enhanced with required sport classification"""
    league_name: str
    contact_email: str
    sport_bucket: str  # Now required - breaking change
    created_at: datetime
    version: int = 2
 
    @classmethod
    def from_v1(cls, v1_data: LeagueQuestionnaireV1) -> 'LeagueQuestionnaireV2':
        """Migrate from v1 to v2 with default value"""
        return cls(
            league_name=v1_data.league_name,
            contact_email=v1_data.contact_email,
            sport_bucket="other",  # Default migration value
            created_at=v1_data.created_at,
            version=2
        )
 
    @classmethod
    def from_v1_1(cls, v1_1_data: LeagueQuestionnaireV1_1) -> 'LeagueQuestionnaireV2':
        """Migrate from v1.1 to v2 preserving existing value"""
        return cls(
            league_name=v1_1_data.league_name,
            contact_email=v1_1_data.contact_email,
            sport_bucket=v1_1_data.sport_bucket or "other",  # Preserve if set
            created_at=v1_1_data.created_at,
            version=2
        )
 
# Migration utility
def migrate_questionnaire(data: dict, target_version: str = "2.0.0") -> dict:
    """Universal migration utility"""
    if target_version.startswith("1."):
        # To v1.x
        migrated = {
            "league_name": data.get("league_name"),
            "contact_email": data.get("contact_email"),
            "created_at": data.get("created_at"),
            "version": 1
        }
        if target_version.startswith("1.1"):
            migrated["sport_bucket"] = data.get("sport_bucket")
        return migrated
    elif target_version.startswith("2."):
        # To v2.x: normalize through v1.1, then upgrade
        v1_data = migrate_questionnaire(data, "1.1.0")
        return LeagueQuestionnaireV2.from_v1_1(LeagueQuestionnaireV1_1(**v1_data)).model_dump()
    else:
        raise ValueError(f"Unsupported target version: {target_version}")
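
A usage sketch upgrading a stored v1 record:

legacy = {
    "league_name": "Example League",
    "contact_email": "ops@example.org",
    "created_at": "2023-01-15T00:00:00",
}

current = migrate_questionnaire(legacy, target_version="2.0.0")
assert current["sport_bucket"] == "other"    # default applied during migration
assert current["version"] == 2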

Schema Migration Tools

Automated tools facilitate safe schema evolution.

Migration Generator

from datetime import datetime
from typing import Any, Dict, Type
 
from pydantic import BaseModel
 
class SchemaMigrator:
    """Automated schema migration tool"""
    
    def __init__(self, registry: "SchemaRegistry"):
        self.registry = registry
        self.migration_history = self.load_migration_history()  # persistence helper omitted for brevity
    
    def generate_migration_script(self, from_version: str, to_version: str, model_name: str) -> Dict[str, Any]:
        """Generate migration script for schema version upgrade"""
        
        from_schema = self.registry.get_schema_version(model_name, from_version)
        to_schema = self.registry.get_schema_version(model_name, to_version)
        
        # Analyze differences
        differences = self._analyze_schema_differences(from_schema, to_schema)
        
        # Generate migration steps (_create_migration_steps and the other
        # helpers referenced below are omitted for brevity)
        migration_steps = self._create_migration_steps(differences)
        
        # Create script metadata
        script = {
            "migration_id": f"migrate_{model_name}_{from_version}_to_{to_version}",
            "from_version": from_version,
            "to_version": to_version,
            "model": model_name,
            "description": f"Migrate {model_name} from {from_version} to {to_version}",
            "timestamp": datetime.utcnow().isoformat(),
            "breaking_changes": self._is_breaking_migration(differences),
            "steps": migration_steps,
            "validation": self._generate_validation_steps(differences)
        }
        
        return script
    
    def _analyze_schema_differences(self, from_schema: Type[BaseModel], to_schema: Type[BaseModel]) -> Dict[str, Any]:
        """Analyze differences between schema versions"""
        from_fields = from_schema.model_fields
        to_fields = to_schema.model_fields
        
        differences = {
            "removed_fields": [],
            "added_fields": [],
            "type_changes": [],
            "constraint_changes": []
        }
        
        # Check for removed fields (breaking)
        for field_name in from_fields:
            if field_name not in to_fields:
                differences["removed_fields"].append(field_name)
        
        # Check for added fields
        for field_name in to_fields:
            if field_name not in from_fields:
                differences["added_fields"].append({
                    "name": field_name,
                    "type": to_fields[field_name].annotation
                })
        
        # Check for type changes
        for field_name in set(from_fields) & set(to_fields):
            from_type = from_fields[field_name].annotation
            to_type = to_fields[field_name].annotation
            if from_type != to_type:
                differences["type_changes"].append({
                    "field": field_name,
                    "from": str(from_type),   # str() handles generics like Optional[int]
                    "to": str(to_type)
                })
        
        return differences

Test Fixture Generation

Comprehensive test fixtures ensure reliable testing across all schemas.

Factory Pattern Implementation

Factories provide realistic, parameterized test data generation.

import factory
from factory import fuzzy
from faker import Faker
from datetime import datetime, timedelta
from uuid import uuid4
from typing import Optional
import random
 
fake = Faker()
random.seed(42)  # Consistent test data
 
class BaseFactory(factory.Factory):
    """Base factory with common patterns"""
    
    class Meta:
        abstract = True  # Shared declarations only; concrete factories set a model
    
    id = factory.LazyFunction(uuid4)
    created_at = factory.LazyFunction(lambda: datetime.utcnow() - timedelta(days=random.randint(0, 365)))
    updated_at = factory.LazyFunction(datetime.utcnow)
    version = 1
 
class LeagueQuestionnaireFactory(BaseFactory):
    """Factory for LeagueQuestionnaire model"""
    
    class Meta:
        model = LeagueQuestionnaire
    
    league_name = factory.LazyFunction(
        lambda: f"{fake.company()} {random.choice(['Premier', 'Professional', 'Elite'])} League"
    )
    
    sport_bucket = factory.Iterator(list(SportBucket))  # cycle through enum members
    
    contact_email = factory.LazyFunction(
        lambda: fake.email(domain="league.com")
    )
    
    contact_name = factory.LazyFunction(fake.name)
    
    contact_phone = factory.LazyFunction(
        lambda: f"+1-{random.randint(200,999)}-{random.randint(100,999)}-{random.randint(1000,9999)}"
    )
    
    league_tier = factory.Iterator([e.value for e in LeagueTier])
    
    founded_year = factory.LazyFunction(
        lambda: random.randint(1900, datetime.now().year - 1)
    )
    
    member_count = fuzzy.FuzzyInteger(10, 1000)
    
    season_structure = factory.Iterator(["annual", "semester", "continuous", "tournament"])
    
    event_frequency = factory.Iterator(["weekly", "monthly", "seasonal", "event-based"])
    
    primary_location = factory.LazyFunction(fake.city)
    
    global_reach = factory.Faker("boolean", chance_of_getting_true=30)  # ~30% chance of global reach
    
    target_markets = factory.LazyFunction(
        lambda: [fake.country_code() for _ in range(random.randint(1, 5))]
    )
    
    revenue_model = factory.LazyFunction(
        lambda: random.choice(["sponsorships", "broadcasting", "tickets", "merchandise", "digital"])
    )
    
    current_partners = fuzzy.FuzzyInteger(0, 50)
    
    partnership_focus = factory.LazyFunction(
        lambda: [fake.word() for _ in range(random.randint(1, 4))]
    )
    
    website_url = factory.LazyFunction(
        lambda: f"https://{fake.domain_name()}"
    )
    
    social_media_followers = fuzzy.FuzzyInteger(100, 500000)
    
    digital_engagement = factory.Iterator(["low", "medium", "high", "very_high"])
    
    data_integration = factory.Faker("boolean", chance_of_getting_true=60)
    
    real_time_scoring = factory.Faker("boolean", chance_of_getting_true=40)
    
    historical_data = factory.Faker("boolean", chance_of_getting_true=70)
    
    notes = factory.LazyFunction(
        lambda: fake.paragraph(nb_sentences=random.randint(1, 3))
    )
    
    tags = factory.LazyFunction(
        lambda: [fake.word() for _ in range(random.randint(0, 5))]
    )
    
    @factory.post_generation
    def post_generation(self, create: bool, extracted: Optional[dict], **kwargs):
        """Post-generation hook; derived scores are computed by the
        model's own validators, so no extra work is needed here."""
        return self
 
# Usage patterns
def create_test_league():
    """Create a single test league"""
    return LeagueQuestionnaireFactory.build()
 
def create_league_batch(size: int = 10):
    """Create batch of test leagues"""
    return LeagueQuestionnaireFactory.build_batch(size)
 
def create_specific_scenario():
    """Create league for specific test scenario"""
    return LeagueQuestionnaireFactory(
        league_name="Test Premier League",
        sport_bucket=SportBucket.TEAM,
        league_tier=LeagueTier.TIER_1,
        member_count=20,
        global_reach=True
    )

Seed Data Management

Seed data provides consistent, realistic examples for development and testing.

# fixtures/seeds/leagues_seed.py
from typing import List
 
from pydantic import BaseModel
 
from data_layer.schemas.leagues import LeagueQuestionnaire
 
SEED_LEAGUES = [
    {
        "league_name": "Power Slap League",
        "sport_bucket": "combat",
        "primary_sport": "Slap Fighting",
        "contact_email": "partnerships@powerslap.com",
        "league_tier": "tier_4",
        "founded_year": 2022,
        "member_count": 12,
        "season_structure": "annual",
        "event_frequency": "monthly",
        "primary_location": "Las Vegas, NV",
        "global_reach": True,
        "target_markets": ["USA", "Europe"],
        "revenue_model": "sponsorships",
        "current_partners": 8,
        "partnership_focus": ["technology", "beverage", "apparel"],
        "website_url": "https://powerslap.com",
        "social_media_followers": 250000,
        "digital_engagement": "high",
        "data_integration": True,
        "real_time_scoring": True,
        "historical_data": False,
        "notes": "High-growth combat sports league with strong digital presence",
        "tags": ["combat", "emerging", "digital-first"]
    },
    {
        "league_name": "Canadian Premier League",
        "sport_bucket": "team",
        "primary_sport": "Soccer",
        "contact_email": "business@canpl.ca",
        "league_tier": "tier_2",
        "founded_year": 2017,
        "member_count": 8,
        "season_structure": "annual",
        "event_frequency": "weekly",
        "primary_location": "Canada",
        "global_reach": False,
        "target_markets": ["Canada", "USA"],
        "revenue_model": "broadcasting",
        "current_partners": 15,
        "partnership_focus": ["financial", "automotive", "technology"],
        "website_url": "https://canpl.ca",
        "social_media_followers": 75000,
        "digital_engagement": "medium",
        "data_integration": True,
        "real_time_scoring": True,
        "historical_data": True,
        "notes": "Growing professional soccer league with strong Canadian market",
        "tags": ["soccer", "north_america", "professional"]
    },
    {
        "league_name": "Elite Youth Basketball Association",
        "sport_bucket": "team",
        "primary_sport": "Basketball",
        "contact_email": "info@eyba.org",
        "league_tier": "tier_3",
        "founded_year": 2015,
        "member_count": 45,
        "season_structure": "semester",
        "event_frequency": "weekly",
        "primary_location": "Midwest USA",
        "global_reach": False,
        "target_markets": ["USA"],
        "revenue_model": "tickets",
        "current_partners": 3,
        "partnership_focus": ["local_business", "education", "sports_gear"],
        "website_url": "https://eyba.org",
        "social_media_followers": 12000,
        "digital_engagement": "low",
        "data_integration": False,
        "real_time_scoring": False,
        "historical_data": False,
        "notes": "Youth development league focused on player pathways to professional basketball",
        "tags": ["basketball", "youth", "development"]
    }
]
 
def load_seed_leagues() -> List[LeagueQuestionnaire]:
    """Load predefined seed leagues"""
    leagues = []
    for seed_data in SEED_LEAGUES:
        # Derived scores are computed by LeagueQuestionnaire's model validator
        questionnaire = LeagueQuestionnaire(**seed_data)
        leagues.append(questionnaire)
    return leagues
 
def get_domain_seed_data(domain: str) -> List[BaseModel]:
    """Get seed data for specific domain"""
    if domain == "leagues":
        return load_seed_leagues()
    elif domain == "betting_systems":
        return load_betting_seed_data()  # defined alongside the betting_systems seeds
    # ... other domains
    return []
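
A conftest.py sketch exposing the seeds to pytest (fixture and test names are illustrative):

# conftest.py
import pytest

from fixtures.seeds.leagues_seed import load_seed_leagues

@pytest.fixture(scope="session")
def seed_leagues():
    return load_seed_leagues()

def test_seed_leagues_validate(seed_leagues):
    assert len(seed_leagues) == 3
    assert all(league.contact_email for league in seed_leagues)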

Schema Registry API

The runtime API provides dynamic schema access and validation capabilities.

Core Registry Implementation

from typing import Any, Dict, List, Optional, Type
from collections import defaultdict
from pathlib import Path
import importlib
 
from pydantic import BaseModel, ValidationError
 
class SchemaRegistry:
    """Enterprise-grade schema registry with dynamic loading and validation"""
    
    def __init__(self, schema_path: Path = Path("data_layer/schemas")):
        self._schemas: Dict[str, Type[BaseModel]] = {}
        self._domains: Dict[str, List[Type[BaseModel]]] = defaultdict(list)
        self._versions: Dict[str, Dict[str, Type[BaseModel]]] = defaultdict(dict)
        self._schema_path = schema_path
        self._loaded_domains = set()
        
        # Cache for performance
        self._validation_cache = {}
        self._schema_metadata = {}
    
    def load_domain(self, domain: str) -> None:
        """Load all schemas from a specific domain"""
        if domain in self._loaded_domains:
            return
            
        domain_path = self._schema_path / domain
        if not domain_path.exists():
            raise ValueError(f"Domain path not found: {domain_path}")
        
        # Import domain module
        try:
            domain_module = importlib.import_module(f"data_layer.schemas.{domain}")
            
            # Register all models in the domain
            for attr_name in dir(domain_module):
                attr = getattr(domain_module, attr_name)
                if isinstance(attr, type) and issubclass(attr, BaseModel) and attr is not BaseModel:
                    self.register(attr, domain)
                    
        except ImportError as e:
            raise ImportError(f"Failed to load domain {domain}: {e}")
        
        self._loaded_domains.add(domain)
    
    def register(self, model: Type[BaseModel], domain: str, version: str = "1.0.0") -> None:
        """Register a schema with domain and version"""
        
        # Generate unique key
        key = f"{domain}.{model.__name__}"
        full_key = f"{key}-v{version}"
        
        # Validate model
        if not issubclass(model, BaseModel):
            raise TypeError(f"{model.__name__} is not a Pydantic BaseModel")
        
        # Store in registries
        if key in self._schemas:
            if version not in self._versions[key]:
                self._versions[key][version] = model
            else:
                raise ValueError(f"Version {version} already exists for {key}")
        else:
            self._schemas[key] = model
            self._domains[domain].append(model)
            self._versions[key][version] = model
        
        # Extract and store metadata
        metadata = self._extract_model_metadata(model)
        self._schema_metadata[full_key] = metadata
    
    def get_schema(self, key: str, version: Optional[str] = None) -> Type[BaseModel]:
        """Retrieve schema by key and optional version"""
        
        if '.' not in key:
            raise ValueError("Schema key must be in 'domain.model' format")
        
        domain, model_name = key.split('.', 1)
        
        if version:
            full_key = f"{domain}.{model_name}-v{version}"
            if full_key in self._schema_metadata:
                return self._versions[f"{domain}.{model_name}"][version]
            else:
                raise ValueError(f"Schema {full_key} not found")
        else:
            # Return latest version (numeric semver ordering, not lexicographic)
            if f"{domain}.{model_name}" in self._schemas:
                versions = self._versions[f"{domain}.{model_name}"]
                latest_version = max(versions, key=lambda v: tuple(int(p) for p in v.split('.')))
                return versions[latest_version]
            else:
                raise ValueError(f"Schema {domain}.{model_name} not found")
    
    def validate_data(self, schema_key: str, data: dict) -> BaseModel:
        """Validate data against named schema"""
        
        # Get schema
        schema = self.get_schema(schema_key)
        
        # Check cache first (hash(str(data)) is a coarse key; the equality
        # check below guards against collisions)
        cache_key = f"validate_{schema_key}_{hash(str(data))}"
        if cache_key in self._validation_cache:
            cached_result, cached_data = self._validation_cache[cache_key]
            if cached_data == data:
                return cached_result
        
        try:
            validated = schema.model_validate(data)
            
            # Cache successful validation (for performance)
            self._validation_cache[cache_key] = (validated, data)
            
            return validated
            
        except ValidationError:
            # Validation errors are not cached; callers handle the exception
            raise
    
    def list_schemas(
        self, 
        domain: Optional[str] = None, 
        version: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """List schemas with filtering options"""
        
        if domain:
            if domain not in self._domains:
                return []
            schemas = self._domains[domain]
        else:
            schemas = []
            for domain_schemas in self._domains.values():
                schemas.extend(domain_schemas)
        
        # Filter by version if specified
        if version:
            filtered = []
            for schema in schemas:
                schema_key = self._get_schema_key(schema)
                if version in self._versions[schema_key]:
                    filtered.append({
                        "domain": self._get_domain_from_key(schema_key),
                        "name": schema.__name__,
                        "version": version,
                        "description": self._schema_metadata.get(f"{schema_key}-v{version}", {}).get("description", "")
                    })
            return filtered
        else:
            # Return all versions for each schema
            result = []
            for schema in schemas:
                schema_key = self._get_schema_key(schema)
                versions = list(self._versions[schema_key].keys())
                latest = max(versions, key=lambda v: tuple(int(p) for p in v.split('.')))
                result.append({
                    "domain": self._get_domain_from_key(schema_key),
                    "name": schema.__name__,
                    "latest_version": latest,
                    "all_versions": versions,
                    "description": self._schema_metadata.get(f"{schema_key}-v{latest}", {}).get("description", "")
                })
            return result
    
    def generate_schema_catalog(self) -> Dict[str, Any]:
        """Generate complete schema catalog with all metadata"""
        
        catalog = {
            "registry_version": "1.0.0",
            "total_schemas": len(self._schemas),
            "domains": {},
            "schemas": {}
        }
        
        # Add domain information
        for domain, schemas in self._domains.items():
            catalog["domains"][domain] = {
                "schema_count": len(schemas),
                "models": [schema.__name__ for schema in self._domains[domain]]
            }
        
        # Add schema details
        for key, model in self._schemas.items():
            domain = self._get_domain_from_key(key)
            latest_version = max(self._versions[key].keys(), key=lambda v: tuple(int(p) for p in v.split('.')))
            metadata = self._schema_metadata.get(f"{key}-v{latest_version}", {})
            
            catalog["schemas"][key] = {
                "domain": domain,
                "latest_version": latest_version,
                "description": metadata.get("description", ""),
                "fields": len(model.model_fields),
                "relationships": self._count_relationships(model)
            }
        
        return catalog
    
    def _extract_model_metadata(self, model: Type[BaseModel]) -> Dict[str, Any]:
        """Extract comprehensive metadata from Pydantic model"""
        
        metadata = {
            "name": model.__name__,
            "module": model.__module__,
            "docstring": getattr(model, "__doc__", ""),
            "fields": {},
            "validators": [],
            "relationships": [],
            "example": self._generate_example(model)
        }
        
        # Extract field metadata
        for field_name, field_info in model.model_fields.items():
            field_meta = {
                "type": str(field_info.annotation),
                "required": field_info.is_required(),
                "default": field_info.default,
                "description": field_info.description or "",
                "validators": []
            }
            
            # Check for field validators via Pydantic's decorator registry
            for validator_name, decorator in model.__pydantic_decorators__.field_validators.items():
                if field_name in decorator.info.fields:
                    field_meta["validators"].append(validator_name)
            
            metadata["fields"][field_name] = field_meta
        
        # Extract model validators
        if model.__pydantic_decorators__.model_validators:
            metadata["validators"].append("model_validator")
        
        # Extract relationships (simplified heuristic based on annotations)
        for field_name, field_info in model.model_fields.items():
            annotation_str = str(field_info.annotation)
            if annotation_str.startswith("Optional[") or "List[" in annotation_str:
                if "League" in annotation_str or "Partner" in annotation_str:
                    parts = annotation_str.split("'")
                    metadata["relationships"].append({
                        "field": field_name,
                        "type": "relationship",
                        "target": parts[1] if len(parts) > 1 else annotation_str
                    })
        
        return metadata
    
    def _get_schema_key(self, model: Type[BaseModel]) -> str:
        """Generate schema key from model"""
        # Extract from module path or annotations
        module = model.__module__
        if 'leagues' in module:
            return "leagues." + model.__name__
        # ... other domains
        return f"unknown.{model.__name__}"
    
    def _get_domain_from_key(self, key: str) -> str:
        """Extract domain from schema key"""
        return key.split('.')[0]
    
    def _count_relationships(self, model: Type[BaseModel]) -> int:
        """Count foreign key relationships in model"""
        count = 0
        for field_info in model.model_fields.values():
            annotation = str(field_info.annotation)
            if any(rel in annotation for rel in ["League", "Partner", "User", "Team"]):
                count += 1
        return count
    
    def _generate_example(self, model: Type[BaseModel]) -> dict:
        """Generate example instance for model"""
        try:
            # Use examples attached to the model's JSON schema, if any
            examples = model.model_json_schema().get('examples') or []
            return examples[0] if examples else {}
        except Exception:
            return {"example": "Generated by registry"}

Validation Engine

The validation engine provides both runtime and compile-time validation capabilities.

from datetime import datetime
from functools import lru_cache
from typing import Any, Callable, Dict, Optional, Tuple
 
from pydantic import BaseModel, ValidationError
 
class ValidationEngine:
    """Advanced validation engine with caching and error handling"""
    
    def __init__(self, registry: SchemaRegistry):
        self.registry = registry
        self._validators = {}
        self._error_templates = {}
    
    @lru_cache(maxsize=128)
    def get_validator(self, schema_key: str) -> Callable:
        """Get cached validator for schema"""
        schema = self.registry.get_schema(schema_key)
        return schema.model_validate
    
    def validate_with_context(self, schema_key: str, data: dict, context: Dict[str, Any]) -> Tuple[Optional[BaseModel], Optional[str]]:
        """Validate data with additional business context"""
        
        try:
            # Get validator
            validator = self.get_validator(schema_key)
            
            # Apply business context validation
            context_validated = self._apply_context_validators(data, context)
            
            # Validate against schema
            validated = validator(context_validated)
            
            # Post-validation business rules
            final = self._post_validation_checks(validated, context)
            
            return final, None
            
        except ValidationError as e:
            # Enhance error with context
            enhanced_error = self._enhance_validation_error(e, schema_key, context)
            return None, enhanced_error
    
    def _apply_context_validators(self, data: dict, context: Dict[str, Any]) -> dict:
        """Apply business-specific context validators"""
        
        validated_data = data.copy()
        
        # Example: Validate league data against current season
        if "leagues" in context.get("domain", ""):
            if "founded_year" in data:
                current_year = context.get("current_year", datetime.now().year)
                if data["founded_year"] > current_year:
                    raise ValueError(f"League founded_year {data['founded_year']} cannot be in future")
        
        # Example: Validate betting odds for realism
        if "betting_systems" in context.get("domain", ""):
            if "odds" in data:
                if data["odds"] <= 1.0 or data["odds"] >= 100.0:
                    raise ValueError("Odds must be between 1.0 and 100.0 for realistic betting markets")
        
        return validated_data
    
    def _post_validation_checks(self, model: BaseModel, context: Dict[str, Any]) -> BaseModel:
        """Perform post-validation business rules"""
        
        # Example: Auto-calculate derived fields
        if hasattr(model, "calculate_scores"):
            model = model.calculate_scores()
        
        # Example: Enforce business invariants
        if hasattr(model, "_validate_business_invariants"):
            model._validate_business_invariants()
        
        return model
    
    def _enhance_validation_error(self, error: ValidationError, schema_key: str, context: Dict[str, Any]) -> str:
        """Enhance validation errors with actionable business advice"""
        
        enhanced_messages = []
        
        for err in error.errors():
            message = err["msg"]
            # Location entries can be ints (list indices), so normalize to str
            loc_head = str(err["loc"][0]).lower() if err["loc"] else ""
            
            # Add business context to errors
            if "email" in loc_head:
                message += " Please ensure the email follows standard business format."
            
            if "phone" in loc_head:
                message += " Business phone numbers should include country code for international compatibility."
            
            if "url" in loc_head:
                message += " URLs must be valid and preferably use HTTPS for security."
            
            enhanced_messages.append(f"{err['loc']}: {message}")
        
        # Add general business advice
        enhanced_messages.append("\nBusiness Context Notes:")
        enhanced_messages.append("- Ensure all contact information is accurate and professional")
        enhanced_messages.append("- League names should reflect official branding")
        enhanced_messages.append("- Geographic locations should be specific for accurate market analysis")
        
        return "\n".join(enhanced_messages)

Integration with IDEs and Editors

The schema registry is designed with modern development tools in mind, providing seamless integration with popular IDEs and editors.

VSCode Configuration

Comprehensive VSCode settings ensure optimal type checking and autocomplete.

// .vscode/settings.json
{
    "python.defaultInterpreterPath": "./.venv/bin/python",
    "python.analysis.typeCheckingMode": "strict",
    "python.analysis.extraPaths": [
        "${workspaceFolder}/data_layer/schemas"
    ],
    "python.analysis.diagnosticMode": "workspace",
    "python.analysis.diagnosticSeverityOverrides": {
        "reportMissingTypeStubs": "none",
        "reportUnknownParameterType": "warning",
        "reportUnknownArgumentType": "warning"
    },
    "python.linting.mypyEnabled": true,
    "python.linting.mypyArgs": [
        "--strict",
        "--warn-redundant-casts",
        "--warn-unused-ignores",
        "--warn-unreachable",
        "--namespace-packages",
        "--show-error-codes",
        "--no-implicit-reexport"
    ],
    "python.testing.pytestEnabled": true,
    "python.testing.unittestEnabled": false,
    "python.testing.pytestArgs": [
        "tests",
        "-v",
        "--cov=data_layer",
        "--cov-report=html",
        "--cov-report=term-missing"
    ],
    "python.formatting.provider": "black",
    "editor.formatOnSave": true,
    "editor.codeActionsOnSave": {
        "source.organizeImports": true
    },
    "files.associations": {
        "*.mdx": "markdown"
    }
}
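
For teams that run pyright directly from the CLI or in CI rather than through Pylance, a minimal pyrightconfig.json can mirror these settings. This is a sketch; paths assume the repository layout shown elsewhere in this document:

// pyrightconfig.json
{
    "include": ["data_layer"],
    "typeCheckingMode": "strict",
    "pythonVersion": "3.11",
    "venvPath": ".",
    "venv": ".venv",
    "reportMissingTypeStubs": false
}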

PyCharm Configuration

For JetBrains users, the following settings optimize the experience.

// .idea/python-interpreter.json (or through UI)
{
    "pythonProjects": {
        "interpreterPath": "./.venv/bin/python",
        "packages": {
            "pydantic": ">=2.5.0"
        }
    },
    "typeCheckers": {
        "mypy": {
            "enabled": true,
            "arguments": [
                "--strict",
                "--show-error-codes",
                "data_layer"
            ]
        },
        "pyright": {
            "enabled": true
        }
    }
}

Type Stub Generation

Generated stubs enhance IDE performance for large schema sets.

# scripts/generate_stubs.py
from pathlib import Path
from typing import Optional, Type, Union, get_args, get_origin

from pydantic import BaseModel

# Assumes the package exposes a module-level registry instance
from data_layer.schemas import registry
 
def generate_all_stubs(output_dir: Path = Path("data_layer/stubs")):
    """Generate .pyi stub files for all schemas"""
    
    output_dir.mkdir(parents=True, exist_ok=True)
    
    for domain in registry._domains:
        domain_dir = output_dir / domain
        domain_dir.mkdir(parents=True, exist_ok=True)
        
        for model in registry._domains[domain]:
            stub_content = generate_model_stub(model)
            
            stub_path = domain_dir / f"{model.__name__}.pyi"
            stub_path.write_text(stub_content)
    
    print(f"Generated {len(list(output_dir.rglob('*.pyi')))} stub files")
 
def generate_model_stub(model: Type[BaseModel]) -> str:
    """Generate .pyi stub for a single model"""
    
    lines = [
        f'"""Auto-generated type stub for {model.__name__}"""',
        'from typing import Optional, List, Dict, Any',
        'from datetime import datetime',
        f'class {model.__name__}:'
    ]
    
    # Generate fields
    for field_name, field_info in model.model_fields.items():
        annotation = field_info.annotation
        # Optional[X] is Union[X, None], so inspect the Union's arguments
        if get_origin(annotation) is Union and type(None) in get_args(annotation):
            base_type = next(a for a in get_args(annotation) if a is not type(None))
            type_str = f"Optional[{getattr(base_type, '__name__', repr(base_type))}]"
        else:
            type_str = getattr(annotation, '__name__', repr(annotation))
        
        # Pydantic v2 flags required fields via is_required(), not Ellipsis
        default_str = "" if field_info.is_required() else " = ..."
        lines.append(f"    {field_name}: {type_str}{default_str}")
    
    return "\n".join(lines)

if __name__ == "__main__":
    generate_all_stubs()

Performance Considerations

Performance is a critical aspect of the schema registry, especially given the volume of validation operations in production.

Pydantic v2 Performance Optimizations

Pydantic v2 introduces groundbreaking performance improvements:

  1. Rust Core Engine: The validation core is implemented in Rust, providing 17x faster validation compared to v1.
  2. Lazy Schema Construction: With defer_build enabled, validators are constructed on first use rather than at import time, reducing startup latency (see the sketch after this list).
  3. Cached Validators: Compiled validators are cached after first use, eliminating repeated schema construction.
  4. Efficient Serialization: JSON serialization is optimized with 50% better performance than v1.
  5. Memory-Efficient Parsing: Advanced parsing algorithms reduce memory usage during validation.
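
A minimal sketch of points 2 and 3 above, assuming only recent Pydantic v2; the MarketSnapshot model is hypothetical:

from pydantic import BaseModel, ConfigDict, TypeAdapter
 
class MarketSnapshot(BaseModel):
    """Hypothetical model illustrating deferred schema construction"""
    
    model_config = ConfigDict(defer_build=True)  # build core schema on first use
    
    market_id: str
    odds: float
 
# The first validation triggers the one-time Rust-core schema build...
snapshot = MarketSnapshot.model_validate({"market_id": "m1", "odds": "2.5"})
 
# ...while a module-level TypeAdapter compiles once and is reused everywhere
odds_adapter = TypeAdapter(list[float])
print(odds_adapter.validate_python(["1.5", 2, 2.75]))  # -> [1.5, 2.0, 2.75]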

Benchmark Results

Typical validation benchmarks demonstrate the performance gains:

# performance/benchmark_validation.py
import timeit
from data_layer.schemas.leagues import LeagueQuestionnaire
 
# Sample data for benchmarking
SAMPLE_DATA = {
    "league_name": "Test Premier League",
    "sport_bucket": "team",
    "contact_email": "test@example.com",
    "league_tier": "tier_1",
    "founded_year": 2020,
    "member_count": 20,
    "season_structure": "annual",
    "event_frequency": "weekly",
    "primary_location": "New York, NY",
    "global_reach": True,
    "target_markets": ["USA", "Europe"],
    "revenue_model": "sponsorships",
    "current_partners": 15,
    "partnership_focus": ["technology", "financial"],
    "website_url": "https://testleague.com",
    "social_media_followers": 50000,
    "digital_engagement": "high",
    "data_integration": True,
    "real_time_scoring": True,
    "historical_data": True,
    "notes": "Test league for performance benchmarking",
    "tags": ["test", "benchmark"]
}
 
def benchmark_validation():
    """Benchmark schema validation performance"""
    
    # Warm up
    for _ in range(100):
        LeagueQuestionnaire(**SAMPLE_DATA)
    
    # Time 10,000 validations
    times = timeit.repeat(
        lambda: LeagueQuestionnaire(**SAMPLE_DATA),
        number=10000,
        repeat=5
    )
    
    per_call_ms = min(times) / 10000 * 1000  # ms per validation, fastest run
    print(f"Per-validation time (best of 5 runs): {per_call_ms:.3f} ms")
    print(f"Throughput: {1000/per_call_ms:.0f} validations/second")
 
if __name__ == "__main__":
    benchmark_validation()

Expected Results (Pydantic v2):

  • Average validation time: < 0.5 ms per complex model
  • Throughput: > 2,000 validations per second
  • Memory usage: < 1 MB per 1,000 validations

These benchmarks ensure that schema validation doesn't become a bottleneck in high-throughput scenarios like API request processing or batch data imports.

Caching Strategy

The registry implements intelligent caching to further optimize performance:

from threading import Lock
from typing import Callable, Dict

from pydantic import BaseModel
 
class CachedSchemaRegistry(SchemaRegistry):
    """Schema registry with intelligent caching"""
    
    def __init__(self):
        super().__init__()
        self._cache_lock = Lock()
        self._compiled_validators: Dict[str, Callable[[dict], BaseModel]] = {}
    
    def get_cached_validator(self, schema_key: str) -> Callable[[dict], BaseModel]:
        """Get cached validator for schema"""
        
        with self._cache_lock:
            if schema_key not in self._compiled_validators:
                schema = self.get_schema(schema_key)
                # Cache the bound validator itself; input dicts are unhashable,
                # so the call results cannot be memoized with lru_cache
                self._compiled_validators[schema_key] = schema.model_validate
        
        return self._compiled_validators[schema_key]
    
    def validate_cached(self, schema_key: str, data: dict) -> BaseModel:
        """Validate using cached validator"""
        validator = self.get_cached_validator(schema_key)
        return validator(data)
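
A usage sketch, reusing the SAMPLE_DATA payload from the benchmark above and assuming a registered leagues.LeagueQuestionnaire schema:

cached_registry = CachedSchemaRegistry()

# The first call caches the bound validator; subsequent calls reuse it
league = cached_registry.validate_cached("leagues.LeagueQuestionnaire", SAMPLE_DATA)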

This caching layer ensures that validation performance remains optimal even under heavy load.

Security Considerations

Data validation is the first line of defense against malicious input, and the schema registry implements robust security measures.

Input Sanitization and Validation

All incoming data is rigorously validated against schema definitions (see the sketch after the list):

  1. SQL Injection Prevention: Parameterized queries using validated Pydantic models prevent injection attacks.
  2. XSS Prevention: String fields are sanitized before any HTML rendering or database storage.
  3. Email and URL Validation: Strict regex patterns ensure only valid business emails and URLs are accepted.
  4. Path Traversal Protection: File path fields are validated to prevent directory traversal attacks.
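
The following sketch illustrates points 2 and 4 with field-level validators. The AssetUpload model and its fields are hypothetical, not the platform's production validators:

import re
from pathlib import PurePosixPath

from pydantic import BaseModel, field_validator
 
class AssetUpload(BaseModel):
    """Hypothetical upload payload illustrating defensive field validation"""
    
    display_name: str
    file_path: str
    
    @field_validator("display_name")
    @classmethod
    def strip_markup(cls, v: str) -> str:
        # Drop anything resembling HTML tags before storage or rendering (XSS)
        return re.sub(r"<[^>]*>", "", v).strip()
    
    @field_validator("file_path")
    @classmethod
    def forbid_traversal(cls, v: str) -> str:
        # Reject absolute paths and parent-directory segments (path traversal)
        path = PurePosixPath(v)
        if path.is_absolute() or ".." in path.parts:
            raise ValueError("file_path must be a relative path without '..'")
        return v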

Sensitive Data Handling

The registry provides special handling for sensitive information:

import hashlib
import re
from datetime import datetime
from typing import List, Optional

from pydantic import (
    BaseModel,
    ConfigDict,
    Field,
    SecretStr,
    field_validator,
    model_validator,
)
 
class ApiKey(BaseModel):
    """Secure API key model with sensitive data protection"""
    
    key_id: str
    user_id: str
    scopes: List[str]
    created_at: datetime
    expires_at: Optional[datetime] = None
    is_active: bool = True
    
    # Sensitive fields with special handling
    secret_key: SecretStr = Field(..., description="Encrypted API secret (never logged)")
    hashed_value: str = ""  # SHA-256 hash of the secret, derived below if omitted
    
    model_config = ConfigDict(
        # SecretStr already renders as '**********' in repr, logs, and dumps,
        # so the secret never leaks through error messages or serialization
        str_strip_whitespace=True,
        validate_assignment=True
    )
    
    @field_validator('secret_key')
    @classmethod
    def validate_api_key_format(cls, v: SecretStr) -> SecretStr:
        """Validate API key format and strength"""
        key_str = v.get_secret_value()
        
        # Minimum length requirement
        if len(key_str) < 32:
            raise ValueError("API key must be at least 32 characters")
        
        # Check for common weak patterns
        weak_patterns = [r'^password', r'^123456', r'^admin', r'^test']
        if any(re.search(pattern, key_str.lower()) for pattern in weak_patterns):
            raise ValueError("API key contains weak pattern - use stronger key")
        
        return v
    
    @model_validator(mode='after')
    def derive_hash(self) -> 'ApiKey':
        """Derive the storage hash from the secret when not supplied"""
        if not self.hashed_value:
            digest = hashlib.sha256(
                self.secret_key.get_secret_value().encode()
            ).hexdigest()
            # Bypass validate_assignment to avoid re-running this validator
            object.__setattr__(self, 'hashed_value', digest)
        return self
    
    def verify_key(self, plain_key: str) -> bool:
        """Verify API key against stored hash (without exposing secret)"""
        return hashlib.sha256(plain_key.encode()).hexdigest() == self.hashed_value
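
A hypothetical usage sketch: generate a strong random secret, confirm it is redacted on display, and verify it round-trip:

import secrets
from datetime import datetime

from pydantic import SecretStr
 
key = ApiKey(
    key_id="key_123",
    user_id="user_456",
    scopes=["leagues:read"],
    created_at=datetime.now(),
    secret_key=SecretStr(secrets.token_urlsafe(48)),
)
print(key.secret_key)  # '**********' -- SecretStr redacts on display
assert key.verify_key(key.secret_key.get_secret_value())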

Audit Trail Integration

All schema operations are logged for security auditing:

import logging
import time
from contextlib import contextmanager
from typing import Any, Dict, Generator
 
# Security audit logger
audit_logger = logging.getLogger("schema_audit")
 
class AuditedSchemaRegistry(SchemaRegistry):
    """Registry with comprehensive audit logging"""
    
    @contextmanager
    def audit_operation(self, operation: str, user_id: str, context: Dict[str, Any]) -> Generator[None, None, None]:
        """Context manager for auditing schema operations"""
        
        audit_logger.info(
            "Schema operation started",
            extra={
                "operation": operation,
                "user_id": user_id,
                "schema_key": context.get("schema_key"),
                "data_size": len(str(context.get("data", {}))),
                "ip_address": context.get("ip_address"),
                "user_agent": context.get("user_agent")
            }
        )
        
        start = time.perf_counter()
        try:
            yield
            audit_logger.info(
                "Schema operation succeeded",
                extra={
                    "operation": operation,
                    "user_id": user_id,
                    "duration_ms": round((time.perf_counter() - start) * 1000, 2),
                    "result": "success"
                }
            )
        except Exception as e:
            audit_logger.error(
                "Schema operation failed",
                extra={
                    "operation": operation,
                    "user_id": user_id,
                    "error": str(e),
                    "result": "failure"
                }
            )
            raise
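
A hedged usage sketch with hypothetical request metadata:

audit_registry = AuditedSchemaRegistry()
with audit_registry.audit_operation(
    operation="validate",
    user_id="user_456",
    context={"schema_key": "leagues.LeagueQuestionnaire",
             "ip_address": "203.0.113.7", "user_agent": "curl/8.5"},
):
    pass  # run the validation (or other schema operation) being audited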

Deployment Considerations

The schema registry is designed for seamless integration into various deployment scenarios, from local development to production clusters.

Package Structure

The registry is distributed as a standalone Python package for maximum reusability.

altsportsleagues-schemas/
β”œβ”€β”€ pyproject.toml              # Project metadata and dependencies
β”œβ”€β”€ README.md                   # Usage and integration guide
β”œβ”€β”€ CHANGELOG.md                # Version history
β”œβ”€β”€ LICENSE                     # MIT License
β”œβ”€β”€ src/
β”‚   └── altsportsleagues_schemas/
β”‚       β”œβ”€β”€ __init__.py         # Package entry points
β”‚       β”œβ”€β”€ registry.py         # Core registry implementation
β”‚       β”œβ”€β”€ base.py             # Base schema and utilities
β”‚       β”œβ”€β”€ domains/            # Domain-specific schemas
β”‚       β”‚   β”œβ”€β”€ __init__.py
β”‚       β”‚   β”œβ”€β”€ leagues/
β”‚       β”‚   β”œβ”€β”€ betting_systems/
β”‚       β”‚   β”œβ”€β”€ email_assistant/
β”‚       β”‚   β”œβ”€β”€ infrastructure/
β”‚       β”‚   β”œβ”€β”€ saas/
β”‚       β”‚   β”œβ”€β”€ sports/
β”‚       β”‚   └── users/
β”‚       β”œβ”€β”€ generators/         # Multi-format generators
β”‚       β”‚   β”œβ”€β”€ __init__.py
β”‚       β”‚   β”œβ”€β”€ typescript.py
β”‚       β”‚   β”œβ”€β”€ json_schema.py
β”‚       β”‚   └── graphql.py
β”‚       β”œβ”€β”€ validators/         # Custom validation logic
β”‚       └── fixtures/           # Test fixture factories
β”œβ”€β”€ tests/                      # Comprehensive test suite
β”œβ”€β”€ docs/                       # Generated documentation
β”œβ”€β”€ stubs/                      # IDE type stubs
└── examples/                   # Usage examples

Distribution Configuration

# pyproject.toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
 
[project]
name = "altsportsleagues-schemas"
version = "1.0.0"
description = "Type-safe Pydantic schemas for AltSportsLeagues.ai platform"
readme = "README.md"
license = {text = "MIT"}
authors = [
    {name = "AltSportsLeagues Team", email = "team@altsportsleagues.ai"}
]
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: TypeScript",
    "Topic :: Software Development :: Libraries :: Python Modules",
    "Typing :: Typed"
]
 
requires-python = ">=3.11"
dependencies = [
    "pydantic >= 2.5.0",
    "annotated-types >= 0.6.0",
    "pydantic-settings >= 2.1.0",
    "typing-extensions >= 4.8.0"
]
 
[project.optional-dependencies]
dev = [
    "mypy >= 1.7.0",
    "pyright >= 1.1.0",
    "factory-boy >= 3.3.0",
    "faker >= 20.0.0",
    "pytest >= 7.4.0",
    "pytest-cov >= 4.1.0",
    "black >= 23.0.0",
    "isort >= 5.12.0"
]
 
test = [
    "pytest >= 7.4.0",
    "pytest-cov >= 4.1.0",
    "factory-boy >= 3.3.0",
    "faker >= 20.0.0"
]
 
docs = [
    "mkdocs >= 1.5.0",
    "mkdocs-material >= 9.0.0",
    "pydantic >= 2.5.0"
]
 
[project.urls]
Homepage = "https://altsportsleagues.ai/schemas"
Documentation = "https://altsportsleagues.ai/schemas/docs"
Repository = "https://github.com/altsportsleagues/schemas"
Issues = "https://github.com/altsportsleagues/schemas/issues"
 
[tool.hatch.build.targets.wheel]
packages = ["src/altsportsleagues_schemas"]
 
[tool.mypy]
python_version = "3.11"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
check_untyped_defs = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = true

Installation and Usage

# Install the schema package
pip install altsportsleagues-schemas
 
# Or from source
git clone https://github.com/altsportsleagues/schemas.git
cd schemas
pip install -e ".[dev]"
 
# Verify installation
python -c "from altsportsleagues_schemas.registry import registry; print(registry.list_schemas())"

Error Handling Patterns

Robust error handling ensures that schema issues are caught early and provide actionable feedback.

Custom Exception Hierarchy

from pydantic import BaseModel, ValidationError as PydanticValidationError
from typing import Optional, Dict, Any, List
 
class SchemaRegistryError(Exception):
    """Base exception for all schema registry errors"""
    
    def __init__(self, message: str, context: Optional[Dict[str, Any]] = None):
        self.message = message
        self.context = context or {}
        super().__init__(self.message)
 
class SchemaNotFoundError(SchemaRegistryError):
    """Raised when schema is not found in registry"""
    
    def __init__(self, schema_key: str, available_schemas: List[str]):
        context = {
            "schema_key": schema_key,
            "available_schemas": available_schemas[:10],  # Limit for readability
            "total_available": len(available_schemas),
            "suggestion": f"Did you mean one of: {', '.join(available_schemas[:3])}?"
        }
        super().__init__("Schema not found in registry", context)
 
class SchemaVersionMismatch(SchemaRegistryError):
    """Raised when schema version is incompatible"""
    
    def __init__(self, requested_version: str, available_versions: List[str]):
        context = {
            "requested_version": requested_version,
            "available_versions": available_versions,
            "latest_version": max(available_versions) if available_versions else None,
            "suggestion": f"Try using latest version: {max(available_versions) if available_versions else 'unknown'}"
        }
        super().__init__("Schema version not compatible", context)
 
class ValidationSchemaError(SchemaRegistryError):
    """Enhanced validation error with schema context.

    Wraps the original Pydantic error rather than subclassing it:
    pydantic-core's ValidationError cannot be instantiated directly.
    """
    
    def __init__(self, original_error: PydanticValidationError, schema_key: str):
        self.schema_key = schema_key
        self.original_error = original_error
        self.enhanced_errors = self._enhance_errors(original_error.errors(), schema_key)
        super().__init__(f"Validation failed for schema '{schema_key}'",
                         {"errors": self.enhanced_errors})
    
    def _enhance_errors(self, errors: List[Dict[str, Any]], schema_key: str) -> List[Dict[str, Any]]:
        """Add schema-specific context to validation errors"""
        enhanced = []
        
        for error in errors:
            enhanced_error = error.copy()
            
            # Add schema context
            enhanced_error["schema"] = schema_key
            enhanced_error["domain"] = self._extract_domain(schema_key)
            
            # Provide actionable fixes
            if "required" in error["type"]:
                enhanced_error["fix"] = f"Add missing required field: {error['loc'][-1]}"
            elif "string_too_short" in error["type"]:
                enhanced_error["fix"] = f"Field {error['loc'][-1]} must have at least {error['context']['min_length']} characters"
            elif "value_error.email" in error["type"]:
                enhanced_error["fix"] = "Provide a valid email address in format user@domain.com"
            
            enhanced.append(enhanced_error)
        
        return enhanced
    
    def _extract_domain(self, schema_key: str) -> str:
        """Extract domain from schema key"""
        return schema_key.split('.')[0] if '.' in schema_key else "unknown"
 
def safe_validate(schema_key: str, data: dict, context: Optional[Dict[str, Any]] = None) -> tuple[Optional[BaseModel], Optional[str]]:
    """Safely validate data against schema with enhanced error handling"""
    
    try:
        registry = SchemaRegistry()
        schema = registry.get_schema(schema_key)
        validated = schema.model_validate(data)
        return validated, None
    except SchemaNotFoundError as e:
        return None, f"Schema not found: {e.message} {e.context}"
    except SchemaVersionMismatch as e:
        return None, f"Version mismatch: {e.message} {e.context}"
    except PydanticValidationError as e:
        enhanced_error = ValidationSchemaError(e, schema_key)
        return None, f"Validation failed: {str(enhanced_error)}"
    except Exception as e:
        return None, f"Unexpected validation error: {str(e)}"

Error Response Standardization

All validation errors follow a consistent format for easy consumption by developers and monitoring systems.

from datetime import datetime, timezone

def format_validation_error(enhanced_error: ValidationSchemaError) -> Dict[str, Any]:
    """Format validation error for API responses and logging"""
    
    return {
        "error_type": "VALIDATION_ERROR",
        "schema": enhanced_error.schema_key,
        "domain": enhanced_error._extract_domain(enhanced_error.schema_key),
        "message": "Data validation failed against schema requirements",
        "details": [
            {
                "field": "/".join(map(str, error["loc"])),
                "error": error["msg"],
                "type": error["type"],
                "value": error.get("input", "unknown"),
                "fix": error.get("fix", "Review schema documentation")
            }
            for error in enhanced_error.enhanced_errors
        ],
        "total_errors": len(enhanced_error.enhanced_errors),
        "suggestion": "Validate your data against the schema documentation at /schemas/{domain}",
        "timestamp": datetime.utcnow().isoformat()
    }
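
An illustrative response produced by this formatter; all values are hypothetical:

{
    "error_type": "VALIDATION_ERROR",
    "schema": "leagues.LeagueQuestionnaire",
    "domain": "leagues",
    "message": "Data validation failed against schema requirements",
    "details": [
        {
            "field": "league_name",
            "error": "String should have at least 3 characters",
            "type": "string_too_short",
            "value": "",
            "fix": "Field league_name must have at least 3 characters"
        }
    ],
    "total_errors": 1,
    "suggestion": "Validate your data against the schema documentation at /schemas/{domain}",
    "timestamp": "2025-01-15T12:00:00+00:00"
}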

Future Enhancements

The schema registry is designed for long-term evolution and expansion. Planned enhancements include:

  1. GraphQL Schema Generation: Automatic GraphQL schema definition language (SDL) generation from Pydantic models, enabling type-safe GraphQL APIs with introspection support.

  2. Database Synchronization: Bi-directional synchronization between Pydantic models and database schemas using SQLAlchemy or Prisma, ensuring that database migrations stay in sync with application models.

  3. Real-Time Schema Validation: WebSocket-based real-time validation for frontend forms and dynamic UIs, providing instant feedback during data entry.

  4. Schema Diff and Migration Tools: Comprehensive schema comparison tools that generate detailed migration reports, including data transformation scripts and compatibility matrices.

  5. Visual Schema Designer: A web-based UI for designing, editing, and visualizing schemas, allowing non-technical stakeholders to contribute to data model evolution.

  6. Schema Governance Framework: Enterprise-grade schema governance with approval workflows, change tracking, and compliance reporting for regulated industries.

  7. Runtime Schema Validation API: Public API endpoints for validating arbitrary JSON against registered schemas, enabling third-party integrations and data quality gates.

  8. Schema Evolution Analytics: Analytics and reporting on schema usage patterns, helping teams understand which models need attention and optimization.

These enhancements will further solidify the schema registry as the cornerstone of AltSportsLeagues.ai's data architecture, enabling sophisticated data management while maintaining developer productivity and system reliability.


This comprehensive Data Layer & Schema Registry documentation provides the complete blueprint for AltSportsLeagues.ai's type-safe data foundation. From schema organization and validation patterns to multi-format generation and deployment strategies, this system ensures data integrity while accelerating development across the entire platform.
