Source: data_layer/docs/COMPLETE_ARCHITECTURE.md
# Prompt Intelligence System - Complete Architecture
## Full Intelligence Stack

```
┌──────────────────────────────────────────────────────────────┐
│ Layer 5: MCP SERVER (Self-Discovery + RPC)                   │
│  • Other AI agents can query this server                     │
│  • Self-discovery of available tools/prompts                 │
│  • Agent-to-agent communication protocol                     │
└──────────────────┬───────────────────────────────────────────┘
                   │ Uses API
┌──────────────────┴───────────────────────────────────────────┐
│ Layer 4: API (apps/backend/api/prompts.py)                   │
│  • REST endpoints for humans & agents                        │
│  • /catalog, /search, /update, /execute                      │
└──────────────────┬───────────────────────────────────────────┘
                   │ Calls service
┌──────────────────┴───────────────────────────────────────────┐
│ Layer 3: SERVICES (apps/backend/services/)                   │
│  • prompts.py - Workflow execution                           │
│  • orchestrator.py - Agent creation with tools               │
│  • agent_communication.py - Agent-to-agent protocol          │
└──────────────────┬───────────────────────────────────────────┘
                   │ Gets from store
┌──────────────────┴───────────────────────────────────────────┐
│ Layer 2: STORAGE (apps/backend/stores/prompts.py)            │
│  • InMemoryStore (<1ms) - Primary cache                      │
│  • Check memory first → then DB → then files                 │
└──────────────────┬───────────────────────────────────────────┘
                   │ Seeded from / Syncs to
┌──────────────────┴───────────────────────────────────────────┐
│ Layer 1: DATABASES (Seeded, then queried)                    │
│  Firebase (User data) + Supabase (League data)               │
│  • Seeded at startup from database/ files                    │
│  • Queried when not in InMemoryStore                         │
│  • Updated when prompts change                               │
└──────────────────┬───────────────────────────────────────────┘
                   │ Seeded from
┌──────────────────┴───────────────────────────────────────────┐
│ Layer 0: SOURCE FILES (database/)                            │
│  • database/prompts/*.md (135 files)                         │
│  • database/output-styles/ (800+ examples)                   │
│  • Git versioned, human editable                             │
└──────────────────────────────────────────────────────────────┘
```
## Complete Data Flow (With Seeding)

**Startup Sequence:**

```
1. Container starts
   ↓
2. Run database/scripts/build.py (SEEDING)
   ├─ Read database/prompts/*.md
   ├─ Read database/output-styles/
   ├─ Build workflows
   ├─ Seed Firebase (user-related prompts)
   ├─ Seed Supabase (league examples)
   └─ Populate InMemoryStore
   ↓
3. Server ready
   ↓ First query
4. Check InMemoryStore (<1ms)
   ├─ HIT: Return immediately ⚡
   └─ MISS: Check databases
      ├─ Check Supabase (~20ms)
      ├─ Check Firebase (~20ms)
      └─ Fallback to files (~9ms)
   ↓
5. Cache result in InMemoryStore
   ↓
6. Next query: <1ms (cached)
```
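The lookup order in steps 4–6 is a read-through cache: check the fastest tier first, walk the slower tiers on a miss, and cache whatever is found for the next query. A minimal, dependency-free sketch of the pattern — the fetcher functions here are hypothetical stand-ins for the real Supabase/Firebase/file readers:

```python
import asyncio


class ReadThroughCache:
    """Check memory first; on a miss, walk slower tiers in order and cache the result."""

    def __init__(self, tiers):
        self.memory = {}    # tier 0: in-process dict, the <1ms path
        self.tiers = tiers  # ordered async fetch(key) callables (DB, then files)

    async def get(self, key):
        if key in self.memory:            # HIT: return immediately
            return self.memory[key]
        for fetch in self.tiers:          # MISS: try each slower tier
            value = await fetch(key)
            if value is not None:
                self.memory[key] = value  # cache it for the next query
                return value
        return None


# Hypothetical stand-ins for the database and file tiers
async def from_db(key):
    return {"name": key, "source": "db"} if key == "questionnaire_to_contract" else None


async def from_files(key):
    return {"name": key, "source": "files"}


async def demo():
    cache = ReadThroughCache([from_db, from_files])
    first = await cache.get("questionnaire_to_contract")   # miss → found in DB tier
    second = await cache.get("questionnaire_to_contract")  # hit → served from memory
    other = await cache.get("unknown_workflow")            # DB miss → file fallback
    return first, second, other


first, second, other = asyncio.run(demo())
```

The real store follows the same shape; the only differences are the fetchers (Supabase/Firebase clients, file parser) and the namespaced keys.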
## Updated Implementation

### File 1: Enhanced Seeding in `database/scripts/build.py`

Add DB seeding:

```python
from datetime import datetime
import logging

logger = logging.getLogger(__name__)


async def seed_databases_and_memory():
    """
    Complete seeding flow:
    1. Read from database/ files
    2. Seed Supabase (league examples by sport/tier)
    3. Seed Firebase (workflows for cross-instance sync)
    4. Populate InMemoryStore (fast cache)
    """
    from stores.prompts import get_prompt_store

    store = get_prompt_store()
    await store.initialize()

    # 1. Build workflow from files
    workflow = await store.get_workflow("questionnaire_to_contract")

    # 2. Seed Supabase with league examples
    if store.supabase and not store.supabase.use_mock:
        for stage in workflow.get("stages", []):
            examples = stage.get("examples", [])
            # Group examples by sport/tier
            for example in examples:
                sport = example.get("sport", "general")
                tier = example.get("tier", "standard")
                await store.supabase.client.table("league_examples").upsert({
                    "sport": sport,
                    "tier": tier,
                    "stage": stage.get("name"),
                    "example_data": example,
                    "updated_at": datetime.now().isoformat(),
                }).execute()
        logger.info("✅ Seeded Supabase with league examples")

    # 3. Seed Firebase with workflows
    if store.firebase and not store.firebase.use_mock:
        store.firebase.db.collection("prompts").document("workflows").set({
            "questionnaire_to_contract": workflow,
            "_seeded_at": datetime.now().isoformat(),
        })
        logger.info("✅ Seeded Firebase with workflows")

    logger.info("✅ All databases seeded, InMemoryStore populated")
```

### File 2: Add MCP Server Wrapper `apps/backend/mcp/prompt_server.py`
NEW - MCP server for AI-to-AI communication:
"""
apps/backend/mcp/prompt_server.py
MCP Server for Prompt Intelligence
Allows other AI agents to discover and use this system via RPC
"""
from typing import Dict, Any, List
import json
class PromptIntelligenceMCPServer:
"""
MCP server wrapper for prompt intelligence
Enables:
1. Self-discovery (what prompts/workflows available)
2. RPC calls from other AI agents
3. Agent-to-agent communication protocol
"""
def __init__(self):
from stores.prompts import get_prompt_store
self.store = get_prompt_store()
async def handle_mcp_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""
Handle MCP RPC request from another AI
Supported methods:
- tools/list (self-discovery)
- prompts/search
- prompts/execute
- agent/communicate
"""
method = request.get("method")
params = request.get("params", {})
if method == "tools/list":
return await self._list_available_capabilities()
elif method == "prompts/search":
return await self._search_prompts_mcp(params)
elif method == "prompts/execute":
return await self._execute_workflow_mcp(params)
elif method == "agent/communicate":
return await self._agent_to_agent_communication(params)
else:
return {
"error": f"Unknown method: {method}",
"supported_methods": [
"tools/list",
"prompts/search",
"prompts/execute",
"agent/communicate"
]
}
async def _list_available_capabilities(self) -> Dict:
"""Self-discovery: What can this server do?"""
catalog = await self.store.get_catalog()
return {
"service": "Prompt Intelligence MCP Server",
"capabilities": {
"prompt_search": "Semantic search across 135+ prompts",
"workflow_execution": "Execute 9-stage LangGraph workflows",
"agent_orchestration": "Create specialized agents with tools",
"batch_processing": "Parallel execution of workflows"
},
"available_workflows": [
{
"name": w["name"],
"description": w.get("metadata", {}).get("description", ""),
"stages": len(w.get("stages", []))
}
for w in catalog.get("workflows", [])
],
"tools_count": 112,
"example_count": 800
}
async def _search_prompts_mcp(self, params: Dict) -> Dict:
"""MCP RPC: Search for prompts"""
query = params.get("query", "")
namespace = params.get("namespace")
results = await self.store.search_prompts(query, namespace)
return {
"results": [
{
"name": r.key,
"score": getattr(r, 'score', 1.0),
"type": "workflow" if hasattr(r, 'stages') else "prompt"
}
for r in results[:5]
]
}
async def _execute_workflow_mcp(self, params: Dict) -> Dict:
"""MCP RPC: Execute workflow"""
from services.prompts import PromptService
service = PromptService()
result = await service.execute_workflow(
workflow_name=params.get("workflow"),
input_data=params.get("input_data", {})
)
return result
async def _agent_to_agent_communication(self, params: Dict) -> Dict:
"""
Agent-to-agent communication protocol
Allows agents to:
- Request prompts from each other
- Share context
- Coordinate workflows
"""
from_agent = params.get("from_agent")
to_agent = params.get("to_agent")
message_type = params.get("message_type")
payload = params.get("payload", {})
# Route based on message type
if message_type == "request_prompt":
# Agent requests a prompt
prompt_name = payload.get("prompt_name")
result = await self.store.get(namespace=("prompts",), key=prompt_name)
return {
"from": "prompt_intelligence_server",
"to": from_agent,
"message_type": "prompt_response",
"payload": {
"prompt": result.value if result else None,
"found": result is not None
}
}
elif message_type == "request_workflow_collaboration":
# Multiple agents coordinate on workflow
workflow_name = payload.get("workflow")
agent_responsibilities = await self._assign_agents_to_stages(workflow_name)
return {
"from": "prompt_intelligence_server",
"to": from_agent,
"message_type": "workflow_assignment",
"payload": agent_responsibilities
}
return {"status": "message_received"}
async def _assign_agents_to_stages(self, workflow_name: str) -> Dict:
"""Assign agents to workflow stages for collaboration"""
workflow = await self.store.get_workflow(workflow_name)
assignments = {}
for i, stage in enumerate(workflow.get("stages", [])):
assignments[f"agent_{i+1}"] = {
"stage": stage.get("name"),
"prompt": stage.get("prompt"),
"tools": self._get_tools_for_stage(stage),
"responsibility": f"Execute stage {i+2}"
}
return assignments
# FastAPI endpoint for MCP
from fastapi import APIRouter
mcp_router = APIRouter(prefix="/mcp", tags=["mcp"])
mcp_server = PromptIntelligenceMCPServer()
@mcp_router.post("/rpc")
async def handle_mcp_rpc(request: Dict[str, Any]):
"""Handle MCP RPC requests from other AI agents"""
return await mcp_server.handle_mcp_request(request)ποΈ Complete Seeding Strategy
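The routing above is a plain method-name dispatch: one envelope (`method` + `params`) in, one dict out, with unknown methods answered by a self-describing error instead of an exception. A dependency-free sketch of the same dispatch shape, with the store-backed handlers stubbed out:

```python
import asyncio
from typing import Any, Dict


# Stub handlers standing in for the real store-backed methods
async def list_capabilities(params: Dict) -> Dict:
    return {"service": "Prompt Intelligence MCP Server"}


async def search_prompts(params: Dict) -> Dict:
    return {"results": [], "query": params.get("query", "")}


HANDLERS = {
    "tools/list": list_capabilities,
    "prompts/search": search_prompts,
}


async def handle(request: Dict[str, Any]) -> Dict[str, Any]:
    """Dispatch on the method name; unknown methods get a helpful error payload."""
    method = request.get("method")
    handler = HANDLERS.get(method)
    if handler is None:
        return {"error": f"Unknown method: {method}",
                "supported_methods": sorted(HANDLERS)}
    return await handler(request.get("params", {}))


ok = asyncio.run(handle({"method": "prompts/search", "params": {"query": "contract"}}))
err = asyncio.run(handle({"method": "prompts/delete"}))
```

Returning the supported-methods list in the error path is what makes the endpoint usable by agents that have not called `tools/list` first.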
## Complete Seeding Strategy

**Three-Tier Seeding:**

```
Layer 0: SOURCE FILES (database/)
   ↓ Read at startup
Layer 1: DATABASES (Seeded)
   ├─ Supabase (league examples by sport/tier)
   └─ Firebase (workflows, user preferences)
   ↓ Populate from DB
Layer 2: INMEMORYSTORE (Cache)
   ├─ All prompts cached for <1ms retrieval
   └─ Restored from DB on restart
```

**Seeding Flow:**
```python
# database/scripts/build.py (ENHANCED)
# NOTE: read_from_database_output_styles, read_from_database_prompts, and
# filter_examples_by_sport are helpers defined elsewhere in build.py
async def seed_complete_system():
    """
    Complete seeding: Files → Databases → InMemoryStore
    Run once at deployment to populate everything
    """
    # 1. Read source files
    workflows = read_from_database_output_styles()
    prompts = read_from_database_prompts()

    # 2. Seed Supabase (league examples)
    for sport in ["basketball", "soccer", "hockey"]:
        examples = filter_examples_by_sport(workflows, sport)
        await supabase.table("league_examples").upsert({
            "sport": sport,
            "examples": examples,
            "count": len(examples),
        }).execute()

    # 3. Seed Firebase (workflows + user prompts)
    await firebase.set("prompts/workflows", workflows)
    await firebase.set("prompts/catalog", prompts)

    # 4. Populate InMemoryStore
    for workflow in workflows:
        store.put(("workflows",), workflow.name, workflow.data)

    logger.info("✅ Seeding complete: Files → DB → Memory")
```
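One property worth calling out in the flow above: seeding uses `upsert` keyed by sport (assuming the table has a unique key on that column), so re-running `build.py` on every deployment replaces rows instead of duplicating them. A tiny sketch of that idempotency, with a dict standing in for the `league_examples` table:

```python
# Dict keyed the way the table is, so repeated seeding is idempotent
league_examples = {}


def upsert(row):
    """Insert-or-replace by the unique key (sport), like the Supabase upsert."""
    league_examples[row["sport"]] = row


def seed():
    for sport in ["basketball", "soccer", "hockey"]:
        upsert({"sport": sport, "examples": [], "count": 0})


seed()
seed()  # running the seeding script a second time changes nothing
```

Without the unique-key/upsert pairing, a plain insert would grow the table by three rows on every deploy.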
**Query Flow (After Seeding):**

```python
# When a user queries
result = store.get(("workflows",), "questionnaire_to_contract")

if result:
    # HIT: InMemoryStore (<1ms)
    return result.value

# MISS: check databases (seeded at startup)
db_result = await firebase.get("prompts/workflows/questionnaire_to_contract")
if db_result:
    # Found in DB (~20ms)
    store.put(("workflows",), name, db_result)  # Cache it
    return db_result

# MISS: build from files (~9ms)
file_result = build_from_database_files(name)
store.put(("workflows",), name, file_result)  # Cache it
await sync_to_databases(file_result)  # Seed DB for next time
return file_result
```
## Agent-to-Agent Communication

### Protocol in `apps/backend/services/agent_communication.py`

```python
"""
apps/backend/services/agent_communication.py

Agent-to-agent communication protocol.
Allows agents to share prompts and coordinate workflows.
"""
from typing import Dict


class AgentCommunicationProtocol:
    """
    Protocol for agents to communicate with each other.

    Message types:
    - REQUEST_PROMPT: Ask for a specific prompt
    - SHARE_CONTEXT: Share execution context
    - COORDINATE_WORKFLOW: Multi-agent workflow coordination
    - REQUEST_TOOLS: Ask for tool recommendations
    """

    def __init__(self):
        from stores.prompts import get_prompt_store
        self.store = get_prompt_store()

    async def handle_agent_message(
        self,
        from_agent: str,
        to_agent: str,
        message_type: str,
        payload: Dict,
    ) -> Dict:
        """Route agent-to-agent messages"""
        if message_type == "REQUEST_PROMPT":
            # Agent asks for a prompt
            prompt_name = payload.get("prompt_name")
            context = payload.get("context", {})

            # Get from store
            result = await self.store.get(
                namespace=(payload.get("namespace", "prompts"),),
                key=prompt_name,
            )
            return {
                "from": "prompt_intelligence",
                "to": from_agent,
                "message_type": "PROMPT_RESPONSE",
                "payload": {
                    "prompt": result.value if result else None,
                    "found": result is not None,
                    "use_with": context,
                },
            }

        elif message_type == "COORDINATE_WORKFLOW":
            # Multi-agent workflow coordination
            workflow_name = payload.get("workflow")
            workflow = await self.store.get_workflow(workflow_name)

            # Assign each stage to an agent
            assignments = {}
            for i, stage in enumerate(workflow.get("stages", [])):
                agent_id = f"{to_agent}_stage_{i+2}"
                assignments[agent_id] = {
                    "stage_num": i + 2,
                    "stage_name": stage.get("name"),
                    "prompt": stage.get("prompt"),
                    # NOTE: _recommend_tools is assumed to be implemented on
                    # this class (tool selection heuristics)
                    "tools": self._recommend_tools(stage),
                    "examples": stage.get("examples", [])[:3],
                    "next_agent": (
                        f"{to_agent}_stage_{i+3}"
                        if i < len(workflow["stages"]) - 1
                        else None
                    ),
                }
            return {
                "from": "prompt_intelligence",
                "to": from_agent,
                "message_type": "WORKFLOW_ASSIGNMENTS",
                "payload": {
                    "workflow": workflow_name,
                    "total_agents": len(assignments),
                    "assignments": assignments,
                },
            }

        elif message_type == "REQUEST_TOOLS":
            # Agent asks for tool recommendations
            task_description = payload.get("task")

            # Search for similar workflows
            results = await self.store.search_prompts(task_description)
            if results:
                similar_workflow = results[0]
                recommended_tools = similar_workflow.value.get("tools_used", [])
            else:
                # NOTE: _infer_tools_from_description is assumed to be
                # implemented elsewhere on this class
                recommended_tools = self._infer_tools_from_description(task_description)

            return {
                "from": "prompt_intelligence",
                "to": from_agent,
                "message_type": "TOOLS_RECOMMENDATION",
                "payload": {
                    "tools": recommended_tools,
                    "confidence": 0.85 if results else 0.5,
                },
            }

        return {"status": "unknown_message_type"}
```
## MCP Server Self-Discovery

### Add MCP Tools in `apps/backend/server.py`

Register MCP endpoints:

```python
# Add after line 580
# MCP Server for AI-to-AI communication
try:
    from mcp.prompt_server import mcp_router
    app.include_router(mcp_router)
    logger.info("✅ MCP Prompt Intelligence Server included at /mcp")
except ImportError as e:
    logger.warning(f"MCP server not available: {e}")
```

**MCP Self-Discovery Endpoint:**
```python
@app.get("/mcp/discover")
async def mcp_self_discovery():
    """
    Self-discovery endpoint for other AI agents.
    Returns what this server can do and how to use it.
    """
    from stores.prompts import get_prompt_store

    store = get_prompt_store()
    await store.initialize()
    catalog = await store.get_catalog()

    return {
        "server_name": "altsportsdata_prompt_intelligence",
        "version": "1.0.0",
        "protocol": "MCP/RPC",
        "capabilities": {
            "prompt_search": {
                "description": "Semantic search across 135+ prompts",
                "method": "POST /mcp/rpc",
                "params": {
                    "method": "prompts/search",
                    "params": {"query": "string", "namespace": "string"},
                },
            },
            "workflow_execution": {
                "description": "Execute multi-stage workflows",
                "method": "POST /mcp/rpc",
                "params": {
                    "method": "prompts/execute",
                    "params": {"workflow": "string", "input_data": "object"},
                },
            },
            "agent_coordination": {
                "description": "Coordinate multi-agent workflows",
                "method": "POST /mcp/rpc",
                "params": {
                    "method": "agent/communicate",
                    "params": {
                        "from_agent": "string",
                        "message_type": "COORDINATE_WORKFLOW",
                        "payload": {"workflow": "string"},
                    },
                },
            },
        },
        "available_workflows": [
            w["name"] for w in catalog.get("workflows", [])
        ],
        "total_prompts": sum(len(items) for items in catalog.values()),
        "example_count": 800,
        "tools_available": 112,
        "usage": {
            "discovery": "GET /mcp/discover (this endpoint)",
            "rpc": "POST /mcp/rpc (make RPC calls)",
            "rest_api": "POST /api/prompts/* (direct REST access)",
        },
    }
```
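A client agent is expected to consume this discover response mechanically: read a capability's `params` template, fill in concrete values, and POST the result to `/mcp/rpc`. A sketch of that client-side step, using a hypothetical trimmed copy of the discover payload (field names as above):

```python
# Hypothetical trimmed copy of a GET /mcp/discover response
discover = {
    "capabilities": {
        "prompt_search": {
            "method": "POST /mcp/rpc",
            "params": {
                "method": "prompts/search",
                "params": {"query": "string", "namespace": "string"},
            },
        }
    }
}


def build_rpc_body(discover, capability, **values):
    """Fill a capability's params template with concrete argument values."""
    template = discover["capabilities"][capability]["params"]
    return {"method": template["method"], "params": values}


body = build_rpc_body(discover, "prompt_search", query="basketball contract")
# body is the JSON the client would POST to /mcp/rpc
```

Because the RPC method name is carried inside the discover payload, the client never hard-codes it — new capabilities become callable without client changes.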
## Complete File Structure (Final)

```
apps/backend/
├── api/                          ← REST endpoints (for humans/frontends)
│   └── prompts.py                ← ✅ Catalog, search, update, execute
│
├── mcp/                          ← NEW: MCP server (for AI-to-AI)
│   └── prompt_server.py          ← Self-discovery + RPC
│
├── services/                     ← Business logic
│   ├── prompts.py                ← ✅ Workflow execution
│   ├── orchestrator.py           ← Agent creation with tools
│   └── agent_communication.py    ← Agent-to-agent protocol
│
├── stores/                       ← Data access
│   └── prompts.py                ← ✅ InMemoryStore + DB sync
│
└── server.py                     ← FastAPI app (imports all)

database/
├── prompts/*.md                  ← Source files (Git versioned)
├── output-styles/                ← Knowledge base (800+ examples)
└── scripts/
    └── build.py                  ← Seeding script (Files → DB → Memory)
```
## Usage Scenarios

### Scenario 1: Human User (REST API)

```
# Human searches via Next.js/Streamlit
POST /api/prompts/search
{"query": "basketball contract"}
    ↓
InMemoryStore search
    ↓
Return results
```
### Scenario 2: AI Agent (MCP RPC)

```
# Another AI agent discovers this server
GET /mcp/discover
    ↓
Returns: available workflows, tools, capabilities
    ↓
AI agent makes RPC call
POST /mcp/rpc
{
  "method": "prompts/execute",
  "params": {"workflow": "questionnaire_to_contract", "input_data": {...}}
}
    ↓
Execute workflow
    ↓
Return result to requesting agent
```
### Scenario 3: Multi-Agent Coordination

```
# Agent A requests workflow coordination
POST /mcp/rpc
{
  "method": "agent/communicate",
  "params": {
    "from_agent": "agent_a",
    "to_agent": "contract_team",
    "message_type": "COORDINATE_WORKFLOW",
    "payload": {"workflow": "questionnaire_to_contract"}
  }
}
    ↓
System assigns stages to agents:
{
  "agent_1": {stage_2, prompt, tools},
  "agent_2": {stage_3, prompt, tools},
  ...
  "agent_9": {stage_7b, prompt, tools}
}
    ↓
Agents execute in parallel/sequence
    ↓
Results aggregated
```
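The assignment structure in Scenario 3 forms a linked chain: each agent knows its own stage and the `next_agent` to hand off to, with `None` terminating the chain. A self-contained sketch of building that chain (stage numbering starting at 2, as in the protocol above; the stage names are hypothetical):

```python
def assign_stages(team, stages):
    """Build the per-agent assignment map with next_agent handoff links."""
    assignments = {}
    for i, stage in enumerate(stages):
        last = i == len(stages) - 1
        assignments[f"{team}_stage_{i+2}"] = {
            "stage_num": i + 2,
            "stage_name": stage,
            # last agent in the chain has no successor
            "next_agent": None if last else f"{team}_stage_{i+3}",
        }
    return assignments


chain = assign_stages(
    "contract_team",
    ["parse_questionnaire", "draft_terms", "final_review"],
)
```

Sequential execution follows the `next_agent` links; parallel execution can ignore them and fan out over the keys, then aggregate.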
## Complete Seeding Flow

**On First Deployment:**

```
1. Container builds (Dockerfile)
   ↓
2. RUN python database/scripts/build.py
   ├─ Read database/prompts/*.md (135 files)
   ├─ Read database/output-styles/ (800+ examples)
   ├─ Build workflows in memory
   │
   ├─ Seed Supabase:
   │   ├─ league_examples (by sport/tier)
   │   ├─ workflow_definitions
   │   └─ prompt_catalog
   │
   ├─ Seed Firebase:
   │   ├─ prompts/workflows
   │   └─ prompts/catalog
   │
   └─ Populate InMemoryStore
       ├─ ("workflows", "questionnaire_to_contract")
       ├─ ("prompts", ...)
       └─ ("components", ...)
   ↓
3. Server starts
   ↓ On first query
4. Check InMemoryStore (<1ms)
   └─ HIT: Return (already seeded!)
```

**On Container Restart:**
```
1. Container restarts (Cloud Run scaling)
   ↓
2. server.py lifespan startup
   ├─ store.initialize()
   ├─ Load from Firebase (restore state)
   └─ Populate InMemoryStore from DB
   ↓
3. Server ready (no file reads needed!)
   ↓
4. Queries use cached data (<1ms)
```
## Final Architecture Benefits

**✅ Fast**
- InMemoryStore: <1ms
- Pre-seeded databases: ~20ms on a cache miss
- File fallback: ~9ms worst case

**✅ Reliable**
- Three-tier fallback
- Databases pre-seeded
- Always has data

**✅ Intelligent**
- Semantic search (vector similarity)
- Agent-to-agent communication
- MCP self-discovery

**✅ Scalable**
- Multi-instance via Firebase sync
- Batch parallel processing
- MCP for AI-to-AI coordination

**✅ Maintainable**
- Update via API (no redeployment)
- Databases seeded automatically
- Version tracking built in
## Implementation Checklist (Updated)

**Core System:**
- InMemoryStore wrapper
- Firebase + Supabase sync with smart routing
- REST API (5 endpoints)
- Workflow execution
- Build/validate scripts

**Seeding:**
- Seed InMemoryStore from files
- Seed Supabase with league examples (add to build.py)
- Seed Firebase with workflows (add to build.py)
- Test DB restore on restart

**MCP Layer (Optional/Advanced):**
- Create `mcp/prompt_server.py`
- Add `/mcp/discover` endpoint
- Add `/mcp/rpc` endpoint
- Implement agent communication protocol
- Add to `server.py`

**Agent Communication (Optional/Advanced):**
- Create `services/agent_communication.py`
- Implement message routing
- Multi-agent workflow coordination
- Tool recommendation system
## Next Steps

**Immediate (Core System Complete):**
- ✅ REST API working
- ✅ InMemoryStore caching
- ✅ DB sync ready
- ✅ Tests passing

**Phase 2 (Database Seeding):**
- Enhance `database/scripts/build.py` to seed Supabase + Firebase
- Test DB restore on container restart
- Verify cross-instance sync

**Phase 3 (MCP Layer):**
- Create `mcp/prompt_server.py`
- Add self-discovery endpoint
- Implement RPC handler
- Test AI-to-AI communication

**Phase 4 (Agent Orchestration):**
- Create `services/orchestrator.py`
- Create `services/agent_communication.py`
- Multi-agent workflow coordination
- Tool assignment system

**Current Status:** ✅ Core system complete and tested

The optional enhancements above can be added as needed for MCP and agent orchestration. The system already covers the three core capabilities today; MCP and agent features are additive.