# Source: PromptTech/backend/check_database_health.py (306 lines, 11 KiB, Python)
# Retrieved from repository browser; snapshot dated 2026-01-27 18:07:00 -06:00.
"""
Database Health Check Script
Verifies schema correctness, relationship integrity, and performance
"""
import asyncio
from sqlalchemy import text, inspect
from database import async_engine
import logging
logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = logging.getLogger(__name__)
async def check_schema():
    """Verify database schema matches models.

    Runs a read-only suite of checks against the connected Postgres
    database — table/FK/index/constraint counts, FK index coverage,
    duplicate indexes, table sizes, critical column types, nullability,
    enum types, index usage counters and key server settings — and logs
    a human-readable report ending in a pass/fail summary.

    Returns nothing; all results go to the module logger. Raises
    whatever the database driver raises on connection/query failure.
    """
    async with async_engine.connect() as conn:
        logger.info("=" * 70)
        logger.info("DATABASE HEALTH CHECK")
        logger.info("=" * 70)

        # 1. Table count, compared against the number of ORM models.
        logger.info("\n📊 1. SCHEMA VERIFICATION")
        logger.info("-" * 70)
        result = await conn.execute(text("""
            SELECT COUNT(*) FROM information_schema.tables
            WHERE table_schema = 'public'
        """))
        table_count = result.scalar()
        expected_tables = 13  # Based on models.py
        # "✅" restored — the empty string here matched the stripped-emoji
        # pattern; the warning branch and summary section both use icons.
        status = "✅" if table_count == expected_tables else "⚠️ "
        logger.info(f" Tables: {table_count}/{expected_tables} {status}")

        # 2. Foreign-key constraint count.
        logger.info("\n🔗 2. RELATIONSHIPS & CONSTRAINTS")
        logger.info("-" * 70)
        result = await conn.execute(text("""
            SELECT COUNT(*)
            FROM information_schema.table_constraints
            WHERE constraint_type = 'FOREIGN KEY' AND table_schema = 'public'
        """))
        fk_count = result.scalar()
        logger.info(f" Foreign Keys: {fk_count}")

        # 3. Index count, excluding primary keys by naming convention.
        result = await conn.execute(text("""
            SELECT COUNT(*)
            FROM pg_indexes
            WHERE schemaname = 'public' AND indexname NOT LIKE '%_pkey'
        """))
        index_count = result.scalar()
        logger.info(f" Indexes (non-PK): {index_count}")

        # 4. CHECK constraints (contype 'c' in pg_constraint).
        result = await conn.execute(text("""
            SELECT COUNT(*)
            FROM pg_constraint
            WHERE contype = 'c' AND connamespace::regnamespace::text = 'public'
        """))
        check_count = result.scalar()
        logger.info(f" Check Constraints: {check_count}")

        # 5. Foreign-key columns with no covering index — these make
        #    joins and cascaded deletes on the referencing side slow.
        logger.info("\n⚡ 3. PERFORMANCE VALIDATION")
        logger.info("-" * 70)
        result = await conn.execute(text("""
            SELECT
                c.conrelid::regclass AS table_name,
                a.attname AS column_name
            FROM pg_constraint c
            JOIN pg_attribute a ON a.attnum = ANY(c.conkey) AND a.attrelid = c.conrelid
            LEFT JOIN pg_index i ON i.indrelid = c.conrelid
                AND a.attnum = ANY(i.indkey)
            WHERE c.contype = 'f' AND i.indexrelid IS NULL
        """))
        missing_indexes = result.fetchall()
        if missing_indexes:
            logger.info(f" ⚠️ {len(missing_indexes)} foreign keys without indexes:")
            for table, column in missing_indexes:
                logger.info(f" {table}.{column}")
        else:
            logger.info(" ✅ All foreign keys are indexed")

        # 6. Duplicate indexes: two indexes over the same column set
        #    waste disk and write throughput.  (Removed the tautological
        #    `HAVING COUNT(*) > 0` — GROUP BY rows always have count >= 1.)
        result = await conn.execute(text("""
            SELECT
                t.relname AS table_name,
                i.relname AS index_name,
                array_agg(a.attname ORDER BY a.attnum) AS columns
            FROM pg_class t
            JOIN pg_index ix ON t.oid = ix.indrelid
            JOIN pg_class i ON i.oid = ix.indexrelid
            JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
            WHERE t.relkind = 'r'
            AND t.relnamespace::regnamespace::text = 'public'
            GROUP BY t.relname, i.relname
        """))
        indexes = result.fetchall()
        # Group index names by (table, column tuple); >1 name per key
        # means the same column set is indexed more than once.
        index_map = {}
        for table, idx_name, columns in indexes:
            index_map.setdefault((table, tuple(columns)), []).append(idx_name)
        duplicates = [(k, v) for k, v in index_map.items() if len(v) > 1]
        if duplicates:
            logger.info(f" ⚠️ {len(duplicates)} duplicate index patterns found")
            for (table, cols), names in duplicates[:3]:
                logger.info(f" {table}{list(cols)}: {', '.join(names)}")
        else:
            logger.info(" ✅ No duplicate indexes found")

        # 7. Ten largest tables.  reltuples is the planner's estimate
        #    (maintained by ANALYZE), not an exact row count.
        logger.info("\n📦 4. TABLE STATISTICS")
        logger.info("-" * 70)
        result = await conn.execute(text("""
            SELECT
                c.relname AS table_name,
                pg_size_pretty(pg_total_relation_size(c.oid)) AS size,
                c.reltuples::bigint AS estimated_rows
            FROM pg_class c
            LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE relkind = 'r' AND nspname = 'public'
            ORDER BY pg_total_relation_size(c.oid) DESC
            LIMIT 10
        """))
        sizes = result.fetchall()
        logger.info(f" {'Table':<25} {'Size':>10} {'Rows':>10}")
        logger.info(" " + "-" * 48)
        for table, size, rows in sizes:
            logger.info(f" {table:<25} {size:>10} {rows:>10}")

        # 8. Critical column data types must match the ORM models.
        logger.info("\n🔍 5. DATA TYPE VERIFICATION")
        logger.info("-" * 70)
        critical_columns = [
            ('users', 'email', 'character varying'),
            ('products', 'price', 'double precision'),
            ('services', 'price', 'double precision'),
            ('orders', 'total', 'double precision'),
            ('reviews', 'rating', 'integer'),
        ]
        issues = []
        for table, column, expected_type in critical_columns:
            # Bound parameters instead of f-string interpolation: the
            # values are hard-coded above, but parameterizing keeps the
            # query safe if this list is ever built from external input.
            result = await conn.execute(
                text("""
                    SELECT data_type
                    FROM information_schema.columns
                    WHERE table_name = :table AND column_name = :column
                """),
                {"table": table, "column": column},
            )
            actual_type = result.scalar()
            if actual_type != expected_type:
                issues.append(f"{table}.{column}: expected {expected_type}, got {actual_type}")
        if issues:
            logger.info(" ⚠️ Data type mismatches:")
            for issue in issues:
                logger.info(f" {issue}")
        else:
            logger.info(" ✅ All critical columns have correct types")

        # 9. Nullable ID columns.  The report treats these as by-design
        #    (e.g. optional references) rather than as failures.
        logger.info("\n🛡️ 6. DATA INTEGRITY")
        logger.info("-" * 70)
        result = await conn.execute(text("""
            SELECT table_name, column_name
            FROM information_schema.columns
            WHERE table_schema = 'public'
            AND column_name IN ('id', 'user_id', 'product_id', 'service_id', 'order_id')
            AND is_nullable = 'YES'
            ORDER BY table_name, column_name
        """))
        nullable_fks = result.fetchall()
        if nullable_fks:
            logger.info(f" {len(nullable_fks)} nullable foreign/primary keys (by design):")
            for table, column in nullable_fks[:5]:
                logger.info(f" {table}.{column}")
        else:
            logger.info(" ✅ All IDs have appropriate nullability")

        # 10. Enum types declared in the public schema, with labels in
        #     their defined sort order.
        logger.info("\n📋 7. ENUM TYPES")
        logger.info("-" * 70)
        result = await conn.execute(text("""
            SELECT
                t.typname AS enum_name,
                array_agg(e.enumlabel ORDER BY e.enumsortorder) AS values
            FROM pg_type t
            JOIN pg_enum e ON t.oid = e.enumtypid
            WHERE t.typnamespace::regnamespace::text = 'public'
            GROUP BY t.typname
        """))
        enums = result.fetchall()
        if enums:
            for enum_name, values in enums:
                logger.info(f"{enum_name}: {len(values)} values")
        else:
            logger.info(" ⚠️ No enum types found")

        # 11. Index usage counters from the statistics collector.
        logger.info("\n📈 8. INDEX USAGE")
        logger.info("-" * 70)
        try:
            result = await conn.execute(text("""
                SELECT
                    schemaname,
                    relname,
                    indexrelname,
                    idx_scan,
                    idx_tup_read
                FROM pg_stat_user_indexes
                WHERE schemaname = 'public'
                ORDER BY idx_scan DESC
                LIMIT 10
            """))
            index_stats = result.fetchall()
            logger.info(f" {'Table':<20} {'Index':<30} {'Scans':>10}")
            logger.info(" " + "-" * 62)
            for schema, table, idx_name, scans, reads in index_stats:
                logger.info(f" {table:<20} {idx_name:<30} {scans:>10}")
        except Exception:
            # pg_stat_user_indexes is a standard statistics view; the
            # previous message wrongly blamed pg_stat_statements.
            logger.info(" Index statistics not available")

        # 12. Key server configuration values.
        logger.info("\n⚙️ 9. DATABASE CONFIGURATION")
        logger.info("-" * 70)
        settings_to_check = [
            'max_connections',
            'shared_buffers',
            'effective_cache_size',
            'work_mem',
            'maintenance_work_mem'
        ]
        for setting in settings_to_check:
            try:
                # current_setting() accepts a bound parameter (SHOW does
                # not), so no SQL is built via string interpolation.
                result = await conn.execute(
                    text("SELECT current_setting(:name)"), {"name": setting}
                )
                value = result.scalar()
                logger.info(f" {setting:<25} {value}")
            except Exception:
                # Setting unknown on this server version — skip it.
                pass

        # Summary: each (passed?, label) pair feeds the report below.
        logger.info("\n" + "=" * 70)
        logger.info("HEALTH CHECK SUMMARY")
        logger.info("=" * 70)
        checks = [
            (table_count == expected_tables, "Schema structure"),
            (fk_count > 0, "Foreign key relationships"),
            (index_count > 30, "Performance indexes"),
            (check_count > 0, "Data validation constraints"),
            (len(missing_indexes) == 0, "Foreign key indexing"),
            (len(issues) == 0, "Data type correctness"),
        ]
        passed = sum(1 for check, _ in checks if check)
        total = len(checks)
        logger.info(f"\n Passed: {passed}/{total} checks")
        logger.info("")
        for check, description in checks:
            # "✅" restored here as well (stripped-emoji artifact).
            status = "✅" if check else "⚠️ "
            logger.info(f" {status} {description}")
        if passed == total:
            logger.info("\n🎉 Database is healthy and optimized!")
        elif passed >= total * 0.8:
            logger.info("\n✅ Database is mostly healthy with minor issues")
        else:
            logger.info("\n⚠️ Database needs attention")
        logger.info("\n" + "=" * 70)
async def main():
    """Entry point: run the health check and log any failure with traceback."""
    try:
        await check_schema()
    except Exception as e:
        # logger.exception logs the message AND the active traceback via
        # the configured logger, replacing the manual traceback.print_exc()
        # (which bypassed logging and wrote raw text to stderr).
        logger.exception(f"\n❌ Health check failed: {str(e)}")


if __name__ == "__main__":
    asyncio.run(main())