Database Designer
Design production-grade databases — schema modeling, normalization, indexing strategy, query optimization, and choosing between SQL and NoSQL.
What this skill does
Build production-ready database structures that stay fast and organized as your application grows. You can generate optimized designs, plan safe updates without downtime, and ensure security best practices are built in from the start. Use this whenever you need to launch a new data system or improve the performance of an existing one.
name: "database-designer"
description: "Database Designer - POWERFUL Tier Skill"
Database Designer - POWERFUL Tier Skill
Overview
A comprehensive database design skill that provides expert-level analysis, optimization, and migration capabilities for modern database systems. This skill combines theoretical principles with practical tools to help architects and developers create scalable, performant, and maintainable database schemas.
Core Competencies
Schema Design & Analysis
- Normalization Analysis: Automated detection of normalization levels (1NF through BCNF)
- Denormalization Strategy: Smart recommendations for performance optimization
- Data Type Optimization: Identification of inappropriate types and size issues
- Constraint Analysis: Missing foreign keys, unique constraints, and null checks
- Naming Convention Validation: Consistent table and column naming patterns
- ERD Generation: Automatic Mermaid diagram creation from DDL
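To illustrate the ERD generation step, here is a minimal sketch of how a parsed schema might be rendered as a Mermaid `erDiagram`. The dictionary layout and function name are hypothetical; the actual `schema_analyzer.py` internals may differ.

```python
# Hypothetical sketch: emit Mermaid ERD text from a parsed schema dict.
def schema_to_mermaid(tables):
    """Render a minimal Mermaid erDiagram from {table: spec} definitions."""
    lines = ["erDiagram"]
    for table, spec in tables.items():
        lines.append(f"    {table} {{")
        for col, col_type in spec["columns"].items():
            lines.append(f"        {col_type.replace(' ', '_')} {col}")
        lines.append("    }")
        # Draw one-to-many edges for each foreign key.
        for col, ref_table in spec.get("foreign_keys", {}).items():
            lines.append(f"    {ref_table} ||--o{{ {table} : {col}")
    return "\n".join(lines)

schema = {
    "users": {"columns": {"id": "INTEGER", "email": "VARCHAR(255)"}},
    "orders": {
        "columns": {"id": "INTEGER", "user_id": "INTEGER"},
        "foreign_keys": {"user_id": "users"},
    },
}
print(schema_to_mermaid(schema))
```

The emitted text can be pasted into any Mermaid renderer to visualize the relationships.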
Index Optimization
- Index Gap Analysis: Identification of missing indexes on foreign keys and query patterns
- Composite Index Strategy: Optimal column ordering for multi-column indexes
- Index Redundancy Detection: Elimination of overlapping and unused indexes
- Performance Impact Modeling: Selectivity estimation and query cost analysis
- Index Type Selection: B-tree, hash, partial, covering, and specialized indexes
Migration Management
- Zero-Downtime Migrations: Expand-contract pattern implementation
- Schema Evolution: Safe column additions, deletions, and type changes
- Data Migration Scripts: Automated data transformation and validation
- Rollback Strategy: Complete reversal capabilities with validation
- Execution Planning: Ordered migration steps with dependency resolution
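Dependency resolution for migration steps can be sketched with the standard library's `graphlib` (Python 3.9+; on the 3.7 baseline listed below, a manual topological sort would be needed). Step names and dependencies here are invented for illustration.

```python
# Sketch of dependency-aware migration ordering via topological sort.
from graphlib import TopologicalSorter

# Each step maps to the set of steps that must run before it.
steps = {
    "create_table_orders": set(),
    "add_fk_orders_users": {"create_table_orders"},
    "backfill_orders": {"create_table_orders"},
    "add_index_orders_user_id": {"backfill_orders"},
}
order = list(TopologicalSorter(steps).static_order())
print(order)  # dependencies always precede their dependents
```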
Database Design Principles
→ See references/database-design-reference.md for details
Best Practices
Schema Design
- Use meaningful names: Clear, consistent naming conventions
- Choose appropriate data types: Right-sized columns for storage efficiency
- Define proper constraints: Foreign keys, check constraints, unique indexes
- Consider future growth: Plan for scale from the beginning
- Document relationships: Clear foreign key relationships and business rules
Performance Optimization
- Index strategically: Cover common query patterns without over-indexing
- Monitor query performance: Regular analysis of slow queries
- Partition large tables: Improve query performance and maintenance
- Use appropriate isolation levels: Balance consistency with performance
- Implement connection pooling: Efficient resource utilization
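As a sketch of the pooling idea, here is a minimal pool built from `queue.Queue`, using `sqlite3` as a stand-in driver. A production system would use the driver's or framework's own pooling rather than this toy.

```python
# Minimal connection-pool sketch (standard library only, for illustration).
import sqlite3
from contextlib import contextmanager
from queue import Queue

class ConnectionPool:
    def __init__(self, dsn, size=5):
        self._pool = Queue(maxsize=size)
        for _ in range(size):
            self._pool.put(sqlite3.connect(dsn, check_same_thread=False))

    @contextmanager
    def connection(self):
        conn = self._pool.get()       # blocks when all connections are busy
        try:
            yield conn
        finally:
            self._pool.put(conn)      # always return the connection

pool = ConnectionPool(":memory:", size=2)
with pool.connection() as conn:
    print(conn.execute("SELECT 1").fetchone())  # → (1,)
```

Bounding the pool size caps concurrent database connections, which is the resource-utilization point above.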
Security Considerations
- Principle of least privilege: Grant minimal necessary permissions
- Encrypt sensitive data: At rest and in transit
- Audit access patterns: Monitor and log database access
- Validate inputs: Prevent SQL injection attacks
- Regular security updates: Keep database software current
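The input-validation point is best served by parameterized queries, which bind user input as data rather than SQL text. The example below uses `sqlite3`, but every mainstream driver supports placeholders.

```python
# Parameterized queries: the primary defense against SQL injection.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT UNIQUE)")
conn.execute("INSERT INTO users (email) VALUES (?)", ("alice@example.com",))

malicious = "' OR '1'='1"
# The placeholder binds the value as data, never as executable SQL.
rows = conn.execute("SELECT * FROM users WHERE email = ?", (malicious,)).fetchall()
print(rows)  # → [] — the injection string matches no row
```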
Conclusion
Effective database design requires balancing multiple competing concerns: performance, scalability, maintainability, and business requirements. This skill provides the tools and knowledge to make informed decisions throughout the database lifecycle, from initial schema design through production optimization and evolution.
The included tools automate common analysis and optimization tasks, while the comprehensive guides provide the theoretical foundation for making sound architectural decisions. Whether building a new system or optimizing an existing one, these resources provide expert-level guidance for creating robust, scalable database solutions.
Database Designer - POWERFUL Tier Skill
A comprehensive database design and analysis toolkit that provides expert-level schema analysis, index optimization, and migration generation capabilities for modern database systems.
Features
🔍 Schema Analyzer
- Normalization Analysis: Automated detection of 1NF through BCNF violations
- Data Type Optimization: Identifies antipatterns and inappropriate types
- Constraint Analysis: Finds missing foreign keys, unique constraints, and checks
- ERD Generation: Creates Mermaid diagrams from DDL or JSON schema
- Naming Convention Validation: Ensures consistent naming patterns
⚡ Index Optimizer
- Missing Index Detection: Identifies indexes needed for query patterns
- Composite Index Design: Optimizes column ordering for maximum efficiency
- Redundancy Analysis: Finds duplicate and overlapping indexes
- Performance Modeling: Estimates selectivity and query performance impact
- Covering Index Recommendations: Eliminates table lookups
🚀 Migration Generator
- Zero-Downtime Migrations: Implements expand-contract patterns
- Schema Evolution: Handles column changes, table renames, constraint updates
- Data Migration Scripts: Automated data transformation and validation
- Rollback Planning: Complete reversal capabilities for all changes
- Execution Orchestration: Dependency-aware migration ordering
Quick Start
Prerequisites
- Python 3.7+ (no external dependencies required)
- Database schema in SQL DDL format or JSON
- Query patterns (for index optimization)
Installation
# Clone or download the database-designer skill
cd engineering/database-designer/
# Make scripts executable
chmod +x *.py
Usage Examples
Schema Analysis
Analyze SQL DDL file:
python schema_analyzer.py --input assets/sample_schema.sql --output-format text
Generate ERD diagram:
python schema_analyzer.py --input assets/sample_schema.sql --generate-erd --output analysis.txt
JSON schema analysis:
python schema_analyzer.py --input assets/sample_schema.json --output-format json --output results.json
Index Optimization
Basic index analysis:
python index_optimizer.py --schema assets/sample_schema.json --queries assets/sample_query_patterns.json
High-priority recommendations only:
python index_optimizer.py --schema assets/sample_schema.json --queries assets/sample_query_patterns.json --min-priority 2
JSON output with existing index analysis:
python index_optimizer.py --schema assets/sample_schema.json --queries assets/sample_query_patterns.json --format json --analyze-existing
Migration Generation
Generate migration between schemas:
python migration_generator.py --current assets/current_schema.json --target assets/target_schema.json
Zero-downtime migration:
python migration_generator.py --current current.json --target target.json --zero-downtime --format sql
Include validation queries:
python migration_generator.py --current current.json --target target.json --include-validations --output migration_plan.txt
Tool Documentation
Schema Analyzer
Input Formats:
- SQL DDL files (.sql)
- JSON schema definitions (.json)
Key Capabilities:
- Detects 1NF violations (non-atomic values, repeating groups)
- Identifies 2NF issues (partial dependencies in composite keys)
- Finds 3NF problems (transitive dependencies)
- Checks BCNF compliance (determinant key requirements)
- Validates data types (VARCHAR(255) antipattern, inappropriate types)
- Missing constraints (NOT NULL, UNIQUE, CHECK, foreign keys)
- Naming convention adherence
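One of the 1NF checks above (non-atomic values) can be approximated by flagging text columns whose sampled values look like comma-separated lists, as in the `tags` column of the sample schema. The thresholds and function below are illustrative, not the tool's actual rules.

```python
# Heuristic 1NF check: flag columns whose values resemble CSV lists.
import re

CSV_PATTERN = re.compile(r"^[^,]+(,\s*[^,]+)+$")

def looks_multivalued(samples):
    """True if at least half of the sampled values look like 'a,b,c' lists."""
    hits = sum(1 for v in samples if v and CSV_PATTERN.match(v))
    return hits >= max(1, len(samples) // 2)

tags_samples = ["red,large,cotton", "blue,small", "green"]
dimensions_samples = ["10x5x3 inches", "2x2x2 inches"]
print(looks_multivalued(tags_samples))        # → True (likely 1NF violation)
print(looks_multivalued(dimensions_samples))  # → False (no comma lists)
```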
Sample Command:
python schema_analyzer.py \
--input sample_schema.sql \
--generate-erd \
--output-format text \
--output analysis.txt
Output:
- Comprehensive text or JSON analysis report
- Mermaid ERD diagram
- Prioritized recommendations
- SQL statements for improvements
Index Optimizer
Input Requirements:
- Schema definition (JSON format)
- Query patterns with frequency and selectivity data
Analysis Features:
- Selectivity estimation based on column patterns
- Composite index column ordering optimization
- Covering index recommendations for SELECT queries
- Foreign key index validation
- Redundancy detection (duplicates, overlaps, unused indexes)
- Performance impact modeling
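The redundancy check rests on a simple rule: an index is redundant when its column list is a leading prefix of another index on the same table. A minimal sketch of that rule, using the sample schema's `orders` indexes:

```python
# Sketch of prefix-based redundancy detection between indexes.
def redundant_indexes(indexes):
    """indexes: list of {"name": str, "columns": [str, ...]} dicts.
    Returns (redundant, superseding_index) name pairs."""
    redundant = []
    for a in indexes:
        for b in indexes:
            if a is b:
                continue
            # a is redundant if its columns are a leading prefix of b's.
            if b["columns"][: len(a["columns"])] == a["columns"]:
                redundant.append((a["name"], b["name"]))
    return redundant

idx = [
    {"name": "idx_orders_user", "columns": ["user_id"]},
    {"name": "idx_orders_user_status", "columns": ["user_id", "status"]},
]
print(redundant_indexes(idx))  # → [('idx_orders_user', 'idx_orders_user_status')]
```

Real engines add caveats (unique indexes, partial conditions, index types), so a flagged index deserves review before being dropped.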
Sample Command:
python index_optimizer.py \
--schema schema.json \
--queries query_patterns.json \
--format text \
--min-priority 3 \
--output recommendations.txt
Output:
- Prioritized index recommendations
- CREATE INDEX statements
- Drop statements for redundant indexes
- Performance impact analysis
- Storage size estimates
Migration Generator
Input Requirements:
- Current schema (JSON format)
- Target schema (JSON format)
Migration Strategies:
- Standard migrations with ALTER statements
- Zero-downtime expand-contract patterns
- Data migration and transformation scripts
- Constraint management (add/drop in correct order)
- Index management with timing estimates
Sample Command:
python migration_generator.py \
--current current_schema.json \
--target target_schema.json \
--zero-downtime \
--include-validations \
--format text
Output:
- Step-by-step migration plan
- Forward and rollback SQL statements
- Risk assessment for each step
- Validation queries
- Execution time estimates
File Structure
database-designer/
├── README.md # This file
├── SKILL.md # Comprehensive database design guide
├── schema_analyzer.py # Schema analysis tool
├── index_optimizer.py # Index optimization tool
├── migration_generator.py # Migration generation tool
├── references/ # Reference documentation
│ ├── normalization_guide.md # Normalization principles and patterns
│ ├── index_strategy_patterns.md # Index design and optimization guide
│ └── database_selection_decision_tree.md # Database technology selection
├── assets/ # Sample files and test data
│ ├── sample_schema.sql # Sample DDL with various issues
│ ├── sample_schema.json # JSON schema definition
│ └── sample_query_patterns.json # Query patterns for index analysis
└── expected_outputs/ # Example tool outputs
├── schema_analysis_sample.txt # Sample schema analysis report
├── index_optimization_sample.txt # Sample index recommendations
├── migration_sample.txt # Sample migration plan
JSON Schema Format
The tools use a standardized JSON format for schema definitions:
{
"tables": {
"table_name": {
"columns": {
"column_name": {
"type": "VARCHAR(255)",
"nullable": true,
"unique": false,
"foreign_key": "other_table.column",
"default": "default_value",
"cardinality_estimate": 1000
}
},
"primary_key": ["id"],
"unique_constraints": [["email"], ["username"]],
"check_constraints": {
"chk_positive_price": "price > 0"
},
"indexes": [
{
"name": "idx_table_column",
"columns": ["column_name"],
"unique": false,
"partial_condition": "status = 'active'"
}
]
}
}
}
Query Patterns Format
For index optimization, provide query patterns in this format:
{
"queries": [
{
"id": "user_lookup",
"type": "SELECT",
"table": "users",
"where_conditions": [
{
"column": "email",
"operator": "=",
"selectivity": 0.95
}
],
"join_conditions": [
{
"local_column": "user_id",
"foreign_table": "orders",
"foreign_column": "id",
"join_type": "INNER"
}
],
"order_by": [
{"column": "created_at", "direction": "DESC"}
],
"frequency": 1000,
"avg_execution_time_ms": 5.2
}
]
}
Best Practices
Schema Analysis
- Start with DDL: Use actual CREATE TABLE statements when possible
- Include Constraints: Capture all existing constraints and indexes
- Consider History: Some denormalization may be intentional for performance
- Validate Results: Review recommendations against business requirements
Index Optimization
- Real Query Patterns: Use actual application queries, not theoretical ones
- Include Frequency: Query frequency is crucial for prioritization
- Monitor Performance: Validate recommendations with actual performance testing
- Gradual Implementation: Add indexes incrementally and monitor impact
Migration Planning
- Test Migrations: Always test on non-production environments first
- Backup First: Ensure complete backups before running migrations
- Monitor Progress: Watch for locks and performance impacts during execution
- Rollback Ready: Have rollback procedures tested and ready
Advanced Usage
Custom Selectivity Estimation
The index optimizer uses pattern-based selectivity estimation. You can improve accuracy by providing cardinality estimates (the approximate number of distinct values per column) in your schema JSON:
{
"columns": {
"status": {
"type": "VARCHAR(20)",
"cardinality_estimate": 5 # Only 5 distinct values
}
}
}
Zero-Downtime Migration Strategy
For production systems, use the zero-downtime flag to generate expand-contract migrations:
- Expand Phase: Add new columns/tables without constraints
- Dual Write: Application writes to both old and new structures
- Backfill: Populate new structures with existing data
- Contract Phase: Remove old structures after validation
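The phases above can be sketched as ordered SQL batches emitted by a script. The table and column names below are hypothetical, the ALTER syntax is PostgreSQL-style, and the real generator's output will differ; the dual-write phase lives in application code, not in SQL.

```python
# Expand-contract phases as ordered SQL batches (illustrative only).
PHASES = {
    "expand": [
        # Add the new column without constraints so writes keep working.
        "ALTER TABLE users ADD COLUMN email_normalized VARCHAR(255)",
    ],
    "backfill": [
        # Populate the new column from existing data, idempotently.
        "UPDATE users SET email_normalized = LOWER(email) "
        "WHERE email_normalized IS NULL",
    ],
    "contract": [
        # Tighten constraints and drop the old structure after validation.
        "ALTER TABLE users ALTER COLUMN email_normalized SET NOT NULL",
        "ALTER TABLE users DROP COLUMN email",
    ],
}
for phase in ("expand", "backfill", "contract"):  # dual writes happen in app code
    for statement in PHASES[phase]:
        print(f"[{phase}] {statement};")
```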
Integration with CI/CD
Integrate these tools into your deployment pipeline:
# Schema validation in CI: fail the build if any constraint issues are found
issues=$(python schema_analyzer.py --input schema.sql --output-format json | \
  jq '.constraint_analysis.total_issues')
[ "$issues" -eq 0 ] || exit 1
# Generate migrations automatically
python migration_generator.py \
--current prod_schema.json \
--target new_schema.json \
--zero-downtime \
--output migration.sql
Troubleshooting
Common Issues
"No tables found in input file"
- Ensure SQL DDL uses standard CREATE TABLE syntax
- Check for syntax errors in DDL
- Verify file encoding (UTF-8 recommended)
"Invalid JSON schema"
- Validate JSON syntax with a JSON validator
- Ensure all required fields are present
- Check that foreign key references use "table.column" format
"Analysis shows no issues but problems exist"
- Tools use heuristic analysis - review recommendations carefully
- Some design decisions may be intentional (denormalization for performance)
- Consider domain-specific requirements not captured by general rules
Performance Tips
Large Schemas:
- Use --output-format json for machine processing
- Consider analyzing subsets of tables for very large schemas
- Provide cardinality estimates for better index recommendations
Complex Queries:
- Include actual execution times in query patterns
- Provide realistic frequency estimates
- Consider seasonal or usage pattern variations
Contributing
This is a self-contained skill with no external dependencies. To extend functionality:
- Follow the existing code patterns
- Maintain the standard-library-only requirement (no external dependencies)
- Add comprehensive test cases for new features
- Update documentation and examples
License
This database designer skill is part of the claude-skills collection and follows the same licensing terms.
assets/sample_query_patterns.json
{
"queries": [
{
"id": "user_login",
"type": "SELECT",
"table": "users",
"description": "User authentication lookup by email",
"where_conditions": [
{
"column": "email",
"operator": "=",
"selectivity": 0.95
}
],
"join_conditions": [],
"order_by": [],
"group_by": [],
"frequency": 5000,
"avg_execution_time_ms": 2.5
},
{
"id": "product_search_category",
"type": "SELECT",
"table": "products",
"description": "Product search within category with pagination",
"where_conditions": [
{
"column": "category_id",
"operator": "=",
"selectivity": 0.2
},
{
"column": "is_active",
"operator": "=",
"selectivity": 0.1
}
],
"join_conditions": [],
"order_by": [
{"column": "created_at", "direction": "DESC"}
],
"group_by": [],
"frequency": 2500,
"avg_execution_time_ms": 15.2
},
{
"id": "product_search_price_range",
"type": "SELECT",
"table": "products",
"description": "Product search by price range and brand",
"where_conditions": [
{
"column": "price",
"operator": "BETWEEN",
"selectivity": 0.3
},
{
"column": "brand",
"operator": "=",
"selectivity": 0.05
},
{
"column": "is_active",
"operator": "=",
"selectivity": 0.1
}
],
"join_conditions": [],
"order_by": [
{"column": "price", "direction": "ASC"}
],
"group_by": [],
"frequency": 800,
"avg_execution_time_ms": 25.7
},
{
"id": "user_orders_history",
"type": "SELECT",
"table": "orders",
"description": "User order history with pagination",
"where_conditions": [
{
"column": "user_id",
"operator": "=",
"selectivity": 0.8
}
],
"join_conditions": [],
"order_by": [
{"column": "created_at", "direction": "DESC"}
],
"group_by": [],
"frequency": 1200,
"avg_execution_time_ms": 8.3
},
{
"id": "order_details_with_items",
"type": "SELECT",
"table": "orders",
"description": "Order details with order items (JOIN query)",
"where_conditions": [
{
"column": "id",
"operator": "=",
"selectivity": 1.0
}
],
"join_conditions": [
{
"local_column": "id",
"foreign_table": "order_items",
"foreign_column": "order_id",
"join_type": "INNER"
}
],
"order_by": [],
"group_by": [],
"frequency": 3000,
"avg_execution_time_ms": 12.1
},
{
"id": "pending_orders_processing",
"type": "SELECT",
"table": "orders",
"description": "Processing queue - pending orders by date",
"where_conditions": [
{
"column": "status",
"operator": "=",
"selectivity": 0.15
},
{
"column": "created_at",
"operator": ">=",
"selectivity": 0.3
}
],
"join_conditions": [],
"order_by": [
{"column": "created_at", "direction": "ASC"}
],
"group_by": [],
"frequency": 150,
"avg_execution_time_ms": 45.2
},
{
"id": "user_orders_by_status",
"type": "SELECT",
"table": "orders",
"description": "User orders filtered by status",
"where_conditions": [
{
"column": "user_id",
"operator": "=",
"selectivity": 0.8
},
{
"column": "status",
"operator": "IN",
"selectivity": 0.4
}
],
"join_conditions": [],
"order_by": [
{"column": "created_at", "direction": "DESC"}
],
"group_by": [],
"frequency": 600,
"avg_execution_time_ms": 18.5
},
{
"id": "product_reviews_summary",
"type": "SELECT",
"table": "product_reviews",
"description": "Product review aggregation",
"where_conditions": [
{
"column": "product_id",
"operator": "=",
"selectivity": 0.85
}
],
"join_conditions": [],
"order_by": [],
"group_by": ["product_id"],
"frequency": 1800,
"avg_execution_time_ms": 22.3
},
{
"id": "inventory_low_stock",
"type": "SELECT",
"table": "products",
"description": "Low inventory alert query",
"where_conditions": [
{
"column": "inventory_count",
"operator": "<=",
"selectivity": 0.1
},
{
"column": "is_active",
"operator": "=",
"selectivity": 0.1
}
],
"join_conditions": [],
"order_by": [
{"column": "inventory_count", "direction": "ASC"}
],
"group_by": [],
"frequency": 50,
"avg_execution_time_ms": 35.8
},
{
"id": "popular_products_by_category",
"type": "SELECT",
"table": "order_items",
"description": "Popular products analysis with category join",
"where_conditions": [
{
"column": "created_at",
"operator": ">=",
"selectivity": 0.2
}
],
"join_conditions": [
{
"local_column": "product_id",
"foreign_table": "products",
"foreign_column": "id",
"join_type": "INNER"
},
{
"local_column": "category_id",
"foreign_table": "categories",
"foreign_column": "id",
"join_type": "INNER"
}
],
"order_by": [
{"column": "total_quantity", "direction": "DESC"}
],
"group_by": ["product_id", "category_id"],
"frequency": 25,
"avg_execution_time_ms": 180.5
},
{
"id": "customer_purchase_history",
"type": "SELECT",
"table": "orders",
"description": "Customer analytics - purchase history with items",
"where_conditions": [
{
"column": "user_id",
"operator": "=",
"selectivity": 0.8
},
{
"column": "status",
"operator": "IN",
"selectivity": 0.6
}
],
"join_conditions": [
{
"local_column": "id",
"foreign_table": "order_items",
"foreign_column": "order_id",
"join_type": "INNER"
}
],
"order_by": [
{"column": "created_at", "direction": "DESC"}
],
"group_by": [],
"frequency": 300,
"avg_execution_time_ms": 65.2
},
{
"id": "daily_sales_report",
"type": "SELECT",
"table": "orders",
"description": "Daily sales aggregation report",
"where_conditions": [
{
"column": "created_at",
"operator": ">=",
"selectivity": 0.05
},
{
"column": "status",
"operator": "IN",
"selectivity": 0.6
}
],
"join_conditions": [],
"order_by": [
{"column": "order_date", "direction": "DESC"}
],
"group_by": ["DATE(created_at)"],
"frequency": 10,
"avg_execution_time_ms": 250.8
},
{
"id": "category_hierarchy_nav",
"type": "SELECT",
"table": "categories",
"description": "Category navigation - parent-child relationships",
"where_conditions": [
{
"column": "parent_id",
"operator": "=",
"selectivity": 0.2
},
{
"column": "is_active",
"operator": "=",
"selectivity": 0.1
}
],
"join_conditions": [],
"order_by": [
{"column": "sort_order", "direction": "ASC"}
],
"group_by": [],
"frequency": 800,
"avg_execution_time_ms": 5.1
},
{
"id": "recent_user_reviews",
"type": "SELECT",
"table": "product_reviews",
"description": "Recent product reviews by user",
"where_conditions": [
{
"column": "user_id",
"operator": "=",
"selectivity": 0.95
}
],
"join_conditions": [
{
"local_column": "product_id",
"foreign_table": "products",
"foreign_column": "id",
"join_type": "INNER"
}
],
"order_by": [
{"column": "created_at", "direction": "DESC"}
],
"group_by": [],
"frequency": 200,
"avg_execution_time_ms": 12.7
},
{
"id": "product_avg_rating",
"type": "SELECT",
"table": "product_reviews",
"description": "Product average rating calculation",
"where_conditions": [
{
"column": "product_id",
"operator": "IN",
"selectivity": 0.1
}
],
"join_conditions": [],
"order_by": [],
"group_by": ["product_id"],
"frequency": 400,
"avg_execution_time_ms": 35.4
}
]
}
assets/sample_schema.json
{
"tables": {
"users": {
"columns": {
"id": {
"type": "INTEGER",
"nullable": false,
"unique": true,
"cardinality_estimate": 50000
},
"email": {
"type": "VARCHAR(255)",
"nullable": false,
"unique": true,
"cardinality_estimate": 50000
},
"username": {
"type": "VARCHAR(50)",
"nullable": false,
"unique": true,
"cardinality_estimate": 50000
},
"password_hash": {
"type": "VARCHAR(255)",
"nullable": false,
"cardinality_estimate": 50000
},
"first_name": {
"type": "VARCHAR(100)",
"nullable": true,
"cardinality_estimate": 25000
},
"last_name": {
"type": "VARCHAR(100)",
"nullable": true,
"cardinality_estimate": 30000
},
"status": {
"type": "VARCHAR(20)",
"nullable": false,
"default": "active",
"cardinality_estimate": 5
},
"created_at": {
"type": "TIMESTAMP",
"nullable": false,
"default": "CURRENT_TIMESTAMP"
}
},
"primary_key": ["id"],
"unique_constraints": [
["email"],
["username"]
],
"check_constraints": {
"chk_status_valid": "status IN ('active', 'inactive', 'suspended', 'deleted')"
},
"indexes": [
{
"name": "idx_users_email",
"columns": ["email"],
"unique": true
},
{
"name": "idx_users_status",
"columns": ["status"]
}
]
},
"products": {
"columns": {
"id": {
"type": "INTEGER",
"nullable": false,
"unique": true,
"cardinality_estimate": 10000
},
"name": {
"type": "VARCHAR(255)",
"nullable": false,
"cardinality_estimate": 9500
},
"sku": {
"type": "VARCHAR(50)",
"nullable": false,
"unique": true,
"cardinality_estimate": 10000
},
"price": {
"type": "DECIMAL(10,2)",
"nullable": false,
"cardinality_estimate": 5000
},
"category_id": {
"type": "INTEGER",
"nullable": false,
"foreign_key": "categories.id",
"cardinality_estimate": 50
},
"brand": {
"type": "VARCHAR(100)",
"nullable": true,
"cardinality_estimate": 200
},
"is_active": {
"type": "BOOLEAN",
"nullable": false,
"default": true,
"cardinality_estimate": 2
},
"inventory_count": {
"type": "INTEGER",
"nullable": false,
"default": 0,
"cardinality_estimate": 1000
},
"created_at": {
"type": "TIMESTAMP",
"nullable": false,
"default": "CURRENT_TIMESTAMP"
}
},
"primary_key": ["id"],
"unique_constraints": [
["sku"]
],
"check_constraints": {
"chk_price_positive": "price > 0",
"chk_inventory_non_negative": "inventory_count >= 0"
},
"indexes": [
{
"name": "idx_products_category",
"columns": ["category_id"]
},
{
"name": "idx_products_brand",
"columns": ["brand"]
},
{
"name": "idx_products_price",
"columns": ["price"]
},
{
"name": "idx_products_active_category",
"columns": ["is_active", "category_id"],
"partial_condition": "is_active = true"
}
]
},
"orders": {
"columns": {
"id": {
"type": "INTEGER",
"nullable": false,
"unique": true,
"cardinality_estimate": 200000
},
"order_number": {
"type": "VARCHAR(50)",
"nullable": false,
"unique": true,
"cardinality_estimate": 200000
},
"user_id": {
"type": "INTEGER",
"nullable": false,
"foreign_key": "users.id",
"cardinality_estimate": 40000
},
"status": {
"type": "VARCHAR(50)",
"nullable": false,
"default": "pending",
"cardinality_estimate": 8
},
"total_amount": {
"type": "DECIMAL(10,2)",
"nullable": false,
"cardinality_estimate": 50000
},
"payment_method": {
"type": "VARCHAR(50)",
"nullable": true,
"cardinality_estimate": 10
},
"created_at": {
"type": "TIMESTAMP",
"nullable": false,
"default": "CURRENT_TIMESTAMP"
},
"shipped_at": {
"type": "TIMESTAMP",
"nullable": true
}
},
"primary_key": ["id"],
"unique_constraints": [
["order_number"]
],
"check_constraints": {
"chk_total_positive": "total_amount > 0",
"chk_status_valid": "status IN ('pending', 'processing', 'shipped', 'delivered', 'cancelled')"
},
"indexes": [
{
"name": "idx_orders_user",
"columns": ["user_id"]
},
{
"name": "idx_orders_status",
"columns": ["status"]
},
{
"name": "idx_orders_created",
"columns": ["created_at"]
},
{
"name": "idx_orders_user_status",
"columns": ["user_id", "status"]
}
]
},
"order_items": {
"columns": {
"id": {
"type": "INTEGER",
"nullable": false,
"unique": true,
"cardinality_estimate": 800000
},
"order_id": {
"type": "INTEGER",
"nullable": false,
"foreign_key": "orders.id",
"cardinality_estimate": 200000
},
"product_id": {
"type": "INTEGER",
"nullable": false,
"foreign_key": "products.id",
"cardinality_estimate": 8000
},
"quantity": {
"type": "INTEGER",
"nullable": false,
"cardinality_estimate": 20
},
"unit_price": {
"type": "DECIMAL(10,2)",
"nullable": false,
"cardinality_estimate": 5000
},
"total_price": {
"type": "DECIMAL(10,2)",
"nullable": false,
"cardinality_estimate": 10000
}
},
"primary_key": ["id"],
"check_constraints": {
"chk_quantity_positive": "quantity > 0",
"chk_unit_price_positive": "unit_price > 0"
},
"indexes": [
{
"name": "idx_order_items_order",
"columns": ["order_id"]
},
{
"name": "idx_order_items_product",
"columns": ["product_id"]
}
]
},
"categories": {
"columns": {
"id": {
"type": "INTEGER",
"nullable": false,
"unique": true,
"cardinality_estimate": 100
},
"name": {
"type": "VARCHAR(100)",
"nullable": false,
"cardinality_estimate": 100
},
"parent_id": {
"type": "INTEGER",
"nullable": true,
"foreign_key": "categories.id",
"cardinality_estimate": 20
},
"is_active": {
"type": "BOOLEAN",
"nullable": false,
"default": true,
"cardinality_estimate": 2
}
},
"primary_key": ["id"],
"indexes": [
{
"name": "idx_categories_parent",
"columns": ["parent_id"]
},
{
"name": "idx_categories_active",
"columns": ["is_active"]
}
]
},
"product_reviews": {
"columns": {
"id": {
"type": "INTEGER",
"nullable": false,
"unique": true,
"cardinality_estimate": 150000
},
"product_id": {
"type": "INTEGER",
"nullable": false,
"foreign_key": "products.id",
"cardinality_estimate": 8000
},
"user_id": {
"type": "INTEGER",
"nullable": false,
"foreign_key": "users.id",
"cardinality_estimate": 30000
},
"rating": {
"type": "INTEGER",
"nullable": false,
"cardinality_estimate": 5
},
"review_text": {
"type": "TEXT",
"nullable": true
},
"created_at": {
"type": "TIMESTAMP",
"nullable": false,
"default": "CURRENT_TIMESTAMP"
}
},
"primary_key": ["id"],
"unique_constraints": [
["product_id", "user_id"]
],
"check_constraints": {
"chk_rating_valid": "rating BETWEEN 1 AND 5"
},
"indexes": [
{
"name": "idx_reviews_product",
"columns": ["product_id"]
},
{
"name": "idx_reviews_user",
"columns": ["user_id"]
},
{
"name": "idx_reviews_rating",
"columns": ["rating"]
}
]
}
}
}
-- Sample E-commerce Database Schema
-- Demonstrates various normalization levels and common patterns
-- Users table - well normalized
CREATE TABLE users (
id INTEGER PRIMARY KEY,
email VARCHAR(255) NOT NULL UNIQUE,
username VARCHAR(50) NOT NULL UNIQUE,
password_hash VARCHAR(255) NOT NULL,
first_name VARCHAR(100),
last_name VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status VARCHAR(20) DEFAULT 'active'
);
-- Categories table - hierarchical structure
CREATE TABLE categories (
id INTEGER PRIMARY KEY,
name VARCHAR(100) NOT NULL,
slug VARCHAR(100) NOT NULL UNIQUE,
parent_id INTEGER REFERENCES categories(id),
description TEXT,
is_active BOOLEAN DEFAULT true,
sort_order INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Products table - potential normalization issues
CREATE TABLE products (
id INTEGER PRIMARY KEY,
name VARCHAR(255) NOT NULL,
sku VARCHAR(50) NOT NULL UNIQUE,
description TEXT,
price DECIMAL(10,2) NOT NULL,
cost DECIMAL(10,2),
weight DECIMAL(8,2),
dimensions VARCHAR(50), -- Potential 1NF violation: "10x5x3 inches"
category_id INTEGER REFERENCES categories(id),
category_name VARCHAR(100), -- Redundant with categories.name (3NF violation)
brand VARCHAR(100), -- Should be normalized to separate brands table
tags VARCHAR(500), -- Potential 1NF violation: comma-separated tags
inventory_count INTEGER DEFAULT 0,
reorder_point INTEGER DEFAULT 10,
supplier_name VARCHAR(100), -- Should be normalized
supplier_contact VARCHAR(255), -- Should be normalized
is_active BOOLEAN DEFAULT true,
featured BOOLEAN DEFAULT false,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Addresses table - good normalization
CREATE TABLE addresses (
id INTEGER PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
address_type VARCHAR(20) DEFAULT 'shipping', -- 'shipping', 'billing'
street_address VARCHAR(255) NOT NULL,
street_address_2 VARCHAR(255),
city VARCHAR(100) NOT NULL,
state VARCHAR(50) NOT NULL,
postal_code VARCHAR(20) NOT NULL,
country VARCHAR(50) NOT NULL DEFAULT 'US',
is_default BOOLEAN DEFAULT false,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Orders table - mixed normalization issues
CREATE TABLE orders (
id INTEGER PRIMARY KEY,
order_number VARCHAR(50) NOT NULL UNIQUE,
user_id INTEGER REFERENCES users(id),
user_email VARCHAR(255), -- Denormalized for performance/historical reasons
user_name VARCHAR(200), -- Denormalized for performance/historical reasons
status VARCHAR(50) NOT NULL DEFAULT 'pending',
total_amount DECIMAL(10,2) NOT NULL,
tax_amount DECIMAL(10,2) NOT NULL,
shipping_amount DECIMAL(10,2) NOT NULL,
discount_amount DECIMAL(10,2) DEFAULT 0,
payment_method VARCHAR(50), -- Should be normalized to payment_methods
payment_status VARCHAR(50) DEFAULT 'pending',
shipping_address_id INTEGER REFERENCES addresses(id),
billing_address_id INTEGER REFERENCES addresses(id),
-- Denormalized shipping address for historical preservation
shipping_street VARCHAR(255),
shipping_city VARCHAR(100),
shipping_state VARCHAR(50),
shipping_postal_code VARCHAR(20),
shipping_country VARCHAR(50),
notes TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
shipped_at TIMESTAMP,
delivered_at TIMESTAMP
);
-- Order items table - properly normalized
CREATE TABLE order_items (
id INTEGER PRIMARY KEY,
order_id INTEGER REFERENCES orders(id),
product_id INTEGER REFERENCES products(id),
product_name VARCHAR(255), -- Denormalized for historical reasons
product_sku VARCHAR(50), -- Denormalized for historical reasons
quantity INTEGER NOT NULL,
unit_price DECIMAL(10,2) NOT NULL,
total_price DECIMAL(10,2) NOT NULL, -- Calculated field (could be computed)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Shopping cart table - session-based data
CREATE TABLE shopping_cart (
id INTEGER PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
session_id VARCHAR(255), -- For anonymous users
product_id INTEGER REFERENCES products(id),
quantity INTEGER NOT NULL DEFAULT 1,
added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, product_id),
UNIQUE(session_id, product_id)
);
-- Product reviews - user-generated content
CREATE TABLE product_reviews (
id INTEGER PRIMARY KEY,
product_id INTEGER REFERENCES products(id),
user_id INTEGER REFERENCES users(id),
rating INTEGER NOT NULL CHECK (rating BETWEEN 1 AND 5),
title VARCHAR(200),
review_text TEXT,
verified_purchase BOOLEAN DEFAULT false,
helpful_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(product_id, user_id) -- One review per user per product
);
-- Coupons table - promotional data
CREATE TABLE coupons (
id INTEGER PRIMARY KEY,
code VARCHAR(50) NOT NULL UNIQUE,
description VARCHAR(255),
discount_type VARCHAR(20) NOT NULL, -- 'percentage', 'fixed_amount'
discount_value DECIMAL(8,2) NOT NULL,
minimum_amount DECIMAL(10,2),
maximum_discount DECIMAL(10,2),
usage_limit INTEGER,
usage_count INTEGER DEFAULT 0,
valid_from TIMESTAMP NOT NULL,
valid_until TIMESTAMP NOT NULL,
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Audit log table - for tracking changes
CREATE TABLE audit_log (
id INTEGER PRIMARY KEY,
table_name VARCHAR(50) NOT NULL,
record_id INTEGER NOT NULL,
action VARCHAR(20) NOT NULL, -- 'INSERT', 'UPDATE', 'DELETE'
old_values TEXT, -- JSON format
new_values TEXT, -- JSON format
user_id INTEGER REFERENCES users(id),
ip_address VARCHAR(45),
user_agent TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Problematic table - multiple normalization violations
CREATE TABLE user_preferences (
user_id INTEGER PRIMARY KEY REFERENCES users(id),
preferred_categories VARCHAR(500), -- CSV list - 1NF violation
email_notifications VARCHAR(255), -- "daily,weekly,promotions" - 1NF violation
user_name VARCHAR(200), -- Redundant with users table - 3NF violation
user_email VARCHAR(255), -- Redundant with users table - 3NF violation
theme VARCHAR(50) DEFAULT 'light',
language VARCHAR(10) DEFAULT 'en',
timezone VARCHAR(50) DEFAULT 'UTC',
currency VARCHAR(3) DEFAULT 'USD',
date_format VARCHAR(20) DEFAULT 'YYYY-MM-DD',
newsletter_subscribed BOOLEAN DEFAULT true,
sms_notifications BOOLEAN DEFAULT false,
push_notifications BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Create some basic indexes (some missing, some redundant for demonstration)
CREATE INDEX idx_users_email ON users (email);
CREATE INDEX idx_users_username ON users (username); -- Redundant due to UNIQUE constraint
CREATE INDEX idx_products_category ON products (category_id);
CREATE INDEX idx_products_brand ON products (brand);
CREATE INDEX idx_products_sku ON products (sku); -- Redundant due to UNIQUE constraint
CREATE INDEX idx_orders_user ON orders (user_id);
CREATE INDEX idx_orders_status ON orders (status);
CREATE INDEX idx_orders_created ON orders (created_at);
CREATE INDEX idx_order_items_order ON order_items (order_id);
CREATE INDEX idx_order_items_product ON order_items (product_id);
-- Missing index on addresses.user_id
-- Missing composite index on orders (user_id, status)
-- Missing index on product_reviews.product_id
-- Constraints that should exist but are missing
-- ALTER TABLE products ADD CONSTRAINT chk_price_positive CHECK (price > 0);
-- ALTER TABLE products ADD CONSTRAINT chk_inventory_non_negative CHECK (inventory_count >= 0);
-- ALTER TABLE order_items ADD CONSTRAINT chk_quantity_positive CHECK (quantity > 0);
-- ALTER TABLE orders ADD CONSTRAINT chk_total_positive CHECK (total_amount > 0);

DATABASE INDEX OPTIMIZATION REPORT
==================================================
ANALYSIS SUMMARY
----------------
Tables Analyzed: 6
Query Patterns: 15
Existing Indexes: 12
New Recommendations: 8
High Priority: 4
Redundancy Issues: 2
HIGH PRIORITY RECOMMENDATIONS (4)
----------------------------------
1. orders: Optimize multi-column WHERE conditions: user_id, status, created_at
Columns: user_id, status, created_at
Benefit: Very High
SQL: CREATE INDEX idx_orders_user_status_created ON orders (user_id, status, created_at);
2. products: Optimize WHERE category_id = AND is_active = queries
Columns: category_id, is_active
Benefit: High
SQL: CREATE INDEX idx_products_category_active ON products (category_id, is_active);
3. order_items: Optimize JOIN with products table on product_id
Columns: product_id
Benefit: High (frequent JOINs)
SQL: CREATE INDEX idx_order_items_product_join ON order_items (product_id);
4. product_reviews: Covering index for WHERE + ORDER BY optimization
Columns: product_id, created_at
Benefit: High (eliminates table lookups for SELECT)
SQL: CREATE INDEX idx_product_reviews_covering_product_created ON product_reviews (product_id, created_at) INCLUDE (rating, review_text);
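The composite recommendations above rely on the leftmost-prefix rule: an index on (user_id, status, created_at) also serves queries filtering on user_id alone, or on user_id and status together. A minimal sketch of that check (the `index_covers` helper is illustrative, not part of the tool):

```python
def index_covers(index_columns: list[str], query_columns: list[str]) -> bool:
    """True if the index's leading columns match the queried columns (WHERE order is irrelevant)."""
    prefix = index_columns[:len(query_columns)]
    return len(query_columns) <= len(index_columns) and sorted(prefix) == sorted(query_columns)

idx = ["user_id", "status", "created_at"]
print(index_covers(idx, ["user_id"]))            # True: leading column is served
print(index_covers(idx, ["status", "user_id"]))  # True: first two columns, any WHERE order
print(index_covers(idx, ["created_at"]))         # False: skips the index prefix
```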
REDUNDANCY ISSUES (2)
---------------------
• DUPLICATE: Indexes 'idx_users_email' and 'unique_users_email' are identical
Recommendation: Drop one of the duplicate indexes
SQL: DROP INDEX idx_users_email;
• OVERLAPPING: Index 'idx_products_category' overlaps 85% with 'idx_products_category_active'
Recommendation: Consider dropping 'idx_products_category' as it's largely covered by 'idx_products_category_active'
SQL: DROP INDEX idx_products_category;
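The overlap finding above follows from the same prefix logic: an index is redundant when another index on the same table starts with exactly its columns. A rough sketch of that detection, assuming indexes are given as name-to-column-list mappings:

```python
def find_redundant(indexes: dict[str, list[str]]) -> list[tuple[str, str]]:
    """Return (redundant, covering) pairs where the first index is a strict prefix of the second."""
    pairs = []
    for a, cols_a in indexes.items():
        for b, cols_b in indexes.items():
            if a != b and len(cols_a) < len(cols_b) and cols_b[:len(cols_a)] == cols_a:
                pairs.append((a, b))
    return pairs

indexes = {
    "idx_products_category": ["category_id"],
    "idx_products_category_active": ["category_id", "is_active"],
}
print(find_redundant(indexes))  # [('idx_products_category', 'idx_products_category_active')]
```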
PERFORMANCE IMPACT ANALYSIS
----------------------------
Queries to be optimized: 12
High impact optimizations: 6
Estimated insert overhead: 40%
RECOMMENDED CREATE INDEX STATEMENTS
------------------------------------
1. CREATE INDEX idx_orders_user_status_created ON orders (user_id, status, created_at);
2. CREATE INDEX idx_products_category_active ON products (category_id, is_active);
3. CREATE INDEX idx_order_items_product_join ON order_items (product_id);
4. CREATE INDEX idx_product_reviews_covering_product_created ON product_reviews (product_id, created_at) INCLUDE (rating, review_text);
5. CREATE INDEX idx_products_price_brand ON products (price, brand);
6. CREATE INDEX idx_orders_status_created ON orders (status, created_at);
7. CREATE INDEX idx_categories_parent_active ON categories (parent_id, is_active);
8. CREATE INDEX idx_product_reviews_user_created ON product_reviews (user_id, created_at);

DATABASE MIGRATION PLAN
==================================================
Migration ID: a7b3c9d2
Created: 2024-02-16T15:30:00Z
Zero Downtime: false
MIGRATION SUMMARY
-----------------
Total Steps: 12
Tables Added: 1
Tables Dropped: 0
Tables Renamed: 0
Columns Added: 3
Columns Dropped: 0
Columns Modified: 2
Constraints Added: 4
Constraints Dropped: 0
Indexes Added: 2
Indexes Dropped: 0
RISK ASSESSMENT
---------------
High Risk Steps: 1
Medium Risk Steps: 4
Low Risk Steps: 7
MIGRATION STEPS
---------------
1. Create table brands with 4 columns (LOW risk)
Type: CREATE_TABLE
Forward SQL: CREATE TABLE brands (
id INTEGER NOT NULL,
name VARCHAR(100) NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Rollback SQL: DROP TABLE IF EXISTS brands;
2. Add column brand_id to products (LOW risk)
Type: ADD_COLUMN
Forward SQL: ALTER TABLE products ADD COLUMN brand_id INTEGER;
Rollback SQL: ALTER TABLE products DROP COLUMN brand_id;
3. Add column email_verified to users (LOW risk)
Type: ADD_COLUMN
Forward SQL: ALTER TABLE users ADD COLUMN email_verified BOOLEAN DEFAULT false;
Rollback SQL: ALTER TABLE users DROP COLUMN email_verified;
4. Add column last_login to users (LOW risk)
Type: ADD_COLUMN
Forward SQL: ALTER TABLE users ADD COLUMN last_login TIMESTAMP;
Rollback SQL: ALTER TABLE users DROP COLUMN last_login;
5. Modify column price: type: DECIMAL(10,2) -> DECIMAL(12,2) (LOW risk)
Type: MODIFY_COLUMN
Forward SQL: ALTER TABLE products
ALTER COLUMN price TYPE DECIMAL(12,2);
Rollback SQL: ALTER TABLE products
ALTER COLUMN price TYPE DECIMAL(10,2);
6. Modify column inventory_count: nullable: true -> false (HIGH risk)
Type: MODIFY_COLUMN
Forward SQL: ALTER TABLE products
ALTER COLUMN inventory_count SET NOT NULL;
Rollback SQL: ALTER TABLE products
ALTER COLUMN inventory_count DROP NOT NULL;
7. Add primary key on id (MEDIUM risk)
Type: ADD_CONSTRAINT
Forward SQL: ALTER TABLE brands ADD CONSTRAINT pk_brands PRIMARY KEY (id);
Rollback SQL: ALTER TABLE brands DROP CONSTRAINT pk_brands;
8. Add foreign key constraint on brand_id (MEDIUM risk)
Type: ADD_CONSTRAINT
Forward SQL: ALTER TABLE products ADD CONSTRAINT fk_products_brand_id FOREIGN KEY (brand_id) REFERENCES brands(id);
Rollback SQL: ALTER TABLE products DROP CONSTRAINT fk_products_brand_id;
9. Add unique constraint on name (MEDIUM risk)
Type: ADD_CONSTRAINT
Forward SQL: ALTER TABLE brands ADD CONSTRAINT uq_brands_name UNIQUE (name);
Rollback SQL: ALTER TABLE brands DROP CONSTRAINT uq_brands_name;
10. Add check constraint: price > 0 (MEDIUM risk)
Type: ADD_CONSTRAINT
Forward SQL: ALTER TABLE products ADD CONSTRAINT chk_products_price_positive CHECK (price > 0);
Rollback SQL: ALTER TABLE products DROP CONSTRAINT chk_products_price_positive;
11. Create index idx_products_brand_id on (brand_id) (LOW risk)
Type: ADD_INDEX
Forward SQL: CREATE INDEX idx_products_brand_id ON products (brand_id);
Rollback SQL: DROP INDEX idx_products_brand_id;
Estimated Time: 1-5 minutes depending on table size
12. Create index idx_users_email_verified on (email_verified) (LOW risk)
Type: ADD_INDEX
Forward SQL: CREATE INDEX idx_users_email_verified ON users (email_verified);
Rollback SQL: DROP INDEX idx_users_email_verified;
Estimated Time: 1-5 minutes depending on table size
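Every step above carries paired forward and rollback SQL, so a failed run can be unwound by replaying the rollback statements of the completed steps in reverse order. A simplified executor sketch (the step dict shape and `run_sql` callback are assumptions, not the skill's actual interface):

```python
def apply_migration(steps, run_sql):
    """Apply forward SQL in order; on failure, run rollback SQL of completed steps in reverse."""
    done = []
    for step in steps:
        try:
            run_sql(step["forward"])
            done.append(step)
        except Exception:
            for completed in reversed(done):  # unwind newest-first
                run_sql(completed["rollback"])
            raise

executed = []
steps = [
    {"forward": "CREATE TABLE brands (id INTEGER NOT NULL, name VARCHAR(100) NOT NULL);",
     "rollback": "DROP TABLE IF EXISTS brands;"},
    {"forward": "ALTER TABLE products ADD COLUMN brand_id INTEGER;",
     "rollback": "ALTER TABLE products DROP COLUMN brand_id;"},
]
apply_migration(steps, executed.append)
print(len(executed))  # 2
```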
VALIDATION CHECKS
-----------------
• Verify table brands exists
SQL: SELECT COUNT(*) FROM information_schema.tables WHERE table_name = 'brands';
Expected: 1
• Verify column brand_id exists in products
SQL: SELECT COUNT(*) FROM information_schema.columns WHERE table_name = 'products' AND column_name = 'brand_id';
Expected: 1
• Verify column email_verified exists in users
SQL: SELECT COUNT(*) FROM information_schema.columns WHERE table_name = 'users' AND column_name = 'email_verified';
Expected: 1
• Verify column modification in products
SQL: SELECT data_type, numeric_precision FROM information_schema.columns WHERE table_name = 'products' AND column_name = 'price';
Expected: data_type 'numeric' with precision 12
• Verify index idx_products_brand_id exists
SQL: SELECT COUNT(*) FROM pg_indexes WHERE indexname = 'idx_products_brand_id';
Expected: 1
• Verify index idx_users_email_verified exists
SQL: SELECT COUNT(*) FROM pg_indexes WHERE indexname = 'idx_users_email_verified';
Expected: 1

DATABASE SCHEMA ANALYSIS REPORT
==================================================
SCHEMA OVERVIEW
---------------
Total Tables: 8
Total Columns: 52
Tables with Primary Keys: 8
Total Foreign Keys: 6
Total Indexes: 15
KEY RECOMMENDATIONS
------------------
1. Address 3 high-severity issues immediately
2. Review 4 oversized or default-sized VARCHAR columns for right-sizing
3. Consider adding 2 foreign key constraints for referential integrity
4. Review 8 normalization issues for schema optimization
NORMALIZATION ISSUES (8 total)
------------------------------
High: 2, Medium: 3, Low: 2, Warning: 1
• products: Column 'dimensions' appears to store delimited values
Suggestion: Create separate table for individual values with foreign key relationship
• products: Column 'tags' appears to store delimited values
Suggestion: Create separate table for individual values with foreign key relationship
• products: Column 'category_name' has a transitive dependency through 'category_id'
Suggestion: Drop 'category_name' and resolve the name via the existing 'categories' table
• orders: Columns ['shipping_street', 'shipping_city', 'shipping_state', 'shipping_postal_code', 'shipping_country'] duplicate data reachable through 'shipping_address_id'
Suggestion: Acceptable as intentional denormalization to preserve historical order snapshots; otherwise reference the existing 'addresses' table
• user_preferences: Column 'preferred_categories' appears to store delimited values
Suggestion: Create separate table for individual values with foreign key relationship
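Delimited-value findings like the ones above can be approximated by sampling a column's values and flagging it when most contain a separator character. A rough heuristic sketch (the separator set and threshold are illustrative assumptions):

```python
def looks_delimited(values: list[str], seps: str = ",;|", threshold: float = 0.5) -> bool:
    """Flag a column as storing delimited lists when most sampled values contain a separator."""
    non_empty = [v for v in values if v]
    if not non_empty:
        return False
    hits = sum(1 for v in non_empty if any(s in v for s in seps))
    return hits / len(non_empty) >= threshold

print(looks_delimited(["books,electronics", "toys,garden", "music"]))  # True: 2 of 3 contain ','
print(looks_delimited(["light", "dark", "light"]))                     # False: no separators
```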
DATA TYPE ISSUES (4 total)
--------------------------
• products.dimensions: default-sized VARCHAR antipattern
Current: VARCHAR(50) → Suggested: Appropriately sized VARCHAR or TEXT
Rationale: VARCHAR sizes are often chosen by habit rather than measured data length requirements
• products.tags: default-sized VARCHAR antipattern
Current: VARCHAR(500) → Suggested: Appropriately sized VARCHAR or TEXT
Rationale: VARCHAR sizes are often chosen by habit rather than measured data length requirements
• user_preferences.preferred_categories: default-sized VARCHAR antipattern
Current: VARCHAR(500) → Suggested: Appropriately sized VARCHAR or TEXT
Rationale: VARCHAR sizes are often chosen by habit rather than measured data length requirements
• user_preferences.email_notifications: VARCHAR(255) antipattern
Current: VARCHAR(255) → Suggested: Appropriately sized VARCHAR or TEXT
Rationale: VARCHAR(255) is often used as a default without considering actual data length requirements
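Right-sizing is best driven by the data itself: measure the observed maximum length and add headroom rather than defaulting to 255. A sketch of that calculation (the headroom factor and size ladder are assumptions):

```python
import math

def suggest_varchar_size(values: list[str], headroom: float = 1.5) -> int:
    """Suggest a VARCHAR length: observed max length plus headroom, rounded up to a familiar size."""
    observed = max((len(v) for v in values), default=0)
    padded = math.ceil(observed * headroom)
    for size in (16, 32, 50, 100, 255):
        if padded <= size:
            return size
    return padded  # larger than 255: keep the measured figure (or consider TEXT)

print(suggest_varchar_size(["10x20x5 cm", "3x3x3 cm"]))  # 16: longest value is 10 chars, padded to 15
```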
CONSTRAINT ISSUES (12 total)
-----------------------------
High: 0, Medium: 4, Low: 8
• products: Column 'price' should validate positive values
Suggestion: Add CHECK constraint: price > 0
• products: Column 'inventory_count' should validate non-negative values
Suggestion: Add CHECK constraint: inventory_count >= 0
• orders: Column 'total_amount' should validate positive values
Suggestion: Add CHECK constraint: total_amount > 0
• order_items: Column 'quantity' should validate positive values
Suggestion: Add CHECK constraint: quantity > 0
• order_items: Column 'unit_price' should validate positive values
Suggestion: Add CHECK constraint: unit_price > 0
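Suggestions of this kind can be generated from naming conventions: columns named like prices, amounts, or quantities usually warrant a sign check. A heuristic sketch (the suffix lists are assumptions and will produce false positives, e.g. for zero-allowed amounts):

```python
def suggest_checks(table: str, columns: list[str]) -> list[str]:
    """Propose CHECK constraints from column-name suffixes: strictly positive vs non-negative."""
    positive = ("_price", "_amount", "price", "amount", "quantity")
    non_negative = ("_count", "count")
    out = []
    for col in columns:
        if col.endswith(positive):
            out.append(f"ALTER TABLE {table} ADD CONSTRAINT chk_{table}_{col} CHECK ({col} > 0);")
        elif col.endswith(non_negative):
            out.append(f"ALTER TABLE {table} ADD CONSTRAINT chk_{table}_{col} CHECK ({col} >= 0);")
    return out

for stmt in suggest_checks("order_items", ["quantity", "unit_price", "created_at"]):
    print(stmt)
```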
MISSING INDEXES (3 total)
-------------------------
• addresses.user_id (foreign_key)
SQL: CREATE INDEX idx_addresses_user_id ON addresses (user_id);
• product_reviews.product_id (foreign_key)
SQL: CREATE INDEX idx_product_reviews_product_id ON product_reviews (product_id);
• shopping_cart.user_id (foreign_key)
SQL: CREATE INDEX idx_shopping_cart_user_id ON shopping_cart (user_id);
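Missing foreign-key indexes like these can be found mechanically: collect each table's FK columns and subtract those that already lead an index. A sketch over a minimal schema description (the dict shapes are assumptions, not the tool's input format):

```python
def missing_fk_indexes(fks, indexes):
    """Return FK (table, column) pairs not covered by an index whose first column is that column."""
    covered = {(t, cols[0]) for t, idx_list in indexes.items() for cols in idx_list if cols}
    return sorted((t, c) for t, c in fks if (t, c) not in covered)

fks = [("addresses", "user_id"), ("orders", "user_id"), ("product_reviews", "product_id")]
indexes = {"orders": [["user_id"], ["status"]], "product_reviews": [["user_id", "created_at"]]}
print(missing_fk_indexes(fks, indexes))
# [('addresses', 'user_id'), ('product_reviews', 'product_id')]
```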
MERMAID ERD
===========
erDiagram
USERS {
INTEGER id "PK"
VARCHAR(255) email "NOT NULL"
VARCHAR(50) username "NOT NULL"
VARCHAR(255) password_hash "NOT NULL"
VARCHAR(100) first_name
VARCHAR(100) last_name
TIMESTAMP created_at
TIMESTAMP updated_at
VARCHAR(20) status
}
CATEGORIES {
INTEGER id "PK"
VARCHAR(100) name "NOT NULL"
VARCHAR(100) slug "NOT NULL UNIQUE"
INTEGER parent_id "FK"
TEXT description
BOOLEAN is_active
INTEGER sort_order
TIMESTAMP created_at
}
PRODUCTS {
INTEGER id "PK"
VARCHAR(255) name "NOT NULL"
VARCHAR(50) sku "NOT NULL UNIQUE"
TEXT description
DECIMAL(10,2) price "NOT NULL"
DECIMAL(10,2) cost
DECIMAL(8,2) weight
VARCHAR(50) dimensions
INTEGER category_id "FK"
VARCHAR(100) category_name
VARCHAR(100) brand
VARCHAR(500) tags
INTEGER inventory_count
INTEGER reorder_point
VARCHAR(100) supplier_name
VARCHAR(255) supplier_contact
BOOLEAN is_active
BOOLEAN featured
TIMESTAMP created_at
TIMESTAMP updated_at
}
ADDRESSES {
INTEGER id "PK"
INTEGER user_id "FK"
VARCHAR(20) address_type
VARCHAR(255) street_address "NOT NULL"
VARCHAR(255) street_address_2
VARCHAR(100) city "NOT NULL"
VARCHAR(50) state "NOT NULL"
VARCHAR(20) postal_code "NOT NULL"
VARCHAR(50) country "NOT NULL"
BOOLEAN is_default
TIMESTAMP created_at
}
ORDERS {
INTEGER id "PK"
VARCHAR(50) order_number "NOT NULL UNIQUE"
INTEGER user_id "FK"
VARCHAR(255) user_email
VARCHAR(200) user_name
VARCHAR(50) status "NOT NULL"
DECIMAL(10,2) total_amount "NOT NULL"
DECIMAL(10,2) tax_amount "NOT NULL"
DECIMAL(10,2) shipping_amount "NOT NULL"
DECIMAL(10,2) discount_amount
VARCHAR(50) payment_method
VARCHAR(50) payment_status
INTEGER shipping_address_id "FK"
INTEGER billing_address_id "FK"
VARCHAR(255) shipping_street
VARCHAR(100) shipping_city
VARCHAR(50) shipping_state
VARCHAR(20) shipping_postal_code
VARCHAR(50) shipping_country
TEXT notes
TIMESTAMP created_at
TIMESTAMP updated_at
TIMESTAMP shipped_at
TIMESTAMP delivered_at
}
ORDER_ITEMS {
INTEGER id "PK"
INTEGER order_id "FK"
INTEGER product_id "FK"
VARCHAR(255) product_name
VARCHAR(50) product_sku
INTEGER quantity "NOT NULL"
DECIMAL(10,2) unit_price "NOT NULL"
DECIMAL(10,2) total_price "NOT NULL"
TIMESTAMP created_at
}
SHOPPING_CART {
INTEGER id "PK"
INTEGER user_id "FK"
VARCHAR(255) session_id
INTEGER product_id "FK"
INTEGER quantity "NOT NULL"
TIMESTAMP added_at
TIMESTAMP updated_at
}
PRODUCT_REVIEWS {
INTEGER id "PK"
INTEGER product_id "FK"
INTEGER user_id "FK"
INTEGER rating "NOT NULL"
VARCHAR(200) title
TEXT review_text
BOOLEAN verified_purchase
INTEGER helpful_count
TIMESTAMP created_at
TIMESTAMP updated_at
}
CATEGORIES ||--o{ CATEGORIES : has
CATEGORIES ||--o{ PRODUCTS : has
USERS ||--o{ ADDRESSES : has
USERS ||--o{ ORDERS : has
USERS ||--o{ SHOPPING_CART : has
USERS ||--o{ PRODUCT_REVIEWS : has
ADDRESSES ||--o{ ORDERS : has
ORDERS ||--o{ ORDER_ITEMS : has
PRODUCTS ||--o{ ORDER_ITEMS : has
PRODUCTS ||--o{ SHOPPING_CART : has
PRODUCTS ||--o{ PRODUCT_REVIEWS : has

#!/usr/bin/env python3
"""
Database Index Optimizer
Analyzes schema definitions and query patterns to recommend optimal indexes:
- Identifies missing indexes for common query patterns
- Detects redundant and overlapping indexes
- Suggests composite index column ordering
- Estimates selectivity and performance impact
- Generates CREATE INDEX statements with rationale
Input: Schema JSON + Query patterns JSON
Output: Index recommendations + CREATE INDEX SQL + before/after analysis
Usage:
python index_optimizer.py --schema schema.json --queries queries.json --output recommendations.json
python index_optimizer.py --schema schema.json --queries queries.json --format text
python index_optimizer.py --schema schema.json --queries queries.json --analyze-existing
"""
import argparse
import json
import re
import sys
from collections import defaultdict, namedtuple, Counter
from typing import Dict, List, Set, Tuple, Optional, Any
from dataclasses import dataclass, asdict
import hashlib
@dataclass
class Column:
name: str
data_type: str
nullable: bool = True
unique: bool = False
cardinality_estimate: Optional[int] = None
@dataclass
class Index:
name: str
table: str
columns: List[str]
unique: bool = False
index_type: str = "btree"
partial_condition: Optional[str] = None
    include_columns: Optional[List[str]] = None
size_estimate: Optional[int] = None
@dataclass
class QueryPattern:
query_id: str
query_type: str # SELECT, INSERT, UPDATE, DELETE
table: str
where_conditions: List[Dict[str, Any]]
join_conditions: List[Dict[str, Any]]
order_by: List[Dict[str, str]] # column, direction
group_by: List[str]
frequency: int = 1
avg_execution_time_ms: Optional[float] = None
@dataclass
class IndexRecommendation:
recommendation_id: str
table: str
recommended_index: Index
reason: str
query_patterns_helped: List[str]
estimated_benefit: str
estimated_overhead: str
priority: int # 1 = highest priority
sql_statement: str
selectivity_analysis: Dict[str, Any]
@dataclass
class RedundancyIssue:
issue_type: str # DUPLICATE, OVERLAPPING, UNUSED
affected_indexes: List[str]
table: str
description: str
recommendation: str
sql_statements: List[str]
class SelectivityEstimator:
"""Estimates column selectivity based on naming patterns and data types."""
def __init__(self):
# Selectivity patterns based on common column names and types
self.high_selectivity_patterns = [
r'.*_id$', r'^id$', r'uuid', r'guid', r'email', r'username', r'ssn',
r'account.*number', r'transaction.*id', r'reference.*number'
]
self.medium_selectivity_patterns = [
r'name$', r'title$', r'description$', r'address', r'phone', r'zip',
r'postal.*code', r'serial.*number', r'sku', r'product.*code'
]
self.low_selectivity_patterns = [
r'status$', r'type$', r'category', r'state$', r'flag$', r'active$',
r'enabled$', r'deleted$', r'visible$', r'gender$', r'priority$'
]
self.very_low_selectivity_patterns = [
r'is_.*', r'has_.*', r'can_.*', r'boolean', r'bool'
]
def estimate_selectivity(self, column: Column, table_size_estimate: int = 10000) -> float:
"""Estimate column selectivity (0.0 = all same values, 1.0 = all unique values)."""
column_name_lower = column.name.lower()
# Primary key or unique columns
if column.unique or column.name.lower() in ['id', 'uuid', 'guid']:
return 1.0
# Check cardinality estimate if available
if column.cardinality_estimate:
return min(column.cardinality_estimate / table_size_estimate, 1.0)
# Pattern-based estimation
for pattern in self.high_selectivity_patterns:
if re.search(pattern, column_name_lower):
return 0.9 # Very high selectivity
for pattern in self.medium_selectivity_patterns:
if re.search(pattern, column_name_lower):
return 0.7 # Good selectivity
for pattern in self.low_selectivity_patterns:
if re.search(pattern, column_name_lower):
return 0.2 # Poor selectivity
for pattern in self.very_low_selectivity_patterns:
if re.search(pattern, column_name_lower):
return 0.1 # Very poor selectivity
# Data type based estimation
data_type_upper = column.data_type.upper()
if data_type_upper.startswith('BOOL'):
return 0.1
elif data_type_upper.startswith(('TINYINT', 'SMALLINT')):
return 0.3
elif data_type_upper.startswith('INT'):
return 0.8
elif data_type_upper.startswith(('VARCHAR', 'TEXT')):
# Estimate based on column name
if 'name' in column_name_lower:
return 0.7
elif 'description' in column_name_lower or 'comment' in column_name_lower:
return 0.9
else:
return 0.6
# Default moderate selectivity
return 0.5
class IndexOptimizer:
def __init__(self):
self.tables: Dict[str, Dict[str, Column]] = {}
self.existing_indexes: Dict[str, List[Index]] = {}
self.query_patterns: List[QueryPattern] = []
self.selectivity_estimator = SelectivityEstimator()
# Configuration
self.max_composite_index_columns = 6
self.min_selectivity_for_index = 0.1
self.redundancy_overlap_threshold = 0.8
def load_schema(self, schema_data: Dict[str, Any]) -> None:
"""Load schema definition."""
if 'tables' not in schema_data:
raise ValueError("Schema must contain 'tables' key")
for table_name, table_def in schema_data['tables'].items():
self.tables[table_name] = {}
self.existing_indexes[table_name] = []
# Load columns
for col_name, col_def in table_def.get('columns', {}).items():
column = Column(
name=col_name,
data_type=col_def.get('type', 'VARCHAR(255)'),
nullable=col_def.get('nullable', True),
unique=col_def.get('unique', False),
cardinality_estimate=col_def.get('cardinality_estimate')
)
self.tables[table_name][col_name] = column
# Load existing indexes
for idx_def in table_def.get('indexes', []):
index = Index(
name=idx_def['name'],
table=table_name,
columns=idx_def['columns'],
unique=idx_def.get('unique', False),
index_type=idx_def.get('type', 'btree'),
partial_condition=idx_def.get('partial_condition'),
include_columns=idx_def.get('include_columns', [])
)
self.existing_indexes[table_name].append(index)
def load_query_patterns(self, query_data: Dict[str, Any]) -> None:
"""Load query patterns for analysis."""
if 'queries' not in query_data:
raise ValueError("Query data must contain 'queries' key")
for query_def in query_data['queries']:
pattern = QueryPattern(
query_id=query_def['id'],
query_type=query_def.get('type', 'SELECT').upper(),
table=query_def['table'],
where_conditions=query_def.get('where_conditions', []),
join_conditions=query_def.get('join_conditions', []),
order_by=query_def.get('order_by', []),
group_by=query_def.get('group_by', []),
frequency=query_def.get('frequency', 1),
avg_execution_time_ms=query_def.get('avg_execution_time_ms')
)
self.query_patterns.append(pattern)
def analyze_missing_indexes(self) -> List[IndexRecommendation]:
"""Identify missing indexes based on query patterns."""
recommendations = []
for pattern in self.query_patterns:
table_name = pattern.table
if table_name not in self.tables:
continue
# Analyze WHERE conditions for single-column indexes
for condition in pattern.where_conditions:
column = condition.get('column')
operator = condition.get('operator', '=')
if column and column in self.tables[table_name]:
if not self._has_covering_index(table_name, [column]):
recommendation = self._create_single_column_recommendation(
table_name, column, pattern, operator
)
if recommendation:
recommendations.append(recommendation)
# Analyze composite indexes for multi-column WHERE conditions
where_columns = [cond.get('column') for cond in pattern.where_conditions
if cond.get('column') and cond.get('column') in self.tables[table_name]]
if len(where_columns) > 1:
composite_recommendation = self._create_composite_recommendation(
table_name, where_columns, pattern
)
if composite_recommendation:
recommendations.append(composite_recommendation)
# Analyze covering indexes for SELECT with ORDER BY
if pattern.order_by and where_columns:
covering_recommendation = self._create_covering_index_recommendation(
table_name, where_columns, pattern
)
if covering_recommendation:
recommendations.append(covering_recommendation)
# Analyze JOIN conditions
for join_condition in pattern.join_conditions:
local_column = join_condition.get('local_column')
if local_column and local_column in self.tables[table_name]:
if not self._has_covering_index(table_name, [local_column]):
recommendation = self._create_join_index_recommendation(
table_name, local_column, pattern, join_condition
)
if recommendation:
recommendations.append(recommendation)
# Remove duplicates and prioritize
recommendations = self._deduplicate_recommendations(recommendations)
recommendations = self._prioritize_recommendations(recommendations)
return recommendations
def _has_covering_index(self, table_name: str, columns: List[str]) -> bool:
"""Check if existing indexes cover the specified columns."""
if table_name not in self.existing_indexes:
return False
for index in self.existing_indexes[table_name]:
# Check if index starts with required columns (prefix match for composite)
if len(index.columns) >= len(columns):
if index.columns[:len(columns)] == columns:
return True
return False
def _create_single_column_recommendation(
self,
table_name: str,
column: str,
pattern: QueryPattern,
operator: str
) -> Optional[IndexRecommendation]:
"""Create recommendation for single-column index."""
column_obj = self.tables[table_name][column]
selectivity = self.selectivity_estimator.estimate_selectivity(column_obj)
# Skip very low selectivity columns unless frequently used
if selectivity < self.min_selectivity_for_index and pattern.frequency < 100:
return None
index_name = f"idx_{table_name}_{column}"
index = Index(
name=index_name,
table=table_name,
columns=[column],
unique=column_obj.unique,
index_type="btree"
)
reason = f"Optimize WHERE {column} {operator} queries"
if pattern.frequency > 10:
reason += f" (used {pattern.frequency} times)"
return IndexRecommendation(
recommendation_id=self._generate_recommendation_id(table_name, [column]),
table=table_name,
recommended_index=index,
reason=reason,
query_patterns_helped=[pattern.query_id],
estimated_benefit=self._estimate_benefit(selectivity, pattern.frequency),
estimated_overhead="Low (single column)",
priority=self._calculate_priority(selectivity, pattern.frequency, 1),
sql_statement=f"CREATE INDEX {index_name} ON {table_name} ({column});",
selectivity_analysis={
"column_selectivity": selectivity,
"estimated_reduction": f"{int(selectivity * 100)}%"
}
)
def _create_composite_recommendation(
self,
table_name: str,
columns: List[str],
pattern: QueryPattern
) -> Optional[IndexRecommendation]:
"""Create recommendation for composite index."""
if len(columns) > self.max_composite_index_columns:
columns = columns[:self.max_composite_index_columns]
# Order columns by selectivity (most selective first)
column_selectivities = []
for col in columns:
col_obj = self.tables[table_name][col]
selectivity = self.selectivity_estimator.estimate_selectivity(col_obj)
column_selectivities.append((col, selectivity))
# Sort by selectivity descending
column_selectivities.sort(key=lambda x: x[1], reverse=True)
ordered_columns = [col for col, _ in column_selectivities]
# Calculate combined selectivity
combined_selectivity = min(sum(sel for _, sel in column_selectivities) / len(columns), 0.95)
index_name = f"idx_{table_name}_{'_'.join(ordered_columns)}"
if len(index_name) > 63: # PostgreSQL limit
index_name = f"idx_{table_name}_composite_{abs(hash('_'.join(ordered_columns))) % 10000}"
index = Index(
name=index_name,
table=table_name,
columns=ordered_columns,
index_type="btree"
)
reason = f"Optimize multi-column WHERE conditions: {', '.join(ordered_columns)}"
return IndexRecommendation(
recommendation_id=self._generate_recommendation_id(table_name, ordered_columns),
table=table_name,
recommended_index=index,
reason=reason,
query_patterns_helped=[pattern.query_id],
estimated_benefit=self._estimate_benefit(combined_selectivity, pattern.frequency),
estimated_overhead=f"Medium (composite index with {len(ordered_columns)} columns)",
priority=self._calculate_priority(combined_selectivity, pattern.frequency, len(ordered_columns)),
sql_statement=f"CREATE INDEX {index_name} ON {table_name} ({', '.join(ordered_columns)});",
selectivity_analysis={
"column_selectivities": {col: sel for col, sel in column_selectivities},
"combined_selectivity": combined_selectivity,
"column_order_rationale": "Ordered by selectivity (most selective first)"
}
)
def _create_covering_index_recommendation(
self,
table_name: str,
where_columns: List[str],
pattern: QueryPattern
) -> Optional[IndexRecommendation]:
"""Create recommendation for covering index."""
order_columns = [col['column'] for col in pattern.order_by if col['column'] in self.tables[table_name]]
# Combine WHERE and ORDER BY columns
index_columns = where_columns.copy()
include_columns = []
# Add ORDER BY columns to index columns
for col in order_columns:
if col not in index_columns:
index_columns.append(col)
# Limit index columns
if len(index_columns) > self.max_composite_index_columns:
include_columns = index_columns[self.max_composite_index_columns:]
index_columns = index_columns[:self.max_composite_index_columns]
index_name = f"idx_{table_name}_covering_{'_'.join(index_columns[:3])}"
if len(index_name) > 63:
index_name = f"idx_{table_name}_covering_{abs(hash('_'.join(index_columns))) % 10000}"
index = Index(
name=index_name,
table=table_name,
columns=index_columns,
include_columns=include_columns,
index_type="btree"
)
reason = f"Covering index for WHERE + ORDER BY optimization"
# Calculate selectivity for main columns
main_selectivity = 0.5 # Default for covering indexes
if where_columns:
selectivities = [
self.selectivity_estimator.estimate_selectivity(self.tables[table_name][col])
for col in where_columns[:2] # Consider first 2 columns
]
main_selectivity = max(selectivities)
sql_parts = [f"CREATE INDEX {index_name} ON {table_name} ({', '.join(index_columns)})"]
if include_columns:
sql_parts.append(f" INCLUDE ({', '.join(include_columns)})")
sql_statement = ''.join(sql_parts) + ";"
return IndexRecommendation(
recommendation_id=self._generate_recommendation_id(table_name, index_columns, "covering"),
table=table_name,
recommended_index=index,
reason=reason,
query_patterns_helped=[pattern.query_id],
estimated_benefit="High (eliminates table lookups for SELECT)",
estimated_overhead=f"High (covering index with {len(index_columns)} columns)",
priority=self._calculate_priority(main_selectivity, pattern.frequency, len(index_columns)),
sql_statement=sql_statement,
selectivity_analysis={
"main_columns_selectivity": main_selectivity,
"covering_benefit": "Eliminates table lookup for SELECT queries"
}
)
def _create_join_index_recommendation(
self,
table_name: str,
column: str,
pattern: QueryPattern,
join_condition: Dict[str, Any]
) -> Optional[IndexRecommendation]:
"""Create recommendation for JOIN optimization index."""
column_obj = self.tables[table_name][column]
selectivity = self.selectivity_estimator.estimate_selectivity(column_obj)
index_name = f"idx_{table_name}_{column}_join"
index = Index(
name=index_name,
table=table_name,
columns=[column],
index_type="btree"
)
foreign_table = join_condition.get('foreign_table', 'unknown')
reason = f"Optimize JOIN with {foreign_table} table on {column}"
return IndexRecommendation(
recommendation_id=self._generate_recommendation_id(table_name, [column], "join"),
table=table_name,
recommended_index=index,
reason=reason,
query_patterns_helped=[pattern.query_id],
estimated_benefit=self._estimate_join_benefit(pattern.frequency),
estimated_overhead="Low (single column for JOIN)",
priority=2, # JOINs are generally high priority
sql_statement=f"CREATE INDEX {index_name} ON {table_name} ({column});",
selectivity_analysis={
"column_selectivity": selectivity,
"join_optimization": True
}
)
def _generate_recommendation_id(self, table: str, columns: List[str], suffix: str = "") -> str:
"""Generate unique recommendation ID."""
content = f"{table}_{'_'.join(sorted(columns))}_{suffix}"
return hashlib.md5(content.encode()).hexdigest()[:8]
def _estimate_benefit(self, selectivity: float, frequency: int) -> str:
"""Estimate performance benefit of index."""
if selectivity > 0.8 and frequency > 50:
return "Very High"
elif selectivity > 0.6 and frequency > 20:
return "High"
elif selectivity > 0.4 or frequency > 10:
return "Medium"
else:
return "Low"
def _estimate_join_benefit(self, frequency: int) -> str:
"""Estimate benefit for JOIN indexes."""
if frequency > 50:
return "Very High (frequent JOINs)"
elif frequency > 20:
return "High (regular JOINs)"
elif frequency > 5:
return "Medium (occasional JOINs)"
else:
return "Low (rare JOINs)"
def _calculate_priority(self, selectivity: float, frequency: int, column_count: int) -> int:
"""Calculate priority score (1 = highest priority)."""
# Base score calculation
score = 0
# Selectivity contribution (0-50 points)
score += int(selectivity * 50)
# Frequency contribution (0-30 points)
score += min(frequency, 30)
# Penalty for complex indexes (subtract points)
score -= (column_count - 1) * 5
# Convert to priority levels
if score >= 70:
return 1 # Highest
elif score >= 50:
return 2 # High
elif score >= 30:
return 3 # Medium
else:
return 4 # Low
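# Worked example of the scoring above: selectivity 0.9, frequency 40, and a
# 2-column index give int(0.9 * 50) + min(40, 30) - (2 - 1) * 5 = 45 + 30 - 5
# = 70, which lands in the >= 70 band, i.e. priority 1 (highest).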
def _deduplicate_recommendations(self, recommendations: List[IndexRecommendation]) -> List[IndexRecommendation]:
"""Remove duplicate recommendations."""
seen_indexes = set()
unique_recommendations = []
for rec in recommendations:
index_signature = (rec.table, tuple(rec.recommended_index.columns))
if index_signature not in seen_indexes:
seen_indexes.add(index_signature)
unique_recommendations.append(rec)
else:
# Merge query patterns helped
for existing_rec in unique_recommendations:
if (existing_rec.table == rec.table and
existing_rec.recommended_index.columns == rec.recommended_index.columns):
existing_rec.query_patterns_helped.extend(rec.query_patterns_helped)
break
return unique_recommendations
def _prioritize_recommendations(self, recommendations: List[IndexRecommendation]) -> List[IndexRecommendation]:
"""Sort recommendations by priority."""
return sorted(recommendations, key=lambda x: (x.priority, -len(x.query_patterns_helped)))
def analyze_redundant_indexes(self) -> List[RedundancyIssue]:
"""Identify redundant, overlapping, and potentially unused indexes."""
redundancy_issues = []
for table_name, indexes in self.existing_indexes.items():
if len(indexes) < 2:
continue
# Find duplicate indexes
duplicates = self._find_duplicate_indexes(table_name, indexes)
redundancy_issues.extend(duplicates)
# Find overlapping indexes
overlapping = self._find_overlapping_indexes(table_name, indexes)
redundancy_issues.extend(overlapping)
# Find potentially unused indexes
unused = self._find_unused_indexes(table_name, indexes)
redundancy_issues.extend(unused)
return redundancy_issues
def _find_duplicate_indexes(self, table_name: str, indexes: List[Index]) -> List[RedundancyIssue]:
"""Find exactly duplicate indexes."""
issues = []
seen_signatures = {}
for index in indexes:
signature = (tuple(index.columns), index.unique, index.partial_condition)
if signature in seen_signatures:
existing_index = seen_signatures[signature]
issues.append(RedundancyIssue(
issue_type="DUPLICATE",
affected_indexes=[existing_index.name, index.name],
table=table_name,
description=f"Indexes '{existing_index.name}' and '{index.name}' are identical",
recommendation=f"Drop one of the duplicate indexes",
sql_statements=[f"DROP INDEX {index.name};"]
))
else:
seen_signatures[signature] = index
return issues
def _find_overlapping_indexes(self, table_name: str, indexes: List[Index]) -> List[RedundancyIssue]:
"""Find overlapping indexes that might be redundant."""
issues = []
for i, index1 in enumerate(indexes):
for index2 in indexes[i+1:]:
overlap_ratio = self._calculate_overlap_ratio(index1, index2)
if overlap_ratio >= self.redundancy_overlap_threshold:
# Determine which index to keep
if len(index1.columns) <= len(index2.columns):
redundant_index = index1
keep_index = index2
else:
redundant_index = index2
keep_index = index1
issues.append(RedundancyIssue(
issue_type="OVERLAPPING",
affected_indexes=[index1.name, index2.name],
table=table_name,
description=f"Index '{redundant_index.name}' overlaps {int(overlap_ratio * 100)}% "
f"with '{keep_index.name}'",
recommendation=f"Consider dropping '{redundant_index.name}' as it's largely "
f"covered by '{keep_index.name}'",
sql_statements=[f"DROP INDEX {redundant_index.name};"]
))
return issues
def _calculate_overlap_ratio(self, index1: Index, index2: Index) -> float:
"""Calculate overlap ratio between two indexes."""
cols1 = set(index1.columns)
cols2 = set(index2.columns)
if not cols1 or not cols2:
return 0.0
intersection = len(cols1.intersection(cols2))
union = len(cols1.union(cols2))
return intersection / union if union > 0 else 0.0
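# Note: the Jaccard ratio above is order-insensitive, but B-tree redundancy is
# order-sensitive: an index on (a, b) serves queries on (a) alone, while (b, a)
# does not. An order-aware check could test for a leading column prefix
# instead; a minimal sketch (the helper name is illustrative, not existing API):
def _is_column_prefix(shorter: list, longer: list) -> bool:
    """Return True if `shorter` is a leading prefix of `longer`."""
    return len(shorter) <= len(longer) and longer[:len(shorter)] == shorter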
def _find_unused_indexes(self, table_name: str, indexes: List[Index]) -> List[RedundancyIssue]:
"""Find potentially unused indexes based on query patterns."""
issues = []
# Collect all columns used in query patterns for this table
used_columns = set()
table_patterns = [p for p in self.query_patterns if p.table == table_name]
for pattern in table_patterns:
# Add WHERE condition columns
for condition in pattern.where_conditions:
if condition.get('column'):
used_columns.add(condition['column'])
# Add JOIN columns
for join in pattern.join_conditions:
if join.get('local_column'):
used_columns.add(join['local_column'])
# Add ORDER BY columns
for order in pattern.order_by:
if order.get('column'):
used_columns.add(order['column'])
# Add GROUP BY columns
used_columns.update(pattern.group_by)
if not used_columns:
return issues # Can't determine usage without query patterns
for index in indexes:
index_columns = set(index.columns)
if not index_columns.intersection(used_columns):
issues.append(RedundancyIssue(
issue_type="UNUSED",
affected_indexes=[index.name],
table=table_name,
description=f"Index '{index.name}' columns {index.columns} are not used in any query patterns",
recommendation="Consider dropping this index if it's truly unused (verify with query logs)",
sql_statements=[f"-- Review usage before dropping\n-- DROP INDEX {index.name};"]
))
return issues
def estimate_index_sizes(self) -> Dict[str, Dict[str, Any]]:
"""Estimate storage requirements for recommended indexes."""
size_estimates = {}
# This is a simplified estimation - in practice, would need actual table statistics
for table_name in self.tables:
size_estimates[table_name] = {
"estimated_table_rows": 10000, # Default estimate
"existing_indexes_size_mb": len(self.existing_indexes.get(table_name, [])) * 5, # Rough estimate
"index_overhead_per_column_mb": 2 # Rough estimate per column
}
return size_estimates
def generate_analysis_report(self) -> Dict[str, Any]:
"""Generate comprehensive analysis report."""
recommendations = self.analyze_missing_indexes()
redundancy_issues = self.analyze_redundant_indexes()
size_estimates = self.estimate_index_sizes()
# Calculate statistics
total_existing_indexes = sum(len(indexes) for indexes in self.existing_indexes.values())
tables_analyzed = len(self.tables)
query_patterns_analyzed = len(self.query_patterns)
# Categorize recommendations by priority
high_priority = [r for r in recommendations if r.priority <= 2]
medium_priority = [r for r in recommendations if r.priority == 3]
low_priority = [r for r in recommendations if r.priority >= 4]
return {
"analysis_summary": {
"tables_analyzed": tables_analyzed,
"query_patterns_analyzed": query_patterns_analyzed,
"existing_indexes": total_existing_indexes,
"total_recommendations": len(recommendations),
"high_priority_recommendations": len(high_priority),
"redundancy_issues_found": len(redundancy_issues)
},
"index_recommendations": {
"high_priority": [asdict(r) for r in high_priority],
"medium_priority": [asdict(r) for r in medium_priority],
"low_priority": [asdict(r) for r in low_priority]
},
"redundancy_analysis": [asdict(issue) for issue in redundancy_issues],
"size_estimates": size_estimates,
"sql_statements": {
"create_indexes": [rec.sql_statement for rec in recommendations],
"drop_redundant": [
stmt for issue in redundancy_issues
for stmt in issue.sql_statements
]
},
"performance_impact": self._generate_performance_impact_analysis(recommendations)
}
def _generate_performance_impact_analysis(self, recommendations: List[IndexRecommendation]) -> Dict[str, Any]:
"""Generate performance impact analysis."""
impact_analysis = {
"query_optimization": {},
"write_overhead": {},
"storage_impact": {}
}
# Analyze query optimization impact
query_benefits = defaultdict(list)
for rec in recommendations:
for query_id in rec.query_patterns_helped:
query_benefits[query_id].append(rec.estimated_benefit)
impact_analysis["query_optimization"] = {
"queries_improved": len(query_benefits),
"high_impact_queries": len([q for q, benefits in query_benefits.items()
if any("High" in benefit for benefit in benefits)]),
"benefit_distribution": dict(Counter(
rec.estimated_benefit for rec in recommendations
))
}
# Analyze write overhead
impact_analysis["write_overhead"] = {
"total_new_indexes": len(recommendations),
"estimated_insert_overhead": f"{len(recommendations) * 5}%", # Rough estimate
"tables_most_affected": list(Counter(rec.table for rec in recommendations).most_common(3))
}
return impact_analysis
def format_text_report(self, analysis: Dict[str, Any]) -> str:
"""Format analysis as human-readable text report."""
lines = []
lines.append("DATABASE INDEX OPTIMIZATION REPORT")
lines.append("=" * 50)
lines.append("")
# Summary
summary = analysis["analysis_summary"]
lines.append("ANALYSIS SUMMARY")
lines.append("-" * 16)
lines.append(f"Tables Analyzed: {summary['tables_analyzed']}")
lines.append(f"Query Patterns: {summary['query_patterns_analyzed']}")
lines.append(f"Existing Indexes: {summary['existing_indexes']}")
lines.append(f"New Recommendations: {summary['total_recommendations']}")
lines.append(f"High Priority: {summary['high_priority_recommendations']}")
lines.append(f"Redundancy Issues: {summary['redundancy_issues_found']}")
lines.append("")
# High Priority Recommendations
high_priority = analysis["index_recommendations"]["high_priority"]
if high_priority:
lines.append(f"HIGH PRIORITY RECOMMENDATIONS ({len(high_priority)})")
lines.append("-" * 35)
for i, rec in enumerate(high_priority[:10], 1): # Show top 10
lines.append(f"{i}. {rec['table']}: {rec['reason']}")
lines.append(f" Columns: {', '.join(rec['recommended_index']['columns'])}")
lines.append(f" Benefit: {rec['estimated_benefit']}")
lines.append(f" SQL: {rec['sql_statement']}")
lines.append("")
# Redundancy Issues
redundancy = analysis["redundancy_analysis"]
if redundancy:
lines.append(f"REDUNDANCY ISSUES ({len(redundancy)})")
lines.append("-" * 20)
for issue in redundancy[:5]: # Show first 5
lines.append(f"• {issue['issue_type']}: {issue['description']}")
lines.append(f" Recommendation: {issue['recommendation']}")
if issue['sql_statements']:
lines.append(f" SQL: {issue['sql_statements'][0]}")
lines.append("")
# Performance Impact
perf_impact = analysis["performance_impact"]
lines.append("PERFORMANCE IMPACT ANALYSIS")
lines.append("-" * 30)
query_opt = perf_impact["query_optimization"]
lines.append(f"Queries to be optimized: {query_opt['queries_improved']}")
lines.append(f"High impact optimizations: {query_opt['high_impact_queries']}")
write_overhead = perf_impact["write_overhead"]
lines.append(f"Estimated insert overhead: {write_overhead['estimated_insert_overhead']}")
lines.append("")
# SQL Statements Summary
sql_statements = analysis["sql_statements"]
create_statements = sql_statements["create_indexes"]
if create_statements:
lines.append("RECOMMENDED CREATE INDEX STATEMENTS")
lines.append("-" * 36)
for i, stmt in enumerate(create_statements[:10], 1):
lines.append(f"{i}. {stmt}")
if len(create_statements) > 10:
lines.append(f"... and {len(create_statements) - 10} more")
lines.append("")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Optimize database indexes based on schema and query patterns")
parser.add_argument("--schema", "-s", required=True, help="Schema definition JSON file")
parser.add_argument("--queries", "-q", required=True, help="Query patterns JSON file")
parser.add_argument("--output", "-o", help="Output file (default: stdout)")
parser.add_argument("--format", "-f", choices=["json", "text"], default="text",
help="Output format")
parser.add_argument("--analyze-existing", "-e", action="store_true",
help="Include analysis of existing indexes")
parser.add_argument("--min-priority", "-p", type=int, default=4,
help="Minimum priority level to include (1=highest, 4=lowest)")
args = parser.parse_args()
try:
# Load schema
with open(args.schema, 'r') as f:
schema_data = json.load(f)
# Load queries
with open(args.queries, 'r') as f:
query_data = json.load(f)
# Initialize optimizer
optimizer = IndexOptimizer()
optimizer.load_schema(schema_data)
optimizer.load_query_patterns(query_data)
# Generate analysis
analysis = optimizer.generate_analysis_report()
# Filter by priority if specified
if args.min_priority < 4:
for priority_level in ["high_priority", "medium_priority", "low_priority"]:
analysis["index_recommendations"][priority_level] = [
rec for rec in analysis["index_recommendations"][priority_level]
if rec["priority"] <= args.min_priority
]
# Format output
if args.format == "json":
output = json.dumps(analysis, indent=2)
else:
output = optimizer.format_text_report(analysis)
# Write output
if args.output:
with open(args.output, 'w') as f:
f.write(output)
else:
print(output)
return 0
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())


#!/usr/bin/env python3
"""
Database Migration Generator
Generates safe migration scripts between schema versions:
- Compares current and target schemas
- Generates ALTER TABLE statements for schema changes
- Implements zero-downtime migration strategies (expand-contract pattern)
- Creates rollback scripts for all changes
- Generates validation queries to verify migrations
- Handles complex changes like table splits/merges
Input: Current schema JSON + Target schema JSON
Output: Migration SQL + Rollback SQL + Validation queries + Execution plan
Usage:
python migration_generator.py --current current_schema.json --target target_schema.json --output migration.sql
python migration_generator.py --current current.json --target target.json --format json
python migration_generator.py --current current.json --target target.json --zero-downtime
python migration_generator.py --current current.json --target target.json --validate-only
"""
import argparse
import json
import re
import sys
from collections import defaultdict, OrderedDict
from typing import Dict, List, Set, Tuple, Optional, Any, Union
from dataclasses import dataclass, asdict
from datetime import datetime
import hashlib
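# Minimal sketch of the schema JSON shape this script consumes; the keys mirror
# what SchemaComparator._parse_schema reads below. The constant name and all
# values here are illustrative only:
EXAMPLE_SCHEMA = {
    "tables": {
        "users": {
            "columns": {
                "id": {"type": "BIGINT", "nullable": False},
                "email": {"type": "VARCHAR(255)", "unique": True},
            },
            "primary_key": ["id"],
            "unique_constraints": [["email"]],
            "check_constraints": {},
            "indexes": [],
        }
    }
}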
@dataclass
class Column:
name: str
data_type: str
nullable: bool = True
primary_key: bool = False
unique: bool = False
foreign_key: Optional[str] = None
default_value: Optional[str] = None
check_constraint: Optional[str] = None
@dataclass
class Table:
name: str
columns: Dict[str, Column]
primary_key: List[str]
foreign_keys: Dict[str, str] # column -> referenced_table.column
unique_constraints: List[List[str]]
check_constraints: Dict[str, str]
indexes: List[Dict[str, Any]]
@dataclass
class MigrationStep:
step_id: str
step_type: str
table: str
description: str
sql_forward: str
sql_rollback: str
validation_sql: Optional[str] = None
dependencies: Optional[List[str]] = None
risk_level: str = "LOW" # LOW, MEDIUM, HIGH
estimated_time: Optional[str] = None
zero_downtime_phase: Optional[str] = None # EXPAND, CONTRACT, or None
@dataclass
class MigrationPlan:
migration_id: str
created_at: str
source_schema_hash: str
target_schema_hash: str
steps: List[MigrationStep]
summary: Dict[str, Any]
execution_order: List[str]
rollback_order: List[str]
@dataclass
class ValidationCheck:
check_id: str
check_type: str
table: str
description: str
sql_query: str
expected_result: Any
critical: bool = True
class SchemaComparator:
"""Compares two schema versions and identifies differences."""
def __init__(self):
self.current_schema: Dict[str, Table] = {}
self.target_schema: Dict[str, Table] = {}
self.changes: Dict[str, List[Dict[str, Any]]] = {
'tables_added': [],
'tables_dropped': [],
'tables_renamed': [],
'columns_added': [],
'columns_dropped': [],
'columns_modified': [],
'columns_renamed': [],
'constraints_added': [],
'constraints_dropped': [],
'indexes_added': [],
'indexes_dropped': []
}
def load_schemas(self, current_data: Dict[str, Any], target_data: Dict[str, Any]):
"""Load current and target schemas."""
self.current_schema = self._parse_schema(current_data)
self.target_schema = self._parse_schema(target_data)
def _parse_schema(self, schema_data: Dict[str, Any]) -> Dict[str, Table]:
"""Parse schema JSON into Table objects."""
tables = {}
if 'tables' not in schema_data:
return tables
for table_name, table_def in schema_data['tables'].items():
columns = {}
primary_key = table_def.get('primary_key', [])
foreign_keys = {}
# Parse columns
for col_name, col_def in table_def.get('columns', {}).items():
column = Column(
name=col_name,
data_type=col_def.get('type', 'VARCHAR(255)'),
nullable=col_def.get('nullable', True),
primary_key=col_name in primary_key,
unique=col_def.get('unique', False),
foreign_key=col_def.get('foreign_key'),
default_value=col_def.get('default'),
check_constraint=col_def.get('check_constraint')
)
columns[col_name] = column
if column.foreign_key:
foreign_keys[col_name] = column.foreign_key
table = Table(
name=table_name,
columns=columns,
primary_key=primary_key,
foreign_keys=foreign_keys,
unique_constraints=table_def.get('unique_constraints', []),
check_constraints=table_def.get('check_constraints', {}),
indexes=table_def.get('indexes', [])
)
tables[table_name] = table
return tables
def compare_schemas(self) -> Dict[str, List[Dict[str, Any]]]:
"""Compare schemas and identify all changes."""
self._compare_tables()
self._compare_columns()
self._compare_constraints()
self._compare_indexes()
return self.changes
def _compare_tables(self):
"""Compare table-level changes."""
current_tables = set(self.current_schema.keys())
target_tables = set(self.target_schema.keys())
# Tables added
for table_name in target_tables - current_tables:
self.changes['tables_added'].append({
'table': table_name,
'definition': self.target_schema[table_name]
})
# Tables dropped
for table_name in current_tables - target_tables:
self.changes['tables_dropped'].append({
'table': table_name,
'definition': self.current_schema[table_name]
})
# Tables renamed (heuristic based on column similarity)
self._detect_renamed_tables(current_tables - target_tables, target_tables - current_tables)
def _detect_renamed_tables(self, dropped_tables: Set[str], added_tables: Set[str]):
"""Detect renamed tables based on column similarity."""
if not dropped_tables or not added_tables:
return
# Calculate similarity scores
similarity_scores = []
for dropped_table in dropped_tables:
for added_table in added_tables:
score = self._calculate_table_similarity(dropped_table, added_table)
if score > 0.7: # High similarity threshold
similarity_scores.append((score, dropped_table, added_table))
# Sort by similarity and identify renames
similarity_scores.sort(reverse=True)
used_tables = set()
for score, old_name, new_name in similarity_scores:
if old_name not in used_tables and new_name not in used_tables:
self.changes['tables_renamed'].append({
'old_name': old_name,
'new_name': new_name,
'similarity_score': score
})
used_tables.add(old_name)
used_tables.add(new_name)
# Remove from added/dropped lists
self.changes['tables_added'] = [t for t in self.changes['tables_added'] if t['table'] != new_name]
self.changes['tables_dropped'] = [t for t in self.changes['tables_dropped'] if t['table'] != old_name]
def _calculate_table_similarity(self, table1_name: str, table2_name: str) -> float:
"""Calculate similarity between two tables based on columns."""
table1 = self.current_schema[table1_name]
table2 = self.target_schema[table2_name]
cols1 = set(table1.columns.keys())
cols2 = set(table2.columns.keys())
if not cols1 and not cols2:
return 1.0
elif not cols1 or not cols2:
return 0.0
intersection = len(cols1.intersection(cols2))
union = len(cols1.union(cols2))
return intersection / union
def _compare_columns(self):
"""Compare column-level changes."""
common_tables = set(self.current_schema.keys()).intersection(set(self.target_schema.keys()))
for table_name in common_tables:
current_table = self.current_schema[table_name]
target_table = self.target_schema[table_name]
current_columns = set(current_table.columns.keys())
target_columns = set(target_table.columns.keys())
# Columns added
for col_name in target_columns - current_columns:
self.changes['columns_added'].append({
'table': table_name,
'column': col_name,
'definition': target_table.columns[col_name]
})
# Columns dropped
for col_name in current_columns - target_columns:
self.changes['columns_dropped'].append({
'table': table_name,
'column': col_name,
'definition': current_table.columns[col_name]
})
# Columns modified
for col_name in current_columns.intersection(target_columns):
current_col = current_table.columns[col_name]
target_col = target_table.columns[col_name]
if self._columns_different(current_col, target_col):
self.changes['columns_modified'].append({
'table': table_name,
'column': col_name,
'current_definition': current_col,
'target_definition': target_col,
'changes': self._describe_column_changes(current_col, target_col)
})
def _columns_different(self, col1: Column, col2: Column) -> bool:
"""Check if two columns have different definitions."""
return (col1.data_type != col2.data_type or
col1.nullable != col2.nullable or
col1.default_value != col2.default_value or
col1.unique != col2.unique or
col1.foreign_key != col2.foreign_key or
col1.check_constraint != col2.check_constraint)
def _describe_column_changes(self, current_col: Column, target_col: Column) -> List[str]:
"""Describe specific changes between column definitions."""
changes = []
if current_col.data_type != target_col.data_type:
changes.append(f"type: {current_col.data_type} -> {target_col.data_type}")
if current_col.nullable != target_col.nullable:
changes.append(f"nullable: {current_col.nullable} -> {target_col.nullable}")
if current_col.default_value != target_col.default_value:
changes.append(f"default: {current_col.default_value} -> {target_col.default_value}")
if current_col.unique != target_col.unique:
changes.append(f"unique: {current_col.unique} -> {target_col.unique}")
if current_col.foreign_key != target_col.foreign_key:
changes.append(f"foreign_key: {current_col.foreign_key} -> {target_col.foreign_key}")
return changes
def _compare_constraints(self):
"""Compare constraint changes."""
common_tables = set(self.current_schema.keys()).intersection(set(self.target_schema.keys()))
for table_name in common_tables:
current_table = self.current_schema[table_name]
target_table = self.target_schema[table_name]
# Compare primary keys
if current_table.primary_key != target_table.primary_key:
if current_table.primary_key:
self.changes['constraints_dropped'].append({
'table': table_name,
'constraint_type': 'PRIMARY_KEY',
'columns': current_table.primary_key
})
if target_table.primary_key:
self.changes['constraints_added'].append({
'table': table_name,
'constraint_type': 'PRIMARY_KEY',
'columns': target_table.primary_key
})
# Compare unique constraints
current_unique = set(tuple(uc) for uc in current_table.unique_constraints)
target_unique = set(tuple(uc) for uc in target_table.unique_constraints)
for constraint in target_unique - current_unique:
self.changes['constraints_added'].append({
'table': table_name,
'constraint_type': 'UNIQUE',
'columns': list(constraint)
})
for constraint in current_unique - target_unique:
self.changes['constraints_dropped'].append({
'table': table_name,
'constraint_type': 'UNIQUE',
'columns': list(constraint)
})
# Compare check constraints
current_checks = set(current_table.check_constraints.items())
target_checks = set(target_table.check_constraints.items())
for name, condition in target_checks - current_checks:
self.changes['constraints_added'].append({
'table': table_name,
'constraint_type': 'CHECK',
'constraint_name': name,
'condition': condition
})
for name, condition in current_checks - target_checks:
self.changes['constraints_dropped'].append({
'table': table_name,
'constraint_type': 'CHECK',
'constraint_name': name,
'condition': condition
})
def _compare_indexes(self):
"""Compare index changes."""
common_tables = set(self.current_schema.keys()).intersection(set(self.target_schema.keys()))
for table_name in common_tables:
current_indexes = {idx['name']: idx for idx in self.current_schema[table_name].indexes}
target_indexes = {idx['name']: idx for idx in self.target_schema[table_name].indexes}
current_names = set(current_indexes.keys())
target_names = set(target_indexes.keys())
# Indexes added
for idx_name in target_names - current_names:
self.changes['indexes_added'].append({
'table': table_name,
'index': target_indexes[idx_name]
})
# Indexes dropped
for idx_name in current_names - target_names:
self.changes['indexes_dropped'].append({
'table': table_name,
'index': current_indexes[idx_name]
})
class MigrationGenerator:
"""Generates migration steps from schema differences."""
def __init__(self, zero_downtime: bool = False):
self.zero_downtime = zero_downtime
self.migration_steps: List[MigrationStep] = []
self.step_counter = 0
# Data type conversion safety
self.safe_type_conversions = {
('VARCHAR(50)', 'VARCHAR(100)'): True, # Expanding varchar
('INT', 'BIGINT'): True, # Expanding integer
('DECIMAL(10,2)', 'DECIMAL(12,2)'): True, # Expanding decimal precision
}
self.risky_type_conversions = {
('VARCHAR(100)', 'VARCHAR(50)'): 'Data truncation possible',
('BIGINT', 'INT'): 'Data loss possible for large values',
('TEXT', 'VARCHAR(255)'): 'Data truncation possible'
}
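# The conversion tables above match on exact type strings; a more general
# (illustrative) check for widening VARCHAR changes could parse the declared
# sizes instead. The helper below is a sketch, not part of the existing class:
import re

def _is_widening_varchar(old_type: str, new_type: str) -> bool:
    """True when both types are VARCHAR(n) and the new size is not smaller."""
    m_old = re.match(r"VARCHAR\((\d+)\)$", old_type, re.IGNORECASE)
    m_new = re.match(r"VARCHAR\((\d+)\)$", new_type, re.IGNORECASE)
    return bool(m_old and m_new) and int(m_new.group(1)) >= int(m_old.group(1))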
def generate_migration(self, changes: Dict[str, List[Dict[str, Any]]]) -> MigrationPlan:
"""Generate complete migration plan from schema changes."""
self.migration_steps = []
self.step_counter = 0
# Generate steps in dependency order
self._generate_table_creation_steps(changes['tables_added'])
self._generate_column_addition_steps(changes['columns_added'])
self._generate_constraint_addition_steps(changes['constraints_added'])
self._generate_index_addition_steps(changes['indexes_added'])
self._generate_column_modification_steps(changes['columns_modified'])
self._generate_table_rename_steps(changes['tables_renamed'])
self._generate_index_removal_steps(changes['indexes_dropped'])
self._generate_constraint_removal_steps(changes['constraints_dropped'])
self._generate_column_removal_steps(changes['columns_dropped'])
self._generate_table_removal_steps(changes['tables_dropped'])
# Create migration plan
migration_id = self._generate_migration_id(changes)
execution_order = [step.step_id for step in self.migration_steps]
rollback_order = list(reversed(execution_order))
return MigrationPlan(
migration_id=migration_id,
created_at=datetime.now().isoformat(),
source_schema_hash=self._calculate_changes_hash(changes),
target_schema_hash="", # Would be calculated from target schema
steps=self.migration_steps,
summary=self._generate_summary(changes),
execution_order=execution_order,
rollback_order=rollback_order
)
def _generate_step_id(self) -> str:
"""Generate unique step ID."""
self.step_counter += 1
return f"step_{self.step_counter:03d}"
def _generate_table_creation_steps(self, tables_added: List[Dict[str, Any]]):
"""Generate steps for creating new tables."""
for table_info in tables_added:
table = table_info['definition']
step = self._create_table_step(table)
self.migration_steps.append(step)
def _create_table_step(self, table: Table) -> MigrationStep:
"""Create migration step for table creation."""
columns_sql = []
for col_name, column in table.columns.items():
col_sql = f"{col_name} {column.data_type}"
if not column.nullable:
col_sql += " NOT NULL"
if column.default_value:
col_sql += f" DEFAULT {column.default_value}"
if column.unique:
col_sql += " UNIQUE"
columns_sql.append(col_sql)
# Add primary key
if table.primary_key:
pk_sql = f"PRIMARY KEY ({', '.join(table.primary_key)})"
columns_sql.append(pk_sql)
# Add foreign keys
for col_name, ref in table.foreign_keys.items():
fk_sql = f"FOREIGN KEY ({col_name}) REFERENCES {ref}"
columns_sql.append(fk_sql)
create_sql = f"CREATE TABLE {table.name} (\n " + ",\n ".join(columns_sql) + "\n);"
drop_sql = f"DROP TABLE IF EXISTS {table.name};"
return MigrationStep(
step_id=self._generate_step_id(),
step_type="CREATE_TABLE",
table=table.name,
description=f"Create table {table.name} with {len(table.columns)} columns",
sql_forward=create_sql,
sql_rollback=drop_sql,
validation_sql=f"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{table.name}';",
risk_level="LOW"
)
def _generate_column_addition_steps(self, columns_added: List[Dict[str, Any]]):
"""Generate steps for adding columns."""
for col_info in columns_added:
if self.zero_downtime:
# For zero-downtime, add columns as nullable first
step = self._add_column_zero_downtime_step(col_info)
else:
step = self._add_column_step(col_info)
self.migration_steps.append(step)
def _add_column_step(self, col_info: Dict[str, Any]) -> MigrationStep:
"""Create step for adding a column."""
table = col_info['table']
column = col_info['definition']
col_sql = f"{column.name} {column.data_type}"
if not column.nullable:
if column.default_value:
col_sql += f" DEFAULT {column.default_value} NOT NULL"
else:
# This is risky - adding NOT NULL without default
col_sql += " NOT NULL"
elif column.default_value:
col_sql += f" DEFAULT {column.default_value}"
add_sql = f"ALTER TABLE {table} ADD COLUMN {col_sql};"
drop_sql = f"ALTER TABLE {table} DROP COLUMN {column.name};"
risk_level = "HIGH" if not column.nullable and not column.default_value else "LOW"
return MigrationStep(
step_id=self._generate_step_id(),
step_type="ADD_COLUMN",
table=table,
description=f"Add column {column.name} to {table}",
sql_forward=add_sql,
sql_rollback=drop_sql,
validation_sql=f"SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{table}' AND column_name = '{column.name}';",
risk_level=risk_level
)
def _add_column_zero_downtime_step(self, col_info: Dict[str, Any]) -> MigrationStep:
"""Create zero-downtime step for adding column."""
table = col_info['table']
column = col_info['definition']
# Phase 1: Add as nullable with default if needed
col_sql = f"{column.name} {column.data_type}"
if column.default_value:
col_sql += f" DEFAULT {column.default_value}"
add_sql = f"ALTER TABLE {table} ADD COLUMN {col_sql};"
# If column should be NOT NULL, handle in separate phase
if not column.nullable:
# Add comment about needing follow-up step
add_sql += f"\n-- Follow-up needed: Add NOT NULL constraint after data population"
drop_sql = f"ALTER TABLE {table} DROP COLUMN {column.name};"
return MigrationStep(
step_id=self._generate_step_id(),
step_type="ADD_COLUMN_ZD",
table=table,
description=f"Add column {column.name} to {table} (zero-downtime phase 1)",
sql_forward=add_sql,
sql_rollback=drop_sql,
validation_sql=f"SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{table}' AND column_name = '{column.name}';",
risk_level="LOW",
zero_downtime_phase="EXPAND"
)
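# Illustrative contract-phase follow-up for the expand step above: once the new
# column is backfilled, the NOT NULL constraint is applied in its own short
# ALTER statement (the table/column names below are assumptions):
#   UPDATE orders SET status = 'unknown' WHERE status IS NULL;
#   ALTER TABLE orders ALTER COLUMN status SET NOT NULL;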
def _generate_column_modification_steps(self, columns_modified: List[Dict[str, Any]]):
"""Generate steps for modifying columns."""
for col_info in columns_modified:
if self.zero_downtime:
steps = self._modify_column_zero_downtime_steps(col_info)
self.migration_steps.extend(steps)
else:
step = self._modify_column_step(col_info)
self.migration_steps.append(step)
def _modify_column_step(self, col_info: Dict[str, Any]) -> MigrationStep:
"""Create step for modifying a column."""
table = col_info['table']
column = col_info['column']
current_def = col_info['current_definition']
target_def = col_info['target_definition']
changes = col_info['changes']
alter_statements = []
rollback_statements = []
# Handle different types of changes
if current_def.data_type != target_def.data_type:
alter_statements.append(f"ALTER COLUMN {column} TYPE {target_def.data_type}")
rollback_statements.append(f"ALTER COLUMN {column} TYPE {current_def.data_type}")
if current_def.nullable != target_def.nullable:
if target_def.nullable:
alter_statements.append(f"ALTER COLUMN {column} DROP NOT NULL")
rollback_statements.append(f"ALTER COLUMN {column} SET NOT NULL")
else:
alter_statements.append(f"ALTER COLUMN {column} SET NOT NULL")
rollback_statements.append(f"ALTER COLUMN {column} DROP NOT NULL")
if current_def.default_value != target_def.default_value:
if target_def.default_value:
alter_statements.append(f"ALTER COLUMN {column} SET DEFAULT {target_def.default_value}")
else:
alter_statements.append(f"ALTER COLUMN {column} DROP DEFAULT")
if current_def.default_value:
rollback_statements.append(f"ALTER COLUMN {column} SET DEFAULT {current_def.default_value}")
else:
rollback_statements.append(f"ALTER COLUMN {column} DROP DEFAULT")
# Build SQL
alter_sql = f"ALTER TABLE {table}\n " + ",\n ".join(alter_statements) + ";"
rollback_sql = f"ALTER TABLE {table}\n " + ",\n ".join(rollback_statements) + ";"
# Assess risk
risk_level = self._assess_column_modification_risk(current_def, target_def)
return MigrationStep(
step_id=self._generate_step_id(),
step_type="MODIFY_COLUMN",
table=table,
description=f"Modify column {column}: {', '.join(changes)}",
sql_forward=alter_sql,
sql_rollback=rollback_sql,
validation_sql=f"SELECT data_type, is_nullable FROM information_schema.columns WHERE table_name = '{table}' AND column_name = '{column}';",
risk_level=risk_level
)
def _modify_column_zero_downtime_steps(self, col_info: Dict[str, Any]) -> List[MigrationStep]:
"""Create zero-downtime steps for column modification."""
table = col_info['table']
column = col_info['column']
current_def = col_info['current_definition']
target_def = col_info['target_definition']
steps = []
# For zero-downtime, use expand-contract pattern
temp_column = f"{column}_new"
# Step 1: Add new column
step1 = MigrationStep(
step_id=self._generate_step_id(),
step_type="ADD_TEMP_COLUMN",
table=table,
description=f"Add temporary column {temp_column} for zero-downtime migration",
sql_forward=f"ALTER TABLE {table} ADD COLUMN {temp_column} {target_def.data_type};",
sql_rollback=f"ALTER TABLE {table} DROP COLUMN {temp_column};",
zero_downtime_phase="EXPAND"
)
steps.append(step1)
# Step 2: Copy data (for large tables, batch this UPDATE and add dual writes in the application to stay truly zero-downtime)
step2 = MigrationStep(
step_id=self._generate_step_id(),
step_type="COPY_COLUMN_DATA",
table=table,
description=f"Copy data from {column} to {temp_column}",
sql_forward=f"UPDATE {table} SET {temp_column} = {column};",
sql_rollback=f"UPDATE {table} SET {temp_column} = NULL;",
zero_downtime_phase="EXPAND"
)
steps.append(step2)
# Step 3: Drop old column
step3 = MigrationStep(
step_id=self._generate_step_id(),
step_type="DROP_OLD_COLUMN",
table=table,
description=f"Drop original column {column}",
sql_forward=f"ALTER TABLE {table} DROP COLUMN {column};",
sql_rollback=f"ALTER TABLE {table} ADD COLUMN {column} {current_def.data_type};",
zero_downtime_phase="CONTRACT"
)
steps.append(step3)
# Step 4: Rename new column
step4 = MigrationStep(
step_id=self._generate_step_id(),
step_type="RENAME_COLUMN",
table=table,
description=f"Rename {temp_column} to {column}",
sql_forward=f"ALTER TABLE {table} RENAME COLUMN {temp_column} TO {column};",
sql_rollback=f"ALTER TABLE {table} RENAME COLUMN {column} TO {temp_column};",
zero_downtime_phase="CONTRACT"
)
steps.append(step4)
return steps
def _assess_column_modification_risk(self, current: Column, target: Column) -> str:
"""Assess risk level of column modification."""
if current.data_type != target.data_type:
conversion_key = (current.data_type, target.data_type)
if conversion_key in self.risky_type_conversions:
return "HIGH"
elif conversion_key not in self.safe_type_conversions:
return "MEDIUM"
if current.nullable and not target.nullable:
return "HIGH" # Adding NOT NULL constraint
return "LOW"
def _generate_constraint_addition_steps(self, constraints_added: List[Dict[str, Any]]):
"""Generate steps for adding constraints."""
for constraint_info in constraints_added:
step = self._add_constraint_step(constraint_info)
self.migration_steps.append(step)
def _add_constraint_step(self, constraint_info: Dict[str, Any]) -> MigrationStep:
"""Create step for adding constraint."""
table = constraint_info['table']
constraint_type = constraint_info['constraint_type']
if constraint_type == 'PRIMARY_KEY':
columns = constraint_info['columns']
constraint_name = f"pk_{table}"
add_sql = f"ALTER TABLE {table} ADD CONSTRAINT {constraint_name} PRIMARY KEY ({', '.join(columns)});"
drop_sql = f"ALTER TABLE {table} DROP CONSTRAINT {constraint_name};"
description = f"Add primary key on {', '.join(columns)}"
elif constraint_type == 'UNIQUE':
columns = constraint_info['columns']
constraint_name = f"uq_{table}_{'_'.join(columns)}"
add_sql = f"ALTER TABLE {table} ADD CONSTRAINT {constraint_name} UNIQUE ({', '.join(columns)});"
drop_sql = f"ALTER TABLE {table} DROP CONSTRAINT {constraint_name};"
description = f"Add unique constraint on {', '.join(columns)}"
elif constraint_type == 'CHECK':
constraint_name = constraint_info['constraint_name']
condition = constraint_info['condition']
add_sql = f"ALTER TABLE {table} ADD CONSTRAINT {constraint_name} CHECK ({condition});"
drop_sql = f"ALTER TABLE {table} DROP CONSTRAINT {constraint_name};"
description = f"Add check constraint: {condition}"
else:
return None
return MigrationStep(
step_id=self._generate_step_id(),
step_type="ADD_CONSTRAINT",
table=table,
description=description,
sql_forward=add_sql,
sql_rollback=drop_sql,
risk_level="MEDIUM" # Constraints can fail if data doesn't comply
)
def _generate_index_addition_steps(self, indexes_added: List[Dict[str, Any]]):
"""Generate steps for adding indexes."""
for index_info in indexes_added:
step = self._add_index_step(index_info)
self.migration_steps.append(step)
def _add_index_step(self, index_info: Dict[str, Any]) -> MigrationStep:
"""Create step for adding index."""
table = index_info['table']
index = index_info['index']
unique_keyword = "UNIQUE " if index.get('unique', False) else ""
columns_sql = ', '.join(index['columns'])
create_sql = f"CREATE {unique_keyword}INDEX {index['name']} ON {table} ({columns_sql});"
drop_sql = f"DROP INDEX {index['name']};"
return MigrationStep(
step_id=self._generate_step_id(),
step_type="ADD_INDEX",
table=table,
description=f"Create index {index['name']} on ({columns_sql})",
sql_forward=create_sql,
sql_rollback=drop_sql,
estimated_time="1-5 minutes depending on table size",
risk_level="LOW"
)
def _generate_table_rename_steps(self, tables_renamed: List[Dict[str, Any]]):
"""Generate steps for renaming tables."""
for rename_info in tables_renamed:
step = self._rename_table_step(rename_info)
self.migration_steps.append(step)
def _rename_table_step(self, rename_info: Dict[str, Any]) -> MigrationStep:
"""Create step for renaming table."""
old_name = rename_info['old_name']
new_name = rename_info['new_name']
rename_sql = f"ALTER TABLE {old_name} RENAME TO {new_name};"
rollback_sql = f"ALTER TABLE {new_name} RENAME TO {old_name};"
return MigrationStep(
step_id=self._generate_step_id(),
step_type="RENAME_TABLE",
table=old_name,
description=f"Rename table {old_name} to {new_name}",
sql_forward=rename_sql,
sql_rollback=rollback_sql,
validation_sql=f"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{new_name}';",
risk_level="LOW"
)
def _generate_column_removal_steps(self, columns_dropped: List[Dict[str, Any]]):
"""Generate steps for removing columns."""
for col_info in columns_dropped:
step = self._drop_column_step(col_info)
self.migration_steps.append(step)
def _drop_column_step(self, col_info: Dict[str, Any]) -> MigrationStep:
"""Create step for dropping column."""
table = col_info['table']
column = col_info['definition']
drop_sql = f"ALTER TABLE {table} DROP COLUMN {column.name};"
# Recreate column for rollback
col_sql = f"{column.name} {column.data_type}"
if not column.nullable:
col_sql += " NOT NULL"
if column.default_value:
col_sql += f" DEFAULT {column.default_value}"
add_sql = f"ALTER TABLE {table} ADD COLUMN {col_sql};"
return MigrationStep(
step_id=self._generate_step_id(),
step_type="DROP_COLUMN",
table=table,
description=f"Drop column {column.name} from {table}",
sql_forward=drop_sql,
sql_rollback=add_sql,
risk_level="HIGH" # Data loss risk
)
def _generate_constraint_removal_steps(self, constraints_dropped: List[Dict[str, Any]]):
"""Generate steps for removing constraints."""
for constraint_info in constraints_dropped:
step = self._drop_constraint_step(constraint_info)
if step:
self.migration_steps.append(step)
def _drop_constraint_step(self, constraint_info: Dict[str, Any]) -> Optional[MigrationStep]:
"""Create step for dropping constraint."""
table = constraint_info['table']
constraint_type = constraint_info['constraint_type']
if constraint_type == 'PRIMARY_KEY':
constraint_name = f"pk_{table}"
drop_sql = f"ALTER TABLE {table} DROP CONSTRAINT {constraint_name};"
columns = constraint_info['columns']
add_sql = f"ALTER TABLE {table} ADD CONSTRAINT {constraint_name} PRIMARY KEY ({', '.join(columns)});"
description = f"Drop primary key constraint"
elif constraint_type == 'UNIQUE':
columns = constraint_info['columns']
constraint_name = f"uq_{table}_{'_'.join(columns)}"
drop_sql = f"ALTER TABLE {table} DROP CONSTRAINT {constraint_name};"
add_sql = f"ALTER TABLE {table} ADD CONSTRAINT {constraint_name} UNIQUE ({', '.join(columns)});"
description = f"Drop unique constraint on {', '.join(columns)}"
elif constraint_type == 'CHECK':
constraint_name = constraint_info['constraint_name']
condition = constraint_info.get('condition', '')
drop_sql = f"ALTER TABLE {table} DROP CONSTRAINT {constraint_name};"
add_sql = f"ALTER TABLE {table} ADD CONSTRAINT {constraint_name} CHECK ({condition});"
description = f"Drop check constraint {constraint_name}"
else:
return None
return MigrationStep(
step_id=self._generate_step_id(),
step_type="DROP_CONSTRAINT",
table=table,
description=description,
sql_forward=drop_sql,
sql_rollback=add_sql,
risk_level="MEDIUM"
)
def _generate_index_removal_steps(self, indexes_dropped: List[Dict[str, Any]]):
"""Generate steps for removing indexes."""
for index_info in indexes_dropped:
step = self._drop_index_step(index_info)
self.migration_steps.append(step)
def _drop_index_step(self, index_info: Dict[str, Any]) -> MigrationStep:
"""Create step for dropping index."""
table = index_info['table']
index = index_info['index']
drop_sql = f"DROP INDEX {index['name']};"
# Recreate for rollback
unique_keyword = "UNIQUE " if index.get('unique', False) else ""
columns_sql = ', '.join(index['columns'])
create_sql = f"CREATE {unique_keyword}INDEX {index['name']} ON {table} ({columns_sql});"
return MigrationStep(
step_id=self._generate_step_id(),
step_type="DROP_INDEX",
table=table,
description=f"Drop index {index['name']}",
sql_forward=drop_sql,
sql_rollback=create_sql,
risk_level="LOW"
)
def _generate_table_removal_steps(self, tables_dropped: List[Dict[str, Any]]):
"""Generate steps for removing tables."""
for table_info in tables_dropped:
step = self._drop_table_step(table_info)
self.migration_steps.append(step)
def _drop_table_step(self, table_info: Dict[str, Any]) -> MigrationStep:
"""Create step for dropping table."""
table = table_info['definition']
drop_sql = f"DROP TABLE {table.name};"
# Would need to recreate entire table for rollback
# This is simplified - full implementation would generate CREATE TABLE statement
create_sql = f"-- Recreate table {table.name} (implementation needed)"
return MigrationStep(
step_id=self._generate_step_id(),
step_type="DROP_TABLE",
table=table.name,
description=f"Drop table {table.name}",
sql_forward=drop_sql,
sql_rollback=create_sql,
risk_level="HIGH" # Data loss risk
)
def _generate_migration_id(self, changes: Dict[str, List[Dict[str, Any]]]) -> str:
"""Generate unique migration ID."""
content = json.dumps(changes, sort_keys=True)
return hashlib.md5(content.encode()).hexdigest()[:8]
def _calculate_changes_hash(self, changes: Dict[str, List[Dict[str, Any]]]) -> str:
"""Calculate hash of changes for versioning."""
content = json.dumps(changes, sort_keys=True)
return hashlib.md5(content.encode()).hexdigest()
def _generate_summary(self, changes: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]:
"""Generate migration summary."""
summary = {
"total_steps": len(self.migration_steps),
"changes_summary": {
"tables_added": len(changes['tables_added']),
"tables_dropped": len(changes['tables_dropped']),
"tables_renamed": len(changes['tables_renamed']),
"columns_added": len(changes['columns_added']),
"columns_dropped": len(changes['columns_dropped']),
"columns_modified": len(changes['columns_modified']),
"constraints_added": len(changes['constraints_added']),
"constraints_dropped": len(changes['constraints_dropped']),
"indexes_added": len(changes['indexes_added']),
"indexes_dropped": len(changes['indexes_dropped'])
},
"risk_assessment": {
"high_risk_steps": len([s for s in self.migration_steps if s.risk_level == "HIGH"]),
"medium_risk_steps": len([s for s in self.migration_steps if s.risk_level == "MEDIUM"]),
"low_risk_steps": len([s for s in self.migration_steps if s.risk_level == "LOW"])
},
"zero_downtime": self.zero_downtime
}
return summary
class ValidationGenerator:
"""Generates validation queries for migration verification."""
def generate_validations(self, migration_plan: MigrationPlan) -> List[ValidationCheck]:
"""Generate validation checks for migration plan."""
validations = []
for step in migration_plan.steps:
if step.step_type == "CREATE_TABLE":
validations.append(self._create_table_validation(step))
elif step.step_type == "ADD_COLUMN":
validations.append(self._add_column_validation(step))
elif step.step_type == "MODIFY_COLUMN":
validations.append(self._modify_column_validation(step))
elif step.step_type == "ADD_INDEX":
validations.append(self._add_index_validation(step))
return validations
def _create_table_validation(self, step: MigrationStep) -> ValidationCheck:
"""Create validation for table creation."""
return ValidationCheck(
check_id=f"validate_{step.step_id}",
check_type="TABLE_EXISTS",
table=step.table,
description=f"Verify table {step.table} exists",
sql_query=f"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{step.table}';",
expected_result=1
)
def _add_column_validation(self, step: MigrationStep) -> ValidationCheck:
"""Create validation for column addition."""
# Extract column name from SQL
column_match = re.search(r'ADD COLUMN (\w+)', step.sql_forward)
column_name = column_match.group(1) if column_match else "unknown"
return ValidationCheck(
check_id=f"validate_{step.step_id}",
check_type="COLUMN_EXISTS",
table=step.table,
description=f"Verify column {column_name} exists in {step.table}",
sql_query=f"SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{step.table}' AND column_name = '{column_name}';",
expected_result=1
)
def _modify_column_validation(self, step: MigrationStep) -> ValidationCheck:
"""Create validation for column modification."""
return ValidationCheck(
check_id=f"validate_{step.step_id}",
check_type="COLUMN_MODIFIED",
table=step.table,
description=f"Verify column modification in {step.table}",
sql_query=step.validation_sql or "SELECT 1;", # Use provided validation or default
expected_result=1
)
def _add_index_validation(self, step: MigrationStep) -> ValidationCheck:
"""Create validation for index addition."""
# Extract index name from SQL
index_match = re.search(r'INDEX (\w+)', step.sql_forward)
index_name = index_match.group(1) if index_match else "unknown"
return ValidationCheck(
check_id=f"validate_{step.step_id}",
check_type="INDEX_EXISTS",
table=step.table,
description=f"Verify index {index_name} exists",
sql_query=f"SELECT COUNT(*) FROM information_schema.statistics WHERE index_name = '{index_name}';", # MySQL catalog; query pg_indexes on PostgreSQL
expected_result=1
)
def format_migration_plan_text(plan: MigrationPlan, validations: List[ValidationCheck] = None) -> str:
"""Format migration plan as human-readable text."""
lines = []
lines.append("DATABASE MIGRATION PLAN")
lines.append("=" * 50)
lines.append(f"Migration ID: {plan.migration_id}")
lines.append(f"Created: {plan.created_at}")
lines.append(f"Zero Downtime: {plan.summary['zero_downtime']}")
lines.append("")
# Summary
summary = plan.summary
lines.append("MIGRATION SUMMARY")
lines.append("-" * 17)
lines.append(f"Total Steps: {summary['total_steps']}")
changes = summary['changes_summary']
for change_type, count in changes.items():
if count > 0:
lines.append(f"{change_type.replace('_', ' ').title()}: {count}")
lines.append("")
# Risk Assessment
risk = summary['risk_assessment']
lines.append("RISK ASSESSMENT")
lines.append("-" * 15)
lines.append(f"High Risk Steps: {risk['high_risk_steps']}")
lines.append(f"Medium Risk Steps: {risk['medium_risk_steps']}")
lines.append(f"Low Risk Steps: {risk['low_risk_steps']}")
lines.append("")
# Migration Steps
lines.append("MIGRATION STEPS")
lines.append("-" * 15)
for i, step in enumerate(plan.steps, 1):
lines.append(f"{i}. {step.description} ({step.risk_level} risk)")
lines.append(f" Type: {step.step_type}")
if step.zero_downtime_phase:
lines.append(f" Phase: {step.zero_downtime_phase}")
lines.append(f" Forward SQL: {step.sql_forward}")
lines.append(f" Rollback SQL: {step.sql_rollback}")
if step.estimated_time:
lines.append(f" Estimated Time: {step.estimated_time}")
lines.append("")
# Validation Checks
if validations:
lines.append("VALIDATION CHECKS")
lines.append("-" * 17)
for validation in validations:
lines.append(f"• {validation.description}")
lines.append(f" SQL: {validation.sql_query}")
lines.append(f" Expected: {validation.expected_result}")
lines.append("")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Generate database migration scripts")
parser.add_argument("--current", "-c", required=True, help="Current schema JSON file")
parser.add_argument("--target", "-t", required=True, help="Target schema JSON file")
parser.add_argument("--output", "-o", help="Output file (default: stdout)")
parser.add_argument("--format", "-f", choices=["json", "text", "sql"], default="text",
help="Output format")
parser.add_argument("--zero-downtime", "-z", action="store_true",
help="Generate zero-downtime migration strategy")
parser.add_argument("--validate-only", "-v", action="store_true",
help="Only generate validation queries")
parser.add_argument("--include-validations", action="store_true",
help="Include validation queries in output")
args = parser.parse_args()
try:
# Load schemas
with open(args.current, 'r') as f:
current_schema = json.load(f)
with open(args.target, 'r') as f:
target_schema = json.load(f)
# Compare schemas
comparator = SchemaComparator()
comparator.load_schemas(current_schema, target_schema)
changes = comparator.compare_schemas()
if not any(changes.values()):
print("No schema changes detected.")
return 0
# Generate migration
generator = MigrationGenerator(zero_downtime=args.zero_downtime)
migration_plan = generator.generate_migration(changes)
# Generate validations if requested
validations = None
if args.include_validations or args.validate_only:
validator = ValidationGenerator()
validations = validator.generate_validations(migration_plan)
# Format output
if args.validate_only:
output = json.dumps([asdict(v) for v in validations], indent=2)
elif args.format == "json":
result = {"migration_plan": asdict(migration_plan)}
if validations:
result["validations"] = [asdict(v) for v in validations]
output = json.dumps(result, indent=2)
elif args.format == "sql":
sql_lines = []
sql_lines.append("-- Database Migration Script")
sql_lines.append(f"-- Migration ID: {migration_plan.migration_id}")
sql_lines.append(f"-- Created: {migration_plan.created_at}")
sql_lines.append("")
for step in migration_plan.steps:
sql_lines.append(f"-- Step: {step.description}")
sql_lines.append(step.sql_forward)
sql_lines.append("")
output = "\n".join(sql_lines)
else: # text format
output = format_migration_plan_text(migration_plan, validations)
# Write output
if args.output:
with open(args.output, 'w') as f:
f.write(output)
else:
print(output)
return 0
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())
database-designer reference
Database Design Principles
Normalization Forms
First Normal Form (1NF)
- Atomic Values: Each column contains indivisible values
- Unique Column Names: No duplicate column names within a table
- Uniform Data Types: Each column contains the same type of data
- Row Uniqueness: No duplicate rows in the table
Example Violation:
-- BAD: Multiple phone numbers in one column
CREATE TABLE contacts (
id INT PRIMARY KEY,
name VARCHAR(100),
phones VARCHAR(200) -- "123-456-7890, 098-765-4321"
);
-- GOOD: Separate table for phone numbers
CREATE TABLE contacts (
id INT PRIMARY KEY,
name VARCHAR(100)
);
CREATE TABLE contact_phones (
id INT PRIMARY KEY,
contact_id INT REFERENCES contacts(id),
phone_number VARCHAR(20),
phone_type VARCHAR(10)
);
Second Normal Form (2NF)
- 1NF Compliance: Must satisfy First Normal Form
- Full Functional Dependency: Non-key attributes depend on the entire primary key
- Partial Dependency Elimination: Remove attributes that depend on part of a composite key
Example Violation:
-- BAD: Student course table with partial dependencies
CREATE TABLE student_courses (
student_id INT,
course_id INT,
student_name VARCHAR(100), -- Depends only on student_id
course_name VARCHAR(100), -- Depends only on course_id
grade CHAR(1),
PRIMARY KEY (student_id, course_id)
);
-- GOOD: Separate tables eliminate partial dependencies
CREATE TABLE students (
id INT PRIMARY KEY,
name VARCHAR(100)
);
CREATE TABLE courses (
id INT PRIMARY KEY,
name VARCHAR(100)
);
CREATE TABLE enrollments (
student_id INT REFERENCES students(id),
course_id INT REFERENCES courses(id),
grade CHAR(1),
PRIMARY KEY (student_id, course_id)
);
Third Normal Form (3NF)
- 2NF Compliance: Must satisfy Second Normal Form
- Transitive Dependency Elimination: Non-key attributes should not depend on other non-key attributes
- Direct Dependency: Non-key attributes depend directly on the primary key
Example Violation:
-- BAD: Employee table with transitive dependency
CREATE TABLE employees (
id INT PRIMARY KEY,
name VARCHAR(100),
department_id INT,
department_name VARCHAR(100), -- Depends on department_id, not employee id
department_budget DECIMAL(10,2) -- Transitive dependency
);
-- GOOD: Separate department information
CREATE TABLE departments (
id INT PRIMARY KEY,
name VARCHAR(100),
budget DECIMAL(10,2)
);
CREATE TABLE employees (
id INT PRIMARY KEY,
name VARCHAR(100),
department_id INT REFERENCES departments(id)
);
Boyce-Codd Normal Form (BCNF)
- 3NF Compliance: Must satisfy Third Normal Form
- Determinant Key Rule: Every determinant must be a candidate key
- Stricter 3NF: Handles anomalies not covered by 3NF
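The determinant rule can be checked mechanically. A minimal sketch (illustrative helpers, not part of this skill's tooling) that computes attribute closures over a set of functional dependencies and flags any FD whose determinant is not a superkey:

```python
def closure(attrs, fds):
    """Compute the closure of an attribute set under functional dependencies."""
    result = set(attrs)
    changed = True
    while changed:
        changed = False
        for lhs, rhs in fds:
            # If we already know lhs, every attribute in rhs follows
            if set(lhs) <= result and not set(rhs) <= result:
                result |= set(rhs)
                changed = True
    return result

def bcnf_violations(relation, fds):
    """Return the FDs X -> Y whose determinant X is not a superkey."""
    return [(lhs, rhs) for lhs, rhs in fds
            if closure(lhs, fds) != set(relation)]

# employees(id, dept_id, dept_name): dept_id -> dept_name violates BCNF,
# because dept_id does not determine the whole relation.
fds = [({"id"}, {"dept_id"}), ({"dept_id"}, {"dept_name"})]
print(bcnf_violations({"id", "dept_id", "dept_name"}, fds))
```

The fix is the same decomposition shown in the 3NF example: split the offending dependency into its own table.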
Denormalization Strategies
When to Denormalize
- Read-Heavy Workloads: High query frequency with acceptable write trade-offs
- Performance Bottlenecks: Join operations causing significant latency
- Aggregation Needs: Frequent calculation of derived values
- Caching Requirements: Pre-computed results for common queries
Common Denormalization Patterns
Redundant Storage
-- Store calculated values to avoid expensive joins
CREATE TABLE orders (
id INT PRIMARY KEY,
customer_id INT REFERENCES customers(id),
customer_name VARCHAR(100), -- Denormalized from customers table
order_total DECIMAL(10,2), -- Denormalized calculation
created_at TIMESTAMP
);
Materialized Aggregates
-- Pre-computed summary tables
CREATE TABLE customer_statistics (
customer_id INT PRIMARY KEY,
total_orders INT,
lifetime_value DECIMAL(12,2),
last_order_date DATE,
updated_at TIMESTAMP
);
Index Optimization Strategies
B-Tree Indexes
- Default Choice: Best for range queries, sorting, and equality matches
- Column Order: Most selective columns first for composite indexes
- Prefix Matching: Supports leading column subset queries
- Maintenance Cost: Balanced tree structure with logarithmic operations
Hash Indexes
- Equality Queries: Optimal for exact match lookups
- Constant-Time Lookups: O(1) access for exact single-value queries
- Range Limitations: Cannot support range or partial matches
- Use Cases: Primary keys, unique constraints, cache keys
Composite Indexes
-- Query pattern determines optimal column order
-- Query: WHERE status = 'active' AND created_date > '2023-01-01' ORDER BY priority DESC
CREATE INDEX idx_task_status_date_priority
ON tasks (status, created_date, priority DESC);
-- Query: WHERE user_id = 123 AND category IN ('A', 'B') AND date_field BETWEEN '...' AND '...'
CREATE INDEX idx_user_category_date
ON user_activities (user_id, category, date_field);
Covering Indexes
-- Include additional columns to avoid table lookups
CREATE INDEX idx_user_email_covering
ON users (email)
INCLUDE (first_name, last_name, status);
-- Query can be satisfied entirely from the index
-- SELECT first_name, last_name, status FROM users WHERE email = '[email protected]';
Partial Indexes
-- Index only relevant subset of data
CREATE INDEX idx_active_users_email
ON users (email)
WHERE status = 'active';
-- Index for recent orders only. Partial-index predicates must use
-- immutable expressions (CURRENT_DATE is not), so use a literal cutoff
-- date and rebuild the index periodically.
CREATE INDEX idx_recent_orders_customer
ON orders (customer_id, created_at)
WHERE created_at > DATE '2024-01-01';
Query Analysis & Optimization
Query Patterns Recognition
- Equality Filters: Single-column B-tree indexes
- Range Queries: B-tree with proper column ordering
- Text Search: Full-text indexes or trigram indexes
- Join Operations: Foreign key indexes on both sides
- Sorting Requirements: Indexes matching ORDER BY clauses
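One common heuristic for ordering composite-index columns follows from these patterns: equality predicates first, then ORDER BY columns, then at most one range predicate, since B-tree columns after a range column cannot be used for filtering. A sketch (hypothetical helper; real optimal order also depends on selectivity):

```python
def suggest_composite_index(equality_cols, range_cols, order_by_cols):
    """Suggest a composite-index column order: equality predicates,
    then ORDER BY columns, then the first range predicate."""
    cols = list(equality_cols)
    # Sort columns next, so the index can satisfy ORDER BY directly
    cols += [c for c in order_by_cols if c not in cols]
    for c in range_cols:
        if c not in cols:
            cols.append(c)
            break  # only the first range column is useful in a B-tree
    return cols

# WHERE status = 'active' AND created_date > '2023-01-01' ORDER BY priority
print(suggest_composite_index(["status"], ["created_date"], ["priority"]))
# ['status', 'priority', 'created_date']
```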
Index Selection Algorithm
1. Identify WHERE clause columns
2. Determine most selective columns first
3. Consider JOIN conditions
4. Include ORDER BY columns if possible
5. Evaluate covering index opportunities
6. Check for existing overlapping indexes
Data Modeling Patterns
Star Schema (Data Warehousing)
-- Central fact table
CREATE TABLE sales_facts (
sale_id BIGINT PRIMARY KEY,
product_id INT REFERENCES products(id),
customer_id INT REFERENCES customers(id),
date_id INT REFERENCES date_dimension(id),
store_id INT REFERENCES stores(id),
quantity INT,
unit_price DECIMAL(8,2),
total_amount DECIMAL(10,2)
);
-- Dimension tables
CREATE TABLE date_dimension (
id INT PRIMARY KEY,
date_value DATE,
year INT,
quarter INT,
month INT,
day_of_week INT,
is_weekend BOOLEAN
);
Snowflake Schema
-- Normalized dimension tables
CREATE TABLE products (
id INT PRIMARY KEY,
name VARCHAR(200),
category_id INT REFERENCES product_categories(id),
brand_id INT REFERENCES brands(id)
);
CREATE TABLE product_categories (
id INT PRIMARY KEY,
name VARCHAR(100),
parent_category_id INT REFERENCES product_categories(id)
);
Document Model (JSON Storage)
-- Flexible document storage with indexing
CREATE TABLE documents (
id UUID PRIMARY KEY,
document_type VARCHAR(50),
data JSONB,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- Expression index on a single JSON property (B-tree; use GIN with
-- jsonb_path_ops on the whole data column for containment queries)
CREATE INDEX idx_documents_user_id
ON documents ((data->>'user_id'));
CREATE INDEX idx_documents_status
ON documents ((data->>'status'))
WHERE document_type = 'order';
Graph Data Patterns
-- Adjacency list for hierarchical data
CREATE TABLE categories (
id INT PRIMARY KEY,
name VARCHAR(100),
parent_id INT REFERENCES categories(id),
level INT,
path VARCHAR(500) -- Materialized path: "/1/5/12/"
);
-- Many-to-many relationships
CREATE TABLE relationships (
id UUID PRIMARY KEY,
from_entity_id UUID,
to_entity_id UUID,
relationship_type VARCHAR(50),
created_at TIMESTAMP
);
-- Inline INDEX clauses inside CREATE TABLE are MySQL syntax; in
-- PostgreSQL create the indexes separately:
CREATE INDEX idx_rel_from ON relationships (from_entity_id, relationship_type);
CREATE INDEX idx_rel_to ON relationships (to_entity_id, relationship_type);
Migration Strategies
Zero-Downtime Migration (Expand-Contract Pattern)
Phase 1: Expand
-- Add new column without constraints
ALTER TABLE users ADD COLUMN new_email VARCHAR(255);
-- Backfill data in batches
UPDATE users SET new_email = email WHERE id BETWEEN 1 AND 1000;
-- Continue in batches...
-- Add constraints after backfill
ALTER TABLE users ADD CONSTRAINT users_new_email_unique UNIQUE (new_email);
ALTER TABLE users ALTER COLUMN new_email SET NOT NULL;
Phase 2: Contract
-- Update application to use new column
-- Deploy application changes
-- Verify new column is being used
-- Remove old column
ALTER TABLE users DROP COLUMN email;
-- Rename new column
ALTER TABLE users RENAME COLUMN new_email TO email;
Data Type Changes
-- Safe string to integer conversion
ALTER TABLE products ADD COLUMN sku_number INTEGER;
UPDATE products SET sku_number = CAST(sku AS INTEGER) WHERE sku ~ '^[0-9]+$';
-- Validate conversion success before dropping old column
Partitioning Strategies
Horizontal Partitioning (Sharding)
-- Range partitioning by date (parent must be created first with
-- CREATE TABLE sales (...) PARTITION BY RANGE (sale_date))
CREATE TABLE sales_2023 PARTITION OF sales
FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');
CREATE TABLE sales_2024 PARTITION OF sales
FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');
-- Hash partitioning by user_id (parent created with PARTITION BY HASH (user_id))
CREATE TABLE user_data_0 PARTITION OF user_data
FOR VALUES WITH (MODULUS 4, REMAINDER 0);
CREATE TABLE user_data_1 PARTITION OF user_data
FOR VALUES WITH (MODULUS 4, REMAINDER 1);
Vertical Partitioning
-- Separate frequently accessed columns
CREATE TABLE users_core (
id INT PRIMARY KEY,
email VARCHAR(255),
status VARCHAR(20),
created_at TIMESTAMP
);
-- Less frequently accessed profile data
CREATE TABLE users_profile (
user_id INT PRIMARY KEY REFERENCES users_core(id),
bio TEXT,
preferences JSONB,
last_login TIMESTAMP
);
Connection Management
Connection Pooling
- Pool Size: CPU cores × 2 + effective spindle count
- Connection Lifetime: Rotate connections to prevent resource leaks
- Timeout Settings: Connection, idle, and query timeouts
- Health Checks: Regular connection validation
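These ideas can be sketched with a minimal pool built on a bounded queue. This is a generic illustration with the sizing formula above; production code should use a real pool (e.g. psycopg_pool, HikariCP):

```python
import os
import queue

def pool_size(cpu_cores=None, spindles=1):
    """Starting-point formula: CPU cores x 2 + effective spindle count."""
    cores = cpu_cores or os.cpu_count() or 1
    return cores * 2 + spindles

class ConnectionPool:
    """Toy fixed-size pool; acquire blocks until a connection is free."""
    def __init__(self, connect, size):
        self._pool = queue.Queue(maxsize=size)
        for _ in range(size):
            self._pool.put(connect())

    def acquire(self, timeout=5):
        # Raises queue.Empty if nothing frees up within the timeout
        return self._pool.get(timeout=timeout)

    def release(self, conn):
        self._pool.put(conn)

# `connect` would be e.g. psycopg.connect in real use; a stub here
pool = ConnectionPool(connect=object, size=pool_size(cpu_cores=4))
conn = pool.acquire()
pool.release(conn)
```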
Read Replicas Strategy
-- Write queries to primary
INSERT INTO users (email, name) VALUES ('[email protected]', 'John Doe');
-- Read queries to replicas (with appropriate read preference)
SELECT * FROM users WHERE status = 'active'; -- Route to read replica
-- Consistent reads when required
SELECT * FROM users WHERE id = LAST_INSERT_ID(); -- Route to primary
Caching Layers
Cache-Aside Pattern
def get_user(user_id):
# Try cache first
user = cache.get(f"user:{user_id}")
if user is None:
# Cache miss - query database
user = db.query("SELECT * FROM users WHERE id = %s", user_id)
# Store in cache
cache.set(f"user:{user_id}", user, ttl=3600)
return user
Write-Through Cache
- Consistency: Always keep cache and database in sync
- Write Latency: Higher due to dual writes
- Data Safety: No data loss on cache failures
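Mirroring the cache-aside example above, a write-through update performs both writes in one call path, database first so a cache failure cannot lose data. A self-contained sketch with in-memory stand-ins for the cache and database clients:

```python
class DictCache:
    """In-memory stand-in for a cache client (e.g. Redis)."""
    def __init__(self):
        self.data = {}
    def get(self, key):
        return self.data.get(key)
    def set(self, key, value, ttl=None):
        self.data[key] = value

class DictDB:
    """In-memory stand-in for the primary database."""
    def __init__(self):
        self.users = {}

cache, db = DictCache(), DictDB()

def update_user(user_id, fields):
    # Write-through: update the system of record first...
    db.users.setdefault(user_id, {}).update(fields)
    user = db.users[user_id]
    # ...then push the fresh value into the cache so reads never go stale
    cache.set(f"user:{user_id}", user, ttl=3600)
    return user

update_user(1, {"name": "John"})
print(cache.get("user:1"))  # {'name': 'John'}
```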
Cache Invalidation Strategies
- TTL-Based: Time-based expiration
- Event-Driven: Invalidate on data changes
- Version-Based: Use version numbers for consistency
- Tag-Based: Group related cache entries for bulk invalidation
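The version-based strategy can be implemented by embedding a version counter in the key: bumping the version makes all old entries unreachable, and TTLs garbage-collect them. A sketch with a plain dict standing in for the cache:

```python
cache = {}
versions = {}  # entity type -> current version number

def versioned_key(entity, entity_id):
    """Build a cache key that includes the entity's current version."""
    v = versions.get(entity, 1)
    return f"{entity}:v{v}:{entity_id}"

def invalidate_all(entity):
    # Bump the version; old keys are never read again and expire via TTL
    versions[entity] = versions.get(entity, 1) + 1

cache[versioned_key("user", 42)] = {"name": "Ada"}
invalidate_all("user")
print(cache.get(versioned_key("user", 42)))  # None
```

The trade-off is coarse granularity: one bump invalidates every entry of that entity type, so it pairs well with the event-driven strategy for single-row changes.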
Database Selection Guide
SQL Databases
PostgreSQL
- Strengths: ACID compliance, complex queries, JSON support, extensibility
- Use Cases: OLTP applications, data warehousing, geospatial data
- Scale: Vertical scaling with read replicas
MySQL
- Strengths: Performance, replication, wide ecosystem support
- Use Cases: Web applications, content management, e-commerce
- Scale: Horizontal scaling through sharding
NoSQL Databases
Document Stores (MongoDB, CouchDB)
- Strengths: Flexible schema, horizontal scaling, developer productivity
- Use Cases: Content management, catalogs, user profiles
- Trade-offs: Eventual consistency, limited support for complex queries
Key-Value Stores (Redis, DynamoDB)
- Strengths: High performance, simple model, excellent caching
- Use Cases: Session storage, real-time analytics, gaming leaderboards
- Trade-offs: Limited query capabilities, data modeling constraints
Column-Family (Cassandra, HBase)
- Strengths: Write-heavy workloads, linear scalability, fault tolerance
- Use Cases: Time-series data, IoT applications, messaging systems
- Trade-offs: Query flexibility, consistency model complexity
Graph Databases (Neo4j, Amazon Neptune)
- Strengths: Relationship queries, pattern matching, recommendation engines
- Use Cases: Social networks, fraud detection, knowledge graphs
- Trade-offs: Specialized use cases, learning curve
NewSQL Databases
Distributed SQL (CockroachDB, TiDB, Spanner)
- Strengths: SQL compatibility with horizontal scaling
- Use Cases: Global applications requiring ACID guarantees
- Trade-offs: Complexity, latency for distributed transactions
Tools & Scripts
Schema Analyzer
- Input: SQL DDL files, JSON schema definitions
- Analysis: Normalization compliance, constraint validation, naming conventions
- Output: Analysis report, Mermaid ERD, improvement recommendations
Index Optimizer
- Input: Schema definition, query patterns
- Analysis: Missing indexes, redundancy detection, selectivity estimation
- Output: Index recommendations, CREATE INDEX statements, performance projections
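To make the gap analysis concrete, here is a toy version of one such check (a hypothetical helper, not the actual tool): flag foreign-key columns that no index covers as a leading column.

```python
def missing_fk_indexes(foreign_keys, indexes):
    """Flag FK columns with no index whose leading column matches.

    foreign_keys: {table: [fk_column, ...]}
    indexes:      {table: [[col1, col2, ...], ...]}  (indexed column lists)
    """
    gaps = []
    for table, fk_cols in foreign_keys.items():
        # A composite index only helps a FK join if the FK is its leading column.
        leading = {cols[0] for cols in indexes.get(table, [])}
        for col in fk_cols:
            if col not in leading:
                gaps.append((table, col))
    return gaps
```

Each gap it reports is a candidate for a `CREATE INDEX` recommendation, since unindexed foreign keys commonly cause slow joins and slow cascading deletes.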
Migration Generator
- Input: Current and target schemas
- Analysis: Schema differences, dependency resolution, risk assessment
- Output: Migration scripts, rollback plans, validation queries
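A toy sketch of the core diffing step (illustrative only; the schema-dict shape is an assumption): compare column sets and emit additive changes now, deferring destructive ones per the expand-contract pattern.

```python
def diff_schemas(current, target):
    """Toy diff: emit expand-phase ADDs now and contract-phase DROPs for later.

    current/target: {table: {column: sql_type}}
    """
    expand, contract = [], []
    for table, cols in target.items():
        for col, sql_type in cols.items():
            if col not in current.get(table, {}):
                expand.append(f"ALTER TABLE {table} ADD COLUMN {col} {sql_type};")
    for table, cols in current.items():
        for col in cols:
            if col not in target.get(table, {}):
                # Destructive changes are deferred (expand-contract pattern).
                contract.append(f"ALTER TABLE {table} DROP COLUMN {col};")
    return expand, contract
```

Splitting the output into two phases is what enables zero-downtime rollouts: the expand scripts run before the new application version deploys, the contract scripts only after it is verified.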
Database Selection Decision Tree
Overview
Choosing the right database technology is crucial for application success. This guide provides a systematic approach to database selection based on specific requirements, data patterns, and operational constraints.
Decision Framework
Primary Questions
What is your primary use case?
- OLTP (Online Transaction Processing)
- OLAP (Online Analytical Processing)
- Real-time analytics
- Content management
- Search and discovery
- Time-series data
- Graph relationships
What are your consistency requirements?
- Strong consistency (ACID)
- Eventual consistency
- Causal consistency
- Session consistency
What are your scalability needs?
- Vertical scaling sufficient
- Horizontal scaling required
- Global distribution needed
- Multi-region requirements
What is your data structure?
- Structured (relational)
- Semi-structured (JSON/XML)
- Unstructured (documents, media)
- Graph relationships
- Time-series data
- Key-value pairs
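As a sketch, the OLTP branch of these questions can be encoded directly; the candidate lists are illustrative and mirror the tree that follows:

```python
def suggest_oltp_database(needs_acid, needs_horizontal_scaling):
    """Toy encoding of the OLTP branch of the decision tree."""
    if needs_acid:
        if needs_horizontal_scaling:
            return ["CockroachDB", "TiDB", "Spanner"]    # Distributed SQL
        return ["PostgreSQL", "MySQL", "SQL Server"]     # Traditional SQL
    # Without strong ACID needs, key-value or document stores are candidates
    return ["Redis", "DynamoDB", "MongoDB"]
```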
Decision Tree
START: What is your primary use case?
│
├── OLTP (Transactional Applications)
│ │
│ ├── Do you need strong ACID guarantees?
│ │ ├── YES → Do you need horizontal scaling?
│ │ │ ├── YES → Distributed SQL
│ │ │ │ ├── CockroachDB (Global, multi-region)
│ │ │ │ ├── TiDB (MySQL compatibility)
│ │ │ │ └── Spanner (Google Cloud)
│ │ │ └── NO → Traditional SQL
│ │ │ ├── PostgreSQL (Feature-rich, extensions)
│ │ │ ├── MySQL (Performance, ecosystem)
│ │ │ └── SQL Server (Microsoft stack)
│ │ └── NO → Are you primarily key-value access?
│ │ ├── YES → Key-Value Stores
│ │ │ ├── Redis (In-memory, caching)
│ │ │ ├── DynamoDB (AWS managed)
│ │ │ └── Cassandra (High availability)
│ │ └── NO → Document Stores
│ │ ├── MongoDB (General purpose)
│ │ ├── CouchDB (Sync, replication)
│ │ └── Amazon DocumentDB (MongoDB compatible)
│ │
├── OLAP (Analytics and Reporting)
│ │
│ ├── What is your data volume?
│ │ ├── Small to Medium (< 1TB) → Traditional SQL with optimization
│ │ │ ├── PostgreSQL with columnar extensions
│ │ │ ├── MySQL with analytics engine
│ │ │ └── SQL Server with columnstore
│ │ ├── Large (1TB - 100TB) → Data Warehouse Solutions
│ │ │ ├── Snowflake (Cloud-native)
│ │ │ ├── BigQuery (Google Cloud)
│ │ │ ├── Redshift (AWS)
│ │ │ └── Synapse (Azure)
│ │ └── Very Large (> 100TB) → Big Data Platforms
│ │ ├── Databricks (Unified analytics)
│ │ ├── Apache Spark on cloud
│ │ └── Hadoop ecosystem
│ │
├── Real-time Analytics
│ │
│ ├── Do you need sub-second query responses?
│ │ ├── YES → Stream Processing + OLAP
│ │ │ ├── ClickHouse (Fast analytics)
│ │ │ ├── Apache Druid (Real-time OLAP)
│ │ │ ├── Pinot (LinkedIn's real-time DB)
│ │ │ └── TimescaleDB (Time-series)
│ │ └── NO → Traditional OLAP solutions
│ │
├── Search and Discovery
│ │
│ ├── What type of search?
│ │ ├── Full-text search → Search Engines
│ │ │ ├── Elasticsearch (Full-featured)
│ │ │ ├── OpenSearch (AWS fork of ES)
│ │ │ └── Solr (Apache Lucene-based)
│ │ ├── Vector/similarity search → Vector Databases
│ │ │ ├── Pinecone (Managed vector DB)
│ │ │ ├── Weaviate (Open source)
│ │ │ ├── Chroma (Embeddings)
│ │ │ └── PostgreSQL with pgvector
│ │ └── Faceted search → Search + SQL combination
│ │
├── Graph Relationships
│ │
│ ├── Do you need complex graph traversals?
│ │ ├── YES → Graph Databases
│ │ │ ├── Neo4j (Property graph)
│ │ │ ├── Amazon Neptune (Multi-model)
│ │ │ ├── ArangoDB (Multi-model)
│ │ │ └── TigerGraph (Analytics focused)
│ │ └── NO → SQL with recursive queries
│ │ └── PostgreSQL with recursive CTEs
│ │
└── Time-series Data
│
├── What is your write volume?
├── High (millions/sec) → Specialized Time-series
│ ├── InfluxDB (Purpose-built)
│ ├── TimescaleDB (PostgreSQL extension)
│ ├── Apache Druid (Analytics focused)
│ └── Prometheus (Monitoring)
└── Medium → SQL with time-series optimization
                  └── PostgreSQL with partitioning
Database Categories Deep Dive
Traditional SQL Databases
PostgreSQL
- Best For: Complex queries, JSON data, extensions, geospatial
- Strengths: Feature-rich, reliable, strong consistency, extensible
- Use Cases: OLTP, mixed workloads, JSON documents, geospatial applications
- Scaling: Vertical scaling, read replicas, partitioning
- When to Choose: Need SQL features, complex queries, moderate scale
MySQL
- Best For: Web applications, read-heavy workloads, simple schema
- Strengths: Performance, replication, large ecosystem
- Use Cases: Web apps, content management, e-commerce
- Scaling: Read replicas, sharding, clustering (MySQL Cluster)
- When to Choose: Simple schema, performance priority, large community
SQL Server
- Best For: Microsoft ecosystem, enterprise features, business intelligence
- Strengths: Integration, tooling, enterprise features
- Use Cases: Enterprise applications, .NET applications, BI
- Scaling: Always On availability groups, partitioning
- When to Choose: Microsoft stack, enterprise requirements
Distributed SQL (NewSQL)
CockroachDB
- Best For: Global applications, strong consistency, horizontal scaling
- Strengths: ACID guarantees, automatic scaling, survives node and region failures
- Use Cases: Multi-region apps, financial services, global SaaS
- Trade-offs: Complex setup, higher latency for global transactions
- When to Choose: Need SQL + global scale + consistency
TiDB
- Best For: MySQL compatibility with horizontal scaling
- Strengths: MySQL protocol, HTAP (hybrid), cloud-native
- Use Cases: MySQL migrations, hybrid workloads
- When to Choose: Existing MySQL expertise, need scale
NoSQL Document Stores
MongoDB
- Best For: Flexible schema, rapid development, document-centric data
- Strengths: Developer experience, flexible schema, rich queries
- Use Cases: Content management, catalogs, user profiles, IoT
- Scaling: Automatic sharding, replica sets
- When to Choose: Schema evolution, document structure, rapid development
CouchDB
- Best For: Offline-first applications, multi-master replication
- Strengths: HTTP API, replication, conflict resolution
- Use Cases: Mobile apps, distributed systems, offline scenarios
- When to Choose: Need offline capabilities, bi-directional sync
Key-Value Stores
Redis
- Best For: Caching, sessions, real-time applications, pub/sub
- Strengths: Performance, data structures, persistence options
- Use Cases: Caching, leaderboards, real-time analytics, queues
- Scaling: Clustering, sentinel for HA
- When to Choose: High performance, simple data model, caching
DynamoDB
- Best For: Serverless applications, predictable performance, AWS ecosystem
- Strengths: Managed, auto-scaling, consistent performance
- Use Cases: Web applications, gaming, IoT, mobile backends
- Trade-offs: Vendor lock-in, limited querying
- When to Choose: AWS ecosystem, serverless, managed solution
Column-Family Stores
Cassandra
- Best For: Write-heavy workloads, high availability, linear scalability
- Strengths: No single point of failure, tunable consistency
- Use Cases: Time-series, IoT, messaging, activity feeds
- Trade-offs: Complex operations, eventual consistency
- When to Choose: High write volume, availability over consistency
HBase
- Best For: Big data applications, Hadoop ecosystem
- Strengths: Hadoop integration, consistent reads
- Use Cases: Analytics on big data, time-series at scale
- When to Choose: Hadoop ecosystem, very large datasets
Graph Databases
Neo4j
- Best For: Complex relationships, graph algorithms, traversals
- Strengths: Mature ecosystem, Cypher query language, algorithms
- Use Cases: Social networks, recommendation engines, fraud detection
- Trade-offs: Specialized use case, learning curve
- When to Choose: Relationship-heavy data, graph algorithms
Time-Series Databases
InfluxDB
- Best For: Time-series data, IoT, monitoring, analytics
- Strengths: Purpose-built, efficient storage, query language
- Use Cases: IoT sensors, monitoring, DevOps metrics
- When to Choose: High-volume time-series data
TimescaleDB
- Best For: Time-series with SQL familiarity
- Strengths: PostgreSQL compatibility, SQL queries, ecosystem
- Use Cases: Financial data, IoT with complex queries
- When to Choose: Time-series + SQL requirements
Search Engines
Elasticsearch
- Best For: Full-text search, log analysis, real-time search
- Strengths: Powerful search, analytics, ecosystem (ELK stack)
- Use Cases: Search applications, log analysis, monitoring
- Trade-offs: Complex operations, resource intensive
- When to Choose: Advanced search requirements, analytics
Data Warehouses
Snowflake
- Best For: Cloud-native analytics, data sharing, varied workloads
- Strengths: Separation of compute/storage, automatic scaling
- Use Cases: Data warehousing, analytics, data science
- When to Choose: Cloud-native, analytics-focused, multi-cloud
BigQuery
- Best For: Serverless analytics, Google ecosystem, machine learning
- Strengths: Serverless, petabyte scale, ML integration
- Use Cases: Analytics, data science, reporting
- When to Choose: Google Cloud, serverless analytics
Selection Criteria Matrix
| Criterion | SQL | NewSQL | Document | Key-Value | Column-Family | Graph | Time-Series |
|---|---|---|---|---|---|---|---|
| ACID Guarantees | ✅ Strong | ✅ Strong | ⚠️ Eventual | ⚠️ Eventual | ⚠️ Tunable | ⚠️ Varies | ⚠️ Varies |
| Horizontal Scaling | ❌ Limited | ✅ Native | ✅ Native | ✅ Native | ✅ Native | ⚠️ Limited | ✅ Native |
| Query Flexibility | ✅ High | ✅ High | ⚠️ Moderate | ❌ Low | ❌ Low | ✅ High | ⚠️ Specialized |
| Schema Flexibility | ❌ Rigid | ❌ Rigid | ✅ High | ✅ High | ⚠️ Moderate | ✅ High | ⚠️ Structured |
| Performance (Reads) | ⚠️ Good | ⚠️ Good | ✅ Excellent | ✅ Excellent | ✅ Excellent | ⚠️ Good | ✅ Excellent |
| Performance (Writes) | ⚠️ Good | ⚠️ Good | ✅ Excellent | ✅ Excellent | ✅ Excellent | ⚠️ Good | ✅ Excellent |
| Operational Complexity | ✅ Low | ❌ High | ⚠️ Moderate | ✅ Low | ❌ High | ⚠️ Moderate | ⚠️ Moderate |
| Ecosystem Maturity | ✅ Mature | ⚠️ Growing | ✅ Mature | ✅ Mature | ✅ Mature | ✅ Mature | ⚠️ Growing |
Decision Checklist
Requirements Analysis
- Data Volume: Current and projected data size
- Transaction Volume: Reads per second, writes per second
- Consistency Requirements: Strong vs eventual consistency needs
- Query Patterns: Simple lookups vs complex analytics
- Schema Evolution: How often does schema change?
- Geographic Distribution: Single region vs global
- Availability Requirements: Acceptable downtime
- Team Expertise: Existing knowledge and learning curve
- Budget Constraints: Licensing, infrastructure, operational costs
- Compliance Requirements: Data residency, audit trails
Technical Evaluation
- Performance Testing: Benchmark with realistic data and queries
- Scalability Testing: Test scaling limits and patterns
- Failure Scenarios: Test backup, recovery, and failure handling
- Integration Testing: APIs, connectors, ecosystem tools
- Migration Path: How to migrate from current system
- Monitoring and Observability: Available tooling and metrics
Operational Considerations
- Management Complexity: Setup, configuration, maintenance
- Backup and Recovery: Built-in vs external tools
- Security Features: Authentication, authorization, encryption
- Upgrade Path: Version compatibility and upgrade process
- Support Options: Community vs commercial support
- Lock-in Risk: Portability and vendor independence
Common Decision Patterns
E-commerce Platform
Typical Choice: PostgreSQL or MySQL
- Primary Data: Product catalog, orders, users (structured)
- Query Patterns: OLTP with some analytics
- Consistency: Strong consistency for financial data
- Scale: Moderate with read replicas
- Additional: Redis for caching, Elasticsearch for product search
IoT/Sensor Data Platform
Typical Choice: TimescaleDB or InfluxDB
- Primary Data: Time-series sensor readings
- Query Patterns: Time-based aggregations, trend analysis
- Scale: High write volume, moderate read volume
- Additional: Kafka for ingestion, PostgreSQL for metadata
Social Media Application
Typical Choice: Combination approach
- User Profiles: MongoDB (flexible schema)
- Relationships: Neo4j (graph relationships)
- Activity Feeds: Cassandra (high write volume)
- Search: Elasticsearch (content discovery)
- Caching: Redis (sessions, real-time data)
Analytics Platform
Typical Choice: Snowflake or BigQuery
- Primary Use: Complex analytical queries
- Data Volume: Large (TB to PB scale)
- Query Patterns: Ad-hoc analytics, reporting
- Users: Data analysts, data scientists
- Additional: Data lake (S3/GCS) for raw data storage
Global SaaS Application
Typical Choice: CockroachDB or DynamoDB
- Requirements: Multi-region, strong consistency
- Scale: Global user base
- Compliance: Data residency requirements
- Availability: High availability across regions
Migration Strategies
From Monolithic to Distributed
- Assessment: Identify scaling bottlenecks
- Data Partitioning: Plan how to split data
- Gradual Migration: Move non-critical data first
- Dual Writes: Run both systems temporarily
- Validation: Verify data consistency
- Cutover: Switch reads and writes gradually
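Steps 4 and 5 above (dual writes plus validation) can be sketched as a thin wrapper; `old_db` and `new_db` stand in for the two systems' clients:

```python
class DualWriter:
    """Write to both systems during migration; the old system stays authoritative."""
    def __init__(self, old_db, new_db):
        self.old_db, self.new_db = old_db, new_db
        self.mismatches = []

    def write(self, key, value):
        self.old_db[key] = value   # source of truth until cutover
        self.new_db[key] = value   # shadow write to the new system

    def validate(self, key):
        # Compare both systems before switching reads over.
        if self.old_db.get(key) != self.new_db.get(key):
            self.mismatches.append(key)
        return len(self.mismatches) == 0
```

Only once validation passes over a representative sample does the cutover step begin routing reads to the new system.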
Technology Stack Evolution
- Start Simple: Begin with PostgreSQL or MySQL
- Identify Bottlenecks: Monitor performance and scaling issues
- Selective Scaling: Move specific workloads to specialized databases
- Polyglot Persistence: Use multiple databases for different use cases
- Service Boundaries: Align database choice with service boundaries
Conclusion
Database selection should be driven by:
- Specific Use Case Requirements: Not all applications need the same database
- Data Characteristics: Structure, volume, and access patterns matter
- Non-functional Requirements: Consistency, availability, performance targets
- Team and Organizational Factors: Expertise, operational capacity, budget
- Evolution Path: How requirements and scale will change over time
The best database choice is often not a single technology, but a combination of databases that each excel at their specific use case within your application architecture.
Index Strategy Patterns
Overview
Database indexes are critical for query performance, but they come with trade-offs. This guide covers proven patterns for index design, optimization strategies, and common pitfalls to avoid.
Index Types and Use Cases
B-Tree Indexes (Default)
Best For:
- Equality queries (WHERE column = value)
- Range queries (WHERE column BETWEEN x AND y)
- Sorting (ORDER BY column)
- Prefix pattern matching (WHERE column LIKE 'prefix%'); patterns with a leading wildcard (LIKE '%suffix') cannot use the index
Characteristics:
- Logarithmic lookup time O(log n)
- Supports partial matches on composite indexes
- Most versatile index type
Example:
-- Single column B-tree index
CREATE INDEX idx_customers_email ON customers (email);
-- Composite B-tree index
CREATE INDEX idx_orders_customer_date ON orders (customer_id, order_date);
Hash Indexes
Best For:
- Exact equality matches only
- High-cardinality columns
- Primary key lookups
Characteristics:
- Constant lookup time O(1) for exact matches
- Cannot support range queries or sorting
- Memory-efficient for equality operations
Example:
-- Hash index for exact lookups (PostgreSQL)
CREATE INDEX idx_users_id_hash ON users USING HASH (user_id);
Partial Indexes
Best For:
- Filtering on subset of data
- Reducing index size and maintenance overhead
- Query patterns that consistently use specific filters
Example:
-- Index only active users
CREATE INDEX idx_active_users_email
ON users (email)
WHERE status = 'active';
-- Index recent orders only (partial-index predicates must be immutable,
-- so use a literal cutoff date and recreate the index periodically)
CREATE INDEX idx_recent_orders
ON orders (customer_id, created_at)
WHERE created_at > DATE '2024-01-01';
-- Index non-null values only
CREATE INDEX idx_customers_phone
ON customers (phone_number)
WHERE phone_number IS NOT NULL;
Covering Indexes
Best For:
- Eliminating table lookups for SELECT queries
- Frequently accessed column combinations
- Read-heavy workloads
Example:
-- Covering index with INCLUDE clause (SQL Server/PostgreSQL)
CREATE INDEX idx_orders_customer_covering
ON orders (customer_id, order_date)
INCLUDE (order_total, status);
-- Query can be satisfied entirely from index:
-- SELECT order_total, status FROM orders
-- WHERE customer_id = 123 AND order_date > '2024-01-01';
Functional/Expression Indexes
Best For:
- Queries on transformed column values
- Case-insensitive searches
- Complex calculations
Example:
-- Case-insensitive email searches
CREATE INDEX idx_users_email_lower
ON users (LOWER(email));
-- Date part extraction
CREATE INDEX idx_orders_month
ON orders (EXTRACT(MONTH FROM order_date));
-- JSON field indexing
CREATE INDEX idx_users_preferences_theme
ON users ((preferences->>'theme'));
Composite Index Design Patterns
Column Ordering Strategy
Rule: Most Selective First
-- Query: WHERE status = 'active' AND city = 'New York' AND age > 25
-- Assume: status has 3 values, city has 100 values, age has 80 values
-- GOOD: equality columns first (most selective leading), range column last
CREATE INDEX idx_users_city_status_age ON users (city, status, age);
-- BAD: least selective column first
CREATE INDEX idx_users_status_city_age ON users (status, city, age);
Selectivity Calculation:
-- Estimate selectivity for each column
SELECT
'status' as column_name,
COUNT(DISTINCT status)::float / COUNT(*) as selectivity
FROM users
UNION ALL
SELECT
'city' as column_name,
COUNT(DISTINCT city)::float / COUNT(*) as selectivity
FROM users
UNION ALL
SELECT
'age' as column_name,
COUNT(DISTINCT age)::float / COUNT(*) as selectivity
FROM users;
Query Pattern Matching
Pattern 1: Equality + Range
-- Query: WHERE customer_id = 123 AND order_date BETWEEN '2024-01-01' AND '2024-03-31'
CREATE INDEX idx_orders_customer_date ON orders (customer_id, order_date);
Pattern 2: Multiple Equality Conditions
-- Query: WHERE status = 'active' AND category = 'premium' AND region = 'US'
CREATE INDEX idx_users_status_category_region ON users (status, category, region);
Pattern 3: Equality + Sorting
-- Query: WHERE category = 'electronics' ORDER BY price DESC, created_at DESC
CREATE INDEX idx_products_category_price_date ON products (category, price DESC, created_at DESC);
Prefix Optimization
Efficient Prefix Usage:
-- Index supports all these queries efficiently:
CREATE INDEX idx_users_lastname_firstname_email ON users (last_name, first_name, email);
-- ✓ Uses index: WHERE last_name = 'Smith'
-- ✓ Uses index: WHERE last_name = 'Smith' AND first_name = 'John'
-- ✓ Uses index: WHERE last_name = 'Smith' AND first_name = 'John' AND email = 'john@...'
-- ✗ Cannot use index: WHERE first_name = 'John'
-- ✗ Cannot use index: WHERE email = 'john@...'
Performance Optimization Patterns
Index Intersection vs Composite Indexes
Scenario: Multiple single-column indexes
CREATE INDEX idx_users_age ON users (age);
CREATE INDEX idx_users_city ON users (city);
CREATE INDEX idx_users_status ON users (status);
-- Query: WHERE age > 25 AND city = 'NYC' AND status = 'active'
-- Database may use index intersection (combining multiple indexes)
-- Performance varies by database engine and data distribution
Better: Purpose-built composite index
-- More efficient for the specific query pattern
CREATE INDEX idx_users_city_status_age ON users (city, status, age);
Index Size vs Performance Trade-off
Wide Indexes (Many Columns):
-- Pros: Covers many query patterns, excellent for covering queries
-- Cons: Large index size, slower writes, more memory usage
CREATE INDEX idx_orders_comprehensive
ON orders (customer_id, order_date, status, total_amount, shipping_method, created_at)
INCLUDE (order_notes, billing_address);
Narrow Indexes (Few Columns):
-- Pros: Smaller size, faster writes, less memory
-- Cons: May not cover all query patterns
CREATE INDEX idx_orders_customer_date ON orders (customer_id, order_date);
CREATE INDEX idx_orders_status ON orders (status);
Maintenance Optimization
Regular Index Analysis:
-- PostgreSQL: Check index usage statistics
SELECT
schemaname,
relname AS tablename,
indexrelname AS indexname,
idx_scan AS index_scans,
idx_tup_read AS tuples_read,
idx_tup_fetch AS tuples_fetched
FROM pg_stat_user_indexes
WHERE idx_scan = 0 -- Potentially unused indexes
ORDER BY schemaname, relname;
-- Check index size
SELECT
indexname,
pg_size_pretty(pg_relation_size(indexname::regclass)) as index_size
FROM pg_indexes
WHERE schemaname = 'public'
ORDER BY pg_relation_size(indexname::regclass) DESC;
Common Anti-Patterns
1. Over-Indexing
Problem:
-- Too many similar indexes
CREATE INDEX idx_orders_customer ON orders (customer_id);
CREATE INDEX idx_orders_customer_date ON orders (customer_id, order_date);
CREATE INDEX idx_orders_customer_status ON orders (customer_id, status);
CREATE INDEX idx_orders_customer_date_status ON orders (customer_id, order_date, status);
Solution:
-- One well-designed composite index can often replace several
CREATE INDEX idx_orders_customer_date_status ON orders (customer_id, order_date, status);
-- Drop redundant indexes: idx_orders_customer, idx_orders_customer_date, idx_orders_customer_status
2. Wrong Column Order
Problem:
-- Query: WHERE active = true AND user_type = 'premium' AND city = 'Chicago'
-- Bad order: boolean first (lowest selectivity)
CREATE INDEX idx_users_active_type_city ON users (active, user_type, city);
Solution:
-- Good order: most selective first
CREATE INDEX idx_users_city_type_active ON users (city, user_type, active);
3. Ignoring Query Patterns
Problem:
-- Index doesn't match common query patterns
CREATE INDEX idx_products_name ON products (product_name);
-- But queries are: WHERE category = 'electronics' AND price BETWEEN 100 AND 500
-- Index is not helpful for these queries
Solution:
-- Match actual query patterns
CREATE INDEX idx_products_category_price ON products (category, price);
4. Function in WHERE Without Functional Index
Problem:
-- Query uses function but no functional index
SELECT * FROM users WHERE LOWER(email) = 'john@example.com';
-- Regular index on email won't help
Solution:
-- Create functional index
CREATE INDEX idx_users_email_lower ON users (LOWER(email));
Advanced Patterns
Multi-Column Statistics
When Columns Are Correlated:
-- If city and state are highly correlated, create extended statistics
CREATE STATISTICS stats_address_correlation ON city, state FROM addresses;
ANALYZE addresses;
-- Helps query planner make better decisions for:
-- WHERE city = 'New York' AND state = 'NY'
Conditional Indexes for Data Lifecycle
Pattern: Different indexes for different data ages
-- Hot data (recent orders) - optimized for OLTP
-- (partial-index predicates must be immutable, so use literal cutoff
-- dates and recreate these indexes on a schedule)
CREATE INDEX idx_orders_hot_customer_date
ON orders (customer_id, order_date DESC)
WHERE order_date > DATE '2024-06-01';
-- Warm data (older orders) - optimized for analytics
CREATE INDEX idx_orders_warm_date_total
ON orders (order_date, total_amount)
WHERE order_date <= DATE '2024-06-01'
AND order_date > DATE '2023-07-01';
-- Cold data (archived orders) - minimal indexing
CREATE INDEX idx_orders_cold_date
ON orders (order_date)
WHERE order_date <= DATE '2023-07-01';
Index-Only Scan Optimization
Design indexes to avoid table access:
-- Query: SELECT order_id, total_amount, status FROM orders WHERE customer_id = ?
CREATE INDEX idx_orders_customer_covering
ON orders (customer_id)
INCLUDE (order_id, total_amount, status);
-- Or as composite index (if database doesn't support INCLUDE)
CREATE INDEX idx_orders_customer_covering
ON orders (customer_id, order_id, total_amount, status);
Index Monitoring and Maintenance
Performance Monitoring Queries
Find slow queries that might benefit from indexes:
-- PostgreSQL: Find queries with high cost (requires the pg_stat_statements
-- extension; PostgreSQL 13+ renames these columns to total_exec_time / mean_exec_time)
SELECT
query,
calls,
total_time,
mean_time,
rows
FROM pg_stat_statements
WHERE mean_time > 1000 -- Queries taking > 1 second
ORDER BY mean_time DESC;
Identify missing indexes:
-- Look for sequential scans on large tables
SELECT
schemaname,
relname AS tablename,
seq_scan,
seq_tup_read,
idx_scan,
n_tup_ins + n_tup_upd + n_tup_del AS write_activity
FROM pg_stat_user_tables
WHERE seq_scan > 100
AND seq_tup_read > 100000 -- Large sequential scans
AND (idx_scan = 0 OR seq_scan > idx_scan * 2)
ORDER BY seq_tup_read DESC;
Index Maintenance Schedule
Regular Maintenance Tasks:
-- Rebuild fragmented indexes (SQL Server)
ALTER INDEX ALL ON orders REBUILD;
-- Update statistics (PostgreSQL)
ANALYZE orders;
-- Check for unused indexes monthly
SELECT * FROM pg_stat_user_indexes WHERE idx_scan = 0;
Conclusion
Effective index strategy requires:
- Understanding Query Patterns: Analyze actual application queries, not theoretical scenarios
- Measuring Performance: Use query execution plans and timing to validate index effectiveness
- Balancing Trade-offs: More indexes improve reads but slow writes and increase storage
- Regular Maintenance: Monitor index usage and performance, remove unused indexes
- Iterative Improvement: Start with essential indexes, add and optimize based on real usage
The goal is not to index every possible query pattern, but to create a focused set of indexes that provide maximum benefit for your application's specific workload while minimizing maintenance overhead.
Database Normalization Guide
Overview
Database normalization is the process of organizing data to minimize redundancy and dependency issues. It involves decomposing tables to eliminate data anomalies and improve data integrity.
Normal Forms
First Normal Form (1NF)
Requirements:
- Each column contains atomic (indivisible) values
- Each column contains values of the same type
- Each column has a unique name
- The order of data storage doesn't matter
Violations and Solutions:
Problem: Multiple values in single column
-- BAD: Multiple phone numbers in one column
CREATE TABLE customers (
id INT PRIMARY KEY,
name VARCHAR(100),
phones VARCHAR(500) -- "555-1234, 555-5678, 555-9012"
);
-- GOOD: Separate table for multiple phones
CREATE TABLE customers (
id INT PRIMARY KEY,
name VARCHAR(100)
);
CREATE TABLE customer_phones (
id INT PRIMARY KEY,
customer_id INT REFERENCES customers(id),
phone VARCHAR(20),
phone_type VARCHAR(10) -- 'mobile', 'home', 'work'
);
Problem: Repeating groups
-- BAD: Repeating column patterns
CREATE TABLE orders (
order_id INT PRIMARY KEY,
customer_id INT,
item1_name VARCHAR(100),
item1_qty INT,
item1_price DECIMAL(8,2),
item2_name VARCHAR(100),
item2_qty INT,
item2_price DECIMAL(8,2),
item3_name VARCHAR(100),
item3_qty INT,
item3_price DECIMAL(8,2)
);
-- GOOD: Separate table for order items
CREATE TABLE orders (
order_id INT PRIMARY KEY,
customer_id INT,
order_date DATE
);
CREATE TABLE order_items (
id INT PRIMARY KEY,
order_id INT REFERENCES orders(order_id),
item_name VARCHAR(100),
quantity INT,
unit_price DECIMAL(8,2)
);
Second Normal Form (2NF)
Requirements:
- Must be in 1NF
- All non-key attributes must be fully functionally dependent on the primary key
- No partial dependencies (applies only to tables with composite primary keys)
Violations and Solutions:
Problem: Partial dependency on composite key
-- BAD: Student course enrollment with partial dependencies
CREATE TABLE student_courses (
student_id INT,
course_id INT,
student_name VARCHAR(100), -- Depends only on student_id
student_major VARCHAR(50), -- Depends only on student_id
course_title VARCHAR(200), -- Depends only on course_id
course_credits INT, -- Depends only on course_id
grade CHAR(2), -- Depends on both student_id AND course_id
PRIMARY KEY (student_id, course_id)
);
-- GOOD: Separate tables eliminate partial dependencies
CREATE TABLE students (
student_id INT PRIMARY KEY,
student_name VARCHAR(100),
student_major VARCHAR(50)
);
CREATE TABLE courses (
course_id INT PRIMARY KEY,
course_title VARCHAR(200),
course_credits INT
);
CREATE TABLE enrollments (
student_id INT,
course_id INT,
grade CHAR(2),
enrollment_date DATE,
PRIMARY KEY (student_id, course_id),
FOREIGN KEY (student_id) REFERENCES students(student_id),
FOREIGN KEY (course_id) REFERENCES courses(course_id)
);
Third Normal Form (3NF)
Requirements:
- Must be in 2NF
- No transitive dependencies (non-key attributes should not depend on other non-key attributes)
- All non-key attributes must depend directly on the primary key
Violations and Solutions:
Problem: Transitive dependency
-- BAD: Employee table with transitive dependency
CREATE TABLE employees (
employee_id INT PRIMARY KEY,
employee_name VARCHAR(100),
department_id INT,
department_name VARCHAR(100), -- Depends on department_id, not employee_id
department_location VARCHAR(100), -- Transitive dependency through department_id
department_budget DECIMAL(10,2), -- Transitive dependency through department_id
salary DECIMAL(8,2)
);
-- GOOD: Separate department information
CREATE TABLE departments (
department_id INT PRIMARY KEY,
department_name VARCHAR(100),
department_location VARCHAR(100),
department_budget DECIMAL(10,2)
);
CREATE TABLE employees (
employee_id INT PRIMARY KEY,
employee_name VARCHAR(100),
department_id INT,
salary DECIMAL(8,2),
FOREIGN KEY (department_id) REFERENCES departments(department_id)
);
Boyce-Codd Normal Form (BCNF)
Requirements:
- Must be in 3NF
- Every determinant must be a candidate key
- Stricter than 3NF - handles cases where 3NF doesn't eliminate all anomalies
Violations and Solutions:
Problem: Determinant that's not a candidate key
-- BAD: Student advisor relationship with BCNF violation
-- Assumption: Each student has one advisor per subject,
-- each advisor teaches only one subject, but can advise multiple students
CREATE TABLE student_advisor (
student_id INT,
subject VARCHAR(50),
advisor_id INT,
PRIMARY KEY (student_id, subject)
);
-- Problem: advisor_id determines subject, but advisor_id is not a candidate key
-- GOOD: Separate the functional dependencies
CREATE TABLE advisors (
advisor_id INT PRIMARY KEY,
subject VARCHAR(50)
);
CREATE TABLE student_advisor_assignments (
student_id INT,
advisor_id INT,
PRIMARY KEY (student_id, advisor_id),
FOREIGN KEY (advisor_id) REFERENCES advisors(advisor_id)
);
Denormalization Strategies
When to Denormalize
- Performance Requirements: When query performance is more critical than storage efficiency
- Read-Heavy Workloads: When data is read much more frequently than it's updated
- Reporting Systems: When complex joins negatively impact reporting performance
- Caching Strategies: When pre-computed values eliminate expensive calculations
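When derived values are maintained in application code rather than by database triggers, the write path must update the normalized rows and the denormalized summary together (inside one transaction in a real database). A sketch with in-memory stand-ins for the two tables:

```python
orders = {}       # order_id -> denormalized summary (order_total, item_count)
order_items = []  # normalized rows

def add_item(order_id, quantity, unit_price):
    # Insert the normalized row...
    order_items.append({"order_id": order_id, "quantity": quantity,
                        "unit_price": unit_price})
    # ...and update the denormalized aggregate in the same logical transaction.
    summary = orders.setdefault(order_id, {"order_total": 0.0, "item_count": 0})
    summary["order_total"] += quantity * unit_price
    summary["item_count"] += 1
```

If the two updates are ever allowed to commit separately, the summary can drift from the detail rows, which is exactly the consistency risk denormalization introduces.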
Common Denormalization Patterns
1. Redundant Storage for Performance
-- Store frequently accessed calculated values
CREATE TABLE orders (
order_id INT PRIMARY KEY,
customer_id INT,
order_total DECIMAL(10,2), -- Denormalized: sum of order_items.total
item_count INT, -- Denormalized: count of order_items
created_at TIMESTAMP
);
CREATE TABLE order_items (
item_id INT PRIMARY KEY,
order_id INT,
product_id INT,
quantity INT,
unit_price DECIMAL(8,2),
total DECIMAL(10,2) -- quantity * unit_price (denormalized)
);
2. Materialized Aggregates
-- Pre-computed summary tables for reporting
CREATE TABLE monthly_sales_summary (
year_month VARCHAR(7), -- '2024-03'
product_category VARCHAR(50),
total_sales DECIMAL(12,2),
total_units INT,
avg_order_value DECIMAL(8,2),
unique_customers INT,
updated_at TIMESTAMP
);
3. Historical Data Snapshots
-- Store historical state to avoid complex temporal queries
CREATE TABLE customer_status_history (
id INT PRIMARY KEY,
customer_id INT,
status VARCHAR(20),
tier VARCHAR(10),
total_lifetime_value DECIMAL(12,2), -- Snapshot at this point in time
snapshot_date DATE
);
Trade-offs Analysis
Normalization Benefits
- Data Integrity: Reduced risk of inconsistent data
- Storage Efficiency: Less data duplication
- Update Efficiency: Changes need to be made in only one place
- Flexibility: Easier to modify schema as requirements change
Normalization Costs
- Query Complexity: More joins required for data retrieval
- Performance Impact: Joins can be expensive on large datasets
- Development Complexity: More complex data access patterns
Denormalization Benefits
- Query Performance: Fewer joins, faster queries
- Simplified Queries: Direct access to related data
- Read Optimization: Optimized for data retrieval patterns
- Reduced Load: Less database processing for common operations
Denormalization Costs
- Data Redundancy: Increased storage requirements
- Update Complexity: Multiple places may need updates
- Consistency Risk: Higher risk of data inconsistencies
- Maintenance Overhead: Additional code to maintain derived values
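The read-side half of this trade-off is easy to see concretely: with the orders/order_items design shown earlier, a normalized read must aggregate the line items at query time, while the denormalized column is a single-row lookup with no join. A minimal sqlite3 sketch (the sample values are invented for illustration):

```python
import sqlite3

# Tables mirror the orders/order_items example above; data is invented.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (
        order_id INT PRIMARY KEY,
        order_total DECIMAL(10,2)   -- denormalized sum of order_items
    );
    CREATE TABLE order_items (
        order_id INT, quantity INT, unit_price DECIMAL(8,2)
    );
    INSERT INTO order_items VALUES (1, 2, 10.00), (1, 1, 5.00);
    INSERT INTO orders VALUES (1, 25.00);
""")

# Normalized read: the total is computed by aggregating line items at query time
normalized = conn.execute(
    "SELECT SUM(quantity * unit_price) FROM order_items WHERE order_id = 1"
).fetchone()[0]

# Denormalized read: a single-row lookup, no join or aggregation
denormalized = conn.execute(
    "SELECT order_total FROM orders WHERE order_id = 1"
).fetchone()[0]

assert normalized == denormalized == 25
```

At this scale both reads are instant; the gap only matters once order_items is large and the aggregate sits in a hot query path, which is why the decision should follow measurement rather than intuition.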
Best Practices
1. Start with Full Normalization
- Begin with a fully normalized design
- Identify performance bottlenecks through testing
- Selectively denormalize based on actual performance needs
2. Use Triggers for Consistency
-- Trigger to maintain denormalized order_total (PostgreSQL syntax; a row
-- trigger cannot reference NEW on DELETE, so TG_OP selects the right row)
CREATE OR REPLACE FUNCTION refresh_order_total() RETURNS trigger AS $$
DECLARE
    affected_order INT;
BEGIN
    affected_order := CASE WHEN TG_OP = 'DELETE'
                           THEN OLD.order_id ELSE NEW.order_id END;
    UPDATE orders
    SET order_total = (
        SELECT COALESCE(SUM(quantity * unit_price), 0)
        FROM order_items
        WHERE order_id = affected_order
    )
    WHERE order_id = affected_order;
    RETURN NULL;  -- return value is ignored for AFTER triggers
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_order_total
AFTER INSERT OR UPDATE OR DELETE ON order_items
FOR EACH ROW EXECUTE FUNCTION refresh_order_total();
3. Consider Materialized Views
-- Materialized view for complex aggregations
CREATE MATERIALIZED VIEW customer_summary AS
SELECT
c.customer_id,
c.customer_name,
COUNT(o.order_id) as order_count,
SUM(o.order_total) as lifetime_value,
AVG(o.order_total) as avg_order_value,
MAX(o.created_at) as last_order_date
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.customer_name;
-- A materialized view is a snapshot; refresh it on a schedule or after loads
-- (PostgreSQL: REFRESH MATERIALIZED VIEW customer_summary;)
4. Document Denormalization Decisions
- Clearly document why denormalization was chosen
- Specify which data is derived and how it's maintained
- Include performance benchmarks that justify the decision
5. Monitor and Validate
- Implement validation checks for denormalized data
- Regular audits to ensure data consistency
- Performance monitoring to validate denormalization benefits
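One concrete shape for such a validation check is a drift audit: recompute each derived value from its source rows and report any disagreement. A small sqlite3 sketch against the earlier orders/order_items shapes (the inconsistent row is fabricated so the audit has something to catch):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (order_id INT PRIMARY KEY, order_total DECIMAL(10,2));
    CREATE TABLE order_items (order_id INT, quantity INT, unit_price DECIMAL(8,2));
    INSERT INTO order_items VALUES (1, 2, 10.00), (2, 3, 4.00);
    INSERT INTO orders VALUES (1, 20.00), (2, 99.00);  -- order 2 has drifted
""")

# Audit: recompute each total from order_items and flag any stored
# order_total that disagrees with the recomputed value
drift = conn.execute("""
    SELECT o.order_id,
           o.order_total,
           COALESCE(SUM(i.quantity * i.unit_price), 0) AS recomputed
    FROM orders o
    LEFT JOIN order_items i ON i.order_id = o.order_id
    GROUP BY o.order_id, o.order_total
    HAVING o.order_total <> COALESCE(SUM(i.quantity * i.unit_price), 0)
""").fetchall()

assert [row[0] for row in drift] == [2]  # only the drifted order is flagged
```

In production the same query can run as a scheduled job, with mismatches either alerted on or repaired by writing the recomputed value back over the stored aggregate.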
Common Anti-Patterns
1. Premature Denormalization
Starting with a denormalized design before actual performance requirements are understood or measured.
2. Over-Normalization
Creating too many small tables that require excessive joins for simple queries.
3. Inconsistent Approach
Mixing normalized and denormalized patterns without a clear, documented strategy.
4. Ignoring Maintenance
Denormalizing without proper mechanisms to maintain data consistency.
Conclusion
Normalization and denormalization are both valuable tools in database design. The key is understanding when to apply each approach:
- Use normalization for transactional systems where data integrity is paramount
- Consider denormalization for analytical systems or when performance testing reveals bottlenecks
- Apply selectively based on actual usage patterns and performance requirements
- Maintain consistency through proper design patterns and validation mechanisms
The goal is not to achieve perfect normalization or denormalization, but to create a design that best serves your application's specific needs while maintaining data quality and system performance.
#!/usr/bin/env python3
"""
Database Schema Analyzer
Analyzes SQL DDL statements and JSON schema definitions for:
- Normalization level compliance (1NF-BCNF)
- Missing constraints (FK, NOT NULL, UNIQUE)
- Data type issues and antipatterns
- Naming convention violations
- Missing indexes on foreign key columns
- Table relationship mapping
- Generates Mermaid ERD diagrams
Input: SQL DDL file or JSON schema definition
Output: Analysis report + Mermaid ERD + recommendations
Usage:
python schema_analyzer.py --input schema.sql --output-format json
python schema_analyzer.py --input schema.json --output-format text
python schema_analyzer.py --input schema.sql --generate-erd --output analysis.json
"""
import argparse
import json
import re
import sys
from collections import defaultdict
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, asdict
@dataclass
class Column:
name: str
data_type: str
nullable: bool = True
primary_key: bool = False
unique: bool = False
foreign_key: Optional[str] = None
default_value: Optional[str] = None
check_constraint: Optional[str] = None
@dataclass
class Index:
name: str
table: str
columns: List[str]
unique: bool = False
index_type: str = "btree"
@dataclass
class Table:
name: str
columns: List[Column]
primary_key: List[str]
foreign_keys: List[Tuple[str, str]] # (column, referenced_table.column)
unique_constraints: List[List[str]]
check_constraints: Dict[str, str]
indexes: List[Index]
@dataclass
class NormalizationIssue:
table: str
issue_type: str
severity: str
description: str
suggestion: str
columns_affected: List[str]
@dataclass
class DataTypeIssue:
table: str
column: str
current_type: str
issue: str
suggested_type: str
rationale: str
@dataclass
class ConstraintIssue:
table: str
issue_type: str
severity: str
description: str
suggestion: str
columns_affected: List[str]
@dataclass
class NamingIssue:
table: str
column: Optional[str]
issue: str
current_name: str
suggested_name: str
class SchemaAnalyzer:
def __init__(self):
self.tables: Dict[str, Table] = {}
self.normalization_issues: List[NormalizationIssue] = []
self.datatype_issues: List[DataTypeIssue] = []
self.constraint_issues: List[ConstraintIssue] = []
self.naming_issues: List[NamingIssue] = []
# Data type antipatterns
self.varchar_255_pattern = re.compile(r'VARCHAR\(255\)', re.IGNORECASE)
self.bad_datetime_patterns = [
re.compile(r'VARCHAR\(\d+\)', re.IGNORECASE),
re.compile(r'CHAR\(\d+\)', re.IGNORECASE)
]
# Naming conventions
self.table_naming_pattern = re.compile(r'^[a-z][a-z0-9_]*[a-z0-9]$')
self.column_naming_pattern = re.compile(r'^[a-z][a-z0-9_]*[a-z0-9]$')
def parse_sql_ddl(self, ddl_content: str) -> None:
"""Parse SQL DDL statements and extract schema information."""
# Remove comments and normalize whitespace
ddl_content = re.sub(r'--.*$', '', ddl_content, flags=re.MULTILINE)
ddl_content = re.sub(r'/\*.*?\*/', '', ddl_content, flags=re.DOTALL)
ddl_content = re.sub(r'\s+', ' ', ddl_content.strip())
        # Extract CREATE TABLE statements; match through the ")" that ends the
        # statement (followed by ";" or end of input) so parenthesized types
        # like VARCHAR(50) don't terminate the capture early
        create_table_pattern = re.compile(
            r'CREATE\s+TABLE\s+(\w+)\s*\(\s*(.*?)\s*\)\s*(?:;|$)',
            re.IGNORECASE | re.DOTALL
        )
for match in create_table_pattern.finditer(ddl_content):
table_name = match.group(1).lower()
table_definition = match.group(2)
table = self._parse_table_definition(table_name, table_definition)
self.tables[table_name] = table
# Extract CREATE INDEX statements
self._parse_indexes(ddl_content)
def _parse_table_definition(self, table_name: str, definition: str) -> Table:
"""Parse individual table definition."""
columns = []
primary_key = []
foreign_keys = []
unique_constraints = []
check_constraints = {}
# Split by commas, but handle nested parentheses
parts = self._split_table_parts(definition)
for part in parts:
part = part.strip()
if not part:
continue
if part.upper().startswith('PRIMARY KEY'):
primary_key = self._parse_primary_key(part)
elif part.upper().startswith('FOREIGN KEY'):
fk = self._parse_foreign_key(part)
if fk:
foreign_keys.append(fk)
elif part.upper().startswith('UNIQUE'):
unique = self._parse_unique_constraint(part)
if unique:
unique_constraints.append(unique)
elif part.upper().startswith('CHECK'):
check = self._parse_check_constraint(part)
if check:
check_constraints.update(check)
else:
# Column definition
column = self._parse_column_definition(part)
if column:
columns.append(column)
if column.primary_key:
primary_key.append(column.name)
return Table(
name=table_name,
columns=columns,
primary_key=primary_key,
foreign_keys=foreign_keys,
unique_constraints=unique_constraints,
check_constraints=check_constraints,
indexes=[]
)
def _split_table_parts(self, definition: str) -> List[str]:
"""Split table definition by commas, respecting nested parentheses."""
parts = []
current_part = ""
paren_count = 0
for char in definition:
if char == '(':
paren_count += 1
elif char == ')':
paren_count -= 1
elif char == ',' and paren_count == 0:
parts.append(current_part.strip())
current_part = ""
continue
current_part += char
if current_part.strip():
parts.append(current_part.strip())
return parts
def _parse_column_definition(self, definition: str) -> Optional[Column]:
"""Parse individual column definition."""
# Pattern for column definition
pattern = re.compile(
r'(\w+)\s+([A-Z]+(?:\(\d+(?:,\d+)?\))?)\s*(.*)',
re.IGNORECASE
)
match = pattern.match(definition.strip())
if not match:
return None
column_name = match.group(1).lower()
data_type = match.group(2).upper()
constraints = match.group(3).upper() if match.group(3) else ""
column = Column(
name=column_name,
data_type=data_type,
nullable='NOT NULL' not in constraints,
primary_key='PRIMARY KEY' in constraints,
unique='UNIQUE' in constraints
)
# Parse foreign key reference
fk_pattern = re.compile(r'REFERENCES\s+(\w+)\s*\(\s*(\w+)\s*\)', re.IGNORECASE)
fk_match = fk_pattern.search(constraints)
if fk_match:
column.foreign_key = f"{fk_match.group(1).lower()}.{fk_match.group(2).lower()}"
# Parse default value
default_pattern = re.compile(r'DEFAULT\s+([^,\s]+)', re.IGNORECASE)
default_match = default_pattern.search(constraints)
if default_match:
column.default_value = default_match.group(1)
return column
def _parse_primary_key(self, definition: str) -> List[str]:
"""Parse PRIMARY KEY constraint."""
pattern = re.compile(r'PRIMARY\s+KEY\s*\(\s*(.*?)\s*\)', re.IGNORECASE)
match = pattern.search(definition)
if match:
columns = [col.strip().lower() for col in match.group(1).split(',')]
return columns
return []
def _parse_foreign_key(self, definition: str) -> Optional[Tuple[str, str]]:
"""Parse FOREIGN KEY constraint."""
pattern = re.compile(
r'FOREIGN\s+KEY\s*\(\s*(\w+)\s*\)\s+REFERENCES\s+(\w+)\s*\(\s*(\w+)\s*\)',
re.IGNORECASE
)
match = pattern.search(definition)
if match:
column = match.group(1).lower()
ref_table = match.group(2).lower()
ref_column = match.group(3).lower()
return (column, f"{ref_table}.{ref_column}")
return None
def _parse_unique_constraint(self, definition: str) -> Optional[List[str]]:
"""Parse UNIQUE constraint."""
pattern = re.compile(r'UNIQUE\s*\(\s*(.*?)\s*\)', re.IGNORECASE)
match = pattern.search(definition)
if match:
columns = [col.strip().lower() for col in match.group(1).split(',')]
return columns
return None
def _parse_check_constraint(self, definition: str) -> Optional[Dict[str, str]]:
"""Parse CHECK constraint."""
pattern = re.compile(r'CHECK\s*\(\s*(.*?)\s*\)', re.IGNORECASE)
match = pattern.search(definition)
if match:
constraint_name = f"check_constraint_{len(self.tables)}"
return {constraint_name: match.group(1)}
return None
def _parse_indexes(self, ddl_content: str) -> None:
"""Parse CREATE INDEX statements."""
index_pattern = re.compile(
r'CREATE\s+(?:(UNIQUE)\s+)?INDEX\s+(\w+)\s+ON\s+(\w+)\s*\(\s*(.*?)\s*\)',
re.IGNORECASE
)
for match in index_pattern.finditer(ddl_content):
unique = match.group(1) is not None
index_name = match.group(2).lower()
table_name = match.group(3).lower()
columns_str = match.group(4)
columns = [col.strip().lower() for col in columns_str.split(',')]
index = Index(
name=index_name,
table=table_name,
columns=columns,
unique=unique
)
if table_name in self.tables:
self.tables[table_name].indexes.append(index)
def parse_json_schema(self, json_content: str) -> None:
"""Parse JSON schema definition."""
try:
schema = json.loads(json_content)
if 'tables' not in schema:
raise ValueError("JSON schema must contain 'tables' key")
for table_name, table_def in schema['tables'].items():
table = self._parse_json_table(table_name.lower(), table_def)
self.tables[table_name.lower()] = table
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON: {e}")
def _parse_json_table(self, table_name: str, table_def: Dict[str, Any]) -> Table:
"""Parse JSON table definition."""
columns = []
primary_key = table_def.get('primary_key', [])
foreign_keys = []
unique_constraints = table_def.get('unique_constraints', [])
check_constraints = table_def.get('check_constraints', {})
for col_name, col_def in table_def.get('columns', {}).items():
column = Column(
name=col_name.lower(),
data_type=col_def.get('type', 'VARCHAR(255)').upper(),
nullable=col_def.get('nullable', True),
primary_key=col_name.lower() in [pk.lower() for pk in primary_key],
unique=col_def.get('unique', False),
foreign_key=col_def.get('foreign_key'),
default_value=col_def.get('default')
)
columns.append(column)
if column.foreign_key:
foreign_keys.append((column.name, column.foreign_key))
return Table(
name=table_name,
columns=columns,
primary_key=[pk.lower() for pk in primary_key],
foreign_keys=foreign_keys,
unique_constraints=unique_constraints,
check_constraints=check_constraints,
indexes=[]
)
def analyze_normalization(self) -> None:
"""Analyze normalization compliance."""
for table_name, table in self.tables.items():
self._check_first_normal_form(table)
self._check_second_normal_form(table)
self._check_third_normal_form(table)
self._check_bcnf(table)
def _check_first_normal_form(self, table: Table) -> None:
"""Check First Normal Form compliance."""
# Check for atomic values (no arrays or delimited strings)
for column in table.columns:
if any(pattern in column.data_type.upper() for pattern in ['ARRAY', 'JSON', 'TEXT']):
if 'JSON' in column.data_type.upper():
# JSON columns can violate 1NF if storing arrays
self.normalization_issues.append(NormalizationIssue(
table=table.name,
issue_type="1NF_VIOLATION",
severity="WARNING",
description=f"Column '{column.name}' uses JSON type which may contain non-atomic values",
suggestion="Consider normalizing JSON arrays into separate tables",
columns_affected=[column.name]
))
# Check for potential delimited values in VARCHAR/TEXT
if column.data_type.upper().startswith(('VARCHAR', 'CHAR', 'TEXT')):
if any(delimiter in column.name.lower() for delimiter in ['list', 'array', 'tags', 'items']):
self.normalization_issues.append(NormalizationIssue(
table=table.name,
issue_type="1NF_VIOLATION",
severity="HIGH",
description=f"Column '{column.name}' appears to store delimited values",
suggestion="Create separate table for individual values with foreign key relationship",
columns_affected=[column.name]
))
def _check_second_normal_form(self, table: Table) -> None:
"""Check Second Normal Form compliance."""
if len(table.primary_key) <= 1:
return # 2NF only applies to tables with composite primary keys
# Look for potential partial dependencies
non_key_columns = [col for col in table.columns if col.name not in table.primary_key]
for column in non_key_columns:
# Heuristic: columns that seem related to only part of the composite key
for pk_part in table.primary_key:
if pk_part in column.name or column.name.startswith(pk_part.split('_')[0]):
self.normalization_issues.append(NormalizationIssue(
table=table.name,
issue_type="2NF_VIOLATION",
severity="MEDIUM",
description=f"Column '{column.name}' may have partial dependency on '{pk_part}'",
suggestion=f"Consider moving '{column.name}' to a separate table related to '{pk_part}'",
columns_affected=[column.name, pk_part]
))
break
def _check_third_normal_form(self, table: Table) -> None:
"""Check Third Normal Form compliance."""
# Look for transitive dependencies
non_key_columns = [col for col in table.columns if col.name not in table.primary_key]
# Group columns by potential entities they describe
entity_groups = defaultdict(list)
for column in non_key_columns:
# Simple heuristic: group by prefix before underscore
prefix = column.name.split('_')[0]
if prefix != column.name: # Has underscore
entity_groups[prefix].append(column.name)
for entity, columns in entity_groups.items():
if len(columns) > 1 and entity != table.name.split('_')[0]:
# Potential entity that should be in its own table
id_column = f"{entity}_id"
if id_column in [col.name for col in table.columns]:
self.normalization_issues.append(NormalizationIssue(
table=table.name,
issue_type="3NF_VIOLATION",
severity="MEDIUM",
description=f"Columns {columns} may have transitive dependency through '{id_column}'",
suggestion=f"Consider creating separate '{entity}' table with these columns",
columns_affected=columns + [id_column]
))
def _check_bcnf(self, table: Table) -> None:
"""Check Boyce-Codd Normal Form compliance."""
# BCNF violations are complex to detect without functional dependencies
# Provide general guidance for composite keys
if len(table.primary_key) > 2:
self.normalization_issues.append(NormalizationIssue(
table=table.name,
issue_type="BCNF_WARNING",
severity="LOW",
description=f"Table has composite primary key with {len(table.primary_key)} columns",
suggestion="Review functional dependencies to ensure BCNF compliance",
columns_affected=table.primary_key
))
def analyze_data_types(self) -> None:
"""Analyze data type usage for antipatterns."""
for table_name, table in self.tables.items():
for column in table.columns:
self._check_varchar_255_antipattern(table.name, column)
self._check_inappropriate_types(table.name, column)
self._check_size_optimization(table.name, column)
def _check_varchar_255_antipattern(self, table_name: str, column: Column) -> None:
"""Check for VARCHAR(255) antipattern."""
if self.varchar_255_pattern.match(column.data_type):
self.datatype_issues.append(DataTypeIssue(
table=table_name,
column=column.name,
current_type=column.data_type,
issue="VARCHAR(255) antipattern",
suggested_type="Appropriately sized VARCHAR or TEXT",
rationale="VARCHAR(255) is often used as default without considering actual data length requirements"
))
def _check_inappropriate_types(self, table_name: str, column: Column) -> None:
"""Check for inappropriate data types."""
# Date/time stored as string
if column.name.lower() in ['date', 'time', 'created', 'updated', 'modified', 'timestamp']:
if column.data_type.upper().startswith(('VARCHAR', 'CHAR', 'TEXT')):
self.datatype_issues.append(DataTypeIssue(
table=table_name,
column=column.name,
current_type=column.data_type,
issue="Date/time stored as string",
suggested_type="TIMESTAMP, DATE, or TIME",
rationale="Proper date/time types enable date arithmetic and indexing optimization"
))
# Boolean stored as string/integer
if column.name.lower() in ['active', 'enabled', 'deleted', 'visible', 'published']:
if not column.data_type.upper().startswith('BOOL'):
self.datatype_issues.append(DataTypeIssue(
table=table_name,
column=column.name,
current_type=column.data_type,
issue="Boolean value stored as non-boolean type",
suggested_type="BOOLEAN",
rationale="Boolean type is more explicit and can be more storage efficient"
))
# Numeric IDs as VARCHAR
if column.name.lower().endswith('_id') or column.name.lower() == 'id':
if column.data_type.upper().startswith(('VARCHAR', 'CHAR')):
self.datatype_issues.append(DataTypeIssue(
table=table_name,
column=column.name,
current_type=column.data_type,
issue="Numeric ID stored as string",
suggested_type="INTEGER, BIGINT, or UUID",
rationale="Numeric types are more efficient for ID columns and enable better indexing"
))
def _check_size_optimization(self, table_name: str, column: Column) -> None:
"""Check for size optimization opportunities."""
# Oversized integer types
if column.data_type.upper() == 'BIGINT':
if not any(keyword in column.name.lower() for keyword in ['timestamp', 'big', 'large', 'count']):
self.datatype_issues.append(DataTypeIssue(
table=table_name,
column=column.name,
current_type=column.data_type,
issue="Potentially oversized integer type",
suggested_type="INTEGER",
rationale="INTEGER is sufficient for most ID and count fields unless very large values are expected"
))
def analyze_constraints(self) -> None:
"""Analyze missing constraints."""
for table_name, table in self.tables.items():
self._check_missing_primary_key(table)
self._check_missing_foreign_key_constraints(table)
self._check_missing_not_null_constraints(table)
self._check_missing_unique_constraints(table)
self._check_missing_check_constraints(table)
def _check_missing_primary_key(self, table: Table) -> None:
"""Check for missing primary key."""
if not table.primary_key:
self.constraint_issues.append(ConstraintIssue(
table=table.name,
issue_type="MISSING_PRIMARY_KEY",
severity="HIGH",
description="Table has no primary key defined",
suggestion="Add a primary key column (e.g., 'id' with auto-increment)",
columns_affected=[]
))
def _check_missing_foreign_key_constraints(self, table: Table) -> None:
"""Check for missing foreign key constraints."""
for column in table.columns:
if column.name.endswith('_id') and column.name != 'id':
# Potential foreign key column
if not column.foreign_key:
referenced_table = column.name[:-3] # Remove '_id' suffix
if referenced_table in self.tables or referenced_table + 's' in self.tables:
self.constraint_issues.append(ConstraintIssue(
table=table.name,
issue_type="MISSING_FOREIGN_KEY",
severity="MEDIUM",
description=f"Column '{column.name}' appears to be a foreign key but has no constraint",
suggestion=f"Add foreign key constraint referencing {referenced_table} table",
columns_affected=[column.name]
))
def _check_missing_not_null_constraints(self, table: Table) -> None:
"""Check for missing NOT NULL constraints."""
for column in table.columns:
if column.nullable and column.name in ['email', 'name', 'title', 'status']:
self.constraint_issues.append(ConstraintIssue(
table=table.name,
issue_type="MISSING_NOT_NULL",
severity="LOW",
description=f"Column '{column.name}' allows NULL but typically should not",
suggestion=f"Consider adding NOT NULL constraint to '{column.name}'",
columns_affected=[column.name]
))
def _check_missing_unique_constraints(self, table: Table) -> None:
"""Check for missing unique constraints."""
for column in table.columns:
if column.name in ['email', 'username', 'slug', 'code'] and not column.unique:
if column.name not in table.primary_key:
self.constraint_issues.append(ConstraintIssue(
table=table.name,
issue_type="MISSING_UNIQUE",
severity="MEDIUM",
description=f"Column '{column.name}' should likely have UNIQUE constraint",
suggestion=f"Add UNIQUE constraint to '{column.name}'",
columns_affected=[column.name]
))
def _check_missing_check_constraints(self, table: Table) -> None:
"""Check for missing check constraints."""
for column in table.columns:
# Email format validation
if column.name == 'email' and 'email' not in str(table.check_constraints):
self.constraint_issues.append(ConstraintIssue(
table=table.name,
issue_type="MISSING_CHECK_CONSTRAINT",
severity="LOW",
description=f"Email column lacks format validation",
suggestion="Add CHECK constraint for email format validation",
columns_affected=[column.name]
))
# Positive values for counts, prices, etc.
if column.name.lower() in ['price', 'amount', 'count', 'quantity', 'age']:
if column.name not in str(table.check_constraints):
self.constraint_issues.append(ConstraintIssue(
table=table.name,
issue_type="MISSING_CHECK_CONSTRAINT",
severity="LOW",
description=f"Column '{column.name}' should validate positive values",
suggestion=f"Add CHECK constraint: {column.name} > 0",
columns_affected=[column.name]
))
def analyze_naming_conventions(self) -> None:
"""Analyze naming convention compliance."""
for table_name, table in self.tables.items():
self._check_table_naming(table_name)
for column in table.columns:
self._check_column_naming(table_name, column.name)
def _check_table_naming(self, table_name: str) -> None:
"""Check table naming conventions."""
if not self.table_naming_pattern.match(table_name):
suggested_name = self._suggest_table_name(table_name)
self.naming_issues.append(NamingIssue(
table=table_name,
column=None,
issue="Invalid table naming convention",
current_name=table_name,
suggested_name=suggested_name
))
# Check for plural naming
if not table_name.endswith('s') and table_name not in ['data', 'information']:
self.naming_issues.append(NamingIssue(
table=table_name,
column=None,
issue="Table name should be plural",
current_name=table_name,
suggested_name=table_name + 's'
))
def _check_column_naming(self, table_name: str, column_name: str) -> None:
"""Check column naming conventions."""
if not self.column_naming_pattern.match(column_name):
suggested_name = self._suggest_column_name(column_name)
self.naming_issues.append(NamingIssue(
table=table_name,
column=column_name,
issue="Invalid column naming convention",
current_name=column_name,
suggested_name=suggested_name
))
def _suggest_table_name(self, table_name: str) -> str:
"""Suggest corrected table name."""
# Convert to snake_case and make plural
name = re.sub(r'([A-Z])', r'_\1', table_name).lower().strip('_')
return name + 's' if not name.endswith('s') else name
def _suggest_column_name(self, column_name: str) -> str:
"""Suggest corrected column name."""
# Convert to snake_case
return re.sub(r'([A-Z])', r'_\1', column_name).lower().strip('_')
def check_missing_indexes(self) -> List[Dict[str, Any]]:
"""Check for missing indexes on foreign key columns."""
missing_indexes = []
for table_name, table in self.tables.items():
existing_indexed_columns = set()
# Collect existing indexed columns
for index in table.indexes:
existing_indexed_columns.update(index.columns)
# Primary key columns are automatically indexed
existing_indexed_columns.update(table.primary_key)
# Check foreign key columns
for column in table.columns:
if column.foreign_key and column.name not in existing_indexed_columns:
missing_indexes.append({
'table': table_name,
'column': column.name,
'type': 'foreign_key',
'suggestion': f"CREATE INDEX idx_{table_name}_{column.name} ON {table_name} ({column.name});"
})
return missing_indexes
def generate_mermaid_erd(self) -> str:
"""Generate Mermaid ERD diagram."""
erd_lines = ["erDiagram"]
# Add table definitions
for table_name, table in self.tables.items():
erd_lines.append(f" {table_name.upper()} {{")
for column in table.columns:
data_type = column.data_type
constraints = []
if column.primary_key:
constraints.append("PK")
if column.foreign_key:
constraints.append("FK")
if not column.nullable:
constraints.append("NOT NULL")
if column.unique:
constraints.append("UNIQUE")
constraint_str = " ".join(constraints)
if constraint_str:
constraint_str = f" \"{constraint_str}\""
erd_lines.append(f" {data_type} {column.name}{constraint_str}")
erd_lines.append(" }")
# Add relationships
relationships = set()
for table_name, table in self.tables.items():
for column in table.columns:
if column.foreign_key:
ref_table = column.foreign_key.split('.')[0]
if ref_table in self.tables:
relationship = f" {ref_table.upper()} ||--o{{ {table_name.upper()} : has"
relationships.add(relationship)
erd_lines.extend(sorted(relationships))
return "\n".join(erd_lines)
def get_analysis_summary(self) -> Dict[str, Any]:
"""Get comprehensive analysis summary."""
return {
"schema_overview": {
"total_tables": len(self.tables),
"total_columns": sum(len(table.columns) for table in self.tables.values()),
"tables_with_primary_keys": len([t for t in self.tables.values() if t.primary_key]),
"total_foreign_keys": sum(len(table.foreign_keys) for table in self.tables.values()),
"total_indexes": sum(len(table.indexes) for table in self.tables.values())
},
"normalization_analysis": {
"total_issues": len(self.normalization_issues),
"by_severity": {
"high": len([i for i in self.normalization_issues if i.severity == "HIGH"]),
"medium": len([i for i in self.normalization_issues if i.severity == "MEDIUM"]),
"low": len([i for i in self.normalization_issues if i.severity == "LOW"]),
"warning": len([i for i in self.normalization_issues if i.severity == "WARNING"])
},
"issues": [asdict(issue) for issue in self.normalization_issues]
},
"data_type_analysis": {
"total_issues": len(self.datatype_issues),
"issues": [asdict(issue) for issue in self.datatype_issues]
},
"constraint_analysis": {
"total_issues": len(self.constraint_issues),
"by_severity": {
"high": len([i for i in self.constraint_issues if i.severity == "HIGH"]),
"medium": len([i for i in self.constraint_issues if i.severity == "MEDIUM"]),
"low": len([i for i in self.constraint_issues if i.severity == "LOW"])
},
"issues": [asdict(issue) for issue in self.constraint_issues]
},
"naming_analysis": {
"total_issues": len(self.naming_issues),
"issues": [asdict(issue) for issue in self.naming_issues]
},
"missing_indexes": self.check_missing_indexes(),
"recommendations": self._generate_recommendations()
}
def _generate_recommendations(self) -> List[str]:
"""Generate high-level recommendations."""
recommendations = []
# High severity issues
high_severity_issues = [
i for i in self.normalization_issues + self.constraint_issues
if i.severity == "HIGH"
]
if high_severity_issues:
recommendations.append(f"Address {len(high_severity_issues)} high-severity issues immediately")
# Missing primary keys
tables_without_pk = [name for name, table in self.tables.items() if not table.primary_key]
if tables_without_pk:
recommendations.append(f"Add primary keys to tables: {', '.join(tables_without_pk)}")
# Data type improvements
varchar_255_issues = [i for i in self.datatype_issues if "VARCHAR(255)" in i.issue]
if varchar_255_issues:
recommendations.append(f"Review {len(varchar_255_issues)} VARCHAR(255) columns for right-sizing")
# Missing foreign keys
missing_fks = [i for i in self.constraint_issues if i.issue_type == "MISSING_FOREIGN_KEY"]
if missing_fks:
recommendations.append(f"Consider adding {len(missing_fks)} foreign key constraints for referential integrity")
# Normalization improvements
        normalization_issues_count = len(self.normalization_issues)
        if normalization_issues_count > 0:
            recommendations.append(f"Review {normalization_issues_count} normalization issues for schema optimization")
        return recommendations

    def format_text_report(self, analysis: Dict[str, Any]) -> str:
        """Format analysis as human-readable text report."""
        lines = []
        lines.append("DATABASE SCHEMA ANALYSIS REPORT")
        lines.append("=" * 50)
        lines.append("")

        # Overview
        overview = analysis["schema_overview"]
        lines.append("SCHEMA OVERVIEW")
        lines.append("-" * 15)
        lines.append(f"Total Tables: {overview['total_tables']}")
        lines.append(f"Total Columns: {overview['total_columns']}")
        lines.append(f"Tables with Primary Keys: {overview['tables_with_primary_keys']}")
        lines.append(f"Total Foreign Keys: {overview['total_foreign_keys']}")
        lines.append(f"Total Indexes: {overview['total_indexes']}")
        lines.append("")

        # Recommendations
        if analysis["recommendations"]:
            lines.append("KEY RECOMMENDATIONS")
            lines.append("-" * 18)
            for i, rec in enumerate(analysis["recommendations"], 1):
                lines.append(f"{i}. {rec}")
            lines.append("")

        # Normalization issues
        norm_analysis = analysis["normalization_analysis"]
        if norm_analysis["total_issues"] > 0:
            lines.append(f"NORMALIZATION ISSUES ({norm_analysis['total_issues']} total)")
            lines.append("-" * 25)
            severity_counts = norm_analysis["by_severity"]
            lines.append(f"High: {severity_counts['high']}, Medium: {severity_counts['medium']}, "
                         f"Low: {severity_counts['low']}, Warning: {severity_counts['warning']}")
            lines.append("")
            for issue in norm_analysis["issues"][:5]:  # Show first 5
                lines.append(f"• {issue['table']}: {issue['description']}")
                lines.append(f"  Suggestion: {issue['suggestion']}")
            lines.append("")

        # Data type issues
        dt_analysis = analysis["data_type_analysis"]
        if dt_analysis["total_issues"] > 0:
            lines.append(f"DATA TYPE ISSUES ({dt_analysis['total_issues']} total)")
            lines.append("-" * 20)
            for issue in dt_analysis["issues"][:5]:  # Show first 5
                lines.append(f"• {issue['table']}.{issue['column']}: {issue['issue']}")
                lines.append(f"  Current: {issue['current_type']} → Suggested: {issue['suggested_type']}")
                lines.append(f"  Rationale: {issue['rationale']}")
            lines.append("")

        # Constraint issues
        const_analysis = analysis["constraint_analysis"]
        if const_analysis["total_issues"] > 0:
            lines.append(f"CONSTRAINT ISSUES ({const_analysis['total_issues']} total)")
            lines.append("-" * 20)
            severity_counts = const_analysis["by_severity"]
            lines.append(f"High: {severity_counts['high']}, Medium: {severity_counts['medium']}, "
                         f"Low: {severity_counts['low']}")
            lines.append("")
            for issue in const_analysis["issues"][:5]:  # Show first 5
                lines.append(f"• {issue['table']}: {issue['description']}")
                lines.append(f"  Suggestion: {issue['suggestion']}")
            lines.append("")

        # Missing indexes
        missing_idx = analysis["missing_indexes"]
        if missing_idx:
            lines.append(f"MISSING INDEXES ({len(missing_idx)} total)")
            lines.append("-" * 17)
            for idx in missing_idx[:5]:  # Show first 5
                lines.append(f"• {idx['table']}.{idx['column']} ({idx['type']})")
                lines.append(f"  SQL: {idx['suggestion']}")
            lines.append("")

        return "\n".join(lines)


def main():
    parser = argparse.ArgumentParser(description="Analyze database schema for design issues and generate ERD")
    parser.add_argument("--input", "-i", required=True, help="Input file (SQL DDL or JSON schema)")
    parser.add_argument("--output", "-o", help="Output file (default: stdout)")
    parser.add_argument("--output-format", "-f", choices=["json", "text"], default="text",
                        help="Output format")
    parser.add_argument("--generate-erd", "-e", action="store_true", help="Include Mermaid ERD in output")
    parser.add_argument("--erd-only", action="store_true", help="Output only the Mermaid ERD")
    args = parser.parse_args()

    try:
        # Read input file
        with open(args.input, 'r') as f:
            content = f.read()

        # Initialize analyzer and parse input based on file extension
        analyzer = SchemaAnalyzer()
        if args.input.lower().endswith('.json'):
            analyzer.parse_json_schema(content)
        else:
            analyzer.parse_sql_ddl(content)

        if not analyzer.tables:
            print("Error: No tables found in input file", file=sys.stderr)
            return 1

        if args.erd_only:
            # Output only the ERD
            erd = analyzer.generate_mermaid_erd()
            if args.output:
                with open(args.output, 'w') as f:
                    f.write(erd)
            else:
                print(erd)
            return 0

        # Perform analysis
        analyzer.analyze_normalization()
        analyzer.analyze_data_types()
        analyzer.analyze_constraints()
        analyzer.analyze_naming_conventions()

        # Generate report
        analysis = analyzer.get_analysis_summary()
        if args.generate_erd:
            analysis["mermaid_erd"] = analyzer.generate_mermaid_erd()

        # Output results (the ERD is appended only to the text report;
        # in JSON mode it is already embedded in the analysis object)
        if args.output_format == "json":
            output = json.dumps(analysis, indent=2)
        else:
            output = analyzer.format_text_report(analysis)
            if args.generate_erd:
                output += "\n\nMERMAID ERD\n" + "=" * 11 + "\n"
                output += analysis["mermaid_erd"]

        if args.output:
            with open(args.output, 'w') as f:
                f.write(output)
        else:
            print(output)
        return 0

    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    sys.exit(main())

Install this Skill
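The `generate_mermaid_erd` call used above is defined earlier in the script. As a rough illustration of the idea (a hypothetical minimal sketch, not the skill's actual implementation), tables and foreign keys map onto Mermaid `erDiagram` syntax roughly like this:

```python
def tables_to_mermaid(tables):
    """Render a minimal Mermaid erDiagram from a simple schema dict.

    tables: {table_name: {"columns": [(name, sql_type)],
                          "foreign_keys": [(column, referenced_table)]}}
    (This dict shape is an assumption for the sketch, not the analyzer's
    real internal model.)
    """
    lines = ["erDiagram"]
    for name, table in tables.items():
        lines.append(f"    {name} {{")
        for col, sql_type in table["columns"]:
            # Mermaid lists the type first, then the column name
            lines.append(f"        {sql_type} {col}")
        lines.append("    }")
    for name, table in tables.items():
        for col, ref in table.get("foreign_keys", []):
            # many-to-one: each row in `name` references one row in `ref`
            lines.append(f'    {name} }}o--|| {ref} : "{col}"')
    return "\n".join(lines)


schema = {
    "users": {"columns": [("id", "int"), ("email", "varchar")],
              "foreign_keys": []},
    "orders": {"columns": [("id", "int"), ("user_id", "int")],
               "foreign_keys": [("user_id", "users")]},
}
print(tables_to_mermaid(schema))
```

The emitted text can be pasted into any Mermaid renderer to visualize the schema.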
npx skills add alirezarezvani/claude-skills --skill engineering/database-designer
Community skill by @alirezarezvani.
Details
- Category: Development
- License: MIT
- Author: @alirezarezvani
- Source file: engineering/database-designer/SKILL.md