Data Layer Architecture

The AltSportsLeagues.ai data layer provides unified access to all data sources, schemas, and processing pipelines. It serves as the central hub for data operations across Supabase, Neo4j, Firebase, and vector databases.

Overview

Purpose

The data layer (data_layer/) provides:

  1. Unified Data Access: Single interface to multiple data sources
  2. Schema Management: Centralized schema definitions and validation
  3. Cross-Platform Utilities: Shared code for Python, TypeScript, and Next.js
  4. Triple Index System: Efficient querying across relational, graph, and vector stores
  5. Documentation Embeddings: Semantic search via FAISS/ChromaDB

Architecture

┌──────────────────────────────────────────────────────────┐
│                   Data Layer API                         │
│                                                          │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐          │
│  │   Shared   │  │   Schemas  │  │   Triple   │          │
│  │  Utilities │  │  Registry  │  │   Index    │          │
│  └────────────┘  └────────────┘  └────────────┘          │
└──────────┬───────────────┬──────────────┬────────────────┘
           │               │              │
    ┌──────▼──────┐  ┌─────▼──────┐ ┌─────▼─────┐
    │  Supabase   │  │   Neo4j    │ │ Firebase  │
    │ (Relational)│  │  (Graph)   │ │(Real-time)│
    └─────────────┘  └────────────┘ └───────────┘
                           │
                    ┌──────▼──────┐
                    │ FAISS/      │
                    │ ChromaDB    │
                    │ (Vector)    │
                    └─────────────┘

Directory Structure

Core Directories

data_layer/
├── shared/                   # Cross-platform utilities
│   ├── builders/             # Schema builders
│   ├── components/           # Reusable data components
│   ├── generators/           # Data generators
│   ├── python/               # Python-specific utils
│   ├── typescript/           # TypeScript utils
│   ├── zustand/              # State management
│   ├── nextjs/               # Next.js integration
│   ├── generate.py           # Main orchestrator
│   └── neo4j_utils.py        # Graph database utils
│
├── schemas/                  # Core schema definitions
│   ├── generated/            # Auto-generated schemas
│   │   └── adapters/         # Schema adapters
│   └── leagues.ts            # TypeScript types
│
├── output_styles/            # Domain-specific schemas
│   └── schemas/
│       └── domains/          # Domain schemas
│           ├── leagues/      # League schemas
│           ├── outputs/      # Output formats
│           └── data_cubes/   # OLAP definitions
│
├── triple_index/             # Triple index system
├── unified_prompt_retrieval/ # Prompt caching
├── knowledge/                # Knowledge base
├── n8n_integration/          # n8n workflows
└── docs/                     # Documentation

Database Integration

1. Supabase (Relational Data)

Purpose: Primary relational database for structured league data

Tables:

  • leagues: Main league information
  • teams: Team data
  • players: Player information
  • games: Game schedules and results
  • contracts: Partnership contracts
  • questionnaires: League questionnaires

Access Pattern:

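# data_layer/shared/python/supabase_client.py

import asyncio
import os
from typing import Optional

from supabase import Client, create_client


class SupabaseClient:
    def __init__(self):
        # Credentials are read from the environment rather than undefined globals.
        self.client: Client = create_client(
            os.environ["SUPABASE_URL"],
            os.environ["SUPABASE_KEY"],
        )

    async def get_league(self, league_id: str) -> Optional[dict]:
        """Get a league from Supabase, or None if no row matches."""
        query = (
            self.client.table("leagues")
            .select("*")
            .eq("id", league_id)
            .limit(1)
        )
        # create_client returns a synchronous client, so run the blocking
        # execute() call in a worker thread instead of awaiting it directly.
        result = await asyncio.to_thread(query.execute)
        return result.data[0] if result.data else None

    async def upsert_league(self, league_data: dict) -> dict:
        """Upsert league data and return the stored row."""
        query = self.client.table("leagues").upsert(league_data)
        result = await asyncio.to_thread(query.execute)
        # execute() returns a list of rows; a single upsert yields one row.
        return result.data[0]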

TypeScript Integration:

// data_layer/shared/typescript/supabase.ts
 
import { createClient } from '@supabase/supabase-js'
 
export const supabase = createClient(
  process.env.SUPABASE_URL!,
  process.env.SUPABASE_ANON_KEY!
)
 
export async function getLeague(leagueId: string) {
  const { data, error } = await supabase
    .from('leagues')
    .select('*')
    .eq('id', leagueId)
    .single()
 
  if (error) throw error
  return data
}

2. Neo4j (Graph Database)

Purpose: Graph relationships between leagues, teams, players, and events

Node Types:

  • League: Sports leagues
  • Team: Teams within leagues
  • Player: Individual players
  • Game: Games/matches
  • Venue: Game venues
  • Event: Special events

Relationship Types:

  • BELONGS_TO: Team → League
  • PLAYS_FOR: Player → Team
  • COMPETES_IN: Team → Game
  • HOSTED_AT: Game → Venue
  • SIMILAR_TO: League → League
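
The relationship list above can be kept in a single lookup table so that every Cypher statement uses the same direction and labels. The following is a minimal sketch, not part of the documented data layer: RELATIONSHIPS and build_merge are hypothetical names introduced here for illustration.

```python
from typing import Dict, Tuple

# Hypothetical lookup table: relationship type -> (source label, target label),
# copied from the relationship list above.
RELATIONSHIPS: Dict[str, Tuple[str, str]] = {
    "BELONGS_TO": ("Team", "League"),
    "PLAYS_FOR": ("Player", "Team"),
    "COMPETES_IN": ("Team", "Game"),
    "HOSTED_AT": ("Game", "Venue"),
    "SIMILAR_TO": ("League", "League"),
}


def build_merge(rel_type: str) -> str:
    """Return a parameterized Cypher statement linking two existing nodes.

    Labels and relationship types are interpolated from the fixed table above
    (never from user input); node ids stay as $parameters.
    """
    if rel_type not in RELATIONSHIPS:
        raise ValueError(f"Unknown relationship type: {rel_type}")
    source, target = RELATIONSHIPS[rel_type]
    return (
        f"MATCH (a:{source} {{id: $source_id}}), (b:{target} {{id: $target_id}}) "
        f"MERGE (a)-[:{rel_type}]->(b)"
    )


print(build_merge("PLAYS_FOR"))
# MATCH (a:Player {id: $source_id}), (b:Team {id: $target_id}) MERGE (a)-[:PLAYS_FOR]->(b)
```

Rejecting unknown relationship types up front keeps typos from silently creating a new relationship type in the graph.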

Access Pattern:

# data_layer/shared/neo4j_utils.py
 
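import os
from typing import Dict, List

from neo4j import AsyncGraphDatabase


class Neo4jClient:
    def __init__(self):
        # The methods below use `async with` and `await`, which need the
        # async driver. Connection settings are read from the environment.
        self.driver = AsyncGraphDatabase.driver(
            os.environ["NEO4J_URI"],
            auth=(os.environ["NEO4J_USER"], os.environ["NEO4J_PASSWORD"]),
        )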
 
    async def create_league_graph(self, league_data: dict):
        """Create league node and relationships."""
        async with self.driver.session() as session:
            # Create league node
            await session.run("""
                MERGE (l:League {id: $id})
                SET l.name = $name,
                    l.sport = $sport,
                    l.tier = $tier
            """, **league_data)
 
            # Create team nodes and relationships
            for team in league_data.get("teams", []):
                await session.run("""
                    MERGE (t:Team {id: $team_id})
                    SET t.name = $team_name
                    WITH t
                    MATCH (l:League {id: $league_id})
                    MERGE (t)-[:BELONGS_TO]->(l)
                """, team_id=team["id"], team_name=team["name"],
                    league_id=league_data["id"])
 
    async def find_similar_leagues(
        self,
        league_id: str,
        limit: int = 10
    ) -> List[Dict]:
        """Find similar leagues using graph traversal."""
        async with self.driver.session() as session:
            result = await session.run("""
                MATCH (l1:League {id: $league_id})-[:SIMILAR_TO]-(l2:League)
                RETURN l2
                ORDER BY l2.similarity_score DESC
                LIMIT $limit
            """, league_id=league_id, limit=limit)
 
            # Convert each Node to a plain dict to match the declared return type.
            return [dict(record["l2"]) async for record in result]
 
    async def get_league_network(self, league_id: str) -> dict:
        """Get complete league network (teams, players, games)."""
        async with self.driver.session() as session:
            result = await session.run("""
                MATCH (l:League {id: $league_id})
                OPTIONAL MATCH (l)<-[:BELONGS_TO]-(t:Team)
                OPTIONAL MATCH (t)<-[:PLAYS_FOR]-(p:Player)
                OPTIONAL MATCH (t)-[:COMPETES_IN]->(g:Game)
                RETURN l, collect(DISTINCT t) as teams,
                       collect(DISTINCT p) as players,
                       collect(DISTINCT g) as games
            """, league_id=league_id)
 
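            record = await result.single()
            if record is None:
                # No league with this id exists in the graph.
                return {"league": None, "teams": [], "players": [], "games": []}

            # Convert Node objects to plain dicts so the result is serializable.
            return {
                "league": dict(record["l"]),
                "teams": [dict(t) for t in record["teams"]],
                "players": [dict(p) for p in record["players"]],
                "games": [dict(g) for g in record["games"]]
            }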

3. Firebase (Real-time Data)

Purpose: Real-time updates for live data and user sessions

Collections:

  • live_games: Real-time game updates
  • user_sessions: Active user sessions
  • notifications: Push notifications
  • league_updates: League status changes

Access Pattern:

# data_layer/shared/python/firebase_client.py
 
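import asyncio
import os

import firebase_admin
from firebase_admin import db, firestore


class FirebaseClient:
    def __init__(self):
        # Initialize the default app once, using the public get_app() check
        # rather than the private firebase_admin._apps attribute.
        try:
            firebase_admin.get_app()
        except ValueError:
            # The Realtime Database needs a databaseURL. FIREBASE_DATABASE_URL
            # is an assumed variable name; use whatever your deployment sets.
            firebase_admin.initialize_app(
                options={"databaseURL": os.environ["FIREBASE_DATABASE_URL"]}
            )

        self.realtime_db = db.reference()
        self.firestore = firestore.client()

    async def update_live_game(self, game_id: str, updates: dict):
        """Update live game data in the Realtime Database."""
        ref = self.realtime_db.child(f"live_games/{game_id}")
        # The Admin SDK is synchronous; run the write in a worker thread.
        await asyncio.to_thread(ref.update, updates)

    async def subscribe_to_league_updates(
        self,
        league_id: str,
        callback
    ):
        """Subscribe to league updates and return the listener registration."""
        ref = self.realtime_db.child(f"leagues/{league_id}")
        # listen() runs the callback on a background thread; keep the returned
        # registration so the caller can close() it when done.
        return ref.listen(callback)

    async def store_user_session(self, user_id: str, session_data: dict):
        """Store user session in Firestore."""
        doc_ref = self.firestore.collection("user_sessions").document(user_id)
        await asyncio.to_thread(doc_ref.set, session_data)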
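
The callback passed to listen receives an event object that, in the firebase_admin SDK, exposes event_type, path, and data. The sketch below shows one way to write such a callback so that it mirrors updates into a local dict. make_league_handler is a hypothetical helper, and SimpleNamespace stands in for the SDK's event object so the example runs without a Firebase project.

```python
from types import SimpleNamespace
from typing import Callable, Dict


def make_league_handler(state: Dict[str, object]) -> Callable:
    """Build a listener callback that mirrors Realtime Database events into `state`."""

    def on_event(event) -> None:
        # Paths are relative to the reference being listened on; "/" is the root.
        key = event.path.strip("/") or "<root>"
        if event.event_type == "put":
            # A put replaces the value at the path; None means it was deleted.
            if event.data is None:
                state.pop(key, None)
            else:
                state[key] = event.data
        elif event.event_type == "patch":
            # A patch merges the given children into the existing value.
            current = state.get(key)
            if not isinstance(current, dict):
                current = {}
            current.update(event.data or {})
            state[key] = current

    return on_event


state: Dict[str, object] = {}
handler = make_league_handler(state)
handler(SimpleNamespace(event_type="put", path="/", data={"status": "active"}))
handler(SimpleNamespace(event_type="patch", path="/", data={"tier": 2}))
print(state)  # {'<root>': {'status': 'active', 'tier': 2}}
```

Because the real callback runs on a background thread, anything it touches (here, the state dict) should be safe to update from outside the event loop.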

4. FAISS/ChromaDB (Vector Search)

Purpose: Semantic search for documentation and league data

Use Cases:

  • Document similarity search
  • League recommendation
  • Semantic questionnaire matching
  • Knowledge base queries

Access Pattern:

# data_layer/shared/python/vector_client.py
 
import faiss
import numpy as np
from typing import List, Tuple
 
class VectorClient:
    def __init__(self, dimension: int = 1536):
        self.dimension = dimension
        self.index = faiss.IndexFlatL2(dimension)
        self.id_map = {}
 
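    async def add_embeddings(
        self,
        embeddings: np.ndarray,
        ids: List[str]
    ):
        """Add embeddings to FAISS index."""
        # FAISS expects a contiguous float32 array of shape (n, dimension).
        vectors = np.ascontiguousarray(embeddings, dtype=np.float32)
        # Offset by the current index size so a second call does not
        # overwrite the ids registered by the first.
        start = self.index.ntotal
        self.index.add(vectors)
        for offset, item_id in enumerate(ids):
            self.id_map[start + offset] = item_id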
 
    async def search(
        self,
        query_embedding: np.ndarray,
        k: int = 10
    ) -> List[Tuple[str, float]]:
        """Search for similar embeddings."""
        query = np.ascontiguousarray(
            query_embedding, dtype=np.float32
        ).reshape(1, -1)
        distances, indices = self.index.search(query, k)

        # FAISS pads the result with index -1 when fewer than k vectors exist.
        return [
            (self.id_map[int(idx)], float(dist))
            for idx, dist in zip(indices[0], distances[0])
            if idx != -1
        ]
 
    async def semantic_league_search(
        self,
        query: str,
        k: int = 10
    ) -> List[dict]:
        """Search leagues using semantic similarity."""
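        # Generate embedding (generate_embedding is assumed to be defined elsewhere)
        query_embedding = await generate_embedding(query)

        # Search index
        results = await self.search(query_embedding, k)

        # Fetch full league data (fetch_leagues is assumed to be defined elsewhere)
        league_ids = [league_id for league_id, _ in results]
        leagues = await fetch_leagues(league_ids)
        leagues_by_id = {league["id"]: league for league in leagues}

        # Add similarity scores, matching by id rather than by position so a
        # missing or reordered row is never paired with the wrong distance
        return [
            {
                **leagues_by_id[league_id],
                "similarity_score": 1 / (1 + dist)
            }
            for league_id, dist in results
            if league_id in leagues_by_id
        ]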
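
IndexFlatL2 performs an exhaustive search and reports squared Euclidean distances, which semantic_league_search then maps to a score with 1 / (1 + dist). The dependency-free sketch below reproduces that ranking and scoring on toy 2-D vectors so the numbers are easy to check by hand. brute_force_search and the sample ids are illustrative only.

```python
from typing import Dict, List, Sequence, Tuple


def squared_l2(a: Sequence[float], b: Sequence[float]) -> float:
    """Squared Euclidean distance, the metric a flat L2 index reports."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def brute_force_search(
    vectors: Dict[str, Sequence[float]],
    query: Sequence[float],
    k: int,
) -> List[Tuple[str, float]]:
    """Return the k nearest ids with their squared distances, closest first."""
    scored = [(item_id, squared_l2(vec, query)) for item_id, vec in vectors.items()]
    scored.sort(key=lambda pair: pair[1])
    return scored[:k]


def similarity(dist: float) -> float:
    """Map a distance in [0, inf) to a score in (0, 1]; identical vectors score 1."""
    return 1.0 / (1.0 + dist)


vectors = {
    "league-a": [0.0, 0.0],
    "league-b": [1.0, 0.0],
    "league-c": [3.0, 4.0],
}
for league_id, dist in brute_force_search(vectors, [0.0, 0.0], k=3):
    print(league_id, dist, round(similarity(dist), 4))
# league-a 0.0 1.0
# league-b 1.0 0.5
# league-c 25.0 0.0385
```

The score is monotonic in distance, so it preserves the ranking, but it is not a cosine similarity and should not be compared across indexes built with different embedding models.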

Shared Utilities

1. Schema Generation

Main Orchestrator (data_layer/shared/generate.py):

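from typing import Dict, List, get_args, get_origin

from pydantic import BaseModel


class SchemaGenerator:
    def __init__(self):
        self.schemas: Dict[str, BaseModel] = {}

    def generate_league_schema(self, league_data: dict) -> BaseModel:
        """Validate league data against a Pydantic schema and return the instance."""
        class LeagueSchema(BaseModel):
            id: str
            name: str
            sport: str
            tier: int
            teams: List[str]
            founded: int
            region: str

        return LeagueSchema(**league_data)

    def generate_typescript_types(self, schema: BaseModel) -> str:
        """Generate a TypeScript interface from a Pydantic model instance."""
        # Read the field definitions from the model class.
        fields = type(schema).model_fields

        ts_types = []
        for field_name, field_info in fields.items():
            ts_type = self._python_to_ts_type(field_info.annotation)
            ts_types.append(f"  {field_name}: {ts_type}")

        body = "\n".join(ts_types)
        return f"interface League {{\n{body}\n}}"

    def _python_to_ts_type(self, python_type) -> str:
        """Convert a Python type annotation to a TypeScript type."""
        type_map = {
            str: "string",
            int: "number",
            float: "number",
            bool: "boolean",
            dict: "Record<string, any>"
        }
        # Generic annotations such as List[str] are not the bare `list` type,
        # so unwrap them before the lookup; a bare "Array" is not valid TypeScript.
        origin = get_origin(python_type)
        if python_type is list or origin is list:
            args = get_args(python_type)
            item_type = self._python_to_ts_type(args[0]) if args else "any"
            return f"{item_type}[]"
        if origin is dict:
            return type_map[dict]
        return type_map.get(python_type, "any")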

Usage:

# Generate schema from league data
generator = SchemaGenerator()
league_schema = generator.generate_league_schema(league_data)
 
# Generate TypeScript types
ts_types = generator.generate_typescript_types(league_schema)
 
# Save to file
with open("schemas/leagues.ts", "w") as f:
    f.write(ts_types)

2. Zustand State Management

Store Pattern (data_layer/shared/zustand/league-store.ts):

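import { create } from 'zustand'
import { persist } from 'zustand/middleware'
// Assumed location of the League type; adjust to wherever it is exported.
import type { League } from '@/data_layer/schemas/leagues'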
 
interface LeagueState {
  leagues: League[]
  selectedLeague: League | null
  loading: boolean
  error: string | null
 
  fetchLeagues: () => Promise<void>
  selectLeague: (id: string) => void
  updateLeague: (id: string, data: Partial<League>) => Promise<void>
}
 
export const useLeagueStore = create<LeagueState>()(
  persist(
    (set, get) => ({
      leagues: [],
      selectedLeague: null,
      loading: false,
      error: null,
 
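      fetchLeagues: async () => {
        set({ loading: true, error: null })
        try {
          const res = await fetch('/api/leagues')
          // fetch only rejects on network failure, so check the status too
          if (!res.ok) throw new Error(`Failed to fetch leagues: ${res.status}`)
          const leagues: League[] = await res.json()
          set({ leagues, loading: false })
        } catch (error) {
          // `error` is `unknown` under strict TypeScript; narrow before use
          const message = error instanceof Error ? error.message : String(error)
          set({ error: message, loading: false })
        }
      },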
 
      selectLeague: (id: string) => {
        const league = get().leagues.find(l => l.id === id)
        set({ selectedLeague: league || null })
      },
 
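      updateLeague: async (id: string, data: Partial<League>) => {
        set({ loading: true, error: null })
        try {
          const res = await fetch(`/api/leagues/${id}`, {
            method: 'PATCH',
            // Declare the JSON body so the API route can parse it
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify(data)
          })
          if (!res.ok) throw new Error(`Failed to update league: ${res.status}`)
          const updated: Partial<League> = await res.json()

          set(state => ({
            leagues: state.leagues.map(l =>
              l.id === id ? { ...l, ...updated } : l
            ),
            loading: false
          }))
        } catch (error) {
          const message = error instanceof Error ? error.message : String(error)
          set({ error: message, loading: false })
        }
      }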
    }),
    {
      name: 'league-store'
    }
  )
)

3. Next.js Integration

Server Component (data_layer/shared/nextjs/league-server.tsx):

import { Suspense } from 'react'
import { getLeague } from '@/data_layer/shared/typescript/supabase'
// LeagueTeams is assumed to be an async server component defined elsewhere.
 
export async function LeagueDetail({ leagueId }: { leagueId: string }) {
  const league = await getLeague(leagueId)
 
  return (
    <div className="league-detail">
      <h1>{league.name}</h1>
      <p>Sport: {league.sport}</p>
      <p>Tier: {league.tier}</p>
 
      <Suspense fallback={<div>Loading teams...</div>}>
        <LeagueTeams leagueId={leagueId} />
      </Suspense>
    </div>
  )
}

Client Component (data_layer/shared/nextjs/league-client.tsx):

'use client'
 
import { useLeagueStore } from '@/data_layer/shared/zustand/league-store'
import { useEffect } from 'react'
 
export function LeagueList() {
  const { leagues, loading, fetchLeagues, selectLeague } = useLeagueStore()
 
  useEffect(() => {
    fetchLeagues()
  }, [fetchLeagues])
 
  if (loading) return <div>Loading...</div>
 
  return (
    <div className="league-list">
      {leagues.map(league => (
        <div
          key={league.id}
          onClick={() => selectLeague(league.id)}
          className="league-card"
        >
          <h3>{league.name}</h3>
          <p>{league.sport}</p>
        </div>
      ))}
    </div>
  )
}

Triple Index System

Overview

The triple index system enables efficient querying across relational, graph, and vector data stores.

Architecture:

┌─────────────────────────────────────────────────┐
│            Triple Index Coordinator             │
│                                                 │
│  Query → Parse → Route → Execute → Merge        │
└────────┬──────────────┬──────────────┬──────────┘
         │              │              │
    ┌────▼────┐     ┌───▼────┐     ┌───▼────┐
    │Supabase │     │ Neo4j  │     │ FAISS  │
    │ Index   │     │ Index  │     │ Index  │
    └─────────┘     └────────┘     └────────┘
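
The diagram ends with a Merge step. If a query is ever routed to more than one store, their ranked id lists have to be combined somehow, and this page does not say how. One common option is reciprocal rank fusion (RRF), sketched below as an illustration rather than as the coordinator's actual method. merge_ranked, the constant k = 60, and the sample ids are all assumptions.

```python
from collections import defaultdict
from typing import Dict, List


def merge_ranked(result_lists: List[List[str]], k: int = 60) -> List[str]:
    """Fuse ranked id lists: each list contributes 1 / (k + rank) per id."""
    scores: Dict[str, float] = defaultdict(float)
    for ids in result_lists:
        for rank, league_id in enumerate(ids, start=1):
            scores[league_id] += 1.0 / (k + rank)
    # Highest fused score first; ties fall back to the id for determinism.
    return sorted(scores, key=lambda league_id: (-scores[league_id], league_id))


vector_ids = ["league-a", "league-b", "league-c"]  # e.g. from the vector index
graph_ids = ["league-b", "league-a"]               # e.g. from Neo4j
sql_ids = ["league-b"]                             # e.g. from Supabase
print(merge_ranked([vector_ids, graph_ids, sql_ids]))
# ['league-b', 'league-a', 'league-c']
```

RRF only needs ranks, so it sidesteps the problem that distances, graph scores, and SQL ordering are not on a comparable scale.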

Implementation (data_layer/triple_index/coordinator.py):

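from typing import Any, Dict, List, Optional

# Import paths follow the file locations given in the access-pattern examples;
# they assume data_layer is importable as a package.
from data_layer.shared.neo4j_utils import Neo4jClient
from data_layer.shared.python.supabase_client import SupabaseClient
from data_layer.shared.python.vector_client import VectorClient


class TripleIndexCoordinator:
    def __init__(self):
        self.supabase_client = SupabaseClient()
        self.neo4j_client = Neo4jClient()
        self.vector_client = VectorClient()

    async def unified_query(
        self,
        query: str,
        query_type: str = "semantic",
        filters: Optional[Dict[str, Any]] = None
    ) -> List[Dict]: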
        """Execute unified query across all data stores."""
 
        if query_type == "semantic":
            # Semantic search via FAISS
            vector_results = await self.vector_client.semantic_league_search(
                query, k=50
            )
            league_ids = [r["id"] for r in vector_results]
 
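        elif query_type == "graph":
            # Graph traversal via Neo4j. graph_query is not part of the
            # Neo4jClient example; it is assumed to return dicts with an "id" key.
            graph_results = await self.neo4j_client.graph_query(query)
            league_ids = [r["id"] for r in graph_results]

        else:
            # SQL query via Supabase. query is likewise assumed, not shown
            # in the SupabaseClient example.
            sql_results = await self.supabase_client.query(query, filters)
            league_ids = [r["id"] for r in sql_results]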
 
        # Enrich with data from all sources
        enriched_results = await self._enrich_results(league_ids)
 
        return enriched_results
 
    async def _enrich_results(self, league_ids: List[str]) -> List[Dict]:
        """Enrich results with data from all sources."""
        results = []

        for league_id in league_ids:
            # Get relational data
            supabase_data = await self.supabase_client.get_league(league_id)
            if supabase_data is None:
                # The id came from another index but has no relational row;
                # skip it rather than failing on `**None` below.
                continue

            # Get graph data
            graph_data = await self.neo4j_client.get_league_network(league_id)

            # Combine
            results.append({
                **supabase_data,
                "network": graph_data,
                "source": "triple_index"
            })

        return results
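
_enrich_results awaits each store in turn, so latency grows linearly with the number of ids. The lookups are independent, which means they can run concurrently. The sketch below shows that variant with asyncio.gather. fake_supabase and fake_neo4j are stand-ins for the real client calls, and enrich_one and enrich_all are hypothetical names.

```python
import asyncio
from typing import Dict, List, Optional


async def fake_supabase(league_id: str) -> Optional[dict]:
    """Stand-in for SupabaseClient.get_league; returns None for unknown ids."""
    await asyncio.sleep(0.01)  # simulate network latency
    if league_id == "missing":
        return None
    return {"id": league_id, "name": league_id.upper()}


async def fake_neo4j(league_id: str) -> dict:
    """Stand-in for Neo4jClient.get_league_network."""
    await asyncio.sleep(0.01)
    return {"teams": [], "players": [], "games": []}


async def enrich_one(league_id: str) -> Optional[Dict]:
    # Both lookups for one league run at the same time.
    row, network = await asyncio.gather(
        fake_supabase(league_id), fake_neo4j(league_id)
    )
    if row is None:
        return None
    return {**row, "network": network, "source": "triple_index"}


async def enrich_all(league_ids: List[str]) -> List[Dict]:
    # gather returns results in input order, so the ranking from the
    # initial query is preserved.
    results = await asyncio.gather(*(enrich_one(i) for i in league_ids))
    return [r for r in results if r is not None]


enriched = asyncio.run(enrich_all(["a", "missing", "b"]))
print([r["id"] for r in enriched])  # ['a', 'b']
```

With 50 ids, as in the semantic branch, unbounded concurrency may exceed connection-pool limits, so a semaphore around enrich_one is worth considering in practice.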

Data Processing Pipelines

Pipeline 1: League Onboarding

Complete Data Flow:

Email → Extract → Validate → Store → Index → Notify
  │        │         │         │       │       │
  │        │         │      Supabase  Neo4j  Firebase
  │        │         │         │       │       │
  │        │     Pydantic   Vector   Graph  Real-time
  │        │     Validation Index    Index  Updates
  │     Document
  │     Processing
Gmail
Attachment

Implementation:

from datetime import datetime, timezone

import numpy as np


async def onboard_league_pipeline(
    questionnaire_pdf: bytes,
    email_metadata: dict
) -> dict:
    """Complete league onboarding pipeline."""
    # The *_client objects and helper functions used below are assumed to be
    # created or imported elsewhere in the module.

    # 1. Extract data from PDF
    extracted_data = await extract_questionnaire_data(questionnaire_pdf)

    # 2. Validate with Pydantic schema
    validated_data = LeagueQuestionnaireSchema(**extracted_data)
    league_payload = validated_data.model_dump()

    # 3. Store in Supabase; upsert_league returns the stored row, not the id
    league = await supabase_client.upsert_league(league_payload)
    league_id = league["id"]

    # 4. Create Neo4j graph under the same id
    await neo4j_client.create_league_graph({**league_payload, "id": league_id})

    # 5. Generate embeddings and index
    embedding = await generate_embedding(validated_data.description)
    await vector_client.add_embeddings(
        np.array([embedding], dtype=np.float32),
        [league_id]
    )

    # 6. Notify via Firebase. update_live_game would write under live_games/,
    # so this uses update_league (not shown in the FirebaseClient example).
    await firebase_client.update_league(league_id, {
        "status": "onboarded",
        "timestamp": datetime.now(timezone.utc).isoformat()
    })

    # 7. Trigger n8n workflow
    await trigger_n8n_workflow(
        "league-onboarding-complete",
        {"league_id": league_id}
    )

    return {
        "league_id": league_id,
        "status": "onboarded",
        "databases_updated": ["supabase", "neo4j", "faiss", "firebase"]
    }
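
If any step in the pipeline raises, the caller gets an exception but no record of which stores were already written. One way to make partial failure visible is to run the steps through a small runner that records what completed. This is a sketch under that assumption: run_steps and the stub steps are hypothetical and are not part of the pipeline as documented.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple

Step = Tuple[str, Callable[[], Awaitable[None]]]


async def run_steps(steps: List[Step]) -> Dict[str, object]:
    """Run steps in order, stop at the first failure, and report what completed."""
    completed: List[str] = []
    for name, step in steps:
        try:
            await step()
        except Exception as exc:
            return {
                "status": "failed",
                "failed_step": name,
                "error": str(exc),
                "databases_updated": completed,
            }
        completed.append(name)
    return {"status": "onboarded", "databases_updated": completed}


async def ok() -> None:
    """Stub for a store write that succeeds."""


async def broken() -> None:
    """Stub for a store write that fails."""
    raise RuntimeError("index unavailable")


report = asyncio.run(run_steps([
    ("supabase", ok),
    ("neo4j", ok),
    ("faiss", broken),
    ("firebase", ok),
]))
print(report["failed_step"], report["databases_updated"])
# faiss ['supabase', 'neo4j']
```

The list of completed stores tells a retry or cleanup job exactly where to resume, which fits the approach of treating Supabase as the source of truth.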

Best Practices

1. Data Consistency

Strategy: Use Supabase as the source of truth and sync changes from it to the other stores

async def ensure_data_consistency(league_id: str):
    """Ensure data is consistent across all databases."""
    # update_league and update_embedding are not shown in the client examples;
    # they are assumed to overwrite the existing copy for this id.

    # 1. Get source of truth from Supabase
    source_data = await supabase_client.get_league(league_id)
    if source_data is None:
        raise ValueError(f"League {league_id} not found in Supabase")

    # 2. Update Neo4j
    await neo4j_client.update_league(league_id, source_data)

    # 3. Update Firebase
    await firebase_client.update_league(league_id, source_data)

    # 4. Update vector embeddings
    embedding = await generate_embedding(source_data["description"])
    await vector_client.update_embedding(league_id, embedding)
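
Re-syncing every store on each call is simple but can be wasteful. One option is to compare a content fingerprint of the Supabase row with the copy held in another store and sync only when they differ. The sketch below illustrates the idea. fingerprint and needs_sync are hypothetical helpers, and it assumes both copies can be read back as plain dicts with the same fields.

```python
import hashlib
import json
from typing import Optional


def fingerprint(record: dict) -> str:
    """Stable SHA-256 over the record's JSON form (keys sorted)."""
    payload = json.dumps(
        record, sort_keys=True, separators=(",", ":"), default=str
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def needs_sync(source: dict, replica: Optional[dict]) -> bool:
    """True when the replica is missing or differs from the source of truth."""
    return replica is None or fingerprint(source) != fingerprint(replica)


source = {"id": "league-1", "name": "Example League", "tier": 2}
same_but_reordered = {"tier": 2, "name": "Example League", "id": "league-1"}
stale = {"id": "league-1", "name": "Example League", "tier": 3}

print(needs_sync(source, same_but_reordered))  # False
print(needs_sync(source, stale))               # True
print(needs_sync(source, None))                # True
```

Sorting keys before hashing makes the fingerprint independent of field order, which differs between stores.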

2. Error Handling

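import logging

from tenacity import retry, stop_after_attempt, wait_exponential

# The retry wrapper below logs failures, so it needs a module-level logger.
logger = logging.getLogger(__name__)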
 
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def resilient_data_operation(operation, *args, **kwargs):
    """Execute data operation with retry logic."""
    try:
        result = await operation(*args, **kwargs)
        return result
    except Exception as e:
        logger.error(f"Data operation failed: {e}")
        raise
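
With min=4 and max=10, the exponential wait is clamped from both sides. Assuming the delay before retry n is multiplier * 2**(n - 1) bounded to [min, max] (an assumption about how wait_exponential computes its delay; check the tenacity documentation for your version), the schedule can be tabulated with a few lines of plain Python. backoff_schedule is a hypothetical helper and does not call tenacity.

```python
from typing import List


def backoff_schedule(
    attempts: int,
    multiplier: float = 1,
    min_wait: float = 4,
    max_wait: float = 10,
) -> List[float]:
    """Delays between attempts; there is one fewer delay than attempts."""
    delays = []
    for n in range(1, attempts):
        raw = multiplier * 2 ** (n - 1)  # 1, 2, 4, 8, 16, ...
        delays.append(max(min_wait, min(raw, max_wait)))
    return delays


print(backoff_schedule(3))  # [4, 4]
print(backoff_schedule(6))  # [4, 4, 4, 8, 10]
```

Under this assumption, three attempts give two waits of 4 seconds each, so the exponential growth never takes effect. A lower min or more attempts would be needed to see it.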

3. Caching

from datetime import datetime, timedelta


class CachedDataLayer:
    def __init__(self):
        # Maps cache key -> (data, time stored)
        self.cache = {}
        self.cache_ttl = timedelta(minutes=5)

    async def get_league_cached(self, league_id: str) -> dict:
        """Get league with caching."""
        cache_key = f"league:{league_id}"

        if cache_key in self.cache:
            cached_data, cached_time = self.cache[cache_key]
            if datetime.now() - cached_time < self.cache_ttl:
                return cached_data

        # Fetch fresh data (_fetch_league is assumed to wrap the Supabase lookup)
        data = await self._fetch_league(league_id)
        self.cache[cache_key] = (data, datetime.now())

        return data
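
The cache above never evicts expired entries and keeps serving the old value for up to five minutes after a write. The sketch below shows a TTL cache that drops expired entries on read and supports explicit invalidation. TTLCache is a hypothetical helper, and the injectable clock exists only so that expiry can be demonstrated without sleeping.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TTLCache:
    def __init__(
        self,
        ttl_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.ttl = ttl_seconds
        self.clock = clock
        self._entries: Dict[str, Tuple[Any, float]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if self.clock() - stored_at >= self.ttl:
            # Drop expired entries instead of keeping them forever.
            del self._entries[key]
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        self._entries[key] = (value, self.clock())

    def invalidate(self, key: str) -> None:
        """Call after a write so readers do not see the old value."""
        self._entries.pop(key, None)


now = [0.0]  # fake clock, advanced by hand
cache = TTLCache(ttl_seconds=300, clock=lambda: now[0])
cache.set("league:1", {"name": "Example League"})
now[0] = 299.0
print(cache.get("league:1"))  # {'name': 'Example League'}
now[0] = 300.0
print(cache.get("league:1"))  # None
```

Calling invalidate from the write path (for example after upsert_league) keeps the cache consistent with Supabase without waiting for the TTL. time.monotonic is used instead of datetime.now() so that wall-clock adjustments cannot extend or shorten an entry's lifetime.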

Support

For data layer assistance:

  1. Review data_layer/shared/ utilities
  2. Consult data architecture agents: @data-architect
  3. Test database connections: /database:etl:database.etl.supabase.health-check
  4. Use triple index for complex queries

partnership@altsportsdata.com
dev@altsportsleagues.ai

2025 © AltSportsLeagues.ai. Powered by AI-driven sports business intelligence.