Data Layer Architecture
The AltSportsLeagues.ai data layer provides unified access to all data sources, schemas, and processing pipelines. It serves as the central hub for data operations across Supabase, Neo4j, Firebase, and vector databases.
Overview
Purpose
The data layer (data_layer/) provides:
- Unified Data Access: Single interface to multiple data sources
- Schema Management: Centralized schema definitions and validation
- Cross-Platform Utilities: Shared code for Python, TypeScript, and Next.js
- Triple Index System: Efficient querying across relational, graph, and vector stores
- Documentation Embeddings: Semantic search via FAISS/ChromaDB
Architecture
┌─────────────────────────────────────────────────────────┐
│                     Data Layer API                      │
│                                                         │
│   ┌────────────┐    ┌────────────┐    ┌────────────┐    │
│   │   Shared   │    │  Schemas   │    │   Triple   │    │
│   │ Utilities  │    │  Registry  │    │   Index    │    │
│   └────────────┘    └────────────┘    └────────────┘    │
└──────────┬────────────────┬───────────────┬─────────────┘
           │                │               │
    ┌──────▼──────┐   ┌─────▼──────┐  ┌─────▼─────┐
    │  Supabase   │   │   Neo4j    │  │ Firebase  │
    │ (Relational)│   │  (Graph)   │  │(Real-time)│
    └─────────────┘   └────────────┘  └───────────┘
           │
    ┌──────▼──────┐
    │   FAISS/    │
    │  ChromaDB   │
    │  (Vector)   │
    └─────────────┘

Directory Structure
Core Directories
data_layer/
├── shared/                     # Cross-platform utilities
│   ├── builders/               # Schema builders
│   ├── components/             # Reusable data components
│   ├── generators/             # Data generators
│   ├── python/                 # Python-specific utils
│   ├── typescript/             # TypeScript utils
│   ├── zustand/                # State management
│   ├── nextjs/                 # Next.js integration
│   ├── generate.py             # Main orchestrator
│   └── neo4j_utils.py          # Graph database utils
│
├── schemas/                    # Core schema definitions
│   ├── generated/              # Auto-generated schemas
│   │   └── adapters/           # Schema adapters
│   └── leagues.ts              # TypeScript types
│
├── output_styles/              # Domain-specific schemas
│   └── schemas/
│       ├── domains/            # Domain schemas
│       ├── leagues/            # League schemas
│       ├── outputs/            # Output formats
│       └── data_cubes/         # OLAP definitions
│
├── triple_index/               # Triple index system
├── unified_prompt_retrieval/   # Prompt caching
├── knowledge/                  # Knowledge base
├── n8n_integration/            # n8n workflows
└── docs/                       # Documentation

Database Integration
1. Supabase (Relational Data)
Purpose: Primary relational database for structured league data
Tables:
- leagues: Main league information
- teams: Team data
- players: Player information
- games: Game schedules and results
- contracts: Partnership contracts
- questionnaires: League questionnaires
Access Pattern:
# data_layer/shared/python/supabase_client.py
import asyncio
import os
from typing import Optional

from supabase import create_client


class SupabaseClient:
    """Async-friendly wrapper around the synchronous supabase-py client.

    ``create_client`` returns a synchronous client whose ``execute()`` is not
    awaitable, so every query is run in a worker thread with
    ``asyncio.to_thread``. This keeps the ``async`` interface without
    blocking the event loop.
    """

    def __init__(self, url: Optional[str] = None, key: Optional[str] = None):
        """Create the underlying Supabase client.

        Args:
            url: Supabase project URL; defaults to the ``SUPABASE_URL``
                environment variable.
            key: Supabase API key; defaults to the ``SUPABASE_KEY``
                environment variable.

        Raises:
            KeyError: If a setting is neither passed nor present in the
                environment.
        """
        self.client = create_client(
            url or os.environ["SUPABASE_URL"],
            key or os.environ["SUPABASE_KEY"],
        )

    async def get_league(self, league_id: str) -> Optional[dict]:
        """Return the ``leagues`` row whose ``id`` is ``league_id``, or None."""
        query = (
            self.client.table("leagues")
            .select("*")
            .eq("id", league_id)
            # limit(1) rather than single(): single() reports an error when
            # no row matches, but this method promises None for that case.
            .limit(1)
        )
        result = await asyncio.to_thread(query.execute)
        rows = result.data or []
        return rows[0] if rows else None

    async def upsert_league(self, league_data: dict) -> dict:
        """Insert or update a league row and return the stored row.

        Returns:
            The first row echoed back by Supabase, or an empty dict when the
            response carries no rows.
        """
        query = self.client.table("leagues").upsert(league_data)
        result = await asyncio.to_thread(query.execute)
        rows = result.data or []
        return rows[0] if rows else {}
