Migration Architect
Plan and execute code and system migrations — database migrations, framework upgrades, cloud migrations, and monolith-to-microservices transitions.
What this skill does
Safely plan and execute complex system upgrades, database moves, or cloud transitions without disrupting your users. Receive a detailed roadmap, compatibility checks, and automatic rollback plans to ensure data stays safe throughout the process. Use this whenever you need to modernize old systems or move to new platforms with confidence.
name: “migration-architect” description: “Migration Architect”
Migration Architect
Tier: POWERFUL
Category: Engineering - Migration Strategy
Purpose: Zero-downtime migration planning, compatibility validation, and rollback strategy generation
Overview
The Migration Architect skill provides comprehensive tools and methodologies for planning, executing, and validating complex system migrations with minimal business impact. This skill combines proven migration patterns with automated planning tools to ensure successful transitions between systems, databases, and infrastructure.
Core Capabilities
1. Migration Strategy Planning
- Phased Migration Planning: Break complex migrations into manageable phases with clear validation gates
- Risk Assessment: Identify potential failure points and mitigation strategies before execution
- Timeline Estimation: Generate realistic timelines based on migration complexity and resource constraints
- Stakeholder Communication: Create communication templates and progress dashboards
2. Compatibility Analysis
- Schema Evolution: Analyze database schema changes for backward compatibility issues
- API Versioning: Detect breaking changes in REST/GraphQL APIs and microservice interfaces
- Data Type Validation: Identify data format mismatches and conversion requirements
- Constraint Analysis: Validate referential integrity and business rule changes
3. Rollback Strategy Generation
- Automated Rollback Plans: Generate comprehensive rollback procedures for each migration phase
- Data Recovery Scripts: Create point-in-time data restoration procedures
- Service Rollback: Plan service version rollbacks with traffic management
- Validation Checkpoints: Define success criteria and rollback triggers
Migration Patterns
Database Migrations
Schema Evolution Patterns
-
Expand-Contract Pattern
- Expand: Add new columns/tables alongside existing schema
- Dual Write: Application writes to both old and new schema
- Migration: Backfill historical data to new schema
- Contract: Remove old columns/tables after validation
-
Parallel Schema Pattern
- Run new schema in parallel with existing schema
- Use feature flags to route traffic between schemas
- Validate data consistency between parallel systems
- Cutover when confidence is high
-
Event Sourcing Migration
- Capture all changes as events during migration window
- Apply events to new schema for consistency
- Enable replay capability for rollback scenarios
Data Migration Strategies
-
Bulk Data Migration
- Snapshot Approach: Full data copy during maintenance window
- Incremental Sync: Continuous data synchronization with change tracking
- Stream Processing: Real-time data transformation pipelines
-
Dual-Write Pattern
- Write to both source and target systems during migration
- Implement compensation patterns for write failures
- Use distributed transactions where consistency is critical
-
Change Data Capture (CDC)
- Stream database changes to target system
- Maintain eventual consistency during migration
- Enable zero-downtime migrations for large datasets
Service Migrations
Strangler Fig Pattern
- Intercept Requests: Route traffic through proxy/gateway
- Gradually Replace: Implement new service functionality incrementally
- Legacy Retirement: Remove old service components as new ones prove stable
- Monitoring: Track performance and error rates throughout transition
graph TD
A[Client Requests] --> B[API Gateway]
B --> C{Route Decision}
C -->|Legacy Path| D[Legacy Service]
C -->|New Path| E[New Service]
D --> F[Legacy Database]
E --> G[New Database]
Parallel Run Pattern
- Dual Execution: Run both old and new services simultaneously
- Shadow Traffic: Route production traffic to both systems
- Result Comparison: Compare outputs to validate correctness
- Gradual Cutover: Shift traffic percentage based on confidence
Canary Deployment Pattern
- Limited Rollout: Deploy new service to small percentage of users
- Monitoring: Track key metrics (latency, errors, business KPIs)
- Gradual Increase: Increase traffic percentage as confidence grows
- Full Rollout: Complete migration once validation passes
Infrastructure Migrations
Cloud-to-Cloud Migration
-
Assessment Phase
- Inventory existing resources and dependencies
- Map services to target cloud equivalents
- Identify vendor-specific features requiring refactoring
-
Pilot Migration
- Migrate non-critical workloads first
- Validate performance and cost models
- Refine migration procedures
-
Production Migration
- Use infrastructure as code for consistency
- Implement cross-cloud networking during transition
- Maintain disaster recovery capabilities
On-Premises to Cloud Migration
-
Lift and Shift
- Minimal changes to existing applications
- Quick migration with optimization later
- Use cloud migration tools and services
-
Re-architecture
- Redesign applications for cloud-native patterns
- Adopt microservices, containers, and serverless
- Implement cloud security and scaling practices
-
Hybrid Approach
- Keep sensitive data on-premises
- Migrate compute workloads to cloud
- Implement secure connectivity between environments
Feature Flags for Migrations
Progressive Feature Rollout
# Example feature flag implementation
class MigrationFeatureFlag:
def __init__(self, flag_name, rollout_percentage=0):
self.flag_name = flag_name
self.rollout_percentage = rollout_percentage
def is_enabled_for_user(self, user_id):
hash_value = hash(f"{self.flag_name}:{user_id}")
return (hash_value % 100) < self.rollout_percentage
def gradual_rollout(self, target_percentage, step_size=10):
while self.rollout_percentage < target_percentage:
self.rollout_percentage = min(
self.rollout_percentage + step_size,
target_percentage
)
yield self.rollout_percentage
Circuit Breaker Pattern
Implement automatic fallback to legacy systems when new systems show degraded performance:
class MigrationCircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.timeout = timeout
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
def call_new_service(self, request):
if self.state == 'OPEN':
if self.should_attempt_reset():
self.state = 'HALF_OPEN'
else:
return self.fallback_to_legacy(request)
try:
response = self.new_service.process(request)
self.on_success()
return response
except Exception as e:
self.on_failure()
return self.fallback_to_legacy(request)
Data Validation and Reconciliation
Validation Strategies
-
Row Count Validation
- Compare record counts between source and target
- Account for soft deletes and filtered records
- Implement threshold-based alerting
-
Checksums and Hashing
- Generate checksums for critical data subsets
- Compare hash values to detect data drift
- Use sampling for large datasets
-
Business Logic Validation
- Run critical business queries on both systems
- Compare aggregate results (sums, counts, averages)
- Validate derived data and calculations
Reconciliation Patterns
-
Delta Detection
-- Example delta query for reconciliation SELECT 'missing_in_target' as issue_type, source_id FROM source_table s WHERE NOT EXISTS ( SELECT 1 FROM target_table t WHERE t.id = s.id ) UNION ALL SELECT 'extra_in_target' as issue_type, target_id FROM target_table t WHERE NOT EXISTS ( SELECT 1 FROM source_table s WHERE s.id = t.id ); -
Automated Correction
- Implement data repair scripts for common issues
- Use idempotent operations for safe re-execution
- Log all correction actions for audit trails
Rollback Strategies
Database Rollback
-
Schema Rollback
- Maintain schema version control
- Use backward-compatible migrations when possible
- Keep rollback scripts for each migration step
-
Data Rollback
- Point-in-time recovery using database backups
- Transaction log replay for precise rollback points
- Maintain data snapshots at migration checkpoints
Service Rollback
-
Blue-Green Deployment
- Keep previous service version running during migration
- Switch traffic back to blue environment if issues arise
- Maintain parallel infrastructure during migration window
-
Rolling Rollback
- Gradually shift traffic back to previous version
- Monitor system health during rollback process
- Implement automated rollback triggers
Infrastructure Rollback
-
Infrastructure as Code
- Version control all infrastructure definitions
- Maintain rollback terraform/CloudFormation templates
- Test rollback procedures in staging environments
-
Data Persistence
- Preserve data in original location during migration
- Implement data sync back to original systems
- Maintain backup strategies across both environments
Risk Assessment Framework
Risk Categories
-
Technical Risks
- Data loss or corruption
- Service downtime or degraded performance
- Integration failures with dependent systems
- Scalability issues under production load
-
Business Risks
- Revenue impact from service disruption
- Customer experience degradation
- Compliance and regulatory concerns
- Brand reputation impact
-
Operational Risks
- Team knowledge gaps
- Insufficient testing coverage
- Inadequate monitoring and alerting
- Communication breakdowns
Risk Mitigation Strategies
-
Technical Mitigations
- Comprehensive testing (unit, integration, load, chaos)
- Gradual rollout with automated rollback triggers
- Data validation and reconciliation processes
- Performance monitoring and alerting
-
Business Mitigations
- Stakeholder communication plans
- Business continuity procedures
- Customer notification strategies
- Revenue protection measures
-
Operational Mitigations
- Team training and documentation
- Runbook creation and testing
- On-call rotation planning
- Post-migration review processes
Migration Runbooks
Pre-Migration Checklist
- Migration plan reviewed and approved
- Rollback procedures tested and validated
- Monitoring and alerting configured
- Team roles and responsibilities defined
- Stakeholder communication plan activated
- Backup and recovery procedures verified
- Test environment validation complete
- Performance benchmarks established
- Security review completed
- Compliance requirements verified
During Migration
- Execute migration phases in planned order
- Monitor key performance indicators continuously
- Validate data consistency at each checkpoint
- Communicate progress to stakeholders
- Document any deviations from plan
- Execute rollback if success criteria not met
- Coordinate with dependent teams
- Maintain detailed execution logs
Post-Migration
- Validate all success criteria met
- Perform comprehensive system health checks
- Execute data reconciliation procedures
- Monitor system performance over 72 hours
- Update documentation and runbooks
- Decommission legacy systems (if applicable)
- Conduct post-migration retrospective
- Archive migration artifacts
- Update disaster recovery procedures
Communication Templates
Executive Summary Template
Migration Status: [IN_PROGRESS | COMPLETED | ROLLED_BACK]
Start Time: [YYYY-MM-DD HH:MM UTC]
Current Phase: [X of Y]
Overall Progress: [X%]
Key Metrics:
- System Availability: [X.XX%]
- Data Migration Progress: [X.XX%]
- Performance Impact: [+/-X%]
- Issues Encountered: [X]
Next Steps:
1. [Action item 1]
2. [Action item 2]
Risk Assessment: [LOW | MEDIUM | HIGH]
Rollback Status: [AVAILABLE | NOT_AVAILABLE]
Technical Team Update Template
Phase: [Phase Name] - [Status]
Duration: [Started] - [Expected End]
Completed Tasks:
✓ [Task 1]
✓ [Task 2]
In Progress:
🔄 [Task 3] - [X% complete]
Upcoming:
⏳ [Task 4] - [Expected start time]
Issues:
⚠️ [Issue description] - [Severity] - [ETA resolution]
Metrics:
- Migration Rate: [X records/minute]
- Error Rate: [X.XX%]
- System Load: [CPU/Memory/Disk]
Success Metrics
Technical Metrics
- Migration Completion Rate: Percentage of data/services successfully migrated
- Downtime Duration: Total system unavailability during migration
- Data Consistency Score: Percentage of data validation checks passing
- Performance Delta: Performance change compared to baseline
- Error Rate: Percentage of failed operations during migration
Business Metrics
- Customer Impact Score: Measure of customer experience degradation
- Revenue Protection: Percentage of revenue maintained during migration
- Time to Value: Duration from migration start to business value realization
- Stakeholder Satisfaction: Post-migration stakeholder feedback scores
Operational Metrics
- Plan Adherence: Percentage of migration executed according to plan
- Issue Resolution Time: Average time to resolve migration issues
- Team Efficiency: Resource utilization and productivity metrics
- Knowledge Transfer Score: Team readiness for post-migration operations
Tools and Technologies
Migration Planning Tools
- migration_planner.py: Automated migration plan generation
- compatibility_checker.py: Schema and API compatibility analysis
- rollback_generator.py: Comprehensive rollback procedure generation
Validation Tools
- Database comparison utilities (schema and data)
- API contract testing frameworks
- Performance benchmarking tools
- Data quality validation pipelines
Monitoring and Alerting
- Real-time migration progress dashboards
- Automated rollback trigger systems
- Business metric monitoring
- Stakeholder notification systems
Best Practices
Planning Phase
- Start with Risk Assessment: Identify all potential failure modes before planning
- Design for Rollback: Every migration step should have a tested rollback procedure
- Validate in Staging: Execute full migration process in production-like environment
- Plan for Gradual Rollout: Use feature flags and traffic routing for controlled migration
Execution Phase
- Monitor Continuously: Track both technical and business metrics throughout
- Communicate Proactively: Keep all stakeholders informed of progress and issues
- Document Everything: Maintain detailed logs for post-migration analysis
- Stay Flexible: Be prepared to adjust timeline based on real-world performance
Validation Phase
- Automate Validation: Use automated tools for data consistency and performance checks
- Business Logic Testing: Validate critical business processes end-to-end
- Load Testing: Verify system performance under expected production load
- Security Validation: Ensure security controls function properly in new environment
Integration with Development Lifecycle
CI/CD Integration
# Example migration pipeline stage
migration_validation:
stage: test
script:
- python scripts/compatibility_checker.py --before=old_schema.json --after=new_schema.json
- python scripts/migration_planner.py --config=migration_config.json --validate
artifacts:
reports:
- compatibility_report.json
- migration_plan.json
Infrastructure as Code
# Example Terraform for blue-green infrastructure
resource "aws_instance" "blue_environment" {
count = var.migration_phase == "preparation" ? var.instance_count : 0
# Blue environment configuration
}
resource "aws_instance" "green_environment" {
count = var.migration_phase == "execution" ? var.instance_count : 0
# Green environment configuration
}
This Migration Architect skill provides a comprehensive framework for planning, executing, and validating complex system migrations while minimizing business impact and technical risk. The combination of automated tools, proven patterns, and detailed procedures enables organizations to confidently undertake even the most complex migration projects.
Migration Architect
Tier: POWERFUL
Category: Engineering - Migration Strategy
Purpose: Zero-downtime migration planning, compatibility validation, and rollback strategy generation
Overview
The Migration Architect skill provides comprehensive tools and methodologies for planning, executing, and validating complex system migrations with minimal business impact. This skill combines proven migration patterns with automated planning tools to ensure successful transitions between systems, databases, and infrastructure.
Components
Core Scripts
- migration_planner.py - Automated migration plan generation
- compatibility_checker.py - Schema and API compatibility analysis
- rollback_generator.py - Comprehensive rollback procedure generation
Reference Documentation
- migration_patterns_catalog.md - Detailed catalog of proven migration patterns
- zero_downtime_techniques.md - Comprehensive zero-downtime migration techniques
- data_reconciliation_strategies.md - Advanced data consistency and reconciliation strategies
Sample Assets
- sample_database_migration.json - Example database migration specification
- sample_service_migration.json - Example service migration specification
- database_schema_before.json - Sample "before" database schema
- database_schema_after.json - Sample "after" database schema
Quick Start
1. Generate a Migration Plan
python3 scripts/migration_planner.py \
--input assets/sample_database_migration.json \
--output migration_plan.json \
--format bothInput: Migration specification with source, target, constraints, and requirements Output: Detailed phased migration plan with risk assessment, timeline, and validation gates
2. Check Compatibility
python3 scripts/compatibility_checker.py \
--before assets/database_schema_before.json \
--after assets/database_schema_after.json \
--type database \
--output compatibility_report.json \
--format bothInput: Before and after schema definitions Output: Compatibility report with breaking changes, migration scripts, and recommendations
3. Generate Rollback Procedures
python3 scripts/rollback_generator.py \
--input migration_plan.json \
--output rollback_runbook.json \
--format bothInput: Migration plan from step 1 Output: Comprehensive rollback runbook with procedures, triggers, and communication templates
Script Details
Migration Planner (migration_planner.py)
Generates comprehensive migration plans with:
- Phased approach with dependencies and validation gates
- Risk assessment with mitigation strategies
- Timeline estimation based on complexity and constraints
- Rollback triggers and success criteria
- Stakeholder communication templates
Usage:
python3 scripts/migration_planner.py [OPTIONS]
Options:
--input, -i Input migration specification file (JSON) [required]
--output, -o Output file for migration plan (JSON)
--format, -f Output format: json, text, both (default: both)
--validate Validate migration specification onlyInput Format:
{
"type": "database|service|infrastructure",
"pattern": "schema_change|strangler_fig|blue_green",
"source": "Source system description",
"target": "Target system description",
"constraints": {
"max_downtime_minutes": 30,
"data_volume_gb": 2500,
"dependencies": ["service1", "service2"],
"compliance_requirements": ["GDPR", "SOX"]
}
}Compatibility Checker (compatibility_checker.py)
Analyzes compatibility between schema versions:
- Breaking change detection (removed fields, type changes, constraint additions)
- Data migration requirements identification
- Suggested migration scripts generation
- Risk assessment for each change
Usage:
python3 scripts/compatibility_checker.py [OPTIONS]
Options:
--before Before schema file (JSON) [required]
--after After schema file (JSON) [required]
--type Schema type: database, api (default: database)
--output, -o Output file for compatibility report (JSON)
--format, -f Output format: json, text, both (default: both)Exit Codes:
0: No compatibility issues1: Potentially breaking changes found2: Breaking changes found
Rollback Generator (rollback_generator.py)
Creates comprehensive rollback procedures:
- Phase-by-phase rollback steps
- Automated trigger conditions for rollback
- Data recovery procedures
- Communication templates for different audiences
- Validation checklists for rollback success
Usage:
python3 scripts/rollback_generator.py [OPTIONS]
Options:
--input, -i Input migration plan file (JSON) [required]
--output, -o Output file for rollback runbook (JSON)
--format, -f Output format: json, text, both (default: both)Migration Patterns Supported
Database Migrations
- Expand-Contract Pattern - Zero-downtime schema evolution
- Parallel Schema Pattern - Side-by-side schema migration
- Event Sourcing Migration - Event-driven data migration
Service Migrations
- Strangler Fig Pattern - Gradual legacy system replacement
- Parallel Run Pattern - Risk mitigation through dual execution
- Blue-Green Deployment - Zero-downtime service updates
Infrastructure Migrations
- Lift and Shift - Quick cloud migration with minimal changes
- Hybrid Cloud Migration - Gradual cloud adoption
- Multi-Cloud Migration - Distribution across multiple providers
Sample Workflow
1. Database Schema Migration
# Generate migration plan
python3 scripts/migration_planner.py \
--input assets/sample_database_migration.json \
--output db_migration_plan.json
# Check schema compatibility
python3 scripts/compatibility_checker.py \
--before assets/database_schema_before.json \
--after assets/database_schema_after.json \
--type database \
--output schema_compatibility.json
# Generate rollback procedures
python3 scripts/rollback_generator.py \
--input db_migration_plan.json \
--output db_rollback_runbook.json2. Service Migration
# Generate service migration plan
python3 scripts/migration_planner.py \
--input assets/sample_service_migration.json \
--output service_migration_plan.json
# Generate rollback procedures
python3 scripts/rollback_generator.py \
--input service_migration_plan.json \
--output service_rollback_runbook.jsonOutput Examples
Migration Plan Structure
{
"migration_id": "abc123def456",
"source_system": "Legacy User Service",
"target_system": "New User Service",
"migration_type": "service",
"complexity": "medium",
"estimated_duration_hours": 72,
"phases": [
{
"name": "preparation",
"description": "Prepare systems and teams for migration",
"duration_hours": 8,
"validation_criteria": ["All backups completed successfully"],
"rollback_triggers": ["Critical system failure"],
"risk_level": "medium"
}
],
"risks": [
{
"category": "technical",
"description": "Service compatibility issues",
"severity": "high",
"mitigation": "Comprehensive integration testing"
}
]
}Compatibility Report Structure
{
"overall_compatibility": "potentially_incompatible",
"breaking_changes_count": 2,
"potentially_breaking_count": 3,
"issues": [
{
"type": "required_column_added",
"severity": "breaking",
"description": "Required column 'email_verified_at' added",
"suggested_migration": "Add default value initially"
}
],
"migration_scripts": [
{
"script_type": "sql",
"description": "Add email verification columns",
"script_content": "ALTER TABLE users ADD COLUMN email_verified_at TIMESTAMP;",
"rollback_script": "ALTER TABLE users DROP COLUMN email_verified_at;"
}
]
}Best Practices
Planning Phase
- Start with risk assessment - Identify failure modes before planning
- Design for rollback - Every step should have a tested rollback procedure
- Validate in staging - Execute full migration in production-like environment
- Plan gradual rollout - Use feature flags and traffic routing
Execution Phase
- Monitor continuously - Track technical and business metrics
- Communicate proactively - Keep stakeholders informed
- Document everything - Maintain detailed logs for analysis
- Stay flexible - Be prepared to adjust based on real-world performance
Validation Phase
- Automate validation - Use automated consistency and performance checks
- Test business logic - Validate critical business processes end-to-end
- Load test - Verify performance under expected production load
- Security validation - Ensure security controls function properly
Integration
CI/CD Pipeline Integration
# Example GitHub Actions workflow
name: Migration Validation
on: [push, pull_request]
jobs:
validate-migration:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Validate Migration Plan
run: |
python3 scripts/migration_planner.py \
--input migration_spec.json \
--validate
- name: Check Compatibility
run: |
python3 scripts/compatibility_checker.py \
--before schema_before.json \
--after schema_after.json \
--type databaseMonitoring Integration
The tools generate metrics and alerts that can be integrated with:
- Prometheus - For metrics collection
- Grafana - For visualization and dashboards
- PagerDuty - For incident management
- Slack - For team notifications
Advanced Features
Machine Learning Integration
- Anomaly detection for data consistency issues
- Predictive analysis for migration success probability
- Automated pattern recognition for migration optimization
Performance Optimization
- Parallel processing for large-scale migrations
- Incremental reconciliation strategies
- Statistical sampling for validation
Compliance Support
- GDPR compliance tracking
- SOX audit trail generation
- HIPAA security validation
Troubleshooting
Common Issues
"Migration plan validation failed"
- Check JSON syntax in migration specification
- Ensure all required fields are present
- Validate constraint values are realistic
"Compatibility checker reports false positives"
- Review excluded fields configuration
- Check data type mapping compatibility
- Adjust tolerance settings for numerical comparisons
"Rollback procedures seem incomplete"
- Ensure migration plan includes all phases
- Verify database backup locations are specified
- Check that all dependencies are documented
Getting Help
- Review documentation - Check reference docs for patterns and techniques
- Examine sample files - Use provided assets as templates
- Check expected outputs - Compare your results with sample outputs
- Validate inputs - Ensure input files match expected format
Contributing
To extend or modify the Migration Architect skill:
- Add new patterns - Extend pattern templates in migration_planner.py
- Enhance compatibility checks - Add new validation rules in compatibility_checker.py
- Improve rollback procedures - Add specialized rollback steps in rollback_generator.py
- Update documentation - Keep reference docs current with new patterns
License
This skill is part of the claude-skills repository and follows the same license terms.
{
"schema_version": "2.0",
"database": "user_management_v2",
"tables": {
"users": {
"columns": {
"id": {
"type": "bigint",
"nullable": false,
"primary_key": true,
"auto_increment": true
},
"username": {
"type": "varchar",
"length": 50,
"nullable": false,
"unique": true
},
"email": {
"type": "varchar",
"length": 320,
"nullable": false,
"unique": true
},
"password_hash": {
"type": "varchar",
"length": 255,
"nullable": false
},
"first_name": {
"type": "varchar",
"length": 100,
"nullable": true
},
"last_name": {
"type": "varchar",
"length": 100,
"nullable": true
},
"created_at": {
"type": "timestamp",
"nullable": false,
"default": "CURRENT_TIMESTAMP"
},
"updated_at": {
"type": "timestamp",
"nullable": false,
"default": "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
},
"is_active": {
"type": "boolean",
"nullable": false,
"default": true
},
"phone": {
"type": "varchar",
"length": 20,
"nullable": true
},
"email_verified_at": {
"type": "timestamp",
"nullable": true,
"comment": "When email was verified"
},
"phone_verified_at": {
"type": "timestamp",
"nullable": true,
"comment": "When phone was verified"
},
"two_factor_enabled": {
"type": "boolean",
"nullable": false,
"default": false
},
"last_login_at": {
"type": "timestamp",
"nullable": true
}
},
"constraints": {
"primary_key": ["id"],
"unique": [
"username",
"email"
],
"foreign_key": [],
"check": [
"email LIKE '%@%'",
"LENGTH(password_hash) >= 60",
"phone IS NULL OR LENGTH(phone) >= 10"
]
},
"indexes": [
{
"name": "idx_users_email",
"columns": ["email"],
"unique": true
},
{
"name": "idx_users_username",
"columns": ["username"],
"unique": true
},
{
"name": "idx_users_created_at",
"columns": ["created_at"]
},
{
"name": "idx_users_email_verified",
"columns": ["email_verified_at"]
},
{
"name": "idx_users_last_login",
"columns": ["last_login_at"]
}
]
},
"user_profiles": {
"columns": {
"id": {
"type": "bigint",
"nullable": false,
"primary_key": true,
"auto_increment": true
},
"user_id": {
"type": "bigint",
"nullable": false
},
"bio": {
"type": "text",
"nullable": true
},
"avatar_url": {
"type": "varchar",
"length": 500,
"nullable": true
},
"birth_date": {
"type": "date",
"nullable": true
},
"location": {
"type": "varchar",
"length": 100,
"nullable": true
},
"website": {
"type": "varchar",
"length": 255,
"nullable": true
},
"privacy_level": {
"type": "varchar",
"length": 20,
"nullable": false,
"default": "public"
},
"timezone": {
"type": "varchar",
"length": 50,
"nullable": true,
"default": "UTC"
},
"language": {
"type": "varchar",
"length": 10,
"nullable": false,
"default": "en"
},
"created_at": {
"type": "timestamp",
"nullable": false,
"default": "CURRENT_TIMESTAMP"
},
"updated_at": {
"type": "timestamp",
"nullable": false,
"default": "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
}
},
"constraints": {
"primary_key": ["id"],
"unique": [],
"foreign_key": [
{
"columns": ["user_id"],
"references": "users(id)",
"on_delete": "CASCADE"
}
],
"check": [
"privacy_level IN ('public', 'private', 'friends_only')",
"bio IS NULL OR LENGTH(bio) <= 2000",
"language IN ('en', 'es', 'fr', 'de', 'it', 'pt', 'ru', 'ja', 'ko', 'zh')"
]
},
"indexes": [
{
"name": "idx_user_profiles_user_id",
"columns": ["user_id"],
"unique": true
},
{
"name": "idx_user_profiles_privacy",
"columns": ["privacy_level"]
},
{
"name": "idx_user_profiles_language",
"columns": ["language"]
}
]
},
"user_sessions": {
"columns": {
"id": {
"type": "varchar",
"length": 128,
"nullable": false,
"primary_key": true
},
"user_id": {
"type": "bigint",
"nullable": false
},
"ip_address": {
"type": "varchar",
"length": 45,
"nullable": true
},
"user_agent": {
"type": "text",
"nullable": true
},
"expires_at": {
"type": "timestamp",
"nullable": false
},
"created_at": {
"type": "timestamp",
"nullable": false,
"default": "CURRENT_TIMESTAMP"
},
"last_activity": {
"type": "timestamp",
"nullable": false,
"default": "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
},
"session_type": {
"type": "varchar",
"length": 20,
"nullable": false,
"default": "web"
},
"is_mobile": {
"type": "boolean",
"nullable": false,
"default": false
}
},
"constraints": {
"primary_key": ["id"],
"unique": [],
"foreign_key": [
{
"columns": ["user_id"],
"references": "users(id)",
"on_delete": "CASCADE"
}
],
"check": [
"session_type IN ('web', 'mobile', 'api', 'admin')"
]
},
"indexes": [
{
"name": "idx_user_sessions_user_id",
"columns": ["user_id"]
},
{
"name": "idx_user_sessions_expires",
"columns": ["expires_at"]
},
{
"name": "idx_user_sessions_type",
"columns": ["session_type"]
}
]
},
"user_preferences": {
"columns": {
"id": {
"type": "bigint",
"nullable": false,
"primary_key": true,
"auto_increment": true
},
"user_id": {
"type": "bigint",
"nullable": false
},
"preference_key": {
"type": "varchar",
"length": 100,
"nullable": false
},
"preference_value": {
"type": "json",
"nullable": true
},
"created_at": {
"type": "timestamp",
"nullable": false,
"default": "CURRENT_TIMESTAMP"
},
"updated_at": {
"type": "timestamp",
"nullable": false,
"default": "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
}
},
"constraints": {
"primary_key": ["id"],
"unique": [
["user_id", "preference_key"]
],
"foreign_key": [
{
"columns": ["user_id"],
"references": "users(id)",
"on_delete": "CASCADE"
}
],
"check": []
},
"indexes": [
{
"name": "idx_user_preferences_user_key",
"columns": ["user_id", "preference_key"],
"unique": true
}
]
}
},
"views": {
"active_users": {
"definition": "SELECT u.id, u.username, u.email, u.first_name, u.last_name, u.email_verified_at, u.last_login_at FROM users u WHERE u.is_active = true",
"columns": ["id", "username", "email", "first_name", "last_name", "email_verified_at", "last_login_at"]
},
"verified_users": {
"definition": "SELECT u.id, u.username, u.email FROM users u WHERE u.is_active = true AND u.email_verified_at IS NOT NULL",
"columns": ["id", "username", "email"]
}
},
"procedures": [
{
"name": "cleanup_expired_sessions",
"parameters": [],
"definition": "DELETE FROM user_sessions WHERE expires_at < NOW()"
},
{
"name": "get_user_with_profile",
"parameters": ["user_id BIGINT"],
"definition": "SELECT u.*, p.bio, p.avatar_url, p.privacy_level FROM users u LEFT JOIN user_profiles p ON u.id = p.user_id WHERE u.id = user_id"
}
]
} {
"schema_version": "1.0",
"database": "user_management",
"tables": {
"users": {
"columns": {
"id": {
"type": "bigint",
"nullable": false,
"primary_key": true,
"auto_increment": true
},
"username": {
"type": "varchar",
"length": 50,
"nullable": false,
"unique": true
},
"email": {
"type": "varchar",
"length": 255,
"nullable": false,
"unique": true
},
"password_hash": {
"type": "varchar",
"length": 255,
"nullable": false
},
"first_name": {
"type": "varchar",
"length": 100,
"nullable": true
},
"last_name": {
"type": "varchar",
"length": 100,
"nullable": true
},
"created_at": {
"type": "timestamp",
"nullable": false,
"default": "CURRENT_TIMESTAMP"
},
"updated_at": {
"type": "timestamp",
"nullable": false,
"default": "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
},
"is_active": {
"type": "boolean",
"nullable": false,
"default": true
},
"phone": {
"type": "varchar",
"length": 20,
"nullable": true
}
},
"constraints": {
"primary_key": ["id"],
"unique": [
"username",
"email"
],
"foreign_key": [],
"check": [
"email LIKE '%@%'",
"LENGTH(password_hash) >= 60"
]
},
"indexes": [
{
"name": "idx_users_email",
"columns": ["email"],
"unique": true
},
{
"name": "idx_users_username",
"columns": ["username"],
"unique": true
},
{
"name": "idx_users_created_at",
"columns": ["created_at"]
}
]
},
"user_profiles": {
"columns": {
"id": {
"type": "bigint",
"nullable": false,
"primary_key": true,
"auto_increment": true
},
"user_id": {
"type": "bigint",
"nullable": false
},
"bio": {
"type": "varchar",
"length": 255,
"nullable": true
},
"avatar_url": {
"type": "varchar",
"length": 500,
"nullable": true
},
"birth_date": {
"type": "date",
"nullable": true
},
"location": {
"type": "varchar",
"length": 100,
"nullable": true
},
"website": {
"type": "varchar",
"length": 255,
"nullable": true
},
"privacy_level": {
"type": "varchar",
"length": 20,
"nullable": false,
"default": "public"
},
"created_at": {
"type": "timestamp",
"nullable": false,
"default": "CURRENT_TIMESTAMP"
},
"updated_at": {
"type": "timestamp",
"nullable": false,
"default": "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
}
},
"constraints": {
"primary_key": ["id"],
"unique": [],
"foreign_key": [
{
"columns": ["user_id"],
"references": "users(id)",
"on_delete": "CASCADE"
}
],
"check": [
"privacy_level IN ('public', 'private', 'friends_only')"
]
},
"indexes": [
{
"name": "idx_user_profiles_user_id",
"columns": ["user_id"],
"unique": true
},
{
"name": "idx_user_profiles_privacy",
"columns": ["privacy_level"]
}
]
},
"user_sessions": {
"columns": {
"id": {
"type": "varchar",
"length": 128,
"nullable": false,
"primary_key": true
},
"user_id": {
"type": "bigint",
"nullable": false
},
"ip_address": {
"type": "varchar",
"length": 45,
"nullable": true
},
"user_agent": {
"type": "varchar",
"length": 500,
"nullable": true
},
"expires_at": {
"type": "timestamp",
"nullable": false
},
"created_at": {
"type": "timestamp",
"nullable": false,
"default": "CURRENT_TIMESTAMP"
},
"last_activity": {
"type": "timestamp",
"nullable": false,
"default": "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
}
},
"constraints": {
"primary_key": ["id"],
"unique": [],
"foreign_key": [
{
"columns": ["user_id"],
"references": "users(id)",
"on_delete": "CASCADE"
}
],
"check": []
},
"indexes": [
{
"name": "idx_user_sessions_user_id",
"columns": ["user_id"]
},
{
"name": "idx_user_sessions_expires",
"columns": ["expires_at"]
}
]
}
},
"views": {
"active_users": {
"definition": "SELECT u.id, u.username, u.email, u.first_name, u.last_name FROM users u WHERE u.is_active = true",
"columns": ["id", "username", "email", "first_name", "last_name"]
}
},
"procedures": [
{
"name": "cleanup_expired_sessions",
"parameters": [],
"definition": "DELETE FROM user_sessions WHERE expires_at < NOW()"
}
]
} {
"type": "database",
"pattern": "schema_change",
"source": "PostgreSQL 13 Production Database",
"target": "PostgreSQL 15 Cloud Database",
"description": "Migrate user management system from on-premises PostgreSQL to cloud with schema updates",
"constraints": {
"max_downtime_minutes": 30,
"data_volume_gb": 2500,
"dependencies": [
"user_service_api",
"authentication_service",
"notification_service",
"analytics_pipeline",
"backup_service"
],
"compliance_requirements": [
"GDPR",
"SOX"
],
"special_requirements": [
"zero_data_loss",
"referential_integrity",
"performance_baseline_maintained"
]
},
"tables_to_migrate": [
{
"name": "users",
"row_count": 1500000,
"size_mb": 450,
"critical": true
},
{
"name": "user_profiles",
"row_count": 1500000,
"size_mb": 890,
"critical": true
},
{
"name": "user_sessions",
"row_count": 25000000,
"size_mb": 1200,
"critical": false
},
{
"name": "audit_logs",
"row_count": 50000000,
"size_mb": 2800,
"critical": false
}
],
"schema_changes": [
{
"table": "users",
"changes": [
{
"type": "add_column",
"column": "email_verified_at",
"data_type": "timestamp",
"nullable": true
},
{
"type": "add_column",
"column": "phone_verified_at",
"data_type": "timestamp",
"nullable": true
}
]
},
{
"table": "user_profiles",
"changes": [
{
"type": "modify_column",
"column": "bio",
"old_type": "varchar(255)",
"new_type": "text"
},
{
"type": "add_constraint",
"constraint_type": "check",
"constraint_name": "bio_length_check",
"definition": "LENGTH(bio) <= 2000"
}
]
}
],
"performance_requirements": {
"max_query_response_time_ms": 100,
"concurrent_connections": 500,
"transactions_per_second": 1000
},
"business_continuity": {
"critical_business_hours": {
"start": "08:00",
"end": "18:00",
"timezone": "UTC"
},
"preferred_migration_window": {
"start": "02:00",
"end": "06:00",
"timezone": "UTC"
}
}
} {
"type": "service",
"pattern": "strangler_fig",
"source": "Legacy User Service (Java Spring Boot 2.x)",
"target": "New User Service (Node.js + TypeScript)",
"description": "Migrate legacy user management service to modern microservices architecture",
"constraints": {
"max_downtime_minutes": 0,
"data_volume_gb": 50,
"dependencies": [
"payment_service",
"order_service",
"notification_service",
"analytics_service",
"mobile_app_v1",
"mobile_app_v2",
"web_frontend",
"admin_dashboard"
],
"compliance_requirements": [
"PCI_DSS",
"GDPR"
],
"special_requirements": [
"api_backward_compatibility",
"session_continuity",
"rate_limit_preservation"
]
},
"service_details": {
"legacy_service": {
"endpoints": [
"GET /api/v1/users/{id}",
"POST /api/v1/users",
"PUT /api/v1/users/{id}",
"DELETE /api/v1/users/{id}",
"GET /api/v1/users/{id}/profile",
"PUT /api/v1/users/{id}/profile",
"POST /api/v1/users/{id}/verify-email",
"POST /api/v1/users/login",
"POST /api/v1/users/logout"
],
"current_load": {
"requests_per_second": 850,
"peak_requests_per_second": 2000,
"average_response_time_ms": 120,
"p95_response_time_ms": 300
},
"infrastructure": {
"instances": 4,
"cpu_cores_per_instance": 4,
"memory_gb_per_instance": 8,
"load_balancer": "AWS ELB Classic"
}
},
"new_service": {
"endpoints": [
"GET /api/v2/users/{id}",
"POST /api/v2/users",
"PUT /api/v2/users/{id}",
"DELETE /api/v2/users/{id}",
"GET /api/v2/users/{id}/profile",
"PUT /api/v2/users/{id}/profile",
"POST /api/v2/users/{id}/verify-email",
"POST /api/v2/users/{id}/verify-phone",
"POST /api/v2/auth/login",
"POST /api/v2/auth/logout",
"POST /api/v2/auth/refresh"
],
"target_performance": {
"requests_per_second": 1500,
"peak_requests_per_second": 3000,
"average_response_time_ms": 80,
"p95_response_time_ms": 200
},
"infrastructure": {
"container_platform": "Kubernetes",
"initial_replicas": 3,
"max_replicas": 10,
"cpu_request_millicores": 500,
"cpu_limit_millicores": 1000,
"memory_request_mb": 512,
"memory_limit_mb": 1024,
"load_balancer": "AWS ALB"
}
}
},
"migration_phases": [
{
"phase": "preparation",
"description": "Deploy new service and configure routing",
"estimated_duration_hours": 8
},
{
"phase": "intercept",
"description": "Configure API gateway to route to new service",
"estimated_duration_hours": 2
},
{
"phase": "gradual_migration",
"description": "Gradually increase traffic to new service",
"estimated_duration_hours": 48
},
{
"phase": "validation",
"description": "Validate new service performance and functionality",
"estimated_duration_hours": 24
},
{
"phase": "decommission",
"description": "Remove legacy service after validation",
"estimated_duration_hours": 4
}
],
"feature_flags": [
{
"name": "enable_new_user_service",
"description": "Route user service requests to new implementation",
"initial_percentage": 5,
"rollout_schedule": [
{"percentage": 5, "duration_hours": 24},
{"percentage": 25, "duration_hours": 24},
{"percentage": 50, "duration_hours": 24},
{"percentage": 100, "duration_hours": 0}
]
},
{
"name": "enable_new_auth_endpoints",
"description": "Enable new authentication endpoints",
"initial_percentage": 0,
"rollout_schedule": [
{"percentage": 10, "duration_hours": 12},
{"percentage": 50, "duration_hours": 12},
{"percentage": 100, "duration_hours": 0}
]
}
],
"monitoring": {
"critical_metrics": [
"request_rate",
"error_rate",
"response_time_p95",
"response_time_p99",
"cpu_utilization",
"memory_utilization",
"database_connection_pool"
],
"alert_thresholds": {
"error_rate": 0.05,
"response_time_p95": 250,
"cpu_utilization": 0.80,
"memory_utilization": 0.85
}
},
"rollback_triggers": [
{
"metric": "error_rate",
"threshold": 0.10,
"duration_minutes": 5,
"action": "automatic_rollback"
},
{
"metric": "response_time_p95",
"threshold": 500,
"duration_minutes": 10,
"action": "alert_team"
},
{
"metric": "cpu_utilization",
"threshold": 0.95,
"duration_minutes": 5,
"action": "scale_up"
}
]
} {
"runbook_id": "rb_921c0bca",
"migration_id": "23a52ed1507f",
"created_at": "2026-02-16T13:47:31.108500",
"rollback_phases": [
{
"phase_name": "rollback_cleanup",
"description": "Rollback changes made during cleanup phase",
"urgency_level": "medium",
"estimated_duration_minutes": 570,
"prerequisites": [
"Incident commander assigned and briefed",
"All team members notified of rollback initiation",
"Monitoring systems confirmed operational",
"Backup systems verified and accessible"
],
"steps": [
{
"step_id": "rb_validate_0_final",
"name": "Validate rollback completion",
"description": "Comprehensive validation that cleanup rollback completed successfully",
"script_type": "manual",
"script_content": "Execute validation checklist for this phase",
"estimated_duration_minutes": 10,
"dependencies": [],
"validation_commands": [
"SELECT COUNT(*) FROM {table_name};",
"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{table_name}';",
"SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{table_name}' AND column_name = '{column_name}';",
"SELECT COUNT(DISTINCT {primary_key}) FROM {table_name};",
"SELECT MAX({timestamp_column}) FROM {table_name};"
],
"success_criteria": [
"cleanup fully rolled back",
"All validation checks pass"
],
"failure_escalation": "Investigate cleanup rollback failures",
"rollback_order": 99
}
],
"validation_checkpoints": [
"cleanup rollback steps completed",
"System health checks passing",
"No critical errors in logs",
"Key metrics within acceptable ranges",
"Validation command passed: SELECT COUNT(*) FROM {table_name};...",
"Validation command passed: SELECT COUNT(*) FROM information_schema.tables WHE...",
"Validation command passed: SELECT COUNT(*) FROM information_schema.columns WH..."
],
"communication_requirements": [
"Notify incident commander of phase start/completion",
"Update rollback status dashboard",
"Log all actions and decisions"
],
"risk_level": "medium"
},
{
"phase_name": "rollback_contract",
"description": "Rollback changes made during contract phase",
"urgency_level": "medium",
"estimated_duration_minutes": 570,
"prerequisites": [
"Incident commander assigned and briefed",
"All team members notified of rollback initiation",
"Monitoring systems confirmed operational",
"Backup systems verified and accessible",
"Previous rollback phase completed successfully"
],
"steps": [
{
"step_id": "rb_validate_1_final",
"name": "Validate rollback completion",
"description": "Comprehensive validation that contract rollback completed successfully",
"script_type": "manual",
"script_content": "Execute validation checklist for this phase",
"estimated_duration_minutes": 10,
"dependencies": [],
"validation_commands": [
"SELECT COUNT(*) FROM {table_name};",
"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{table_name}';",
"SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{table_name}' AND column_name = '{column_name}';",
"SELECT COUNT(DISTINCT {primary_key}) FROM {table_name};",
"SELECT MAX({timestamp_column}) FROM {table_name};"
],
"success_criteria": [
"contract fully rolled back",
"All validation checks pass"
],
"failure_escalation": "Investigate contract rollback failures",
"rollback_order": 99
}
],
"validation_checkpoints": [
"contract rollback steps completed",
"System health checks passing",
"No critical errors in logs",
"Key metrics within acceptable ranges",
"Validation command passed: SELECT COUNT(*) FROM {table_name};...",
"Validation command passed: SELECT COUNT(*) FROM information_schema.tables WHE...",
"Validation command passed: SELECT COUNT(*) FROM information_schema.columns WH..."
],
"communication_requirements": [
"Notify incident commander of phase start/completion",
"Update rollback status dashboard",
"Log all actions and decisions"
],
"risk_level": "medium"
},
{
"phase_name": "rollback_migrate",
"description": "Rollback changes made during migrate phase",
"urgency_level": "medium",
"estimated_duration_minutes": 570,
"prerequisites": [
"Incident commander assigned and briefed",
"All team members notified of rollback initiation",
"Monitoring systems confirmed operational",
"Backup systems verified and accessible",
"Previous rollback phase completed successfully"
],
"steps": [
{
"step_id": "rb_validate_2_final",
"name": "Validate rollback completion",
"description": "Comprehensive validation that migrate rollback completed successfully",
"script_type": "manual",
"script_content": "Execute validation checklist for this phase",
"estimated_duration_minutes": 10,
"dependencies": [],
"validation_commands": [
"SELECT COUNT(*) FROM {table_name};",
"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{table_name}';",
"SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{table_name}' AND column_name = '{column_name}';",
"SELECT COUNT(DISTINCT {primary_key}) FROM {table_name};",
"SELECT MAX({timestamp_column}) FROM {table_name};"
],
"success_criteria": [
"migrate fully rolled back",
"All validation checks pass"
],
"failure_escalation": "Investigate migrate rollback failures",
"rollback_order": 99
}
],
"validation_checkpoints": [
"migrate rollback steps completed",
"System health checks passing",
"No critical errors in logs",
"Key metrics within acceptable ranges",
"Validation command passed: SELECT COUNT(*) FROM {table_name};...",
"Validation command passed: SELECT COUNT(*) FROM information_schema.tables WHE...",
"Validation command passed: SELECT COUNT(*) FROM information_schema.columns WH..."
],
"communication_requirements": [
"Notify incident commander of phase start/completion",
"Update rollback status dashboard",
"Log all actions and decisions"
],
"risk_level": "medium"
},
{
"phase_name": "rollback_expand",
"description": "Rollback changes made during expand phase",
"urgency_level": "medium",
"estimated_duration_minutes": 570,
"prerequisites": [
"Incident commander assigned and briefed",
"All team members notified of rollback initiation",
"Monitoring systems confirmed operational",
"Backup systems verified and accessible",
"Previous rollback phase completed successfully"
],
"steps": [
{
"step_id": "rb_validate_3_final",
"name": "Validate rollback completion",
"description": "Comprehensive validation that expand rollback completed successfully",
"script_type": "manual",
"script_content": "Execute validation checklist for this phase",
"estimated_duration_minutes": 10,
"dependencies": [],
"validation_commands": [
"SELECT COUNT(*) FROM {table_name};",
"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{table_name}';",
"SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{table_name}' AND column_name = '{column_name}';",
"SELECT COUNT(DISTINCT {primary_key}) FROM {table_name};",
"SELECT MAX({timestamp_column}) FROM {table_name};"
],
"success_criteria": [
"expand fully rolled back",
"All validation checks pass"
],
"failure_escalation": "Investigate expand rollback failures",
"rollback_order": 99
}
],
"validation_checkpoints": [
"expand rollback steps completed",
"System health checks passing",
"No critical errors in logs",
"Key metrics within acceptable ranges",
"Validation command passed: SELECT COUNT(*) FROM {table_name};...",
"Validation command passed: SELECT COUNT(*) FROM information_schema.tables WHE...",
"Validation command passed: SELECT COUNT(*) FROM information_schema.columns WH..."
],
"communication_requirements": [
"Notify incident commander of phase start/completion",
"Update rollback status dashboard",
"Log all actions and decisions"
],
"risk_level": "medium"
},
{
"phase_name": "rollback_preparation",
"description": "Rollback changes made during preparation phase",
"urgency_level": "medium",
"estimated_duration_minutes": 570,
"prerequisites": [
"Incident commander assigned and briefed",
"All team members notified of rollback initiation",
"Monitoring systems confirmed operational",
"Backup systems verified and accessible",
"Previous rollback phase completed successfully"
],
"steps": [
{
"step_id": "rb_schema_4_01",
"name": "Drop migration artifacts",
"description": "Remove temporary migration tables and procedures",
"script_type": "sql",
"script_content": "-- Drop migration artifacts\nDROP TABLE IF EXISTS migration_log;\nDROP PROCEDURE IF EXISTS migrate_data();",
"estimated_duration_minutes": 5,
"dependencies": [],
"validation_commands": [
"SELECT COUNT(*) FROM information_schema.tables WHERE table_name LIKE '%migration%';"
],
"success_criteria": [
"No migration artifacts remain"
],
"failure_escalation": "Manual cleanup required",
"rollback_order": 1
},
{
"step_id": "rb_validate_4_final",
"name": "Validate rollback completion",
"description": "Comprehensive validation that preparation rollback completed successfully",
"script_type": "manual",
"script_content": "Execute validation checklist for this phase",
"estimated_duration_minutes": 10,
"dependencies": [
"rb_schema_4_01"
],
"validation_commands": [
"SELECT COUNT(*) FROM {table_name};",
"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{table_name}';",
"SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{table_name}' AND column_name = '{column_name}';",
"SELECT COUNT(DISTINCT {primary_key}) FROM {table_name};",
"SELECT MAX({timestamp_column}) FROM {table_name};"
],
"success_criteria": [
"preparation fully rolled back",
"All validation checks pass"
],
"failure_escalation": "Investigate preparation rollback failures",
"rollback_order": 99
}
],
"validation_checkpoints": [
"preparation rollback steps completed",
"System health checks passing",
"No critical errors in logs",
"Key metrics within acceptable ranges",
"Validation command passed: SELECT COUNT(*) FROM {table_name};...",
"Validation command passed: SELECT COUNT(*) FROM information_schema.tables WHE...",
"Validation command passed: SELECT COUNT(*) FROM information_schema.columns WH..."
],
"communication_requirements": [
"Notify incident commander of phase start/completion",
"Update rollback status dashboard",
"Log all actions and decisions"
],
"risk_level": "medium"
}
],
"trigger_conditions": [
{
"trigger_id": "error_rate_spike",
"name": "Error Rate Spike",
"condition": "error_rate > baseline * 5 for 5 minutes",
"metric_threshold": {
"metric": "error_rate",
"operator": "greater_than",
"value": "baseline_error_rate * 5",
"duration_minutes": 5
},
"evaluation_window_minutes": 5,
"auto_execute": true,
"escalation_contacts": [
"on_call_engineer",
"migration_lead"
]
},
{
"trigger_id": "response_time_degradation",
"name": "Response Time Degradation",
"condition": "p95_response_time > baseline * 3 for 10 minutes",
"metric_threshold": {
"metric": "p95_response_time",
"operator": "greater_than",
"value": "baseline_p95 * 3",
"duration_minutes": 10
},
"evaluation_window_minutes": 10,
"auto_execute": false,
"escalation_contacts": [
"performance_team",
"migration_lead"
]
},
{
"trigger_id": "availability_drop",
"name": "Service Availability Drop",
"condition": "availability < 95% for 2 minutes",
"metric_threshold": {
"metric": "availability",
"operator": "less_than",
"value": 0.95,
"duration_minutes": 2
},
"evaluation_window_minutes": 2,
"auto_execute": true,
"escalation_contacts": [
"sre_team",
"incident_commander"
]
},
{
"trigger_id": "data_integrity_failure",
"name": "Data Integrity Check Failure",
"condition": "data_validation_failures > 0",
"metric_threshold": {
"metric": "data_validation_failures",
"operator": "greater_than",
"value": 0,
"duration_minutes": 1
},
"evaluation_window_minutes": 1,
"auto_execute": true,
"escalation_contacts": [
"dba_team",
"data_team"
]
},
{
"trigger_id": "migration_progress_stalled",
"name": "Migration Progress Stalled",
"condition": "migration_progress unchanged for 30 minutes",
"metric_threshold": {
"metric": "migration_progress_rate",
"operator": "equals",
"value": 0,
"duration_minutes": 30
},
"evaluation_window_minutes": 30,
"auto_execute": false,
"escalation_contacts": [
"migration_team",
"dba_team"
]
}
],
"data_recovery_plan": {
"recovery_method": "point_in_time",
"backup_location": "/backups/pre_migration_{migration_id}_{timestamp}.sql",
"recovery_scripts": [
"pg_restore -d production -c /backups/pre_migration_backup.sql",
"SELECT pg_create_restore_point('rollback_point');",
"VACUUM ANALYZE; -- Refresh statistics after restore"
],
"data_validation_queries": [
"SELECT COUNT(*) FROM critical_business_table;",
"SELECT MAX(created_at) FROM audit_log;",
"SELECT COUNT(DISTINCT user_id) FROM user_sessions;",
"SELECT SUM(amount) FROM financial_transactions WHERE date = CURRENT_DATE;"
],
"estimated_recovery_time_minutes": 45,
"recovery_dependencies": [
"database_instance_running",
"backup_file_accessible"
]
},
"communication_templates": [
{
"template_type": "rollback_start",
"audience": "technical",
"subject": "ROLLBACK INITIATED: {migration_name}",
"body": "Team,\n\nWe have initiated rollback for migration: {migration_name}\nRollback ID: {rollback_id}\nStart Time: {start_time}\nEstimated Duration: {estimated_duration}\n\nReason: {rollback_reason}\n\nCurrent Status: Rolling back phase {current_phase}\n\nNext Updates: Every 15 minutes or upon phase completion\n\nActions Required:\n- Monitor system health dashboards\n- Stand by for escalation if needed\n- Do not make manual changes during rollback\n\nIncident Commander: {incident_commander}\n",
"urgency": "medium",
"delivery_methods": [
"email",
"slack"
]
},
{
"template_type": "rollback_start",
"audience": "business",
"subject": "System Rollback In Progress - {system_name}",
"body": "Business Stakeholders,\n\nWe are currently performing a planned rollback of the {system_name} migration due to {rollback_reason}.\n\nImpact: {business_impact}\nExpected Resolution: {estimated_completion_time}\nAffected Services: {affected_services}\n\nWe will provide updates every 30 minutes.\n\nContact: {business_contact}\n",
"urgency": "medium",
"delivery_methods": [
"email"
]
},
{
"template_type": "rollback_start",
"audience": "executive",
"subject": "EXEC ALERT: Critical System Rollback - {system_name}",
"body": "Executive Team,\n\nA critical rollback is in progress for {system_name}.\n\nSummary:\n- Rollback Reason: {rollback_reason}\n- Business Impact: {business_impact}\n- Expected Resolution: {estimated_completion_time}\n- Customer Impact: {customer_impact}\n\nWe are following established procedures and will update hourly.\n\nEscalation: {escalation_contact}\n",
"urgency": "high",
"delivery_methods": [
"email"
]
},
{
"template_type": "rollback_complete",
"audience": "technical",
"subject": "ROLLBACK COMPLETED: {migration_name}",
"body": "Team,\n\nRollback has been successfully completed for migration: {migration_name}\n\nSummary:\n- Start Time: {start_time}\n- End Time: {end_time}\n- Duration: {actual_duration}\n- Phases Completed: {completed_phases}\n\nValidation Results:\n{validation_results}\n\nSystem Status: {system_status}\n\nNext Steps:\n- Continue monitoring for 24 hours\n- Post-rollback review scheduled for {review_date}\n- Root cause analysis to begin\n\nAll clear to resume normal operations.\n\nIncident Commander: {incident_commander}\n",
"urgency": "medium",
"delivery_methods": [
"email",
"slack"
]
},
{
"template_type": "emergency_escalation",
"audience": "executive",
"subject": "CRITICAL: Rollback Emergency - {migration_name}",
"body": "CRITICAL SITUATION - IMMEDIATE ATTENTION REQUIRED\n\nMigration: {migration_name}\nIssue: Rollback procedure has encountered critical failures\n\nCurrent Status: {current_status}\nFailed Components: {failed_components}\nBusiness Impact: {business_impact}\nCustomer Impact: {customer_impact}\n\nImmediate Actions:\n1. Emergency response team activated\n2. {emergency_action_1}\n3. {emergency_action_2}\n\nWar Room: {war_room_location}\nBridge Line: {conference_bridge}\n\nNext Update: {next_update_time}\n\nIncident Commander: {incident_commander}\nExecutive On-Call: {executive_on_call}\n",
"urgency": "emergency",
"delivery_methods": [
"email",
"sms",
"phone_call"
]
}
],
"escalation_matrix": {
"level_1": {
"trigger": "Single component failure",
"response_time_minutes": 5,
"contacts": [
"on_call_engineer",
"migration_lead"
],
"actions": [
"Investigate issue",
"Attempt automated remediation",
"Monitor closely"
]
},
"level_2": {
"trigger": "Multiple component failures or single critical failure",
"response_time_minutes": 2,
"contacts": [
"senior_engineer",
"team_lead",
"devops_lead"
],
"actions": [
"Initiate rollback",
"Establish war room",
"Notify stakeholders"
]
},
"level_3": {
"trigger": "System-wide failure or data corruption",
"response_time_minutes": 1,
"contacts": [
"engineering_manager",
"cto",
"incident_commander"
],
"actions": [
"Emergency rollback",
"All hands on deck",
"Executive notification"
]
},
"emergency": {
"trigger": "Business-critical failure with customer impact",
"response_time_minutes": 0,
"contacts": [
"ceo",
"cto",
"head_of_operations"
],
"actions": [
"Emergency procedures",
"Customer communication",
"Media preparation if needed"
]
}
},
"validation_checklist": [
"Verify system is responding to health checks",
"Confirm error rates are within normal parameters",
"Validate response times meet SLA requirements",
"Check all critical business processes are functioning",
"Verify monitoring and alerting systems are operational",
"Confirm no data corruption has occurred",
"Validate security controls are functioning properly",
"Check backup systems are working correctly",
"Verify integration points with downstream systems",
"Confirm user authentication and authorization working",
"Validate database schema matches expected state",
"Confirm referential integrity constraints",
"Check database performance metrics",
"Verify data consistency across related tables",
"Validate indexes and statistics are optimal",
"Confirm transaction logs are clean",
"Check database connections and connection pooling"
],
"post_rollback_procedures": [
"Monitor system stability for 24-48 hours post-rollback",
"Conduct thorough post-rollback testing of all critical paths",
"Review and analyze rollback metrics and timing",
"Document lessons learned and rollback procedure improvements",
"Schedule post-mortem meeting with all stakeholders",
"Update rollback procedures based on actual experience",
"Communicate rollback completion to all stakeholders",
"Archive rollback logs and artifacts for future reference",
"Review and update monitoring thresholds if needed",
"Plan for next migration attempt with improved procedures",
"Conduct security review to ensure no vulnerabilities introduced",
"Update disaster recovery procedures if affected by rollback",
"Review capacity planning based on rollback resource usage",
"Update documentation with rollback experience and timings"
],
"emergency_contacts": [
{
"role": "Incident Commander",
"name": "TBD - Assigned during migration",
"primary_phone": "+1-XXX-XXX-XXXX",
"email": "[email protected]",
"backup_contact": "[email protected]"
},
{
"role": "Technical Lead",
"name": "TBD - Migration technical owner",
"primary_phone": "+1-XXX-XXX-XXXX",
"email": "[email protected]",
"backup_contact": "[email protected]"
},
{
"role": "Business Owner",
"name": "TBD - Business stakeholder",
"primary_phone": "+1-XXX-XXX-XXXX",
"email": "[email protected]",
"backup_contact": "[email protected]"
},
{
"role": "On-Call Engineer",
"name": "Current on-call rotation",
"primary_phone": "+1-XXX-XXX-XXXX",
"email": "[email protected]",
"backup_contact": "[email protected]"
},
{
"role": "Executive Escalation",
"name": "CTO/VP Engineering",
"primary_phone": "+1-XXX-XXX-XXXX",
"email": "[email protected]",
"backup_contact": "[email protected]"
}
]
} ================================================================================
ROLLBACK RUNBOOK: rb_921c0bca
================================================================================
Migration ID: 23a52ed1507f
Created: 2026-02-16T13:47:31.108500
EMERGENCY CONTACTS
----------------------------------------
Incident Commander: TBD - Assigned during migration
Phone: +1-XXX-XXX-XXXX
Email: [email protected]
Backup: [email protected]
Technical Lead: TBD - Migration technical owner
Phone: +1-XXX-XXX-XXXX
Email: [email protected]
Backup: [email protected]
Business Owner: TBD - Business stakeholder
Phone: +1-XXX-XXX-XXXX
Email: [email protected]
Backup: [email protected]
On-Call Engineer: Current on-call rotation
Phone: +1-XXX-XXX-XXXX
Email: [email protected]
Backup: [email protected]
Executive Escalation: CTO/VP Engineering
Phone: +1-XXX-XXX-XXXX
Email: [email protected]
Backup: [email protected]
ESCALATION MATRIX
----------------------------------------
LEVEL_1:
Trigger: Single component failure
Response Time: 5 minutes
Contacts: on_call_engineer, migration_lead
Actions: Investigate issue, Attempt automated remediation, Monitor closely
LEVEL_2:
Trigger: Multiple component failures or single critical failure
Response Time: 2 minutes
Contacts: senior_engineer, team_lead, devops_lead
Actions: Initiate rollback, Establish war room, Notify stakeholders
LEVEL_3:
Trigger: System-wide failure or data corruption
Response Time: 1 minutes
Contacts: engineering_manager, cto, incident_commander
Actions: Emergency rollback, All hands on deck, Executive notification
EMERGENCY:
Trigger: Business-critical failure with customer impact
Response Time: 0 minutes
Contacts: ceo, cto, head_of_operations
Actions: Emergency procedures, Customer communication, Media preparation if needed
AUTOMATIC ROLLBACK TRIGGERS
----------------------------------------
• Error Rate Spike
Condition: error_rate > baseline * 5 for 5 minutes
Auto-Execute: Yes
Evaluation Window: 5 minutes
Contacts: on_call_engineer, migration_lead
• Response Time Degradation
Condition: p95_response_time > baseline * 3 for 10 minutes
Auto-Execute: No
Evaluation Window: 10 minutes
Contacts: performance_team, migration_lead
• Service Availability Drop
Condition: availability < 95% for 2 minutes
Auto-Execute: Yes
Evaluation Window: 2 minutes
Contacts: sre_team, incident_commander
• Data Integrity Check Failure
Condition: data_validation_failures > 0
Auto-Execute: Yes
Evaluation Window: 1 minutes
Contacts: dba_team, data_team
• Migration Progress Stalled
Condition: migration_progress unchanged for 30 minutes
Auto-Execute: No
Evaluation Window: 30 minutes
Contacts: migration_team, dba_team
ROLLBACK PHASES
----------------------------------------
1. ROLLBACK_CLEANUP
Description: Rollback changes made during cleanup phase
Urgency: MEDIUM
Duration: 570 minutes
Risk Level: MEDIUM
Prerequisites:
✓ Incident commander assigned and briefed
✓ All team members notified of rollback initiation
✓ Monitoring systems confirmed operational
✓ Backup systems verified and accessible
Steps:
99. Validate rollback completion
Duration: 10 min
Type: manual
Success Criteria: cleanup fully rolled back, All validation checks pass
Validation Checkpoints:
☐ cleanup rollback steps completed
☐ System health checks passing
☐ No critical errors in logs
☐ Key metrics within acceptable ranges
☐ Validation command passed: SELECT COUNT(*) FROM {table_name};...
☐ Validation command passed: SELECT COUNT(*) FROM information_schema.tables WHE...
☐ Validation command passed: SELECT COUNT(*) FROM information_schema.columns WH...
2. ROLLBACK_CONTRACT
Description: Rollback changes made during contract phase
Urgency: MEDIUM
Duration: 570 minutes
Risk Level: MEDIUM
Prerequisites:
✓ Incident commander assigned and briefed
✓ All team members notified of rollback initiation
✓ Monitoring systems confirmed operational
✓ Backup systems verified and accessible
✓ Previous rollback phase completed successfully
Steps:
99. Validate rollback completion
Duration: 10 min
Type: manual
Success Criteria: contract fully rolled back, All validation checks pass
Validation Checkpoints:
☐ contract rollback steps completed
☐ System health checks passing
☐ No critical errors in logs
☐ Key metrics within acceptable ranges
☐ Validation command passed: SELECT COUNT(*) FROM {table_name};...
☐ Validation command passed: SELECT COUNT(*) FROM information_schema.tables WHE...
☐ Validation command passed: SELECT COUNT(*) FROM information_schema.columns WH...
3. ROLLBACK_MIGRATE
Description: Rollback changes made during migrate phase
Urgency: MEDIUM
Duration: 570 minutes
Risk Level: MEDIUM
Prerequisites:
✓ Incident commander assigned and briefed
✓ All team members notified of rollback initiation
✓ Monitoring systems confirmed operational
✓ Backup systems verified and accessible
✓ Previous rollback phase completed successfully
Steps:
99. Validate rollback completion
Duration: 10 min
Type: manual
Success Criteria: migrate fully rolled back, All validation checks pass
Validation Checkpoints:
☐ migrate rollback steps completed
☐ System health checks passing
☐ No critical errors in logs
☐ Key metrics within acceptable ranges
☐ Validation command passed: SELECT COUNT(*) FROM {table_name};...
☐ Validation command passed: SELECT COUNT(*) FROM information_schema.tables WHE...
☐ Validation command passed: SELECT COUNT(*) FROM information_schema.columns WH...
4. ROLLBACK_EXPAND
Description: Rollback changes made during expand phase
Urgency: MEDIUM
Duration: 570 minutes
Risk Level: MEDIUM
Prerequisites:
✓ Incident commander assigned and briefed
✓ All team members notified of rollback initiation
✓ Monitoring systems confirmed operational
✓ Backup systems verified and accessible
✓ Previous rollback phase completed successfully
Steps:
99. Validate rollback completion
Duration: 10 min
Type: manual
Success Criteria: expand fully rolled back, All validation checks pass
Validation Checkpoints:
☐ expand rollback steps completed
☐ System health checks passing
☐ No critical errors in logs
☐ Key metrics within acceptable ranges
☐ Validation command passed: SELECT COUNT(*) FROM {table_name};...
☐ Validation command passed: SELECT COUNT(*) FROM information_schema.tables WHE...
☐ Validation command passed: SELECT COUNT(*) FROM information_schema.columns WH...
5. ROLLBACK_PREPARATION
Description: Rollback changes made during preparation phase
Urgency: MEDIUM
Duration: 570 minutes
Risk Level: MEDIUM
Prerequisites:
✓ Incident commander assigned and briefed
✓ All team members notified of rollback initiation
✓ Monitoring systems confirmed operational
✓ Backup systems verified and accessible
✓ Previous rollback phase completed successfully
Steps:
1. Drop migration artifacts
Duration: 5 min
Type: sql
Script:
-- Drop migration artifacts
DROP TABLE IF EXISTS migration_log;
DROP PROCEDURE IF EXISTS migrate_data();
Success Criteria: No migration artifacts remain
99. Validate rollback completion
Duration: 10 min
Type: manual
Success Criteria: preparation fully rolled back, All validation checks pass
Validation Checkpoints:
☐ preparation rollback steps completed
☐ System health checks passing
☐ No critical errors in logs
☐ Key metrics within acceptable ranges
☐ Validation command passed: SELECT COUNT(*) FROM {table_name};...
☐ Validation command passed: SELECT COUNT(*) FROM information_schema.tables WHE...
☐ Validation command passed: SELECT COUNT(*) FROM information_schema.columns WH...
DATA RECOVERY PLAN
----------------------------------------
Recovery Method: point_in_time
Backup Location: /backups/pre_migration_{migration_id}_{timestamp}.sql
Estimated Recovery Time: 45 minutes
Recovery Scripts:
• pg_restore -d production -c /backups/pre_migration_backup.sql
• SELECT pg_create_restore_point('rollback_point');
• VACUUM ANALYZE; -- Refresh statistics after restore
Validation Queries:
• SELECT COUNT(*) FROM critical_business_table;
• SELECT MAX(created_at) FROM audit_log;
• SELECT COUNT(DISTINCT user_id) FROM user_sessions;
• SELECT SUM(amount) FROM financial_transactions WHERE date = CURRENT_DATE;
POST-ROLLBACK VALIDATION CHECKLIST
----------------------------------------
1. ☐ Verify system is responding to health checks
2. ☐ Confirm error rates are within normal parameters
3. ☐ Validate response times meet SLA requirements
4. ☐ Check all critical business processes are functioning
5. ☐ Verify monitoring and alerting systems are operational
6. ☐ Confirm no data corruption has occurred
7. ☐ Validate security controls are functioning properly
8. ☐ Check backup systems are working correctly
9. ☐ Verify integration points with downstream systems
10. ☐ Confirm user authentication and authorization working
11. ☐ Validate database schema matches expected state
12. ☐ Confirm referential integrity constraints
13. ☐ Check database performance metrics
14. ☐ Verify data consistency across related tables
15. ☐ Validate indexes and statistics are optimal
16. ☐ Confirm transaction logs are clean
17. ☐ Check database connections and connection pooling
POST-ROLLBACK PROCEDURES
----------------------------------------
1. Monitor system stability for 24-48 hours post-rollback
2. Conduct thorough post-rollback testing of all critical paths
3. Review and analyze rollback metrics and timing
4. Document lessons learned and rollback procedure improvements
5. Schedule post-mortem meeting with all stakeholders
6. Update rollback procedures based on actual experience
7. Communicate rollback completion to all stakeholders
8. Archive rollback logs and artifacts for future reference
9. Review and update monitoring thresholds if needed
10. Plan for next migration attempt with improved procedures
11. Conduct security review to ensure no vulnerabilities introduced
12. Update disaster recovery procedures if affected by rollback
13. Review capacity planning based on rollback resource usage
14. Update documentation with rollback experience and timings
{
"migration_id": "23a52ed1507f",
"source_system": "PostgreSQL 13 Production Database",
"target_system": "PostgreSQL 15 Cloud Database",
"migration_type": "database",
"complexity": "critical",
"estimated_duration_hours": 95,
"phases": [
{
"name": "preparation",
"description": "Prepare systems and teams for migration",
"duration_hours": 19,
"dependencies": [],
"validation_criteria": [
"All backups completed successfully",
"Monitoring systems operational",
"Team members briefed and ready",
"Rollback procedures tested"
],
"rollback_triggers": [
"Critical system failure",
"Data corruption detected",
"Performance degradation > 50%",
"Business process failure"
],
"tasks": [
"Backup source system",
"Set up monitoring and alerting",
"Prepare rollback procedures",
"Communicate migration timeline",
"Validate prerequisites"
],
"risk_level": "medium",
"resources_required": [
"Technical team availability",
"System access and permissions",
"Monitoring and alerting systems",
"Communication channels"
]
},
{
"name": "expand",
"description": "Execute expand phase",
"duration_hours": 19,
"dependencies": [
"preparation"
],
"validation_criteria": [
"Expand phase completed successfully"
],
"rollback_triggers": [
"Critical system failure",
"Data corruption detected",
"Performance degradation > 50%",
"Business process failure"
],
"tasks": [
"Complete expand activities"
],
"risk_level": "medium",
"resources_required": [
"Technical team availability",
"System access and permissions",
"Monitoring and alerting systems",
"Communication channels"
]
},
{
"name": "migrate",
"description": "Execute migrate phase",
"duration_hours": 19,
"dependencies": [
"expand"
],
"validation_criteria": [
"Migrate phase completed successfully"
],
"rollback_triggers": [
"Critical system failure",
"Data corruption detected",
"Performance degradation > 50%",
"Business process failure"
],
"tasks": [
"Complete migrate activities"
],
"risk_level": "medium",
"resources_required": [
"Technical team availability",
"System access and permissions",
"Monitoring and alerting systems",
"Communication channels"
]
},
{
"name": "contract",
"description": "Execute contract phase",
"duration_hours": 19,
"dependencies": [
"migrate"
],
"validation_criteria": [
"Contract phase completed successfully"
],
"rollback_triggers": [
"Critical system failure",
"Data corruption detected",
"Performance degradation > 50%",
"Business process failure"
],
"tasks": [
"Complete contract activities"
],
"risk_level": "medium",
"resources_required": [
"Technical team availability",
"System access and permissions",
"Monitoring and alerting systems",
"Communication channels"
]
},
{
"name": "cleanup",
"description": "Execute cleanup phase",
"duration_hours": 19,
"dependencies": [
"contract"
],
"validation_criteria": [
"Cleanup phase completed successfully"
],
"rollback_triggers": [
"Critical system failure",
"Data corruption detected",
"Performance degradation > 50%",
"Business process failure"
],
"tasks": [
"Complete cleanup activities"
],
"risk_level": "medium",
"resources_required": [
"Technical team availability",
"System access and permissions",
"Monitoring and alerting systems",
"Communication channels"
]
}
],
"risks": [
{
"category": "technical",
"description": "Data corruption during migration",
"probability": "low",
"impact": "critical",
"severity": "high",
"mitigation": "Implement comprehensive backup and validation procedures",
"owner": "DBA Team"
},
{
"category": "technical",
"description": "Extended downtime due to migration complexity",
"probability": "medium",
"impact": "high",
"severity": "high",
"mitigation": "Use blue-green deployment and phased migration approach",
"owner": "DevOps Team"
},
{
"category": "business",
"description": "Business process disruption",
"probability": "medium",
"impact": "high",
"severity": "high",
"mitigation": "Communicate timeline and provide alternate workflows",
"owner": "Business Owner"
},
{
"category": "operational",
"description": "Insufficient rollback testing",
"probability": "high",
"impact": "critical",
"severity": "critical",
"mitigation": "Execute full rollback procedures in staging environment",
"owner": "QA Team"
},
{
"category": "business",
"description": "Zero-downtime requirement increases complexity",
"probability": "high",
"impact": "medium",
"severity": "high",
"mitigation": "Implement blue-green deployment or rolling update strategy",
"owner": "DevOps Team"
},
{
"category": "compliance",
"description": "Regulatory compliance requirements",
"probability": "medium",
"impact": "high",
"severity": "high",
"mitigation": "Ensure all compliance checks are integrated into migration process",
"owner": "Compliance Team"
}
],
"success_criteria": [
"All data successfully migrated with 100% integrity",
"System performance meets or exceeds baseline",
"All business processes functioning normally",
"No critical security vulnerabilities introduced",
"Stakeholder acceptance criteria met",
"Documentation and runbooks updated"
],
"rollback_plan": {
"rollback_phases": [
{
"phase": "cleanup",
"rollback_actions": [
"Revert cleanup changes",
"Restore pre-cleanup state",
"Validate cleanup rollback success"
],
"validation_criteria": [
"System restored to pre-cleanup state",
"All cleanup changes successfully reverted",
"System functionality confirmed"
],
"estimated_time_minutes": 285
},
{
"phase": "contract",
"rollback_actions": [
"Revert contract changes",
"Restore pre-contract state",
"Validate contract rollback success"
],
"validation_criteria": [
"System restored to pre-contract state",
"All contract changes successfully reverted",
"System functionality confirmed"
],
"estimated_time_minutes": 285
},
{
"phase": "migrate",
"rollback_actions": [
"Revert migrate changes",
"Restore pre-migrate state",
"Validate migrate rollback success"
],
"validation_criteria": [
"System restored to pre-migrate state",
"All migrate changes successfully reverted",
"System functionality confirmed"
],
"estimated_time_minutes": 285
},
{
"phase": "expand",
"rollback_actions": [
"Revert expand changes",
"Restore pre-expand state",
"Validate expand rollback success"
],
"validation_criteria": [
"System restored to pre-expand state",
"All expand changes successfully reverted",
"System functionality confirmed"
],
"estimated_time_minutes": 285
},
{
"phase": "preparation",
"rollback_actions": [
"Revert preparation changes",
"Restore pre-preparation state",
"Validate preparation rollback success"
],
"validation_criteria": [
"System restored to pre-preparation state",
"All preparation changes successfully reverted",
"System functionality confirmed"
],
"estimated_time_minutes": 285
}
],
"rollback_triggers": [
"Critical system failure",
"Data corruption detected",
"Migration timeline exceeded by > 50%",
"Business-critical functionality unavailable",
"Security breach detected",
"Stakeholder decision to abort"
],
"rollback_decision_matrix": {
"low_severity": "Continue with monitoring",
"medium_severity": "Assess and decide within 15 minutes",
"high_severity": "Immediate rollback initiation",
"critical_severity": "Emergency rollback - all hands"
},
"rollback_contacts": [
"Migration Lead",
"Technical Lead",
"Business Owner",
"On-call Engineer"
]
},
"stakeholders": [
"Business Owner",
"Technical Lead",
"DevOps Team",
"QA Team",
"Security Team",
"End Users"
],
"created_at": "2026-02-16T13:47:23.704502"
} ================================================================================
MIGRATION PLAN: 23a52ed1507f
================================================================================
Source System: PostgreSQL 13 Production Database
Target System: PostgreSQL 15 Cloud Database
Migration Type: DATABASE
Complexity Level: CRITICAL
Estimated Duration: 95 hours (4.0 days)
Created: 2026-02-16T13:47:23.704502
MIGRATION PHASES
----------------------------------------
1. PREPARATION (19h)
Description: Prepare systems and teams for migration
Risk Level: MEDIUM
Tasks:
• Backup source system
• Set up monitoring and alerting
• Prepare rollback procedures
• Communicate migration timeline
• Validate prerequisites
Success Criteria:
✓ All backups completed successfully
✓ Monitoring systems operational
✓ Team members briefed and ready
✓ Rollback procedures tested
2. EXPAND (19h)
Description: Execute expand phase
Risk Level: MEDIUM
Dependencies: preparation
Tasks:
• Complete expand activities
Success Criteria:
✓ Expand phase completed successfully
3. MIGRATE (19h)
Description: Execute migrate phase
Risk Level: MEDIUM
Dependencies: expand
Tasks:
• Complete migrate activities
Success Criteria:
✓ Migrate phase completed successfully
4. CONTRACT (19h)
Description: Execute contract phase
Risk Level: MEDIUM
Dependencies: migrate
Tasks:
• Complete contract activities
Success Criteria:
✓ Contract phase completed successfully
5. CLEANUP (19h)
Description: Execute cleanup phase
Risk Level: MEDIUM
Dependencies: contract
Tasks:
• Complete cleanup activities
Success Criteria:
✓ Cleanup phase completed successfully
RISK ASSESSMENT
----------------------------------------
CRITICAL SEVERITY RISKS:
• Insufficient rollback testing
Category: operational
Probability: high | Impact: critical
Mitigation: Execute full rollback procedures in staging environment
Owner: QA Team
HIGH SEVERITY RISKS:
• Data corruption during migration
Category: technical
Probability: low | Impact: critical
Mitigation: Implement comprehensive backup and validation procedures
Owner: DBA Team
• Extended downtime due to migration complexity
Category: technical
Probability: medium | Impact: high
Mitigation: Use blue-green deployment and phased migration approach
Owner: DevOps Team
• Business process disruption
Category: business
Probability: medium | Impact: high
Mitigation: Communicate timeline and provide alternate workflows
Owner: Business Owner
• Zero-downtime requirement increases complexity
Category: business
Probability: high | Impact: medium
Mitigation: Implement blue-green deployment or rolling update strategy
Owner: DevOps Team
• Regulatory compliance requirements
Category: compliance
Probability: medium | Impact: high
Mitigation: Ensure all compliance checks are integrated into migration process
Owner: Compliance Team
ROLLBACK STRATEGY
----------------------------------------
Rollback Triggers:
• Critical system failure
• Data corruption detected
• Migration timeline exceeded by > 50%
• Business-critical functionality unavailable
• Security breach detected
• Stakeholder decision to abort
Rollback Phases:
CLEANUP:
- Revert cleanup changes
- Restore pre-cleanup state
- Validate cleanup rollback success
Estimated Time: 285 minutes
CONTRACT:
- Revert contract changes
- Restore pre-contract state
- Validate contract rollback success
Estimated Time: 285 minutes
MIGRATE:
- Revert migrate changes
- Restore pre-migrate state
- Validate migrate rollback success
Estimated Time: 285 minutes
EXPAND:
- Revert expand changes
- Restore pre-expand state
- Validate expand rollback success
Estimated Time: 285 minutes
PREPARATION:
- Revert preparation changes
- Restore pre-preparation state
- Validate preparation rollback success
Estimated Time: 285 minutes
SUCCESS CRITERIA
----------------------------------------
✓ All data successfully migrated with 100% integrity
✓ System performance meets or exceeds baseline
✓ All business processes functioning normally
✓ No critical security vulnerabilities introduced
✓ Stakeholder acceptance criteria met
✓ Documentation and runbooks updated
STAKEHOLDERS
----------------------------------------
• Business Owner
• Technical Lead
• DevOps Team
• QA Team
• Security Team
• End Users
{
"migration_id": "21031930da18",
"source_system": "Legacy User Service (Java Spring Boot 2.x)",
"target_system": "New User Service (Node.js + TypeScript)",
"migration_type": "service",
"complexity": "critical",
"estimated_duration_hours": 500,
"phases": [
{
"name": "intercept",
"description": "Execute intercept phase",
"duration_hours": 100,
"dependencies": [],
"validation_criteria": [
"Intercept phase completed successfully"
],
"rollback_triggers": [
"Critical system failure",
"Data corruption detected",
"Performance degradation > 50%",
"Business process failure"
],
"tasks": [
"Complete intercept activities"
],
"risk_level": "medium",
"resources_required": [
"Technical team availability",
"System access and permissions",
"Monitoring and alerting systems",
"Communication channels"
]
},
{
"name": "implement",
"description": "Execute implement phase",
"duration_hours": 100,
"dependencies": [
"intercept"
],
"validation_criteria": [
"Implement phase completed successfully"
],
"rollback_triggers": [
"Critical system failure",
"Data corruption detected",
"Performance degradation > 50%",
"Business process failure"
],
"tasks": [
"Complete implement activities"
],
"risk_level": "medium",
"resources_required": [
"Technical team availability",
"System access and permissions",
"Monitoring and alerting systems",
"Communication channels"
]
},
{
"name": "redirect",
"description": "Execute redirect phase",
"duration_hours": 100,
"dependencies": [
"implement"
],
"validation_criteria": [
"Redirect phase completed successfully"
],
"rollback_triggers": [
"Critical system failure",
"Data corruption detected",
"Performance degradation > 50%",
"Business process failure"
],
"tasks": [
"Complete redirect activities"
],
"risk_level": "medium",
"resources_required": [
"Technical team availability",
"System access and permissions",
"Monitoring and alerting systems",
"Communication channels"
]
},
{
"name": "validate",
"description": "Execute validate phase",
"duration_hours": 100,
"dependencies": [
"redirect"
],
"validation_criteria": [
"Validate phase completed successfully"
],
"rollback_triggers": [
"Critical system failure",
"Data corruption detected",
"Performance degradation > 50%",
"Business process failure"
],
"tasks": [
"Complete validate activities"
],
"risk_level": "medium",
"resources_required": [
"Technical team availability",
"System access and permissions",
"Monitoring and alerting systems",
"Communication channels"
]
},
{
"name": "retire",
"description": "Execute retire phase",
"duration_hours": 100,
"dependencies": [
"validate"
],
"validation_criteria": [
"Retire phase completed successfully"
],
"rollback_triggers": [
"Critical system failure",
"Data corruption detected",
"Performance degradation > 50%",
"Business process failure"
],
"tasks": [
"Complete retire activities"
],
"risk_level": "medium",
"resources_required": [
"Technical team availability",
"System access and permissions",
"Monitoring and alerting systems",
"Communication channels"
]
}
],
"risks": [
{
"category": "technical",
"description": "Service compatibility issues",
"probability": "medium",
"impact": "high",
"severity": "high",
"mitigation": "Implement comprehensive integration testing",
"owner": "Development Team"
},
{
"category": "technical",
"description": "Performance degradation",
"probability": "medium",
"impact": "medium",
"severity": "medium",
"mitigation": "Conduct load testing and performance benchmarking",
"owner": "DevOps Team"
},
{
"category": "business",
"description": "Feature parity gaps",
"probability": "high",
"impact": "high",
"severity": "high",
"mitigation": "Document feature mapping and acceptance criteria",
"owner": "Product Owner"
},
{
"category": "operational",
"description": "Monitoring gap during transition",
"probability": "medium",
"impact": "medium",
"severity": "medium",
"mitigation": "Set up dual monitoring and alerting systems",
"owner": "SRE Team"
},
{
"category": "business",
"description": "Zero-downtime requirement increases complexity",
"probability": "high",
"impact": "medium",
"severity": "high",
"mitigation": "Implement blue-green deployment or rolling update strategy",
"owner": "DevOps Team"
},
{
"category": "compliance",
"description": "Regulatory compliance requirements",
"probability": "medium",
"impact": "high",
"severity": "high",
"mitigation": "Ensure all compliance checks are integrated into migration process",
"owner": "Compliance Team"
}
],
"success_criteria": [
"All data successfully migrated with 100% integrity",
"System performance meets or exceeds baseline",
"All business processes functioning normally",
"No critical security vulnerabilities introduced",
"Stakeholder acceptance criteria met",
"Documentation and runbooks updated"
],
"rollback_plan": {
"rollback_phases": [
{
"phase": "retire",
"rollback_actions": [
"Revert retire changes",
"Restore pre-retire state",
"Validate retire rollback success"
],
"validation_criteria": [
"System restored to pre-retire state",
"All retire changes successfully reverted",
"System functionality confirmed"
],
"estimated_time_minutes": 1500
},
{
"phase": "validate",
"rollback_actions": [
"Revert validate changes",
"Restore pre-validate state",
"Validate validate rollback success"
],
"validation_criteria": [
"System restored to pre-validate state",
"All validate changes successfully reverted",
"System functionality confirmed"
],
"estimated_time_minutes": 1500
},
{
"phase": "redirect",
"rollback_actions": [
"Revert redirect changes",
"Restore pre-redirect state",
"Validate redirect rollback success"
],
"validation_criteria": [
"System restored to pre-redirect state",
"All redirect changes successfully reverted",
"System functionality confirmed"
],
"estimated_time_minutes": 1500
},
{
"phase": "implement",
"rollback_actions": [
"Revert implement changes",
"Restore pre-implement state",
"Validate implement rollback success"
],
"validation_criteria": [
"System restored to pre-implement state",
"All implement changes successfully reverted",
"System functionality confirmed"
],
"estimated_time_minutes": 1500
},
{
"phase": "intercept",
"rollback_actions": [
"Revert intercept changes",
"Restore pre-intercept state",
"Validate intercept rollback success"
],
"validation_criteria": [
"System restored to pre-intercept state",
"All intercept changes successfully reverted",
"System functionality confirmed"
],
"estimated_time_minutes": 1500
}
],
"rollback_triggers": [
"Critical system failure",
"Data corruption detected",
"Migration timeline exceeded by > 50%",
"Business-critical functionality unavailable",
"Security breach detected",
"Stakeholder decision to abort"
],
"rollback_decision_matrix": {
"low_severity": "Continue with monitoring",
"medium_severity": "Assess and decide within 15 minutes",
"high_severity": "Immediate rollback initiation",
"critical_severity": "Emergency rollback - all hands"
},
"rollback_contacts": [
"Migration Lead",
"Technical Lead",
"Business Owner",
"On-call Engineer"
]
},
"stakeholders": [
"Business Owner",
"Technical Lead",
"DevOps Team",
"QA Team",
"Security Team",
"End Users"
],
"created_at": "2026-02-16T13:47:34.565896"
} ================================================================================
MIGRATION PLAN: 21031930da18
================================================================================
Source System: Legacy User Service (Java Spring Boot 2.x)
Target System: New User Service (Node.js + TypeScript)
Migration Type: SERVICE
Complexity Level: CRITICAL
Estimated Duration: 500 hours (20.8 days)
Created: 2026-02-16T13:47:34.565896
MIGRATION PHASES
----------------------------------------
1. INTERCEPT (100h)
Description: Execute intercept phase
Risk Level: MEDIUM
Tasks:
• Complete intercept activities
Success Criteria:
✓ Intercept phase completed successfully
2. IMPLEMENT (100h)
Description: Execute implement phase
Risk Level: MEDIUM
Dependencies: intercept
Tasks:
• Complete implement activities
Success Criteria:
✓ Implement phase completed successfully
3. REDIRECT (100h)
Description: Execute redirect phase
Risk Level: MEDIUM
Dependencies: implement
Tasks:
• Complete redirect activities
Success Criteria:
✓ Redirect phase completed successfully
4. VALIDATE (100h)
Description: Execute validate phase
Risk Level: MEDIUM
Dependencies: redirect
Tasks:
• Complete validate activities
Success Criteria:
✓ Validate phase completed successfully
5. RETIRE (100h)
Description: Execute retire phase
Risk Level: MEDIUM
Dependencies: validate
Tasks:
• Complete retire activities
Success Criteria:
✓ Retire phase completed successfully
RISK ASSESSMENT
----------------------------------------
HIGH SEVERITY RISKS:
• Service compatibility issues
Category: technical
Probability: medium | Impact: high
Mitigation: Implement comprehensive integration testing
Owner: Development Team
• Feature parity gaps
Category: business
Probability: high | Impact: high
Mitigation: Document feature mapping and acceptance criteria
Owner: Product Owner
• Zero-downtime requirement increases complexity
Category: business
Probability: high | Impact: medium
Mitigation: Implement blue-green deployment or rolling update strategy
Owner: DevOps Team
• Regulatory compliance requirements
Category: compliance
Probability: medium | Impact: high
Mitigation: Ensure all compliance checks are integrated into migration process
Owner: Compliance Team
MEDIUM SEVERITY RISKS:
• Performance degradation
Category: technical
Probability: medium | Impact: medium
Mitigation: Conduct load testing and performance benchmarking
Owner: DevOps Team
• Monitoring gap during transition
Category: operational
Probability: medium | Impact: medium
Mitigation: Set up dual monitoring and alerting systems
Owner: SRE Team
ROLLBACK STRATEGY
----------------------------------------
Rollback Triggers:
• Critical system failure
• Data corruption detected
• Migration timeline exceeded by > 50%
• Business-critical functionality unavailable
• Security breach detected
• Stakeholder decision to abort
Rollback Phases:
RETIRE:
- Revert retire changes
- Restore pre-retire state
- Validate retire rollback success
Estimated Time: 1500 minutes
VALIDATE:
- Revert validate changes
- Restore pre-validate state
- Validate validate rollback success
Estimated Time: 1500 minutes
REDIRECT:
- Revert redirect changes
- Restore pre-redirect state
- Validate redirect rollback success
Estimated Time: 1500 minutes
IMPLEMENT:
- Revert implement changes
- Restore pre-implement state
- Validate implement rollback success
Estimated Time: 1500 minutes
INTERCEPT:
- Revert intercept changes
- Restore pre-intercept state
- Validate intercept rollback success
Estimated Time: 1500 minutes
SUCCESS CRITERIA
----------------------------------------
✓ All data successfully migrated with 100% integrity
✓ System performance meets or exceeds baseline
✓ All business processes functioning normally
✓ No critical security vulnerabilities introduced
✓ Stakeholder acceptance criteria met
✓ Documentation and runbooks updated
STAKEHOLDERS
----------------------------------------
• Business Owner
• Technical Lead
• DevOps Team
• QA Team
• Security Team
• End Users
{
"schema_before": "{\n \"schema_version\": \"1.0\",\n \"database\": \"user_management\",\n \"tables\": {\n \"users\": {\n \"columns\": {\n \"id\": {\n \"type\": \"bigint\",\n \"nullable\": false,\n \"primary_key\": true,\n \"auto_increment\": true\n },\n \"username\": {\n \"type\": \"varchar\",\n \"length\": 50,\n \"nullable\": false,\n \"unique\": true\n },\n \"email\": {\n \"type\": \"varchar\",\n \"length\": 255,\n \"nullable\": false,\n...",
"schema_after": "{\n \"schema_version\": \"2.0\",\n \"database\": \"user_management_v2\",\n \"tables\": {\n \"users\": {\n \"columns\": {\n \"id\": {\n \"type\": \"bigint\",\n \"nullable\": false,\n \"primary_key\": true,\n \"auto_increment\": true\n },\n \"username\": {\n \"type\": \"varchar\",\n \"length\": 50,\n \"nullable\": false,\n \"unique\": true\n },\n \"email\": {\n \"type\": \"varchar\",\n \"length\": 320,\n \"nullable\": fals...",
"analysis_date": "2026-02-16T13:47:27.050459",
"overall_compatibility": "potentially_incompatible",
"breaking_changes_count": 0,
"potentially_breaking_count": 4,
"non_breaking_changes_count": 0,
"additive_changes_count": 0,
"issues": [
{
"type": "check_added",
"severity": "potentially_breaking",
"description": "New check constraint 'phone IS NULL OR LENGTH(phone) >= 10' added to table 'users'",
"field_path": "tables.users.constraints.check",
"old_value": null,
"new_value": "phone IS NULL OR LENGTH(phone) >= 10",
"impact": "New check constraint may reject existing data",
"suggested_migration": "Validate existing data complies with new constraint",
"affected_operations": [
"INSERT",
"UPDATE"
]
},
{
"type": "check_added",
"severity": "potentially_breaking",
"description": "New check constraint 'bio IS NULL OR LENGTH(bio) <= 2000' added to table 'user_profiles'",
"field_path": "tables.user_profiles.constraints.check",
"old_value": null,
"new_value": "bio IS NULL OR LENGTH(bio) <= 2000",
"impact": "New check constraint may reject existing data",
"suggested_migration": "Validate existing data complies with new constraint",
"affected_operations": [
"INSERT",
"UPDATE"
]
},
{
"type": "check_added",
"severity": "potentially_breaking",
"description": "New check constraint 'language IN ('en', 'es', 'fr', 'de', 'it', 'pt', 'ru', 'ja', 'ko', 'zh')' added to table 'user_profiles'",
"field_path": "tables.user_profiles.constraints.check",
"old_value": null,
"new_value": "language IN ('en', 'es', 'fr', 'de', 'it', 'pt', 'ru', 'ja', 'ko', 'zh')",
"impact": "New check constraint may reject existing data",
"suggested_migration": "Validate existing data complies with new constraint",
"affected_operations": [
"INSERT",
"UPDATE"
]
},
{
"type": "check_added",
"severity": "potentially_breaking",
"description": "New check constraint 'session_type IN ('web', 'mobile', 'api', 'admin')' added to table 'user_sessions'",
"field_path": "tables.user_sessions.constraints.check",
"old_value": null,
"new_value": "session_type IN ('web', 'mobile', 'api', 'admin')",
"impact": "New check constraint may reject existing data",
"suggested_migration": "Validate existing data complies with new constraint",
"affected_operations": [
"INSERT",
"UPDATE"
]
}
],
"migration_scripts": [
{
"script_type": "sql",
"description": "Create new table user_preferences",
"script_content": "CREATE TABLE user_preferences (\n id bigint NOT NULL,\n user_id bigint NOT NULL,\n preference_key varchar NOT NULL,\n preference_value json,\n created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,\n updated_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP\n);",
"rollback_script": "DROP TABLE IF EXISTS user_preferences;",
"dependencies": [],
"validation_query": "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = 'user_preferences';"
},
{
"script_type": "sql",
"description": "Add column email_verified_at to table users",
"script_content": "ALTER TABLE users ADD COLUMN email_verified_at timestamp;",
"rollback_script": "ALTER TABLE users DROP COLUMN email_verified_at;",
"dependencies": [],
"validation_query": "SELECT COUNT(*) FROM information_schema.columns WHERE table_name = 'users' AND column_name = 'email_verified_at';"
},
{
"script_type": "sql",
"description": "Add column phone_verified_at to table users",
"script_content": "ALTER TABLE users ADD COLUMN phone_verified_at timestamp;",
"rollback_script": "ALTER TABLE users DROP COLUMN phone_verified_at;",
"dependencies": [],
"validation_query": "SELECT COUNT(*) FROM information_schema.columns WHERE table_name = 'users' AND column_name = 'phone_verified_at';"
},
{
"script_type": "sql",
"description": "Add column two_factor_enabled to table users",
"script_content": "ALTER TABLE users ADD COLUMN two_factor_enabled boolean NOT NULL DEFAULT False;",
"rollback_script": "ALTER TABLE users DROP COLUMN two_factor_enabled;",
"dependencies": [],
"validation_query": "SELECT COUNT(*) FROM information_schema.columns WHERE table_name = 'users' AND column_name = 'two_factor_enabled';"
},
{
"script_type": "sql",
"description": "Add column last_login_at to table users",
"script_content": "ALTER TABLE users ADD COLUMN last_login_at timestamp;",
"rollback_script": "ALTER TABLE users DROP COLUMN last_login_at;",
"dependencies": [],
"validation_query": "SELECT COUNT(*) FROM information_schema.columns WHERE table_name = 'users' AND column_name = 'last_login_at';"
},
{
"script_type": "sql",
"description": "Add check constraint to users",
"script_content": "ALTER TABLE users ADD CONSTRAINT check_users CHECK (phone IS NULL OR LENGTH(phone) >= 10);",
"rollback_script": "ALTER TABLE users DROP CONSTRAINT check_users;",
"dependencies": [],
"validation_query": "SELECT COUNT(*) FROM information_schema.table_constraints WHERE table_name = 'users' AND constraint_type = 'CHECK';"
},
{
"script_type": "sql",
"description": "Add column timezone to table user_profiles",
"script_content": "ALTER TABLE user_profiles ADD COLUMN timezone varchar DEFAULT UTC;",
"rollback_script": "ALTER TABLE user_profiles DROP COLUMN timezone;",
"dependencies": [],
"validation_query": "SELECT COUNT(*) FROM information_schema.columns WHERE table_name = 'user_profiles' AND column_name = 'timezone';"
},
{
"script_type": "sql",
"description": "Add column language to table user_profiles",
"script_content": "ALTER TABLE user_profiles ADD COLUMN language varchar NOT NULL DEFAULT en;",
"rollback_script": "ALTER TABLE user_profiles DROP COLUMN language;",
"dependencies": [],
"validation_query": "SELECT COUNT(*) FROM information_schema.columns WHERE table_name = 'user_profiles' AND column_name = 'language';"
},
{
"script_type": "sql",
"description": "Add check constraint to user_profiles",
"script_content": "ALTER TABLE user_profiles ADD CONSTRAINT check_user_profiles CHECK (bio IS NULL OR LENGTH(bio) <= 2000);",
"rollback_script": "ALTER TABLE user_profiles DROP CONSTRAINT check_user_profiles;",
"dependencies": [],
"validation_query": "SELECT COUNT(*) FROM information_schema.table_constraints WHERE table_name = 'user_profiles' AND constraint_type = 'CHECK';"
},
{
"script_type": "sql",
"description": "Add check constraint to user_profiles",
"script_content": "ALTER TABLE user_profiles ADD CONSTRAINT check_user_profiles CHECK (language IN ('en', 'es', 'fr', 'de', 'it', 'pt', 'ru', 'ja', 'ko', 'zh'));",
"rollback_script": "ALTER TABLE user_profiles DROP CONSTRAINT check_user_profiles;",
"dependencies": [],
"validation_query": "SELECT COUNT(*) FROM information_schema.table_constraints WHERE table_name = 'user_profiles' AND constraint_type = 'CHECK';"
},
{
"script_type": "sql",
"description": "Add column session_type to table user_sessions",
"script_content": "ALTER TABLE user_sessions ADD COLUMN session_type varchar NOT NULL DEFAULT web;",
"rollback_script": "ALTER TABLE user_sessions DROP COLUMN session_type;",
"dependencies": [],
"validation_query": "SELECT COUNT(*) FROM information_schema.columns WHERE table_name = 'user_sessions' AND column_name = 'session_type';"
},
{
"script_type": "sql",
"description": "Add column is_mobile to table user_sessions",
"script_content": "ALTER TABLE user_sessions ADD COLUMN is_mobile boolean NOT NULL DEFAULT False;",
"rollback_script": "ALTER TABLE user_sessions DROP COLUMN is_mobile;",
"dependencies": [],
"validation_query": "SELECT COUNT(*) FROM information_schema.columns WHERE table_name = 'user_sessions' AND column_name = 'is_mobile';"
},
{
"script_type": "sql",
"description": "Add check constraint to user_sessions",
"script_content": "ALTER TABLE user_sessions ADD CONSTRAINT check_user_sessions CHECK (session_type IN ('web', 'mobile', 'api', 'admin'));",
"rollback_script": "ALTER TABLE user_sessions DROP CONSTRAINT check_user_sessions;",
"dependencies": [],
"validation_query": "SELECT COUNT(*) FROM information_schema.table_constraints WHERE table_name = 'user_sessions' AND constraint_type = 'CHECK';"
}
],
"risk_assessment": {
"overall_risk": "medium",
"deployment_risk": "safe_independent_deployment",
"rollback_complexity": "low",
"testing_requirements": [
"integration_testing",
"regression_testing",
"data_migration_testing"
]
},
"recommendations": [
"Conduct thorough testing with realistic data volumes",
"Implement monitoring for migration success metrics",
"Test all migration scripts in staging environment",
"Implement migration progress monitoring",
"Create detailed communication plan for stakeholders",
"Implement feature flags for gradual rollout"
]
} ================================================================================
COMPATIBILITY ANALYSIS REPORT
================================================================================
Analysis Date: 2026-02-16T13:47:27.050459
Overall Compatibility: POTENTIALLY_INCOMPATIBLE
SUMMARY
----------------------------------------
Breaking Changes: 0
Potentially Breaking: 4
Non-Breaking Changes: 0
Additive Changes: 0
Total Issues Found: 4
RISK ASSESSMENT
----------------------------------------
Overall Risk: medium
Deployment Risk: safe_independent_deployment
Rollback Complexity: low
Testing Requirements: ['integration_testing', 'regression_testing', 'data_migration_testing']
POTENTIALLY BREAKING ISSUES
----------------------------------------
• New check constraint 'phone IS NULL OR LENGTH(phone) >= 10' added to table 'users'
Field: tables.users.constraints.check
Impact: New check constraint may reject existing data
Migration: Validate existing data complies with new constraint
Affected Operations: INSERT, UPDATE
• New check constraint 'bio IS NULL OR LENGTH(bio) <= 2000' added to table 'user_profiles'
Field: tables.user_profiles.constraints.check
Impact: New check constraint may reject existing data
Migration: Validate existing data complies with new constraint
Affected Operations: INSERT, UPDATE
• New check constraint 'language IN ('en', 'es', 'fr', 'de', 'it', 'pt', 'ru', 'ja', 'ko', 'zh')' added to table 'user_profiles'
Field: tables.user_profiles.constraints.check
Impact: New check constraint may reject existing data
Migration: Validate existing data complies with new constraint
Affected Operations: INSERT, UPDATE
• New check constraint 'session_type IN ('web', 'mobile', 'api', 'admin')' added to table 'user_sessions'
Field: tables.user_sessions.constraints.check
Impact: New check constraint may reject existing data
Migration: Validate existing data complies with new constraint
Affected Operations: INSERT, UPDATE
SUGGESTED MIGRATION SCRIPTS
----------------------------------------
1. Create new table user_preferences
Type: sql
Script:
CREATE TABLE user_preferences (
id bigint NOT NULL,
user_id bigint NOT NULL,
preference_key varchar NOT NULL,
preference_value json,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
2. Add column email_verified_at to table users
Type: sql
Script:
ALTER TABLE users ADD COLUMN email_verified_at timestamp;
3. Add column phone_verified_at to table users
Type: sql
Script:
ALTER TABLE users ADD COLUMN phone_verified_at timestamp;
4. Add column two_factor_enabled to table users
Type: sql
Script:
ALTER TABLE users ADD COLUMN two_factor_enabled boolean NOT NULL DEFAULT False;
5. Add column last_login_at to table users
Type: sql
Script:
ALTER TABLE users ADD COLUMN last_login_at timestamp;
6. Add check constraint to users
Type: sql
Script:
ALTER TABLE users ADD CONSTRAINT check_users CHECK (phone IS NULL OR LENGTH(phone) >= 10);
7. Add column timezone to table user_profiles
Type: sql
Script:
ALTER TABLE user_profiles ADD COLUMN timezone varchar DEFAULT UTC;
8. Add column language to table user_profiles
Type: sql
Script:
ALTER TABLE user_profiles ADD COLUMN language varchar NOT NULL DEFAULT en;
9. Add check constraint to user_profiles
Type: sql
Script:
ALTER TABLE user_profiles ADD CONSTRAINT check_user_profiles CHECK (bio IS NULL OR LENGTH(bio) <= 2000);
10. Add check constraint to user_profiles
Type: sql
Script:
ALTER TABLE user_profiles ADD CONSTRAINT check_user_profiles CHECK (language IN ('en', 'es', 'fr', 'de', 'it', 'pt', 'ru', 'ja', 'ko', 'zh'));
11. Add column session_type to table user_sessions
Type: sql
Script:
ALTER TABLE user_sessions ADD COLUMN session_type varchar NOT NULL DEFAULT web;
12. Add column is_mobile to table user_sessions
Type: sql
Script:
ALTER TABLE user_sessions ADD COLUMN is_mobile boolean NOT NULL DEFAULT False;
13. Add check constraint to user_sessions
Type: sql
Script:
ALTER TABLE user_sessions ADD CONSTRAINT check_user_sessions CHECK (session_type IN ('web', 'mobile', 'api', 'admin'));
RECOMMENDATIONS
----------------------------------------
1. Conduct thorough testing with realistic data volumes
2. Implement monitoring for migration success metrics
3. Test all migration scripts in staging environment
4. Implement migration progress monitoring
5. Create detailed communication plan for stakeholders
6. Implement feature flags for gradual rollout
Data Reconciliation Strategies
Overview
Data reconciliation is the process of ensuring data consistency and integrity across systems during and after migrations. This document provides comprehensive strategies, tools, and implementation patterns for detecting, measuring, and correcting data discrepancies in migration scenarios.
Core Principles
1. Eventually Consistent
Accept that perfect real-time consistency may not be achievable during migrations, but ensure eventual consistency through reconciliation processes.
2. Idempotent Operations
All reconciliation operations must be safe to run multiple times without causing additional issues.
3. Audit Trail
Maintain detailed logs of all reconciliation actions for compliance and debugging.
4. Non-Destructive
Reconciliation should prefer addition over deletion, and always maintain backups before corrections.
Types of Data Inconsistencies
1. Missing Records
Records that exist in source but not in target system.
2. Extra Records
Records that exist in target but not in source system.
3. Field Mismatches
Records exist in both systems but with different field values.
4. Referential Integrity Violations
Foreign key relationships that are broken during migration.
5. Temporal Inconsistencies
Data with incorrect timestamps or ordering.
6. Schema Drift
Structural differences between source and target schemas.
Detection Strategies
1. Row Count Validation
Simple Count Comparison
-- Compare total row counts
SELECT
'source' as system,
COUNT(*) as row_count
FROM source_table
UNION ALL
SELECT
'target' as system,
COUNT(*) as row_count
FROM target_table;Filtered Count Comparison
-- Compare counts with business logic filters
WITH source_counts AS (
SELECT
status,
created_date::date as date,
COUNT(*) as count
FROM source_orders
WHERE created_date >= '2024-01-01'
GROUP BY status, created_date::date
),
target_counts AS (
SELECT
status,
created_date::date as date,
COUNT(*) as count
FROM target_orders
WHERE created_date >= '2024-01-01'
GROUP BY status, created_date::date
)
SELECT
COALESCE(s.status, t.status) as status,
COALESCE(s.date, t.date) as date,
COALESCE(s.count, 0) as source_count,
COALESCE(t.count, 0) as target_count,
COALESCE(s.count, 0) - COALESCE(t.count, 0) as difference
FROM source_counts s
FULL OUTER JOIN target_counts t
ON s.status = t.status AND s.date = t.date
WHERE COALESCE(s.count, 0) != COALESCE(t.count, 0);2. Checksum-Based Validation
Record-Level Checksums
import hashlib
import json
class RecordChecksum:
def __init__(self, exclude_fields=None):
self.exclude_fields = exclude_fields or ['updated_at', 'version']
def calculate_checksum(self, record):
"""Calculate MD5 checksum for a database record"""
# Remove excluded fields and sort for consistency
filtered_record = {
k: v for k, v in record.items()
if k not in self.exclude_fields
}
# Convert to sorted JSON string for consistent hashing
normalized = json.dumps(filtered_record, sort_keys=True, default=str)
return hashlib.md5(normalized.encode('utf-8')).hexdigest()
def compare_records(self, source_record, target_record):
"""Compare two records using checksums"""
source_checksum = self.calculate_checksum(source_record)
target_checksum = self.calculate_checksum(target_record)
return {
'match': source_checksum == target_checksum,
'source_checksum': source_checksum,
'target_checksum': target_checksum
}
# Usage example
checksum_calculator = RecordChecksum(exclude_fields=['updated_at', 'migration_flag'])
source_records = fetch_records_from_source()
target_records = fetch_records_from_target()
mismatches = []
for source_id, source_record in source_records.items():
if source_id in target_records:
comparison = checksum_calculator.compare_records(
source_record, target_records[source_id]
)
if not comparison['match']:
mismatches.append({
'record_id': source_id,
'source_checksum': comparison['source_checksum'],
'target_checksum': comparison['target_checksum']
})Aggregate Checksums
-- Calculate aggregate checksums for data validation
WITH source_aggregates AS (
SELECT
DATE_TRUNC('day', created_at) as day,
status,
COUNT(*) as record_count,
SUM(amount) as total_amount,
MD5(STRING_AGG(CAST(id AS VARCHAR) || ':' || CAST(amount AS VARCHAR), '|' ORDER BY id)) as checksum
FROM source_transactions
GROUP BY DATE_TRUNC('day', created_at), status
),
target_aggregates AS (
SELECT
DATE_TRUNC('day', created_at) as day,
status,
COUNT(*) as record_count,
SUM(amount) as total_amount,
MD5(STRING_AGG(CAST(id AS VARCHAR) || ':' || CAST(amount AS VARCHAR), '|' ORDER BY id)) as checksum
FROM target_transactions
GROUP BY DATE_TRUNC('day', created_at), status
)
SELECT
COALESCE(s.day, t.day) as day,
COALESCE(s.status, t.status) as status,
COALESCE(s.record_count, 0) as source_count,
COALESCE(t.record_count, 0) as target_count,
COALESCE(s.total_amount, 0) as source_amount,
COALESCE(t.total_amount, 0) as target_amount,
s.checksum as source_checksum,
t.checksum as target_checksum,
CASE WHEN s.checksum = t.checksum THEN 'MATCH' ELSE 'MISMATCH' END as status
FROM source_aggregates s
FULL OUTER JOIN target_aggregates t
ON s.day = t.day AND s.status = t.status
WHERE s.checksum != t.checksum OR s.checksum IS NULL OR t.checksum IS NULL;3. Delta Detection
Change Data Capture (CDC) Based
class CDCReconciler:
def __init__(self, kafka_client, database_client):
self.kafka = kafka_client
self.db = database_client
self.processed_changes = set()
def process_cdc_stream(self, topic_name):
"""Process CDC events and track changes for reconciliation"""
consumer = self.kafka.consumer(topic_name)
for message in consumer:
change_event = json.loads(message.value)
change_id = f"{change_event['table']}:{change_event['key']}:{change_event['timestamp']}"
if change_id in self.processed_changes:
continue # Skip duplicate events
try:
self.apply_change(change_event)
self.processed_changes.add(change_id)
# Commit offset only after successful processing
consumer.commit()
except Exception as e:
# Log failure and continue - will be caught by reconciliation
self.log_processing_failure(change_id, str(e))
def apply_change(self, change_event):
"""Apply CDC change to target system"""
table = change_event['table']
operation = change_event['operation']
key = change_event['key']
data = change_event.get('data', {})
if operation == 'INSERT':
self.db.insert(table, data)
elif operation == 'UPDATE':
self.db.update(table, key, data)
elif operation == 'DELETE':
self.db.delete(table, key)
def reconcile_missed_changes(self, start_timestamp, end_timestamp):
"""Find and apply changes that may have been missed"""
# Query source database for changes in time window
source_changes = self.db.get_changes_in_window(
start_timestamp, end_timestamp
)
missed_changes = []
for change in source_changes:
change_id = f"{change['table']}:{change['key']}:{change['timestamp']}"
if change_id not in self.processed_changes:
missed_changes.append(change)
# Apply missed changes
for change in missed_changes:
try:
self.apply_change(change)
print(f"Applied missed change: {change['table']}:{change['key']}")
except Exception as e:
print(f"Failed to apply missed change: {e}")4. Business Logic Validation
Critical Business Rules Validation
class BusinessLogicValidator:
def __init__(self, source_db, target_db):
self.source_db = source_db
self.target_db = target_db
def validate_financial_consistency(self):
"""Validate critical financial calculations"""
validation_rules = [
{
'name': 'daily_transaction_totals',
'source_query': """
SELECT DATE(created_at) as date, SUM(amount) as total
FROM source_transactions
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY DATE(created_at)
""",
'target_query': """
SELECT DATE(created_at) as date, SUM(amount) as total
FROM target_transactions
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY DATE(created_at)
""",
'tolerance': 0.01 # Allow $0.01 difference for rounding
},
{
'name': 'customer_balance_totals',
'source_query': """
SELECT customer_id, SUM(balance) as total_balance
FROM source_accounts
GROUP BY customer_id
HAVING SUM(balance) > 0
""",
'target_query': """
SELECT customer_id, SUM(balance) as total_balance
FROM target_accounts
GROUP BY customer_id
HAVING SUM(balance) > 0
""",
'tolerance': 0.01
}
]
validation_results = []
for rule in validation_rules:
source_data = self.source_db.execute_query(rule['source_query'])
target_data = self.target_db.execute_query(rule['target_query'])
differences = self.compare_financial_data(
source_data, target_data, rule['tolerance']
)
validation_results.append({
'rule_name': rule['name'],
'differences_found': len(differences),
'differences': differences[:10], # First 10 differences
'status': 'PASS' if len(differences) == 0 else 'FAIL'
})
return validation_results
def compare_financial_data(self, source_data, target_data, tolerance):
"""Compare financial data with tolerance for rounding differences"""
source_dict = {
tuple(row[:-1]): row[-1] for row in source_data
} # Last column is the amount
target_dict = {
tuple(row[:-1]): row[-1] for row in target_data
}
differences = []
# Check for missing records and value differences
for key, source_value in source_dict.items():
if key not in target_dict:
differences.append({
'key': key,
'source_value': source_value,
'target_value': None,
'difference_type': 'MISSING_IN_TARGET'
})
else:
target_value = target_dict[key]
if abs(float(source_value) - float(target_value)) > tolerance:
differences.append({
'key': key,
'source_value': source_value,
'target_value': target_value,
'difference': float(source_value) - float(target_value),
'difference_type': 'VALUE_MISMATCH'
})
# Check for extra records in target
for key, target_value in target_dict.items():
if key not in source_dict:
differences.append({
'key': key,
'source_value': None,
'target_value': target_value,
'difference_type': 'EXTRA_IN_TARGET'
})
return differencesCorrection Strategies
1. Automated Correction
Missing Record Insertion
class AutoCorrector:
def __init__(self, source_db, target_db, dry_run=True):
self.source_db = source_db
self.target_db = target_db
self.dry_run = dry_run
self.correction_log = []
def correct_missing_records(self, table_name, key_field):
"""Add missing records from source to target"""
# Find records in source but not in target
missing_query = f"""
SELECT s.*
FROM source_{table_name} s
LEFT JOIN target_{table_name} t ON s.{key_field} = t.{key_field}
WHERE t.{key_field} IS NULL
"""
missing_records = self.source_db.execute_query(missing_query)
for record in missing_records:
correction = {
'table': table_name,
'operation': 'INSERT',
'key': record[key_field],
'data': record,
'timestamp': datetime.utcnow()
}
if not self.dry_run:
try:
self.target_db.insert(table_name, record)
correction['status'] = 'SUCCESS'
except Exception as e:
correction['status'] = 'FAILED'
correction['error'] = str(e)
else:
correction['status'] = 'DRY_RUN'
self.correction_log.append(correction)
return len(missing_records)
def correct_field_mismatches(self, table_name, key_field, fields_to_correct):
"""Correct field value mismatches"""
mismatch_query = f"""
SELECT s.{key_field}, {', '.join([f's.{f} as source_{f}, t.{f} as target_{f}' for f in fields_to_correct])}
FROM source_{table_name} s
JOIN target_{table_name} t ON s.{key_field} = t.{key_field}
WHERE {' OR '.join([f's.{f} != t.{f}' for f in fields_to_correct])}
"""
mismatched_records = self.source_db.execute_query(mismatch_query)
for record in mismatched_records:
key_value = record[key_field]
updates = {}
for field in fields_to_correct:
source_value = record[f'source_{field}']
target_value = record[f'target_{field}']
if source_value != target_value:
updates[field] = source_value
if updates:
correction = {
'table': table_name,
'operation': 'UPDATE',
'key': key_value,
'updates': updates,
'timestamp': datetime.utcnow()
}
if not self.dry_run:
try:
self.target_db.update(table_name, {key_field: key_value}, updates)
correction['status'] = 'SUCCESS'
except Exception as e:
correction['status'] = 'FAILED'
correction['error'] = str(e)
else:
correction['status'] = 'DRY_RUN'
self.correction_log.append(correction)
return len(mismatched_records)2. Manual Review Process
Correction Workflow
class ManualReviewSystem:
def __init__(self, database_client):
self.db = database_client
self.review_queue = []
def queue_for_review(self, discrepancy):
"""Add discrepancy to manual review queue"""
review_item = {
'id': str(uuid.uuid4()),
'discrepancy_type': discrepancy['type'],
'table': discrepancy['table'],
'record_key': discrepancy['key'],
'source_data': discrepancy.get('source_data'),
'target_data': discrepancy.get('target_data'),
'description': discrepancy['description'],
'severity': discrepancy.get('severity', 'medium'),
'status': 'PENDING',
'created_at': datetime.utcnow(),
'reviewed_by': None,
'reviewed_at': None,
'resolution': None
}
self.review_queue.append(review_item)
# Persist to review database
self.db.insert('manual_review_queue', review_item)
return review_item['id']
def process_review(self, review_id, reviewer, action, notes=None):
"""Process manual review decision"""
review_item = self.get_review_item(review_id)
if not review_item:
raise ValueError(f"Review item {review_id} not found")
review_item.update({
'status': 'REVIEWED',
'reviewed_by': reviewer,
'reviewed_at': datetime.utcnow(),
'resolution': {
'action': action, # 'APPLY_SOURCE', 'KEEP_TARGET', 'CUSTOM_FIX'
'notes': notes
}
})
# Apply the resolution
if action == 'APPLY_SOURCE':
self.apply_source_data(review_item)
elif action == 'KEEP_TARGET':
pass # No action needed
elif action == 'CUSTOM_FIX':
# Custom fix would be applied separately
pass
# Update review record
self.db.update('manual_review_queue',
{'id': review_id},
review_item)
return review_item
def generate_review_report(self):
"""Generate summary report of manual reviews"""
reviews = self.db.query("""
SELECT
discrepancy_type,
severity,
status,
COUNT(*) as count,
MIN(created_at) as oldest_review,
MAX(created_at) as newest_review
FROM manual_review_queue
GROUP BY discrepancy_type, severity, status
ORDER BY severity DESC, discrepancy_type
""")
return reviews3. Reconciliation Scheduling
Automated Reconciliation Jobs
import schedule
import time
from datetime import datetime, timedelta
class ReconciliationScheduler:
def __init__(self, reconciler):
self.reconciler = reconciler
self.job_history = []
def setup_schedules(self):
"""Set up automated reconciliation schedules"""
# Quick reconciliation every 15 minutes during migration
schedule.every(15).minutes.do(self.quick_reconciliation)
# Comprehensive reconciliation every 4 hours
schedule.every(4).hours.do(self.comprehensive_reconciliation)
# Deep validation daily
schedule.every().day.at("02:00").do(self.deep_validation)
# Weekly business logic validation
schedule.every().sunday.at("03:00").do(self.business_logic_validation)
def quick_reconciliation(self):
"""Quick count-based reconciliation"""
job_start = datetime.utcnow()
try:
# Check critical tables only
critical_tables = [
'transactions', 'orders', 'customers', 'accounts'
]
results = []
for table in critical_tables:
count_diff = self.reconciler.check_row_counts(table)
if abs(count_diff) > 0:
results.append({
'table': table,
'count_difference': count_diff,
'severity': 'high' if abs(count_diff) > 100 else 'medium'
})
job_result = {
'job_type': 'quick_reconciliation',
'start_time': job_start,
'end_time': datetime.utcnow(),
'status': 'completed',
'issues_found': len(results),
'details': results
}
# Alert if significant issues found
if any(r['severity'] == 'high' for r in results):
self.send_alert(job_result)
except Exception as e:
job_result = {
'job_type': 'quick_reconciliation',
'start_time': job_start,
'end_time': datetime.utcnow(),
'status': 'failed',
'error': str(e)
}
self.job_history.append(job_result)
def comprehensive_reconciliation(self):
"""Comprehensive checksum-based reconciliation"""
job_start = datetime.utcnow()
try:
tables_to_check = self.get_migration_tables()
issues = []
for table in tables_to_check:
# Sample-based checksum validation
sample_issues = self.reconciler.validate_sample_checksums(
table, sample_size=1000
)
issues.extend(sample_issues)
# Auto-correct simple issues
auto_corrections = 0
for issue in issues:
if issue['auto_correctable']:
self.reconciler.auto_correct_issue(issue)
auto_corrections += 1
else:
# Queue for manual review
self.reconciler.queue_for_manual_review(issue)
job_result = {
'job_type': 'comprehensive_reconciliation',
'start_time': job_start,
'end_time': datetime.utcnow(),
'status': 'completed',
'total_issues': len(issues),
'auto_corrections': auto_corrections,
'manual_reviews_queued': len(issues) - auto_corrections
}
except Exception as e:
job_result = {
'job_type': 'comprehensive_reconciliation',
'start_time': job_start,
'end_time': datetime.utcnow(),
'status': 'failed',
'error': str(e)
}
self.job_history.append(job_result)
def run_scheduler(self):
"""Run the reconciliation scheduler"""
print("Starting reconciliation scheduler...")
while True:
schedule.run_pending()
time.sleep(60) # Check every minuteMonitoring and Reporting
1. Reconciliation Metrics
class ReconciliationMetrics:
def __init__(self, prometheus_client):
self.prometheus = prometheus_client
# Define metrics
self.inconsistencies_found = Counter(
'reconciliation_inconsistencies_total',
'Number of inconsistencies found',
['table', 'type', 'severity']
)
self.reconciliation_duration = Histogram(
'reconciliation_duration_seconds',
'Time spent on reconciliation jobs',
['job_type']
)
self.auto_corrections = Counter(
'reconciliation_auto_corrections_total',
'Number of automatically corrected inconsistencies',
['table', 'correction_type']
)
self.data_drift_gauge = Gauge(
'data_drift_percentage',
'Percentage of records with inconsistencies',
['table']
)
def record_inconsistency(self, table, inconsistency_type, severity):
"""Record a found inconsistency"""
self.inconsistencies_found.labels(
table=table,
type=inconsistency_type,
severity=severity
).inc()
def record_auto_correction(self, table, correction_type):
"""Record an automatic correction"""
self.auto_corrections.labels(
table=table,
correction_type=correction_type
).inc()
def update_data_drift(self, table, drift_percentage):
"""Update data drift gauge"""
self.data_drift_gauge.labels(table=table).set(drift_percentage)
def record_job_duration(self, job_type, duration_seconds):
"""Record reconciliation job duration"""
self.reconciliation_duration.labels(job_type=job_type).observe(duration_seconds)2. Alerting Rules
# Prometheus alerting rules for data reconciliation
groups:
- name: data_reconciliation
rules:
- alert: HighDataInconsistency
expr: reconciliation_inconsistencies_total > 100
for: 5m
labels:
severity: critical
annotations:
summary: "High number of data inconsistencies detected"
description: "{{ $value }} inconsistencies found in the last 5 minutes"
- alert: DataDriftHigh
expr: data_drift_percentage > 5
for: 10m
labels:
severity: warning
annotations:
summary: "Data drift percentage is high"
description: "{{ $labels.table }} has {{ $value }}% data drift"
- alert: ReconciliationJobFailed
expr: up{job="reconciliation"} == 0
for: 2m
labels:
severity: critical
annotations:
summary: "Reconciliation job is down"
description: "The data reconciliation service is not responding"
- alert: AutoCorrectionRateHigh
expr: rate(reconciliation_auto_corrections_total[10m]) > 10
for: 5m
labels:
severity: warning
annotations:
summary: "High rate of automatic corrections"
description: "Auto-correction rate is {{ $value }} per second"3. Dashboard and Reporting
class ReconciliationDashboard:
def __init__(self, database_client, metrics_client):
self.db = database_client
self.metrics = metrics_client
def generate_daily_report(self, date=None):
"""Generate daily reconciliation report"""
if not date:
date = datetime.utcnow().date()
# Query reconciliation results for the day
daily_stats = self.db.query("""
SELECT
table_name,
inconsistency_type,
COUNT(*) as count,
AVG(CASE WHEN resolution = 'AUTO_CORRECTED' THEN 1 ELSE 0 END) as auto_correction_rate
FROM reconciliation_log
WHERE DATE(created_at) = %s
GROUP BY table_name, inconsistency_type
""", (date,))
# Generate summary
summary = {
'date': date.isoformat(),
'total_inconsistencies': sum(row['count'] for row in daily_stats),
'auto_correction_rate': sum(row['auto_correction_rate'] * row['count'] for row in daily_stats) / max(sum(row['count'] for row in daily_stats), 1),
'tables_affected': len(set(row['table_name'] for row in daily_stats)),
'details_by_table': {}
}
# Group by table
for row in daily_stats:
table = row['table_name']
if table not in summary['details_by_table']:
summary['details_by_table'][table] = []
summary['details_by_table'][table].append({
'inconsistency_type': row['inconsistency_type'],
'count': row['count'],
'auto_correction_rate': row['auto_correction_rate']
})
return summary
def generate_trend_analysis(self, days=7):
"""Generate trend analysis for reconciliation metrics"""
end_date = datetime.utcnow().date()
start_date = end_date - timedelta(days=days)
trends = self.db.query("""
SELECT
DATE(created_at) as date,
table_name,
COUNT(*) as inconsistencies,
AVG(CASE WHEN resolution = 'AUTO_CORRECTED' THEN 1 ELSE 0 END) as auto_correction_rate
FROM reconciliation_log
WHERE DATE(created_at) BETWEEN %s AND %s
GROUP BY DATE(created_at), table_name
ORDER BY date, table_name
""", (start_date, end_date))
# Calculate trends
trend_analysis = {
'period': f"{start_date} to {end_date}",
'trends': {},
'overall_trend': 'stable'
}
for table in set(row['table_name'] for row in trends):
table_data = [row for row in trends if row['table_name'] == table]
if len(table_data) >= 2:
first_count = table_data[0]['inconsistencies']
last_count = table_data[-1]['inconsistencies']
if last_count > first_count * 1.2:
trend = 'increasing'
elif last_count < first_count * 0.8:
trend = 'decreasing'
else:
trend = 'stable'
trend_analysis['trends'][table] = {
'direction': trend,
'first_day_count': first_count,
'last_day_count': last_count,
'change_percentage': ((last_count - first_count) / max(first_count, 1)) * 100
}
return trend_analysisAdvanced Reconciliation Techniques
1. Machine Learning-Based Anomaly Detection
from sklearn.isolation import IsolationForest
from sklearn.preprocessing import StandardScaler
import numpy as np
class MLAnomalyDetector:
def __init__(self):
self.models = {}
self.scalers = {}
def train_anomaly_detector(self, table_name, training_data):
"""Train anomaly detection model for a specific table"""
# Prepare features (convert records to numerical features)
features = self.extract_features(training_data)
# Scale features
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
# Train isolation forest
model = IsolationForest(contamination=0.05, random_state=42)
model.fit(scaled_features)
# Store model and scaler
self.models[table_name] = model
self.scalers[table_name] = scaler
def detect_anomalies(self, table_name, data):
"""Detect anomalous records that may indicate reconciliation issues"""
if table_name not in self.models:
raise ValueError(f"No trained model for table {table_name}")
# Extract features
features = self.extract_features(data)
# Scale features
scaled_features = self.scalers[table_name].transform(features)
# Predict anomalies
anomaly_scores = self.models[table_name].decision_function(scaled_features)
anomaly_predictions = self.models[table_name].predict(scaled_features)
# Return anomalous records with scores
anomalies = []
for i, (record, score, is_anomaly) in enumerate(zip(data, anomaly_scores, anomaly_predictions)):
if is_anomaly == -1: # Isolation forest returns -1 for anomalies
anomalies.append({
'record_index': i,
'record': record,
'anomaly_score': score,
'severity': 'high' if score < -0.5 else 'medium'
})
return anomalies
def extract_features(self, data):
"""Extract numerical features from database records"""
features = []
for record in data:
record_features = []
for key, value in record.items():
if isinstance(value, (int, float)):
record_features.append(value)
elif isinstance(value, str):
# Convert string to hash-based feature
record_features.append(hash(value) % 10000)
elif isinstance(value, datetime):
# Convert datetime to timestamp
record_features.append(value.timestamp())
else:
# Default value for other types
record_features.append(0)
features.append(record_features)
return np.array(features)2. Probabilistic Reconciliation
import random
from typing import List, Dict, Tuple
class ProbabilisticReconciler:
def __init__(self, confidence_threshold=0.95):
self.confidence_threshold = confidence_threshold
def statistical_sampling_validation(self, table_name: str, population_size: int) -> Dict:
"""Use statistical sampling to validate large datasets"""
# Calculate sample size for 95% confidence, 5% margin of error
confidence_level = 0.95
margin_of_error = 0.05
z_score = 1.96 # for 95% confidence
p = 0.5 # assume 50% error rate for maximum sample size
sample_size = (z_score ** 2 * p * (1 - p)) / (margin_of_error ** 2)
if population_size < 10000:
# Finite population correction
sample_size = sample_size / (1 + (sample_size - 1) / population_size)
sample_size = min(int(sample_size), population_size)
# Generate random sample
sample_ids = self.generate_random_sample(table_name, sample_size)
# Validate sample
sample_results = self.validate_sample_records(table_name, sample_ids)
# Calculate population estimates
error_rate = sample_results['errors'] / sample_size
estimated_errors = int(population_size * error_rate)
# Calculate confidence interval
standard_error = (error_rate * (1 - error_rate) / sample_size) ** 0.5
margin_of_error_actual = z_score * standard_error
confidence_interval = (
max(0, error_rate - margin_of_error_actual),
min(1, error_rate + margin_of_error_actual)
)
return {
'table_name': table_name,
'population_size': population_size,
'sample_size': sample_size,
'sample_error_rate': error_rate,
'estimated_total_errors': estimated_errors,
'confidence_interval': confidence_interval,
'confidence_level': confidence_level,
'recommendation': self.generate_recommendation(error_rate, confidence_interval)
}
def generate_random_sample(self, table_name: str, sample_size: int) -> List[int]:
"""Generate random sample of record IDs"""
# Get total record count and ID range
id_range = self.db.query(f"SELECT MIN(id), MAX(id) FROM {table_name}")[0]
min_id, max_id = id_range
# Generate random IDs
sample_ids = []
attempts = 0
max_attempts = sample_size * 10 # Avoid infinite loop
while len(sample_ids) < sample_size and attempts < max_attempts:
candidate_id = random.randint(min_id, max_id)
# Check if ID exists
exists = self.db.query(f"SELECT 1 FROM {table_name} WHERE id = %s", (candidate_id,))
if exists and candidate_id not in sample_ids:
sample_ids.append(candidate_id)
attempts += 1
return sample_ids
def validate_sample_records(self, table_name: str, sample_ids: List[int]) -> Dict:
"""Validate a sample of records"""
validation_results = {
'total_checked': len(sample_ids),
'errors': 0,
'error_details': []
}
for record_id in sample_ids:
# Get record from both source and target
source_record = self.source_db.get_record(table_name, record_id)
target_record = self.target_db.get_record(table_name, record_id)
if not target_record:
validation_results['errors'] += 1
validation_results['error_details'].append({
'id': record_id,
'error_type': 'MISSING_IN_TARGET'
})
elif not self.records_match(source_record, target_record):
validation_results['errors'] += 1
validation_results['error_details'].append({
'id': record_id,
'error_type': 'DATA_MISMATCH',
'differences': self.find_differences(source_record, target_record)
})
return validation_results
def generate_recommendation(self, error_rate: float, confidence_interval: Tuple[float, float]) -> str:
"""Generate recommendation based on error rate and confidence"""
if confidence_interval[1] < 0.01: # Less than 1% error rate with confidence
return "Data quality is excellent. Continue with normal reconciliation schedule."
elif confidence_interval[1] < 0.05: # Less than 5% error rate with confidence
return "Data quality is acceptable. Monitor closely and investigate sample errors."
elif confidence_interval[0] > 0.1: # More than 10% error rate with confidence
return "Data quality is poor. Immediate comprehensive reconciliation required."
else:
return "Data quality is uncertain. Increase sample size for better estimates."Performance Optimization
1. Parallel Processing
import asyncio
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
class ParallelReconciler:
def __init__(self, max_workers=None):
self.max_workers = max_workers or mp.cpu_count()
async def parallel_table_reconciliation(self, tables: List[str]):
"""Reconcile multiple tables in parallel"""
async with asyncio.Semaphore(self.max_workers):
tasks = [
self.reconcile_table_async(table)
for table in tables
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
summary = {
'total_tables': len(tables),
'successful': 0,
'failed': 0,
'results': {}
}
for table, result in zip(tables, results):
if isinstance(result, Exception):
summary['failed'] += 1
summary['results'][table] = {
'status': 'failed',
'error': str(result)
}
else:
summary['successful'] += 1
summary['results'][table] = result
return summary
def parallel_chunk_processing(self, table_name: str, chunk_size: int = 10000):
"""Process table reconciliation in parallel chunks"""
# Get total record count
total_records = self.db.get_record_count(table_name)
num_chunks = (total_records + chunk_size - 1) // chunk_size
# Create chunk specifications
chunks = []
for i in range(num_chunks):
start_id = i * chunk_size
end_id = min((i + 1) * chunk_size - 1, total_records - 1)
chunks.append({
'table': table_name,
'start_id': start_id,
'end_id': end_id,
'chunk_number': i + 1
})
# Process chunks in parallel
with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
chunk_results = list(executor.map(self.process_chunk, chunks))
# Aggregate results
total_inconsistencies = sum(r['inconsistencies'] for r in chunk_results)
total_corrections = sum(r['corrections'] for r in chunk_results)
return {
'table': table_name,
'total_records': total_records,
'chunks_processed': len(chunks),
'total_inconsistencies': total_inconsistencies,
'total_corrections': total_corrections,
'chunk_details': chunk_results
}
def process_chunk(self, chunk_spec: Dict) -> Dict:
"""Process a single chunk of records"""
# This runs in a separate process
table = chunk_spec['table']
start_id = chunk_spec['start_id']
end_id = chunk_spec['end_id']
# Initialize database connections for this process
local_source_db = SourceDatabase()
local_target_db = TargetDatabase()
# Get records in chunk
source_records = local_source_db.get_records_range(table, start_id, end_id)
target_records = local_target_db.get_records_range(table, start_id, end_id)
# Reconcile chunk
inconsistencies = 0
corrections = 0
for source_record in source_records:
target_record = target_records.get(source_record['id'])
if not target_record:
inconsistencies += 1
# Auto-correct if possible
try:
local_target_db.insert(table, source_record)
corrections += 1
except Exception:
pass # Log error in production
elif not self.records_match(source_record, target_record):
inconsistencies += 1
# Auto-correct field mismatches
try:
updates = self.calculate_updates(source_record, target_record)
local_target_db.update(table, source_record['id'], updates)
corrections += 1
except Exception:
pass # Log error in production
return {
'chunk_number': chunk_spec['chunk_number'],
'start_id': start_id,
'end_id': end_id,
'records_processed': len(source_records),
'inconsistencies': inconsistencies,
'corrections': corrections
}2. Incremental Reconciliation
class IncrementalReconciler:
def __init__(self, source_db, target_db):
self.source_db = source_db
self.target_db = target_db
self.last_reconciliation_times = {}
def incremental_reconciliation(self, table_name: str):
"""Reconcile only records changed since last reconciliation"""
last_reconciled = self.get_last_reconciliation_time(table_name)
# Get records modified since last reconciliation
modified_source = self.source_db.get_records_modified_since(
table_name, last_reconciled
)
modified_target = self.target_db.get_records_modified_since(
table_name, last_reconciled
)
# Create lookup dictionaries
source_dict = {r['id']: r for r in modified_source}
target_dict = {r['id']: r for r in modified_target}
# Find all record IDs to check
all_ids = set(source_dict.keys()) | set(target_dict.keys())
inconsistencies = []
for record_id in all_ids:
source_record = source_dict.get(record_id)
target_record = target_dict.get(record_id)
if source_record and not target_record:
inconsistencies.append({
'type': 'missing_in_target',
'table': table_name,
'id': record_id,
'source_record': source_record
})
elif not source_record and target_record:
inconsistencies.append({
'type': 'extra_in_target',
'table': table_name,
'id': record_id,
'target_record': target_record
})
elif source_record and target_record:
if not self.records_match(source_record, target_record):
inconsistencies.append({
'type': 'data_mismatch',
'table': table_name,
'id': record_id,
'source_record': source_record,
'target_record': target_record,
'differences': self.find_differences(source_record, target_record)
})
# Update last reconciliation time
self.update_last_reconciliation_time(table_name, datetime.utcnow())
return {
'table': table_name,
'reconciliation_time': datetime.utcnow(),
'records_checked': len(all_ids),
'inconsistencies_found': len(inconsistencies),
'inconsistencies': inconsistencies
}
def get_last_reconciliation_time(self, table_name: str) -> datetime:
"""Get the last reconciliation timestamp for a table"""
result = self.source_db.query("""
SELECT last_reconciled_at
FROM reconciliation_metadata
WHERE table_name = %s
""", (table_name,))
if result:
return result[0]['last_reconciled_at']
else:
# First time reconciliation - start from beginning of migration
return self.get_migration_start_time()
def update_last_reconciliation_time(self, table_name: str, timestamp: datetime):
"""Update the last reconciliation timestamp"""
self.source_db.execute("""
INSERT INTO reconciliation_metadata (table_name, last_reconciled_at)
VALUES (%s, %s)
ON CONFLICT (table_name)
DO UPDATE SET last_reconciled_at = %s
""", (table_name, timestamp, timestamp))This comprehensive guide provides the framework and tools necessary for implementing robust data reconciliation strategies during migrations, ensuring data integrity and consistency while minimizing business disruption.
Migration Patterns Catalog
Overview
This catalog provides detailed descriptions of proven migration patterns, their use cases, implementation guidelines, and best practices. Each pattern includes code examples, diagrams, and lessons learned from real-world implementations.
Database Migration Patterns
1. Expand-Contract Pattern
Use Case: Schema evolution with zero downtime Complexity: Medium Risk Level: Low-Medium
Description
The Expand-Contract pattern allows for schema changes without downtime by following a three-phase approach:
- Expand: Add new schema elements alongside existing ones
- Migrate: Dual-write to both old and new schema during transition
- Contract: Remove old schema elements after validation
Implementation Steps
-- Phase 1: Expand
ALTER TABLE users ADD COLUMN email_new VARCHAR(255);
CREATE INDEX CONCURRENTLY idx_users_email_new ON users(email_new);
-- Phase 2: Migrate (Application Code)
-- Write to both columns during transition period
INSERT INTO users (name, email, email_new) VALUES (?, ?, ?);
-- Backfill existing data
UPDATE users SET email_new = email WHERE email_new IS NULL;
-- Phase 3: Contract (after validation)
ALTER TABLE users DROP COLUMN email;
ALTER TABLE users RENAME COLUMN email_new TO email;Pros and Cons
Pros:
- Zero downtime deployments
- Safe rollback at any point
- Gradual transition with validation
Cons:
- Increased storage during transition
- More complex application logic
- Extended migration timeline
2. Parallel Schema Pattern
Use Case: Major database restructuring Complexity: High Risk Level: Medium
Description
Run new and old schemas in parallel, using feature flags to gradually route traffic to the new schema while maintaining the ability to rollback quickly.
Implementation Example
class DatabaseRouter:
def __init__(self, feature_flag_service):
self.feature_flags = feature_flag_service
self.old_db = OldDatabaseConnection()
self.new_db = NewDatabaseConnection()
def route_query(self, user_id, query_type):
if self.feature_flags.is_enabled("new_schema", user_id):
return self.new_db.execute(query_type)
else:
return self.old_db.execute(query_type)
def dual_write(self, data):
# Write to both databases for consistency
success_old = self.old_db.write(data)
success_new = self.new_db.write(transform_data(data))
if not (success_old and success_new):
# Handle partial failures
self.handle_dual_write_failure(data, success_old, success_new)Best Practices
- Implement data consistency checks between schemas
- Use circuit breakers for automatic failover
- Monitor performance impact of dual writes
- Plan for data reconciliation processes
3. Event Sourcing Migration
Use Case: Migrating systems with complex business logic Complexity: High Risk Level: Medium-High
Description
Capture all changes as events during migration, enabling replay and reconciliation capabilities.
Event Store Schema
CREATE TABLE migration_events (
event_id UUID PRIMARY KEY,
aggregate_id UUID NOT NULL,
event_type VARCHAR(100) NOT NULL,
event_data JSONB NOT NULL,
event_version INTEGER NOT NULL,
occurred_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
processed_at TIMESTAMP WITH TIME ZONE
);Migration Event Handler
class MigrationEventHandler:
def __init__(self, old_store, new_store):
self.old_store = old_store
self.new_store = new_store
self.event_log = []
def handle_update(self, entity_id, old_data, new_data):
# Log the change as an event
event = MigrationEvent(
entity_id=entity_id,
event_type="entity_migrated",
old_data=old_data,
new_data=new_data,
timestamp=datetime.now()
)
self.event_log.append(event)
# Apply to new store
success = self.new_store.update(entity_id, new_data)
if not success:
# Mark for retry
event.status = "failed"
self.schedule_retry(event)
return success
def replay_events(self, from_timestamp=None):
"""Replay events for reconciliation"""
events = self.get_events_since(from_timestamp)
for event in events:
self.apply_event(event)Service Migration Patterns
1. Strangler Fig Pattern
Use Case: Legacy system replacement Complexity: Medium-High Risk Level: Medium
Description
Gradually replace legacy functionality by intercepting calls and routing them to new services, eventually "strangling" the legacy system.
Implementation Architecture
# API Gateway Configuration
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: user-service-migration
spec:
http:
- match:
- headers:
migration-flag:
exact: "new"
route:
- destination:
host: user-service-v2
- route:
- destination:
host: user-service-v1Strangler Proxy Implementation
class StranglerProxy:
def __init__(self):
self.legacy_service = LegacyUserService()
self.new_service = NewUserService()
self.feature_flags = FeatureFlagService()
def handle_request(self, request):
route = self.determine_route(request)
if route == "new":
return self.handle_with_new_service(request)
elif route == "both":
return self.handle_with_both_services(request)
else:
return self.handle_with_legacy_service(request)
def determine_route(self, request):
user_id = request.get('user_id')
if self.feature_flags.is_enabled("new_user_service", user_id):
if self.feature_flags.is_enabled("dual_write", user_id):
return "both"
else:
return "new"
else:
return "legacy"2. Parallel Run Pattern
Use Case: Risk mitigation for critical services Complexity: Medium Risk Level: Low-Medium
Description
Run both old and new services simultaneously, comparing outputs to validate correctness before switching traffic.
Implementation
class ParallelRunManager:
def __init__(self):
self.primary_service = PrimaryService()
self.candidate_service = CandidateService()
self.comparator = ResponseComparator()
self.metrics = MetricsCollector()
async def parallel_execute(self, request):
# Execute both services concurrently
primary_task = asyncio.create_task(
self.primary_service.process(request)
)
candidate_task = asyncio.create_task(
self.candidate_service.process(request)
)
# Always wait for primary
primary_result = await primary_task
try:
# Wait for candidate with timeout
candidate_result = await asyncio.wait_for(
candidate_task, timeout=5.0
)
# Compare results
comparison = self.comparator.compare(
primary_result, candidate_result
)
# Record metrics
self.metrics.record_comparison(comparison)
except asyncio.TimeoutError:
self.metrics.record_timeout("candidate")
except Exception as e:
self.metrics.record_error("candidate", str(e))
# Always return primary result
return primary_result3. Blue-Green Deployment Pattern
Use Case: Zero-downtime service updates Complexity: Low-Medium Risk Level: Low
Description
Maintain two identical production environments (blue and green), switching traffic between them for deployments.
Kubernetes Implementation
# Blue Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: app-blue
labels:
version: blue
spec:
replicas: 3
selector:
matchLabels:
app: myapp
version: blue
template:
metadata:
labels:
app: myapp
version: blue
spec:
containers:
- name: app
image: myapp:v1.0.0
---
# Green Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: app-green
labels:
version: green
spec:
replicas: 3
selector:
matchLabels:
app: myapp
version: green
template:
metadata:
labels:
app: myapp
version: green
spec:
containers:
- name: app
image: myapp:v2.0.0
---
# Service (switches between blue and green)
apiVersion: v1
kind: Service
metadata:
name: app-service
spec:
selector:
app: myapp
version: blue # Change to green for deployment
ports:
- port: 80
targetPort: 8080Infrastructure Migration Patterns
1. Lift and Shift Pattern
Use Case: Quick cloud migration with minimal changes Complexity: Low-Medium Risk Level: Low
Description
Migrate applications to cloud infrastructure with minimal or no code changes, focusing on infrastructure compatibility.
Migration Checklist
Pre-Migration Assessment:
- inventory_current_infrastructure:
- servers_and_specifications
- network_configuration
- storage_requirements
- security_configurations
- identify_dependencies:
- database_connections
- external_service_integrations
- file_system_dependencies
- assess_compatibility:
- operating_system_versions
- runtime_dependencies
- license_requirements
Migration Execution:
- provision_target_infrastructure:
- compute_instances
- storage_volumes
- network_configuration
- security_groups
- migrate_data:
- database_backup_restore
- file_system_replication
- configuration_files
- update_configurations:
- connection_strings
- environment_variables
- dns_records
- validate_functionality:
- application_health_checks
- end_to_end_testing
- performance_validation2. Hybrid Cloud Migration
Use Case: Gradual cloud adoption with on-premises integration Complexity: High Risk Level: Medium-High
Description
Maintain some components on-premises while migrating others to cloud, requiring secure connectivity and data synchronization.
Network Architecture
# Terraform configuration for hybrid connectivity
resource "aws_vpc" "main" {
cidr_block = "10.0.0.0/16"
enable_dns_hostnames = true
enable_dns_support = true
}
resource "aws_vpn_gateway" "main" {
vpc_id = aws_vpc.main.id
tags = {
Name = "hybrid-vpn-gateway"
}
}
resource "aws_customer_gateway" "main" {
bgp_asn = 65000
ip_address = var.on_premises_public_ip
type = "ipsec.1"
tags = {
Name = "on-premises-gateway"
}
}
resource "aws_vpn_connection" "main" {
vpn_gateway_id = aws_vpn_gateway.main.id
customer_gateway_id = aws_customer_gateway.main.id
type = "ipsec.1"
static_routes_only = true
}Data Synchronization Pattern
class HybridDataSync:
def __init__(self):
self.on_prem_db = OnPremiseDatabase()
self.cloud_db = CloudDatabase()
self.sync_log = SyncLogManager()
async def bidirectional_sync(self):
"""Synchronize data between on-premises and cloud"""
# Get last sync timestamp
last_sync = self.sync_log.get_last_sync_time()
# Sync on-prem changes to cloud
on_prem_changes = self.on_prem_db.get_changes_since(last_sync)
for change in on_prem_changes:
await self.apply_change_to_cloud(change)
# Sync cloud changes to on-prem
cloud_changes = self.cloud_db.get_changes_since(last_sync)
for change in cloud_changes:
await self.apply_change_to_on_prem(change)
# Handle conflicts
conflicts = self.detect_conflicts(on_prem_changes, cloud_changes)
for conflict in conflicts:
await self.resolve_conflict(conflict)
# Update sync timestamp
self.sync_log.record_sync_completion()
async def apply_change_to_cloud(self, change):
"""Apply on-premises change to cloud database"""
try:
if change.operation == "INSERT":
await self.cloud_db.insert(change.table, change.data)
elif change.operation == "UPDATE":
await self.cloud_db.update(change.table, change.key, change.data)
elif change.operation == "DELETE":
await self.cloud_db.delete(change.table, change.key)
self.sync_log.record_success(change.id, "cloud")
except Exception as e:
self.sync_log.record_failure(change.id, "cloud", str(e))
raise3. Multi-Cloud Migration
Use Case: Avoiding vendor lock-in or regulatory requirements Complexity: Very High Risk Level: High
Description
Distribute workloads across multiple cloud providers for resilience, compliance, or cost optimization.
Service Mesh Configuration
# Istio configuration for multi-cloud service mesh
apiVersion: networking.istio.io/v1beta1
kind: ServiceEntry
metadata:
name: aws-service
spec:
hosts:
- aws-service.company.com
ports:
- number: 443
name: https
protocol: HTTPS
location: MESH_EXTERNAL
resolution: DNS
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: multi-cloud-routing
spec:
hosts:
- user-service
http:
- match:
- headers:
region:
exact: "us-east"
route:
- destination:
host: aws-service.company.com
weight: 100
- match:
- headers:
region:
exact: "eu-west"
route:
- destination:
host: gcp-service.company.com
weight: 100
- route: # Default routing
- destination:
host: user-service
subset: local
weight: 80
- destination:
host: aws-service.company.com
weight: 20Feature Flag Patterns
1. Progressive Rollout Pattern
Use Case: Gradual feature deployment with risk mitigation Implementation:
class ProgressiveRollout:
def __init__(self, feature_name):
self.feature_name = feature_name
self.rollout_percentage = 0
self.user_buckets = {}
def is_enabled_for_user(self, user_id):
# Consistent user bucketing
user_hash = hashlib.md5(f"{self.feature_name}:{user_id}".encode()).hexdigest()
bucket = int(user_hash, 16) % 100
return bucket < self.rollout_percentage
def increase_rollout(self, target_percentage, step_size=10):
"""Gradually increase rollout percentage"""
while self.rollout_percentage < target_percentage:
self.rollout_percentage = min(
self.rollout_percentage + step_size,
target_percentage
)
# Monitor metrics before next increase
yield self.rollout_percentage
time.sleep(300) # Wait 5 minutes between increases2. Circuit Breaker Pattern
Use Case: Automatic fallback during migration issues
class MigrationCircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.timeout = timeout
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
def call_new_service(self, request):
if self.state == 'OPEN':
if self.should_attempt_reset():
self.state = 'HALF_OPEN'
else:
return self.fallback_to_legacy(request)
try:
response = self.new_service.process(request)
self.on_success()
return response
except Exception as e:
self.on_failure()
return self.fallback_to_legacy(request)
def on_success(self):
self.failure_count = 0
self.state = 'CLOSED'
def on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
def should_attempt_reset(self):
return (time.time() - self.last_failure_time) >= self.timeoutMigration Anti-Patterns
1. Big Bang Migration (Anti-Pattern)
Why to Avoid:
- High risk of complete system failure
- Difficult to rollback
- Extended downtime
- All-or-nothing deployment
Better Alternative: Use incremental migration patterns like Strangler Fig or Parallel Run.
2. No Rollback Plan (Anti-Pattern)
Why to Avoid:
- Cannot recover from failures
- Increases business risk
- Panic-driven decisions during issues
Better Alternative: Always implement comprehensive rollback procedures before migration.
3. Insufficient Testing (Anti-Pattern)
Why to Avoid:
- Unknown compatibility issues
- Performance degradation
- Data corruption risks
Better Alternative: Implement comprehensive testing at each migration phase.
Pattern Selection Matrix
| Migration Type | Complexity | Downtime Tolerance | Recommended Pattern |
|---|---|---|---|
| Schema Change | Low | Zero | Expand-Contract |
| Schema Change | High | Zero | Parallel Schema |
| Service Replace | Medium | Zero | Strangler Fig |
| Service Update | Low | Zero | Blue-Green |
| Data Migration | High | Some | Event Sourcing |
| Infrastructure | Low | Some | Lift and Shift |
| Infrastructure | High | Zero | Hybrid Cloud |
Success Metrics
Technical Metrics
- Migration completion rate
- System availability during migration
- Performance impact (response time, throughput)
- Error rate changes
- Rollback execution time
Business Metrics
- Customer impact score
- Revenue protection
- Time to value realization
- Stakeholder satisfaction
Operational Metrics
- Team efficiency
- Knowledge transfer effectiveness
- Post-migration support requirements
- Documentation completeness
Lessons Learned
Common Pitfalls
- Underestimating data dependencies - Always map all data relationships
- Insufficient monitoring - Implement comprehensive observability before migration
- Poor communication - Keep all stakeholders informed throughout the process
- Rushed timelines - Allow adequate time for testing and validation
- Ignoring performance impact - Benchmark before and after migration
Best Practices
- Start with low-risk migrations - Build confidence and experience
- Automate everything possible - Reduce human error and increase repeatability
- Test rollback procedures - Ensure you can recover from any failure
- Monitor continuously - Use real-time dashboards and alerting
- Document everything - Create comprehensive runbooks and documentation
This catalog serves as a reference for selecting appropriate migration patterns based on specific requirements, risk tolerance, and technical constraints.
Zero-Downtime Migration Techniques
Overview
Zero-downtime migrations are critical for maintaining business continuity and user experience during system changes. This guide provides comprehensive techniques, patterns, and implementation strategies for achieving true zero-downtime migrations across different system components.
Core Principles
1. Backward Compatibility
Every change must be backward compatible until all clients have migrated to the new version.
2. Incremental Changes
Break large changes into smaller, independent increments that can be deployed and validated separately.
3. Feature Flags
Use feature toggles to control the rollout of new functionality without code deployments.
4. Graceful Degradation
Ensure systems continue to function even when some components are unavailable or degraded.
Database Zero-Downtime Techniques
Schema Evolution Without Downtime
1. Additive Changes Only
Principle: Only add new elements; never remove or modify existing ones directly.
-- ✅ Good: Additive change
ALTER TABLE users ADD COLUMN middle_name VARCHAR(50);
-- ❌ Bad: Breaking change
ALTER TABLE users DROP COLUMN email;2. Multi-Phase Schema Evolution
Phase 1: Expand
-- Add new column alongside existing one
ALTER TABLE users ADD COLUMN email_address VARCHAR(255);
-- Add index concurrently (PostgreSQL)
CREATE INDEX CONCURRENTLY idx_users_email_address ON users(email_address);Phase 2: Dual Write (Application Code)
class UserService:
def create_user(self, name, email):
# Write to both old and new columns
user = User(
name=name,
email=email, # Old column
email_address=email # New column
)
return user.save()
def update_email(self, user_id, new_email):
# Update both columns
user = User.objects.get(id=user_id)
user.email = new_email
user.email_address = new_email
user.save()
return userPhase 3: Backfill Data
-- Backfill existing data (in batches)
UPDATE users
SET email_address = email
WHERE email_address IS NULL
AND id BETWEEN ? AND ?;Phase 4: Switch Reads
class UserService:
def get_user_email(self, user_id):
user = User.objects.get(id=user_id)
# Switch to reading from new column
return user.email_address or user.emailPhase 5: Contract
-- After validation, remove old column
ALTER TABLE users DROP COLUMN email;
-- Rename new column if needed
ALTER TABLE users RENAME COLUMN email_address TO email;3. Online Schema Changes
PostgreSQL Techniques
-- Safe column addition
ALTER TABLE orders ADD COLUMN status_new VARCHAR(20) DEFAULT 'pending';
-- Safe index creation
CREATE INDEX CONCURRENTLY idx_orders_status_new ON orders(status_new);
-- Safe constraint addition (after data validation)
ALTER TABLE orders ADD CONSTRAINT check_status_new
CHECK (status_new IN ('pending', 'processing', 'completed', 'cancelled'));MySQL Techniques
-- Use pt-online-schema-change for large tables
pt-online-schema-change \
--alter "ADD COLUMN status VARCHAR(20) DEFAULT 'pending'" \
--execute \
D=mydb,t=orders
-- Online DDL (MySQL 5.6+)
ALTER TABLE orders
ADD COLUMN priority INT DEFAULT 1,
ALGORITHM=INPLACE,
LOCK=NONE;4. Data Migration Strategies
Chunked Data Migration
class DataMigrator:
def __init__(self, source_table, target_table, chunk_size=1000):
self.source_table = source_table
self.target_table = target_table
self.chunk_size = chunk_size
def migrate_data(self):
last_id = 0
total_migrated = 0
while True:
# Get next chunk
chunk = self.get_chunk(last_id, self.chunk_size)
if not chunk:
break
# Transform and migrate chunk
for record in chunk:
transformed = self.transform_record(record)
self.insert_or_update(transformed)
last_id = chunk[-1]['id']
total_migrated += len(chunk)
# Brief pause to avoid overwhelming the database
time.sleep(0.1)
self.log_progress(total_migrated)
return total_migrated
def get_chunk(self, last_id, limit):
return db.execute(f"""
SELECT * FROM {self.source_table}
WHERE id > %s
ORDER BY id
LIMIT %s
""", (last_id, limit))Change Data Capture (CDC)
class CDCProcessor:
def __init__(self):
self.kafka_consumer = KafkaConsumer('db_changes')
self.target_db = TargetDatabase()
def process_changes(self):
for message in self.kafka_consumer:
change = json.loads(message.value)
if change['operation'] == 'INSERT':
self.handle_insert(change)
elif change['operation'] == 'UPDATE':
self.handle_update(change)
elif change['operation'] == 'DELETE':
self.handle_delete(change)
def handle_insert(self, change):
transformed_data = self.transform_data(change['after'])
self.target_db.insert(change['table'], transformed_data)
def handle_update(self, change):
key = change['key']
transformed_data = self.transform_data(change['after'])
self.target_db.update(change['table'], key, transformed_data)Application Zero-Downtime Techniques
1. Blue-Green Deployments
Infrastructure Setup
# Blue Environment (Current Production)
apiVersion: apps/v1
kind: Deployment
metadata:
name: app-blue
labels:
version: blue
app: myapp
spec:
replicas: 3
selector:
matchLabels:
app: myapp
version: blue
template:
metadata:
labels:
app: myapp
version: blue
spec:
containers:
- name: app
image: myapp:1.0.0
ports:
- containerPort: 8080
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 10
periodSeconds: 5
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 15
periodSeconds: 10
---
# Green Environment (New Version)
apiVersion: apps/v1
kind: Deployment
metadata:
name: app-green
labels:
version: green
app: myapp
spec:
replicas: 3
selector:
matchLabels:
app: myapp
version: green
template:
metadata:
labels:
app: myapp
version: green
spec:
containers:
- name: app
image: myapp:2.0.0
ports:
- containerPort: 8080
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 10
periodSeconds: 5Service Switching
# Service (switches between blue and green)
apiVersion: v1
kind: Service
metadata:
name: app-service
spec:
selector:
app: myapp
version: blue # Switch to 'green' for deployment
ports:
- port: 80
targetPort: 8080
type: LoadBalancerAutomated Deployment Script
#!/bin/bash
# Blue-Green Deployment Script
NAMESPACE="production"
APP_NAME="myapp"
NEW_IMAGE="myapp:2.0.0"
# Determine current and target environments
CURRENT_VERSION=$(kubectl get service $APP_NAME-service -o jsonpath='{.spec.selector.version}')
if [ "$CURRENT_VERSION" = "blue" ]; then
TARGET_VERSION="green"
else
TARGET_VERSION="blue"
fi
echo "Current version: $CURRENT_VERSION"
echo "Target version: $TARGET_VERSION"
# Update target environment with new image
kubectl set image deployment/$APP_NAME-$TARGET_VERSION app=$NEW_IMAGE
# Wait for rollout to complete
kubectl rollout status deployment/$APP_NAME-$TARGET_VERSION --timeout=300s
# Run health checks
echo "Running health checks..."
TARGET_IP=$(kubectl get service $APP_NAME-$TARGET_VERSION -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
for i in {1..30}; do
if curl -f http://$TARGET_IP/health; then
echo "Health check passed"
break
fi
if [ $i -eq 30 ]; then
echo "Health check failed after 30 attempts"
exit 1
fi
sleep 2
done
# Switch traffic to new version
kubectl patch service $APP_NAME-service -p '{"spec":{"selector":{"version":"'$TARGET_VERSION'"}}}'
echo "Traffic switched to $TARGET_VERSION"
# Monitor for 5 minutes
echo "Monitoring new version..."
sleep 300
# Check if rollback is needed
ERROR_RATE=$(curl -s "http://monitoring.company.com/api/error_rate?service=$APP_NAME" | jq '.error_rate')
if (( $(echo "$ERROR_RATE > 0.05" | bc -l) )); then
echo "Error rate too high ($ERROR_RATE), rolling back..."
kubectl patch service $APP_NAME-service -p '{"spec":{"selector":{"version":"'$CURRENT_VERSION'"}}}'
exit 1
fi
echo "Deployment successful!"2. Canary Deployments
Progressive Canary with Istio
# Destination Rule
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
name: myapp-destination
spec:
host: myapp
subsets:
- name: v1
labels:
version: v1
- name: v2
labels:
version: v2
---
# Virtual Service for Canary
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: myapp-canary
spec:
hosts:
- myapp
http:
- match:
- headers:
canary:
exact: "true"
route:
- destination:
host: myapp
subset: v2
- route:
- destination:
host: myapp
subset: v1
weight: 95
- destination:
host: myapp
subset: v2
weight: 5Automated Canary Controller
class CanaryController:
def __init__(self, istio_client, prometheus_client):
self.istio = istio_client
self.prometheus = prometheus_client
self.canary_weight = 5
self.max_weight = 100
self.weight_increment = 5
self.validation_window = 300 # 5 minutes
async def deploy_canary(self, app_name, new_version):
"""Deploy new version using canary strategy"""
# Start with small percentage
await self.update_traffic_split(app_name, self.canary_weight)
while self.canary_weight < self.max_weight:
# Monitor metrics for validation window
await asyncio.sleep(self.validation_window)
# Check canary health
if not await self.is_canary_healthy(app_name, new_version):
await self.rollback_canary(app_name)
raise Exception("Canary deployment failed health checks")
# Increase traffic to canary
self.canary_weight = min(
self.canary_weight + self.weight_increment,
self.max_weight
)
await self.update_traffic_split(app_name, self.canary_weight)
print(f"Canary traffic increased to {self.canary_weight}%")
print("Canary deployment completed successfully")
async def is_canary_healthy(self, app_name, version):
"""Check if canary version is healthy"""
# Check error rate
error_rate = await self.prometheus.query(
f'rate(http_requests_total{{app="{app_name}", version="{version}", status=~"5.."}}'
f'[5m]) / rate(http_requests_total{{app="{app_name}", version="{version}"}}[5m])'
)
if error_rate > 0.05: # 5% error rate threshold
return False
# Check response time
p95_latency = await self.prometheus.query(
f'histogram_quantile(0.95, rate(http_request_duration_seconds_bucket'
f'{{app="{app_name}", version="{version}"}}[5m]))'
)
if p95_latency > 2.0: # 2 second p95 threshold
return False
return True
async def update_traffic_split(self, app_name, canary_weight):
"""Update Istio virtual service with new traffic split"""
stable_weight = 100 - canary_weight
virtual_service = {
"apiVersion": "networking.istio.io/v1beta1",
"kind": "VirtualService",
"metadata": {"name": f"{app_name}-canary"},
"spec": {
"hosts": [app_name],
"http": [{
"route": [
{
"destination": {"host": app_name, "subset": "stable"},
"weight": stable_weight
},
{
"destination": {"host": app_name, "subset": "canary"},
"weight": canary_weight
}
]
}]
}
}
await self.istio.apply_virtual_service(virtual_service)3. Rolling Updates
Kubernetes Rolling Update Strategy
apiVersion: apps/v1
kind: Deployment
metadata:
name: rolling-update-app
spec:
replicas: 10
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 2 # Can have 2 extra pods during update
maxUnavailable: 1 # At most 1 pod can be unavailable
selector:
matchLabels:
app: rolling-update-app
template:
metadata:
labels:
app: rolling-update-app
spec:
containers:
- name: app
image: myapp:2.0.0
ports:
- containerPort: 8080
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 2
timeoutSeconds: 1
successThreshold: 1
failureThreshold: 3
livenessProbe:
httpGet:
path: /live
port: 8080
initialDelaySeconds: 10
periodSeconds: 10Custom Rolling Update Controller
class RollingUpdateController:
def __init__(self, k8s_client):
self.k8s = k8s_client
self.max_surge = 2
self.max_unavailable = 1
async def rolling_update(self, deployment_name, new_image):
"""Perform rolling update with custom logic"""
deployment = await self.k8s.get_deployment(deployment_name)
total_replicas = deployment.spec.replicas
# Calculate batch size
batch_size = min(self.max_surge, total_replicas // 5) # Update 20% at a time
updated_pods = []
for i in range(0, total_replicas, batch_size):
batch_end = min(i + batch_size, total_replicas)
# Update batch of pods
for pod_index in range(i, batch_end):
old_pod = await self.get_pod_by_index(deployment_name, pod_index)
# Create new pod with new image
new_pod = await self.create_updated_pod(old_pod, new_image)
# Wait for new pod to be ready
await self.wait_for_pod_ready(new_pod.metadata.name)
# Remove old pod
await self.k8s.delete_pod(old_pod.metadata.name)
updated_pods.append(new_pod)
# Brief pause between pod updates
await asyncio.sleep(2)
# Validate batch health before continuing
if not await self.validate_batch_health(updated_pods[-batch_size:]):
# Rollback batch
await self.rollback_batch(updated_pods[-batch_size:])
raise Exception("Rolling update failed validation")
print(f"Updated {batch_end}/{total_replicas} pods")
print("Rolling update completed successfully")Load Balancer and Traffic Management
1. Weighted Routing
NGINX Configuration
upstream backend {
# Old version - 80% traffic
server old-app-1:8080 weight=4;
server old-app-2:8080 weight=4;
# New version - 20% traffic
server new-app-1:8080 weight=1;
server new-app-2:8080 weight=1;
}
server {
listen 80;
location / {
proxy_pass http://backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# Health check headers
proxy_set_header X-Health-Check-Timeout 5s;
}
}HAProxy Configuration
backend app_servers
balance roundrobin
option httpchk GET /health
# Old version servers
server old-app-1 old-app-1:8080 check weight 80
server old-app-2 old-app-2:8080 check weight 80
# New version servers
server new-app-1 new-app-1:8080 check weight 20
server new-app-2 new-app-2:8080 check weight 20
frontend app_frontend
bind *:80
default_backend app_servers
# Custom health check endpoint
acl health_check path_beg /health
http-request return status 200 content-type text/plain string "OK" if health_check2. Circuit Breaker Implementation
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60, expected_exception=Exception):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
def call(self, func, *args, **kwargs):
"""Execute function with circuit breaker protection"""
if self.state == 'OPEN':
if self._should_attempt_reset():
self.state = 'HALF_OPEN'
else:
raise CircuitBreakerOpenException("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
raise
def _should_attempt_reset(self):
return (
self.last_failure_time and
time.time() - self.last_failure_time >= self.recovery_timeout
)
def _on_success(self):
self.failure_count = 0
self.state = 'CLOSED'
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
# Usage with service migration
@CircuitBreaker(failure_threshold=3, recovery_timeout=30)
def call_new_service(request):
return new_service.process(request)
def handle_request(request):
try:
return call_new_service(request)
except CircuitBreakerOpenException:
# Fallback to old service
return old_service.process(request)Monitoring and Validation
1. Health Check Implementation
class HealthChecker:
def __init__(self):
self.checks = []
def add_check(self, name, check_func, timeout=5):
self.checks.append({
'name': name,
'func': check_func,
'timeout': timeout
})
async def run_checks(self):
"""Run all health checks and return status"""
results = {}
overall_status = 'healthy'
for check in self.checks:
try:
result = await asyncio.wait_for(
check['func'](),
timeout=check['timeout']
)
results[check['name']] = {
'status': 'healthy',
'result': result
}
except asyncio.TimeoutError:
results[check['name']] = {
'status': 'unhealthy',
'error': 'timeout'
}
overall_status = 'unhealthy'
except Exception as e:
results[check['name']] = {
'status': 'unhealthy',
'error': str(e)
}
overall_status = 'unhealthy'
return {
'status': overall_status,
'checks': results,
'timestamp': datetime.utcnow().isoformat()
}
# Example health checks
health_checker = HealthChecker()
async def database_check():
"""Check database connectivity"""
result = await db.execute("SELECT 1")
return result is not None
async def external_api_check():
"""Check external API availability"""
response = await http_client.get("https://api.example.com/health")
return response.status_code == 200
async def memory_check():
"""Check memory usage"""
memory_usage = psutil.virtual_memory().percent
if memory_usage > 90:
raise Exception(f"Memory usage too high: {memory_usage}%")
return f"Memory usage: {memory_usage}%"
health_checker.add_check("database", database_check)
health_checker.add_check("external_api", external_api_check)
health_checker.add_check("memory", memory_check)2. Readiness vs Liveness Probes
# Kubernetes Pod with proper health checks
apiVersion: v1
kind: Pod
metadata:
name: app-pod
spec:
containers:
- name: app
image: myapp:2.0.0
ports:
- containerPort: 8080
# Readiness probe - determines if pod should receive traffic
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 3
timeoutSeconds: 2
successThreshold: 1
failureThreshold: 3
# Liveness probe - determines if pod should be restarted
livenessProbe:
httpGet:
path: /live
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
successThreshold: 1
failureThreshold: 3
# Startup probe - gives app time to start before other probes
startupProbe:
httpGet:
path: /startup
port: 8080
initialDelaySeconds: 10
periodSeconds: 5
timeoutSeconds: 3
successThreshold: 1
failureThreshold: 30 # Allow up to 150 seconds for startup3. Metrics and Alerting
class MigrationMetrics:
def __init__(self, prometheus_client):
self.prometheus = prometheus_client
# Define custom metrics
self.migration_progress = Counter(
'migration_progress_total',
'Total migration operations completed',
['operation', 'status']
)
self.migration_duration = Histogram(
'migration_operation_duration_seconds',
'Time spent on migration operations',
['operation']
)
self.system_health = Gauge(
'system_health_score',
'Overall system health score (0-1)',
['component']
)
self.traffic_split = Gauge(
'traffic_split_percentage',
'Percentage of traffic going to each version',
['version']
)
def record_migration_step(self, operation, status, duration=None):
"""Record completion of a migration step"""
self.migration_progress.labels(operation=operation, status=status).inc()
if duration:
self.migration_duration.labels(operation=operation).observe(duration)
def update_health_score(self, component, score):
"""Update health score for a component"""
self.system_health.labels(component=component).set(score)
def update_traffic_split(self, version_weights):
"""Update traffic split metrics"""
for version, weight in version_weights.items():
self.traffic_split.labels(version=version).set(weight)
# Usage in migration
metrics = MigrationMetrics(prometheus_client)
def perform_migration_step(operation):
start_time = time.time()
try:
# Perform migration operation
result = execute_migration_operation(operation)
# Record success
duration = time.time() - start_time
metrics.record_migration_step(operation, 'success', duration)
return result
except Exception as e:
# Record failure
duration = time.time() - start_time
metrics.record_migration_step(operation, 'failure', duration)
raiseRollback Strategies
1. Immediate Rollback Triggers
class AutoRollbackSystem:
def __init__(self, metrics_client, deployment_client):
self.metrics = metrics_client
self.deployment = deployment_client
self.rollback_triggers = {
'error_rate_spike': {
'threshold': 0.05, # 5% error rate
'window': 300, # 5 minutes
'auto_rollback': True
},
'latency_increase': {
'threshold': 2.0, # 2x baseline latency
'window': 600, # 10 minutes
'auto_rollback': False # Manual confirmation required
},
'availability_drop': {
'threshold': 0.95, # Below 95% availability
'window': 120, # 2 minutes
'auto_rollback': True
}
}
async def monitor_and_rollback(self, deployment_name):
"""Monitor deployment and trigger rollback if needed"""
while True:
for trigger_name, config in self.rollback_triggers.items():
if await self.check_trigger(trigger_name, config):
if config['auto_rollback']:
await self.execute_rollback(deployment_name, trigger_name)
else:
await self.alert_for_manual_rollback(deployment_name, trigger_name)
await asyncio.sleep(30) # Check every 30 seconds
async def check_trigger(self, trigger_name, config):
"""Check if rollback trigger condition is met"""
current_value = await self.metrics.get_current_value(trigger_name)
baseline_value = await self.metrics.get_baseline_value(trigger_name)
if trigger_name == 'error_rate_spike':
return current_value > config['threshold']
elif trigger_name == 'latency_increase':
return current_value > baseline_value * config['threshold']
elif trigger_name == 'availability_drop':
return current_value < config['threshold']
return False
async def execute_rollback(self, deployment_name, reason):
"""Execute automatic rollback"""
print(f"Executing automatic rollback for {deployment_name}. Reason: {reason}")
# Get previous revision
previous_revision = await self.deployment.get_previous_revision(deployment_name)
# Perform rollback
await self.deployment.rollback_to_revision(deployment_name, previous_revision)
# Notify stakeholders
await self.notify_rollback_executed(deployment_name, reason)2. Data Rollback Strategies
-- Point-in-time recovery setup
-- Create restore point before migration
SELECT pg_create_restore_point('pre_migration_' || to_char(now(), 'YYYYMMDD_HH24MISS'));
-- Rollback using point-in-time recovery
-- (This would be executed on a separate recovery instance)
-- recovery.conf:
-- recovery_target_name = 'pre_migration_20240101_120000'
-- recovery_target_action = 'promote'class DataRollbackManager:
def __init__(self, database_client, backup_service):
self.db = database_client
self.backup = backup_service
async def create_rollback_point(self, migration_id):
"""Create a rollback point before migration"""
rollback_point = {
'migration_id': migration_id,
'timestamp': datetime.utcnow(),
'backup_location': None,
'schema_snapshot': None
}
# Create database backup
backup_path = await self.backup.create_backup(
f"pre_migration_{migration_id}_{int(time.time())}"
)
rollback_point['backup_location'] = backup_path
# Capture schema snapshot
schema_snapshot = await self.capture_schema_snapshot()
rollback_point['schema_snapshot'] = schema_snapshot
# Store rollback point metadata
await self.store_rollback_metadata(rollback_point)
return rollback_point
async def execute_rollback(self, migration_id):
"""Execute data rollback to specified point"""
rollback_point = await self.get_rollback_metadata(migration_id)
if not rollback_point:
raise Exception(f"No rollback point found for migration {migration_id}")
# Stop application traffic
await self.stop_application_traffic()
try:
# Restore from backup
await self.backup.restore_from_backup(
rollback_point['backup_location']
)
# Validate data integrity
await self.validate_data_integrity(
rollback_point['schema_snapshot']
)
# Update application configuration
await self.update_application_config(rollback_point)
# Resume application traffic
await self.resume_application_traffic()
print(f"Data rollback completed successfully for migration {migration_id}")
except Exception as e:
# If rollback fails, we have a serious problem
await self.escalate_rollback_failure(migration_id, str(e))
raiseBest Practices Summary
1. Pre-Migration Checklist
- Comprehensive backup strategy in place
- Rollback procedures tested in staging
- Monitoring and alerting configured
- Health checks implemented
- Feature flags configured
- Team communication plan established
- Load balancer configuration prepared
- Database connection pooling optimized
2. During Migration
- Monitor key metrics continuously
- Validate each phase before proceeding
- Maintain detailed logs of all actions
- Keep stakeholders informed of progress
- Have rollback trigger ready
- Monitor user experience metrics
- Watch for performance degradation
- Validate data consistency
3. Post-Migration
- Continue monitoring for 24-48 hours
- Validate all business processes
- Update documentation
- Conduct post-migration retrospective
- Archive migration artifacts
- Update disaster recovery procedures
- Plan for legacy system decommissioning
4. Common Pitfalls to Avoid
- Don't skip testing rollback procedures
- Don't ignore performance impact
- Don't rush through validation phases
- Don't forget to communicate with stakeholders
- Don't assume health checks are sufficient
- Don't neglect data consistency validation
- Don't underestimate time requirements
- Don't overlook dependency impacts
This comprehensive guide provides the foundation for implementing zero-downtime migrations across various system components while maintaining high availability and data integrity.
#!/usr/bin/env python3
"""
Compatibility Checker - Analyze schema and API compatibility between versions
This tool analyzes schema and API changes between versions and identifies backward
compatibility issues including breaking changes, data type mismatches, missing fields,
constraint violations, and generates migration scripts suggestions.
Author: Migration Architect Skill
Version: 1.0.0
License: MIT
"""
import json
import argparse
import sys
import re
import datetime
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, asdict
from enum import Enum
class ChangeType(Enum):
"""Types of changes detected"""
BREAKING = "breaking"
POTENTIALLY_BREAKING = "potentially_breaking"
NON_BREAKING = "non_breaking"
ADDITIVE = "additive"
class CompatibilityLevel(Enum):
"""Compatibility assessment levels"""
FULLY_COMPATIBLE = "fully_compatible"
BACKWARD_COMPATIBLE = "backward_compatible"
POTENTIALLY_INCOMPATIBLE = "potentially_incompatible"
BREAKING_CHANGES = "breaking_changes"
@dataclass
class CompatibilityIssue:
"""Individual compatibility issue"""
type: str
severity: str
description: str
field_path: str
old_value: Any
new_value: Any
impact: str
suggested_migration: str
affected_operations: List[str]
@dataclass
class MigrationScript:
"""Migration script suggestion"""
script_type: str # sql, api, config
description: str
script_content: str
rollback_script: str
dependencies: List[str]
validation_query: str
@dataclass
class CompatibilityReport:
"""Complete compatibility analysis report"""
schema_before: str
schema_after: str
analysis_date: str
overall_compatibility: str
breaking_changes_count: int
potentially_breaking_count: int
non_breaking_changes_count: int
additive_changes_count: int
issues: List[CompatibilityIssue]
migration_scripts: List[MigrationScript]
risk_assessment: Dict[str, Any]
recommendations: List[str]
class SchemaCompatibilityChecker:
"""Main schema compatibility checker class"""
def __init__(self):
self.type_compatibility_matrix = self._build_type_compatibility_matrix()
self.constraint_implications = self._build_constraint_implications()
def _build_type_compatibility_matrix(self) -> Dict[str, Dict[str, str]]:
"""Build data type compatibility matrix"""
return {
# SQL data types compatibility
"varchar": {
"text": "compatible",
"char": "potentially_breaking", # length might be different
"nvarchar": "compatible",
"int": "breaking",
"bigint": "breaking",
"decimal": "breaking",
"datetime": "breaking",
"boolean": "breaking"
},
"int": {
"bigint": "compatible",
"smallint": "potentially_breaking", # range reduction
"decimal": "compatible",
"float": "potentially_breaking", # precision loss
"varchar": "breaking",
"boolean": "breaking"
},
"bigint": {
"int": "potentially_breaking", # range reduction
"decimal": "compatible",
"varchar": "breaking",
"boolean": "breaking"
},
"decimal": {
"float": "potentially_breaking", # precision loss
"int": "potentially_breaking", # precision loss
"bigint": "potentially_breaking", # precision loss
"varchar": "breaking",
"boolean": "breaking"
},
"datetime": {
"timestamp": "compatible",
"date": "potentially_breaking", # time component lost
"varchar": "breaking",
"int": "breaking"
},
"boolean": {
"tinyint": "compatible",
"varchar": "breaking",
"int": "breaking"
},
# JSON/API field types
"string": {
"number": "breaking",
"boolean": "breaking",
"array": "breaking",
"object": "breaking",
"null": "potentially_breaking"
},
"number": {
"string": "breaking",
"boolean": "breaking",
"array": "breaking",
"object": "breaking",
"null": "potentially_breaking"
},
"boolean": {
"string": "breaking",
"number": "breaking",
"array": "breaking",
"object": "breaking",
"null": "potentially_breaking"
},
"array": {
"string": "breaking",
"number": "breaking",
"boolean": "breaking",
"object": "breaking",
"null": "potentially_breaking"
},
"object": {
"string": "breaking",
"number": "breaking",
"boolean": "breaking",
"array": "breaking",
"null": "potentially_breaking"
}
}
def _build_constraint_implications(self) -> Dict[str, Dict[str, str]]:
"""Build constraint change implications"""
return {
"required": {
"added": "breaking", # Previously optional field now required
"removed": "non_breaking" # Previously required field now optional
},
"not_null": {
"added": "breaking", # Previously nullable now NOT NULL
"removed": "non_breaking" # Previously NOT NULL now nullable
},
"unique": {
"added": "potentially_breaking", # May fail if duplicates exist
"removed": "non_breaking" # No longer enforcing uniqueness
},
"primary_key": {
"added": "breaking", # Major structural change
"removed": "breaking", # Major structural change
"modified": "breaking" # Primary key change is always breaking
},
"foreign_key": {
"added": "potentially_breaking", # May fail if referential integrity violated
"removed": "potentially_breaking", # May allow orphaned records
"modified": "breaking" # Reference change is breaking
},
"check": {
"added": "potentially_breaking", # May fail if existing data violates check
"removed": "non_breaking", # No longer enforcing check
"modified": "potentially_breaking" # Different validation rules
},
"index": {
"added": "non_breaking", # Performance improvement
"removed": "non_breaking", # Performance impact only
"modified": "non_breaking" # Performance impact only
}
}
def analyze_database_schema(self, before_schema: Dict[str, Any],
after_schema: Dict[str, Any]) -> CompatibilityReport:
"""Analyze database schema compatibility"""
issues = []
migration_scripts = []
before_tables = before_schema.get("tables", {})
after_tables = after_schema.get("tables", {})
# Check for removed tables
for table_name in before_tables:
if table_name not in after_tables:
issues.append(CompatibilityIssue(
type="table_removed",
severity="breaking",
description=f"Table '{table_name}' has been removed",
field_path=f"tables.{table_name}",
old_value=before_tables[table_name],
new_value=None,
impact="All operations on this table will fail",
suggested_migration=f"CREATE VIEW {table_name} AS SELECT * FROM replacement_table;",
affected_operations=["SELECT", "INSERT", "UPDATE", "DELETE"]
))
# Check for added tables
for table_name in after_tables:
if table_name not in before_tables:
migration_scripts.append(MigrationScript(
script_type="sql",
description=f"Create new table {table_name}",
script_content=self._generate_create_table_sql(table_name, after_tables[table_name]),
rollback_script=f"DROP TABLE IF EXISTS {table_name};",
dependencies=[],
validation_query=f"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{table_name}';"
))
# Check for modified tables
for table_name in set(before_tables.keys()) & set(after_tables.keys()):
table_issues, table_scripts = self._analyze_table_changes(
table_name, before_tables[table_name], after_tables[table_name]
)
issues.extend(table_issues)
migration_scripts.extend(table_scripts)
return self._build_compatibility_report(
before_schema, after_schema, issues, migration_scripts
)
def analyze_api_schema(self, before_schema: Dict[str, Any],
after_schema: Dict[str, Any]) -> CompatibilityReport:
"""Analyze REST API schema compatibility"""
issues = []
migration_scripts = []
# Analyze endpoints
before_paths = before_schema.get("paths", {})
after_paths = after_schema.get("paths", {})
# Check for removed endpoints
for path in before_paths:
if path not in after_paths:
for method in before_paths[path]:
issues.append(CompatibilityIssue(
type="endpoint_removed",
severity="breaking",
description=f"Endpoint {method.upper()} {path} has been removed",
field_path=f"paths.{path}.{method}",
old_value=before_paths[path][method],
new_value=None,
impact="Client requests to this endpoint will fail with 404",
suggested_migration=f"Implement redirect to replacement endpoint or maintain backward compatibility stub",
affected_operations=[f"{method.upper()} {path}"]
))
# Check for modified endpoints
for path in set(before_paths.keys()) & set(after_paths.keys()):
path_issues, path_scripts = self._analyze_endpoint_changes(
path, before_paths[path], after_paths[path]
)
issues.extend(path_issues)
migration_scripts.extend(path_scripts)
# Analyze data models
before_components = before_schema.get("components", {}).get("schemas", {})
after_components = after_schema.get("components", {}).get("schemas", {})
for model_name in set(before_components.keys()) & set(after_components.keys()):
model_issues, model_scripts = self._analyze_model_changes(
model_name, before_components[model_name], after_components[model_name]
)
issues.extend(model_issues)
migration_scripts.extend(model_scripts)
return self._build_compatibility_report(
before_schema, after_schema, issues, migration_scripts
)
def _analyze_table_changes(self, table_name: str, before_table: Dict[str, Any],
after_table: Dict[str, Any]) -> Tuple[List[CompatibilityIssue], List[MigrationScript]]:
"""Analyze changes to a specific table"""
issues = []
scripts = []
before_columns = before_table.get("columns", {})
after_columns = after_table.get("columns", {})
# Check for removed columns
for col_name in before_columns:
if col_name not in after_columns:
issues.append(CompatibilityIssue(
type="column_removed",
severity="breaking",
description=f"Column '{col_name}' removed from table '{table_name}'",
field_path=f"tables.{table_name}.columns.{col_name}",
old_value=before_columns[col_name],
new_value=None,
impact="SELECT statements including this column will fail",
suggested_migration=f"ALTER TABLE {table_name} ADD COLUMN {col_name}_deprecated AS computed_value;",
affected_operations=["SELECT", "INSERT", "UPDATE"]
))
# Check for added columns
for col_name in after_columns:
if col_name not in before_columns:
col_def = after_columns[col_name]
is_required = col_def.get("nullable", True) == False and col_def.get("default") is None
if is_required:
issues.append(CompatibilityIssue(
type="required_column_added",
severity="breaking",
description=f"Required column '{col_name}' added to table '{table_name}'",
field_path=f"tables.{table_name}.columns.{col_name}",
old_value=None,
new_value=col_def,
impact="INSERT statements without this column will fail",
suggested_migration=f"Add default value or make column nullable initially",
affected_operations=["INSERT"]
))
scripts.append(MigrationScript(
script_type="sql",
description=f"Add column {col_name} to table {table_name}",
script_content=f"ALTER TABLE {table_name} ADD COLUMN {self._generate_column_definition(col_name, col_def)};",
rollback_script=f"ALTER TABLE {table_name} DROP COLUMN {col_name};",
dependencies=[],
validation_query=f"SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{table_name}' AND column_name = '{col_name}';"
))
# Check for modified columns
for col_name in set(before_columns.keys()) & set(after_columns.keys()):
col_issues, col_scripts = self._analyze_column_changes(
table_name, col_name, before_columns[col_name], after_columns[col_name]
)
issues.extend(col_issues)
scripts.extend(col_scripts)
# Check constraint changes
before_constraints = before_table.get("constraints", {})
after_constraints = after_table.get("constraints", {})
constraint_issues, constraint_scripts = self._analyze_constraint_changes(
table_name, before_constraints, after_constraints
)
issues.extend(constraint_issues)
scripts.extend(constraint_scripts)
return issues, scripts
def _analyze_column_changes(self, table_name: str, col_name: str,
before_col: Dict[str, Any], after_col: Dict[str, Any]) -> Tuple[List[CompatibilityIssue], List[MigrationScript]]:
"""Analyze changes to a specific column"""
issues = []
scripts = []
# Check data type changes
before_type = before_col.get("type", "").lower()
after_type = after_col.get("type", "").lower()
if before_type != after_type:
compatibility = self.type_compatibility_matrix.get(before_type, {}).get(after_type, "breaking")
if compatibility == "breaking":
issues.append(CompatibilityIssue(
type="incompatible_type_change",
severity="breaking",
description=f"Column '{col_name}' type changed from {before_type} to {after_type}",
field_path=f"tables.{table_name}.columns.{col_name}.type",
old_value=before_type,
new_value=after_type,
impact="Data conversion may fail or lose precision",
suggested_migration=f"Add conversion logic and validate data integrity",
affected_operations=["SELECT", "INSERT", "UPDATE", "WHERE clauses"]
))
scripts.append(MigrationScript(
script_type="sql",
description=f"Convert column {col_name} from {before_type} to {after_type}",
script_content=f"ALTER TABLE {table_name} ALTER COLUMN {col_name} TYPE {after_type} USING {col_name}::{after_type};",
rollback_script=f"ALTER TABLE {table_name} ALTER COLUMN {col_name} TYPE {before_type};",
dependencies=[f"backup_{table_name}"],
validation_query=f"SELECT COUNT(*) FROM {table_name} WHERE {col_name} IS NOT NULL;"
))
elif compatibility == "potentially_breaking":
issues.append(CompatibilityIssue(
type="risky_type_change",
severity="potentially_breaking",
description=f"Column '{col_name}' type changed from {before_type} to {after_type} - may lose data",
field_path=f"tables.{table_name}.columns.{col_name}.type",
old_value=before_type,
new_value=after_type,
impact="Potential data loss or precision reduction",
suggested_migration=f"Validate all existing data can be converted safely",
affected_operations=["Data integrity"]
))
# Check nullability changes
before_nullable = before_col.get("nullable", True)
after_nullable = after_col.get("nullable", True)
if before_nullable != after_nullable:
if before_nullable and not after_nullable: # null -> not null
issues.append(CompatibilityIssue(
type="nullability_restriction",
severity="breaking",
description=f"Column '{col_name}' changed from nullable to NOT NULL",
field_path=f"tables.{table_name}.columns.{col_name}.nullable",
old_value=before_nullable,
new_value=after_nullable,
impact="Existing NULL values will cause constraint violations",
suggested_migration=f"Update NULL values to valid defaults before applying NOT NULL constraint",
affected_operations=["INSERT", "UPDATE"]
))
scripts.append(MigrationScript(
script_type="sql",
description=f"Make column {col_name} NOT NULL",
script_content=f"""
-- Update NULL values first
UPDATE {table_name} SET {col_name} = 'DEFAULT_VALUE' WHERE {col_name} IS NULL;
-- Add NOT NULL constraint
ALTER TABLE {table_name} ALTER COLUMN {col_name} SET NOT NULL;
""",
rollback_script=f"ALTER TABLE {table_name} ALTER COLUMN {col_name} DROP NOT NULL;",
dependencies=[],
validation_query=f"SELECT COUNT(*) FROM {table_name} WHERE {col_name} IS NULL;"
))
# Check length/precision changes
before_length = before_col.get("length")
after_length = after_col.get("length")
if before_length and after_length and before_length != after_length:
if after_length < before_length:
issues.append(CompatibilityIssue(
type="length_reduction",
severity="potentially_breaking",
description=f"Column '{col_name}' length reduced from {before_length} to {after_length}",
field_path=f"tables.{table_name}.columns.{col_name}.length",
old_value=before_length,
new_value=after_length,
impact="Data truncation may occur for values exceeding new length",
suggested_migration=f"Validate no existing data exceeds new length limit",
affected_operations=["INSERT", "UPDATE"]
))
return issues, scripts
def _analyze_constraint_changes(self, table_name: str, before_constraints: Dict[str, Any],
after_constraints: Dict[str, Any]) -> Tuple[List[CompatibilityIssue], List[MigrationScript]]:
"""Analyze constraint changes"""
issues = []
scripts = []
for constraint_type in ["primary_key", "foreign_key", "unique", "check"]:
before_constraint = before_constraints.get(constraint_type, [])
after_constraint = after_constraints.get(constraint_type, [])
# Convert to sets for comparison
before_set = set(str(c) for c in before_constraint) if isinstance(before_constraint, list) else {str(before_constraint)} if before_constraint else set()
after_set = set(str(c) for c in after_constraint) if isinstance(after_constraint, list) else {str(after_constraint)} if after_constraint else set()
# Check for removed constraints
for constraint in before_set - after_set:
implication = self.constraint_implications.get(constraint_type, {}).get("removed", "non_breaking")
issues.append(CompatibilityIssue(
type=f"{constraint_type}_removed",
severity=implication,
description=f"{constraint_type.replace('_', ' ').title()} constraint '{constraint}' removed from table '{table_name}'",
field_path=f"tables.{table_name}.constraints.{constraint_type}",
old_value=constraint,
new_value=None,
impact=f"No longer enforcing {constraint_type} constraint",
suggested_migration=f"Consider application-level validation for removed constraint",
affected_operations=["INSERT", "UPDATE", "DELETE"]
))
# Check for added constraints
for constraint in after_set - before_set:
implication = self.constraint_implications.get(constraint_type, {}).get("added", "potentially_breaking")
issues.append(CompatibilityIssue(
type=f"{constraint_type}_added",
severity=implication,
description=f"New {constraint_type.replace('_', ' ')} constraint '{constraint}' added to table '{table_name}'",
field_path=f"tables.{table_name}.constraints.{constraint_type}",
old_value=None,
new_value=constraint,
impact=f"New {constraint_type} constraint may reject existing data",
suggested_migration=f"Validate existing data complies with new constraint",
affected_operations=["INSERT", "UPDATE"]
))
scripts.append(MigrationScript(
script_type="sql",
description=f"Add {constraint_type} constraint to {table_name}",
script_content=f"ALTER TABLE {table_name} ADD CONSTRAINT {constraint_type}_{table_name} {constraint_type.upper()} ({constraint});",
rollback_script=f"ALTER TABLE {table_name} DROP CONSTRAINT {constraint_type}_{table_name};",
dependencies=[],
validation_query=f"SELECT COUNT(*) FROM information_schema.table_constraints WHERE table_name = '{table_name}' AND constraint_type = '{constraint_type.upper()}';"
))
return issues, scripts
def _analyze_endpoint_changes(self, path: str, before_endpoint: Dict[str, Any],
after_endpoint: Dict[str, Any]) -> Tuple[List[CompatibilityIssue], List[MigrationScript]]:
"""Analyze changes to an API endpoint"""
issues = []
scripts = []
for method in set(before_endpoint.keys()) & set(after_endpoint.keys()):
before_method = before_endpoint[method]
after_method = after_endpoint[method]
# Check parameter changes
before_params = before_method.get("parameters", [])
after_params = after_method.get("parameters", [])
before_param_names = {p["name"] for p in before_params}
after_param_names = {p["name"] for p in after_params}
# Check for removed required parameters
for param_name in before_param_names - after_param_names:
param = next(p for p in before_params if p["name"] == param_name)
if param.get("required", False):
issues.append(CompatibilityIssue(
type="required_parameter_removed",
severity="breaking",
description=f"Required parameter '{param_name}' removed from {method.upper()} {path}",
field_path=f"paths.{path}.{method}.parameters",
old_value=param,
new_value=None,
impact="Client requests with this parameter will fail",
suggested_migration="Implement parameter validation with backward compatibility",
affected_operations=[f"{method.upper()} {path}"]
))
# Check for added required parameters
for param_name in after_param_names - before_param_names:
param = next(p for p in after_params if p["name"] == param_name)
if param.get("required", False):
issues.append(CompatibilityIssue(
type="required_parameter_added",
severity="breaking",
description=f"New required parameter '{param_name}' added to {method.upper()} {path}",
field_path=f"paths.{path}.{method}.parameters",
old_value=None,
new_value=param,
impact="Client requests without this parameter will fail",
suggested_migration="Provide default value or make parameter optional initially",
affected_operations=[f"{method.upper()} {path}"]
))
# Check response schema changes
before_responses = before_method.get("responses", {})
after_responses = after_method.get("responses", {})
for status_code in before_responses:
if status_code in after_responses:
before_schema = before_responses[status_code].get("content", {}).get("application/json", {}).get("schema", {})
after_schema = after_responses[status_code].get("content", {}).get("application/json", {}).get("schema", {})
if before_schema != after_schema:
issues.append(CompatibilityIssue(
type="response_schema_changed",
severity="potentially_breaking",
description=f"Response schema changed for {method.upper()} {path} (status {status_code})",
field_path=f"paths.{path}.{method}.responses.{status_code}",
old_value=before_schema,
new_value=after_schema,
impact="Client response parsing may fail",
suggested_migration="Implement versioned API responses",
affected_operations=[f"{method.upper()} {path}"]
))
return issues, scripts
def _analyze_model_changes(self, model_name: str, before_model: Dict[str, Any],
after_model: Dict[str, Any]) -> Tuple[List[CompatibilityIssue], List[MigrationScript]]:
"""Analyze changes to an API data model"""
issues = []
scripts = []
before_props = before_model.get("properties", {})
after_props = after_model.get("properties", {})
before_required = set(before_model.get("required", []))
after_required = set(after_model.get("required", []))
# Check for removed properties
for prop_name in set(before_props.keys()) - set(after_props.keys()):
issues.append(CompatibilityIssue(
type="property_removed",
severity="breaking",
description=f"Property '{prop_name}' removed from model '{model_name}'",
field_path=f"components.schemas.{model_name}.properties.{prop_name}",
old_value=before_props[prop_name],
new_value=None,
impact="Client code expecting this property will fail",
suggested_migration="Use API versioning to maintain backward compatibility",
affected_operations=["Serialization", "Deserialization"]
))
# Check for newly required properties
for prop_name in after_required - before_required:
issues.append(CompatibilityIssue(
type="property_made_required",
severity="breaking",
description=f"Property '{prop_name}' is now required in model '{model_name}'",
field_path=f"components.schemas.{model_name}.required",
old_value=list(before_required),
new_value=list(after_required),
impact="Client requests without this property will fail validation",
suggested_migration="Provide default values or implement gradual rollout",
affected_operations=["Request validation"]
))
# Check for property type changes
for prop_name in set(before_props.keys()) & set(after_props.keys()):
before_type = before_props[prop_name].get("type")
after_type = after_props[prop_name].get("type")
if before_type != after_type:
compatibility = self.type_compatibility_matrix.get(before_type, {}).get(after_type, "breaking")
issues.append(CompatibilityIssue(
type="property_type_changed",
severity=compatibility,
description=f"Property '{prop_name}' type changed from {before_type} to {after_type} in model '{model_name}'",
field_path=f"components.schemas.{model_name}.properties.{prop_name}.type",
old_value=before_type,
new_value=after_type,
impact="Client serialization/deserialization may fail",
suggested_migration="Implement type coercion or API versioning",
affected_operations=["Serialization", "Deserialization"]
))
return issues, scripts
def _build_compatibility_report(self, before_schema: Dict[str, Any], after_schema: Dict[str, Any],
issues: List[CompatibilityIssue], migration_scripts: List[MigrationScript]) -> CompatibilityReport:
"""Build the final compatibility report"""
# Count issues by severity
breaking_count = sum(1 for issue in issues if issue.severity == "breaking")
potentially_breaking_count = sum(1 for issue in issues if issue.severity == "potentially_breaking")
non_breaking_count = sum(1 for issue in issues if issue.severity == "non_breaking")
additive_count = sum(1 for issue in issues if issue.type == "additive")
# Determine overall compatibility
if breaking_count > 0:
overall_compatibility = "breaking_changes"
elif potentially_breaking_count > 0:
overall_compatibility = "potentially_incompatible"
elif non_breaking_count > 0:
overall_compatibility = "backward_compatible"
else:
overall_compatibility = "fully_compatible"
# Generate risk assessment
risk_assessment = {
"overall_risk": "high" if breaking_count > 0 else "medium" if potentially_breaking_count > 0 else "low",
"deployment_risk": "requires_coordinated_deployment" if breaking_count > 0 else "safe_independent_deployment",
"rollback_complexity": "high" if breaking_count > 3 else "medium" if breaking_count > 0 else "low",
"testing_requirements": ["integration_testing", "regression_testing"] +
(["data_migration_testing"] if any(s.script_type == "sql" for s in migration_scripts) else [])
}
# Generate recommendations
recommendations = []
if breaking_count > 0:
recommendations.append("Implement API versioning to maintain backward compatibility")
recommendations.append("Plan for coordinated deployment with all clients")
recommendations.append("Implement comprehensive rollback procedures")
if potentially_breaking_count > 0:
recommendations.append("Conduct thorough testing with realistic data volumes")
recommendations.append("Implement monitoring for migration success metrics")
if migration_scripts:
recommendations.append("Test all migration scripts in staging environment")
recommendations.append("Implement migration progress monitoring")
recommendations.append("Create detailed communication plan for stakeholders")
recommendations.append("Implement feature flags for gradual rollout")
return CompatibilityReport(
schema_before=json.dumps(before_schema, indent=2)[:500] + "..." if len(json.dumps(before_schema)) > 500 else json.dumps(before_schema, indent=2),
schema_after=json.dumps(after_schema, indent=2)[:500] + "..." if len(json.dumps(after_schema)) > 500 else json.dumps(after_schema, indent=2),
analysis_date=datetime.datetime.now().isoformat(),
overall_compatibility=overall_compatibility,
breaking_changes_count=breaking_count,
potentially_breaking_count=potentially_breaking_count,
non_breaking_changes_count=non_breaking_count,
additive_changes_count=additive_count,
issues=issues,
migration_scripts=migration_scripts,
risk_assessment=risk_assessment,
recommendations=recommendations
)
def _generate_create_table_sql(self, table_name: str, table_def: Dict[str, Any]) -> str:
"""Generate CREATE TABLE SQL statement"""
columns = []
for col_name, col_def in table_def.get("columns", {}).items():
columns.append(self._generate_column_definition(col_name, col_def))
return f"CREATE TABLE {table_name} (\n " + ",\n ".join(columns) + "\n);"
def _generate_column_definition(self, col_name: str, col_def: Dict[str, Any]) -> str:
"""Generate column definition for SQL"""
col_type = col_def.get("type", "VARCHAR(255)")
nullable = "" if col_def.get("nullable", True) else " NOT NULL"
default = f" DEFAULT {col_def.get('default')}" if col_def.get("default") is not None else ""
return f"{col_name} {col_type}{nullable}{default}"
def generate_human_readable_report(self, report: CompatibilityReport) -> str:
"""Generate human-readable compatibility report"""
output = []
output.append("=" * 80)
output.append("COMPATIBILITY ANALYSIS REPORT")
output.append("=" * 80)
output.append(f"Analysis Date: {report.analysis_date}")
output.append(f"Overall Compatibility: {report.overall_compatibility.upper()}")
output.append("")
# Summary
output.append("SUMMARY")
output.append("-" * 40)
output.append(f"Breaking Changes: {report.breaking_changes_count}")
output.append(f"Potentially Breaking: {report.potentially_breaking_count}")
output.append(f"Non-Breaking Changes: {report.non_breaking_changes_count}")
output.append(f"Additive Changes: {report.additive_changes_count}")
output.append(f"Total Issues Found: {len(report.issues)}")
output.append("")
# Risk Assessment
output.append("RISK ASSESSMENT")
output.append("-" * 40)
for key, value in report.risk_assessment.items():
output.append(f"{key.replace('_', ' ').title()}: {value}")
output.append("")
# Issues by Severity
issues_by_severity = {}
for issue in report.issues:
if issue.severity not in issues_by_severity:
issues_by_severity[issue.severity] = []
issues_by_severity[issue.severity].append(issue)
for severity in ["breaking", "potentially_breaking", "non_breaking"]:
if severity in issues_by_severity:
output.append(f"{severity.upper().replace('_', ' ')} ISSUES")
output.append("-" * 40)
for issue in issues_by_severity[severity]:
output.append(f"• {issue.description}")
output.append(f" Field: {issue.field_path}")
output.append(f" Impact: {issue.impact}")
output.append(f" Migration: {issue.suggested_migration}")
if issue.affected_operations:
output.append(f" Affected Operations: {', '.join(issue.affected_operations)}")
output.append("")
# Migration Scripts
if report.migration_scripts:
output.append("SUGGESTED MIGRATION SCRIPTS")
output.append("-" * 40)
for i, script in enumerate(report.migration_scripts, 1):
output.append(f"{i}. {script.description}")
output.append(f" Type: {script.script_type}")
output.append(" Script:")
for line in script.script_content.split('\n'):
output.append(f" {line}")
output.append("")
# Recommendations
output.append("RECOMMENDATIONS")
output.append("-" * 40)
for i, rec in enumerate(report.recommendations, 1):
output.append(f"{i}. {rec}")
output.append("")
return "\n".join(output)
def main():
"""Main function with command line interface"""
parser = argparse.ArgumentParser(description="Analyze schema and API compatibility between versions")
parser.add_argument("--before", required=True, help="Before schema file (JSON)")
parser.add_argument("--after", required=True, help="After schema file (JSON)")
parser.add_argument("--type", choices=["database", "api"], default="database", help="Schema type to analyze")
parser.add_argument("--output", "-o", help="Output file for compatibility report (JSON)")
parser.add_argument("--format", "-f", choices=["json", "text", "both"], default="both", help="Output format")
args = parser.parse_args()
try:
# Load schemas
with open(args.before, 'r') as f:
before_schema = json.load(f)
with open(args.after, 'r') as f:
after_schema = json.load(f)
# Analyze compatibility
checker = SchemaCompatibilityChecker()
if args.type == "database":
report = checker.analyze_database_schema(before_schema, after_schema)
else: # api
report = checker.analyze_api_schema(before_schema, after_schema)
# Output results
if args.format in ["json", "both"]:
report_dict = asdict(report)
if args.output:
with open(args.output, 'w') as f:
json.dump(report_dict, f, indent=2)
print(f"Compatibility report saved to {args.output}")
else:
print(json.dumps(report_dict, indent=2))
if args.format in ["text", "both"]:
human_report = checker.generate_human_readable_report(report)
text_output = args.output.replace('.json', '.txt') if args.output else None
if text_output:
with open(text_output, 'w') as f:
f.write(human_report)
print(f"Human-readable report saved to {text_output}")
else:
print("\n" + "="*80)
print("HUMAN-READABLE COMPATIBILITY REPORT")
print("="*80)
print(human_report)
# Return exit code based on compatibility
if report.breaking_changes_count > 0:
return 2 # Breaking changes found
elif report.potentially_breaking_count > 0:
return 1 # Potentially breaking changes found
else:
return 0 # No compatibility issues
except FileNotFoundError as e:
print(f"Error: File not found: {e}", file=sys.stderr)
return 1
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON: {e}", file=sys.stderr)
return 1
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main()) #!/usr/bin/env python3
"""
Migration Planner - Generate comprehensive migration plans with risk assessment
This tool analyzes migration specifications and generates detailed, phased migration plans
including pre-migration checklists, validation gates, rollback triggers, timeline estimates,
and risk matrices.
Author: Migration Architect Skill
Version: 1.0.0
License: MIT
"""
import json
import argparse
import sys
import datetime
import hashlib
import math
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
class MigrationType(Enum):
"""Migration type enumeration"""
DATABASE = "database"
SERVICE = "service"
INFRASTRUCTURE = "infrastructure"
DATA = "data"
API = "api"
class MigrationComplexity(Enum):
"""Migration complexity levels"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class RiskLevel(Enum):
"""Risk assessment levels"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class MigrationConstraint:
"""Migration constraint definition"""
type: str
description: str
impact: str
mitigation: str
@dataclass
class MigrationPhase:
"""Individual migration phase"""
name: str
description: str
duration_hours: int
dependencies: List[str]
validation_criteria: List[str]
rollback_triggers: List[str]
tasks: List[str]
risk_level: str
resources_required: List[str]
@dataclass
class RiskItem:
"""Individual risk assessment item"""
category: str
description: str
probability: str # low, medium, high
impact: str # low, medium, high
severity: str # low, medium, high, critical
mitigation: str
owner: str
@dataclass
class MigrationPlan:
"""Complete migration plan structure"""
migration_id: str
source_system: str
target_system: str
migration_type: str
complexity: str
estimated_duration_hours: int
phases: List[MigrationPhase]
risks: List[RiskItem]
success_criteria: List[str]
rollback_plan: Dict[str, Any]
stakeholders: List[str]
created_at: str
class MigrationPlanner:
"""Main migration planner class"""
def __init__(self):
self.migration_patterns = self._load_migration_patterns()
self.risk_templates = self._load_risk_templates()
def _load_migration_patterns(self) -> Dict[str, Any]:
"""Load predefined migration patterns"""
return {
"database": {
"schema_change": {
"phases": ["preparation", "expand", "migrate", "contract", "cleanup"],
"base_duration": 24,
"complexity_multiplier": {"low": 1.0, "medium": 1.5, "high": 2.5, "critical": 4.0}
},
"data_migration": {
"phases": ["assessment", "setup", "bulk_copy", "delta_sync", "validation", "cutover"],
"base_duration": 48,
"complexity_multiplier": {"low": 1.2, "medium": 2.0, "high": 3.0, "critical": 5.0}
}
},
"service": {
"strangler_fig": {
"phases": ["intercept", "implement", "redirect", "validate", "retire"],
"base_duration": 168, # 1 week
"complexity_multiplier": {"low": 0.8, "medium": 1.0, "high": 1.8, "critical": 3.0}
},
"parallel_run": {
"phases": ["setup", "deploy", "shadow", "compare", "cutover", "cleanup"],
"base_duration": 72,
"complexity_multiplier": {"low": 1.0, "medium": 1.3, "high": 2.0, "critical": 3.5}
}
},
"infrastructure": {
"cloud_migration": {
"phases": ["assessment", "design", "pilot", "migration", "optimization", "decommission"],
"base_duration": 720, # 30 days
"complexity_multiplier": {"low": 0.6, "medium": 1.0, "high": 1.5, "critical": 2.5}
},
"on_prem_to_cloud": {
"phases": ["discovery", "planning", "pilot", "migration", "validation", "cutover"],
"base_duration": 480, # 20 days
"complexity_multiplier": {"low": 0.8, "medium": 1.2, "high": 2.0, "critical": 3.0}
}
}
}
def _load_risk_templates(self) -> Dict[str, List[RiskItem]]:
"""Load risk templates for different migration types"""
return {
"database": [
RiskItem("technical", "Data corruption during migration", "low", "critical", "high",
"Implement comprehensive backup and validation procedures", "DBA Team"),
RiskItem("technical", "Extended downtime due to migration complexity", "medium", "high", "high",
"Use blue-green deployment and phased migration approach", "DevOps Team"),
RiskItem("business", "Business process disruption", "medium", "high", "high",
"Communicate timeline and provide alternate workflows", "Business Owner"),
RiskItem("operational", "Insufficient rollback testing", "high", "critical", "critical",
"Execute full rollback procedures in staging environment", "QA Team")
],
"service": [
RiskItem("technical", "Service compatibility issues", "medium", "high", "high",
"Implement comprehensive integration testing", "Development Team"),
RiskItem("technical", "Performance degradation", "medium", "medium", "medium",
"Conduct load testing and performance benchmarking", "DevOps Team"),
RiskItem("business", "Feature parity gaps", "high", "high", "high",
"Document feature mapping and acceptance criteria", "Product Owner"),
RiskItem("operational", "Monitoring gap during transition", "medium", "medium", "medium",
"Set up dual monitoring and alerting systems", "SRE Team")
],
"infrastructure": [
RiskItem("technical", "Network connectivity issues", "medium", "critical", "high",
"Implement redundant network paths and monitoring", "Network Team"),
RiskItem("technical", "Security configuration drift", "high", "high", "high",
"Automated security scanning and compliance checks", "Security Team"),
RiskItem("business", "Cost overrun during transition", "high", "medium", "medium",
"Implement cost monitoring and budget alerts", "Finance Team"),
RiskItem("operational", "Team knowledge gaps", "high", "medium", "medium",
"Provide training and create detailed documentation", "Platform Team")
]
}
def _calculate_complexity(self, spec: Dict[str, Any]) -> str:
"""Calculate migration complexity based on specification"""
complexity_score = 0
# Data volume complexity
data_volume = spec.get("constraints", {}).get("data_volume_gb", 0)
if data_volume > 10000:
complexity_score += 3
elif data_volume > 1000:
complexity_score += 2
elif data_volume > 100:
complexity_score += 1
# System dependencies
dependencies = len(spec.get("constraints", {}).get("dependencies", []))
if dependencies > 10:
complexity_score += 3
elif dependencies > 5:
complexity_score += 2
elif dependencies > 2:
complexity_score += 1
# Downtime constraints
max_downtime = spec.get("constraints", {}).get("max_downtime_minutes", 480)
if max_downtime < 60:
complexity_score += 3
elif max_downtime < 240:
complexity_score += 2
elif max_downtime < 480:
complexity_score += 1
# Special requirements
special_reqs = spec.get("constraints", {}).get("special_requirements", [])
complexity_score += len(special_reqs)
if complexity_score >= 8:
return "critical"
elif complexity_score >= 5:
return "high"
elif complexity_score >= 3:
return "medium"
else:
return "low"
def _estimate_duration(self, migration_type: str, migration_pattern: str, complexity: str) -> int:
"""Estimate migration duration based on type, pattern, and complexity"""
pattern_info = self.migration_patterns.get(migration_type, {}).get(migration_pattern, {})
base_duration = pattern_info.get("base_duration", 48)
multiplier = pattern_info.get("complexity_multiplier", {}).get(complexity, 1.5)
return int(base_duration * multiplier)
def _generate_phases(self, spec: Dict[str, Any]) -> List[MigrationPhase]:
"""Generate migration phases based on specification"""
migration_type = spec.get("type")
migration_pattern = spec.get("pattern", "")
complexity = self._calculate_complexity(spec)
pattern_info = self.migration_patterns.get(migration_type, {})
if migration_pattern in pattern_info:
phase_names = pattern_info[migration_pattern]["phases"]
else:
# Default phases based on migration type
phase_names = {
"database": ["preparation", "migration", "validation", "cutover"],
"service": ["preparation", "deployment", "testing", "cutover"],
"infrastructure": ["assessment", "preparation", "migration", "validation"]
}.get(migration_type, ["preparation", "execution", "validation", "cleanup"])
phases = []
total_duration = self._estimate_duration(migration_type, migration_pattern, complexity)
phase_duration = total_duration // len(phase_names)
for i, phase_name in enumerate(phase_names):
phase = self._create_phase(phase_name, phase_duration, complexity, i, phase_names)
phases.append(phase)
return phases
def _create_phase(self, phase_name: str, duration: int, complexity: str,
phase_index: int, all_phases: List[str]) -> MigrationPhase:
"""Create a detailed migration phase"""
phase_templates = {
"preparation": {
"description": "Prepare systems and teams for migration",
"tasks": [
"Backup source system",
"Set up monitoring and alerting",
"Prepare rollback procedures",
"Communicate migration timeline",
"Validate prerequisites"
],
"validation_criteria": [
"All backups completed successfully",
"Monitoring systems operational",
"Team members briefed and ready",
"Rollback procedures tested"
],
"risk_level": "medium"
},
"assessment": {
"description": "Assess current state and migration requirements",
"tasks": [
"Inventory existing systems and dependencies",
"Analyze data volumes and complexity",
"Identify integration points",
"Document current architecture",
"Create migration mapping"
],
"validation_criteria": [
"Complete system inventory documented",
"Dependencies mapped and validated",
"Migration scope clearly defined",
"Resource requirements identified"
],
"risk_level": "low"
},
"migration": {
"description": "Execute core migration processes",
"tasks": [
"Begin data/service migration",
"Monitor migration progress",
"Validate data consistency",
"Handle migration errors",
"Update configuration"
],
"validation_criteria": [
"Migration progress within expected parameters",
"Data consistency checks passing",
"Error rates within acceptable limits",
"Performance metrics stable"
],
"risk_level": "high"
},
"validation": {
"description": "Validate migration success and system health",
"tasks": [
"Execute comprehensive testing",
"Validate business processes",
"Check system performance",
"Verify data integrity",
"Confirm security controls"
],
"validation_criteria": [
"All critical tests passing",
"Performance within acceptable range",
"Security controls functioning",
"Business processes operational"
],
"risk_level": "medium"
},
"cutover": {
"description": "Switch production traffic to new system",
"tasks": [
"Update DNS/load balancer configuration",
"Redirect production traffic",
"Monitor system performance",
"Validate end-user experience",
"Confirm business operations"
],
"validation_criteria": [
"Traffic successfully redirected",
"System performance stable",
"User experience satisfactory",
"Business operations normal"
],
"risk_level": "critical"
}
}
template = phase_templates.get(phase_name, {
"description": f"Execute {phase_name} phase",
"tasks": [f"Complete {phase_name} activities"],
"validation_criteria": [f"{phase_name.title()} phase completed successfully"],
"risk_level": "medium"
})
dependencies = []
if phase_index > 0:
dependencies.append(all_phases[phase_index - 1])
rollback_triggers = [
"Critical system failure",
"Data corruption detected",
"Performance degradation > 50%",
"Business process failure"
]
resources_required = [
"Technical team availability",
"System access and permissions",
"Monitoring and alerting systems",
"Communication channels"
]
return MigrationPhase(
name=phase_name,
description=template["description"],
duration_hours=duration,
dependencies=dependencies,
validation_criteria=template["validation_criteria"],
rollback_triggers=rollback_triggers,
tasks=template["tasks"],
risk_level=template["risk_level"],
resources_required=resources_required
)
def _assess_risks(self, spec: Dict[str, Any]) -> List[RiskItem]:
"""Generate risk assessment for migration"""
migration_type = spec.get("type")
base_risks = self.risk_templates.get(migration_type, [])
# Add specification-specific risks
additional_risks = []
constraints = spec.get("constraints", {})
if constraints.get("max_downtime_minutes", 480) < 60:
additional_risks.append(
RiskItem("business", "Zero-downtime requirement increases complexity", "high", "medium", "high",
"Implement blue-green deployment or rolling update strategy", "DevOps Team")
)
if constraints.get("data_volume_gb", 0) > 5000:
additional_risks.append(
RiskItem("technical", "Large data volumes may cause extended migration time", "high", "medium", "medium",
"Implement parallel processing and progress monitoring", "Data Team")
)
compliance_reqs = constraints.get("compliance_requirements", [])
if compliance_reqs:
additional_risks.append(
RiskItem("compliance", "Regulatory compliance requirements", "medium", "high", "high",
"Ensure all compliance checks are integrated into migration process", "Compliance Team")
)
return base_risks + additional_risks
def _generate_rollback_plan(self, phases: List[MigrationPhase]) -> Dict[str, Any]:
"""Generate comprehensive rollback plan"""
rollback_phases = []
for phase in reversed(phases):
rollback_phase = {
"phase": phase.name,
"rollback_actions": [
f"Revert {phase.name} changes",
f"Restore pre-{phase.name} state",
f"Validate {phase.name} rollback success"
],
"validation_criteria": [
f"System restored to pre-{phase.name} state",
f"All {phase.name} changes successfully reverted",
"System functionality confirmed"
],
"estimated_time_minutes": phase.duration_hours * 15 # 25% of original phase time
}
rollback_phases.append(rollback_phase)
return {
"rollback_phases": rollback_phases,
"rollback_triggers": [
"Critical system failure",
"Data corruption detected",
"Migration timeline exceeded by > 50%",
"Business-critical functionality unavailable",
"Security breach detected",
"Stakeholder decision to abort"
],
"rollback_decision_matrix": {
"low_severity": "Continue with monitoring",
"medium_severity": "Assess and decide within 15 minutes",
"high_severity": "Immediate rollback initiation",
"critical_severity": "Emergency rollback - all hands"
},
"rollback_contacts": [
"Migration Lead",
"Technical Lead",
"Business Owner",
"On-call Engineer"
]
}
def generate_plan(self, spec: Dict[str, Any]) -> MigrationPlan:
"""Generate complete migration plan from specification"""
migration_id = hashlib.md5(json.dumps(spec, sort_keys=True).encode()).hexdigest()[:12]
complexity = self._calculate_complexity(spec)
phases = self._generate_phases(spec)
risks = self._assess_risks(spec)
total_duration = sum(phase.duration_hours for phase in phases)
rollback_plan = self._generate_rollback_plan(phases)
success_criteria = [
"All data successfully migrated with 100% integrity",
"System performance meets or exceeds baseline",
"All business processes functioning normally",
"No critical security vulnerabilities introduced",
"Stakeholder acceptance criteria met",
"Documentation and runbooks updated"
]
stakeholders = [
"Business Owner",
"Technical Lead",
"DevOps Team",
"QA Team",
"Security Team",
"End Users"
]
return MigrationPlan(
migration_id=migration_id,
source_system=spec.get("source", "Unknown"),
target_system=spec.get("target", "Unknown"),
migration_type=spec.get("type", "Unknown"),
complexity=complexity,
estimated_duration_hours=total_duration,
phases=phases,
risks=risks,
success_criteria=success_criteria,
rollback_plan=rollback_plan,
stakeholders=stakeholders,
created_at=datetime.datetime.now().isoformat()
)
def generate_human_readable_plan(self, plan: MigrationPlan) -> str:
"""Generate human-readable migration plan"""
output = []
output.append("=" * 80)
output.append(f"MIGRATION PLAN: {plan.migration_id}")
output.append("=" * 80)
output.append(f"Source System: {plan.source_system}")
output.append(f"Target System: {plan.target_system}")
output.append(f"Migration Type: {plan.migration_type.upper()}")
output.append(f"Complexity Level: {plan.complexity.upper()}")
output.append(f"Estimated Duration: {plan.estimated_duration_hours} hours ({plan.estimated_duration_hours/24:.1f} days)")
output.append(f"Created: {plan.created_at}")
output.append("")
# Phases
output.append("MIGRATION PHASES")
output.append("-" * 40)
for i, phase in enumerate(plan.phases, 1):
output.append(f"{i}. {phase.name.upper()} ({phase.duration_hours}h)")
output.append(f" Description: {phase.description}")
output.append(f" Risk Level: {phase.risk_level.upper()}")
if phase.dependencies:
output.append(f" Dependencies: {', '.join(phase.dependencies)}")
output.append(" Tasks:")
for task in phase.tasks:
output.append(f" • {task}")
output.append(" Success Criteria:")
for criteria in phase.validation_criteria:
output.append(f" ✓ {criteria}")
output.append("")
# Risk Assessment
output.append("RISK ASSESSMENT")
output.append("-" * 40)
risk_by_severity = {}
for risk in plan.risks:
if risk.severity not in risk_by_severity:
risk_by_severity[risk.severity] = []
risk_by_severity[risk.severity].append(risk)
for severity in ["critical", "high", "medium", "low"]:
if severity in risk_by_severity:
output.append(f"{severity.upper()} SEVERITY RISKS:")
for risk in risk_by_severity[severity]:
output.append(f" • {risk.description}")
output.append(f" Category: {risk.category}")
output.append(f" Probability: {risk.probability} | Impact: {risk.impact}")
output.append(f" Mitigation: {risk.mitigation}")
output.append(f" Owner: {risk.owner}")
output.append("")
# Rollback Plan
output.append("ROLLBACK STRATEGY")
output.append("-" * 40)
output.append("Rollback Triggers:")
for trigger in plan.rollback_plan["rollback_triggers"]:
output.append(f" • {trigger}")
output.append("")
output.append("Rollback Phases:")
for rb_phase in plan.rollback_plan["rollback_phases"]:
output.append(f" {rb_phase['phase'].upper()}:")
for action in rb_phase["rollback_actions"]:
output.append(f" - {action}")
output.append(f" Estimated Time: {rb_phase['estimated_time_minutes']} minutes")
output.append("")
# Success Criteria
output.append("SUCCESS CRITERIA")
output.append("-" * 40)
for criteria in plan.success_criteria:
output.append(f"✓ {criteria}")
output.append("")
# Stakeholders
output.append("STAKEHOLDERS")
output.append("-" * 40)
for stakeholder in plan.stakeholders:
output.append(f"• {stakeholder}")
output.append("")
return "\n".join(output)
def main():
"""Main function with command line interface"""
parser = argparse.ArgumentParser(description="Generate comprehensive migration plans")
parser.add_argument("--input", "-i", required=True, help="Input migration specification file (JSON)")
parser.add_argument("--output", "-o", help="Output file for migration plan (JSON)")
parser.add_argument("--format", "-f", choices=["json", "text", "both"], default="both",
help="Output format")
parser.add_argument("--validate", action="store_true", help="Validate migration specification only")
args = parser.parse_args()
try:
# Load migration specification
with open(args.input, 'r') as f:
spec = json.load(f)
# Validate required fields
required_fields = ["type", "source", "target"]
for field in required_fields:
if field not in spec:
print(f"Error: Missing required field '{field}' in specification", file=sys.stderr)
return 1
if args.validate:
print("Migration specification is valid")
return 0
# Generate migration plan
planner = MigrationPlanner()
plan = planner.generate_plan(spec)
# Output results
if args.format in ["json", "both"]:
plan_dict = asdict(plan)
if args.output:
with open(args.output, 'w') as f:
json.dump(plan_dict, f, indent=2)
print(f"Migration plan saved to {args.output}")
else:
print(json.dumps(plan_dict, indent=2))
if args.format in ["text", "both"]:
human_plan = planner.generate_human_readable_plan(plan)
text_output = args.output.replace('.json', '.txt') if args.output else None
if text_output:
with open(text_output, 'w') as f:
f.write(human_plan)
print(f"Human-readable plan saved to {text_output}")
else:
print("\n" + "="*80)
print("HUMAN-READABLE MIGRATION PLAN")
print("="*80)
print(human_plan)
except FileNotFoundError:
print(f"Error: Input file '{args.input}' not found", file=sys.stderr)
return 1
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON in input file: {e}", file=sys.stderr)
return 1
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
sys.exit(main()) #!/usr/bin/env python3
"""
Rollback Generator - Generate comprehensive rollback procedures for migrations
This tool takes a migration plan and generates detailed rollback procedures for each phase,
including data rollback scripts, service rollback steps, validation checks, and communication
templates to ensure safe and reliable migration reversals.
Author: Migration Architect Skill
Version: 1.0.0
License: MIT
"""
import json
import argparse
import sys
import datetime
import hashlib
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
class RollbackTrigger(Enum):
"""Types of rollback triggers"""
MANUAL = "manual"
AUTOMATED = "automated"
THRESHOLD_BASED = "threshold_based"
TIME_BASED = "time_based"
class RollbackUrgency(Enum):
"""Rollback urgency levels"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
EMERGENCY = "emergency"
@dataclass
class RollbackStep:
"""Individual rollback step"""
step_id: str
name: str
description: str
script_type: str # sql, bash, api, manual
script_content: str
estimated_duration_minutes: int
dependencies: List[str]
validation_commands: List[str]
success_criteria: List[str]
failure_escalation: str
rollback_order: int
@dataclass
class RollbackPhase:
"""Rollback phase containing multiple steps"""
phase_name: str
description: str
urgency_level: str
estimated_duration_minutes: int
prerequisites: List[str]
steps: List[RollbackStep]
validation_checkpoints: List[str]
communication_requirements: List[str]
risk_level: str
@dataclass
class RollbackTriggerCondition:
"""Conditions that trigger automatic rollback"""
trigger_id: str
name: str
condition: str
metric_threshold: Optional[Dict[str, Any]]
evaluation_window_minutes: int
auto_execute: bool
escalation_contacts: List[str]
@dataclass
class DataRecoveryPlan:
"""Data recovery and restoration plan"""
recovery_method: str # backup_restore, point_in_time, event_replay
backup_location: str
recovery_scripts: List[str]
data_validation_queries: List[str]
estimated_recovery_time_minutes: int
recovery_dependencies: List[str]
@dataclass
class CommunicationTemplate:
"""Communication template for rollback scenarios"""
template_type: str # start, progress, completion, escalation
audience: str # technical, business, executive, customers
subject: str
body: str
urgency: str
delivery_methods: List[str]
@dataclass
class RollbackRunbook:
"""Complete rollback runbook"""
runbook_id: str
migration_id: str
created_at: str
rollback_phases: List[RollbackPhase]
trigger_conditions: List[RollbackTriggerCondition]
data_recovery_plan: DataRecoveryPlan
communication_templates: List[CommunicationTemplate]
escalation_matrix: Dict[str, Any]
validation_checklist: List[str]
post_rollback_procedures: List[str]
emergency_contacts: List[Dict[str, str]]
class RollbackGenerator:
"""Main rollback generator class"""
def __init__(self):
self.rollback_templates = self._load_rollback_templates()
self.validation_templates = self._load_validation_templates()
self.communication_templates = self._load_communication_templates()
def _load_rollback_templates(self) -> Dict[str, Any]:
"""Load rollback script templates for different migration types"""
return {
"database": {
"schema_rollback": {
"drop_table": "DROP TABLE IF EXISTS {table_name};",
"drop_column": "ALTER TABLE {table_name} DROP COLUMN IF EXISTS {column_name};",
"restore_column": "ALTER TABLE {table_name} ADD COLUMN {column_definition};",
"revert_type": "ALTER TABLE {table_name} ALTER COLUMN {column_name} TYPE {original_type};",
"drop_constraint": "ALTER TABLE {table_name} DROP CONSTRAINT {constraint_name};",
"add_constraint": "ALTER TABLE {table_name} ADD CONSTRAINT {constraint_name} {constraint_definition};"
},
"data_rollback": {
"restore_backup": "pg_restore -d {database_name} -c {backup_file}",
"point_in_time_recovery": "SELECT pg_create_restore_point('pre_migration_{timestamp}');",
"delete_migrated_data": "DELETE FROM {table_name} WHERE migration_batch_id = '{batch_id}';",
"restore_original_values": "UPDATE {table_name} SET {column_name} = backup_{column_name} WHERE migration_flag = true;"
}
},
"service": {
"deployment_rollback": {
"rollback_blue_green": "kubectl patch service {service_name} -p '{\"spec\":{\"selector\":{\"version\":\"blue\"}}}'",
"rollback_canary": "kubectl scale deployment {service_name}-canary --replicas=0",
"restore_previous_version": "kubectl rollout undo deployment/{service_name} --to-revision={revision_number}",
"update_load_balancer": "aws elbv2 modify-rule --rule-arn {rule_arn} --actions Type=forward,TargetGroupArn={original_target_group}"
},
"configuration_rollback": {
"restore_config_map": "kubectl apply -f {original_config_file}",
"revert_feature_flags": "curl -X PUT {feature_flag_api}/flags/{flag_name} -d '{\"enabled\": false}'",
"restore_environment_vars": "kubectl set env deployment/{deployment_name} {env_var_name}={original_value}"
}
},
"infrastructure": {
"cloud_rollback": {
"revert_terraform": "terraform apply -target={resource_name} {rollback_plan_file}",
"restore_dns": "aws route53 change-resource-record-sets --hosted-zone-id {zone_id} --change-batch file://{rollback_dns_changes}",
"rollback_security_groups": "aws ec2 authorize-security-group-ingress --group-id {group_id} --protocol {protocol} --port {port} --cidr {cidr}",
"restore_iam_policies": "aws iam put-role-policy --role-name {role_name} --policy-name {policy_name} --policy-document file://{original_policy}"
},
"network_rollback": {
"restore_routing": "aws ec2 replace-route --route-table-id {route_table_id} --destination-cidr-block {cidr} --gateway-id {original_gateway}",
"revert_load_balancer": "aws elbv2 modify-load-balancer --load-balancer-arn {lb_arn} --scheme {original_scheme}",
"restore_firewall_rules": "aws ec2 revoke-security-group-ingress --group-id {group_id} --protocol {protocol} --port {port} --source-group {source_group}"
}
}
}
def _load_validation_templates(self) -> Dict[str, List[str]]:
"""Load validation command templates"""
return {
"database": [
"SELECT COUNT(*) FROM {table_name};",
"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{table_name}';",
"SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{table_name}' AND column_name = '{column_name}';",
"SELECT COUNT(DISTINCT {primary_key}) FROM {table_name};",
"SELECT MAX({timestamp_column}) FROM {table_name};"
],
"service": [
"curl -f {health_check_url}",
"kubectl get pods -l app={service_name} --field-selector=status.phase=Running",
"kubectl logs deployment/{service_name} --tail=100 | grep -i error",
"curl -f {service_endpoint}/api/v1/status"
],
"infrastructure": [
"aws ec2 describe-instances --instance-ids {instance_id} --query 'Reservations[*].Instances[*].State.Name'",
"nslookup {domain_name}",
"curl -I {load_balancer_url}",
"aws elbv2 describe-target-health --target-group-arn {target_group_arn}"
]
}
def _load_communication_templates(self) -> Dict[str, Dict[str, str]]:
"""Load communication templates"""
return {
"rollback_start": {
"technical": {
"subject": "ROLLBACK INITIATED: {migration_name}",
"body": """Team,
We have initiated rollback for migration: {migration_name}
Rollback ID: {rollback_id}
Start Time: {start_time}
Estimated Duration: {estimated_duration}
Reason: {rollback_reason}
Current Status: Rolling back phase {current_phase}
Next Updates: Every 15 minutes or upon phase completion
Actions Required:
- Monitor system health dashboards
- Stand by for escalation if needed
- Do not make manual changes during rollback
Incident Commander: {incident_commander}
"""
},
"business": {
"subject": "System Rollback In Progress - {system_name}",
"body": """Business Stakeholders,
We are currently performing a planned rollback of the {system_name} migration due to {rollback_reason}.
Impact: {business_impact}
Expected Resolution: {estimated_completion_time}
Affected Services: {affected_services}
We will provide updates every 30 minutes.
Contact: {business_contact}
"""
},
"executive": {
"subject": "EXEC ALERT: Critical System Rollback - {system_name}",
"body": """Executive Team,
A critical rollback is in progress for {system_name}.
Summary:
- Rollback Reason: {rollback_reason}
- Business Impact: {business_impact}
- Expected Resolution: {estimated_completion_time}
- Customer Impact: {customer_impact}
We are following established procedures and will update hourly.
Escalation: {escalation_contact}
"""
}
},
"rollback_complete": {
"technical": {
"subject": "ROLLBACK COMPLETED: {migration_name}",
"body": """Team,
Rollback has been successfully completed for migration: {migration_name}
Summary:
- Start Time: {start_time}
- End Time: {end_time}
- Duration: {actual_duration}
- Phases Completed: {completed_phases}
Validation Results:
{validation_results}
System Status: {system_status}
Next Steps:
- Continue monitoring for 24 hours
- Post-rollback review scheduled for {review_date}
- Root cause analysis to begin
All clear to resume normal operations.
Incident Commander: {incident_commander}
"""
}
}
}
def generate_rollback_runbook(self, migration_plan: Dict[str, Any]) -> RollbackRunbook:
"""Generate comprehensive rollback runbook from migration plan"""
runbook_id = f"rb_{hashlib.md5(str(migration_plan).encode()).hexdigest()[:8]}"
migration_id = migration_plan.get("migration_id", "unknown")
migration_type = migration_plan.get("migration_type", "unknown")
# Generate rollback phases (reverse order of migration phases)
rollback_phases = self._generate_rollback_phases(migration_plan)
# Generate trigger conditions
trigger_conditions = self._generate_trigger_conditions(migration_plan)
# Generate data recovery plan
data_recovery_plan = self._generate_data_recovery_plan(migration_plan)
# Generate communication templates
communication_templates = self._generate_communication_templates(migration_plan)
# Generate escalation matrix
escalation_matrix = self._generate_escalation_matrix(migration_plan)
# Generate validation checklist
validation_checklist = self._generate_validation_checklist(migration_plan)
# Generate post-rollback procedures
post_rollback_procedures = self._generate_post_rollback_procedures(migration_plan)
# Generate emergency contacts
emergency_contacts = self._generate_emergency_contacts(migration_plan)
return RollbackRunbook(
runbook_id=runbook_id,
migration_id=migration_id,
created_at=datetime.datetime.now().isoformat(),
rollback_phases=rollback_phases,
trigger_conditions=trigger_conditions,
data_recovery_plan=data_recovery_plan,
communication_templates=communication_templates,
escalation_matrix=escalation_matrix,
validation_checklist=validation_checklist,
post_rollback_procedures=post_rollback_procedures,
emergency_contacts=emergency_contacts
)
def _generate_rollback_phases(self, migration_plan: Dict[str, Any]) -> List[RollbackPhase]:
"""Generate rollback phases from migration plan"""
migration_phases = migration_plan.get("phases", [])
migration_type = migration_plan.get("migration_type", "unknown")
rollback_phases = []
# Reverse the order of migration phases for rollback
for i, phase in enumerate(reversed(migration_phases)):
if isinstance(phase, dict):
phase_name = phase.get("name", f"phase_{i}")
phase_duration = phase.get("duration_hours", 2) * 60 # Convert to minutes
phase_risk = phase.get("risk_level", "medium")
else:
phase_name = str(phase)
phase_duration = 120 # Default 2 hours
phase_risk = "medium"
rollback_steps = self._generate_rollback_steps(phase_name, migration_type, i)
rollback_phase = RollbackPhase(
phase_name=f"rollback_{phase_name}",
description=f"Rollback changes made during {phase_name} phase",
urgency_level=self._calculate_urgency(phase_risk),
estimated_duration_minutes=phase_duration // 2, # Rollback typically faster
prerequisites=self._get_rollback_prerequisites(phase_name, i),
steps=rollback_steps,
validation_checkpoints=self._get_validation_checkpoints(phase_name, migration_type),
communication_requirements=self._get_communication_requirements(phase_name, phase_risk),
risk_level=phase_risk
)
rollback_phases.append(rollback_phase)
return rollback_phases
def _generate_rollback_steps(self, phase_name: str, migration_type: str, phase_index: int) -> List[RollbackStep]:
"""Generate specific rollback steps for a phase"""
steps = []
templates = self.rollback_templates.get(migration_type, {})
if migration_type == "database":
if "migration" in phase_name.lower() or "cutover" in phase_name.lower():
# Data rollback steps
steps.extend([
RollbackStep(
step_id=f"rb_data_{phase_index}_01",
name="Stop data migration processes",
description="Halt all ongoing data migration processes",
script_type="sql",
script_content="-- Stop migration processes\nSELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE query LIKE '%migration%';",
estimated_duration_minutes=5,
dependencies=[],
validation_commands=["SELECT COUNT(*) FROM pg_stat_activity WHERE query LIKE '%migration%';"],
success_criteria=["No active migration processes"],
failure_escalation="Contact DBA immediately",
rollback_order=1
),
RollbackStep(
step_id=f"rb_data_{phase_index}_02",
name="Restore from backup",
description="Restore database from pre-migration backup",
script_type="bash",
script_content=templates.get("data_rollback", {}).get("restore_backup", "pg_restore -d {database_name} -c {backup_file}"),
estimated_duration_minutes=30,
dependencies=[f"rb_data_{phase_index}_01"],
validation_commands=["SELECT COUNT(*) FROM information_schema.tables;"],
success_criteria=["Database restored successfully", "All expected tables present"],
failure_escalation="Escalate to senior DBA and infrastructure team",
rollback_order=2
)
])
if "preparation" in phase_name.lower():
# Schema rollback steps
steps.append(
RollbackStep(
step_id=f"rb_schema_{phase_index}_01",
name="Drop migration artifacts",
description="Remove temporary migration tables and procedures",
script_type="sql",
script_content="-- Drop migration artifacts\nDROP TABLE IF EXISTS migration_log;\nDROP PROCEDURE IF EXISTS migrate_data();",
estimated_duration_minutes=5,
dependencies=[],
validation_commands=["SELECT COUNT(*) FROM information_schema.tables WHERE table_name LIKE '%migration%';"],
success_criteria=["No migration artifacts remain"],
failure_escalation="Manual cleanup required",
rollback_order=1
)
)
elif migration_type == "service":
if "cutover" in phase_name.lower():
# Service rollback steps
steps.extend([
RollbackStep(
step_id=f"rb_service_{phase_index}_01",
name="Redirect traffic back to old service",
description="Update load balancer to route traffic back to previous service version",
script_type="bash",
script_content=templates.get("deployment_rollback", {}).get("update_load_balancer", "aws elbv2 modify-rule --rule-arn {rule_arn} --actions Type=forward,TargetGroupArn={original_target_group}"),
estimated_duration_minutes=2,
dependencies=[],
validation_commands=["curl -f {health_check_url}"],
success_criteria=["Traffic routing to original service", "Health checks passing"],
failure_escalation="Emergency procedure - manual traffic routing",
rollback_order=1
),
RollbackStep(
step_id=f"rb_service_{phase_index}_02",
name="Rollback service deployment",
description="Revert to previous service deployment version",
script_type="bash",
script_content=templates.get("deployment_rollback", {}).get("restore_previous_version", "kubectl rollout undo deployment/{service_name} --to-revision={revision_number}"),
estimated_duration_minutes=10,
dependencies=[f"rb_service_{phase_index}_01"],
validation_commands=["kubectl get pods -l app={service_name} --field-selector=status.phase=Running"],
success_criteria=["Previous version deployed", "All pods running"],
failure_escalation="Manual pod management required",
rollback_order=2
)
])
elif migration_type == "infrastructure":
steps.extend([
RollbackStep(
step_id=f"rb_infra_{phase_index}_01",
name="Revert infrastructure changes",
description="Apply terraform plan to revert infrastructure to previous state",
script_type="bash",
script_content=templates.get("cloud_rollback", {}).get("revert_terraform", "terraform apply -target={resource_name} {rollback_plan_file}"),
estimated_duration_minutes=15,
dependencies=[],
validation_commands=["terraform plan -detailed-exitcode"],
success_criteria=["Infrastructure matches previous state", "No planned changes"],
failure_escalation="Manual infrastructure review required",
rollback_order=1
),
RollbackStep(
step_id=f"rb_infra_{phase_index}_02",
name="Restore DNS configuration",
description="Revert DNS changes to point back to original infrastructure",
script_type="bash",
script_content=templates.get("cloud_rollback", {}).get("restore_dns", "aws route53 change-resource-record-sets --hosted-zone-id {zone_id} --change-batch file://{rollback_dns_changes}"),
estimated_duration_minutes=10,
dependencies=[f"rb_infra_{phase_index}_01"],
validation_commands=["nslookup {domain_name}"],
success_criteria=["DNS resolves to original endpoints"],
failure_escalation="Contact DNS administrator",
rollback_order=2
)
])
# Add generic validation step for all migration types
steps.append(
RollbackStep(
step_id=f"rb_validate_{phase_index}_final",
name="Validate rollback completion",
description=f"Comprehensive validation that {phase_name} rollback completed successfully",
script_type="manual",
script_content="Execute validation checklist for this phase",
estimated_duration_minutes=10,
dependencies=[step.step_id for step in steps],
validation_commands=self.validation_templates.get(migration_type, []),
success_criteria=[f"{phase_name} fully rolled back", "All validation checks pass"],
failure_escalation=f"Investigate {phase_name} rollback failures",
rollback_order=99
)
)
return steps
def _generate_trigger_conditions(self, migration_plan: Dict[str, Any]) -> List[RollbackTriggerCondition]:
"""Generate automatic rollback trigger conditions"""
triggers = []
migration_type = migration_plan.get("migration_type", "unknown")
# Generic triggers for all migration types
triggers.extend([
RollbackTriggerCondition(
trigger_id="error_rate_spike",
name="Error Rate Spike",
condition="error_rate > baseline * 5 for 5 minutes",
metric_threshold={
"metric": "error_rate",
"operator": "greater_than",
"value": "baseline_error_rate * 5",
"duration_minutes": 5
},
evaluation_window_minutes=5,
auto_execute=True,
escalation_contacts=["on_call_engineer", "migration_lead"]
),
RollbackTriggerCondition(
trigger_id="response_time_degradation",
name="Response Time Degradation",
condition="p95_response_time > baseline * 3 for 10 minutes",
metric_threshold={
"metric": "p95_response_time",
"operator": "greater_than",
"value": "baseline_p95 * 3",
"duration_minutes": 10
},
evaluation_window_minutes=10,
auto_execute=False,
escalation_contacts=["performance_team", "migration_lead"]
),
RollbackTriggerCondition(
trigger_id="availability_drop",
name="Service Availability Drop",
condition="availability < 95% for 2 minutes",
metric_threshold={
"metric": "availability",
"operator": "less_than",
"value": 0.95,
"duration_minutes": 2
},
evaluation_window_minutes=2,
auto_execute=True,
escalation_contacts=["sre_team", "incident_commander"]
)
])
# Migration-type specific triggers
if migration_type == "database":
triggers.extend([
RollbackTriggerCondition(
trigger_id="data_integrity_failure",
name="Data Integrity Check Failure",
condition="data_validation_failures > 0",
metric_threshold={
"metric": "data_validation_failures",
"operator": "greater_than",
"value": 0,
"duration_minutes": 1
},
evaluation_window_minutes=1,
auto_execute=True,
escalation_contacts=["dba_team", "data_team"]
),
RollbackTriggerCondition(
trigger_id="migration_progress_stalled",
name="Migration Progress Stalled",
condition="migration_progress unchanged for 30 minutes",
metric_threshold={
"metric": "migration_progress_rate",
"operator": "equals",
"value": 0,
"duration_minutes": 30
},
evaluation_window_minutes=30,
auto_execute=False,
escalation_contacts=["migration_team", "dba_team"]
)
])
elif migration_type == "service":
triggers.extend([
RollbackTriggerCondition(
trigger_id="cpu_utilization_spike",
name="CPU Utilization Spike",
condition="cpu_utilization > 90% for 15 minutes",
metric_threshold={
"metric": "cpu_utilization",
"operator": "greater_than",
"value": 0.90,
"duration_minutes": 15
},
evaluation_window_minutes=15,
auto_execute=False,
escalation_contacts=["devops_team", "infrastructure_team"]
),
RollbackTriggerCondition(
trigger_id="memory_leak_detected",
name="Memory Leak Detected",
condition="memory_usage increasing continuously for 20 minutes",
metric_threshold={
"metric": "memory_growth_rate",
"operator": "greater_than",
"value": "1MB/minute",
"duration_minutes": 20
},
evaluation_window_minutes=20,
auto_execute=True,
escalation_contacts=["development_team", "sre_team"]
)
])
return triggers
def _generate_data_recovery_plan(self, migration_plan: Dict[str, Any]) -> DataRecoveryPlan:
"""Generate data recovery plan"""
migration_type = migration_plan.get("migration_type", "unknown")
if migration_type == "database":
return DataRecoveryPlan(
recovery_method="point_in_time",
backup_location="/backups/pre_migration_{migration_id}_{timestamp}.sql",
recovery_scripts=[
"pg_restore -d production -c /backups/pre_migration_backup.sql",
"SELECT pg_create_restore_point('rollback_point');",
"VACUUM ANALYZE; -- Refresh statistics after restore"
],
data_validation_queries=[
"SELECT COUNT(*) FROM critical_business_table;",
"SELECT MAX(created_at) FROM audit_log;",
"SELECT COUNT(DISTINCT user_id) FROM user_sessions;",
"SELECT SUM(amount) FROM financial_transactions WHERE date = CURRENT_DATE;"
],
estimated_recovery_time_minutes=45,
recovery_dependencies=["database_instance_running", "backup_file_accessible"]
)
else:
return DataRecoveryPlan(
recovery_method="backup_restore",
backup_location="/backups/pre_migration_state",
recovery_scripts=[
"# Restore configuration files from backup",
"cp -r /backups/pre_migration_state/config/* /app/config/",
"# Restart services with previous configuration",
"systemctl restart application_service"
],
data_validation_queries=[
"curl -f http://localhost:8080/health",
"curl -f http://localhost:8080/api/status"
],
estimated_recovery_time_minutes=20,
recovery_dependencies=["service_stopped", "backup_accessible"]
)
def _generate_communication_templates(self, migration_plan: Dict[str, Any]) -> List[CommunicationTemplate]:
"""Generate communication templates for rollback scenarios"""
templates = []
base_templates = self.communication_templates
# Rollback start notifications
for audience in ["technical", "business", "executive"]:
if audience in base_templates["rollback_start"]:
template_data = base_templates["rollback_start"][audience]
templates.append(CommunicationTemplate(
template_type="rollback_start",
audience=audience,
subject=template_data["subject"],
body=template_data["body"],
urgency="high" if audience == "executive" else "medium",
delivery_methods=["email", "slack"] if audience == "technical" else ["email"]
))
# Rollback completion notifications
for audience in ["technical", "business"]:
if audience in base_templates.get("rollback_complete", {}):
template_data = base_templates["rollback_complete"][audience]
templates.append(CommunicationTemplate(
template_type="rollback_complete",
audience=audience,
subject=template_data["subject"],
body=template_data["body"],
urgency="medium",
delivery_methods=["email", "slack"] if audience == "technical" else ["email"]
))
# Emergency escalation template
templates.append(CommunicationTemplate(
template_type="emergency_escalation",
audience="executive",
subject="CRITICAL: Rollback Emergency - {migration_name}",
body="""CRITICAL SITUATION - IMMEDIATE ATTENTION REQUIRED
Migration: {migration_name}
Issue: Rollback procedure has encountered critical failures
Current Status: {current_status}
Failed Components: {failed_components}
Business Impact: {business_impact}
Customer Impact: {customer_impact}
Immediate Actions:
1. Emergency response team activated
2. {emergency_action_1}
3. {emergency_action_2}
War Room: {war_room_location}
Bridge Line: {conference_bridge}
Next Update: {next_update_time}
Incident Commander: {incident_commander}
Executive On-Call: {executive_on_call}
""",
urgency="emergency",
delivery_methods=["email", "sms", "phone_call"]
))
return templates
def _generate_escalation_matrix(self, migration_plan: Dict[str, Any]) -> Dict[str, Any]:
"""Generate escalation matrix for different failure scenarios"""
return {
"level_1": {
"trigger": "Single component failure",
"response_time_minutes": 5,
"contacts": ["on_call_engineer", "migration_lead"],
"actions": ["Investigate issue", "Attempt automated remediation", "Monitor closely"]
},
"level_2": {
"trigger": "Multiple component failures or single critical failure",
"response_time_minutes": 2,
"contacts": ["senior_engineer", "team_lead", "devops_lead"],
"actions": ["Initiate rollback", "Establish war room", "Notify stakeholders"]
},
"level_3": {
"trigger": "System-wide failure or data corruption",
"response_time_minutes": 1,
"contacts": ["engineering_manager", "cto", "incident_commander"],
"actions": ["Emergency rollback", "All hands on deck", "Executive notification"]
},
"emergency": {
"trigger": "Business-critical failure with customer impact",
"response_time_minutes": 0,
"contacts": ["ceo", "cto", "head_of_operations"],
"actions": ["Emergency procedures", "Customer communication", "Media preparation if needed"]
}
}
def _generate_validation_checklist(self, migration_plan: Dict[str, Any]) -> List[str]:
"""Generate comprehensive validation checklist"""
migration_type = migration_plan.get("migration_type", "unknown")
base_checklist = [
"Verify system is responding to health checks",
"Confirm error rates are within normal parameters",
"Validate response times meet SLA requirements",
"Check all critical business processes are functioning",
"Verify monitoring and alerting systems are operational",
"Confirm no data corruption has occurred",
"Validate security controls are functioning properly",
"Check backup systems are working correctly",
"Verify integration points with downstream systems",
"Confirm user authentication and authorization working"
]
if migration_type == "database":
base_checklist.extend([
"Validate database schema matches expected state",
"Confirm referential integrity constraints",
"Check database performance metrics",
"Verify data consistency across related tables",
"Validate indexes and statistics are optimal",
"Confirm transaction logs are clean",
"Check database connections and connection pooling"
])
elif migration_type == "service":
base_checklist.extend([
"Verify service discovery is working correctly",
"Confirm load balancing is distributing traffic properly",
"Check service-to-service communication",
"Validate API endpoints are responding correctly",
"Confirm feature flags are in correct state",
"Check resource utilization (CPU, memory, disk)",
"Verify container orchestration is healthy"
])
elif migration_type == "infrastructure":
base_checklist.extend([
"Verify network connectivity between components",
"Confirm DNS resolution is working correctly",
"Check firewall rules and security groups",
"Validate load balancer configuration",
"Confirm SSL/TLS certificates are valid",
"Check storage systems are accessible",
"Verify backup and disaster recovery systems"
])
return base_checklist
def _generate_post_rollback_procedures(self, migration_plan: Dict[str, Any]) -> List[str]:
"""Generate post-rollback procedures"""
return [
"Monitor system stability for 24-48 hours post-rollback",
"Conduct thorough post-rollback testing of all critical paths",
"Review and analyze rollback metrics and timing",
"Document lessons learned and rollback procedure improvements",
"Schedule post-mortem meeting with all stakeholders",
"Update rollback procedures based on actual experience",
"Communicate rollback completion to all stakeholders",
"Archive rollback logs and artifacts for future reference",
"Review and update monitoring thresholds if needed",
"Plan for next migration attempt with improved procedures",
"Conduct security review to ensure no vulnerabilities introduced",
"Update disaster recovery procedures if affected by rollback",
"Review capacity planning based on rollback resource usage",
"Update documentation with rollback experience and timings"
]
def _generate_emergency_contacts(self, migration_plan: Dict[str, Any]) -> List[Dict[str, str]]:
"""Generate emergency contact list"""
return [
{
"role": "Incident Commander",
"name": "TBD - Assigned during migration",
"primary_phone": "+1-XXX-XXX-XXXX",
"email": "[email protected]",
"backup_contact": "[email protected]"
},
{
"role": "Technical Lead",
"name": "TBD - Migration technical owner",
"primary_phone": "+1-XXX-XXX-XXXX",
"email": "[email protected]",
"backup_contact": "[email protected]"
},
{
"role": "Business Owner",
"name": "TBD - Business stakeholder",
"primary_phone": "+1-XXX-XXX-XXXX",
"email": "[email protected]",
"backup_contact": "[email protected]"
},
{
"role": "On-Call Engineer",
"name": "Current on-call rotation",
"primary_phone": "+1-XXX-XXX-XXXX",
"email": "[email protected]",
"backup_contact": "[email protected]"
},
{
"role": "Executive Escalation",
"name": "CTO/VP Engineering",
"primary_phone": "+1-XXX-XXX-XXXX",
"email": "[email protected]",
"backup_contact": "[email protected]"
}
]
def _calculate_urgency(self, risk_level: str) -> str:
"""Calculate rollback urgency based on risk level"""
risk_to_urgency = {
"low": "low",
"medium": "medium",
"high": "high",
"critical": "emergency"
}
return risk_to_urgency.get(risk_level, "medium")
def _get_rollback_prerequisites(self, phase_name: str, phase_index: int) -> List[str]:
"""Get prerequisites for rollback phase"""
prerequisites = [
"Incident commander assigned and briefed",
"All team members notified of rollback initiation",
"Monitoring systems confirmed operational",
"Backup systems verified and accessible"
]
if phase_index > 0:
prerequisites.append("Previous rollback phase completed successfully")
if "cutover" in phase_name.lower():
prerequisites.extend([
"Traffic redirection capabilities confirmed",
"Load balancer configuration backed up",
"DNS changes prepared for quick execution"
])
if "data" in phase_name.lower() or "migration" in phase_name.lower():
prerequisites.extend([
"Database backup verified and accessible",
"Data validation queries prepared",
"Database administrator on standby"
])
return prerequisites
def _get_validation_checkpoints(self, phase_name: str, migration_type: str) -> List[str]:
"""Get validation checkpoints for rollback phase"""
checkpoints = [
f"{phase_name} rollback steps completed",
"System health checks passing",
"No critical errors in logs",
"Key metrics within acceptable ranges"
]
validation_commands = self.validation_templates.get(migration_type, [])
checkpoints.extend([f"Validation command passed: {cmd[:50]}..." for cmd in validation_commands[:3]])
return checkpoints
def _get_communication_requirements(self, phase_name: str, risk_level: str) -> List[str]:
"""Get communication requirements for rollback phase"""
base_requirements = [
"Notify incident commander of phase start/completion",
"Update rollback status dashboard",
"Log all actions and decisions"
]
if risk_level in ["high", "critical"]:
base_requirements.extend([
"Notify all stakeholders of phase progress",
"Update executive team if rollback extends beyond expected time",
"Prepare customer communication if needed"
])
if "cutover" in phase_name.lower():
base_requirements.append("Immediate notification when traffic is redirected")
return base_requirements
def generate_human_readable_runbook(self, runbook: RollbackRunbook) -> str:
"""Generate human-readable rollback runbook"""
output = []
output.append("=" * 80)
output.append(f"ROLLBACK RUNBOOK: {runbook.runbook_id}")
output.append("=" * 80)
output.append(f"Migration ID: {runbook.migration_id}")
output.append(f"Created: {runbook.created_at}")
output.append("")
# Emergency Contacts
output.append("EMERGENCY CONTACTS")
output.append("-" * 40)
for contact in runbook.emergency_contacts:
output.append(f"{contact['role']}: {contact['name']}")
output.append(f" Phone: {contact['primary_phone']}")
output.append(f" Email: {contact['email']}")
output.append(f" Backup: {contact['backup_contact']}")
output.append("")
# Escalation Matrix
output.append("ESCALATION MATRIX")
output.append("-" * 40)
for level, details in runbook.escalation_matrix.items():
output.append(f"{level.upper()}:")
output.append(f" Trigger: {details['trigger']}")
output.append(f" Response Time: {details['response_time_minutes']} minutes")
output.append(f" Contacts: {', '.join(details['contacts'])}")
output.append(f" Actions: {', '.join(details['actions'])}")
output.append("")
# Rollback Trigger Conditions
output.append("AUTOMATIC ROLLBACK TRIGGERS")
output.append("-" * 40)
for trigger in runbook.trigger_conditions:
output.append(f"• {trigger.name}")
output.append(f" Condition: {trigger.condition}")
output.append(f" Auto-Execute: {'Yes' if trigger.auto_execute else 'No'}")
output.append(f" Evaluation Window: {trigger.evaluation_window_minutes} minutes")
output.append(f" Contacts: {', '.join(trigger.escalation_contacts)}")
output.append("")
# Rollback Phases
output.append("ROLLBACK PHASES")
output.append("-" * 40)
for i, phase in enumerate(runbook.rollback_phases, 1):
output.append(f"{i}. {phase.phase_name.upper()}")
output.append(f" Description: {phase.description}")
output.append(f" Urgency: {phase.urgency_level.upper()}")
output.append(f" Duration: {phase.estimated_duration_minutes} minutes")
output.append(f" Risk Level: {phase.risk_level.upper()}")
if phase.prerequisites:
output.append(" Prerequisites:")
for prereq in phase.prerequisites:
output.append(f" ✓ {prereq}")
output.append(" Steps:")
for step in sorted(phase.steps, key=lambda x: x.rollback_order):
output.append(f" {step.rollback_order}. {step.name}")
output.append(f" Duration: {step.estimated_duration_minutes} min")
output.append(f" Type: {step.script_type}")
if step.script_content and step.script_type != "manual":
output.append(" Script:")
for line in step.script_content.split('\n')[:3]: # Show first 3 lines
output.append(f" {line}")
if len(step.script_content.split('\n')) > 3:
output.append(" ...")
output.append(f" Success Criteria: {', '.join(step.success_criteria)}")
output.append("")
if phase.validation_checkpoints:
output.append(" Validation Checkpoints:")
for checkpoint in phase.validation_checkpoints:
output.append(f" ☐ {checkpoint}")
output.append("")
# Data Recovery Plan
output.append("DATA RECOVERY PLAN")
output.append("-" * 40)
drp = runbook.data_recovery_plan
output.append(f"Recovery Method: {drp.recovery_method}")
output.append(f"Backup Location: {drp.backup_location}")
output.append(f"Estimated Recovery Time: {drp.estimated_recovery_time_minutes} minutes")
output.append("Recovery Scripts:")
for script in drp.recovery_scripts:
output.append(f" • {script}")
output.append("Validation Queries:")
for query in drp.data_validation_queries:
output.append(f" • {query}")
output.append("")
# Validation Checklist
output.append("POST-ROLLBACK VALIDATION CHECKLIST")
output.append("-" * 40)
for i, item in enumerate(runbook.validation_checklist, 1):
output.append(f"{i:2d}. ☐ {item}")
output.append("")
# Post-Rollback Procedures
output.append("POST-ROLLBACK PROCEDURES")
output.append("-" * 40)
for i, procedure in enumerate(runbook.post_rollback_procedures, 1):
output.append(f"{i:2d}. {procedure}")
output.append("")
return "\n".join(output)
def main():
"""Main function with command line interface"""
parser = argparse.ArgumentParser(description="Generate comprehensive rollback runbooks from migration plans")
parser.add_argument("--input", "-i", required=True, help="Input migration plan file (JSON)")
parser.add_argument("--output", "-o", help="Output file for rollback runbook (JSON)")
parser.add_argument("--format", "-f", choices=["json", "text", "both"], default="both", help="Output format")
args = parser.parse_args()
try:
# Load migration plan
with open(args.input, 'r') as f:
migration_plan = json.load(f)
# Validate required fields
if "migration_id" not in migration_plan and "source" not in migration_plan:
print("Error: Migration plan must contain migration_id or source field", file=sys.stderr)
return 1
# Generate rollback runbook
generator = RollbackGenerator()
runbook = generator.generate_rollback_runbook(migration_plan)
# Output results
if args.format in ["json", "both"]:
runbook_dict = asdict(runbook)
if args.output:
with open(args.output, 'w') as f:
json.dump(runbook_dict, f, indent=2)
print(f"Rollback runbook saved to {args.output}")
else:
print(json.dumps(runbook_dict, indent=2))
if args.format in ["text", "both"]:
human_runbook = generator.generate_human_readable_runbook(runbook)
text_output = args.output.replace('.json', '.txt') if args.output else None
if text_output:
with open(text_output, 'w') as f:
f.write(human_runbook)
print(f"Human-readable runbook saved to {text_output}")
else:
print("\n" + "="*80)
print("HUMAN-READABLE ROLLBACK RUNBOOK")
print("="*80)
print(human_runbook)
except FileNotFoundError:
print(f"Error: Input file '{args.input}' not found", file=sys.stderr)
return 1
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON in input file: {e}", file=sys.stderr)
return 1
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
sys.exit(main()) Install this Skill
Skills give your AI agent a consistent, structured approach to this task — better output than a one-off prompt.
npx skills add alirezarezvani/claude-skills --skill engineering/migration-architect Community skill by @alirezarezvani. Need a walkthrough? See the install guide →
Works with
Prefer no terminal? Download the ZIP and place it manually.
Details
- Category
- Development
- License
- MIT
- Author
- @alirezarezvani
- Source
- GitHub →
- Source file
-
show path
engineering/migration-architect/SKILL.md
People who install this also use
Senior Software Architect
Design system architecture with C4 and sequence diagrams, write Architecture Decision Records, evaluate tech stacks, and guide architectural trade-offs.
@alirezarezvani
Database Designer
Design production-grade databases — schema modeling, normalization, indexing strategy, query optimization, and choosing between SQL and NoSQL.
@alirezarezvani
Senior DevOps Engineer
CI/CD pipeline design, Infrastructure as Code, containerization with Docker and Kubernetes, and deployment automation from a senior DevOps perspective.
@alirezarezvani