Source: data_layer/docs/MIGRATION_GUIDE.md
Migration Guide: Generation → Retrieval-First Architecture
This guide helps you convert existing generation-based code to the retrieval-first architecture.
Table of Contents
- Quick Reference
- Identifying Generation Points
- Step-by-Step Migration
- Common Patterns
- Gradual Migration Strategy
- Testing & Validation
Quick Reference
Before & After
# ❌ BEFORE: Generation-based
def create_contract(league_data):
prompt = build_contract_prompt(league_data)
contract = llm.generate(prompt)
return contract
# ✅ AFTER: Retrieval-first
async def create_contract(league_data):
similar = await vector_index.search(
query=f"{league_data.tier} {league_data.sport} league",
filters={"tier": league_data.tier, "sport": league_data.sport}
)
contract = composer.assemble(similar[0], league_data)
await store_for_future_retrieval(contract, league_data)
return contract
Identifying Generation Points
1. Find LLM Calls
Search your codebase for:
# Common generation patterns
grep -r "openai\\.chat\\.completions\\.create" .
grep -r "llm\\.generate" .
grep -r "llm\\.invoke" .
grep -r "ChatOpenAI" .
grep -r "create_completion" .2. Identify High-Value Targets
Prioritize converting these generation points:
| Priority | Type | Reason |
|---|---|---|
| High | Contract generation | Slow, expensive, frequently repeated |
| High | Response templates | High volume, similar patterns |
| Medium | Prompt building | Moderate cost, cacheable |
| Low | Custom clauses | Truly unique, hard to retrieve |
3. Audit Existing Content
What successful outputs do you already have?
# Find contract examples
find . -name "*contract*.json" -o -name "*contract*.md"
# Find response templates
find . -name "*response*.json" -o -name "*template*.json"
# Find prompt examples
find . -name "*prompt*.txt" -o -name "*prompt*.md"Step-by-Step Migration
Step 1: Set Up Retrieval Infrastructure
# database/knowledge/setup.py
import asyncio
from pathlib import Path
from knowledge.embeddings import EmbeddingService, VectorIndex, EmbeddingConfig
from knowledge.index import TripleStore
async def setup_retrieval_system():
"""Initialize retrieval infrastructure"""
# 1. Configure embedding service
config = EmbeddingConfig.production(api_key="your-key")
embedding_service = EmbeddingService(config)
await embedding_service.initialize()
# 2. Initialize vector index
vector_index = VectorIndex(
embedding_service,
backend="chroma",
persist_directory=Path("./knowledge/storage/vectors")
)
await vector_index.initialize()
# 3. Initialize triple store
triple_store = TripleStore(
storage_path=Path("./knowledge/storage/triples")
)
return embedding_service, vector_index, triple_store
Step 2: Import Existing Successful Outputs
# database/scripts/import_existing_contracts.py
import asyncio
import json
from datetime import datetime
from pathlib import Path
from knowledge.embeddings import EmbeddingService, VectorIndex
from knowledge.index import TripleStore, Entity, EntityType
from knowledge.setup import setup_retrieval_system  # Step 1 helper; adjust the import path to wherever setup.py lives
async def import_existing_contracts():
"""Import existing contracts into retrieval system"""
embedding_service, vector_index, triple_store = await setup_retrieval_system()
# Load existing contracts
contracts_dir = Path("./output-styles/league_questionnaire_to_contract/examples")
contracts = []
for contract_file in contracts_dir.glob("*.json"):
with open(contract_file) as f:
contract_data = json.load(f)
contracts.append(contract_data)
print(f"Found {len(contracts)} existing contracts")
# Import each contract
for i, contract in enumerate(contracts, 1):
# Extract key fields
content = contract.get('content') or contract.get('text')
metadata = {
'tier': contract.get('tier'),
'sport': contract.get('sport'),
'source': 'migration',
'imported_at': datetime.now().isoformat()
}
# Generate embedding
embedding = await embedding_service.embed(content)
# Create entity
entity = Entity(
id=f"contract_migrated_{i:03d}",
type=EntityType.CONTRACT,
name=f"Migrated Contract {i}",
content_ref=str(contract_file),
embedding_ref=f"emb_migrated_{i:03d}",
metadata=metadata
)
triple_store.add_entity(entity)
# Add to vector index
await vector_index.add(
texts=[content],
ids=[entity.id],
metadatas=[metadata],
embeddings=[embedding.vector]
)
print(f"✓ Imported {i}/{len(contracts)}: {contract_file.name}")
print(f"\n✓ Successfully imported {len(contracts)} contracts")
if __name__ == "__main__":
asyncio.run(import_existing_contracts())
Step 3: Create Retrieval Wrapper
# database/knowledge/retrieval_wrapper.py
from typing import Optional, Dict, Any
from .embeddings import VectorIndex
from .index import TripleStore, EntityType
class RetrievalWrapper:
"""
Wrapper for easy retrieval operations
Provides a simple interface for common retrieval patterns.
"""
def __init__(self, vector_index: VectorIndex, triple_store: TripleStore):
self.vector_index = vector_index
self.triple_store = triple_store
async def retrieve_similar(
self,
query: str,
entity_type: Optional[str] = None,
filters: Optional[Dict[str, Any]] = None,
limit: int = 3
):
"""Retrieve similar content"""
return await self.vector_index.search(
query=query,
filters=filters,
limit=limit,
min_score=0.7
)
async def get_best_match(self, query: str, filters: Optional[Dict] = None):
"""Get single best match"""
results = await self.retrieve_similar(query, filters=filters, limit=1)
return results[0] if results else None
async def store_with_metadata(
self,
content: str,
entity_type: EntityType,
metadata: Dict[str, Any]
):
"""Store new content with automatic embedding"""
# Implementation here (see store_successful_contract in Step 5 for the pattern)
pass
Step 4: Convert Generation Function
Original Function:
# OLD: ops/contract_builders/generate_contract.py
def generate_contract(league_data: Dict) -> str:
"""Generate a contract from scratch"""
# Build prompt (slow)
prompt = build_detailed_prompt(league_data)
# Call LLM (very slow)
contract = openai.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
temperature=0.7
).choices[0].message.content
return contract
Migrated Function:
# NEW: ops/contract_builders/retrieve_contract.py
async def retrieve_and_compose_contract(
league_data: Dict,
retrieval: RetrievalWrapper
) -> str:
"""Retrieve similar contract and adapt"""
# Build semantic query
query = f"{league_data['tier']} tier {league_data['sport']} league with {league_data['teams']} teams"
# Retrieve similar (fast)
similar = await retrieval.retrieve_similar(
query=query,
filters={
'tier': league_data['tier'],
'sport': league_data['sport']
},
limit=3
)
if not similar:
# Fallback to generation (rare)
return await generate_contract_fallback(league_data)
# Use best match as base
base_contract = similar[0].text
# Compose with customizations (fast)
contract = compose_contract(
base=base_contract,
customizations=league_data,
generate_custom_clauses_only=True # Minimal LLM use
)
# Store for future retrieval
await store_successful_contract(contract, league_data, retrieval)
return contract
Step 5: Add Feedback Loop
# knowledge/feedback.py
from datetime import datetime
from pathlib import Path

from knowledge.index import Entity, EntityType

async def store_successful_contract(
contract: str,
league_data: Dict,
retrieval: RetrievalWrapper
):
"""Store successful contract with feedback"""
entity_id = f"contract_{league_data['sport']}_{generate_id()}"
# Generate embedding
embedding = await retrieval.vector_index.embedding_service.embed(contract)
# Create entity
entity = Entity(
id=entity_id,
type=EntityType.CONTRACT,
name=f"{league_data['tier']} {league_data['sport']} Contract",
content_ref=f"storage/contracts/{entity_id}.txt",
embedding_ref=f"emb_{entity_id}",
metadata={
**league_data,
'quality_score': 0.9, # Will be updated based on feedback
'usage_count': 0,
'success_rate': None,
'created_at': datetime.now().isoformat()
}
)
retrieval.triple_store.add_entity(entity)
# Add to vector index
await retrieval.vector_index.add(
texts=[contract],
ids=[entity_id],
metadatas=[entity.metadata],
embeddings=[embedding.vector]
)
# Save content to file
content_path = Path(entity.content_ref)
content_path.parent.mkdir(parents=True, exist_ok=True)
content_path.write_text(contract)
return entity
Common Patterns
Pattern 1: Simple Replacement
# BEFORE
def get_response_template(category: str) -> str:
prompt = f"Generate a {category} response template"
return llm.generate(prompt)
# AFTER
async def get_response_template(category: str, retrieval: RetrievalWrapper) -> str:
result = await retrieval.get_best_match(
query=f"{category} response",
filters={'category': category}
)
return result.text if result else await fallback_generate(category)
Pattern 2: Composition
# BEFORE
def build_onboarding_email(league_name: str) -> str:
prompt = f"Write an onboarding email for {league_name}"
return llm.generate(prompt)
# AFTER
async def build_onboarding_email(
league_name: str,
tier: str,
retrieval: RetrievalWrapper
) -> str:
# Get similar emails
similar = await retrieval.retrieve_similar(
query=f"{tier} onboarding email",
filters={'type': 'onboarding'},
limit=1
)
if similar:
# Adapt template
template = similar[0].text
email = template.replace('{{league_name}}', league_name)
else:
# Generate if no template found
email = await generate_onboarding_email(league_name, tier)
return email
Pattern 3: Hybrid (Retrieve + Generate)
# Use retrieval for structure, generation for specifics
async def create_custom_contract(
league_data: Dict,
special_clauses: List[str],
retrieval: RetrievalWrapper
) -> str:
# Retrieve base structure (fast)
base = await retrieval.get_best_match(
query=f"{league_data['tier']} {league_data['sport']} contract"
)  # get_best_match may return None; add a generation fallback (as in Pattern 1) if needed
# Generate only custom parts (slower but necessary)
custom_sections = await generate_custom_clauses(special_clauses)
# Compose final contract
return compose_with_custom_clauses(base.text, custom_sections)
Gradual Migration Strategy
Phase 1: Read-Only Retrieval (Week 1-2)
- Set up retrieval infrastructure
- Import existing content
- Add retrieval alongside generation (A/B testing)
- Measure performance improvements
async def generate_with_retrieval_fallback(league_data, use_retrieval=True):
if use_retrieval:
try:
return await retrieve_and_compose(league_data)
except Exception as e:
logger.warning(f"Retrieval failed: {e}, falling back to generation")
return await generate_from_scratch(league_data)
Phase 2: Retrieval-First (Week 3-4)
- Make retrieval the default
- Keep generation as fallback
- Add feedback loops
- Monitor quality metrics (see the Phase 2 sketch below)
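A minimal sketch of Phase 2 wiring, assuming the `RetrievalWrapper` from Step 3; `record_quality_metric` is a hypothetical helper, while `compose_contract`, `generate_from_scratch`, and `store_successful_contract` are the functions introduced elsewhere in this guide:

```python
# Phase 2 sketch: retrieval is the default path, generation is the fallback,
# and every result feeds the feedback loop. record_quality_metric() is hypothetical.
async def build_contract_phase2(league_data: Dict, retrieval: RetrievalWrapper) -> str:
    match = await retrieval.get_best_match(
        query=f"{league_data['tier']} {league_data['sport']} contract",
        filters={'tier': league_data['tier'], 'sport': league_data['sport']}
    )
    if match is not None:
        contract = compose_contract(base=match.text, customizations=league_data)
        await record_quality_metric(source="retrieval", score=match.score)
    else:
        contract = await generate_from_scratch(league_data)
        await record_quality_metric(source="generation_fallback", score=None)
    # Step 5 feedback loop: successful outputs become retrievable for future requests.
    await store_successful_contract(contract, league_data, retrieval)
    return contract
```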
Phase 3: Full Migration (Week 5+)
- Remove generation code for high-success retrievals
- Keep generation only for truly custom content (see the routing sketch below)
- Optimize retrieval performance
- Continuous learning from feedback
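As an illustrative (not prescriptive) sketch, Phase 3 routing might consult the `success_rate` metadata stored in Step 5 and reserve the hybrid path from Pattern 3 for genuinely custom requests; `SUCCESS_THRESHOLD` and the routing logic are assumptions, not an existing API:

```python
# Phase 3 sketch: generate only for truly custom content or unproven retrievals.
SUCCESS_THRESHOLD = 0.8  # assumed cutoff for "high-success" retrievals

async def build_contract_phase3(league_data: Dict, retrieval: RetrievalWrapper) -> str:
    match = await retrieval.get_best_match(
        query=f"{league_data['tier']} {league_data['sport']} contract",
        filters={'tier': league_data['tier'], 'sport': league_data['sport']}
    )
    special_clauses = league_data.get('special_clauses', [])
    proven = match is not None and (match.metadata.get('success_rate') or 0) >= SUCCESS_THRESHOLD
    if proven and not special_clauses:
        return compose_contract(base=match.text, customizations=league_data)
    # Hybrid path (Pattern 3) handles custom clauses while still retrieving the base structure.
    return await create_custom_contract(league_data, special_clauses, retrieval)
```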
Testing & Validation
Unit Tests
# tests/test_retrieval_migration.py
import pytest
from knowledge.retrieval_wrapper import RetrievalWrapper
@pytest.mark.asyncio
async def test_retrieval_returns_similar_contract():
"""Test that retrieval finds similar contracts"""
wrapper = RetrievalWrapper(vector_index, triple_store)
result = await wrapper.get_best_match(
query="premium basketball league",
filters={"tier": "premium", "sport": "basketball"}
)
assert result is not None
assert result.metadata['tier'] == 'premium'
assert result.metadata['sport'] == 'basketball'
assert result.score > 0.7
Performance Tests
import time
async def test_performance_improvement():
"""Measure speedup from retrieval"""
# Test generation
start = time.time()
generated = await generate_contract(league_data)
generation_time = time.time() - start
# Test retrieval
start = time.time()
retrieved = await retrieve_and_compose_contract(league_data)
retrieval_time = time.time() - start
# Verify speedup
speedup = generation_time / retrieval_time
print(f"Speedup: {speedup:.1f}x")
assert speedup > 5, "Retrieval should be at least 5x faster"
Quality Tests
async def test_quality_equivalence():
"""Ensure retrieved content matches quality of generated"""
generated = await generate_contract(league_data)
retrieved = await retrieve_and_compose_contract(league_data)
# Check key sections present
assert 'AGREEMENT' in retrieved
assert league_data['tier'] in retrieved
assert league_data['sport'] in retrieved
# Manual review flag
print("Manual review:")
print(f"Generated: {len(generated)} chars")
print(f"Retrieved: {len(retrieved)} chars")Rollback Plan
If migration causes issues:
1. Feature Flag
# config/features.py
import os

ENABLE_RETRIEVAL = os.getenv('ENABLE_RETRIEVAL', 'false').lower() == 'true'
# In code
if ENABLE_RETRIEVAL:
contract = await retrieve_and_compose(league_data)
else:
contract = await generate_from_scratch(league_data)
2. Gradual Rollout
import random
def should_use_retrieval(rollout_percentage=50) -> bool:
"""Gradually roll out retrieval to users"""
return random.random() < (rollout_percentage / 100)
3. Monitor Metrics
Track these metrics:
- Response time (should decrease)
- Quality scores (should maintain)
- Error rates (should stay low)
- User feedback (should improve; see the tracking sketch below)
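A minimal, illustrative way to track these metrics (the class and field names are assumptions, not part of the knowledge package):

```python
# In-memory tracker for rollout metrics; swap for your metrics backend in production.
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class MigrationMetrics:
    response_times: Dict[str, List[float]] = field(default_factory=lambda: defaultdict(list))
    errors: Dict[str, int] = field(default_factory=lambda: defaultdict(int))

    def record(self, path: str, started_at: float, ok: bool = True) -> None:
        """Record one request for a path ('retrieval' or 'generation')."""
        self.response_times[path].append(time.time() - started_at)
        if not ok:
            self.errors[path] += 1

    def summary(self, path: str) -> Dict[str, float]:
        times = self.response_times[path]
        return {
            "avg_response_time": sum(times) / len(times) if times else 0.0,
            "error_rate": self.errors[path] / max(len(times), 1),
        }
```

Call `record()` around each request and compare `summary('retrieval')` against `summary('generation')` during the rollout.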
Troubleshooting
Issue: No Similar Content Found
Solution: Lower the similarity threshold or import more reference content
results = await vector_index.search(query, min_score=0.5)  # Lower threshold
Issue: Retrieved Content Not Relevant
Solution: Improve metadata filters
results = await vector_index.search(
query=query,
filters={
'tier': league_data['tier'],
'sport': league_data['sport'],
'teams': {'$gte': league_data['teams'] - 5} # Flexible range
}
)
Issue: Slow Performance
Solution:
- Check the embedding cache: `embedding_service.clear_cache()`
- Use the FAISS backend: `backend="faiss"` (see the sketch below)
- Reduce the search limit: `limit=1`
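For example, switching the Step 1 initialization to FAISS is a one-argument change, assuming your `VectorIndex` build supports that backend as the tip above implies:

```python
# Same Step 1 setup, but with the FAISS backend instead of Chroma.
vector_index = VectorIndex(
    embedding_service,
    backend="faiss",
    persist_directory=Path("./knowledge/storage/vectors")
)
await vector_index.initialize()
```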
Next Steps
After successful migration:
- Monitor & Optimize: Track retrieval performance
- Expand Coverage: Add more entity types (prompts, responses, etc.)
- Enhance Relationships: Create more graph connections
- Improve Scoring: Use feedback to weight results
- Add Features: Implement graph neural networks, hybrid scoring
Support
Questions? See:
- Main documentation: `knowledge/RETRIEVAL_SYSTEM_README.md`
- Example code: `knowledge/examples/test_retrieval_system.py`
- Architecture guide: `database/CLAUDE.md`