# TypeScript Integration:
// data_layer/shared/typescript/supabase.ts
import { createClient } from '@supabase/supabase-js'

// Non-null assertions: both variables are expected to be set at runtime.
const supabaseUrl = process.env.SUPABASE_URL!
const supabaseAnonKey = process.env.SUPABASE_ANON_KEY!

/** Shared Supabase client authenticated with the anon key. */
export const supabase = createClient(supabaseUrl, supabaseAnonKey)

/**
 * Fetch the `leagues` row whose `id` equals `leagueId`.
 *
 * The query uses `.single()`, so any error it reports (including the case
 * where the row count is not exactly one) is thrown to the caller.
 */
export async function getLeague(leagueId: string) {
  const { data: league, error: queryError } = await supabase
    .from('leagues')
    .select('*')
    .eq('id', leagueId)
    .single()

  if (queryError) {
    throw queryError
  }
  return league
}
// 2. Neo4j (Graph Database)
Purpose: Graph relationships between leagues, teams, players, and events
Node Types:
- League: Sports leagues
- Team: Teams within leagues
- Player: Individual players
- Game: Games/matches
- Venue: Game venues
- Event: Special events
Relationship Types:
- BELONGS_TO: Team → League
- PLAYS_FOR: Player → Team
- COMPETES_IN: Team → Game
- HOSTED_AT: Game → Venue
- SIMILAR_TO: League → League
Access Pattern:
# data_layer/shared/neo4j_utils.py
import os
from typing import Dict, List, Optional

from neo4j import AsyncGraphDatabase


