Testing & QA Framework
The AltSportsLeagues.ai Testing & QA Framework provides comprehensive quality assurance through multi-layered testing strategies, AI-powered evaluation with 6 specialized personas, and continuous quality monitoring. Built on pytest, Vitest, Playwright, and custom AI testing infrastructure, the framework ensures production-grade reliability for the entire platform, from backend services to frontend applications.
Core Philosophy: Prevention over detection, automation over manual effort, quality as code. The framework integrates with the existing schema-driven data layer, leveraging 150+ Pydantic models for type-safe testing and validated fixtures.
Tech Stack:
- Backend Testing: pytest + pytest-asyncio + TestClient
- Frontend Testing: Vitest + React Testing Library + Playwright
- AI Testing: 6-persona evaluation framework (OpenAI/Anthropic)
- Performance: Locust + k6 + Grafana
- Security: Bandit + Safety + OWASP ZAP
- Coverage: Coverage.py + c8 + SonarQube
- CI/CD: GitHub Actions + Cloud Build
Executive Summary
The Testing & QA Framework addresses the need for robust quality assurance in a complex, AI-powered platform handling sports league data, partnerships, and betting operations. By implementing comprehensive testing across all layers (unit, integration, E2E, performance, security), AI-driven evaluation, and continuous monitoring, the framework ensures the platform meets enterprise-grade reliability standards.
Key Features:
- Multi-Layer Testing: Unit, integration, E2E, performance, security testing
- AI-Powered Evaluation: 6 specialized personas for diverse perspective testing
- Schema Integration: Type-safe testing using 150+ Pydantic models from data-layer-registry
- Automated CI/CD: Quality gates preventing deployment of untested code
- Performance Monitoring: Load testing and real-time metrics
- Security Scanning: Automated vulnerability detection and remediation
The framework targets >80% code coverage and a <1% test failure rate, and ensures all critical user flows are covered by automated tests.
System Architecture
High-Level Testing Architecture
Detailed Requirements
REQ-TQF-001: Unit Testing Infrastructure
User Story: As a developer, I want comprehensive unit testing infrastructure, so that I can verify individual components work correctly in isolation.
Acceptance Criteria
- WHEN writing backend tests, THE Testing_Framework SHALL use pytest with async support
- WHEN writing frontend tests, THE Testing_Framework SHALL use Vitest with React Testing Library
- WHEN running unit tests, THE Testing_Framework SHALL achieve 80%+ code coverage
- WHEN mocking dependencies, THE Testing_Framework SHALL provide fixture factories and mock generators (a mock-client sketch follows this list)
- WHEN tests fail, THE Testing_Framework SHALL provide clear error messages with stack traces
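The implementation example below imports a mock_ai_client fixture from league_fixtures. A minimal sketch of that fixture, assuming an AsyncMock stand-in for the OpenAI/Anthropic client (the response shape is illustrative, not the real provider payload):
# apps/backend/tests/fixtures/league_fixtures.py (sketch)
import pytest
from unittest.mock import AsyncMock

@pytest.fixture
def mock_ai_client():
    """AsyncMock standing in for the AI provider client used by LeagueAnalysisService"""
    client = AsyncMock()
    # Attribute access on AsyncMock auto-creates awaitable mocks, so
    # client.chat.completions.create(...) can be awaited by the service under test.
    client.chat.completions.create.return_value = AsyncMock()
    return client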
Implementation Example (Backend - pytest)
"""
Unit tests for LeagueAnalysisService using Pydantic schemas
from data-layer-registry for type-safe testing.
"""
import pytest
from unittest.mock import AsyncMock, patch
from datetime import datetime
# Import schemas from data-layer-registry
from apps.backend.schemas.league import (
LeagueSchema,
LeagueAnalysisRequest,
LeagueAnalysisResponse,
PartnershipScoreSchema,
TierRecommendationSchema
)
from apps.backend.schemas.scoring import (
ScoringDimensionSchema,
ScoreBreakdownSchema
)
from apps.backend.services.league_analysis_service import LeagueAnalysisService
from apps.backend.tests.fixtures.league_fixtures import (
league_factory,
analysis_request_factory,
mock_ai_client
)
class TestLeagueAnalysisService:
"""Test suite for League Analysis Service"""
@pytest.fixture
def service(self, mock_ai_client):
"""Create service instance with mocked AI client"""
return LeagueAnalysisService(ai_client=mock_ai_client)
@pytest.fixture
def sample_league(self):
"""Generate sample league using factory with schema validation"""
return league_factory.create(
name="Premier Lacrosse League",
sport="Lacrosse",
region="North America",
tier=None # To be determined by analysis
)
@pytest.mark.asyncio
async def test_analyze_league_success(self, service, sample_league):
"""
GIVEN: A valid league object conforming to LeagueSchema
WHEN: analyze_league() is called
THEN: Returns LeagueAnalysisResponse with tier recommendation
"""
# Create request using schema
request = LeagueAnalysisRequest(
league_name=sample_league.name,
website_url="https://premierlacrosseleague.com",
contact_email="partnerships@pll.com"
)
# Execute analysis
response = await service.analyze_league(request)
# Validate response schema
assert isinstance(response, LeagueAnalysisResponse)
assert response.league_name == sample_league.name
assert response.recommended_tier in ["1.1", "1.2", "2.1", "2.2", "3.1", "3.2", "4.1", "4.2"]
assert 0 <= response.partnership_score.total_score <= 100
# Validate scoring dimensions
assert len(response.partnership_score.dimensions) == 7
dimension_names = {d.dimension_name for d in response.partnership_score.dimensions}
expected_dimensions = {
"market_size", "data_availability", "betting_popularity",
"growth_potential", "technical_feasibility",
"partnership_readiness", "revenue_potential"
}
assert dimension_names == expected_dimensions
@pytest.mark.asyncio
async def test_analyze_league_invalid_data(self, service):
"""
GIVEN: Invalid league data (fails schema validation)
WHEN: analyze_league() is called
THEN: Raises ValidationError with detailed message
"""
from pydantic import ValidationError
with pytest.raises(ValidationError) as exc_info:
request = LeagueAnalysisRequest(
league_name="", # Invalid: empty string
website_url="not-a-url", # Invalid URL format
contact_email="invalid-email" # Invalid email
)
errors = exc_info.value.errors()
assert len(errors) >= 3
@pytest.mark.asyncio
async def test_scoring_breakdown_calculation(self, service, sample_league):
"""
GIVEN: A league with known characteristics
WHEN: Partnership score is calculated
THEN: Each dimension score is weighted correctly
"""
request = LeagueAnalysisRequest(
league_name=sample_league.name,
website_url="https://example.com",
contact_email="test@example.com"
)
with patch.object(service, '_fetch_market_data') as mock_market:
# Mock market data with known values
mock_market.return_value = {
"market_size": 85,
"data_availability": 70,
"betting_popularity": 60
}
response = await service.analyze_league(request)
# Verify weighted calculation
breakdown = response.partnership_score
total = sum(d.score * d.weight for d in breakdown.dimensions)
expected_total = total / sum(d.weight for d in breakdown.dimensions)
assert abs(breakdown.total_score - expected_total) < 0.01
Pytest Configuration
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
asyncio_mode = auto
markers =
unit: Unit tests (fast, isolated)
integration: Integration tests (slower, requires services)
slow: Slow tests (>1s execution time)
ai_persona: AI persona evaluation tests
security: Security-focused tests
addopts =
--strict-markers
--cov=apps/backend
--cov-report=html
--cov-report=term-missing
--cov-fail-under=80
--maxfail=5
-v
REQ-TQF-002: Integration Testing System
User Story: As a QA engineer, I want integration tests that verify component interactions, so that I can ensure modules work together correctly.
Acceptance Criteria
- WHEN testing API endpoints, THE Testing_Framework SHALL use TestClient for FastAPI routes
- WHEN testing database operations, THE Testing_Framework SHALL use test databases with transaction rollback (see the fixture sketch below)
- WHEN testing external integrations, THE Testing_Framework SHALL use contract testing (Pact)
- WHEN testing async workflows, THE Testing_Framework SHALL handle async/await patterns correctly
- WHEN tests complete, THE Testing_Framework SHALL cleanup resources automatically
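The integration example below obtains a test database via create_test_database(). The transaction-rollback behavior referenced above can be provided by a connection-scoped fixture; a minimal sketch, assuming asyncpg, pytest-asyncio, and a TEST_DATABASE_URL environment variable:
# apps/backend/tests/fixtures/database_fixtures.py (sketch)
import os
import asyncpg
import pytest_asyncio

@pytest_asyncio.fixture
async def test_db():
    """Yield a connection inside a transaction that is always rolled back after the test"""
    conn = await asyncpg.connect(os.environ["TEST_DATABASE_URL"])  # assumed env var
    tx = conn.transaction()
    await tx.start()
    try:
        yield conn
    finally:
        await tx.rollback()  # discard every write the test made
        await conn.close()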
Implementation Example
# apps/backend/tests/integration/test_league_integration.py
import pytest
from httpx import AsyncClient
from apps.backend.main import app
from apps.backend.tests.fixtures.database_fixtures import (
create_test_database,
truncate_tables
)
@pytest.mark.asyncio
async def test_league_creation_workflow():
"""
Integration test for complete league creation workflow
Tests: API endpoint -> Database persistence -> Schema validation
"""
# Setup test database
test_db = await create_test_database()
# Create test client
async with AsyncClient(app=app, base_url="http://test") as ac:
# Test 1: Create league via API
response = await ac.post(
"/api/leagues",
json={
"name": "Test League Integration",
"sport": "Basketball",
"region": "North America",
"contact_email": "test@league.com"
},
headers={"Authorization": "Bearer test_token"}
)
assert response.status_code == 201
created_league = response.json()
assert created_league["name"] == "Test League Integration"
assert "id" in created_league
# Test 2: Verify database persistence
league_id = created_league["id"]
db_league = await test_db.fetchrow(
"SELECT * FROM leagues WHERE id = $1",
league_id
)
assert db_league["name"] == "Test League Integration"
assert db_league["sport"] == "Basketball"
# Test 3: Update league via API
update_response = await ac.put(
f"/api/leagues/{league_id}",
json={
"name": "Updated League Integration",
"sport": "Basketball",
"region": "Europe"
},
headers={"Authorization": "Bearer test_token"}
)
assert update_response.status_code == 200
updated_league = update_response.json()
assert updated_league["name"] == "Updated League Integration"
assert updated_league["region"] == "Europe"
# Test 4: Verify database update
db_updated = await test_db.fetchrow(
"SELECT * FROM leagues WHERE id = $1",
league_id
)
assert db_updated["name"] == "Updated League Integration"
assert db_updated["region"] == "Europe"
# Test 5: Delete league via API
delete_response = await ac.delete(
f"/api/leagues/{league_id}",
headers={"Authorization": "Bearer test_token"}
)
assert delete_response.status_code == 200
# Test 6: Verify database deletion
db_deleted = await test_db.fetchrow(
"SELECT * FROM leagues WHERE id = $1",
league_id
)
assert db_deleted is None
# Cleanup - transaction rollback handled by fixture
REQ-TQF-003: End-to-End Testing
User Story: As a product manager, I want E2E tests that simulate real user workflows, so that I can verify the entire system works as expected.
Acceptance Criteria
- WHEN running E2E tests, THE Testing_Framework SHALL use Playwright for browser automation
- WHEN testing user flows, THE Testing_Framework SHALL cover critical paths (signup, document upload, contract generation)
- WHEN capturing failures, THE Testing_Framework SHALL save screenshots, videos, and network logs
- WHEN testing across browsers, THE Testing_Framework SHALL support Chrome, Firefox, Safari
- WHEN running in CI, THE Testing_Framework SHALL use headless mode for speed
Implementation Example (E2E - Playwright)
// clients/frontend/e2e/league-onboarding.spec.ts
import { test, expect } from '@playwright/test';
import { LeagueOnboardingPage } from './pages/LeagueOnboardingPage';
import { DashboardPage } from './pages/DashboardPage';
test.describe('League Onboarding E2E Flow', () => {
test('Complete league onboarding journey', async ({ page }) => {
const onboardingPage = new LeagueOnboardingPage(page);
const dashboardPage = new DashboardPage(page);
// Step 1: Navigate to registration
await page.goto('/register');
await expect(page).toHaveTitle(/Register/);
// Step 2: Fill registration form
await onboardingPage.fillRegistrationForm({
email: 'testuser@example.com',
password: 'SecurePass123!',
leagueName: 'Test Integration League',
sportDomain: 'team_player_actions',
targetSportsbooks: [
{ id: 'draftkings', priority: 1 },
{ id: 'underdog', priority: 2 }
]
});
// Step 3: Complete registration
await onboardingPage.submitRegistration();
await expect(page).toHaveURL(/dashboard/);
// Step 4: Verify dashboard
await dashboardPage.verifyLeagueCreated('Test Integration League');
await expect(dashboardPage.leagueCard).toBeVisible();
// Step 5: Upload document
await dashboardPage.uploadDocument('test-questionnaire.pdf');
await expect(dashboardPage.successMessage).toContainText(/uploaded/);
// Step 6: Verify compliance status
await dashboardPage.verifyComplianceStatus('Test Integration League');
await expect(dashboardPage.complianceScore).toBeVisible();
// Step 7: Generate contract
await dashboardPage.generateContract();
await expect(dashboardPage.contractStatus).toContainText(/generated/);
});
test('Handle registration error scenarios', async ({ page }) => {
const onboardingPage = new LeagueOnboardingPage(page);
await page.goto('/register');
// Test invalid email
await onboardingPage.fillRegistrationForm({
email: 'invalid-email',
password: 'SecurePass123!',
leagueName: 'Test League'
});
await onboardingPage.submitRegistration();
await expect(onboardingPage.errorMessage).toContainText(/email/);
// Test weak password
await onboardingPage.fillRegistrationForm({
email: 'valid@example.com',
password: 'weak',
leagueName: 'Test League'
});
await onboardingPage.submitRegistration();
await expect(onboardingPage.errorMessage).toContainText(/password/);
});
});
REQ-TQF-004: 6-Persona AI Testing Framework
User Story: As a quality engineer, I want AI-powered testing with multiple personas, so that I can evaluate the system from diverse user perspectives.
Acceptance Criteria
- WHEN evaluating features, THE AI_Personas SHALL include League Admin, Tech Evaluator, Business Partner, Sports Betting Expert, Data Scientist, and Newcomer personas
- WHEN running persona tests, THE Testing_Framework SHALL execute tests in parallel for efficiency
- WHEN generating reports, THE AI_Personas SHALL provide satisfaction scores (0-100) with detailed feedback
- WHEN identifying issues, THE AI_Personas SHALL categorize by severity (critical, high, medium, low)
- WHEN completing evaluation, THE Testing_Framework SHALL aggregate results into comprehensive reports
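A hedged sketch of how the 6-persona suite might be driven from pytest using the ai_persona marker registered in the pytest configuration (module path, feature payload, and thresholds are assumptions; the orchestrator itself is shown after the persona classes below):
# tests/ai_personas/test_persona_suite.py (sketch)
import pytest

from tests.ai_personas.orchestrator import PersonaEvaluationOrchestrator  # assumed module path

@pytest.mark.ai_persona
@pytest.mark.asyncio
async def test_six_persona_evaluation_meets_quality_bar():
    """Run all six personas against one feature and enforce an illustrative quality bar"""
    orchestrator = PersonaEvaluationOrchestrator()
    report = await orchestrator.evaluate_system(
        features=[{"name": "league_onboarding", "data": {"steps": 5, "avg_completion_minutes": 12}}],
        evaluation_criteria=["usability", "functionality", "performance"]
    )
    assert report.critical_issues_count == 0
    assert report.average_satisfaction >= 75  # threshold is illustrative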
Implementation Example (AI Persona Evaluator)
"""
Base AI Persona class that uses schemas for structured evaluation.
All persona evaluations return validated SchematicEvaluationResult.
"""
import json
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, List, Any, Optional
from pydantic import BaseModel, Field, validator
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
# Import evaluation schemas from data-layer-registry
from apps.backend.schemas.evaluation import (
EvaluationResultSchema,
SatisfactionScoreSchema,
IssueReportSchema,
PersonaFeedbackSchema
)
class AIPersona(ABC):
"""Base class for AI testing personas"""
def __init__(
self,
persona_name: str,
persona_description: str,
expertise_areas: List[str],
ai_client: Optional[AsyncOpenAI | AsyncAnthropic] = None
):
self.persona_name = persona_name
self.persona_description = persona_description
self.expertise_areas = expertise_areas
self.ai_client = ai_client or AsyncOpenAI()
@abstractmethod
async def evaluate_feature(
self,
feature_name: str,
feature_data: Dict[str, Any],
evaluation_criteria: List[str]
) -> EvaluationResultSchema:
"""
Evaluate a feature from persona's perspective.
Returns validated EvaluationResultSchema from data-layer-registry.
"""
pass
async def _generate_evaluation(
self,
prompt: str,
context: Dict[str, Any]
) -> PersonaFeedbackSchema:
"""Generate persona feedback using LLM"""
system_prompt = f"""
You are {self.persona_name}: {self.persona_description}
Your expertise areas: {', '.join(self.expertise_areas)}
Evaluate the feature and provide structured feedback in JSON format
matching the PersonaFeedbackSchema:
{{
"satisfaction_score": 0-100,
"strengths": ["list of strengths"],
"weaknesses": ["list of weaknesses"],
"suggestions": ["list of improvement suggestions"],
"critical_issues": [
{{
"severity": "critical|high|medium|low",
"category": "usability|performance|security|functionality",
"description": "issue description",
"suggested_fix": "how to fix"
}}
],
"overall_impression": "detailed feedback"
}}
"""
response = await self.ai_client.chat.completions.create(
model="gpt-4-turbo-preview",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt}
],
response_format={"type": "json_object"},
temperature=0.7
)
feedback_data = json.loads(response.choices[0].message.content)
# Validate against schema
return PersonaFeedbackSchema(**feedback_data)
class LeagueAdminPersona(AIPersona):
"""
Persona: League Administrator
Perspective: Non-technical league operations manager
Focus: Ease of use, onboarding, partnership process
"""
def __init__(self):
super().__init__(
persona_name="League Administrator",
persona_description="A non-technical operations manager for a mid-sized sports league",
expertise_areas=[
"League operations",
"Partnership management",
"Document processing",
"Non-technical workflows"
]
)
async def evaluate_feature(
self,
feature_name: str,
feature_data: Dict[str, Any],
evaluation_criteria: List[str]
) -> EvaluationResultSchema:
"""Evaluate from league admin perspective"""
prompt = f"""
Evaluate the '{feature_name}' feature as a league administrator.
Feature Details:
{json.dumps(feature_data, indent=2)}
Evaluation Criteria:
{chr(10).join(f"- {criterion}" for criterion in evaluation_criteria)}
Focus on:
1. How easy is it for non-technical users?
2. Is the onboarding process clear?
3. Can I complete tasks without technical help?
4. Are error messages understandable?
5. Is the partnership process transparent?
"""
feedback = await self._generate_evaluation(prompt, feature_data)
# Convert to full evaluation result
return EvaluationResultSchema(
persona_name=self.persona_name,
feature_name=feature_name,
satisfaction_score=SatisfactionScoreSchema(
overall=feedback.satisfaction_score,
usability=feedback.satisfaction_score, # League admin focuses on usability
functionality=feedback.satisfaction_score * 0.9,
performance=feedback.satisfaction_score * 0.8
),
strengths=feedback.strengths,
weaknesses=feedback.weaknesses,
suggestions=feedback.suggestions,
critical_issues=feedback.critical_issues,
overall_impression=feedback.overall_impression,
timestamp=datetime.utcnow().isoformat()
)
class TechEvaluatorPersona(AIPersona):
"""
Persona: Technical Evaluator
Perspective: Senior software engineer
Focus: Code quality, architecture, performance, security
"""
def __init__(self):
super().__init__(
persona_name="Technical Evaluator",
persona_description="A senior software engineer with 10+ years experience",
expertise_areas=[
"Software architecture",
"Code quality",
"Performance optimization",
"Security best practices",
"API design"
]
)
async def evaluate_feature(
self,
feature_name: str,
feature_data: Dict[str, Any],
evaluation_criteria: List[str]
) -> EvaluationResultSchema:
"""Evaluate from technical perspective"""
prompt = f"""
Evaluate the '{feature_name}' feature as a senior software engineer.
Feature Details:
{json.dumps(feature_data, indent=2)}
Evaluation Criteria:
{chr(10).join(f"- {criterion}" for criterion in evaluation_criteria)}
Focus on:
1. Is the code well-architected and maintainable?
2. Are there performance bottlenecks?
3. Are security best practices followed?
4. Is error handling comprehensive?
5. Is the API design RESTful and intuitive?
6. Are schemas properly validated?
7. Is the code testable?
"""
feedback = await self._generate_evaluation(prompt, feature_data)
return EvaluationResultSchema(
persona_name=self.persona_name,
feature_name=feature_name,
satisfaction_score=SatisfactionScoreSchema(
overall=feedback.satisfaction_score,
usability=feedback.satisfaction_score * 0.7,
functionality=feedback.satisfaction_score,
performance=feedback.satisfaction_score * 0.95
),
strengths=feedback.strengths,
weaknesses=feedback.weaknesses,
suggestions=feedback.suggestions,
critical_issues=feedback.critical_issues,
overall_impression=feedback.overall_impression,
timestamp=datetime.utcnow().isoformat()
)
Running 6-Persona Evaluation Suite
"""
Orchestrates parallel execution of 6 AI persona evaluations.
Aggregates results into comprehensive quality report.
"""
import asyncio
import uuid
from datetime import datetime
from typing import List, Dict, Any
from apps.backend.schemas.evaluation import (
EvaluationResultSchema,
AggregatedEvaluationSchema,
QualityReportSchema
)
from tests.ai_personas.personas import (
LeagueAdminPersona,
TechEvaluatorPersona,
BusinessPartnerPersona,
SportsBettingExpertPersona,
DataScientistPersona,
NewcomerPersona
)
class PersonaEvaluationOrchestrator:
"""Orchestrates 6-persona AI evaluation"""
def __init__(self):
self.personas = [
LeagueAdminPersona(),
TechEvaluatorPersona(),
BusinessPartnerPersona(),
SportsBettingExpertPersona(),
DataScientistPersona(),
NewcomerPersona()
]
async def evaluate_system(
self,
features: List[Dict[str, Any]],
evaluation_criteria: List[str]
) -> QualityReportSchema:
"""
Run comprehensive 6-persona evaluation in parallel.
Returns aggregated QualityReportSchema.
"""
all_evaluations: List[EvaluationResultSchema] = []
# Run all persona evaluations in parallel
tasks = []
for feature in features:
for persona in self.personas:
task = persona.evaluate_feature(
feature_name=feature['name'],
feature_data=feature['data'],
evaluation_criteria=evaluation_criteria
)
tasks.append(task)
# Execute in parallel with timeout
evaluations = await asyncio.gather(*tasks, return_exceptions=True)
# Filter out exceptions and collect valid evaluations
for eval_result in evaluations:
if isinstance(eval_result, EvaluationResultSchema):
all_evaluations.append(eval_result)
else:
# Log exception
print(f"Evaluation failed: {eval_result}")
# Aggregate results
return self._aggregate_evaluations(all_evaluations)
def _aggregate_evaluations(
self,
evaluations: List[EvaluationResultSchema]
) -> QualityReportSchema:
"""Aggregate persona evaluations into quality report"""
# Calculate average satisfaction scores
avg_overall = sum(e.satisfaction_score.overall for e in evaluations) / len(evaluations)
avg_usability = sum(e.satisfaction_score.usability for e in evaluations) / len(evaluations)
avg_functionality = sum(e.satisfaction_score.functionality for e in evaluations) / len(evaluations)
avg_performance = sum(e.satisfaction_score.performance for e in evaluations) / len(evaluations)
# Collect all critical issues
all_issues = []
for evaluation in evaluations:
all_issues.extend(evaluation.critical_issues)
# Group issues by severity
critical_issues = [i for i in all_issues if i.severity == "critical"]
high_issues = [i for i in all_issues if i.severity == "high"]
# Generate quality grade
quality_grade = self._calculate_quality_grade(
avg_overall,
len(critical_issues),
len(high_issues)
)
return QualityReportSchema(
report_id=str(uuid.uuid4()),
average_satisfaction=avg_overall,
satisfaction_breakdown={
"usability": avg_usability,
"functionality": avg_functionality,
"performance": avg_performance
},
total_evaluations=len(evaluations),
personas_evaluated=[e.persona_name for e in evaluations],
critical_issues_count=len(critical_issues),
high_issues_count=len(high_issues),
quality_grade=quality_grade,
recommendations=self._generate_recommendations(evaluations),
detailed_evaluations=evaluations,
generated_at=datetime.utcnow().isoformat()
)
def _calculate_quality_grade(
self,
avg_score: float,
critical_issues: int,
high_issues: int
) -> str:
"""Calculate quality grade: A+, A, B+, B, C, D, F"""
if critical_issues > 0:
return "D" if critical_issues <= 2 else "F"
if high_issues > 5:
return "C"
if avg_score >= 90:
return "A+" if high_issues == 0 else "A"
elif avg_score >= 80:
return "B+" if high_issues <= 2 else "B"
elif avg_score >= 70:
return "C"
else:
return "D"REQ-TQF-005: Performance Testing
User Story: As a DevOps engineer, I want performance testing tools, so that I can ensure the system meets performance requirements under load.
Acceptance Criteria
- WHEN load testing, THE Performance_Testing SHALL use Locust or k6 for API endpoints
- WHEN stress testing, THE Performance_Testing SHALL identify system breaking points
- WHEN measuring latency, THE Performance_Testing SHALL track p50, p95, p99 response times (enforcement sketched after this list)
- WHEN testing concurrency, THE Performance_Testing SHALL simulate 100+ concurrent users
- WHEN generating reports, THE Performance_Testing SHALL visualize metrics in Grafana dashboards
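The Locust example below generates the load; enforcing the latency percentiles called out above can be done with a test_stop listener that fails the run when SLOs are breached. A minimal sketch, with assumed thresholds (k6 thresholds or Grafana alerts are equivalent alternatives):
# tests/performance/latency_gates.py (sketch, included from the locustfile)
from locust import events

P95_THRESHOLD_MS = 500   # assumed SLO
P99_THRESHOLD_MS = 1200  # assumed SLO

@events.test_stop.add_listener
def enforce_latency_slos(environment, **kwargs):
    """After the run stops, compare aggregate percentiles against the SLO thresholds"""
    total = environment.stats.total
    p95 = total.get_response_time_percentile(0.95)
    p99 = total.get_response_time_percentile(0.99)
    print(f"p50={total.median_response_time}ms p95={p95}ms p99={p99}ms")
    if p95 > P95_THRESHOLD_MS or p99 > P99_THRESHOLD_MS:
        environment.process_exit_code = 1  # non-zero exit fails the CI job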
Implementation Example (Performance - Locust)
"""
Performance testing using Locust.
Tests backend API endpoints with realistic load patterns.
"""
from locust import HttpUser, task, between
import random
from datetime import datetime
# Import request/response schemas for type-safe testing
from apps.backend.schemas.league import LeagueAnalysisRequest
from apps.backend.schemas.document import DocumentUploadRequest
class LeagueManagementUser(HttpUser):
"""Simulates typical user behavior"""
wait_time = between(1, 5) # 1-5 seconds between requests
def on_start(self):
"""Login and setup"""
response = self.client.post("/api/auth/login", json={
"email": "test@example.com",
"password": "testpass123"
})
self.token = response.json()["access_token"]
self.headers = {"Authorization": f"Bearer {self.token}"}
@task(3)  # weight 3: most frequent request type
def view_dashboard(self):
"""Load league dashboard"""
self.client.get("/api/leagues", headers=self.headers)
@task(2)  # weight 2: frequent
def analyze_league(self):
"""Trigger league analysis"""
request = LeagueAnalysisRequest(
league_name=f"Test League {random.randint(1, 100)}",
website_url="https://example.com",
contact_email="test@example.com"
)
with self.client.post(
"/api/leagues/analyze",
json=request.dict(),
headers=self.headers,
catch_response=True
) as response:
if response.status_code == 200:
# Verify response matches schema
data = response.json()
if "recommended_tier" in data and "partnership_score" in data:
response.success()
else:
response.failure("Invalid response schema")
else:
response.failure(f"Failed with status {response.status_code}")
@task(1)  # weight 1: occasional
def upload_document(self):
"""Upload league questionnaire"""
files = {
"file": ("questionnaire.pdf", b"fake pdf content", "application/pdf")
}
self.client.post(
"/api/documents/upload",
files=files,
headers=self.headers
)
@task(1)  # weight 1: occasional
def search_leagues(self):
"""Search for leagues"""
query = random.choice(["basketball", "football", "lacrosse", "hockey"])
self.client.get(
f"/api/leagues/search?q={query}",
headers=self.headers
)
REQ-TQF-006: Security Testing
User Story: As a security engineer, I want automated security testing, so that I can identify and fix vulnerabilities before production.
Acceptance Criteria
- WHEN scanning code, THE Security_Testing SHALL use SAST tools (Bandit, ESLint security plugins)
- WHEN testing dependencies, THE Security_Testing SHALL scan for known vulnerabilities (npm audit, Safety)
- WHEN testing APIs, THE Security_Testing SHALL perform OWASP Top 10 checks
- WHEN testing authentication, THE Security_Testing SHALL verify JWT validation and session management (see the test sketch below)
- WHEN finding vulnerabilities, THE Security_Testing SHALL generate prioritized reports with remediation guidance
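In addition to the scanner below, the JWT criterion can be exercised directly with API-level tests. A minimal sketch, assuming the /api/leagues route requires authentication:
# apps/backend/tests/security/test_jwt_validation.py (sketch)
import pytest
from httpx import AsyncClient

from apps.backend.main import app

@pytest.mark.security
@pytest.mark.asyncio
@pytest.mark.parametrize("headers", [
    {},                                      # missing token
    {"Authorization": "Bearer not-a-jwt"},   # malformed token
    {"Authorization": "Bearer a.b.c"},       # well-formed shape but unsigned/expired
])
async def test_protected_route_rejects_invalid_tokens(headers):
    """Protected endpoints must reject requests without a valid JWT"""
    async with AsyncClient(app=app, base_url="http://test") as client:
        response = await client.get("/api/leagues", headers=headers)
        assert response.status_code in (401, 403)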
Implementation Example
# security/vulnerability/vulnerability_scanner.py
import json
import logging
import subprocess
from datetime import datetime
from typing import Dict, List, Any

# SecurityFinding and VulnerabilityReport are assumed to be existing project result models
logger = logging.getLogger(__name__)
class VulnerabilityScanner:
"""Automated vulnerability scanning"""
def __init__(self, project_root: str):
self.project_root = project_root
self.scan_results = []
async def scan_code_sast(self, target: str = "all") -> List[SecurityFinding]:
"""Run SAST code scanning"""
findings = []
# Python code (Bandit)
if target in ["all", "python"]:
try:
result = subprocess.run(
["bandit", "-r", f"{self.project_root}/apps/backend", "-f", "json", "-o", "-"],
capture_output=True,
text=True,
check=False  # bandit exits non-zero when it reports findings
)
python_findings = json.loads(result.stdout)
for issue in python_findings.get("results", []):
findings.append(SecurityFinding(
tool="bandit",
severity=issue["issue_severity"],
confidence=issue["issue_confidence"],
description=issue["issue_text"],
location=f"{self.project_root}/apps/backend/{issue['filename']}:{issue['line_number']}",
timestamp=datetime.utcnow()
))
except subprocess.CalledProcessError as e:
logger.error("Bandit scan failed", error=str(e))
# JavaScript/TypeScript (ESLint with security plugins)
if target in ["all", "frontend"]:
try:
result = subprocess.run(
[
"npx", "eslint", "--ext", ".js,.ts,.tsx",
"--format", "json",
f"{self.project_root}/clients/frontend/src"
],
capture_output=True,
text=True,
check=True
)
js_findings = json.loads(result.stdout)
for file_results in js_findings:
for issue in file_results["messages"]:
if issue["ruleId"] in ["security/detect-unsafe-regex", "security/detect-possible-timing-attacks"]:
findings.append(SecurityFinding(
tool="eslint-security",
severity=issue["severity"][0].upper(),
confidence="high",
description=issue["message"],
location=f"{self.project_root}/clients/frontend/src/{file_results['filePath']}:{issue['line']}",
timestamp=datetime.utcnow()
))
except subprocess.CalledProcessError as e:
logger.error("ESLint security scan failed", error=str(e))
return findings
async def scan_dependencies(self) -> List[SecurityFinding]:
"""Scan dependencies for vulnerabilities"""
findings = []
# Python dependencies (Safety)
try:
result = subprocess.run(
["safety", "check", "--json"],
capture_output=True,
text=True,
cwd=self.project_root,
check=False  # safety exits non-zero when vulnerabilities are found
)
vuln_data = json.loads(result.stdout)
for vuln in vuln_data:
findings.append(SecurityFinding(
tool="safety",
severity=vuln["package"]["advisories"][0]["advisory"]["severity"],
confidence="high",
description=f"Dependency vulnerability: {vuln['package']['name']} {vuln['package']['version']} - {vuln['package']['advisories'][0]['advisory']['description']}",
location=f"{self.project_root}/pyproject.toml",
timestamp=datetime.utcnow()
))
except subprocess.CalledProcessError as e:
logger.error("Safety scan failed", error=str(e))
# JavaScript dependencies (npm audit)
try:
result = subprocess.run(
["npm", "audit", "--json"],
capture_output=True,
text=True,
cwd=f"{self.project_root}/clients/frontend",
check=False  # npm audit exits non-zero when vulnerabilities are found
)
audit_data = json.loads(result.stdout)
for vuln in audit_data.get("vulnerabilities", {}).values():
if vuln["severity"] in ["high", "critical"]:
findings.append(SecurityFinding(
tool="npm-audit",
severity=vuln["severity"].upper(),
confidence="high",
description=f"Dependency vulnerability: {vuln['name']}@{vuln['version']} - {vuln['overview']}",
location=f"{self.project_root}/clients/frontend/package.json",
timestamp=datetime.utcnow()
))
except subprocess.CalledProcessError as e:
logger.error("npm audit failed", error=str(e))
return findings
async def scan_containers(self) -> List[SecurityFinding]:
"""Scan Docker images for vulnerabilities"""
findings = []
# Run Trivy scan on all images
try:
result = subprocess.run(
["trivy", "image", "--format", "json", "--exit-code", "0", "--no-progress"],
capture_output=True,
text=True,
cwd=self.project_root,
check=True
)
trivy_results = json.loads(result.stdout)
for result in trivy_results.get("Results", []):
for vuln in result.get("Vulnerabilities", []):
if vuln["Severity"] in ["CRITICAL", "HIGH"]:
findings.append(SecurityFinding(
tool="trivy",
severity=vuln["Severity"],
confidence="high",
description=f"Container vulnerability: {vuln['VulnerabilityID']} - {vuln['Title']}",
location=f"Docker image: {result['Target']}",
timestamp=datetime.utcnow()
))
except subprocess.CalledProcessError as e:
logger.error("Trivy scan failed", error=str(e))
return findings
async def run_owasp_zap_scan(self, target_url: str) -> List[SecurityFinding]:
"""Run OWASP ZAP dynamic analysis"""
findings = []
# Configure and run ZAP scan
try:
# Start ZAP in daemon mode (in production, use containerized ZAP)
zap_config = {
"target": target_url,
"ajaxSpider": True,
"context": "Production API",
"attackStrength": "MEDIUM",
"alertThreshold": "HIGH"
}
# Execute scan (simplified - in practice, use ZAP API)
# result = await self._execute_zap_scan(zap_config)
# Parse results
# for alert in result.alerts:
# if alert.risk in ["High", "Medium"]:
# findings.append(SecurityFinding(...))
pass # Placeholder for ZAP integration
except Exception as e:
logger.error("OWASP ZAP scan failed", error=str(e))
return findings
async def generate_vulnerability_report(self, scan_results: List[SecurityFinding]) -> VulnerabilityReport:
"""Generate comprehensive vulnerability report"""
# Categorize findings
critical = [f for f in scan_results if f.severity == "CRITICAL"]
high = [f for f in scan_results if f.severity == "HIGH"]
medium = [f for f in scan_results if f.severity == "MEDIUM"]
low = [f for f in scan_results if f.severity == "LOW"]
# Generate remediation plan
remediation_plan = await self._generate_remediation_plan(scan_results)
return VulnerabilityReport(
total_findings=len(scan_results),
critical_count=len(critical),
high_count=len(high),
medium_count=len(medium),
low_count=len(low),
findings=scan_results,
remediation_plan=remediation_plan,
report_date=datetime.utcnow(),
priority="immediate" if critical else "high"
)
REQ-TQF-007: Test Data Management
User Story: As a tester, I want realistic test data generation, so that I can test with production-like scenarios.
Acceptance Criteria
- WHEN generating test data, THE Test_Data SHALL use Faker for synthetic data creation
- WHEN creating fixtures, THE Test_Data SHALL provide factory patterns for domain models
- WHEN seeding databases, THE Test_Data SHALL maintain referential integrity
- WHEN anonymizing data, THE Test_Data SHALL mask PII for compliance (see the helper sketch below)
- WHEN managing test data, THE Test_Data SHALL version fixtures with migrations
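The factory example below generates synthetic data; for the PII-masking criterion, a minimal anonymization helper sketch (field names are assumptions) could look like:
# apps/backend/tests/fixtures/anonymization.py (sketch)
from faker import Faker

fake = Faker()

def anonymize_record(record: dict) -> dict:
    """Return a copy of a record with assumed PII fields replaced by synthetic values"""
    masked = dict(record)
    if "contact_email" in masked:
        masked["contact_email"] = fake.email()
    if "phone" in masked:
        masked["phone"] = fake.phone_number()
    if "owner_name" in masked:
        masked["owner_name"] = fake.name()
    return masked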
Implementation Example
# apps/backend/tests/fixtures/league_fixtures.py
import random
import factory
from factory import fuzzy
from faker import Faker
from datetime import datetime, timedelta
from uuid import uuid4
from apps.backend.models.league import League
from apps.backend.models.user import User
from apps.backend.tests.fixtures.user_fixtures import user_factory
fake = Faker()
class LeagueFactory(factory.Factory):
"""Factory for League model with schema validation"""
class Meta:
model = League
id = factory.LazyAttribute(lambda _: str(uuid4()))
name = fuzzy.FuzzyText(length=20)
sport = fuzzy.FuzzyChoice(choices=["Basketball", "Football", "Lacrosse", "Hockey"])
region = fuzzy.FuzzyChoice(choices=["North America", "Europe", "Asia"])
tier = fuzzy.FuzzyChoice(choices=["1.1", "1.2", "2.1", "2.2", "3.1", "3.2", "4.1", "4.2"])
partnership_score = fuzzy.FuzzyFloat(low=0.0, high=100.0)
created_at = factory.LazyAttribute(lambda _: datetime.utcnow())
updated_at = factory.LazyAttribute(lambda _: datetime.utcnow())
owner = factory.SubFactory(user_factory)
@factory.post_generation
def post_generation(self, create, extracted, **kwargs):
"""Post-generation hooks for complex relationships"""
if create:
# Ensure owner exists
if not self.owner:
self.owner = user_factory.create()
# Validate tier format
if self.tier not in ["1.1", "1.2", "2.1", "2.2", "3.1", "3.2", "4.1", "4.2"]:
self.tier = "2.1" # Default valid tier
@classmethod
def create_league_with_partnerships(cls, count: int = 3):
"""Create league with associated partnerships"""
league = cls.create()
# Create partnerships
for _ in range(count):
partnership = PartnershipFactory.create(league=league)
return league
@classmethod
def create_realistic_league(cls, sport_domain: str = "team_player_actions"):
"""Create realistic league for given sport domain"""
sport_choices = {
"combat": ["Boxing", "MMA", "Wrestling"],
"racing": ["Horse Racing", "Auto Racing", "Motorcycle Racing"],
"action_sports": ["Skateboarding", "BMX", "Snowboarding"],
"team_player_actions": ["Basketball", "Football", "Hockey"],
"competitive_turn_taking": ["Chess", "Poker", "Esports"]
}
sport = random.choice(sport_choices.get(sport_domain, ["Basketball"]))
return cls.create(
name=f"{sport} Professional League",
sport=sport,
region=fake.country(),
tier=random.choice(["2.1", "3.1", "4.1"]),
partnership_score=random.uniform(50, 85)
)
REQ-TQF-008: CI/CD Integration
User Story: As a DevOps engineer, I want automated testing in CI/CD pipelines, so that quality gates prevent buggy code from reaching production.
Acceptance Criteria
- WHEN code is pushed, THE CI_CD_Integration SHALL trigger automated test suites
- WHEN PRs are created, THE CI_CD_Integration SHALL run tests and report results
- WHEN deploying to staging, THE CI_CD_Integration SHALL run smoke tests (see the script sketch below)
- WHEN releasing to production, THE CI_CD_Integration SHALL require all tests to pass
- WHEN tests fail, THE CI_CD_Integration SHALL block deployment and notify developers
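The workflow below covers the test and quality-gate jobs; the staging smoke-test criterion can be satisfied by a small script invoked after deployment. A minimal sketch, assuming a SMOKE_BASE_URL environment variable and a health endpoint:
# scripts/smoke_test.py (sketch)
import os
import sys
import httpx

BASE_URL = os.environ.get("SMOKE_BASE_URL", "https://staging.example.com")  # assumed
ENDPOINTS = ["/healthz", "/api/leagues"]  # assumed health + critical endpoints

def main() -> int:
    with httpx.Client(base_url=BASE_URL, timeout=10.0) as client:
        for path in ENDPOINTS:
            response = client.get(path)
            if response.status_code >= 400:
                print(f"SMOKE FAIL {path}: HTTP {response.status_code}")
                return 1
            print(f"SMOKE OK {path}: HTTP {response.status_code}")
    return 0

if __name__ == "__main__":
    sys.exit(main())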
Implementation Example (CI/CD - GitHub Actions)
name: Test Suite
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main, develop ]
env:
PYTHON_VERSION: "3.11"
NODE_VERSION: "20"
jobs:
backend-tests:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: test_db
ports:
- 5432:5432
redis:
image: redis:7
ports:
- 6379:6379
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
- name: Install dependencies
run: |
pip install -r apps/backend/requirements.txt
pip install pytest pytest-asyncio pytest-cov
- name: Run unit tests
run: |
cd apps/backend
pytest tests/unit -v --cov=. --cov-report=xml
- name: Run integration tests
env:
DATABASE_URL: postgresql://postgres:postgres@localhost:5432/test_db
REDIS_URL: redis://localhost:6379
run: |
cd apps/backend
pytest tests/integration -v
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
file: ./apps/backend/coverage.xml
flags: backend
frontend-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Node.js
uses: actions/setup-node@v4
with:
node-version: ${{ env.NODE_VERSION }}
- name: Install dependencies
run: |
cd clients/frontend
npm ci
- name: Run unit tests
run: |
cd clients/frontend
npm run test:unit
- name: Run component tests
run: |
cd clients/frontend
npm run test:components
- name: Build application
run: |
cd clients/frontend
npm run build
e2e-tests:
runs-on: ubuntu-latest
needs: [backend-tests, frontend-tests]
steps:
- uses: actions/checkout@v4
- name: Set up Node.js
uses: actions/setup-node@v4
with:
node-version: ${{ env.NODE_VERSION }}
- name: Install Playwright
run: |
cd clients/frontend
npm ci
npx playwright install --with-deps
- name: Run E2E tests
run: |
cd clients/frontend
npm run test:e2e
- name: Upload Playwright report
if: always()
uses: actions/upload-artifact@v3
with:
name: playwright-report
path: clients/frontend/playwright-report/
security-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run Bandit (Python)
run: |
pip install bandit
bandit -r apps/backend -f json -o bandit-report.json
- name: Run npm audit (Frontend)
run: |
cd clients/frontend
npm audit --audit-level=moderate
- name: Run Snyk scan
uses: snyk/actions/python@master
env:
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
with:
args: --severity-threshold=high
quality-gate:
runs-on: ubuntu-latest
needs: [backend-tests, frontend-tests, e2e-tests, security-scan]
steps:
- name: Check quality metrics
run: |
echo "All tests passed!"
echo "Quality gate: PASSED β
"REQ-TQF-009: Test Orchestration
User Story: As a QA lead, I want parallel test execution, so that test suites complete quickly without bottlenecks.
Acceptance Criteria
- WHEN running test suites, THE Test_Orchestration SHALL execute tests in parallel across workers
- WHEN distributing tests, THE Test_Orchestration SHALL balance load across available resources
- WHEN collecting results, THE Test_Orchestration SHALL aggregate from all workers
- WHEN retrying flaky tests, THE Test_Orchestration SHALL automatically retry up to 3 times
- WHEN optimizing, THE Test_Orchestration SHALL prioritize fast-failing tests first
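The orchestrator below handles parallel execution and retries; the fail-fast prioritization criterion can be met by ordering suites from historical results before submission. A minimal sketch (history format assumed):
# tests/orchestrator/prioritization.py (sketch)
from typing import Dict, List

def prioritize_suites(suites: List["TestSuite"], history: Dict[str, Dict[str, float]]) -> List["TestSuite"]:
    """
    Order suites so historically failure-prone and fast suites run first,
    surfacing failures as early as possible.
    history maps suite name -> {"failure_rate": 0..1, "avg_duration": seconds}.
    """
    def sort_key(suite):
        stats = history.get(suite.name, {"failure_rate": 0.0, "avg_duration": 0.0})
        return (-stats["failure_rate"], stats["avg_duration"])  # flaky first, then shortest
    return sorted(suites, key=sort_key)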
Implementation Example
# tests/orchestrator/test_orchestrator.py
import asyncio
import os
import time
from typing import List, Dict, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
import pytest
from tests.fixtures.test_fixtures import TestSuite, TestResult
class TestOrchestrator:
"""Parallel test execution orchestrator"""
def __init__(self, max_workers: int = None):
self.max_workers = max_workers or os.cpu_count()
self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
async def run_parallel_tests(self, test_suites: List[TestSuite]) -> AggregatedTestResult:
"""Run test suites in parallel"""
all_results = []
failed_suites = []
# Run each suite in parallel
tasks = [
self.executor.submit(self._run_test_suite, suite)
for suite in test_suites
]
# Collect results
for future in as_completed(tasks):  # concurrent.futures.as_completed, since tasks are executor futures
try:
result = future.result()
all_results.extend(result.results)
if result.failed_tests:
failed_suites.append(result.suite_name)
except Exception as e:
# Log suite failure
all_results.append(TestResult(
test_id="suite_error",
suite_name="unknown",
status="error",
error=str(e),
duration=0
))
# Retry flaky tests
flaky_results = await self._retry_flaky_tests(all_results)
all_results.extend(flaky_results)
# Aggregate results
aggregated = self._aggregate_results(all_results)
return AggregatedTestResult(
total_tests=len(all_results),
passed_tests=len([r for r in all_results if r.status == "passed"]),
failed_tests=len([r for r in all_results if r.status == "failed"]),
error_tests=len([r for r in all_results if r.status == "error"]),
duration=sum(r.duration for r in all_results),
flaky_retries=len(flaky_results),
suite_results=all_results
)
def _run_test_suite(self, suite: TestSuite) -> TestSuiteResult:
"""Run individual test suite"""
results = []
for test in suite.tests:
start_time = time.time()
try:
outcome = test.run()
duration = time.time() - start_time
results.append(TestResult(
test_id=test.id,
suite_name=suite.name,
status="passed" if outcome else "failed",
duration=duration
))
except Exception as e:
duration = time.time() - start_time
results.append(TestResult(
test_id=test.id,
suite_name=suite.name,
status="error",
error=str(e),
duration=duration
))
return TestSuiteResult(
suite_name=suite.name,
results=results,
failed_tests=[r for r in results if r.status != "passed"]
)
async def _retry_flaky_tests(self, results: List[TestResult]) -> List[TestResult]:
"""Retry flaky tests up to 3 times"""
flaky_tests = [r for r in results if r.status == "failed" and "flaky" in r.tags]
retry_results = []
for test_result in flaky_tests:
for attempt in range(3):
try:
# Rerun test
outcome = test_result.test.rerun()
duration = 0 # Measure actual duration
retry_results.append(TestResult(
test_id=test_result.test_id,
suite_name=test_result.suite_name,
status="passed" if outcome else "failed",
duration=duration,
retry_attempt=attempt + 1
))
break
except Exception as e:
duration = 0
retry_results.append(TestResult(
test_id=test_result.test_id,
suite_name=test_result.suite_name,
status="error",
error=f"Retry {attempt + 1} failed: {str(e)}",
duration=duration,
retry_attempt=attempt + 1
))
return retry_results
def _aggregate_results(self, results: List[TestResult]) -> AggregatedTestResult:
"""Aggregate test results"""
passed = len([r for r in results if r.status == "passed"])
failed = len([r for r in results if r.status == "failed"])
errors = len([r for r in results if r.status == "error"])
return AggregatedTestResult(
total=len(results),
passed=passed,
failed=failed,
errors=errors,
pass_rate=passed / len(results) if results else 0,
results=results
)
REQ-TQF-010: Visual Regression Testing
User Story: As a frontend developer, I want visual regression testing, so that UI changes don't introduce unintended visual bugs.
Acceptance Criteria
- WHEN capturing screenshots, THE Testing_Framework SHALL use Percy or Chromatic
- WHEN comparing visuals, THE Testing_Framework SHALL detect pixel-level differences
- WHEN changes are detected, THE Testing_Framework SHALL require manual approval
- WHEN testing responsively, THE Testing_Framework SHALL capture multiple viewport sizes
- WHEN baseline updates occur, THE Testing_Framework SHALL version baseline images
Implementation Example
// clients/frontend/e2e/visual-regression.spec.ts
import { test, expect } from '@playwright/test';
import { defineConfig, devices } from '@playwright/test';
test.describe('Visual Regression Tests', () => {
test.describe('League Dashboard', () => {
test('Desktop view matches baseline', async ({ page }) => {
await page.goto('/dashboard');
// Wait for dynamic content to load
await page.waitForLoadState('networkidle');
// Capture screenshot of key sections
await expect(page).toHaveScreenshot('dashboard-desktop.png', {
fullPage: true,
threshold: 0.1, // 10% tolerance for minor changes
mask: [
// Mask dynamic elements
page.locator('[data-testid="timestamp"]'),
page.locator('[data-testid="user-menu"]')
]
});
});
test('Mobile view matches baseline', async ({ page }) => {
// Set mobile viewport
await page.setViewportSize({ width: 375, height: 812 });
await page.goto('/dashboard');
await page.waitForLoadState('networkidle');
await expect(page).toHaveScreenshot('dashboard-mobile.png', {
fullPage: true,
threshold: 0.1
});
});
test('Dark mode matches baseline', async ({ page }) => {
// Enable dark mode
await page.evaluate(() => {
localStorage.setItem('theme', 'dark');
document.documentElement.classList.add('dark');
});
await page.goto('/dashboard');
await page.waitForLoadState('networkidle');
await expect(page).toHaveScreenshot('dashboard-dark.png', {
fullPage: true,
threshold: 0.1
});
});
});
test.describe('Compliance Dashboard', () => {
test('Compliance cards render correctly', async ({ page }) => {
await page.goto('/compliance');
// Test different states
const complianceStates = [
{ score: 100, status: 'complete' },
{ score: 75, status: 'in-progress' },
{ score: 25, status: 'needs-attention' }
];
for (const state of complianceStates) {
// Mock compliance data
await page.evaluate((data) => {
window.complianceData = data;
}, state);
// Reload to trigger re-render
await page.reload();
await page.waitForLoadState('networkidle');
await expect(page.locator(`[data-score="${state.score}"]`)).toHaveScreenshot(
`compliance-card-${state.status}.png`,
{ threshold: 0.05 }
);
}
});
});
});
// playwright.config.ts
import { defineConfig, devices } from '@playwright/test';
export default defineConfig({
testDir: './e2e',
fullyParallel: true,
forbidOnly: !!process.env.CI,
retries: process.env.CI ? 2 : 0,
workers: process.env.CI ? 1 : undefined,
reporter: [['html'], ['json', { outputFile: 'test-results.json' }]],
use: {
baseURL: 'http://localhost:3000',
trace: 'on-first-retry',
screenshot: 'only-on-failure',
video: 'retain-on-failure',
},
projects: [
{
name: 'chromium',
use: { ...devices['Desktop Chrome'] },
},
{
name: 'firefox',
use: { ...devices['Desktop Firefox'] },
},
{
name: 'webkit',
use: { ...devices['Desktop Safari'] },
},
{
name: 'Mobile Chrome',
use: { ...devices['Pixel 5'] },
},
],
webServer: {
command: 'npm run start',
url: 'http://localhost:3000',
reuseExistingServer: !process.env.CI,
},
});
REQ-TQF-011: API Contract Testing
User Story: As a backend developer, I want contract testing for APIs, so that breaking changes are detected early.
Acceptance Criteria
- WHEN defining contracts, THE Testing_Framework SHALL use OpenAPI schemas as source of truth (see the validation sketch below)
- WHEN validating requests, THE Testing_Framework SHALL verify request/response schemas
- WHEN versioning APIs, THE Testing_Framework SHALL test backward compatibility
- WHEN integrating with external APIs, THE Testing_Framework SHALL mock based on contracts
- WHEN contracts change, THE Testing_Framework SHALL generate consumer compatibility reports
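Alongside the Pact example below, responses can be validated directly against the FastAPI-generated OpenAPI document as the source of truth. A minimal sketch (the LeagueResponse component name is an assumption):
# tests/integration/test_openapi_contracts.py (sketch)
import pytest
from httpx import AsyncClient
from jsonschema import validate

from apps.backend.main import app

@pytest.mark.asyncio
async def test_league_response_matches_openapi_schema():
    """Validate a live response body against the generated OpenAPI component schema"""
    spec = app.openapi()  # FastAPI builds the OpenAPI document from the route definitions
    league_schema = spec["components"]["schemas"]["LeagueResponse"]  # assumed component name

    async with AsyncClient(app=app, base_url="http://test") as client:
        response = await client.get("/api/leagues")
        assert response.status_code == 200
        payload = response.json()
        sample = payload[0] if isinstance(payload, list) else payload
        # Nested $ref pointers may need a resolver or a dedicated OpenAPI validator
        validate(instance=sample, schema=league_schema)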
Implementation Example
# tests/integration/test_api_contracts.py
import pytest
from httpx import AsyncClient
from pact import Consumer
from apps.backend.main import app
from apps.backend.tests.fixtures.contract_fixtures import (
    league_contract,
    legacy_contract,
    partnership_contract,
    sportsbook_contract
)
pact = Consumer('League API Consumer')
@pytest.fixture(scope="session")
def pact_broker():
"""Configure Pact broker"""
return pact.setup_broker(
broker_url='http://localhost:9292',
provider_order={
'league_service': 'http://localhost:8000'
}
)
class TestLeagueAPIContract:
"""API contract tests using Pact"""
@pytest.mark.asyncio
async def test_league_creation_contract(self, pact_broker):
"""
Contract test for league creation API
Verifies request/response schema against OpenAPI specification
"""
# Define expected interactions
(
    league_contract
    .given('a league creation request with valid data')
    .upon_receiving('a valid league creation request')
    .with_request('POST', '/api/leagues', body={
        "name": "Contract Test League",
        "sport": "Basketball",
        "region": "North America",
        "contact_email": "contract@test.com"
    })
    .will_respond_with(
        201,
        headers={'Content-Type': 'application/json'},
        body={
            "id": "123e4567-e89b-12d3-a456-426614174000",
            "name": "Contract Test League",
            "sport": "Basketball",
            "region": "North America",
            "contact_email": "contract@test.com",
            "created_at": "2024-01-01T00:00:00Z"
        }
    )
)
# Verify contract
with league_contract:
async with AsyncClient(app=app) as client:
response = await client.post(
"/api/leagues",
json={
"name": "Contract Test League",
"sport": "Basketball",
"region": "North America",
"contact_email": "contract@test.com"
}
)
assert response.status_code == 201
assert response.json()["name"] == "Contract Test League"
@pytest.mark.asyncio
async def test_backward_compatibility(self, pact_broker):
"""
Test API backward compatibility
Ensures new versions don't break existing consumers
"""
# Test v1 contract still works with v2 implementation
(
    legacy_contract
    .given('legacy consumer using v1 schema')
    .upon_receiving('v1 league creation request')
    .with_request('POST', '/api/v1/leagues', body={
        "name": "Legacy Test League",
        "sport": "Football"
    })
    .will_respond_with(
        201,
        body={
            "id": "legacy-123",
            "name": "Legacy Test League",
            "sport": "Football"
        }
    )
)
# Verify backward compatibility
with legacy_contract:
async with AsyncClient(app=app) as client:
response = await client.post(
"/api/v1/leagues",
json={
"name": "Legacy Test League",
"sport": "Football"
}
)
assert response.status_code == 201
@pytest.mark.asyncio
async def test_external_api_contract(self, pact_broker):
"""
Contract test for external sportsbook API
Ensures integration works with expected contract
"""
(
    sportsbook_contract
    .given('sportsbook provides league data')
    .upon_receiving('league data request')
    .with_request('GET', '/api/external/sportsbooks/draftkings/leagues')
    .will_respond_with(
        200,
        body={
            "leagues": [
                {
                    "id": "draftkings-1",
                    "name": "Premier League",
                    "sport": "Soccer",
                    "active": True
                }
            ]
        }
    )
)
# Verify external contract
with sportsbook_contract:
async with AsyncClient(app=app) as client:
response = await client.get("/api/external/sportsbooks/draftkings/leagues")
assert response.status_code == 200
assert "leagues" in response.json()REQ-TQF-012: Test Coverage & Quality Metrics
User Story: As a tech lead, I want comprehensive quality metrics, so that I can track testing effectiveness and identify gaps.
Acceptance Criteria
- WHEN measuring coverage, THE Quality_Metrics SHALL track line, branch, and function coverage
- WHEN setting thresholds, THE Quality_Metrics SHALL enforce minimum 80% code coverage
- WHEN visualizing trends, THE Quality_Metrics SHALL show coverage over time
- WHEN analyzing quality, THE Quality_Metrics SHALL track test execution time trends
- WHEN reporting, THE Quality_Metrics SHALL integrate with SonarQube or CodeClimate
Implementation Example
# tests/metrics/quality_metrics.py
import pytest
from datetime import datetime, timedelta
from typing import Dict, List, Any
from apps.backend.services.quality import QualityMetricsService
from apps.backend.schemas.quality import QualityReportSchema, CoverageMetricsSchema
class TestQualityMetrics:
"""Test quality metrics collection and reporting"""
@pytest.fixture
def metrics_service(self):
return QualityMetricsService()
@pytest.mark.asyncio
async def test_coverage_metrics(self, metrics_service):
"""Test coverage metrics calculation"""
# Mock coverage data
coverage_data = {
"backend": {
"line_coverage": 85.5,
"branch_coverage": 78.2,
"function_coverage": 92.1
},
"frontend": {
"line_coverage": 76.8,
"branch_coverage": 65.4,
"function_coverage": 88.7
}
}
# Calculate metrics
report = await metrics_service.calculate_coverage_metrics(coverage_data)
# Validate schema
assert isinstance(report, QualityReportSchema)
assert report.total_coverage > 80.0 # Overall threshold
assert report.backend_coverage.line_coverage == 85.5
assert report.frontend_coverage.line_coverage == 76.8
# Test threshold enforcement
assert report.coverage_status == "passing" # Should pass
assert report.threshold_met is True
@pytest.mark.asyncio
async def test_quality_trends(self, metrics_service):
"""Test quality trends analysis"""
# Mock historical data
historical_data = [
{
"date": datetime.utcnow() - timedelta(days=30),
"backend_coverage": 82.1,
"frontend_coverage": 74.5,
"test_execution_time": 180.0 # seconds
},
{
"date": datetime.utcnow() - timedelta(days=15),
"backend_coverage": 84.3,
"frontend_coverage": 76.2,
"test_execution_time": 165.0
},
{
"date": datetime.utcnow(),
"backend_coverage": 85.5,
"frontend_coverage": 78.9,
"test_execution_time": 150.0
}
]
# Analyze trends
trends = await metrics_service.analyze_quality_trends(historical_data)
# Validate trends
assert trends.coverage_trend == "improving"
assert trends.execution_time_trend == "improving"
assert trends.overall_quality_trend == "positive"
# Test visualization data
assert len(trends.coverage_history) == 3
assert trends.coverage_history[0]["date"] == historical_data[0]["date"].isoformat()
@pytest.mark.asyncio
async def test_sonarqube_integration(self, metrics_service):
"""Test SonarQube integration"""
# Mock SonarQube data
sonarqube_data = {
"measures": [
{
"metric": "coverage",
"value": "85.5"
},
{
"metric": "duplicated_lines_density",
"value": "2.1"
},
{
"metric": "bugs",
"value": "3"
},
{
"metric": "vulnerabilities",
"value": "1"
},
{
"metric": "code_smells",
"value": "15"
}
]
}
# Process SonarQube data
quality_score = await metrics_service.process_sonarqube_data(sonarqube_data)
# Validate quality score
assert quality_score.overall_score > 80
assert quality_score.quality_gate_status == "PASS"
assert len(quality_score.issues) == 19 # 3 bugs + 1 vuln + 15 smells
assert quality_score.security_rating == "A" # Based on low vulnerabilities
@pytest.mark.asyncio
async def test_quality_gate_enforcement(self, metrics_service):
"""Test quality gate enforcement in CI/CD"""
# Mock failing metrics
failing_metrics = {
"coverage": 75.0, # Below 80% threshold
"bugs": 10,
"vulnerabilities": 5,
"code_smells": 50
}
# Test gate failure
gate_result = await metrics_service.enforce_quality_gate(failing_metrics)
assert gate_result.passed is False
assert "coverage" in gate_result.failed_checks
assert "security" in gate_result.failed_checks
# Mock passing metrics
passing_metrics = {
"coverage": 85.0,
"bugs": 2,
"vulnerabilities": 0,
"code_smells": 10
}
# Test gate pass
gate_result = await metrics_service.enforce_quality_gate(passing_metrics)
assert gate_result.passed is True
assert len(gate_result.failed_checks) == 0
REQ-TQF-013: Chaos Engineering & Resilience Testing
User Story: As a reliability engineer, I want chaos testing, so that I can verify system resilience under failure conditions.
Acceptance Criteria
- WHEN testing resilience, THE Testing_Framework SHALL simulate network failures and latency
- WHEN injecting faults, THE Testing_Framework SHALL test database connection failures
- WHEN testing recovery, THE Testing_Framework SHALL verify circuit breaker behavior
- WHEN monitoring, THE Testing_Framework SHALL track system recovery time
- WHEN documenting, THE Testing_Framework SHALL produce resilience test reports
Implementation Example
# tests/chaos/test_resilience.py
import pytest
import asyncio
from unittest.mock import patch, AsyncMock
from apps.backend.services.resilience import ResilienceTester
from apps.backend.schemas.resilience import ResilienceTestResult
class TestChaosEngineering:
"""Chaos engineering and resilience testing"""
@pytest.fixture
def resilience_tester(self):
return ResilienceTester()
@pytest.mark.asyncio
async def test_network_latency_injection(self, resilience_tester):
"""Test system behavior under network latency"""
# Mock network delay
with patch('httpx.AsyncClient', autospec=True) as mock_client:
# Simulate 500ms latency
mock_client.return_value.get = AsyncMock()
mock_client.return_value.get.side_effect = self._inject_latency
# Run test with latency
result = await resilience_tester.test_network_latency(
endpoint="/api/leagues",
expected_status=200,
max_latency_ms=500
)
# Validate resilience
assert result.system_behaved_normally is True
assert result.recovery_time_ms < 2000
assert result.circuit_breaker_triggered is False
@pytest.mark.asyncio
async def test_database_failure_injection(self, resilience_tester):
"""Test database connection failures"""
# Mock database failure
with patch('apps.backend.database.connection', autospec=True) as mock_db:
# Simulate connection failure
mock_db.execute.side_effect = ConnectionError("Database unavailable")
# Run test
result = await resilience_tester.test_database_failure(
operation="query_leagues",
retry_attempts=3,
fallback_strategy="cache"
)
# Validate fallback behavior
assert result.fallback_used is True
assert result.recovery_strategy_applied is True
assert "cache" in result.fallback_strategy
@pytest.mark.asyncio
async def test_circuit_breaker_behavior(self, resilience_tester):
"""Test circuit breaker under failure conditions"""
# Mock repeated failures
failure_count = 5
with patch('apps.backend.services.external_api', autospec=True) as mock_api:
# Simulate 5 consecutive failures
mock_api.call.side_effect = [Exception() for _ in range(failure_count)] + [AsyncMock()]
# Run test
result = await resilience_tester.test_circuit_breaker(
service="external_api",
failure_threshold=3,
timeout_duration=30 # seconds
)
# Validate circuit breaker opened
assert result.circuit_breaker_opened is True
assert result.failure_threshold_reached is True
assert result.open_duration_seconds == 30
# Validate recovery
assert result.recovery_successful is True
assert result.half_open_state_tested is True
    async def _inject_latency(self, *args, **kwargs):
        """Mock side effect that injects a 500ms delay before returning a response mock"""
        await asyncio.sleep(0.5)  # 500ms delay
        return AsyncMock()
async def _simulate_database_recovery(self, tester):
"""Helper to test database recovery"""
pass
REQ-TQF-014: Accessibility Testing
User Story: As an accessibility specialist, I want automated a11y testing, so that the application meets WCAG 2.1 AA standards.
Acceptance Criteria
- WHEN scanning pages, THE Testing_Framework SHALL use axe-core for accessibility checks
- WHEN testing keyboard navigation, THE Testing_Framework SHALL verify all interactive elements are accessible
- WHEN checking contrast, THE Testing_Framework SHALL verify WCAG color contrast ratios
- WHEN testing screen readers, THE Testing_Framework SHALL validate ARIA labels
- WHEN reporting, THE Testing_Framework SHALL categorize issues by severity and provide remediation guidance
Implementation Example
// clients/frontend/e2e/accessibility.spec.ts
import { test, expect } from '@playwright/test';
import AxeBuilder from '@axe-core/playwright';
test.describe('Accessibility Testing', () => {
test('Dashboard is WCAG 2.1 AA compliant', async ({ page }) => {
await page.goto('/dashboard');
// Wait for content to load
await page.waitForLoadState('networkidle');
// Run axe-core accessibility scan
const accessibilityScanResults = await new AxeBuilder({ page })
.withTags(['wcag2a', 'wcag2aa'])
.analyze();
// Assert no violations
expect(accessibilityScanResults.violations).toEqual([]);
// Check specific WCAG criteria
expect(accessibilityScanResults.passes.length).toBeGreaterThan(20);
});
test('Keyboard navigation works correctly', async ({ page }) => {
await page.goto('/dashboard');
    // Test tab navigation: the first Tab press should move focus off the body
    await page.keyboard.press('Tab');
    await expect(page.locator('body')).not.toBeFocused();
// Navigate through interactive elements
const interactiveElements = page.locator('button, a, input, select');
const elementCount = await interactiveElements.count();
for (let i = 0; i < elementCount; i++) {
await page.keyboard.press('Tab');
// Verify focus indicator is visible
await expect(page.locator(':focus-visible')).toBeVisible();
// Test Enter key activation for buttons
const currentElement = await interactiveElements.nth(i);
if (await currentElement.isVisible() && await currentElement.getAttribute('role') === 'button') {
await page.keyboard.press('Enter');
await expect(currentElement).toBeVisible(); // Element should respond
}
}
// Test skip links
await page.keyboard.press('Tab');
if (await page.locator('[data-skip-link]').isVisible()) {
await page.keyboard.press('Enter');
await expect(page.locator('main')).toBeFocused(); // Should jump to main content
}
});
test('Color contrast meets WCAG AA standards', async ({ page }) => {
await page.goto('/dashboard');
// Check color contrast using axe-core
const contrastResults = await new AxeBuilder({ page })
      .withRules(['color-contrast'])
.analyze();
// Ensure no contrast violations
const contrastViolations = contrastResults.violations.filter(v =>
v.id === 'color-contrast'
);
expect(contrastViolations).toEqual([]);
// Manual contrast checks for critical elements
const primaryButtons = page.locator('button[data-variant="primary"]');
const buttonCount = await primaryButtons.count();
for (let i = 0; i < buttonCount; i++) {
const button = primaryButtons.nth(i);
const bgColor = await button.evaluate(el => getComputedStyle(el).backgroundColor);
const textColor = await button.evaluate(el => getComputedStyle(el).color);
// Verify minimum contrast ratio 4.5:1 for AA
const contrastRatio = await calculateContrastRatio(bgColor, textColor);
expect(contrastRatio).toBeGreaterThan(4.5);
}
});
test('ARIA labels and screen reader support', async ({ page }) => {
await page.goto('/dashboard');
// Check ARIA labels on form elements
const formElements = page.locator('input, select, textarea');
const elementCount = await formElements.count();
for (let i = 0; i < elementCount; i++) {
const element = formElements.nth(i);
const ariaLabel = await element.getAttribute('aria-label');
const labelId = await element.getAttribute('aria-labelledby');
// Element must have either aria-label or aria-labelledby
expect(ariaLabel || labelId).toBeTruthy();
if (labelId) {
const label = page.locator(`#${labelId}`);
await expect(label).toBeVisible();
}
}
// Check landmark roles
const landmarks = await page.$$eval('[role="main"], [role="navigation"], [role="banner"], [role="contentinfo"]', elements =>
elements.length
);
expect(landmarks).toBeGreaterThan(0); // At least one landmark
// Check heading structure
const headings = await page.$$eval('h1, h2, h3, h4, h5, h6', elements =>
elements.map(el => ({ tag: el.tagName, text: el.textContent?.trim() }))
);
expect(headings.length).toBeGreaterThan(0);
expect(headings[0].tag).toBe('h1'); // Page should have H1
});
});
// Helper function for contrast ratio calculation
async function calculateContrastRatio(bgColor: string, textColor: string): Promise<number> {
// Implement WCAG contrast ratio calculation
// This is a simplified version - in practice, use a proper color library
const getLuminance = (color: string) => {
// Parse RGB values and calculate relative luminance
const rgb = color.match(/\d+/g)?.map(Number) || [0, 0, 0];
return (0.2126 * rgb[0] + 0.7152 * rgb[1] + 0.0722 * rgb[2]) / 255;
};
const bgLuminance = getLuminance(bgColor);
const textLuminance = getLuminance(textColor);
const lighter = Math.max(bgLuminance, textLuminance);
const darker = Math.min(bgLuminance, textLuminance);
return (lighter + 0.05) / (darker + 0.05);
}
REQ-TQF-015: Test Environment Management
User Story: As a QA engineer, I want isolated test environments, so that tests don't interfere with each other or production.
Acceptance Criteria
- WHEN running tests, THE Testing_Framework SHALL use isolated Docker containers
- WHEN provisioning, THE Testing_Framework SHALL spin up test databases automatically
- WHEN cleaning up, THE Testing_Framework SHALL tear down environments after test completion
- WHEN seeding data, THE Testing_Framework SHALL provide environment-specific fixtures
- WHEN managing secrets, THE Testing_Framework SHALL use test-specific credentials
Implementation Example (Docker Test Environment)
# docker-compose.test.yml
version: '3.9'
services:
test-db:
image: postgres:15-alpine
environment:
POSTGRES_USER: test
POSTGRES_PASSWORD: test
POSTGRES_DB: test_db
ports:
- "5433:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U test -d test_db"]
interval: 10s
timeout: 5s
      retries: 5
    networks:
      - test-network
test-redis:
image: redis:7-alpine
ports:
- "6380:6379"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
      retries: 5
    networks:
      - test-network
test-backend:
build:
context: ./apps/backend
dockerfile: Dockerfile.test
depends_on:
test-db:
condition: service_healthy
test-redis:
condition: service_healthy
environment:
DATABASE_URL: postgresql://test:test@test-db:5432/test_db
REDIS_URL: redis://test-redis:6379
      TEST_MODE: "true"
command: pytest tests/ -v --cov=apps/backend --cov-report=xml
networks:
- test-network
test-frontend:
build:
context: ./clients/frontend
dockerfile: Dockerfile.test
depends_on:
- test-backend
environment:
NEXT_PUBLIC_API_URL: http://test-backend:8000
command: npm run test:e2e
networks:
- test-network
networks:
test-network:
driver: bridge
volumes:
  test-db-data:
Testing Strategy
Test Pyramid
        /\
       /  \           E2E Tests (Playwright)
      /----\          5% - Critical user flows
     /      \
    /--------\        Integration Tests
   /          \       25% - Component interactions
  /------------\
 /--------------\     Unit Tests
/________________\    70% - Individual functions/components
Coverage Targets
| Layer | Target | Measured By |
|---|---|---|
| Backend Unit | 80%+ | Coverage.py |
| Frontend Unit | 75%+ | c8 |
| Integration | 60%+ | Combined |
| E2E | Critical Paths | Playwright |
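To make these targets actionable before the SonarQube quality gate runs, the thresholds can be checked directly against the coverage report. The snippet below is a minimal sketch, not an existing script (the `scripts/check_coverage.py` path and `check_coverage` helper are hypothetical); it assumes backend coverage is exported as Cobertura XML via `--cov-report=xml`, as in the Docker test command above.
```python
# scripts/check_coverage.py -- illustrative helper, not an existing file in the repo
import sys
import xml.etree.ElementTree as ET

BACKEND_TARGET = 80.0  # Backend unit-test target from the table above

def check_coverage(report_path: str = "coverage.xml", threshold: float = BACKEND_TARGET) -> int:
    """Read a Cobertura-style coverage report and compare line coverage to the target."""
    root = ET.parse(report_path).getroot()
    line_rate = float(root.attrib.get("line-rate", 0.0)) * 100
    if line_rate < threshold:
        print(f"FAIL: line coverage {line_rate:.1f}% is below the {threshold:.0f}% target")
        return 1
    print(f"OK: line coverage {line_rate:.1f}% meets the {threshold:.0f}% target")
    return 0

if __name__ == "__main__":
    sys.exit(check_coverage(*sys.argv[1:2]))
```
pytest-cov's built-in `--cov-fail-under=80` flag gives the same guarantee for the backend suite with no extra code; the script form is shown only to make the threshold explicit.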
Test Execution Times
- Unit Tests: < 5 minutes
- Integration Tests: < 10 minutes
- E2E Tests: < 15 minutes
- Performance Tests: 30-60 minutes
- Security Scans: 5-10 minutes
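One way to keep each layer inside its time budget is to run the layers as separate CI jobs selected by pytest markers. The sketch below assumes the marker names `unit`, `integration`, `e2e`, and `performance`; these names are an illustration, not an existing convention in the repo.
```python
# apps/backend/tests/conftest.py -- marker registration sketch (marker names are assumptions)
def pytest_configure(config):
    """Register one marker per test layer so each layer can run as its own CI job."""
    for layer in ("unit", "integration", "e2e", "performance"):
        config.addinivalue_line("markers", f"{layer}: {layer}-level test")
```
Tests then opt in with `@pytest.mark.integration`, and CI runs `pytest -m unit`, `pytest -m integration`, and so on as separate jobs, so a slow integration suite cannot push the fast unit job past its 5-minute budget.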
Performance Optimization
Parallel Test Execution
# pytest-xdist for parallel execution
pytest -n auto # Auto-detect CPU cores
pytest -n 8     # Use 8 workers
Test Caching
# Cache expensive fixtures
import os
import pytest
from openai import AsyncOpenAI

@pytest.fixture(scope="session")
def ai_client():
    """Reuse a single AI client across the whole test session"""
    return AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
Security Considerations
- ✅ Test Data Sanitization: No production data in tests
- ✅ Secrets Management: Use environment variables, never commit secrets
- ✅ Isolation: Each test runs in an isolated environment
- ✅ Access Control: Test accounts have limited permissions
- ✅ Vulnerability Scanning: Automated dependency scanning in CI
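A concrete way to apply the secrets-management and isolation points above is to resolve all credentials through a session fixture that reads test-specific environment variables and refuses anything that looks like production. This is a sketch only; the `TEST_DATABASE_URL` and `TEST_API_KEY` variable names and the guard logic are assumptions for illustration.
```python
# apps/backend/tests/conftest.py -- test credential fixture (sketch; env var names are assumptions)
import os
import pytest

@pytest.fixture(scope="session")
def test_credentials():
    """Load test-only credentials from the environment, never from source control."""
    db_url = os.getenv("TEST_DATABASE_URL", "postgresql://test:test@localhost:5433/test_db")
    api_key = os.getenv("TEST_API_KEY", "dummy-key")
    # Guard rail: refuse to run against anything that looks like production.
    if "prod" in db_url.lower():
        pytest.exit("Refusing to run tests against a production-looking database URL")
    return {"database_url": db_url, "api_key": api_key}
```
The default URL points at the Dockerised test database exposed on port 5433 in docker-compose.test.yml.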
Deployment Configuration
Docker Test Environment
# docker-compose.test.yml
version: '3.9'
services:
test-db:
image: postgres:15
environment:
POSTGRES_USER: test
POSTGRES_PASSWORD: test
POSTGRES_DB: test_db
ports:
- "5433:5432"
test-redis:
image: redis:7
ports:
- "6380:6379"
test-backend:
build:
context: ./apps/backend
dockerfile: Dockerfile.test
depends_on:
- test-db
- test-redis
environment:
DATABASE_URL: postgresql://test:test@test-db:5432/test_db
REDIS_URL: redis://test-redis:6379
command: pytest tests/ -vRevision History
- v1.0 (2025-10-27): Initial Testing & QA Framework design with schema integration
This guide provides a complete overview of the Testing & QA Framework, detailing each requirement with implementation examples and integration points to ensure successful deployment and operation of this critical quality assurance infrastructure.