Architecture
Quick Start: Unified Questionnaire-to-Contract Pipeline

Source: data_layer/docs/QUICK_START_UNIFIED_PIPELINE.md

Quick Start: Unified Questionnaire-to-Contract Pipeline

Your optimized system is ready! πŸš€

🎯 What We Built

A single unified workflow that replaces the fragmented 7-stage pipeline:

from ops.workflows.questionnaire_to_contract import QuestionnaireToContractWorkflow
 
# Initialize
workflow = QuestionnaireToContractWorkflow()
 
# Execute: PDF β†’ Contract (all 6 stages automated)
result = await workflow.execute(
    questionnaire_source="path/to/questionnaire.pdf",
    source_type="pdf",
    is_verified=False  # Set True for Firebase sync
)
 
# Access results
print(f"League: {result['questionnaire']['league_name']}")
print(f"Tier: {result['questionnaire']['tier']}")
print(f"Contract PDF: {result['artifacts']['pdf']['url']}")

That's it! The workflow automatically:

  1. βœ… Extracts data from PDF
  2. βœ… Enriches with external data
  3. βœ… Evaluates across 4 dimensions (business/data/risk/strategic)
  4. βœ… Writes to ALL databases (Supabase, Pinecone, Neo4j, GCS, Firebase)
  5. βœ… Generates contextual contract
  6. βœ… Renders in multiple formats (PDF, Google Docs, Markdown, JSON)

πŸ“ New File Structure

database/
β”œβ”€β”€ schemas/                                    # βœ… Single source of truth
β”‚   β”œβ”€β”€ domain/v1/
β”‚   β”‚   └── league_questionnaire_schema.json   # ← Domain schema
β”‚   └── generated/
β”‚       β”œβ”€β”€ models/pydantic/
β”‚       β”‚   └── league_questionnaire_schema.py # ← SINGLE Pydantic model
β”‚       └── adapters/                          # ← DB-specific adapters
β”‚           β”œβ”€β”€ supabase/
β”‚           β”œβ”€β”€ pinecone/
β”‚           β”œβ”€β”€ neo4j/
β”‚           β”œβ”€β”€ gcs/
β”‚           └── firebase/
β”‚
β”œβ”€β”€ ops/                                        # βœ… All operational logic
β”‚   β”œβ”€β”€ workflows/
β”‚   β”‚   └── questionnaire_to_contract.py       # ← NEW: Unified workflow
β”‚   β”œβ”€β”€ integrations/
β”‚   β”‚   └── unified_league_service.py          # ← NEW: Polyglot persistence
β”‚   β”œβ”€β”€ agents/                                # ← 30+ specialized agents
β”‚   β”œβ”€β”€ contextual_contract_builder.py         # ← 7-layer contextual system
β”‚   └── feedback_loop_system.py
β”‚
└── output-styles/                              # βœ… Examples only (no logic)
    └── examples/
        β”œβ”€β”€ questionnaire_extraction_example.json
        β”œβ”€β”€ classification_example.json
        └── contract_example.json

πŸ”„ How the Unified Workflow Works

