""" Database Optimization Script Creates missing indexes and constraints for PostgreSQL """ import asyncio from sqlalchemy import text from database import async_engine import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) async def optimize_database(): """Apply database optimizations including indexes and constraints""" async with async_engine.connect() as conn: logger.info("Starting database optimization...") # ============= FOREIGN KEY INDEXES ============= logger.info("\n1. Creating indexes on foreign keys...") fk_indexes = [ ("idx_products_category_id", "products", "category_id"), ("idx_services_category_id", "services", "category_id"), ("idx_orders_user_id", "orders", "user_id"), ("idx_cart_items_user_id", "cart_items", "user_id"), ("idx_cart_items_product_id", "cart_items", "product_id"), ("idx_order_items_order_id", "order_items", "order_id"), ("idx_order_items_product_id", "order_items", "product_id"), ("idx_order_status_history_order_id", "order_status_history", "order_id"), ("idx_reviews_user_id", "reviews", "user_id"), ("idx_reviews_product_id", "reviews", "product_id"), ("idx_reviews_service_id", "reviews", "service_id"), ("idx_bookings_user_id", "bookings", "user_id"), ("idx_bookings_service_id", "bookings", "service_id"), ("idx_inventory_logs_product_id", "inventory_logs", "product_id"), ] for idx_name, table, column in fk_indexes: try: await conn.execute(text(f""" CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({column}) """)) await conn.commit() logger.info(f" ✓ Created {idx_name} on {table}.{column}") except Exception as e: await conn.rollback() logger.error(f" ✗ Failed to create {idx_name}: {str(e)[:100]}") # ============= PERFORMANCE INDEXES ============= logger.info("\n2. Creating performance indexes...") performance_indexes = [ # Products - frequently filtered columns ("idx_products_is_active", "products", "is_active"), ("idx_products_category", "products", "category"), ("idx_products_stock", "products", "stock"), ("idx_products_created_at", "products", "created_at DESC"), # Services - frequently filtered columns ("idx_services_is_active", "services", "is_active"), ("idx_services_category", "services", "category"), ("idx_services_created_at", "services", "created_at DESC"), # Orders - status and date filtering ("idx_orders_status", "orders", "status"), ("idx_orders_created_at", "orders", "created_at DESC"), ("idx_orders_updated_at", "orders", "updated_at DESC"), # Reviews - approval status ("idx_reviews_is_approved", "reviews", "is_approved"), ("idx_reviews_created_at", "reviews", "created_at DESC"), # Bookings - status filtering ("idx_bookings_status", "bookings", "status"), ("idx_bookings_created_at", "bookings", "created_at DESC"), # Inventory logs - date filtering ("idx_inventory_logs_created_at", "inventory_logs", "created_at DESC"), # Cart items - user lookup ("idx_cart_items_created_at", "cart_items", "created_at DESC"), ] for idx_name, table, column in performance_indexes: try: await conn.execute(text(f""" CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({column}) """)) await conn.commit() logger.info(f" ✓ Created {idx_name} on {table}") except Exception as e: await conn.rollback() logger.error(f" ✗ Failed to create {idx_name}: {str(e)[:100]}") # ============= COMPOSITE INDEXES ============= logger.info("\n3. Creating composite indexes...") composite_indexes = [ # Products: category + active status (common query pattern) ("idx_products_category_active", "products", ["category", "is_active"]), # Services: category + active status ("idx_services_category_active", "services", ["category", "is_active"]), # Orders: user + status (for user order history) ("idx_orders_user_status", "orders", ["user_id", "status"]), # Reviews: product + approved (for product reviews) ("idx_reviews_product_approved", "reviews", ["product_id", "is_approved"]), # Reviews: service + approved (for service reviews) ("idx_reviews_service_approved", "reviews", ["service_id", "is_approved"]), # Inventory logs: product + created_at (for product history) ("idx_inventory_logs_product_date", "inventory_logs", ["product_id", "created_at DESC"]), ] for idx_name, table, columns in composite_indexes: try: cols_str = ", ".join(columns) await conn.execute(text(f""" CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({cols_str}) """)) await conn.commit() logger.info(f" ✓ Created {idx_name} on {table}({cols_str})") except Exception as e: await conn.rollback() logger.error(f" ✗ Failed to create {idx_name}: {str(e)[:100]}") # ============= PARTIAL INDEXES ============= logger.info("\n4. Creating partial indexes...") partial_indexes = [ # Only index active products ("idx_products_active_only", "products", "category", "is_active = true"), # Only index active services ("idx_services_active_only", "services", "category", "is_active = true"), # Only index approved reviews ("idx_reviews_approved_only", "reviews", "product_id", "is_approved = true"), # Only index low stock products ("idx_products_low_stock", "products", "id", "stock <= low_stock_threshold AND is_active = true"), ] for idx_name, table, column, condition in partial_indexes: try: await conn.execute(text(f""" CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({column}) WHERE {condition} """)) await conn.commit() logger.info(f" ✓ Created partial index {idx_name}") except Exception as e: await conn.rollback() logger.error(f" ✗ Failed to create {idx_name}: {str(e)[:100]}") # ============= OPTIMIZE ENUMS ============= logger.info("\n5. Ensuring enum types exist...") try: # Check if enum types exist, create if missing result = await conn.execute(text(""" SELECT EXISTS ( SELECT 1 FROM pg_type WHERE typname = 'orderstatus' ) """)) if not result.scalar(): logger.info(" Creating OrderStatus enum type...") await conn.execute(text(""" CREATE TYPE orderstatus AS ENUM ( 'pending', 'processing', 'layaway', 'shipped', 'delivered', 'cancelled', 'refunded', 'on_hold' ) """)) result = await conn.execute(text(""" SELECT EXISTS ( SELECT 1 FROM pg_type WHERE typname = 'userrole' ) """)) if not result.scalar(): logger.info(" Creating UserRole enum type...") await conn.execute(text(""" CREATE TYPE userrole AS ENUM ('user', 'admin') """)) logger.info(" ✓ Enum types verified") except Exception as e: logger.warning(f" ⚠ Enum type check: {str(e)}") # ============= ADD CONSTRAINTS ============= logger.info("\n6. Adding check constraints...") constraints = [ ("chk_products_price_positive", "products", "price > 0"), ("chk_products_stock_nonnegative", "products", "stock >= 0"), ("chk_services_price_positive", "services", "price > 0"), ("chk_orders_totals_nonnegative", "orders", "subtotal >= 0 AND tax >= 0 AND shipping >= 0 AND total >= 0"), ("chk_reviews_rating_range", "reviews", "rating >= 1 AND rating <= 5"), ("chk_order_items_quantity_positive", "order_items", "quantity > 0"), ("chk_order_items_price_nonnegative", "order_items", "price >= 0"), ] for constraint_name, table, condition in constraints: try: # Check if constraint exists result = await conn.execute(text(f""" SELECT COUNT(*) FROM pg_constraint WHERE conname = '{constraint_name}' """)) if result.scalar() == 0: await conn.execute(text(f""" ALTER TABLE {table} ADD CONSTRAINT {constraint_name} CHECK ({condition}) """)) await conn.commit() logger.info(f" ✓ Added constraint {constraint_name}") else: logger.info(f" ⊙ Constraint {constraint_name} already exists") except Exception as e: await conn.rollback() logger.error(f" ✗ Failed to add {constraint_name}: {str(e)[:100]}") # ============= ANALYZE TABLES ============= logger.info("\n7. Analyzing tables for statistics...") tables = ['users', 'products', 'services', 'orders', 'order_items', 'reviews', 'bookings', 'cart_items', 'inventory_logs'] for table in tables: try: await conn.execute(text(f"ANALYZE {table}")) await conn.commit() logger.info(f" ✓ Analyzed {table}") except Exception as e: await conn.rollback() logger.error(f" ✗ Failed to analyze {table}: {str(e)[:100]}") logger.info("\n✅ Database optimization complete!") async def verify_optimization(): """Verify that optimizations were applied correctly""" async with async_engine.connect() as conn: logger.info("\n" + "="*60) logger.info("OPTIMIZATION VERIFICATION") logger.info("="*60) # Count indexes result = await conn.execute(text(""" SELECT COUNT(*) FROM pg_indexes WHERE schemaname = 'public' AND indexname NOT LIKE '%_pkey' """)) index_count = result.scalar() logger.info(f"\nTotal indexes created: {index_count}") # Count constraints result = await conn.execute(text(""" SELECT COUNT(*) FROM pg_constraint WHERE contype = 'c' AND connamespace::regnamespace::text = 'public' """)) constraint_count = result.scalar() logger.info(f"Total check constraints: {constraint_count}") # List all indexes by table result = await conn.execute(text(""" SELECT tablename, COUNT(*) as idx_count FROM pg_indexes WHERE schemaname = 'public' AND indexname NOT LIKE '%_pkey' GROUP BY tablename ORDER BY idx_count DESC """)) indexes_by_table = result.fetchall() logger.info("\nIndexes per table:") for table, count in indexes_by_table: logger.info(f" {table:<25} {count} indexes") # Check for any remaining unindexed foreign keys result = await conn.execute(text(""" SELECT c.conrelid::regclass AS table_name, a.attname AS column_name FROM pg_constraint c JOIN pg_attribute a ON a.attnum = ANY(c.conkey) AND a.attrelid = c.conrelid LEFT JOIN pg_index i ON i.indrelid = c.conrelid AND a.attnum = ANY(i.indkey) WHERE c.contype = 'f' AND i.indexrelid IS NULL """)) unindexed_fks = result.fetchall() if unindexed_fks: logger.warning(f"\n⚠️ {len(unindexed_fks)} foreign keys still without indexes:") for table, column in unindexed_fks: logger.warning(f" {table}.{column}") else: logger.info("\n✓ All foreign keys are properly indexed!") logger.info("\n" + "="*60) async def main(): """Main optimization workflow""" try: await optimize_database() await verify_optimization() logger.info("\n🎉 Database optimization successful!") logger.info("\nRecommendations:") logger.info(" 1. Monitor query performance with pg_stat_statements") logger.info(" 2. Run VACUUM ANALYZE periodically") logger.info(" 3. Consider partitioning large tables (orders, inventory_logs)") logger.info(" 4. Set up connection pooling for production") except Exception as e: logger.error(f"\n❌ Optimization failed: {str(e)}") raise if __name__ == "__main__": asyncio.run(main())