# PromptTech/backend/optimize_database.py
"""
Database Optimization Script
Creates missing indexes and constraints for PostgreSQL
"""
import asyncio
from sqlalchemy import text
from database import async_engine
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
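
# `async_engine` is assumed to be an SQLAlchemy AsyncEngine built with
# create_async_engine() in database.py; the conn.execute()/commit()/rollback()
# calls below rely on that API. Typical invocation (assuming the engine's
# connection URL is configured there):
#   python optimize_database.py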


async def optimize_database():
    """Apply database optimizations including indexes and constraints"""
    async with async_engine.connect() as conn:
        logger.info("Starting database optimization...")

        # ============= FOREIGN KEY INDEXES =============
        logger.info("\n1. Creating indexes on foreign keys...")
        fk_indexes = [
            ("idx_products_category_id", "products", "category_id"),
            ("idx_services_category_id", "services", "category_id"),
            ("idx_orders_user_id", "orders", "user_id"),
            ("idx_cart_items_user_id", "cart_items", "user_id"),
            ("idx_cart_items_product_id", "cart_items", "product_id"),
            ("idx_order_items_order_id", "order_items", "order_id"),
            ("idx_order_items_product_id", "order_items", "product_id"),
            ("idx_order_status_history_order_id", "order_status_history", "order_id"),
            ("idx_reviews_user_id", "reviews", "user_id"),
            ("idx_reviews_product_id", "reviews", "product_id"),
            ("idx_reviews_service_id", "reviews", "service_id"),
            ("idx_bookings_user_id", "bookings", "user_id"),
            ("idx_bookings_service_id", "bookings", "service_id"),
            ("idx_inventory_logs_product_id", "inventory_logs", "product_id"),
        ]
        for idx_name, table, column in fk_indexes:
            try:
                await conn.execute(text(f"""
                    CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({column})
                """))
                await conn.commit()
                logger.info(f" ✓ Created {idx_name} on {table}.{column}")
            except Exception as e:
                await conn.rollback()
                logger.error(f" ✗ Failed to create {idx_name}: {str(e)[:100]}")

        # ============= PERFORMANCE INDEXES =============
        logger.info("\n2. Creating performance indexes...")
        performance_indexes = [
            # Products - frequently filtered columns
            ("idx_products_is_active", "products", "is_active"),
            ("idx_products_category", "products", "category"),
            ("idx_products_stock", "products", "stock"),
            ("idx_products_created_at", "products", "created_at DESC"),
            # Services - frequently filtered columns
            ("idx_services_is_active", "services", "is_active"),
            ("idx_services_category", "services", "category"),
            ("idx_services_created_at", "services", "created_at DESC"),
            # Orders - status and date filtering
            ("idx_orders_status", "orders", "status"),
            ("idx_orders_created_at", "orders", "created_at DESC"),
            ("idx_orders_updated_at", "orders", "updated_at DESC"),
            # Reviews - approval status
            ("idx_reviews_is_approved", "reviews", "is_approved"),
            ("idx_reviews_created_at", "reviews", "created_at DESC"),
            # Bookings - status filtering
            ("idx_bookings_status", "bookings", "status"),
            ("idx_bookings_created_at", "bookings", "created_at DESC"),
            # Inventory logs - date filtering
            ("idx_inventory_logs_created_at", "inventory_logs", "created_at DESC"),
            # Cart items - recency lookup
            ("idx_cart_items_created_at", "cart_items", "created_at DESC"),
        ]
        for idx_name, table, column in performance_indexes:
            try:
                await conn.execute(text(f"""
                    CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({column})
                """))
                await conn.commit()
                logger.info(f" ✓ Created {idx_name} on {table}")
            except Exception as e:
                await conn.rollback()
                logger.error(f" ✗ Failed to create {idx_name}: {str(e)[:100]}")

        # ============= COMPOSITE INDEXES =============
        logger.info("\n3. Creating composite indexes...")
        composite_indexes = [
            # Products: category + active status (common query pattern)
            ("idx_products_category_active", "products", ["category", "is_active"]),
            # Services: category + active status
            ("idx_services_category_active", "services", ["category", "is_active"]),
            # Orders: user + status (for user order history)
            ("idx_orders_user_status", "orders", ["user_id", "status"]),
            # Reviews: product + approved (for product reviews)
            ("idx_reviews_product_approved", "reviews", ["product_id", "is_approved"]),
            # Reviews: service + approved (for service reviews)
            ("idx_reviews_service_approved", "reviews", ["service_id", "is_approved"]),
            # Inventory logs: product + created_at (for product history)
            ("idx_inventory_logs_product_date", "inventory_logs", ["product_id", "created_at DESC"]),
        ]
        for idx_name, table, columns in composite_indexes:
            try:
                cols_str = ", ".join(columns)
                await conn.execute(text(f"""
                    CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({cols_str})
                """))
                await conn.commit()
                logger.info(f" ✓ Created {idx_name} on {table}({cols_str})")
            except Exception as e:
                await conn.rollback()
                logger.error(f" ✗ Failed to create {idx_name}: {str(e)[:100]}")

        # ============= PARTIAL INDEXES =============
        logger.info("\n4. Creating partial indexes...")
        partial_indexes = [
            # Only index active products
            ("idx_products_active_only", "products", "category", "is_active = true"),
            # Only index active services
            ("idx_services_active_only", "services", "category", "is_active = true"),
            # Only index approved reviews
            ("idx_reviews_approved_only", "reviews", "product_id", "is_approved = true"),
            # Only index low stock products
            ("idx_products_low_stock", "products", "id",
             "stock <= low_stock_threshold AND is_active = true"),
        ]
        for idx_name, table, column, condition in partial_indexes:
            try:
                await conn.execute(text(f"""
                    CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({column})
                    WHERE {condition}
                """))
                await conn.commit()
                logger.info(f" ✓ Created partial index {idx_name}")
            except Exception as e:
                await conn.rollback()
                logger.error(f" ✗ Failed to create {idx_name}: {str(e)[:100]}")

        # ============= OPTIMIZE ENUMS =============
        logger.info("\n5. Ensuring enum types exist...")
        try:
            # Check if enum types exist, create if missing
            result = await conn.execute(text("""
                SELECT EXISTS (
                    SELECT 1 FROM pg_type WHERE typname = 'orderstatus'
                )
            """))
            if not result.scalar():
                logger.info(" Creating OrderStatus enum type...")
                await conn.execute(text("""
                    CREATE TYPE orderstatus AS ENUM (
                        'pending', 'processing', 'layaway', 'shipped',
                        'delivered', 'cancelled', 'refunded', 'on_hold'
                    )
                """))
            result = await conn.execute(text("""
                SELECT EXISTS (
                    SELECT 1 FROM pg_type WHERE typname = 'userrole'
                )
            """))
            if not result.scalar():
                logger.info(" Creating UserRole enum type...")
                await conn.execute(text("""
                    CREATE TYPE userrole AS ENUM ('user', 'admin')
                """))
            # Commit so any newly created types persist beyond this connection
            await conn.commit()
            logger.info(" ✓ Enum types verified")
        except Exception as e:
            # Roll back so a failure here does not poison later statements
            await conn.rollback()
            logger.warning(f" ⚠ Enum type check: {str(e)}")

        # ============= ADD CONSTRAINTS =============
        logger.info("\n6. Adding check constraints...")
        constraints = [
            ("chk_products_price_positive", "products", "price > 0"),
            ("chk_products_stock_nonnegative", "products", "stock >= 0"),
            ("chk_services_price_positive", "services", "price > 0"),
            ("chk_orders_totals_nonnegative", "orders",
             "subtotal >= 0 AND tax >= 0 AND shipping >= 0 AND total >= 0"),
            ("chk_reviews_rating_range", "reviews", "rating >= 1 AND rating <= 5"),
            ("chk_order_items_quantity_positive", "order_items", "quantity > 0"),
            ("chk_order_items_price_nonnegative", "order_items", "price >= 0"),
        ]
        for constraint_name, table, condition in constraints:
            try:
                # Check if constraint exists (bound parameter avoids quoting issues)
                result = await conn.execute(
                    text("SELECT COUNT(*) FROM pg_constraint WHERE conname = :name"),
                    {"name": constraint_name},
                )
                if result.scalar() == 0:
                    await conn.execute(text(f"""
                        ALTER TABLE {table}
                        ADD CONSTRAINT {constraint_name} CHECK ({condition})
                    """))
                    await conn.commit()
                    logger.info(f" ✓ Added constraint {constraint_name}")
                else:
                    logger.info(f" ⊙ Constraint {constraint_name} already exists")
            except Exception as e:
                await conn.rollback()
                logger.error(f" ✗ Failed to add {constraint_name}: {str(e)[:100]}")

        # ============= ANALYZE TABLES =============
        logger.info("\n7. Analyzing tables for statistics...")
        tables = ['users', 'products', 'services', 'orders', 'order_items',
                  'reviews', 'bookings', 'cart_items', 'inventory_logs']
        for table in tables:
            try:
                await conn.execute(text(f"ANALYZE {table}"))
                await conn.commit()
                logger.info(f" ✓ Analyzed {table}")
            except Exception as e:
                await conn.rollback()
                logger.error(f" ✗ Failed to analyze {table}: {str(e)[:100]}")

        logger.info("\n✅ Database optimization complete!")


async def verify_optimization():
    """Verify that optimizations were applied correctly"""
    async with async_engine.connect() as conn:
        logger.info("\n" + "=" * 60)
        logger.info("OPTIMIZATION VERIFICATION")
        logger.info("=" * 60)

        # Count indexes (excluding primary keys)
        result = await conn.execute(text("""
            SELECT COUNT(*) FROM pg_indexes
            WHERE schemaname = 'public' AND indexname NOT LIKE '%_pkey'
        """))
        index_count = result.scalar()
        logger.info(f"\nTotal indexes created: {index_count}")

        # Count check constraints
        result = await conn.execute(text("""
            SELECT COUNT(*) FROM pg_constraint
            WHERE contype = 'c' AND connamespace::regnamespace::text = 'public'
        """))
        constraint_count = result.scalar()
        logger.info(f"Total check constraints: {constraint_count}")

        # List all indexes by table
        result = await conn.execute(text("""
            SELECT tablename, COUNT(*) AS idx_count
            FROM pg_indexes
            WHERE schemaname = 'public' AND indexname NOT LIKE '%_pkey'
            GROUP BY tablename
            ORDER BY idx_count DESC
        """))
        indexes_by_table = result.fetchall()
        logger.info("\nIndexes per table:")
        for table, count in indexes_by_table:
            logger.info(f" {table:<25} {count} indexes")

        # Check for any remaining unindexed foreign keys
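        # contype = 'f' selects foreign key constraints; the LEFT JOIN looks
        # for any index on the referencing table that contains the FK column,
        # so a NULL indexrelid means the column is not indexed at all. Note
        # this accepts the column at any position in a composite index, not
        # just the leading one.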
result = await conn.execute(text("""
SELECT
c.conrelid::regclass AS table_name,
a.attname AS column_name
FROM pg_constraint c
JOIN pg_attribute a ON a.attnum = ANY(c.conkey) AND a.attrelid = c.conrelid
LEFT JOIN pg_index i ON i.indrelid = c.conrelid
AND a.attnum = ANY(i.indkey)
WHERE c.contype = 'f'
AND i.indexrelid IS NULL
"""))
unindexed_fks = result.fetchall()
if unindexed_fks:
logger.warning(f"\n⚠️ {len(unindexed_fks)} foreign keys still without indexes:")
for table, column in unindexed_fks:
logger.warning(f" {table}.{column}")
else:
logger.info("\n✓ All foreign keys are properly indexed!")
logger.info("\n" + "="*60)


async def main():
    """Main optimization workflow"""
    try:
        await optimize_database()
        await verify_optimization()
        logger.info("\n🎉 Database optimization successful!")
        logger.info("\nRecommendations:")
        logger.info(" 1. Monitor query performance with pg_stat_statements")
        logger.info(" 2. Run VACUUM ANALYZE periodically")
        logger.info(" 3. Consider partitioning large tables (orders, inventory_logs)")
        logger.info(" 4. Set up connection pooling for production")
    except Exception as e:
        logger.error(f"\n❌ Optimization failed: {str(e)}")
        raise
    finally:
        # Dispose the engine so pooled connections are closed cleanly
        await async_engine.dispose()
if __name__ == "__main__":
asyncio.run(main())