class Neo4jClient:
    """Async access to the league graph (League, Team, Player, Game nodes)."""

    def __init__(
        self,
        uri: Optional[str] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
    ):
        """Create the async Neo4j driver.

        Connection settings default to the ``NEO4J_URI``, ``NEO4J_USER`` and
        ``NEO4J_PASSWORD`` environment variables.

        Raises:
            KeyError: If a setting is neither passed nor present in the
                environment.
        """
        # The methods below rely on ``async with`` / ``await``, which needs
        # the async driver; the synchronous GraphDatabase driver does not
        # provide async sessions.
        self.driver = AsyncGraphDatabase.driver(
            uri or os.environ["NEO4J_URI"],
            auth=(
                user or os.environ["NEO4J_USER"],
                password or os.environ["NEO4J_PASSWORD"],
            ),
        )

    async def close(self) -> None:
        """Close the driver and release its connection pool."""
        await self.driver.close()

    async def create_league_graph(self, league_data: dict):
        """Create or update a League node and link its teams to it.

        Args:
            league_data: Must contain ``id``, ``name``, ``sport`` and ``tier``.
                ``teams`` is an optional list of dicts with ``id`` and ``name``.

        Raises:
            KeyError: If a required key is missing.
        """
        async with self.driver.session() as session:
            # Explicit parameters instead of **league_data. Unrelated keys
            # (such as the nested "teams" list) are not sent with the query,
            # and they cannot collide with session.run()'s own keyword
            # arguments.
            result = await session.run(
                """
                MERGE (l:League {id: $id})
                SET l.name = $name,
                    l.sport = $sport,
                    l.tier = $tier
                """,
                id=league_data["id"],
                name=league_data["name"],
                sport=league_data["sport"],
                tier=league_data["tier"],
            )
            # Consume so that write errors surface here.
            await result.consume()

            teams = [
                {"id": team["id"], "name": team["name"]}
                for team in league_data.get("teams", [])
            ]
            if teams:
                # A single UNWIND query replaces one round trip per team.
                result = await session.run(
                    """
                    UNWIND $teams AS team
                    MERGE (t:Team {id: team.id})
                    SET t.name = team.name
                    WITH t
                    MATCH (l:League {id: $league_id})
                    MERGE (t)-[:BELONGS_TO]->(l)
                    """,
                    teams=teams,
                    league_id=league_data["id"],
                )
                await result.consume()

    async def find_similar_leagues(
        self,
        league_id: str,
        limit: int = 10
    ) -> List[Dict]:
        """Return up to ``limit`` leagues linked to ``league_id`` by SIMILAR_TO.

        Results are ordered by the neighbour's ``similarity_score`` property,
        highest first.
        """
        async with self.driver.session() as session:
            # NOTE(review): the score is read from the neighbour *node*; a
            # pairwise score would normally live on the SIMILAR_TO
            # relationship. Confirm against the data model.
            result = await session.run(
                """
                MATCH (:League {id: $league_id})-[:SIMILAR_TO]-(l2:League)
                // The pattern is undirected, so a pair linked in both
                // directions would otherwise be returned twice.
                WITH DISTINCT l2
                RETURN l2
                ORDER BY l2.similarity_score DESC
                LIMIT $limit
                """,
                league_id=league_id,
                limit=limit,
            )
            return [record["l2"] async for record in result]

    async def get_league_network(self, league_id: str) -> dict:
        """Return a league together with its teams, their players and games.

        Returns:
            A dict with ``league`` (node, or None when no League has
            ``league_id``) and ``teams``, ``players`` and ``games`` (lists of
            nodes; empty when the league does not exist).
        """
        async with self.driver.session() as session:
            # Collect each neighbour type in its own stage. Chaining the
            # OPTIONAL MATCHes would build one row per (team, player, game)
            # combination before DISTINCT collapsed them.
            result = await session.run(
                """
                MATCH (l:League {id: $league_id})
                OPTIONAL MATCH (l)<-[:BELONGS_TO]-(t:Team)
                WITH l, collect(DISTINCT t) AS teams
                OPTIONAL MATCH (l)<-[:BELONGS_TO]-(:Team)<-[:PLAYS_FOR]-(p:Player)
                WITH l, teams, collect(DISTINCT p) AS players
                OPTIONAL MATCH (l)<-[:BELONGS_TO]-(:Team)-[:COMPETES_IN]->(g:Game)
                RETURN l, teams, players, collect(DISTINCT g) AS games
                """,
                league_id=league_id,
            )
            record = await result.single()
            if record is None:
                return {"league": None, "teams": [], "players": [], "games": []}
            return {
                "league": record["l"],
                "teams": record["teams"],
                "players": record["players"],
                "games": record["games"],
            }
# 3. Firebase (Real-time Data)
Purpose: Real-time updates for live data and user sessions
Collections:
- live_games: Real-time game updates
- user_sessions: Active user sessions
- notifications: Push notifications
- league_updates: League status changes
Access Pattern:
# data_layer/shared/python/firebase_client.py
import asyncio

import firebase_admin
from firebase_admin import db, firestore


class FirebaseClient:
    """Async wrappers over the Firebase Realtime Database and Firestore.

    The firebase_admin SDK is synchronous. Network calls are therefore run in
    a worker thread with ``asyncio.to_thread`` instead of blocking the event
    loop.
    """

    def __init__(self):
        # get_app() raises ValueError when the default app has not been
        # initialised yet. This is the public alternative to the private
        # firebase_admin._apps registry.
        try:
            firebase_admin.get_app()
        except ValueError:
            firebase_admin.initialize_app()
        self.realtime_db = db.reference()
        self.firestore = firestore.client()

    async def update_live_game(self, game_id: str, updates: dict):
        """Merge ``updates`` into ``live_games/{game_id}`` in the Realtime DB."""
        ref = self.realtime_db.child(f"live_games/{game_id}")
        await asyncio.to_thread(ref.update, updates)

    async def subscribe_to_league_updates(
        self,
        league_id: str,
        callback
    ):
        """Register ``callback`` for changes under ``leagues/{league_id}``.

        Returns:
            The registration object returned by ``Reference.listen``; call
            its ``close()`` to unsubscribe.
        """
        # NOTE(review): the collection list names ``league_updates``, but this
        # listens on ``leagues/``. Confirm which path producers write to.
        ref = self.realtime_db.child(f"leagues/{league_id}")
        return await asyncio.to_thread(ref.listen, callback)

    async def store_user_session(self, user_id: str, session_data: dict):
        """Overwrite the Firestore document ``user_sessions/{user_id}``."""
        doc_ref = self.firestore.collection("user_sessions").document(user_id)
        await asyncio.to_thread(doc_ref.set, session_data)
