"""
Database Health Check Script

Verifies schema correctness, relationship integrity, and performance of the
PostgreSQL database reachable through ``database.async_engine``. All results
are written to the log; nothing is modified in the database.
"""
import asyncio
import logging
import traceback

from sqlalchemy import text, inspect

from database import async_engine

logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = logging.getLogger(__name__)


async def check_schema(expected_tables: int = 13, min_index_count: int = 30) -> None:
    """Run every health check against the ``public`` schema and log a report.

    Args:
        expected_tables: Number of tables the ``public`` schema should
            contain (default 13, based on models.py).
        min_index_count: The "Performance indexes" summary check passes only
            when the number of non-primary-key indexes is strictly greater
            than this value (default 30).

    Raises:
        Exception: Any database error raised by a query that is not wrapped
            in its own ``try`` block propagates to the caller.
    """
    async with async_engine.connect() as conn:
        logger.info("="*70)
        logger.info("DATABASE HEALTH CHECK")
        logger.info("="*70)

        # 1. Table Count
        logger.info("\nšŸ“Š 1. SCHEMA VERIFICATION")
        logger.info("-" * 70)

        result = await conn.execute(text("""
            SELECT COUNT(*)
            FROM information_schema.tables
            WHERE table_schema = 'public'
        """))
        table_count = result.scalar()
        status = "āœ…" if table_count == expected_tables else "āš ļø "
        logger.info(f" Tables: {table_count}/{expected_tables} {status}")

        # 2. Foreign Keys
        logger.info("\nšŸ”— 2. RELATIONSHIPS & CONSTRAINTS")
        logger.info("-" * 70)

        result = await conn.execute(text("""
            SELECT COUNT(*)
            FROM information_schema.table_constraints
            WHERE constraint_type = 'FOREIGN KEY'
              AND table_schema = 'public'
        """))
        fk_count = result.scalar()
        logger.info(f" Foreign Keys: {fk_count} āœ…")

        # 3. Indexes (primary-key indexes are excluded by name suffix)
        result = await conn.execute(text("""
            SELECT COUNT(*)
            FROM pg_indexes
            WHERE schemaname = 'public'
              AND indexname NOT LIKE '%_pkey'
        """))
        index_count = result.scalar()
        logger.info(f" Indexes (non-PK): {index_count} āœ…")

        # 4. Check Constraints
        result = await conn.execute(text("""
            SELECT COUNT(*)
            FROM pg_constraint
            WHERE contype = 'c'
              AND connamespace::regnamespace::text = 'public'
        """))
        check_count = result.scalar()
        logger.info(f" Check Constraints: {check_count} āœ…")

        # 5. Verify all foreign keys have indexes
        logger.info("\n⚔ 3. PERFORMANCE VALIDATION")
        logger.info("-" * 70)

        # Reports every foreign-key column that appears in no index on its
        # own table (the LEFT JOIN finds no pg_index row containing it).
        result = await conn.execute(text("""
            SELECT
                c.conrelid::regclass AS table_name,
                a.attname AS column_name
            FROM pg_constraint c
            JOIN pg_attribute a
              ON a.attnum = ANY(c.conkey)
             AND a.attrelid = c.conrelid
            LEFT JOIN pg_index i
              ON i.indrelid = c.conrelid
             AND a.attnum = ANY(i.indkey)
            WHERE c.contype = 'f'
              AND i.indexrelid IS NULL
        """))
        missing_indexes = result.fetchall()

        if missing_indexes:
            logger.info(f" āš ļø {len(missing_indexes)} foreign keys without indexes:")
            for table, column in missing_indexes:
                logger.info(f" {table}.{column}")
        else:
            logger.info(" āœ… All foreign keys are indexed")

        # 6. Check for duplicate indexes
        # One row per index with its column names. Columns are ordered by
        # attribute number, not by position in the index, so two indexes over
        # the same columns in a different order are grouped together below.
        result = await conn.execute(text("""
            SELECT
                t.relname AS table_name,
                i.relname AS index_name,
                array_agg(a.attname ORDER BY a.attnum) AS columns
            FROM pg_class t
            JOIN pg_index ix ON t.oid = ix.indrelid
            JOIN pg_class i ON i.oid = ix.indexrelid
            JOIN pg_attribute a
              ON a.attrelid = t.oid
             AND a.attnum = ANY(ix.indkey)
            WHERE t.relkind = 'r'
              AND t.relnamespace::regnamespace::text = 'public'
            GROUP BY t.relname, i.relname
        """))
        indexes = result.fetchall()

        # Group by (table, columns) to find duplicates
        index_map = {}
        for table, idx_name, columns in indexes:
            index_map.setdefault((table, tuple(columns)), []).append(idx_name)

        duplicates = [(k, v) for k, v in index_map.items() if len(v) > 1]
        if duplicates:
            logger.info(f" āš ļø {len(duplicates)} duplicate index patterns found")
            # Only the first three patterns are listed to keep the report short.
            for (table, cols), names in duplicates[:3]:
                logger.info(f" {table}{list(cols)}: {', '.join(names)}")
        else:
            logger.info(" āœ… No duplicate indexes found")

        # 7. Table sizes and row counts (ten largest tables)
        logger.info("\nšŸ“¦ 4. TABLE STATISTICS")
        logger.info("-" * 70)

        result = await conn.execute(text("""
            SELECT
                c.relname AS table_name,
                pg_size_pretty(pg_total_relation_size(c.oid)) AS size,
                c.reltuples::bigint AS estimated_rows
            FROM pg_class c
            LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE relkind = 'r'
              AND nspname = 'public'
            ORDER BY pg_total_relation_size(c.oid) DESC
            LIMIT 10
        """))
        sizes = result.fetchall()

        logger.info(f" {'Table':<25} {'Size':>10} {'Rows':>10}")
        logger.info(" " + "-" * 48)
        for table, size, rows in sizes:
            logger.info(f" {table:<25} {size:>10} {rows:>10}")

        # 8. Check column data types alignment
        logger.info("\nšŸ” 5. DATA TYPE VERIFICATION")
        logger.info("-" * 70)

        critical_columns = [
            ('users', 'email', 'character varying'),
            ('products', 'price', 'double precision'),
            ('services', 'price', 'double precision'),
            ('orders', 'total', 'double precision'),
            ('reviews', 'rating', 'integer'),
        ]

        # Bound parameters instead of string interpolation, and restricted to
        # the public schema like every other check so a same-named table in
        # another schema cannot supply the row that scalar() returns.
        column_type_query = text("""
            SELECT data_type
            FROM information_schema.columns
            WHERE table_schema = 'public'
              AND table_name = :table_name
              AND column_name = :column_name
        """)

        issues = []
        for table, column, expected_type in critical_columns:
            result = await conn.execute(
                column_type_query,
                {"table_name": table, "column_name": column},
            )
            # scalar() yields None when the column does not exist, which is
            # reported as a mismatch ("got None").
            actual_type = result.scalar()
            if actual_type != expected_type:
                issues.append(f"{table}.{column}: expected {expected_type}, got {actual_type}")

        if issues:
            logger.info(" āš ļø Data type mismatches:")
            for issue in issues:
                logger.info(f" {issue}")
        else:
            logger.info(" āœ… All critical columns have correct types")

        # 9. Check for missing NOT NULL constraints
        logger.info("\nšŸ›”ļø 6. DATA INTEGRITY")
        logger.info("-" * 70)

        result = await conn.execute(text("""
            SELECT table_name, column_name
            FROM information_schema.columns
            WHERE table_schema = 'public'
              AND column_name IN ('id', 'user_id', 'product_id', 'service_id', 'order_id')
              AND is_nullable = 'YES'
            ORDER BY table_name, column_name
        """))
        nullable_fks = result.fetchall()

        # Informational only: nullable ID columns do not affect the summary.
        if nullable_fks:
            logger.info(f" ā„¹ļø {len(nullable_fks)} nullable foreign/primary keys (by design):")
            for table, column in nullable_fks[:5]:
                logger.info(f" {table}.{column}")
        else:
            logger.info(" āœ… All IDs have appropriate nullability")

        # 10. Enum types verification
        logger.info("\nšŸ“‹ 7. ENUM TYPES")
        logger.info("-" * 70)

        result = await conn.execute(text("""
            SELECT
                t.typname AS enum_name,
                array_agg(e.enumlabel ORDER BY e.enumsortorder) AS values
            FROM pg_type t
            JOIN pg_enum e ON t.oid = e.enumtypid
            WHERE t.typnamespace::regnamespace::text = 'public'
            GROUP BY t.typname
        """))
        enums = result.fetchall()

        if enums:
            for enum_name, values in enums:
                logger.info(f" āœ… {enum_name}: {len(values)} values")
        else:
            logger.info(" āš ļø No enum types found")

        # 11. Index usage statistics (best effort)
        logger.info("\nšŸ“ˆ 8. INDEX USAGE")
        logger.info("-" * 70)

        try:
            result = await conn.execute(text("""
                SELECT
                    schemaname,
                    relname,
                    indexrelname,
                    idx_scan,
                    idx_tup_read
                FROM pg_stat_user_indexes
                WHERE schemaname = 'public'
                ORDER BY idx_scan DESC
                LIMIT 10
            """))
            index_stats = result.fetchall()

            logger.info(f" {'Table':<20} {'Index':<30} {'Scans':>10}")
            logger.info(" " + "-" * 62)
            for _schema, table, idx_name, scans, _reads in index_stats:
                logger.info(f" {table:<20} {idx_name:<30} {scans:>10}")
        except Exception as exc:
            logger.info(" ā„¹ļø Index statistics not available (requires pg_stat_statements)")
            # Keep the real cause reachable when debug logging is enabled.
            logger.debug(f" Index statistics query failed: {exc}")

        # 12. Connection settings (best effort, one SHOW per setting)
        logger.info("\nāš™ļø 9. DATABASE CONFIGURATION")
        logger.info("-" * 70)

        settings_to_check = [
            'max_connections',
            'shared_buffers',
            'effective_cache_size',
            'work_mem',
            'maintenance_work_mem',
        ]

        for setting in settings_to_check:
            try:
                # SHOW takes an identifier, not a bindable value; the names
                # come from the hard-coded list above, never from user input.
                result = await conn.execute(text(f"SHOW {setting}"))
                value = result.scalar()
                logger.info(f" {setting:<25} {value}")
            except Exception as exc:
                # A failed setting must not abort the report, but it should
                # not vanish silently either.
                logger.warning(f" {setting:<25} <unavailable: {type(exc).__name__}>")

        # Summary
        logger.info("\n" + "="*70)
        logger.info("HEALTH CHECK SUMMARY")
        logger.info("="*70)

        checks = [
            (table_count == expected_tables, "Schema structure"),
            (fk_count > 0, "Foreign key relationships"),
            (index_count > min_index_count, "Performance indexes"),
            (check_count > 0, "Data validation constraints"),
            (not missing_indexes, "Foreign key indexing"),
            (not issues, "Data type correctness"),
        ]

        passed = sum(1 for ok, _ in checks if ok)
        total = len(checks)

        logger.info(f"\n Passed: {passed}/{total} checks")
        logger.info("")

        for ok, description in checks:
            status = "āœ…" if ok else "āš ļø "
            logger.info(f" {status} {description}")

        if passed == total:
            logger.info("\nšŸŽ‰ Database is healthy and optimized!")
        elif passed >= total * 0.8:
            logger.info("\nāœ… Database is mostly healthy with minor issues")
        else:
            logger.info("\nāš ļø Database needs attention")

        logger.info("\n" + "="*70)


async def main() -> None:
    """Run the health check, report any failure, and dispose of the engine."""
    try:
        await check_schema()
    except Exception as e:
        logger.error(f"\nāŒ Health check failed: {str(e)}")
        traceback.print_exc()
    finally:
        # Close pooled connections while the event loop is still running;
        # assumes async_engine is a SQLAlchemy AsyncEngine.
        await async_engine.dispose()


if __name__ == "__main__":
    asyncio.run(main())