πŸ“₯ INPUT: Questionnaire (PDF/Form/Email)
    ↓
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ STAGE 1: Document Processing                                  β”‚
β”‚ β”œβ”€ Agent: document.pdf.agent                                  β”‚
β”‚ β”œβ”€ Agent: document.processor                                  β”‚
β”‚ └─ Output: Extracted data                                     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
    ↓
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ STAGE 2: Data Enrichment                                      β”‚
β”‚ β”œβ”€ Agent: data.enricher                                       β”‚
β”‚ β”œβ”€ Agent: intelligence.market                                 β”‚
β”‚ └─ Output: Enhanced LeagueQuestionnaire                       β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
    ↓
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ STAGE 3: Multi-Dimensional Evaluation (PARALLEL)              β”‚
β”‚ β”œβ”€ Agent: league.evaluator.business  β†’ Business score         β”‚
β”‚ β”œβ”€ Agent: league.evaluator.data      β†’ Technical score        β”‚
β”‚ β”œβ”€ Agent: league.evaluator.risk      β†’ Risk score             β”‚
β”‚ β”œβ”€ Agent: league.evaluator.strategic β†’ Strategic score        β”‚
β”‚ └─ Output: Overall tier + recommendations                     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
    ↓
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ STAGE 4: Polyglot Persistence (PARALLEL WRITES)               β”‚
β”‚ Service: UnifiedLeagueService                                 β”‚
β”‚                                                                β”‚
β”‚ await asyncio.gather(                                          β”‚
β”‚   β”œβ”€ PostgreSQL (Supabase)  βœ“ ALL leagues                    β”‚
β”‚   β”œβ”€ Vector DB (Pinecone)   βœ“ Semantic search                β”‚
β”‚   β”œβ”€ Graph (Neo4j)          βœ“ Relationships                  β”‚
β”‚   β”œβ”€ Storage (GCS)          βœ“ Documents                      β”‚
β”‚   └─ Real-time (Firebase)   βœ“ IF verified                    β”‚
β”‚ )                                                              β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
    ↓
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ STAGE 5: Contract Generation                                  β”‚
β”‚ β”œβ”€ System: contextual_contract_builder.py                     β”‚
β”‚ β”œβ”€ Agent: contract.orchestration                              β”‚
β”‚ β”œβ”€ Agent: contract.generator                                  β”‚
β”‚ └─ Output: Contract with 7-layer context                      β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
    ↓
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ STAGE 6: Contract Finalization                                β”‚
β”‚ β”œβ”€ Agent: negotiation.facilitator                             β”‚
β”‚ β”œβ”€ Agent: proposal.presenter                                  β”‚
β”‚ └─ Output: PDF, Google Docs, Markdown, JSON                   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
    ↓
πŸ“€ OUTPUT: Complete Contract Package

πŸ’Ύ Polyglot Persistence Pattern

The UnifiedLeagueService writes to ALL databases automatically:

from ops.integrations.unified_league_service import UnifiedLeagueService
 
service = UnifiedLeagueService(
    supabase_client=supabase,
    pinecone_client=pinecone,
    neo4j_client=neo4j,
    gcs_client=gcs,
    firebase_client=firebase
)
 
# Write once, persist everywhere
result = await service.upsert_league(
    questionnaire=my_league,
    is_verified=True  # Also writes to Firebase
)
 
# Result structure
{
  "status": "success",
  "databases_written": 5,
  "databases_failed": 0,
  "details": {
    "supabase": {"success": True, "league_id": "league_abc123"},
    "pinecone": {"success": True, "league_id": "league_abc123"},
    "neo4j": {"success": True, "nodes_created": 3},
    "gcs": {"success": True, "files_uploaded": 2},
    "firebase": {"success": True, "league_id": "league_abc123"}
  }
}

Database Usage Patterns

DatabaseUsed ForQuery Pattern
SupabasePrimary storage, filteringSELECT * FROM leagues WHERE tier = 'premium'
PineconeSemantic search"Find leagues similar to NASCAR"
Neo4jRelationshipsMATCH (l:League)-[:COMPETES_IN]->(s:Sport)
GCSFiles & documentsFile URLs for contract PDFs
FirebaseReal-time dashboardLive updates for verified leagues

🎨 Example Usage Patterns

Pattern 1: Process PDF Questionnaire

workflow = QuestionnaireToContractWorkflow()
 
result = await workflow.execute(
    questionnaire_source="uploads/premier_racing_league.pdf",
    source_type="pdf",
    is_verified=False
)
 
# Access contract
pdf_url = result['artifacts']['pdf']['url']
google_doc = result['artifacts']['google_doc']['url']

Pattern 2: Process Form Data

form_data = {
    "league_name": "Global Racing Championship",
    "sport": "motorsports",
    "contact": {"email": "contact@grc.com"},
    # ... more fields
}
 
result = await workflow.execute(
    questionnaire_source=form_data,
    source_type="form",
    is_verified=True  # Writes to Firebase
)

Pattern 3: Direct Service Usage

# Just use the upsert service directly
from schemas.generated.models.pydantic.league_questionnaire_schema import LeagueQuestionnaire
 
league = LeagueQuestionnaire(
    league_name="Test League",
    # ... fields
)
 
service = UnifiedLeagueService()
result = await service.upsert_league(league, is_verified=False)