# 4. FAISS/ChromaDB (Vector Search)
Purpose: Semantic search for documentation and league data
Use Cases:
- Document similarity search
- League recommendation
- Semantic questionnaire matching
- Knowledge base queries
Access Pattern:
# data_layer/shared/python/vector_client.py
from typing import Dict, List, Tuple

import faiss
import numpy as np


class VectorClient:
    """In-memory FAISS index that maps FAISS row positions to string ids."""

    def __init__(self, dimension: int = 1536):
        self.dimension = dimension
        # Exact (brute-force) L2 index; search reports squared L2 distances.
        self.index = faiss.IndexFlatL2(dimension)
        # FAISS row position -> caller-supplied id.
        self.id_map: Dict[int, str] = {}

    async def add_embeddings(
        self,
        embeddings: np.ndarray,
        ids: List[str]
    ):
        """Add one embedding per id to the index.

        Args:
            embeddings: 2-D array with one row per id and ``dimension``
                columns. Converted to contiguous float32 for FAISS.
            ids: Identifier for each row of ``embeddings``.

        Raises:
            ValueError: If ``embeddings`` is not 2-D or its row count differs
                from ``len(ids)``.
        """
        vectors = np.ascontiguousarray(embeddings, dtype=np.float32)
        if vectors.ndim != 2 or vectors.shape[0] != len(ids):
            raise ValueError(
                f"expected a 2-D array with {len(ids)} rows, got shape {vectors.shape}"
            )
        # FAISS numbers new rows starting at the current ntotal. Mapping from
        # 0 on every call would overwrite the ids of earlier batches.
        start = self.index.ntotal
        self.index.add(vectors)
        for offset, item_id in enumerate(ids):
            self.id_map[start + offset] = item_id

    async def search(
        self,
        query_embedding: np.ndarray,
        k: int = 10
    ) -> List[Tuple[str, float]]:
        """Return up to ``k`` (id, squared L2 distance) pairs, nearest first."""
        query = np.ascontiguousarray(query_embedding, dtype=np.float32).reshape(1, -1)
        distances, indices = self.index.search(query, k)
        # FAISS pads the result with index -1 when the index holds fewer than
        # k vectors; those slots have no id.
        return [
            (self.id_map[int(idx)], float(dist))
            for idx, dist in zip(indices[0], distances[0])
            if idx != -1
        ]

    async def semantic_league_search(
        self,
        query: str,
        k: int = 10
    ) -> List[dict]:
        """Search leagues by semantic similarity to ``query``.

        ``generate_embedding`` and ``fetch_leagues`` are helpers defined
        outside this snippet. Each returned league dict gains a
        ``similarity_score`` of ``1 / (1 + distance)``. Results are ordered
        nearest first.
        """
        query_embedding = await generate_embedding(query)
        results = await self.search(query_embedding, k)
        # Keep the first (best) score per id; search() returns nearest first.
        scores: Dict[str, float] = {}
        for league_id, dist in results:
            scores.setdefault(league_id, 1 / (1 + dist))
        leagues = await fetch_leagues(list(scores))
        # Join on id rather than zipping by position, which would mis-assign
        # scores if fetch_leagues reordered or omitted leagues.
        # NOTE(review): assumes each fetched league dict has an "id" key.
        by_id = {league["id"]: league for league in leagues}
        return [
            {**by_id[league_id], "similarity_score": score}
            for league_id, score in scores.items()
            if league_id in by_id
        ]
# Shared Utilities
1. Schema Generation
Main Orchestrator (data_layer/shared/generate.py):
import types
from typing import Dict, List, Type, Union, get_args, get_origin

from pydantic import BaseModel


class LeagueSchema(BaseModel):
    """Pydantic model for a league record."""

    id: str
    name: str
    sport: str
    tier: int
    teams: List[str]
    founded: int
    region: str


# Scalar / bare container Python types and their TypeScript equivalents.
_TS_SCALARS = {
    str: "string",
    int: "number",
    float: "number",
    bool: "boolean",
    # A bare `Array` is not valid TypeScript; it needs a type argument.
    list: "Array<any>",
    dict: "Record<string, any>",
}

