Source: data_layer/docs/IMPLEMENTATION_GUIDE.md
# 🧵 Data Fabric Implementation Guide

Practical steps to transform your current `data_layer/` into the target architecture.
## 📊 Current State → Target State

### What You Have Now

```
data_layer/
├── output-styles/   # Mixed: configs + pipelines
├── prompts/         # Prompt seeds and components
├── storage/         # Mixed: code + runtime data
├── knowledge/       # Python modules for intelligence
└── kb_catalog/      # Business rules and registries
```

### What You'll Have

```
data_layer/
├── definitions/   # 🔷 Single source of truth
├── weave/         # 🔶 Transformation logic
├── views/         # 🔸 Materialized outputs
└── scripts/       # 🛠️ Orchestration
```

## 🎯 Implementation Phases (4 Weeks)
### ✅ Week 1: Create Structure + Migrate Schemas

**Goal:** Establish the `definitions/` tier with schemas.

**Tasks:**

- **Create directory structure (30 min)**
```bash
cd data_layer

# Create definitions tier
mkdir -p definitions/{schemas,config,prompts,examples,kb_catalog}
mkdir -p definitions/schemas/{canonical,generated/{pydantic,typescript,zod,drizzle}}
mkdir -p definitions/config/business/{pricing,scoring,rules}
mkdir -p definitions/config/{sports,workflows}
mkdir -p definitions/prompts/{templates,components}
mkdir -p definitions/examples/{seeds,generated,validation}

# Create weave tier
mkdir -p weave/{builders/{prompts,schemas,examples},embedders,retrievers,knowledge,storage}

# Create views tier
mkdir -p views/{prompts/{agents,workflows},onboarding,embeddings}

# Create scripts
mkdir -p scripts/{sync,generate,embed}
```
- **Move schemas to canonical location (1 hour)**

```bash
# Copy (don't delete yet) schemas
cp -r output-styles/schemas/domain/v1/json/* definitions/schemas/canonical/

# If they exist in other locations too:
# cp database/schemas/*.schema.json definitions/schemas/canonical/

# Add README
cat > definitions/schemas/README.md << 'EOF'
# Schemas - Single Source of Truth

All JSON Schema (Draft 2020-12) definitions. From here, we generate:
- Pydantic models (Python validation)
- TypeScript types (Frontend types)
- Zod schemas (Runtime validation)
- Drizzle schemas (ORM)
EOF
```
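For orientation, a canonical schema in this directory might look like the following minimal sketch. The `tier_classification` name and fields are illustrative (chosen to match the `TierClassification` model used in later examples); use whatever domain schemas you actually copied.

```json
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$id": "tier_classification.schema.json",
  "title": "TierClassification",
  "type": "object",
  "properties": {
    "tier": { "type": "string", "description": "Assigned tier, e.g. tier_1" },
    "score": { "type": "number", "minimum": 0, "maximum": 100 },
    "confidence": { "type": "number", "minimum": 0, "maximum": 1 }
  },
  "required": ["tier", "score"]
}
```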
- **Create schema generation script (2 hours)**

```python
# weave/builders/schemas/generate_all.py
"""Generate Pydantic, TypeScript, Zod, Drizzle from JSON Schema"""
from pathlib import Path

from datamodel_code_generator import InputFileType, generate


def generate_pydantic():
    """JSON Schema → Pydantic"""
    input_dir = Path("../../definitions/schemas/canonical")
    output_dir = Path("../../definitions/schemas/generated/pydantic")
    output_dir.mkdir(parents=True, exist_ok=True)

    for schema_file in input_dir.glob("*.schema.json"):
        output_file = output_dir / f"{schema_file.stem.replace('.schema', '')}.py"
        generate(
            input_=schema_file,
            input_file_type=InputFileType.JsonSchema,
            output=output_file,
            snake_case_field=True,
            use_standard_collections=True,
            use_schema_description=True,
            use_field_description=True,
        )
        print(f"✅ Generated {output_file.name}")


def generate_typescript():
    """JSON Schema → TypeScript (using json-schema-to-typescript)"""
    # Implementation here or call external tool
    pass


def generate_zod():
    """JSON Schema → Zod (using json-schema-to-zod)"""
    pass


def generate_drizzle():
    """JSON Schema → Drizzle"""
    pass


if __name__ == "__main__":
    generate_pydantic()
    generate_typescript()
    generate_zod()
    generate_drizzle()
```
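The TypeScript, Zod, and Drizzle generators are left as stubs above. As one possible approach, `generate_typescript` could shell out to the `json2ts` CLI from the `json-schema-to-typescript` npm package — a minimal sketch, assuming the package is available via `npx`:

```python
# Sketch for the generate_typescript() stub above.
# Assumes `npx json-schema-to-typescript` (json2ts) is installed and on PATH.
import subprocess
from pathlib import Path


def generate_typescript():
    """JSON Schema → TypeScript via the json2ts CLI (sketch)."""
    input_dir = Path("../../definitions/schemas/canonical")
    output_dir = Path("../../definitions/schemas/generated/typescript")
    output_dir.mkdir(parents=True, exist_ok=True)

    for schema_file in input_dir.glob("*.schema.json"):
        output_file = output_dir / f"{schema_file.stem.replace('.schema', '')}.d.ts"
        # json2ts reads a schema file and emits a .d.ts with the inferred types
        subprocess.run(
            ["npx", "json2ts", "-i", str(schema_file), "-o", str(output_file)],
            check=True,
        )
        print(f"✅ Generated {output_file.name}")
```

The same subprocess pattern works for the Zod and Drizzle stubs with their respective CLI tools.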
- **Test schema generation (30 min)**

```bash
cd weave/builders/schemas
pip install datamodel-code-generator
python generate_all.py

# Verify outputs
ls ../../definitions/schemas/generated/pydantic/
ls ../../definitions/schemas/generated/typescript/
```
**Validation:**

- [ ] `definitions/schemas/canonical/` has all JSON Schemas
- [ ] `definitions/schemas/generated/pydantic/` has Python models
- [ ] Generation script runs without errors
- [ ] Generated Pydantic imports successfully (see the smoke test below)
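A quick way to tick the last box — a minimal smoke test, assuming a `tier_classification.py` output exists (substitute any model the generator actually produced):

```python
# Smoke test: load one generated Pydantic module directly from its file path.
import importlib.util

path = "definitions/schemas/generated/pydantic/tier_classification.py"
spec = importlib.util.spec_from_file_location("tier_classification", path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)

# List the generated model classes
print([name for name in dir(module) if not name.startswith("_")])
```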
### ✅ Week 2: Migrate Configs + Generate Examples

**Goal:** Move business configs and create example generation.

**Tasks:**
- **Move config files (30 min)**

```bash
# Move from output-styles/config
cp output-styles/config/business/pricing/tier_presets.v1.json \
   definitions/config/business/pricing/
cp output-styles/config/business/pricing/combat.pricing.v1.json \
   definitions/config/business/pricing/
cp output-styles/config/business/scoring/scoring_model.v1.json \
   definitions/config/business/scoring/

# Add READMEs
cat > definitions/config/README.md << 'EOF'
# Business Configuration

Canonical configuration for pricing, scoring, and sports logic.
These files generate training examples, database records, and embeddings.
EOF
```
- **Create example generation script (3 hours)**

```python
# weave/builders/examples/config_to_examples.py
"""
Generate training examples from business configs.
Output: JSONL format for LangMem.
"""
import json
from pathlib import Path
from typing import Dict, List


def generate_tier_examples() -> List[Dict]:
    """tier_presets.v1.json → training examples"""
    config_path = Path("../../definitions/config/business/pricing/tier_presets.v1.json")
    with open(config_path) as f:
        config = json.load(f)

    examples = []
    for tier_name, tier_data in config['tiers'].items():
        # Example 1: Pricing lookup
        examples.append({
            "input": f"What are the pricing terms for {tier_name}?",
            "output": format_pricing_response(tier_data),
            "metadata": {
                "type": "pricing_lookup",
                "tier": tier_name,
                "source": "tier_presets.v1.json",
                "version": config.get('version', 1)
            }
        })

        # Example 2: Tier recommendation
        if "example_category" in tier_data:
            examples.append({
                "input": f"Recommend tier for {tier_data['example_category']} league",
                "output": f"Recommend {tier_name} because: {format_justification(tier_data)}",
                "metadata": {
                    "type": "tier_recommendation",
                    "tier": tier_name,
                    "category": tier_data['example_category']
                }
            })

        # Example 3: SLA query
        if "sla" in tier_data:
            examples.append({
                "input": f"What SLA does {tier_name} provide?",
                "output": f"SLA: {json.dumps(tier_data['sla'], indent=2)}",
                "metadata": {
                    "type": "sla_lookup",
                    "tier": tier_name
                }
            })

    return examples


def format_pricing_response(tier_data: Dict) -> str:
    """Format tier data into natural language"""
    development = tier_data.get('development', {})
    return f"""
Development Fee: ${development.get('one_time_usd', 'N/A')}
Monthly (In-Season): ${development.get('monthly_in_season_usd', 'N/A')}
Monthly (Off-Season): ${development.get('monthly_off_season_usd', 'N/A')}
Revenue Share: {tier_data.get('revenue_share', {})}
Contract Length: {tier_data.get('contract_length_years', 'N/A')} years
""".strip()


def format_justification(tier_data: Dict) -> str:
    """Create justification text"""
    category = tier_data.get('example_category', 'league')
    return f"This tier is designed for {category} with specific pricing and support levels."


def generate_scoring_examples() -> List[Dict]:
    """scoring_model.v1.json → training examples"""
    config_path = Path("../../definitions/config/business/scoring/scoring_model.v1.json")
    with open(config_path) as f:
        config = json.load(f)

    examples = []
    weights = config['scoring_framework']['weights']

    # Example: Explain scoring
    examples.append({
        "input": "How is league scoring calculated?",
        "output": f"""
Scoring uses these weights:
- Market Potential: {weights['market_potential']*100}%
- Data Quality: {weights['data_quality_infra']*100}%
- Betting Readiness: {weights['betting_readiness']*100}%
- Fan Engagement: {weights['fan_engagement']*100}%
- Operational Maturity: {weights['operational_maturity']*100}%
- Strategic Fit: {weights['strategic_fit']*100}%
""".strip(),
        "metadata": {
            "type": "scoring_explanation",
            "version": config.get('version', 1)
        }
    })

    # Add tier threshold examples
    thresholds = config['scoring_framework']['tier_thresholds']
    for tier, threshold in thresholds.items():
        examples.append({
            "input": f"What score is needed for {tier}?",
            "output": f"A league needs a score of {threshold}+ to qualify for {tier}",
            "metadata": {
                "type": "threshold_lookup",
                "tier": tier,
                "threshold": threshold
            }
        })

    return examples


def save_examples_jsonl(examples: List[Dict], output_path: Path):
    """Save as JSONL for LangMem"""
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w') as f:
        for example in examples:
            f.write(json.dumps(example) + '\n')
    print(f"✅ Saved {len(examples)} examples to {output_path}")


if __name__ == "__main__":
    # Generate tier examples
    tier_examples = generate_tier_examples()
    save_examples_jsonl(
        tier_examples,
        Path("../../definitions/examples/generated/pricing-examples.jsonl")
    )

    # Generate scoring examples
    scoring_examples = generate_scoring_examples()
    save_examples_jsonl(
        scoring_examples,
        Path("../../definitions/examples/generated/scoring-examples.jsonl")
    )

    print(f"🎉 Total examples generated: {len(tier_examples) + len(scoring_examples)}")
```
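For reference, one line of the resulting `pricing-examples.jsonl` would look roughly like this (the output text and version depend on your actual `tier_presets.v1.json`):

```json
{"input": "What are the pricing terms for tier_1?", "output": "Development Fee: $...\nMonthly (In-Season): $...", "metadata": {"type": "pricing_lookup", "tier": "tier_1", "source": "tier_presets.v1.json", "version": 1}}
```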
- **Run example generation (15 min)**

```bash
cd weave/builders/examples
python config_to_examples.py

# Verify outputs
cat ../../definitions/examples/generated/pricing-examples.jsonl | head -3
cat ../../definitions/examples/generated/scoring-examples.jsonl | head -3
```
**Validation:**

- [ ] Configs moved to `definitions/config/business/`
- [ ] Example generation creates JSONL files
- [ ] Examples are well-formatted with metadata (see the quick check below)
- [ ] At least 20 examples generated per config
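The quick check referenced above can be as small as this sketch (run from `data_layer/`):

```python
# Validate generated JSONL: every line parses and carries the expected keys
import json
from pathlib import Path

for path in Path("definitions/examples/generated").glob("*.jsonl"):
    examples = [json.loads(line) for line in path.open() if line.strip()]
    assert all(
        {"input", "output", "metadata"} <= ex.keys() for ex in examples
    ), f"Malformed example in {path.name}"
    print(f"{path.name}: {len(examples)} examples")
```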
### ✅ Week 3: Build Prompt System + Multi-Storage Sync

**Goal:** Implement prompt builders and sync to databases.

**Tasks:**
- **Move prompt components (1 hour)**

```bash
# Move from prompts/components
cp -r prompts/components/* definitions/prompts/components/

# Move templates if they exist
# cp -r prompts/templates/* definitions/prompts/templates/
```
- **Create prompt builder (3 hours)**

```python
# weave/builders/prompts/base_builder.py
"""
Base class for building prompts from components + config + examples.
"""
import json
from pathlib import Path
from typing import Dict, List, Optional

from jinja2 import Template


class PromptBuilder:
    def __init__(self, components_dir: Optional[Path] = None):
        self.components_dir = components_dir or Path("../../definitions/prompts/components")

    def load_component(self, component_path: str) -> str:
        """Load a prompt component"""
        full_path = self.components_dir / component_path
        with open(full_path) as f:
            return f.read()

    def load_config(self, config_path: str) -> Dict:
        """Load business config"""
        full_path = Path("../../definitions/config") / config_path
        with open(full_path) as f:
            return json.load(f)

    def inject_config(self, template: str, config: Dict) -> str:
        """Inject config values into template"""
        jinja_template = Template(template)
        return jinja_template.render(**config)

    def inject_examples(self, template: str, examples: List[Dict]) -> str:
        """Inject few-shot examples into template"""
        examples_text = "\n\n".join([
            f"Input: {ex['input']}\nOutput: {ex['output']}"
            for ex in examples
        ])
        return template.replace("{{examples}}", examples_text)

    def build(
        self,
        system_instruction: str,
        few_shot_template: Optional[str] = None,
        config_path: Optional[str] = None,
        examples: Optional[List[Dict]] = None,
        output_format: Optional[str] = None
    ) -> str:
        """Build final prompt from components"""
        # Load system instruction
        prompt = self.load_component(system_instruction)

        # Add config if provided
        if config_path:
            config = self.load_config(config_path)
            prompt = self.inject_config(prompt, config)

        # Add few-shot examples if provided
        if few_shot_template and examples:
            few_shot = self.load_component(few_shot_template)
            few_shot = self.inject_examples(few_shot, examples)
            prompt += "\n\n" + few_shot

        # Add output format if provided
        if output_format:
            output_instructions = self.load_component(output_format)
            prompt += "\n\n" + output_instructions

        return prompt
```

```python
# weave/builders/prompts/classification_builder.py
"""Build tier classification prompts"""
import json
from typing import Dict, List

from .base_builder import PromptBuilder


class ClassificationPromptBuilder(PromptBuilder):
    def build_tier_classifier(
        self,
        league_data: Dict,
        include_examples: bool = True,
        k_examples: int = 5
    ) -> str:
        """Build tier classification prompt"""
        # Load config for weights
        config = self.load_config("business/scoring/scoring_model.v1.json")
        weights = config['scoring_framework']['weights']

        # Get examples if requested
        examples: List[Dict] = []
        if include_examples:
            # Would retrieve from LangMem here
            # For now, load from generated
            pass

        # Build prompt
        prompt = self.build(
            system_instruction="system_instructions/tier_classifier.md",
            few_shot_template="few_shot_patterns/classification_pattern.md" if include_examples else None,
            config_path="business/scoring/scoring_model.v1.json",
            examples=examples,
            output_format="output_formats/json_structure.md"
        )

        # Add league-specific context
        prompt += f"\n\n## League to Classify\n{json.dumps(league_data, indent=2)}"

        return prompt
```
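A quick usage sketch for the builder (the component paths assume files such as `system_instructions/tier_classifier.md` already exist under `definitions/prompts/components/`, and the league fields are illustrative):

```python
# Minimal sketch: assemble and inspect a tier classification prompt
from weave.builders.prompts.classification_builder import ClassificationPromptBuilder

builder = ClassificationPromptBuilder()
prompt = builder.build_tier_classifier(
    league_data={"name": "Example League", "sport": "MMA"},
    include_examples=False,  # skip few-shot until LangMem retrieval is wired up
)
print(prompt[:500])  # eyeball the assembled prompt
```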
- **Create multi-storage sync (4 hours)**

```python
# scripts/sync/sync_to_postgresql.py
"""Sync configs to PostgreSQL as JSONB"""
import json
import os
from pathlib import Path

import psycopg2

DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://localhost/altsports")


def create_table_if_not_exists(cursor):
    """Create business_config table"""
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS business_config (
            id SERIAL PRIMARY KEY,
            config_type VARCHAR(255) NOT NULL,
            version INTEGER NOT NULL,
            file_path TEXT,
            config_data JSONB NOT NULL,
            created_at TIMESTAMP DEFAULT NOW(),
            updated_at TIMESTAMP DEFAULT NOW(),
            UNIQUE(config_type, version)
        );

        CREATE INDEX IF NOT EXISTS idx_config_type ON business_config(config_type);
        CREATE INDEX IF NOT EXISTS idx_config_data ON business_config USING GIN(config_data);
    """)


def sync_config_file(cursor, config_file: Path):
    """Sync one config file to PostgreSQL"""
    with open(config_file) as f:
        data = json.load(f)

    cursor.execute("""
        INSERT INTO business_config (config_type, version, file_path, config_data)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (config_type, version)
        DO UPDATE SET config_data = EXCLUDED.config_data, updated_at = NOW()
        RETURNING id
    """, (
        config_file.stem,  # tier_presets, scoring_model, etc.
        data.get('version', 1),
        str(config_file),
        json.dumps(data)
    ))

    record_id = cursor.fetchone()[0]
    print(f"  ✅ Synced {config_file.name} (id={record_id})")


def sync_all_configs():
    """Sync all config files from definitions/config"""
    conn = psycopg2.connect(DATABASE_URL)
    cursor = conn.cursor()

    try:
        create_table_if_not_exists(cursor)

        config_dir = Path(__file__).parent.parent.parent / "definitions" / "config"
        for config_file in config_dir.rglob("*.json"):
            if config_file.name.startswith('_'):
                continue  # Skip metadata files
            sync_config_file(cursor, config_file)

        conn.commit()
        print("\n✅ PostgreSQL sync complete!")
    finally:
        cursor.close()
        conn.close()


if __name__ == "__main__":
    sync_all_configs()
```

```python
# scripts/sync/sync_to_langmem.py
"""Sync examples to LangMem for RAG"""
import json
import os
from pathlib import Path

from langmem import LangMemClient

LANGMEM_API_KEY = os.getenv("LANGMEM_API_KEY")


def sync_examples_file(client: LangMemClient, examples_file: Path, namespace: str):
    """Sync one JSONL file to LangMem"""
    with open(examples_file) as f:
        for line_num, line in enumerate(f, 1):
            try:
                example = json.loads(line)

                # Combine input + output for embedding
                content = f"{example['input']}\n\n{example['output']}"

                # Store with metadata
                client.store(
                    content=content,
                    metadata={
                        **example.get('metadata', {}),
                        "source_file": str(examples_file),
                        "line_number": line_num
                    },
                    namespace=namespace
                )
            except json.JSONDecodeError as e:
                print(f"  ⚠️ Error on line {line_num}: {e}")

    print(f"  ✅ Synced {examples_file.name} to namespace '{namespace}'")


def sync_all_examples():
    """Sync all examples from definitions/examples"""
    client = LangMemClient(api_key=LANGMEM_API_KEY)
    examples_dir = Path(__file__).parent.parent.parent / "definitions" / "examples"

    # Sync seeds (manual examples)
    for examples_file in (examples_dir / "seeds").rglob("*.jsonl"):
        sync_examples_file(client, examples_file, namespace="examples-seeds")

    # Sync generated examples
    for examples_file in (examples_dir / "generated").rglob("*.jsonl"):
        sync_examples_file(client, examples_file, namespace="examples-generated")

    print("\n✅ LangMem sync complete!")


if __name__ == "__main__":
    sync_all_examples()
```

```python
# scripts/sync/sync_all.py
"""Master sync script"""
import asyncio

from . import sync_to_postgresql, sync_to_langmem
# from . import sync_to_redis  # Optional


async def sync_all():
    print("🚀 Starting multi-storage sync...\n")

    print("1️⃣ PostgreSQL (JSONB)...")
    sync_to_postgresql.sync_all_configs()

    print("\n2️⃣ LangMem (Vectors)...")
    sync_to_langmem.sync_all_examples()

    # Optional: Redis caching
    # print("\n3️⃣ Redis (Cache)...")
    # await sync_to_redis.cache_hot_configs()

    print("\n✅ All syncs complete!")


if __name__ == "__main__":
    asyncio.run(sync_all())
```
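The optional Redis step is referenced above but never implemented in this guide. A minimal sketch of a hypothetical `scripts/sync/sync_to_redis.py`, assuming a standard `redis-py` client and the `config:*` key convention used by the monitoring script later in this guide:

```python
# scripts/sync/sync_to_redis.py (hypothetical sketch — the cache_hot_configs
# name matches the commented-out call in sync_all.py above)
from pathlib import Path

import redis


async def cache_hot_configs():
    """Cache each config's raw JSON under a config:<name> key with a TTL."""
    r = redis.Redis()
    config_dir = Path(__file__).parent.parent.parent / "definitions" / "config"

    for config_file in config_dir.rglob("*.json"):
        if config_file.name.startswith('_'):
            continue  # Skip metadata files
        # Key convention matches check_redis() in the monitoring script: config:*
        r.set(f"config:{config_file.stem}", config_file.read_text(), ex=3600)  # 1h TTL
        print(f"  ✅ Cached config:{config_file.stem}")
```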
- **Test sync (30 min)**

```bash
# Set up environment
export DATABASE_URL="postgresql://localhost/altsports"
export LANGMEM_API_KEY="your-api-key"

# Run sync
python scripts/sync/sync_all.py

# Verify PostgreSQL
psql $DATABASE_URL -c "SELECT config_type, version FROM business_config;"

# Verify LangMem (check dashboard or CLI)
```
**Validation:**

- [ ] Prompt builders can load components and configs
- [ ] Generated prompts contain actual config values
- [ ] PostgreSQL has all configs as JSONB (spot-check below)
- [ ] LangMem has all examples embedded
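For the PostgreSQL spot-check, one option is to query a JSONB path directly. The `scoring_model.v1` config_type matches the file stem used by the sync script; the key path assumes the scoring config structure shown earlier:

```bash
# Pull the scoring weights straight out of JSONB to confirm the sync worked
psql $DATABASE_URL -c \
  "SELECT config_type, config_data->'scoring_framework'->'weights' AS weights
   FROM business_config
   WHERE config_type = 'scoring_model.v1';"
```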
### ✅ Week 4: Integration + Testing

**Goal:** Wire everything together and test end-to-end.

**Tasks:**
- **Create master generation script (1 hour)**

```python
# scripts/generate/generate_all.py
"""Generate all artifacts from source"""
import sys
from pathlib import Path

# Add project to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from weave.builders.schemas import generate_all as generate_schemas
from weave.builders.examples import config_to_examples
from weave.builders.prompts import classification_builder


def generate_all():
    print("🏗️ Generating all artifacts from definitions/\n")

    # 1. Schemas → Pydantic/TS/Zod/Drizzle
    print("1️⃣ Generating schemas...")
    generate_schemas.generate_pydantic()
    generate_schemas.generate_typescript()
    generate_schemas.generate_zod()
    generate_schemas.generate_drizzle()

    # 2. Configs → Training examples
    print("\n2️⃣ Generating examples from configs...")
    config_to_examples.generate_tier_examples()
    config_to_examples.generate_scoring_examples()

    # 3. Components → Final prompts
    print("\n3️⃣ Building prompts from components...")
    # Build key prompts
    # (Would iterate through all prompt types)

    print("\n✅ Generation complete!")


if __name__ == "__main__":
    generate_all()
```
- **Create end-to-end test (2 hours)**

```python
# tests/test_end_to_end.py
"""Test complete data flow"""
from pathlib import Path

import pytest

from data_layer.weave.builders.prompts import classification_builder
from data_layer.definitions.schemas.generated.pydantic import TierClassification


def test_complete_classification_pipeline():
    """Test: Definitions → Weave → Views → Application"""
    # 1. Load source config
    from data_layer.definitions.config.business import load_config
    scoring_config = load_config("scoring/scoring_model.v1.json")
    assert 'scoring_framework' in scoring_config

    # 2. Retrieve examples (would be from LangMem) — mocked for now
    examples = [
        {
            "input": "Classify combat league",
            "output": "Tier 1",
            "metadata": {"tier": "tier_1"}
        }
    ]

    # 3. Build prompt
    builder = classification_builder.ClassificationPromptBuilder()
    prompt = builder.build_tier_classifier(
        league_data={"name": "Test League", "sport": "MMA"},
        include_examples=True,
        k_examples=3
    )

    # Prompt should contain actual config values
    assert "0.25" in prompt  # market_potential weight
    assert "0.20" in prompt  # data_quality weight

    # 4. Would send to LLM here...

    # 5. Validate with Pydantic
    result_json = {
        "tier": "tier_1",
        "score": 85.5,
        "confidence": 0.92
    }
    validated = TierClassification(**result_json)
    assert validated.tier == "tier_1"
    assert validated.score > 80


def test_multi_storage_sync():
    """Test configs synced to all storage backends"""
    # Would test:
    # - PostgreSQL has config
    # - LangMem has examples
    # - Redis has cache
    pass


def test_schema_generation():
    """Test schema generation from canonical"""
    # Check generated files exist
    pydantic_dir = Path("data_layer/definitions/schemas/generated/pydantic")
    assert (pydantic_dir / "tier_classification.py").exists()

    # Can import
    from data_layer.definitions.schemas.generated.pydantic import TierClassification
    assert TierClassification is not None
```
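The `example_retriever` module used by the endpoint in the next task is never defined in this guide. One possible shape for `weave/retrievers/example_retriever.py` — a sketch only, since the `LangMemClient.search` call is an assumption; swap in whatever query method your client actually exposes:

```python
# weave/retrievers/example_retriever.py (sketch — search() is hypothetical)
import os
from typing import Dict, List

from langmem import LangMemClient

_client = LangMemClient(api_key=os.getenv("LANGMEM_API_KEY"))


async def get_similar(query: str, namespace: str, k: int = 5) -> List[Dict]:
    """Return the k most similar stored examples for few-shot prompting."""
    # Hypothetical query call — adjust to your client's actual API
    results = _client.search(query=query, namespace=namespace, limit=k)
    # Normalize to the shape the prompt builders expect
    return [{"content": r.content, "metadata": r.metadata} for r in results]
```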
- **Update application imports (2 hours)**

```python
# Example: Update FastAPI endpoint

# OLD:
# from database.schemas import TierClassification

# NEW:
from fastapi import FastAPI
from langchain_openai import ChatOpenAI

from data_layer.definitions.schemas.generated.pydantic import TierClassification
from data_layer.weave.builders.prompts import classification_builder
from data_layer.weave.retrievers import example_retriever

app = FastAPI()  # or your existing app instance


@app.post("/classify-league")
async def classify_league(league_data: dict) -> TierClassification:
    # Get relevant examples
    examples = await example_retriever.get_similar(
        query=f"Classify {league_data['sport']} league",
        namespace="examples-seeds",
        k=5
    )

    # Build prompt
    builder = classification_builder.ClassificationPromptBuilder()
    prompt = builder.build_tier_classifier(
        league_data=league_data,
        include_examples=True,
        k_examples=5
    )

    # LLM generation with structured output
    llm = ChatOpenAI(model="gpt-4")
    structured_llm = llm.with_structured_output(TierClassification)
    result = structured_llm.invoke(prompt)

    return result
```
- **Create developer documentation (1 hour)**

````bash
# docs/QUICK_START.md
cat > docs/QUICK_START.md << 'EOF'
# Quick Start - Data Layer

## Daily Workflow

1. Edit source files in `definitions/`
2. Run generation: `python scripts/generate/generate_all.py`
3. Run sync: `python scripts/sync/sync_all.py`
4. Test: `pytest tests/`

## Common Tasks

### Add Business Rule

```bash
vim definitions/config/business/new_rule.v1.json
python scripts/generate/generate_examples.py
python scripts/sync/sync_all.py
```

### Update Prompt Component

```bash
vim definitions/prompts/components/system_instructions/my_agent.md
python scripts/generate/generate_prompts.py
```

### Add Training Example

```bash
echo '{"input": "...", "output": "..."}' >> definitions/examples/seeds/my-examples.jsonl
python scripts/sync/sync_to_langmem.py
```
EOF
````
**Validation:**

- [ ] `generate_all.py` runs successfully
- [ ] End-to-end test passes
- [ ] Application uses new imports
- [ ] Documentation is clear
## 📋 Post-Migration Tasks

### 1. Clean Up Old Locations

```bash
# After verifying everything works, archive old structure
mkdir -p _archive/$(date +%Y-%m-%d)

# Move old directories
mv output-styles/config _archive/$(date +%Y-%m-%d)/
mv prompts/components _archive/$(date +%Y-%m-%d)/

# Update .gitignore
echo "_archive/" >> .gitignore
echo "views/" >> .gitignore  # Views are generated
```

### 2. Update CI/CD
```yaml
# .github/workflows/data-layer-sync.yml
name: Data Layer Sync

on:
  push:
    paths:
      - 'data_layer/definitions/**'

jobs:
  sync:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Generate artifacts
        run: python data_layer/scripts/generate/generate_all.py
      - name: Sync to databases
        env:
          DATABASE_URL: ${{ secrets.DATABASE_URL }}
          LANGMEM_API_KEY: ${{ secrets.LANGMEM_API_KEY }}
        run: python data_layer/scripts/sync/sync_all.py
      - name: Run tests
        run: pytest data_layer/tests/
```

### 3. Create Monitoring
```python
# scripts/monitor/check_sync_health.py
"""Monitor sync health across all systems"""
import os

import psycopg2
import redis
from langmem import LangMemClient

DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://localhost/altsports")


def check_postgresql():
    """Verify PostgreSQL has latest configs"""
    conn = psycopg2.connect(DATABASE_URL)
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM business_config")
    count = cursor.fetchone()[0]
    print(f"PostgreSQL: {count} configs")
    assert count >= 3, "Missing configs in PostgreSQL"


def check_langmem():
    """Verify LangMem has embeddings"""
    client = LangMemClient()
    stats = client.get_namespace_stats("examples-seeds")
    print(f"LangMem: {stats['total_documents']} examples")
    assert stats['total_documents'] > 0, "No examples in LangMem"


def check_redis():
    """Verify Redis has cached data"""
    r = redis.Redis()
    keys = r.keys("config:*")
    print(f"Redis: {len(keys)} cached configs")


if __name__ == "__main__":
    check_postgresql()
    check_langmem()
    check_redis()
    print("✅ All systems healthy!")
```
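To run the health check on a schedule, a cron entry along these lines would work (the paths are illustrative — adjust to your deployment):

```bash
# Hypothetical crontab entry: hourly health check, output appended to a log
0 * * * * cd /path/to/project && python data_layer/scripts/monitor/check_sync_health.py >> /var/log/sync_health.log 2>&1
```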
## 🎯 Success Checklist
After completion, you should have:

### Structure

- [ ] `definitions/` contains all source files
- [ ] `weave/` contains all transformation code
- [ ] `views/` contains generated outputs
- [ ] `scripts/` orchestrates everything

### Generation

- [ ] Schemas generate Pydantic, TypeScript, Zod, Drizzle
- [ ] Configs generate training examples
- [ ] Components generate final prompts

### Storage

- [ ] PostgreSQL has all configs as JSONB
- [ ] LangMem has all examples as vectors
- [ ] Redis caches hot data (optional)

### Application

- [ ] FastAPI uses Pydantic validation
- [ ] Frontend uses Zod validation
- [ ] Prompts built dynamically from components
- [ ] Examples retrieved semantically for few-shot

### Testing

- [ ] Unit tests for builders/generators
- [ ] Integration tests for sync
- [ ] End-to-end test for complete flow

### Documentation

- [ ] README in each major directory
- [ ] Quick start guide for developers
- [ ] API reference for code modules
## 📚 Resources

- **Architecture:** See `DATA_FABRIC_ARCHITECTURE.md`
- **Decision Tree:** See `WHERE_DOES_IT_GO.md` (from `database/`)
- **Tasks:** See `DATABASE_ORGANIZATION_TASKS.md` (from `database/`)
- **CI/CD:** Examples in `.github/workflows/`

**Ready to start?** Begin with Week 1, Day 1: create the directory structure!