Migration Guide: Generation → Retrieval-First Architecture

Source: data_layer/docs/MIGRATION_GUIDE.md

This guide helps you convert existing generation-based code to the retrieval-first architecture.

Table of Contents

  1. Quick Reference
  2. Identifying Generation Points
  3. Step-by-Step Migration
  4. Common Patterns
  5. Gradual Migration Strategy
  6. Testing & Validation
  7. Rollback Plan
  8. Troubleshooting
  9. Next Steps

Quick Reference

Before & After

# ❌ BEFORE: Generation-based
def create_contract(league_data):
    prompt = build_contract_prompt(league_data)
    contract = llm.generate(prompt)
    return contract
 
# ✅ AFTER: Retrieval-first
async def create_contract(league_data):
    similar = await vector_index.search(
        query=f"{league_data.tier} {league_data.sport} league",
        filters={"tier": league_data.tier, "sport": league_data.sport}
    )
    contract = composer.assemble(similar[0], league_data)
    await store_for_future_retrieval(contract, league_data)
    return contract

Identifying Generation Points

1. Find LLM Calls

Search your codebase for:

# Common generation patterns
grep -r "openai\\.chat\\.completions\\.create" .
grep -r "llm\\.generate" .
grep -r "llm\\.invoke" .
grep -r "ChatOpenAI" .
grep -r "create_completion" .

2. Identify High-Value Targets

Prioritize converting these generation points:

| Priority | Type                | Reason                               |
|----------|---------------------|--------------------------------------|
| High     | Contract generation | Slow, expensive, frequently repeated |
| High     | Response templates  | High volume, similar patterns        |
| Medium   | Prompt building     | Moderate cost, cacheable             |
| Low      | Custom clauses      | Truly unique, hard to retrieve       |

3. Audit Existing Content

What successful outputs do you already have?

# Find contract examples
find . -name "*contract*.json" -o -name "*contract*.md"
 
# Find response templates
find . -name "*response*.json" -o -name "*template*.json"
 
# Find prompt examples
find . -name "*prompt*.txt" -o -name "*prompt*.md"

Step-by-Step Migration

Step 1: Set Up Retrieval Infrastructure

# database/knowledge/setup.py
import asyncio
from pathlib import Path
from knowledge.embeddings import EmbeddingService, VectorIndex, EmbeddingConfig
from knowledge.index import TripleStore
 
async def setup_retrieval_system():
    """Initialize retrieval infrastructure"""
 
    # 1. Configure embedding service
    config = EmbeddingConfig.production(api_key="your-key")
    embedding_service = EmbeddingService(config)
    await embedding_service.initialize()
 
    # 2. Initialize vector index
    vector_index = VectorIndex(
        embedding_service,
        backend="chroma",
        persist_directory=Path("./knowledge/storage/vectors")
    )
    await vector_index.initialize()
 
    # 3. Initialize triple store
    triple_store = TripleStore(
        storage_path=Path("./knowledge/storage/triples")
    )
 
    return embedding_service, vector_index, triple_store

Step 2: Import Existing Successful Outputs

# database/scripts/import_existing_contracts.py
import asyncio
import json
from datetime import datetime
from pathlib import Path
from knowledge.embeddings import EmbeddingService, VectorIndex
from knowledge.index import TripleStore, Entity, EntityType
from knowledge.setup import setup_retrieval_system  # Step 1 helper; adjust the path if it lives elsewhere
 
async def import_existing_contracts():
    """Import existing contracts into retrieval system"""
 
    embedding_service, vector_index, triple_store = await setup_retrieval_system()
 
    # Load existing contracts
    contracts_dir = Path("./output-styles/league_questionnaire_to_contract/examples")
    contracts = []
 
    for contract_file in contracts_dir.glob("*.json"):
        with open(contract_file) as f:
            contract_data = json.load(f)
            # Keep the source path alongside the data for content_ref and logging
            contracts.append((contract_file, contract_data))
 
    print(f"Found {len(contracts)} existing contracts")
 
    # Import each contract
    for i, (contract_file, contract) in enumerate(contracts, 1):
        # Extract key fields
        content = contract.get('content') or contract.get('text')
        metadata = {
            'tier': contract.get('tier'),
            'sport': contract.get('sport'),
            'source': 'migration',
            'imported_at': datetime.now().isoformat()
        }
 
        # Generate embedding
        embedding = await embedding_service.embed(content)
 
        # Create entity
        entity = Entity(
            id=f"contract_migrated_{i:03d}",
            type=EntityType.CONTRACT,
            name=f"Migrated Contract {i}",
            content_ref=str(contract_file),
            embedding_ref=f"emb_migrated_{i:03d}",
            metadata=metadata
        )
        triple_store.add_entity(entity)
 
        # Add to vector index
        await vector_index.add(
            texts=[content],
            ids=[entity.id],
            metadatas=[metadata],
            embeddings=[embedding.vector]
        )
 
        print(f"✓ Imported {i}/{len(contracts)}: {contract_file.name}")
 
    print(f"\n✓ Successfully imported {len(contracts)} contracts")
 
if __name__ == "__main__":
    asyncio.run(import_existing_contracts())

Step 3: Create Retrieval Wrapper

# database/knowledge/retrieval_wrapper.py
from typing import Optional, Dict, Any
from .embeddings import VectorIndex
from .index import TripleStore, EntityType
 
class RetrievalWrapper:
    """
    Wrapper for easy retrieval operations
 
    Provides a simple interface for common retrieval patterns.
    """
 
    def __init__(self, vector_index: VectorIndex, triple_store: TripleStore):
        self.vector_index = vector_index
        self.triple_store = triple_store
 
    async def retrieve_similar(
        self,
        query: str,
        entity_type: Optional[str] = None,
        filters: Optional[Dict[str, Any]] = None,
        limit: int = 3
    ):
        """Retrieve similar content"""
        return await self.vector_index.search(
            query=query,
            filters=filters,
            limit=limit,
            min_score=0.7
        )
 
    async def get_best_match(self, query: str, filters: Optional[Dict] = None):
        """Get single best match"""
        results = await self.retrieve_similar(query, filters=filters, limit=1)
        return results[0] if results else None
 
    async def store_with_metadata(
        self,
        content: str,
        entity_type: EntityType,
        metadata: Dict[str, Any]
    ):
        """Store new content with automatic embedding"""
        # Implementation here
        pass
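A quick usage sketch, assuming the components returned by setup_retrieval_system() in Step 1 (the query and filters are illustrative):

# Inside an async function, after Step 1 setup
retrieval = RetrievalWrapper(vector_index, triple_store)

best = await retrieval.get_best_match(
    query="premium basketball league contract",
    filters={"tier": "premium", "sport": "basketball"}
)
if best:
    print(best.text[:200], best.metadata)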

Step 4: Convert Generation Function

Original Function:

# OLD: ops/contract_builders/generate_contract.py
def generate_contract(league_data: Dict) -> str:
    """Generate a contract from scratch"""
 
    # Build prompt (slow)
    prompt = build_detailed_prompt(league_data)
 
    # Call LLM (very slow)
    contract = openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.7
    ).choices[0].message.content
 
    return contract

Migrated Function:

# NEW: ops/contract_builders/retrieve_contract.py
async def retrieve_and_compose_contract(
    league_data: Dict,
    retrieval: RetrievalWrapper
) -> str:
    """Retrieve similar contract and adapt"""
 
    # Build semantic query
    query = f"{league_data['tier']} tier {league_data['sport']} league with {league_data['teams']} teams"
 
    # Retrieve similar (fast)
    similar = await retrieval.retrieve_similar(
        query=query,
        filters={
            'tier': league_data['tier'],
            'sport': league_data['sport']
        },
        limit=3
    )
 
    if not similar:
        # Fallback to generation (rare)
        return await generate_contract_fallback(league_data)
 
    # Use best match as base
    base_contract = similar[0].text
 
    # Compose with customizations (fast)
    contract = compose_contract(
        base=base_contract,
        customizations=league_data,
        generate_custom_clauses_only=True  # Minimal LLM use
    )
 
    # Store for future retrieval
    await store_successful_contract(contract, league_data, retrieval)
 
    return contract

Step 5: Add Feedback Loop

# knowledge/feedback.py
from datetime import datetime
from pathlib import Path
from typing import Dict

from .index import Entity, EntityType
from .retrieval_wrapper import RetrievalWrapper

# generate_id() is assumed to be an existing project helper that returns a short unique ID

async def store_successful_contract(
    contract: str,
    league_data: Dict,
    retrieval: RetrievalWrapper
):
    """Store successful contract with feedback"""
 
    entity_id = f"contract_{league_data['sport']}_{generate_id()}"
 
    # Generate embedding
    embedding = await retrieval.vector_index.embedding_service.embed(contract)
 
    # Create entity
    entity = Entity(
        id=entity_id,
        type=EntityType.CONTRACT,
        name=f"{league_data['tier']} {league_data['sport']} Contract",
        content_ref=f"storage/contracts/{entity_id}.txt",
        embedding_ref=f"emb_{entity_id}",
        metadata={
            **league_data,
            'quality_score': 0.9,  # Will be updated based on feedback
            'usage_count': 0,
            'success_rate': None,
            'created_at': datetime.now().isoformat()
        }
    )
    retrieval.triple_store.add_entity(entity)
 
    # Add to vector index
    await retrieval.vector_index.add(
        texts=[contract],
        ids=[entity_id],
        metadatas=[entity.metadata],
        embeddings=[embedding.vector]
    )
 
    # Save content to file
    content_path = Path(entity.content_ref)
    content_path.parent.mkdir(parents=True, exist_ok=True)
    content_path.write_text(contract)
 
    return entity

Common Patterns

Pattern 1: Simple Replacement

# BEFORE
def get_response_template(category: str) -> str:
    prompt = f"Generate a {category} response template"
    return llm.generate(prompt)
 
# AFTER
async def get_response_template(category: str, retrieval: RetrievalWrapper) -> str:
    result = await retrieval.get_best_match(
        query=f"{category} response",
        filters={'category': category}
    )
    return result.text if result else await fallback_generate(category)

Pattern 2: Composition

# BEFORE
def build_onboarding_email(league_name: str) -> str:
    prompt = f"Write an onboarding email for {league_name}"
    return llm.generate(prompt)
 
# AFTER
async def build_onboarding_email(
    league_name: str,
    tier: str,
    retrieval: RetrievalWrapper
) -> str:
    # Get similar emails
    similar = await retrieval.retrieve_similar(
        query=f"{tier} onboarding email",
        filters={'type': 'onboarding'},
        limit=1
    )
 
    if similar:
        # Adapt template
        template = similar[0].text
        email = template.replace('{{league_name}}', league_name)
    else:
        # Generate if no template found
        email = await generate_onboarding_email(league_name, tier)
 
    return email

Pattern 3: Hybrid (Retrieve + Generate)

# Use retrieval for structure, generation for specifics
async def create_custom_contract(
    league_data: Dict,
    special_clauses: List[str],
    retrieval: RetrievalWrapper
) -> str:
    # Retrieve base structure (fast)
    base = await retrieval.get_best_match(
        query=f"{league_data['tier']} {league_data['sport']} contract"
    )
 
    # Generate only custom parts (slower but necessary)
    custom_sections = await generate_custom_clauses(special_clauses)
 
    # Compose final contract
    return compose_with_custom_clauses(base.text, custom_sections)

Gradual Migration Strategy

Phase 1: Read-Only Retrieval (Week 1-2)

  1. Set up retrieval infrastructure
  2. Import existing content
  3. Add retrieval alongside generation (A/B testing)
  4. Measure performance improvements

async def generate_with_retrieval_fallback(league_data, use_retrieval=True):
    if use_retrieval:
        try:
            return await retrieve_and_compose(league_data)
        except Exception as e:
            logger.warning(f"Retrieval failed: {e}, falling back to generation")
 
    return await generate_from_scratch(league_data)

Phase 2: Retrieval-First (Week 3-4)

  1. Make retrieval the default (see the sketch after this list)
  2. Keep generation as fallback
  3. Add feedback loops
  4. Monitor quality metrics
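A minimal sketch of the retrieval-first default, reusing the Phase 1 fallback logging and the helpers from Steps 4-5 (the wiring shown here is illustrative):

async def create_contract(league_data, retrieval):
    """Phase 2: retrieval is the default; generation is only the fallback"""
    try:
        contract = await retrieve_and_compose_contract(league_data, retrieval)
    except Exception as e:
        logger.warning(f"Retrieval failed: {e}, falling back to generation")
        contract = await generate_from_scratch(league_data)

    # Feedback loop: store the result so future requests can retrieve it
    await store_successful_contract(contract, league_data, retrieval)
    return contract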

Phase 3: Full Migration (Week 5+)

  1. Remove generation code for high-success retrievals (see the gating sketch below)
  2. Keep generation only for truly custom content
  3. Optimize retrieval performance
  4. Continuous learning from feedback
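One way to decide when a generation path can be retired is to gate on the feedback metadata stored in Step 5 (usage_count, success_rate). A sketch; the helper name and thresholds are assumptions:

def can_retire_generation(results, min_usage: int = 25, min_success: float = 0.9) -> bool:
    """True when the top retrieved candidate has enough successful usage to drop generation"""
    if not results:
        return False
    meta = results[0].metadata
    usage = meta.get('usage_count') or 0
    success = meta.get('success_rate') or 0.0
    # Thresholds are illustrative; tune them against your own quality metrics
    return usage >= min_usage and success >= min_success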

Testing & Validation

Unit Tests

# tests/test_retrieval_migration.py
import pytest
from knowledge.retrieval_wrapper import RetrievalWrapper
 
@pytest.mark.asyncio
async def test_retrieval_returns_similar_contract(vector_index, triple_store):
    """Test that retrieval finds similar contracts (index and store come from fixtures)"""
    wrapper = RetrievalWrapper(vector_index, triple_store)
 
    result = await wrapper.get_best_match(
        query="premium basketball league",
        filters={"tier": "premium", "sport": "basketball"}
    )
 
    assert result is not None
    assert result.metadata['tier'] == 'premium'
    assert result.metadata['sport'] == 'basketball'
    assert result.score > 0.7

Performance Tests

import time

@pytest.mark.asyncio
async def test_performance_improvement(league_data, retrieval):
    """Measure speedup from retrieval (league_data and retrieval wrapper come from fixtures)"""
 
    # Time the legacy generation path (synchronous, from Step 4)
    start = time.time()
    generated = generate_contract(league_data)
    generation_time = time.time() - start

    # Time the retrieval path
    start = time.time()
    retrieved = await retrieve_and_compose_contract(league_data, retrieval)
    retrieval_time = time.time() - start
 
    # Verify speedup
    speedup = generation_time / retrieval_time
    print(f"Speedup: {speedup:.1f}x")
    assert speedup > 5, "Retrieval should be at least 5x faster"

Quality Tests

@pytest.mark.asyncio
async def test_quality_equivalence(league_data, retrieval):
    """Ensure retrieved content matches the quality of generated content"""

    generated = generate_contract(league_data)
    retrieved = await retrieve_and_compose_contract(league_data, retrieval)
 
    # Check key sections present
    assert 'AGREEMENT' in retrieved
    assert league_data['tier'] in retrieved
    assert league_data['sport'] in retrieved
 
    # Manual review flag
    print("Manual review:")
    print(f"Generated: {len(generated)} chars")
    print(f"Retrieved: {len(retrieved)} chars")

Rollback Plan

If migration causes issues:

1. Feature Flag

# config/features.py
import os

ENABLE_RETRIEVAL = os.getenv('ENABLE_RETRIEVAL', 'false').lower() == 'true'
 
# In code
if ENABLE_RETRIEVAL:
    contract = await retrieve_and_compose(league_data)
else:
    contract = await generate_from_scratch(league_data)

2. Gradual Rollout

import random
 
def should_use_retrieval(rollout_percentage=50) -> bool:
    """Gradually roll out retrieval to users"""
    return random.random() < (rollout_percentage / 100)
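Combining the flag and the rollout check in the request path (function names match the examples above):

if ENABLE_RETRIEVAL and should_use_retrieval(rollout_percentage=25):  # start small, then ramp up
    contract = await retrieve_and_compose(league_data)
else:
    contract = await generate_from_scratch(league_data)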

3. Monitor Metrics

Track these metrics (a minimal timing and error-logging sketch follows the list):

  • Response time (should decrease)
  • Quality scores (should maintain)
  • Error rates (should stay low)
  • User feedback (should improve)
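Response time and error rate can be captured with a thin wrapper around the retrieval path; quality scores and user feedback flow through the Step 5 metadata. A minimal sketch:

import logging
import time

logger = logging.getLogger("retrieval_migration")

async def timed_retrieval(league_data, retrieval):
    """Wrap the retrieval path and record response time and errors"""
    start = time.time()
    try:
        contract = await retrieve_and_compose_contract(league_data, retrieval)
        logger.info("retrieval_ok elapsed=%.3fs", time.time() - start)
        return contract
    except Exception:
        logger.exception("retrieval_error elapsed=%.3fs", time.time() - start)
        raise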

Troubleshooting

Issue: No Similar Content Found

Solution: Lower similarity threshold or add more training data

results = await vector_index.search(query, min_score=0.5)  # Lower threshold

Issue: Retrieved Content Not Relevant

Solution: Improve metadata filters

results = await vector_index.search(
    query=query,
    filters={
        'tier': league_data['tier'],
        'sport': league_data['sport'],
        'teams': {'$gte': league_data['teams'] - 5}  # Flexible range
    }
)

Issue: Slow Performance

Solution (combined in the sketch after this list):

  1. Check that the embedding cache is working; reset a stale cache with embedding_service.clear_cache()
  2. Use FAISS backend: backend="faiss"
  3. Reduce search limit: limit=1
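Combined, using the VectorIndex options shown in Step 1 (the FAISS backend and lower limit come from the items above; the paths and threshold are illustrative):

# Faster configuration: FAISS backend, single result
vector_index = VectorIndex(
    embedding_service,
    backend="faiss",
    persist_directory=Path("./knowledge/storage/vectors")
)
await vector_index.initialize()

results = await vector_index.search(query, filters=filters, limit=1, min_score=0.7)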

Next Steps

After successful migration:

  1. Monitor & Optimize: Track retrieval performance
  2. Expand Coverage: Add more entity types (prompts, responses, etc.)
  3. Enhance Relationships: Create more graph connections
  4. Improve Scoring: Use feedback to weight results
  5. Add Features: Implement graph neural networks, hybrid scoring

Support

Questions? See:

  • Main documentation: knowledge/RETRIEVAL_SYSTEM_README.md
  • Example code: knowledge/examples/test_retrieval_system.py
  • Architecture guide: database/CLAUDE.md