# typing.Union covers Optional[X] / Union[X, Y]; types.UnionType (3.10+)
# covers the `X | Y` syntax.
_UNION_ORIGINS = tuple(
    origin
    for origin in (Union, getattr(types, "UnionType", None))
    if origin is not None
)


class SchemaGenerator:
    """Builds league models and emits matching TypeScript interfaces."""

    def __init__(self):
        # Registry of model classes keyed by schema name.
        self.schemas: Dict[str, Type[BaseModel]] = {"League": LeagueSchema}

    def generate_league_schema(self, league_data: dict) -> BaseModel:
        """Validate ``league_data`` and return it as a ``LeagueSchema`` instance.

        Raises:
            pydantic.ValidationError: If ``league_data`` does not fit the schema.
        """
        return LeagueSchema(**league_data)

    def generate_typescript_types(
        self,
        schema: Union[BaseModel, Type[BaseModel]],
        interface_name: str = "League",
    ) -> str:
        """Render ``schema`` (a model class or instance) as a TS interface.

        Args:
            schema: Pydantic model class or instance whose fields are emitted.
            interface_name: Name of the generated interface.
        """
        model = schema if isinstance(schema, type) else type(schema)
        # Read model_fields from the class; accessing it on an instance is
        # deprecated in recent Pydantic v2 releases.
        lines = [
            f" {field_name}: {self._python_to_ts_type(field_info.annotation)}"
            for field_name, field_info in model.model_fields.items()
        ]
        body = "\n".join(lines)
        return f"interface {interface_name} {{\n{body}\n}}"

    def _python_to_ts_type(self, python_type) -> str:
        """Convert a Python annotation to a TypeScript type expression.

        Handles scalars, ``List[...]`` / ``list[...]``, ``Dict[K, V]`` and
        ``Optional`` / ``Union``. Anything else becomes ``any``.
        """
        origin = get_origin(python_type)
        if origin is None:
            return _TS_SCALARS.get(python_type, "any")
        args = get_args(python_type)
        if origin is list:
            inner = self._python_to_ts_type(args[0]) if args else "any"
            return f"Array<{inner}>"
        if origin is dict:
            value = self._python_to_ts_type(args[1]) if len(args) == 2 else "any"
            return f"Record<string, {value}>"
        if origin in _UNION_ORIGINS:
            members = [
                "null" if arg is type(None) else self._python_to_ts_type(arg)
                for arg in args
            ]
            # dict.fromkeys de-duplicates (e.g. int | float -> number) in order.
            return " | ".join(dict.fromkeys(members))
        return "any"
# Usage:
# Generate schema from league data
generator = SchemaGenerator()
league_schema = generator.generate_league_schema(league_data)

# Generate TypeScript types
ts_types = generator.generate_typescript_types(league_schema)

# Save to file. The explicit encoding keeps the output independent of the
# platform's default locale encoding.
with open("schemas/leagues.ts", "w", encoding="utf-8") as f:
    f.write(ts_types)
# 2. Zustand State Management
Store Pattern (data_layer/shared/zustand/league-store.ts):
import { create } from 'zustand'
import { persist } from 'zustand/middleware'

// NOTE(review): `League` is not imported in this snippet. It is presumably
// the interface generated into data_layer/schemas/leagues.ts; add the import.

interface LeagueState {
  leagues: League[]
  selectedLeague: League | null
  loading: boolean
  error: string | null
  fetchLeagues: () => Promise<void>
  selectLeague: (id: string) => void
  updateLeague: (id: string, data: Partial<League>) => Promise<void>
}

/** Turn an unknown caught value into a displayable message. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error)
}

/** Parse a JSON response, rejecting non-2xx statuses (fetch itself does not). */
async function parseJson<T>(response: Response): Promise<T> {
  if (!response.ok) {
    throw new Error(`Request failed: ${response.status} ${response.statusText}`)
  }
  return response.json() as Promise<T>
}

