Backend API & MCP Integration Platform
Introduction
This specification defines requirements for the AltSportsLeagues.ai Backend API & MCP Integration Platform - a FastAPI-based backend system with comprehensive Model Context Protocol (MCP) server integration, enabling AI-powered sports intelligence, league analysis, and business automation.
Key Principle: Build a scalable, production-ready backend that seamlessly integrates MCP servers while providing robust REST APIs for frontend clients.
The platform serves as the central nervous system for AltSportsLeagues.ai, handling data processing, AI orchestration, business logic execution, and secure API exposure. It bridges the gap between our intelligent frontend and the complex backend services powering sports league intelligence.
Glossary
- Backend_API: FastAPI application providing REST endpoints for frontend and external clients
- MCP_Server: Model Context Protocol server providing standardized AI tool interfaces for agents
- FastMCP: Python framework for building MCP-compatible servers with FastAPI integration
- Tool_Registry: Central registry managing all available MCP tools, their schemas, and capabilities
- Service_Layer: Modular business logic services (e.g., LangMem, Document Pipeline, League Intelligence)
- Data_Layer: Strongly-typed Pydantic models, schema definitions, and data validation
- Authentication_Layer: JWT-based authentication, role-based authorization, and API key management
- Rate_Limiter: Request throttling, quota management, and abuse prevention
- Health_Monitor: Comprehensive system health checks, service status, and observability
- API_Gateway: Unified entry point for all backend services with request routing and validation
This backend architecture follows modern microservices principles while maintaining tight integration with our AI capabilities through MCP protocol. The system is designed to handle high-throughput sports data processing, real-time analytics, and AI-driven business intelligence at scale.
Requirements
Requirement 1: FastAPI REST API Foundation
User Story: As a frontend developer, I want a well-documented REST API with OpenAPI specifications, so that I can build client applications efficiently.
Acceptance Criteria
- OpenAPI Documentation: WHEN the backend starts, THE Backend_API SHALL expose comprehensive OpenAPI documentation at /docs (Swagger UI) and /redoc endpoints, including all API endpoints, request/response schemas, authentication requirements, and example payloads.
- Request Validation: WHEN an API endpoint is called, THE Backend_API SHALL validate request bodies, query parameters, and headers using Pydantic models with automatic schema generation for OpenAPI.
- Validation Errors: WHEN validation fails, THE Backend_API SHALL return HTTP 422 Unprocessable Entity with detailed error messages following the standard FastAPI error format, including field-specific errors and type information.
- Standardized Responses: WHEN requests succeed, THE Backend_API SHALL return standardized JSON responses with proper HTTP status codes (200 OK for GET, 201 Created for POST, etc.) and consistent response structure including data, metadata, and optional warnings (see the sketch after this list).
- Error Handling: WHEN server errors occur, THE Backend_API SHALL log full stack traces with trace IDs for debugging and return user-friendly error messages with HTTP 500 Internal Server Error, hiding sensitive implementation details from clients.
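The consistent response structure described above can be captured in a small envelope model. The following is a minimal sketch; the ResponseEnvelope name and field choices are illustrative, not an existing module:
from typing import Any, Dict, List
from pydantic import BaseModel, Field

class ResponseEnvelope(BaseModel):
    """Illustrative wrapper for successful API responses."""
    data: Any
    metadata: Dict[str, Any] = Field(default_factory=dict)  # e.g. pagination, timing
    warnings: List[str] = Field(default_factory=list)       # non-fatal notices

# Usage: return ResponseEnvelope(data=leagues, metadata={"count": len(leagues)})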
Example Implementation Snippet:
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Optional
app = FastAPI(
title="AltSportsLeagues API",
description="Backend API for sports league intelligence",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
class LeagueQuery(BaseModel):
sport: str
level: str = "professional"
location: Optional[str] = None
@app.get("/leagues", response_model=List[LeagueResponse])
async def get_leagues(query: LeagueQuery):
try:
# Business logic here
leagues = await league_service.search(query)
return leagues
except ValidationError as e:
raise HTTPException(status_code=422, detail=e.errors())
except BusinessLogicError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Unexpected error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error")
Requirement 2: MCP Server Integration
User Story: As an AI agent developer, I want to access all backend functionality through MCP protocol, so that I can integrate with Claude, ChatGPT, and custom AI agents seamlessly.
Acceptance Criteria
- Tool Registration: WHEN the MCP server starts, THE Backend_API SHALL automatically register all available tools in the Tool_Registry, including tool schemas, descriptions, parameters, and return types following MCP protocol specifications (see the registry sketch after this list).
- Tool Discovery: WHEN an AI agent calls list_tools via MCP, THE MCP_Server SHALL return a comprehensive list of all available tools with their JSON schemas, parameter requirements, authentication needs, and example usage.
- Tool Execution: WHEN an AI agent calls a specific tool via MCP, THE MCP_Server SHALL validate input parameters against the tool schema, execute the corresponding backend service, and return structured results following the MCP response format.
- Success Handling: WHEN tool execution succeeds, THE MCP_Server SHALL return a standardized MCP response with the tool result, execution metadata (duration, tokens used), and optional follow-up suggestions or related tools.
- Error Handling: WHEN tool execution fails, THE MCP_Server SHALL return a detailed MCP error response with error type, human-readable message, technical details (for debugging), recovery suggestions, and alternative tools that might achieve similar results.
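A minimal sketch of the Tool_Registry behavior described above, using hypothetical ToolSpec/ToolRegistry types rather than the FastMCP API:
from dataclasses import dataclass, field
from typing import Any, Callable, Dict

@dataclass
class ToolSpec:
    name: str
    description: str
    parameters: Dict[str, Any]        # JSON-schema-like parameter definitions
    handler: Callable[..., Any]       # async callable that executes the tool

@dataclass
class ToolRegistry:
    tools: Dict[str, ToolSpec] = field(default_factory=dict)

    def register(self, spec: ToolSpec) -> None:
        self.tools[spec.name] = spec  # populated at server startup

    def list_tools(self) -> list[dict]:
        # Shape returned to agents calling list_tools
        return [
            {"name": t.name, "description": t.description, "parameters": t.parameters}
            for t in self.tools.values()
        ]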
Example MCP Tool Definition:
from typing import List, Optional
from fastmcp import MCPServer, Tool
mcp_server = MCPServer("altsportsleagues-mcp")
@Tool(
name="search_leagues",
description="Search for sports leagues by sport, level, and location",
parameters={
"sport": {"type": "string", "description": "Sport name (e.g., 'soccer', 'basketball')"},
"level": {"type": "string", "enum": ["professional", "amateur", "youth"], "default": "professional"},
"location": {"type": "string", "description": "Geographic location filter"}
},
returns={
"type": "array",
"items": {"type": "object", "properties": {"name": {"type": "string"}, "id": {"type": "string"}}}
}
)
async def search_leagues(sport: str, level: str = "professional", location: Optional[str] = None) -> List[dict]:
"""MCP tool implementation for league search"""
try:
results = await league_service.search(sport, level, location)
return [{"name": league.name, "id": league.id} for league in results]
except Exception as e:
        raise MCPServerError(f"League search failed: {str(e)}", recovery_suggestions=["Try different sport name", "Check location spelling"])
Requirement 3: Service Layer Architecture
User Story: As a backend developer, I want a clean service layer architecture, so that business logic is separated from API routes and easily testable.
Acceptance Criteria
- Dependency Injection: WHEN services are initialized, THE Backend_API SHALL use dependency injection to provide required dependencies (database connections, AI clients, configuration, logging) following FastAPI's Depends pattern.
- Business Logic Separation: WHEN processing requests, THE Backend_API SHALL delegate all business logic to dedicated Service_Layer modules, keeping API routes thin and focused on request/response handling.
- Service Modularity: WHEN services execute, THE Service_Layer SHALL handle all business logic, data transformations, validation, and orchestration while maintaining single responsibility principles.
- Exception Handling: WHEN services encounter errors, THE Service_Layer SHALL raise custom, typed exceptions with rich context (e.g., BusinessLogicError, ValidationError, IntegrationError) for proper error handling (see the exception sketch after this list).
- Response Standardization: WHEN services succeed, THE Service_Layer SHALL return standardized response objects with data, metadata, and optional warnings, ensuring consistent API contracts.
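The typed exceptions referenced above (and used in the examples below) might be defined as follows; this is a sketch, and the ServiceError base class is an assumed name:
from typing import Any, Dict, Optional

class ServiceError(Exception):
    """Base class for service-layer errors carrying structured context."""
    def __init__(self, message: str, context: Optional[Dict[str, Any]] = None):
        super().__init__(message)
        self.context = context or {}

class BusinessLogicError(ServiceError):
    """Raised when a request is valid but violates a business rule."""

class IntegrationError(ServiceError):
    """Raised when a downstream dependency (database, AI provider, external API) fails."""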
Example Service Layer Structure:
# services/league_service.py
from abc import ABC, abstractmethod
from typing import List, Optional
from pydantic import BaseModel
from services.base_service import BaseService
class LeagueService(BaseService):
"""Service for league-related business operations"""
async def search_leagues(
self,
sport: str,
level: str = "professional",
location: Optional[str] = None
) -> List[League]:
"""Search for leagues matching criteria"""
try:
# Business logic: validate inputs, query database, apply filters
validated_query = self.validate_league_query(sport, level, location)
db_results = await self.league_repository.search(validated_query)
enriched_leagues = await self.enrich_league_data(db_results)
return enriched_leagues
except ValidationError as e:
raise BusinessLogicError(f"Invalid league query: {e}")
except DatabaseError as e:
raise IntegrationError(f"League search failed: {e}")
async def analyze_league_partnerships(self, league_id: str) -> LeaguePartnershipAnalysis:
"""Analyze partnership opportunities for a league"""
# Complex business logic with multiple service calls
league = await self.get_league(league_id)
market_data = await self.market_service.get_market_data(league.sport)
partnership_scores = await self.partnership_engine.score_opportunities(league, market_data)
return LeaguePartnershipAnalysis(league=league, scores=partnership_scores)
# api/routes/leagues.py (Thin API layer)
from fastapi import APIRouter, Depends
from services.league_service import LeagueService
from api.dependencies import get_league_service
router = APIRouter(prefix="/leagues", tags=["leagues"])
@router.get("/", response_model=List[LeagueResponse])
async def search_leagues(
sport: str,
level: str = "professional",
location: Optional[str] = None,
service: LeagueService = Depends(get_league_service)
):
"""Search for leagues"""
results = await service.search_leagues(sport, level, location)
    return results
Requirement 4: Authentication & Authorization
User Story: As a system administrator, I want secure authentication and role-based authorization, so that I can control access to sensitive operations.
Acceptance Criteria
- JWT Authentication: WHEN users authenticate via /auth/login or /auth/register, THE Authentication_Layer SHALL issue JWT tokens containing user claims (ID, role, permissions, expiration) using secure signing algorithms (RS256 recommended).
- Token Validation: WHEN protected endpoints are called, THE Authentication_Layer SHALL validate JWT tokens from the Authorization: Bearer <token> header, checking signature, expiration, issuer, and audience.
- Role-Based Access: WHEN authorization is required, THE Authentication_Layer SHALL check user roles (admin, user, analyst, partner) and permissions against endpoint requirements using FastAPI's security dependencies.
- Token Expiration: WHEN JWT tokens expire, THE Authentication_Layer SHALL return HTTP 401 Unauthorized with a WWW-Authenticate: Bearer header and require re-authentication via refresh tokens or new login.
- API Key Support: WHEN API keys are used for machine-to-machine authentication, THE Authentication_Layer SHALL validate keys against the database, check scopes/permissions, and implement key rotation and revocation (see the sketch after this list).
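API key validation is not shown in the authentication example below; a minimal FastAPI dependency might look like this sketch, where api_key_repository and its methods are assumed placeholders:
from fastapi import Security, HTTPException, status
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

async def get_api_client(api_key: str = Security(api_key_header)):
    """Validate an API key, its scopes, and revocation status."""
    if not api_key:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing API key")
    client = await api_key_repository.find_active_key(api_key)  # assumed repository lookup
    if client is None or client.revoked:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid or revoked API key")
    return client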
Example Authentication Implementation:
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from pydantic import BaseModel
from typing import Optional
# Security configuration
SECRET_KEY = settings.secret_key
ALGORITHM = "HS256"  # HS256 works with a shared secret; RS256 (recommended above) requires an RSA private/public key pair
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = HTTPBearer()
class Token(BaseModel):
access_token: str
token_type: str
expires_in: int
class TokenData(BaseModel):
username: Optional[str] = None
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
# Protected route example
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user
# Role-based authorization
def require_role(required_role: str):
def role_checker(current_user: User = Depends(get_current_user)):
if current_user.role != required_role:
raise HTTPException(
status_code=403,
detail=f"Insufficient permissions. Required role: {required_role}"
)
return current_user
return role_checker
@app.get("/admin/metrics")
async def get_admin_metrics(current_user: User = Depends(require_role("admin"))):
return {"metrics": admin_service.get_metrics()}Requirement 5: Rate Limiting & Quotas
User Story: As a platform operator, I want rate limiting and usage quotas, so that I can prevent abuse and manage costs effectively.
Acceptance Criteria
- Rate Limit Enforcement: WHEN requests exceed configured rate limits (e.g., 100 requests/minute per user), THE Rate_Limiter SHALL return HTTP 429 Too Many Requests with a Retry-After header indicating when the limit resets.
- Quota Exceeded: WHEN users exceed monthly quotas (e.g., API calls, AI tokens), THE Rate_Limiter SHALL return HTTP 403 Forbidden with detailed quota information and upgrade options.
- Tiered Limits: WHEN premium users access APIs, THE Rate_Limiter SHALL apply higher limits based on subscription tier (e.g., basic: 100/min, pro: 1000/min, enterprise: unlimited), as sketched after this list.
- Usage Tracking: WHEN monitoring usage, THE Rate_Limiter SHALL track requests per user, endpoint, IP address, and time window using Redis or database for persistence.
- Limit Reset: WHEN time windows expire, THE Rate_Limiter SHALL automatically clear counters and allow new requests within the refreshed limits.
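Tiered quotas and the Retry-After behavior can also be enforced directly against Redis. A fixed-window sketch, assuming a redis.asyncio client and illustrative per-tier limits:
import redis.asyncio as redis
from fastapi import HTTPException

TIER_LIMITS = {"basic": 100, "pro": 1000}  # requests per minute (illustrative)
redis_client = redis.Redis()

async def enforce_rate_limit(user_id: str, tier: str) -> None:
    """One counter per user, reset every 60 seconds."""
    limit = TIER_LIMITS.get(tier)
    if limit is None:  # e.g. enterprise tier: unlimited
        return
    key = f"ratelimit:{user_id}"
    count = await redis_client.incr(key)
    if count == 1:
        await redis_client.expire(key, 60)  # start a new 60s window
    if count > limit:
        ttl = await redis_client.ttl(key)
        raise HTTPException(status_code=429, detail="Rate limit exceeded",
                            headers={"Retry-After": str(max(ttl, 1))})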
Example Rate Limiting Implementation:
from fastapi import FastAPI, Depends, HTTPException, Request
from typing import Optional
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.get("/leagues/search")
@limiter.limit("5/minute") # 5 requests per minute per IP
async def search_leagues(
request: Request,
current_user: Optional[User] = Depends(get_current_user_optional)
):
# Apply different limits based on user tier
if current_user and current_user.tier == "premium":
# Premium users have higher limits (handled by different limiter instance)
pass
return await league_service.search_leagues()
# User-specific rate limiting
user_limiter = Limiter(key_func=lambda request: f"user:{get_current_user(request).id}")
@app.get("/ai/analysis")
@user_limiter.limit("10/hour") # 10 AI analyses per hour per user
async def get_ai_analysis(request: Request, current_user: User = Depends(get_current_user)):
    return await ai_service.analyze_league(current_user)
Requirement 6: Health Monitoring & Observability
User Story: As a DevOps engineer, I want comprehensive health checks and metrics, so that I can monitor system health and diagnose issues quickly.
Acceptance Criteria
- Health Endpoint: WHEN /health is called, THE Health_Monitor SHALL perform comprehensive checks on database connectivity, AI service availability, MCP server status, Redis connection, external APIs, and background task queues, returning aggregated health status.
- Detailed Status: WHEN services are healthy, THE Health_Monitor SHALL return HTTP 200 with a detailed JSON response including service statuses, response times, uptime percentages, and recent error rates.
- Degraded Services: WHEN individual services are degraded or unavailable, THE Health_Monitor SHALL return HTTP 503 Service Unavailable with specific failure details, affected services, and estimated recovery times.
- Metrics Exposure: WHEN /metrics is requested, THE Health_Monitor SHALL expose Prometheus-compatible metrics including request counts, error rates, response times, database query performance, AI token usage, and MCP tool execution statistics.
- Structured Logging: WHEN errors or warnings occur, THE Health_Monitor SHALL emit structured JSON logs with trace IDs, timestamps, severity levels, service context, and actionable error messages to Cloud Logging or similar.
Example Health Monitoring Implementation:
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse, Response
from typing import Dict, Any
from datetime import datetime
import asyncio
from services import database_service, ai_service, mcp_service
import time
@app.get("/health")
async def health_check() -> Dict[str, Any]:
"""Comprehensive health check for all backend services"""
start_time = time.time()
# Run parallel health checks
health_tasks = await asyncio.gather(
check_database_health(),
check_ai_service_health(),
check_mcp_server_health(),
check_external_apis_health(),
check_background_tasks_health(),
return_exceptions=True
)
# Aggregate results
health_results = {
"timestamp": datetime.utcnow().isoformat(),
"overall_status": "healthy" if all_successful(health_tasks) else "degraded",
"services": process_health_results(health_tasks),
"response_time_ms": int((time.time() - start_time) * 1000),
"uptime_percentage": get_uptime_percentage()
}
if health_results["overall_status"] == "degraded":
return JSONResponse(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
content=health_results
)
return health_results
async def check_database_health():
"""Check database connectivity and performance"""
try:
start = time.time()
result = await database_service.execute_health_query()
duration = time.time() - start
return {
"service": "database",
"status": "healthy",
"response_time_ms": int(duration * 1000),
"details": {"connection_pool_size": len(database_service.pool)}
}
except Exception as e:
return {
"service": "database",
"status": "unhealthy",
"error": str(e),
"recovery": "Restart database connection pool"
}
# Prometheus metrics endpoint
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST, Counter, Histogram, Gauge
REQUEST_COUNT = Counter('backend_requests_total', 'Total requests', ['method', 'endpoint', 'status'])
REQUEST_DURATION = Histogram('backend_request_duration_seconds', 'Request duration', ['method', 'endpoint'])
DATABASE_CONNECTIONS = Gauge('database_connections_active', 'Active database connections')
@app.get("/metrics")
async def metrics():
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
# Middleware for metrics collection
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
REQUEST_DURATION.labels(
method=request.method,
endpoint=request.url.path
).observe(time.time() - start_time)
    return response
Requirement 7: Data Layer & Pydantic Models
User Story: As a developer, I want strongly-typed Pydantic models for all data structures, so that I have type safety and automatic validation throughout the application.
Acceptance Criteria
- Input Validation: WHEN data enters the system through APIs or MCP tools, THE Data_Layer SHALL validate all inputs using Pydantic models with comprehensive field validation, custom validators, and type coercion.
- Model Definitions: WHEN models are defined, THE Data_Layer SHALL include complete field definitions with types, descriptions, validation rules, default values, and relationships between models for complex data structures.
- Serialization: WHEN serializing data for API responses or database storage, THE Data_Layer SHALL use model_dump() or model_dump_json() methods to ensure consistent JSON serialization with proper handling of nested objects and dates (see the round-trip sketch after this list).
- Deserialization: WHEN deserializing data from requests or database results, THE Data_Layer SHALL use model_validate() or model_validate_json() for type-safe deserialization with automatic error reporting for validation failures.
- Schema Generation: WHEN OpenAPI documentation is generated, THE Data_Layer SHALL automatically generate JSON schemas from Pydantic models for all request/response bodies, ensuring complete API documentation.
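As a quick illustration of the serialization/deserialization contract, a Pydantic v2 round trip using the League model defined in the example below might look like:
raw = '{"id": "l-1", "name": "Premier League", "sport": "soccer", "level": "professional"}'
league = League.model_validate_json(raw)   # type-safe deserialization, raises on invalid input
payload = league.model_dump()              # plain dict for API responses or storage
json_payload = league.model_dump_json()    # JSON string with nested objects and dates handled consistently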
Example Pydantic Model Implementation:
from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Optional, List, Dict, Any
from enum import Enum
from datetime import datetime
import re
class UserRole(str, Enum):
ADMIN = "admin"
ANALYST = "analyst"
PARTNER = "partner"
USER = "user"
class League(BaseModel):
id: str = Field(..., description="Unique league identifier")
name: str = Field(..., min_length=1, max_length=200, description="League name")
sport: str = Field(..., description="Sport type (soccer, basketball, etc.)")
level: str = Field(..., description="Competition level")
location: Optional[str] = Field(None, description="Geographic location")
founded_year: Optional[int] = Field(None, ge=1800, le=2100)
member_count: int = Field(0, ge=0, description="Number of member teams/players")
website: Optional[str] = Field(None, description="Official website URL")
    @field_validator('sport')
    @classmethod
    def validate_sport(cls, v):
        valid_sports = ['soccer', 'basketball', 'baseball', 'american_football', 'hockey']
        if v.lower() not in valid_sports:
            raise ValueError(f'Sport must be one of: {", ".join(valid_sports)}')
        return v.lower()
    @model_validator(mode='after')
    def validate_league_complete(self):
        if self.name and self.sport and len(self.name) < 3:
            raise ValueError('League name must be at least 3 characters')
        return self
class LeaguePartnershipOpportunity(BaseModel):
league_id: str
partner_type: str # sponsor, broadcaster, venue, technology
estimated_value: float = Field(..., gt=0, description="Estimated partnership value in USD")
probability: float = Field(..., ge=0, le=1, description="Success probability (0-1)")
timeline: str # Q1 2024, immediate, etc.
key_contacts: List[str] = Field(default_factory=list)
competitive_landscape: Optional[Dict[str, Any]] = None
ai_confidence: float = Field(0.0, ge=0, le=1, description="AI confidence in analysis")
    @field_validator('estimated_value')
    @classmethod
    def validate_value_range(cls, v):
        if v < 1000:
            raise ValueError('Partnership value must be at least $1,000')
        if v > 100000000:
            raise ValueError('Partnership value cannot exceed $100M')
        return v
class PartnershipAnalysisResponse(BaseModel):
league: League
opportunities: List[LeaguePartnershipOpportunity]
summary: str
total_potential_value: float
high_confidence_opportunities: List[LeaguePartnershipOpportunity]
risk_factors: List[str]
recommendations: List[str]
ai_metadata: Dict[str, Any] = Field(..., description="AI processing metadata")
    @model_validator(mode='before')
    @classmethod
    def validate_opportunities(cls, values):
        opportunities = values.get('opportunities', [])
        if not opportunities:
            raise ValueError('Analysis must include at least one opportunity')
        return values
# Usage in API
@app.post("/leagues/{league_id}/partnership-analysis", response_model=PartnershipAnalysisResponse)
async def analyze_partnerships(
league_id: str,
analysis_request: PartnershipAnalysisRequest,
service: PartnershipService = Depends(get_partnership_service)
):
# Automatic validation and serialization
result = await service.analyze_league_partnerships(league_id, analysis_request)
    return result
Requirement 8: Async Processing & Background Tasks
User Story: As a user, I want long-running operations to execute asynchronously, so that I get immediate responses and can track progress of complex analyses.
Acceptance Criteria
- Task Queuing: WHEN expensive operations (AI analysis, report generation, data imports) are requested, THE Backend_API SHALL immediately queue background tasks using Celery or FastAPI BackgroundTasks and return HTTP 202 Accepted with a task ID and status endpoint.
- Task Status: WHEN tasks are queued, THE Backend_API SHALL provide a /tasks/{task_id}/status endpoint returning current state (pending, running, completed, failed) with progress percentage, estimated time remaining, and partial results if available.
- Result Retrieval: WHEN tasks complete successfully, THE Backend_API SHALL store results in database or cache and make them available via a /tasks/{task_id}/result endpoint with proper authentication and expiration.
- Failure Handling: WHEN background tasks fail, THE Backend_API SHALL implement retry logic with exponential backoff (up to 3 attempts), store error details, and notify users via email or in-app notifications with actionable recovery steps (see the retry sketch after this list).
- Cancellation Support: WHEN users request task cancellation via /tasks/{task_id}/cancel, THE Backend_API SHALL gracefully stop the task if possible and update status to cancelled, preserving any partial work.
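Retry with exponential backoff can be configured directly on a Celery task. A sketch using Celery's built-in retry options; the task, exception, and helper names are illustrative:
@celery_app.task(
    bind=True,
    max_retries=3,                     # up to 3 attempts as described above
    autoretry_for=(IntegrationError,), # retry only on transient integration failures
    retry_backoff=True,                # exponential backoff between attempts
    retry_backoff_max=600,
    retry_jitter=True,
)
def generate_report_task(self, task_id: str):
    report = build_report(task_id)     # assumed helper doing the expensive work
    task_manager.complete_task(task_id, result=report)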
Example Async Processing Implementation:
from fastapi import BackgroundTasks, HTTPException
from celery import Celery
from uuid import uuid4
from typing import Optional, Dict, Any
from enum import Enum
from pydantic import BaseModel
import asyncio
from datetime import datetime
# Celery configuration for production
celery_app = Celery('altsportsleagues', broker=settings.celery_broker)
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class TaskResult(BaseModel):
task_id: str
status: TaskStatus
created_at: datetime
completed_at: Optional[datetime] = None
progress: float = 0.0
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
estimated_duration: Optional[int] = None # seconds
task_manager = TaskManager() # Handles task lifecycle
@app.post("/leagues/{league_id}/deep-analysis", response_model=TaskResult)
async def start_deep_analysis(
league_id: str,
analysis_params: DeepAnalysisParams,
background_tasks: BackgroundTasks,
current_user: User = Depends(get_current_user)
):
"""Start deep league analysis as background task"""
task_id = str(uuid4())
# Create task record
task = Task(
id=task_id,
user_id=current_user.id,
type="deep_league_analysis",
status=TaskStatus.PENDING,
params=analysis_params.dict(),
created_at=datetime.utcnow()
)
await task_manager.create_task(task)
# Queue background task
deep_analysis_task.delay(task_id, league_id, analysis_params.dict())
return TaskResult(
task_id=task_id,
status=TaskStatus.PENDING,
created_at=datetime.utcnow(),
estimated_duration=300 # 5 minutes estimate
)
@app.get("/tasks/{task_id}/status", response_model=TaskResult)
async def get_task_status(task_id: str, current_user: User = Depends(get_current_user)):
"""Get current task status and progress"""
task = await task_manager.get_task(task_id, current_user.id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return TaskResult.from_orm(task)
@app.post("/tasks/{task_id}/cancel")
async def cancel_task(task_id: str, current_user: User = Depends(get_current_user)):
"""Cancel a running task"""
task = await task_manager.get_task(task_id, current_user.id)
if not task or task.status not in [TaskStatus.PENDING, TaskStatus.RUNNING]:
raise HTTPException(status_code=400, detail="Cannot cancel completed task")
await task_manager.cancel_task(task)
return {"message": "Task cancellation requested"}
# Celery task worker
@celery_app.task(bind=True)
def deep_analysis_task(self, task_id: str, league_id: str, params: Dict):
"""Background task for deep league analysis"""
try:
# Update task status
task_manager.update_status(task_id, TaskStatus.RUNNING, progress=0)
# Step 1: Data collection (20%)
        data = asyncio.run(collect_league_data(league_id))  # run async helpers from the synchronous Celery worker
        task_manager.update_status(task_id, TaskStatus.RUNNING, progress=20)
        # Step 2: AI analysis (60%)
        analysis = asyncio.run(ai_service.deep_analyze_league(data, **params))
task_manager.update_status(task_id, TaskStatus.RUNNING, progress=80)
# Step 3: Report generation (20%)
report = generate_analysis_report(analysis)
task_manager.complete_task(task_id, result=report)
except Exception as e:
task_manager.fail_task(task_id, error=str(e))
# Send notification to user
        send_task_failure_notification(task_id, str(e))
Requirement 9: Database Integration
User Story: As a data engineer, I want seamless database integration with connection pooling and migrations, so that I can manage persistent data efficiently and reliably.
Acceptance Criteria
- Connection Pooling: WHEN the backend starts, THE Backend_API SHALL establish database connection pools using SQLAlchemy or asyncpg for PostgreSQL, with configurable pool size, overflow, and timeout settings.
- Query Execution: WHEN database queries are executed, THE Backend_API SHALL use connection pooling to efficiently manage connections, with automatic reconnection on failures and query timeout protection.
- Error Recovery: WHEN database errors occur (connection lost, deadlock, etc.), THE Backend_API SHALL implement retry logic with exponential backoff, circuit breaker patterns, and graceful degradation for non-critical queries (see the retry sketch after this list).
- Schema Management: WHEN database schema changes are needed, THE Backend_API SHALL support Alembic migrations with automatic generation, version control, and rollback capabilities for safe database evolution.
- Environment Support: WHEN in development mode, THE Backend_API SHALL use SQLite for local development with automatic schema creation; in staging/production, use PostgreSQL with Supabase or Cloud SQL.
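The error-recovery criterion can be supported by a small async retry helper around transient database failures. A sketch; the retried exception types and delays are illustrative choices:
import asyncio
from sqlalchemy.exc import OperationalError, DBAPIError

async def execute_with_retry(operation, attempts: int = 3, base_delay: float = 0.5):
    """Retry a coroutine-producing callable with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except (OperationalError, DBAPIError):
            if attempt == attempts:
                raise
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))  # 0.5s, 1s, 2s

# Usage: result = await execute_with_retry(lambda: session.execute(text("SELECT 1")))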
Example Database Integration:
# database.py
from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey, text
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from sqlalchemy.pool import NullPool
from contextlib import asynccontextmanager
from datetime import datetime
from typing import AsyncGenerator
import asyncio
# Database configuration
DATABASE_URL = settings.database_url # postgresql+asyncpg://...
# Async engine with connection pooling
engine = create_async_engine(
DATABASE_URL,
echo=settings.debug,
pool_pre_ping=True, # Validate connections
pool_recycle=300, # Recycle connections every 5 minutes
pool_size=20, # Maximum pool size
max_overflow=10, # Allow temporary overflow
pool_timeout=30, # Connection timeout
connect_args={"command_timeout": 60} # Query timeout
)
# Session factory
AsyncSessionLocal = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
Base = declarative_base()
class League(Base):
__tablename__ = "leagues"
id = Column(String, primary_key=True, index=True)
name = Column(String, nullable=False, index=True)
sport = Column(String, nullable=False, index=True)
level = Column(String, nullable=False)
location = Column(String)
founded_year = Column(Integer)
member_count = Column(Integer, default=0)
website = Column(String)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
partnerships = relationship("LeaguePartnership", back_populates="league")
# Dependency for database sessions
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
# Health check
async def check_database_health():
async with engine.connect() as conn:
result = await conn.execute(text("SELECT 1"))
await result.fetchone()
return {"status": "healthy", "message": "Database connection successful"}
# Alembic configuration (alembic.ini and env.py setup required)
# alembic revision --autogenerate -m "Add leagues table"
# alembic upgrade head
Requirement 10: AI Service Integration
User Story: As an AI developer, I want seamless integration with multiple AI providers, so that I can use the best model for each task while maintaining consistency and reliability.
Acceptance Criteria
- Multi-Provider Routing: WHEN AI services are called, THE Backend_API SHALL intelligently route requests to appropriate providers (OpenAI GPT-4, Anthropic Claude 3, Google Vertex AI, etc.) based on task type, cost, availability, and performance requirements.
- Fallback Strategy: WHEN primary AI providers fail (rate limits, downtime, errors), THE Backend_API SHALL automatically fall back to secondary providers with minimal disruption and log fallback events for monitoring.
- Rate Limit Handling: WHEN rate limits are hit on AI providers, THE Backend_API SHALL implement intelligent queuing, exponential backoff retries, and request batching to optimize throughput while respecting provider limits.
- Cost Tracking: WHEN AI requests are processed, THE Backend_API SHALL track token usage, provider costs, and attribution per request/user/endpoint for billing, optimization, and cost analysis.
- Streaming Support: WHEN real-time AI responses are needed (chat interfaces, live analysis), THE Backend_API SHALL support Server-Sent Events (SSE) and streaming responses from AI providers with proper error handling and reconnection logic (see the SSE sketch after this list).
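Streaming responses can be exposed to clients as Server-Sent Events with FastAPI's StreamingResponse. A sketch that assumes the AIService.stream_analysis generator shown below and a hypothetical league lookup:
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse

router = APIRouter()

@router.get("/leagues/{league_id}/analysis/stream")
async def stream_league_analysis(league_id: str, ai_service: AIService = Depends(get_ai_service)):
    league_data = await league_service.get_league_data(league_id)  # assumed lookup

    async def event_stream():
        async for chunk in ai_service.stream_analysis(league_data, "partnership"):
            yield f"data: {chunk}\n\n"  # SSE frame format
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")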
Example AI Service Integration:
# services/ai_service.py
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional, AsyncGenerator
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
from google.generativeai import GenerativeModel
import asyncio
from services.base_service import BaseService
class AIProvider(ABC):
@abstractmethod
async def generate_text(self, prompt: str, model: str, **kwargs) -> str:
pass
@abstractmethod
async def stream_text(self, prompt: str, model: str, **kwargs) -> AsyncGenerator[str, None]:
pass
@abstractmethod
def get_token_cost(self, input_tokens: int, output_tokens: int) -> float:
pass
class OpenAIProvider(AIProvider):
def __init__(self, api_key: str):
self.client = AsyncOpenAI(api_key=api_key)
        self.cost_per_1k_input = 0.03   # USD per 1K input tokens (illustrative GPT-4 pricing)
        self.cost_per_1k_output = 0.06  # USD per 1K output tokens
async def generate_text(self, prompt: str, model: str = "gpt-4", **kwargs) -> str:
try:
response = await self.client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
max_tokens=kwargs.get("max_tokens", 1000),
temperature=kwargs.get("temperature", 0.7)
)
return response.choices[0].message.content
except Exception as e:
logger.error(f"OpenAI error: {e}")
raise AIProviderError(f"OpenAI generation failed: {e}")
async def stream_text(self, prompt: str, model: str = "gpt-4", **kwargs) -> AsyncGenerator[str, None]:
stream = await self.client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
stream=True,
**kwargs
)
async for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
def get_token_cost(self, input_tokens: int, output_tokens: int) -> float:
return (input_tokens * self.cost_per_1k_input + output_tokens * self.cost_per_1k_output) / 1000
class AnthropicProvider(AIProvider):
def __init__(self, api_key: str):
self.client = AsyncAnthropic(api_key=api_key)
        self.cost_per_1k_input = 0.25   # USD per 1K input tokens (illustrative Claude 3 pricing)
        self.cost_per_1k_output = 1.25  # USD per 1K output tokens
async def generate_text(self, prompt: str, model: str = "claude-3-opus-20240229", **kwargs) -> str:
try:
response = await self.client.messages.create(
model=model,
max_tokens=kwargs.get("max_tokens", 1000),
messages=[{"role": "user", "content": prompt}],
temperature=kwargs.get("temperature", 0.7)
)
return response.content[0].text
except Exception as e:
logger.error(f"Anthropic error: {e}")
raise AIProviderError(f"Anthropic generation failed: {e}")
# Similar streaming implementation...
class AIService(BaseService):
def __init__(self):
self.providers = {
"openai": OpenAIProvider(settings.openai_key),
"anthropic": AnthropicProvider(settings.anthropic_key),
"google": GoogleAIProvider(settings.google_key) # Implementation similar
}
self.primary_provider = "anthropic" # Prefer Claude for business reasoning
self.fallback_order = ["openai", "google"]
async def generate_business_analysis(
self,
league_data: LeagueData,
analysis_type: str,
provider: Optional[str] = None
) -> BusinessAnalysis:
"""Generate comprehensive business analysis using optimal AI provider"""
if not provider:
provider = self.primary_provider
try:
prompt = self.build_analysis_prompt(league_data, analysis_type)
# Get provider instance
ai_provider = self.providers[provider]
# Generate analysis
analysis_text = await ai_provider.generate_text(
prompt=prompt,
model=self.get_model_for_analysis(analysis_type),
max_tokens=2000,
temperature=0.3 # Lower temperature for business accuracy
)
# Post-process and structure response
analysis = self.parse_analysis_response(analysis_text, league_data)
# Track costs
tokens_used = self.estimate_tokens(prompt + analysis_text)
cost = ai_provider.get_token_cost(
input_tokens=len(prompt.split()),
output_tokens=len(analysis_text.split())
)
# Log usage
await self.log_ai_usage(provider, tokens_used, cost, analysis_type)
return BusinessAnalysis(
text=analysis_text,
structured=analysis,
provider=provider,
confidence=self.calculate_confidence(analysis),
tokens_used=tokens_used,
cost=cost
)
except Exception as e:
logger.error(f"AI analysis failed with {provider}: {e}")
# Fallback to next provider
for fallback in self.fallback_order:
if fallback != provider:
try:
logger.info(f"Falling back to {fallback}")
return await self.generate_business_analysis(
league_data, analysis_type, fallback
)
except Exception as fallback_error:
logger.error(f"Fallback {fallback} also failed: {fallback_error}")
continue
# All providers failed
raise AIIntegrationError(
f"All AI providers failed for {analysis_type} analysis: {e}"
)
async def stream_analysis(self, league_data: LeagueData, analysis_type: str) -> AsyncGenerator[str, None]:
"""Stream real-time AI analysis for interactive use cases"""
provider = self.primary_provider
prompt = self.build_analysis_prompt(league_data, analysis_type)
ai_provider = self.providers[provider]
async for chunk in ai_provider.stream_text(prompt, model="claude-3-sonnet-20240229"):
            yield chunk
Requirement 11: Google Cloud Integration
User Story: As a platform engineer, I want native Google Cloud integration, so that I can leverage Firestore, Cloud Storage, Vertex AI, and other GCP services efficiently.
Acceptance Criteria
- Cloud Storage: WHEN storing documents, media files, or large datasets, THE Backend_API SHALL use Google Cloud Storage with proper IAM permissions, lifecycle policies, and public/private access controls.
- Firestore Integration: WHEN querying semi-structured data or needing real-time synchronization, THE Backend_API SHALL use Firestore with efficient query patterns, indexing strategies, and offline support.
- Vertex AI Integration: WHEN using advanced AI capabilities, THE Backend_API SHALL integrate with Vertex AI for embeddings, RAG pipelines, custom models, and enterprise-grade AI services with proper authentication.
- Cloud Run Deployment: WHEN deploying the backend, THE Backend_API SHALL run on Google Cloud Run with automatic scaling, custom domains, and integration with Cloud Load Balancer for high availability.
- Observability Integration: WHEN monitoring and logging, THE Backend_API SHALL send structured logs to Cloud Logging, metrics to Cloud Monitoring, and traces to Cloud Trace for comprehensive observability (see the sketch after this list).
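For the observability criterion, standard-library logs can be forwarded to Cloud Logging via the google-cloud-logging client. A minimal sketch, enabled only in deployed environments:
import logging
import google.cloud.logging

def configure_cloud_logging(enabled: bool) -> None:
    """Attach the Cloud Logging handler to the root logger outside local development."""
    if not enabled:
        return
    client = google.cloud.logging.Client()
    client.setup_logging(log_level=logging.INFO)  # forwards standard logging records to Cloud Logging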
Example Google Cloud Integration:
# services/gcp_service.py
from google.cloud import storage, firestore, aiplatform
from google.oauth2 import service_account
from typing import Dict, Any, List, AsyncIterator
import asyncio
from services.base_service import BaseService
class GCPService(BaseService):
def __init__(self):
# Initialize Google Cloud clients
credentials = service_account.Credentials.from_service_account_file(
settings.gcp_service_account_file
)
self.storage_client = storage.Client(credentials=credentials, project=settings.gcp_project_id)
self.firestore_client = firestore.Client(credentials=credentials, project=settings.gcp_project_id)
self.vertex_ai = aiplatform.gapic.PredictionServiceAsyncClient(
credentials=credentials,
client_options={"api_endpoint": f"{settings.gcp_region}-aiplatform.googleapis.com"}
)
async def upload_league_document(self, league_id: str, file_content: bytes, filename: str) -> str:
"""Upload league-related documents to Cloud Storage"""
bucket = self.storage_client.bucket(settings.gcp_storage_bucket)
blob = bucket.blob(f"leagues/{league_id}/documents/{filename}")
# Set metadata for content type and caching
blob.metadata = {
"content-type": self.detect_content_type(filename),
"cache-control": "public, max-age=3600"
}
# Upload with checksum validation
blob.upload_from_string(file_content, checksum="md5")
# Generate signed URL for temporary access
signed_url = blob.generate_signed_url(
version="v4",
expiration=datetime.utcnow() + timedelta(hours=24),
method="GET"
)
return signed_url
async def store_league_analysis(self, league_id: str, analysis: LeagueAnalysis) -> str:
"""Store structured league analysis in Firestore"""
doc_ref = self.firestore_client.collection("leagues").document(league_id).collection("analyses").add({
"analysis_id": analysis.id,
"created_at": firestore.SERVER_TIMESTAMP,
"user_id": analysis.user_id,
"analysis_type": analysis.type,
"confidence": analysis.confidence,
"key_insights": analysis.key_insights,
"recommendations": analysis.recommendations,
"data_sources": analysis.data_sources
})
return doc_ref[1].id
async def generate_embeddings(self, texts: List[str]) -> List[List[float]]:
"""Generate embeddings using Vertex AI"""
# Prepare batch request
instances = [{"content": text} for text in texts]
# Call Vertex AI embedding endpoint
response = await self.vertex_ai.predict(
endpoint=f"projects/{settings.gcp_project_id}/locations/{settings.gcp_region}/endpoints/{settings.vertex_embedding_endpoint}",
instances=instances,
parameters={"sampleCount": len(texts)}
)
# Extract embeddings
embeddings = [instance.predictions[0]["embeddings"]["values"] for instance in response]
return embeddings
async def stream_ai_analysis(self, prompt: str) -> AsyncIterator[str]:
"""Stream AI analysis using Vertex AI"""
# Implementation for streaming Vertex AI responses
# This would use the Vertex AI streaming API
pass
# Integration in main service
class LeagueIntelligenceService(BaseService):
def __init__(self, gcp_service: GCPService = Depends(get_gcp_service)):
self.gcp_service = gcp_service
async def process_league_intelligence(self, league_data: LeagueData) -> LeagueIntelligenceReport:
# Upload documents to Cloud Storage
document_url = await self.gcp_service.upload_league_document(
league_data.id,
league_data.document_content,
f"{league_data.name}_analysis.pdf"
)
# Store analysis metadata in Firestore
analysis_id = await self.gcp_service.store_league_analysis(league_data.id, analysis_data)
# Generate embeddings for RAG
embeddings = await self.gcp_service.generate_embeddings(league_data.key_phrases)
# Perform AI analysis using embeddings
ai_analysis = await self.ai_service.analyze_with_embeddings(embeddings, league_data)
return LeagueIntelligenceReport(
document_url=document_url,
analysis_id=analysis_id,
embeddings=embeddings,
ai_analysis=ai_analysis
    )
Requirement 12: Development Experience
User Story: As a developer, I want excellent developer experience with hot reload, debugging, comprehensive logging, and testing support, so that I can develop and debug efficiently.
Acceptance Criteria
- Hot Reload: WHEN running locally with uvicorn main:app --reload, THE Backend_API SHALL support hot reloading for rapid development iteration with automatic restarts on code changes.
- Debugging Support: WHEN debugging in development mode, THE Backend_API SHALL provide detailed stack traces, variable inspection, and breakpoint support through integration with debugging tools like VS Code and PyCharm.
- Structured Logging: WHEN logging events, errors, or metrics, THE Backend_API SHALL use structured JSON logging format with consistent fields (timestamp, level, service, user_id, trace_id, message, context) for easy parsing and analysis.
- Testing Infrastructure: WHEN running tests, THE Backend_API SHALL provide comprehensive pytest fixtures for database setup/teardown, AI service mocking, MCP tool testing, and API endpoint testing with 80%+ code coverage (see the fixture sketch after this list).
- Documentation Generation: WHEN the application starts, THE Backend_API SHALL auto-generate OpenAPI specifications from Pydantic models and route decorators, and provide type hints throughout for IDE autocompletion and static analysis.
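API and service tests typically rely on FastAPI dependency overrides. A minimal pytest sketch; the fake service and route are illustrative:
import pytest
from fastapi.testclient import TestClient
from main import app
from api.dependencies import get_league_service

class FakeLeagueService:
    async def search_leagues(self, sport, level="professional", location=None):
        return [{"id": "l-1", "name": "Test League"}]

@pytest.fixture
def client():
    # Swap the real service for a fake one so the route can be tested in isolation
    app.dependency_overrides[get_league_service] = lambda: FakeLeagueService()
    with TestClient(app) as test_client:
        yield test_client
    app.dependency_overrides.clear()

def test_search_leagues(client):
    response = client.get("/leagues/", params={"sport": "soccer"})
    assert response.status_code == 200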
Example Development Configuration:
# main.py (Development configuration)
from fastapi import FastAPI
from contextlib import asynccontextmanager
import logging
import json
from logging.handlers import RotatingFileHandler
from datetime import datetime
import uvicorn
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
logger.info("Starting AltSportsLeagues backend...")
# Initialize tracing
if settings.enable_tracing:
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(OTLPSpanExporter())
)
# Initialize services
await initialize_services()
yield
# Shutdown
await shutdown_services()
logger.info("Backend shutdown complete")
app = FastAPI(
title="AltSportsLeagues API",
description="Production backend for sports league intelligence",
version="1.0.0",
docs_url="/docs" if settings.debug else None,
redoc_url="/redoc" if settings.debug else None,
openapi_url="/openapi.json" if settings.debug else None,
lifespan=lifespan
)
# Structured logging configuration
logging.basicConfig(
level=logging.DEBUG if settings.debug else logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
RotatingFileHandler('logs/backend.log', maxBytes=10*1024*1024, backupCount=5)
]
)
# Custom JSON formatter for structured logs
class StructuredJSONFormatter(logging.Formatter):
def format(self, record):
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'level': record.levelname,
'service': 'backend',
'logger': record.name,
'message': record.getMessage(),
'trace_id': trace.get_current_span().get_span_context().trace_id if trace.get_current_span() else None,
'user_id': getattr(record, 'user_id', None),
'request_id': getattr(record, 'request_id', None),
'context': getattr(record, 'context', {})
}
return json.dumps(log_entry)
# Apply to all loggers
for handler in logging.getLogger().handlers:
handler.setFormatter(StructuredJSONFormatter())
# Development server
if __name__ == "__main__":
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
reload=settings.debug,
log_level="debug" if settings.debug else "info",
access_log=True
    )
Requirement 13: Deployment & Containerization
User Story: As a DevOps engineer, I want containerized deployment with Docker and Cloud Run support, so that I can deploy reliably to any environment with zero downtime.
Acceptance Criteria
- Docker Images: WHEN building the application, THE Backend_API SHALL create optimized Docker images using multi-stage builds, minimizing image size, and including only necessary dependencies for production.
- Cloud Run Compatibility: WHEN deploying to Google Cloud Run, THE Backend_API SHALL handle Cloud Run requirements including the PORT environment variable, request timeout handling, graceful shutdowns, and proper health check endpoints.
- Horizontal Scaling: WHEN traffic increases, THE Backend_API SHALL support horizontal scaling with stateless design, shared Redis cache, and database connection pooling to handle concurrent requests efficiently.
- Zero-Downtime Deployments: WHEN updating the application, THE Backend_API SHALL support zero-downtime deployments using blue-green deployment patterns, rolling updates, and health checks to ensure service availability.
- Rollback Support: WHEN deployments fail or issues are detected, THE Backend_API SHALL support instant rollback to previous versions with automated database migration rollbacks and configuration restoration.
Example Dockerfile and Deployment:
# Dockerfile
FROM python:3.11-slim AS builder
# Install build dependencies
RUN apt-get update && apt-get install -y \
build-essential \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt
# Production stage
FROM python:3.11-slim
# Install runtime dependencies
RUN apt-get update && apt-get install -y \
    libpq5 \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Create non-root user
RUN useradd --create-home --shell /bin/bash appuser
WORKDIR /app
COPY --from=builder /root/.local /home/appuser/.local
ENV PATH=/home/appuser/.local/bin:$PATH
RUN chown -R appuser:appuser /app /home/appuser
# Copy application code
COPY . .
RUN chown -R appuser:appuser /app
USER appuser
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]# cloudbuild.yaml (Google Cloud Build)
steps:
# Build Docker image
- name: 'gcr.io/cloud-builders/docker'
args: ['build', '-t', 'gcr.io/$PROJECT_ID/altsportsleagues-backend:$COMMIT_SHA', '.']
# Push to Container Registry
- name: 'gcr.io/cloud-builders/docker'
args: ['push', 'gcr.io/$PROJECT_ID/altsportsleagues-backend:$COMMIT_SHA']
# Deploy to Cloud Run (blue-green deployment)
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
entrypoint: 'gcloud'
args:
- 'run'
- 'deploy'
- 'altsportsleagues-backend'
- '--image'
- 'gcr.io/$PROJECT_ID/altsportsleagues-backend:$COMMIT_SHA'
- '--region'
- 'us-central1'
- '--platform'
- 'managed'
- '--allow-unauthenticated'
- '--timeout'
- '300'
- '--memory'
- '1Gi'
- '--cpu'
- '2'
- '--max-instances'
- '100'
- '--min-instances'
- '1'
- '--set-env-vars'
- 'DATABASE_URL=$DATABASE_URL,AI_KEYS=$AI_KEYS'
- '--tag'
- 'production-v2' # revision tag used for blue-green traffic shifting
# Trigger on push to main
options:
logging: CLOUD_LOGGING_ONLY
timeout: 1800s
This Backend API & MCP Integration Platform specification provides the complete foundation for AltSportsLeagues.ai's intelligent backend infrastructure. The implementation ensures scalability, security, developer experience, and seamless AI integration while maintaining production-grade reliability and observability.