πŸ” Query Patterns After Upsert

Once data is in all databases, query based on your needs:

Filter/Search in PostgreSQL (Supabase)

# Standard SQL queries
results = supabase.table('leagues')\
    .select('*')\
    .eq('tier', 'premium')\
    .gte('composite_score', 80)\
    .execute()

Semantic Search in Pinecone

# Natural language search
results = pinecone.query(
    vector=embed("racing leagues in North America"),
    top_k=10,
    include_metadata=True
)

Relationship Queries in Neo4j

# Graph queries
query = """
MATCH (l:League)-[:PLAYS]->(s:Sport {name: 'motorsports'})
RETURN l.name, l.composite_score
ORDER BY l.composite_score DESC
LIMIT 10
"""
results = neo4j.run(query)

Real-time Dashboard (Firebase - Verified Only)

// Real-time updates in frontend
firebase.database()
  .ref('leagues')
  .orderByChild('composite_score')
  .limitToLast(20)
  .on('value', snapshot => {
    // Auto-updates when data changes
  })

πŸš€ Getting Started

Step 1: Install Dependencies

# Install required packages
pip install -r requirements.txt

Step 2: Configure Database Clients

# Create .env file with credentials
SUPABASE_URL=your_url
SUPABASE_KEY=your_key
PINECONE_API_KEY=your_key
NEO4J_URI=your_uri
# ... etc

Step 3: Run Your First Pipeline

import asyncio
from ops.workflows.questionnaire_to_contract import QuestionnaireToContractWorkflow
 
async def main():
    workflow = QuestionnaireToContractWorkflow()
    
    result = await workflow.execute(
        questionnaire_source="test_data/sample_questionnaire.json",
        source_type="json"
    )
    
    print(f"βœ… Success! Contract generated: {result['artifacts']['pdf']['url']}")
 
asyncio.run(main())

πŸ“Š Monitoring & Analytics

The workflow tracks timing for each stage:

{
  "execution_id": "abc-123-def",
  "total_duration_seconds": 45.2,
  "stages": {
    "stage_1_extraction": {"duration_seconds": 8.5},
    "stage_2_enrichment": {"duration_seconds": 12.3},
    "stage_3_evaluation": {"duration_seconds": 5.1},
    "stage_4_persistence": {"duration_seconds": 3.2},
    "stage_5_contract_generation": {"duration_seconds": 14.6},
    "stage_6_finalization": {"duration_seconds": 1.5}
  }
}

πŸ”§ Customization

Add Custom Evaluation Logic

# In ops/workflows/questionnaire_to_contract.py
async def _custom_evaluation(self, questionnaire):
    # Your custom scoring logic
    return {"score": 85.0, "custom_metric": "value"}

Modify Contract Generation

# In ops/contextual_contract_builder.py
# Add new context layers or modify existing ones

Add New Database

# In ops/integrations/unified_league_service.py
async def _write_new_db(self, data):
    # Implement your database write
    pass

βœ… Benefits of This Architecture

BenefitBeforeAfter
Simplicity7 separate stage folders1 unified workflow
Duplication3-4 copies of agentsSingle instance each
Database writesManual per stageAutomatic parallel writes
Contract generation2 different systems1 contextual builder
Import complexity15+ patterns3 standard imports
MaintenanceπŸ”΄ High🟒 Low

🎯 Next Steps

  1. βœ… Run test pipeline with sample data
  2. βœ… Configure database clients with real credentials
  3. βœ… Implement agent calls in workflow stages
  4. βœ… Test polyglot persistence with all databases
  5. βœ… Deploy to Cloud Run with proper environment variables

πŸ“š Related Documentation


You now have a production-ready, unified pipeline! πŸŽ‰

Questions? Check the docs or explore the code:

  • Workflow: ops/workflows/questionnaire_to_contract.py
  • Service: ops/integrations/unified_league_service.py
  • Agents: ops/agents/

Platform

Documentation

Community

Support

partnership@altsportsdata.comdev@altsportsleagues.ai

2025 Β© AltSportsLeagues.ai. Powered by AI-driven sports business intelligence.

πŸ€– AI-Enhancedβ€’πŸ“Š Data-Drivenβ€’βš‘ Real-Time