/** Persisted league store: list, current selection, and request state. */
export const useLeagueStore = create<LeagueState>()(
  persist(
    (set, get) => ({
      leagues: [],
      selectedLeague: null,
      loading: false,
      error: null,

      fetchLeagues: async () => {
        set({ loading: true, error: null })
        try {
          const leagues = await fetch('/api/leagues').then(r => parseJson<League[]>(r))
          set({ leagues, loading: false })
        } catch (error) {
          set({ error: errorMessage(error), loading: false })
        }
      },

      selectLeague: (id: string) => {
        const league = get().leagues.find(l => l.id === id)
        set({ selectedLeague: league ?? null })
      },

      updateLeague: async (id: string, data: Partial<League>) => {
        set({ loading: true, error: null })
        try {
          const updated = await fetch(`/api/leagues/${encodeURIComponent(id)}`, {
            method: 'PATCH',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify(data)
          }).then(r => parseJson<Partial<League>>(r))
          set(state => ({
            leagues: state.leagues.map(l =>
              l.id === id ? { ...l, ...updated } : l
            ),
            // Keep the selected league in sync with the updated list entry.
            selectedLeague:
              state.selectedLeague?.id === id
                ? { ...state.selectedLeague, ...updated }
                : state.selectedLeague,
            loading: false
          }))
        } catch (error) {
          set({ error: errorMessage(error), loading: false })
        }
      }
    }),
    {
      name: 'league-store',
      // Persist data only. A persisted `loading: true` or stale `error`
      // would otherwise be restored on the next page load.
      partialize: state => ({
        leagues: state.leagues,
        selectedLeague: state.selectedLeague
      })
    }
  )
)
// 3. Next.js Integration
Server Component (data_layer/shared/nextjs/league-server.tsx):
import { Suspense } from 'react'
import { getLeague } from '@/data_layer/shared/typescript/supabase'

type LeagueDetailProps = { leagueId: string }

/**
 * Server component that renders a league's name, sport and tier, with its
 * team list behind a Suspense boundary.
 *
 * Errors thrown by `getLeague` propagate out of this component.
 */
export async function LeagueDetail({ leagueId }: LeagueDetailProps) {
  const { name, sport, tier } = await getLeague(leagueId)
  const teamsFallback = <div>Loading teams...</div>

  // NOTE(review): `LeagueTeams` is neither defined nor imported in this snippet.
  return (
    <div className="league-detail">
      <h1>{name}</h1>
      <p>Sport: {sport}</p>
      <p>Tier: {tier}</p>
      <Suspense fallback={teamsFallback}>
        <LeagueTeams leagueId={leagueId} />
      </Suspense>
    </div>
  )
}
// Client Component (data_layer/shared/nextjs/league-client.tsx):
'use client'
import { useLeagueStore } from '@/data_layer/shared/zustand/league-store'
import { useEffect } from 'react'

/**
 * Client component listing leagues from the shared store. It fetches once on
 * mount and selects a league when its card is clicked or activated with the
 * keyboard.
 */
export function LeagueList() {
  // Select individual slices so the component re-renders only when these
  // values change, not on every store update.
  const leagues = useLeagueStore(state => state.leagues)
  const loading = useLeagueStore(state => state.loading)
  const error = useLeagueStore(state => state.error)
  const fetchLeagues = useLeagueStore(state => state.fetchLeagues)
  const selectLeague = useLeagueStore(state => state.selectLeague)

  useEffect(() => {
    fetchLeagues()
  }, [fetchLeagues])

  if (loading) return <div>Loading...</div>

  // Cards act as buttons: role/tabIndex/onKeyDown make them keyboard-operable.
  // Errors are shown above the list instead of being silently dropped.
  return (
    <div className="league-list">
      {error && <div role="alert">{error}</div>}
      {leagues.map(league => (
        <div
          key={league.id}
          role="button"
          tabIndex={0}
          onClick={() => selectLeague(league.id)}
          onKeyDown={event => {
            if (event.key === 'Enter' || event.key === ' ') {
              event.preventDefault()
              selectLeague(league.id)
            }
          }}
          className="league-card"
        >
          <h3>{league.name}</h3>
          <p>{league.sport}</p>
        </div>
      ))}
    </div>
  )
}
// Triple Index System
Overview
The triple index system enables efficient querying across relational, graph, and vector data stores.
Architecture:
┌────────────────────────────────────────────────┐
│            Triple Index Coordinator            │
│                                                │
│    Query → Parse → Route → Execute → Merge     │
└────────┬──────────────┬──────────────┬─────────┘
         │              │              │
    ┌────▼─────┐   ┌────▼─────┐   ┌────▼─────┐
    │ Supabase │   │  Neo4j   │   │  FAISS   │
    │  Index   │   │  Index   │   │  Index   │
    └──────────┘   └──────────┘   └──────────┘

Implementation (data_layer/triple_index/coordinator.py):
import asyncio
from typing import Any, Dict, List, Optional


class TripleIndexCoordinator:
    """Routes a query to one store, then enriches hits from Supabase and Neo4j."""

    def __init__(self):
        self.supabase_client = SupabaseClient()
        self.neo4j_client = Neo4jClient()
        self.vector_client = VectorClient()

    async def unified_query(
        self,
        query: str,
        query_type: str = "semantic",
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[Dict]:
        """Run ``query`` against one store and enrich the matching leagues.

        Args:
            query: Query text interpreted by the selected store.
            query_type: ``"semantic"`` (FAISS) or ``"graph"`` (Neo4j); any
                other value falls through to Supabase.
            filters: Extra filters, forwarded to the Supabase query only.

        Returns:
            One enriched dict per distinct league id, in hit order. Leagues
            without a Supabase row are omitted.
        """
        if query_type == "semantic":
            hits = await self.vector_client.semantic_league_search(query, k=50)
        elif query_type == "graph":
            # NOTE(review): Neo4jClient as documented has no graph_query method.
            hits = await self.neo4j_client.graph_query(query)
        else:
            # NOTE(review): SupabaseClient as documented has no query method.
            hits = await self.supabase_client.query(query, filters)
        return await self._enrich_results([hit["id"] for hit in hits])

    async def _enrich_one(self, league_id: str) -> Optional[Dict]:
        """Merge one league's Supabase row with its Neo4j network (None if no row)."""
        supabase_data, graph_data = await asyncio.gather(
            self.supabase_client.get_league(league_id),
            self.neo4j_client.get_league_network(league_id),
        )
        if supabase_data is None:
            # get_league returns None for a missing row; **None would raise.
            return None
        return {**supabase_data, "network": graph_data, "source": "triple_index"}

    async def _enrich_results(self, league_ids: List[str]) -> List[Dict]:
        """Enrich each distinct league id concurrently, preserving input order."""
        unique_ids = list(dict.fromkeys(league_ids))
        enriched = await asyncio.gather(
            *(self._enrich_one(league_id) for league_id in unique_ids)
        )
        return [item for item in enriched if item is not None]
# Data Processing Pipelines
Pipeline 1: League Onboarding
Complete Data Flow:
Email → Extract → Validate → Store → Index → Notify
  │        │         │         │       │       │
  │        │         │     Supabase  Neo4j Firebase
  │        │         │         │       │       │
  │        │     Pydantic   Vector   Graph Real-time
  │        │    Validation   Index   Index  Updates
  │    Document
  │   Processing
Gmail
Attachment

Implementation:
from datetime import datetime, timezone

import numpy as np


async def onboard_league_pipeline(
    questionnaire_pdf: bytes,
    email_metadata: dict
) -> dict:
    """Run the complete onboarding pipeline for one league questionnaire.

    The steps are: extract data from the PDF, validate it, then write it to
    Supabase, Neo4j, the FAISS index and Firebase, and finally trigger the
    n8n completion workflow. Steps run in order. A failure part-way leaves
    the earlier stores updated; there is no rollback.

    Args:
        questionnaire_pdf: Raw PDF bytes of the questionnaire.
        email_metadata: Metadata of the source email (not used by these steps).

    Returns:
        Summary with the league id, status and the stores that were written.
    """
    # 1. Extract data from PDF
    extracted_data = await extract_questionnaire_data(questionnaire_pdf)
    # 2. Validate with Pydantic schema
    validated_data = LeagueQuestionnaireSchema(**extracted_data)
    # Serialise once. model_dump() is the Pydantic v2 replacement for .dict(),
    # consistent with the model_fields usage in SchemaGenerator.
    league_record = validated_data.model_dump()
    # upsert_league returns the stored row data, not an id. Take the id from
    # the validated payload, which create_league_graph requires anyway.
    league_id = league_record["id"]
    # 3. Store in Supabase
    await supabase_client.upsert_league(league_record)
    # 4. Create Neo4j graph
    await neo4j_client.create_league_graph(league_record)
    # 5. Generate embeddings and index
    embedding = await generate_embedding(validated_data.description)
    await vector_client.add_embeddings(
        np.array([embedding]),
        [league_id]
    )
    # 6. Notify via Firebase (timezone-aware UTC timestamp)
    # NOTE(review): league status is written under live_games/ through
    # update_live_game; a league_updates path looks intended. Confirm.
    await firebase_client.update_live_game(league_id, {
        "status": "onboarded",
        "timestamp": datetime.now(timezone.utc).isoformat()
    })
    # 7. Trigger n8n workflow
    await trigger_n8n_workflow(
        "league-onboarding-complete",
        {"league_id": league_id}
    )
    return {
        "league_id": league_id,
        "status": "onboarded",
        "databases_updated": ["supabase", "neo4j", "faiss", "firebase"]
    }
# Best Practices
1. Data Consistency
Strategy: Use Supabase as source of truth, sync to other databases
async def ensure_data_consistency(league_id: str):
    """Re-sync one league from Supabase (the source of truth) to the other stores.

    Raises:
        LookupError: If Supabase has no row for ``league_id``.
    """
    # 1. Get source of truth from Supabase
    source_data = await supabase_client.get_league(league_id)
    if source_data is None:
        # get_league returns None for a missing league. Fail clearly instead
        # of pushing None downstream and crashing on source_data["description"].
        raise LookupError(f"League {league_id!r} not found in Supabase")
    # NOTE(review): update_league (Neo4j, Firebase) and update_embedding are
    # not defined on the client classes shown in this document.
    # 2. Update Neo4j
    await neo4j_client.update_league(league_id, source_data)
    # 3. Update Firebase
    await firebase_client.update_league(league_id, source_data)
    # 4. Update vector embeddings
    embedding = await generate_embedding(source_data["description"])
    await vector_client.update_embedding(league_id, embedding)
# 2. Error Handling
import logging

from tenacity import retry, stop_after_attempt, wait_exponential

# Module-level logger used by the except path below. Without it, the handler
# raised NameError, which masked the real error and was itself retried.
logger = logging.getLogger(__name__)


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def resilient_data_operation(operation, *args, **kwargs):
    """Await ``operation(*args, **kwargs)``, retrying for up to 3 attempts in total.

    Waits between attempts grow exponentially and are clamped to 4-10 seconds.
    Any exception triggers a retry.

    Raises:
        tenacity.RetryError: After the last failed attempt. ``reraise=True``
            is not set, so the original exception is wrapped; it is
            available via ``err.last_attempt.exception()``.
    """
    try:
        return await operation(*args, **kwargs)
    except Exception as e:
        # Lazy %-formatting: the message is only built if the record is emitted.
        logger.error("Data operation failed: %s", e)
        raise
# 3. Caching
import time
from datetime import datetime, timedelta
from functools import lru_cache


class CachedDataLayer:
    """Per-instance TTL cache in front of ``_fetch_league``."""

    def __init__(self, ttl: timedelta = timedelta(minutes=5)):
        """Create an empty cache.

        Args:
            ttl: How long a fetched league stays fresh (default 5 minutes).
        """
        # cache_key -> (data, time.monotonic() when fetched)
        self.cache = {}
        self.cache_ttl = ttl

    async def get_league_cached(self, league_id: str) -> dict:
        """Return the league, fetching it again only when the cached copy expired.

        NOTE(review): ``_fetch_league`` is not defined in this snippet; it
        must be provided elsewhere (e.g. by a subclass).
        """
        cache_key = f"league:{league_id}"
        entry = self.cache.get(cache_key)
        if entry is not None:
            cached_data, cached_at = entry
            # monotonic() is unaffected by wall-clock jumps (NTP corrections,
            # DST changes of naive local time), which could otherwise make
            # entries expire too early or never.
            if time.monotonic() - cached_at < self.cache_ttl.total_seconds():
                return cached_data
            # Evict the stale entry so expired data does not linger.
            del self.cache[cache_key]
        # Fetch fresh data
        data = await self._fetch_league(league_id)
        self.cache[cache_key] = (data, time.monotonic())
        return data
# Additional Resources
Support
For data layer assistance:
- Review data_layer/shared/utilities
- Consult data architecture agents: @data-architect
- Test database connections: /database:etl:database.etl.supabase.health-check
- Use the triple index for complex queries