Files
PromptTech/backend/check_database_health.py

306 lines
11 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Database Health Check Script
Verifies schema correctness, relationship integrity, and performance
"""
import asyncio
from sqlalchemy import text, inspect
from database import async_engine
import logging
logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = logging.getLogger(__name__)
async def check_schema():
    """Run a health-check suite against the connected PostgreSQL database.

    Verifies schema structure, foreign-key relationships, indexing,
    table statistics, column data types, nullability, enum types, index
    usage, and server configuration. Results are written to the module
    logger as a human-readable report plus a pass/fail summary; nothing
    is returned.

    Raises:
        Any exception propagated by the database driver (connection
        failures, permission errors, etc.) — handled by the caller.
    """
    async with async_engine.connect() as conn:
        logger.info("=" * 70)
        logger.info("DATABASE HEALTH CHECK")
        logger.info("=" * 70)

        # 1. Table count — compared against the models declared in models.py.
        logger.info("\n📊 1. SCHEMA VERIFICATION")
        logger.info("-" * 70)
        result = await conn.execute(text("""
            SELECT COUNT(*) FROM information_schema.tables
            WHERE table_schema = 'public'
        """))
        table_count = result.scalar()
        expected_tables = 13  # Based on models.py
        # "✅" restores the success marker (the emoji was stripped to an
        # empty string in the original file's invisible-Unicode mangling;
        # every other pass marker in this script uses "✅").
        status = "✅" if table_count == expected_tables else "⚠️ "
        logger.info(f" Tables: {table_count}/{expected_tables} {status}")

        # 2. Foreign-key constraint count.
        logger.info("\n🔗 2. RELATIONSHIPS & CONSTRAINTS")
        logger.info("-" * 70)
        result = await conn.execute(text("""
            SELECT COUNT(*)
            FROM information_schema.table_constraints
            WHERE constraint_type = 'FOREIGN KEY' AND table_schema = 'public'
        """))
        fk_count = result.scalar()
        logger.info(f" Foreign Keys: {fk_count}")

        # 3. Secondary (non-primary-key) index count.
        result = await conn.execute(text("""
            SELECT COUNT(*)
            FROM pg_indexes
            WHERE schemaname = 'public' AND indexname NOT LIKE '%_pkey'
        """))
        index_count = result.scalar()
        logger.info(f" Indexes (non-PK): {index_count}")

        # 4. CHECK constraint count (contype = 'c' in pg_constraint).
        result = await conn.execute(text("""
            SELECT COUNT(*)
            FROM pg_constraint
            WHERE contype = 'c' AND connamespace::regnamespace::text = 'public'
        """))
        check_count = result.scalar()
        logger.info(f" Check Constraints: {check_count}")

        # 5. Foreign-key columns with no covering index — unindexed FKs make
        # JOINs and cascading deletes slow. NOTE(review): the ANY(i.indkey)
        # test accepts an index containing the column in any position, not
        # only as the leading column — a trailing position may not help the
        # planner; confirm if stricter checking is wanted.
        logger.info("\n⚡ 3. PERFORMANCE VALIDATION")
        logger.info("-" * 70)
        result = await conn.execute(text("""
            SELECT
                c.conrelid::regclass AS table_name,
                a.attname AS column_name
            FROM pg_constraint c
            JOIN pg_attribute a ON a.attnum = ANY(c.conkey) AND a.attrelid = c.conrelid
            LEFT JOIN pg_index i ON i.indrelid = c.conrelid
                AND a.attnum = ANY(i.indkey)
            WHERE c.contype = 'f' AND i.indexrelid IS NULL
        """))
        missing_indexes = result.fetchall()
        if missing_indexes:
            logger.info(f" ⚠️ {len(missing_indexes)} foreign keys without indexes:")
            for table, column in missing_indexes:
                logger.info(f" {table}.{column}")
        else:
            logger.info(" ✅ All foreign keys are indexed")

        # 6. Duplicate indexes: fetch every index with its column list, then
        # group in Python by (table, columns) to spot redundant definitions.
        result = await conn.execute(text("""
            SELECT
                t.relname AS table_name,
                i.relname AS index_name,
                array_agg(a.attname ORDER BY a.attnum) AS columns
            FROM pg_class t
            JOIN pg_index ix ON t.oid = ix.indrelid
            JOIN pg_class i ON i.oid = ix.indexrelid
            JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
            WHERE t.relkind = 'r'
                AND t.relnamespace::regnamespace::text = 'public'
            GROUP BY t.relname, i.relname
            HAVING COUNT(*) > 0
        """))
        indexes = result.fetchall()
        index_map = {}
        for table, idx_name, columns in indexes:
            key = (table, tuple(columns))
            index_map.setdefault(key, []).append(idx_name)
        duplicates = [(k, v) for k, v in index_map.items() if len(v) > 1]
        if duplicates:
            logger.info(f" ⚠️ {len(duplicates)} duplicate index patterns found")
            # Show at most three examples to keep the report short.
            for (table, cols), names in duplicates[:3]:
                logger.info(f" {table}{list(cols)}: {', '.join(names)}")
        else:
            logger.info(" ✅ No duplicate indexes found")

        # 7. Ten largest tables by total on-disk size. reltuples is the
        # planner's row estimate, not an exact count.
        logger.info("\n📦 4. TABLE STATISTICS")
        logger.info("-" * 70)
        result = await conn.execute(text("""
            SELECT
                c.relname AS table_name,
                pg_size_pretty(pg_total_relation_size(c.oid)) AS size,
                c.reltuples::bigint AS estimated_rows
            FROM pg_class c
            LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE relkind = 'r' AND nspname = 'public'
            ORDER BY pg_total_relation_size(c.oid) DESC
            LIMIT 10
        """))
        sizes = result.fetchall()
        logger.info(f" {'Table':<25} {'Size':>10} {'Rows':>10}")
        logger.info(" " + "-" * 48)
        for table, size, rows in sizes:
            logger.info(f" {table:<25} {size:>10} {rows:>10}")

        # 8. Spot-check data types on a few business-critical columns.
        logger.info("\n🔍 5. DATA TYPE VERIFICATION")
        logger.info("-" * 70)
        critical_columns = [
            ('users', 'email', 'character varying'),
            ('products', 'price', 'double precision'),
            ('services', 'price', 'double precision'),
            ('orders', 'total', 'double precision'),
            ('reviews', 'rating', 'integer'),
        ]
        issues = []
        for table, column, expected_type in critical_columns:
            # Bind parameters instead of f-string interpolation: safe even
            # if the column list ever comes from outside, and lets the
            # driver reuse one prepared statement for all five lookups.
            result = await conn.execute(
                text("""
                    SELECT data_type
                    FROM information_schema.columns
                    WHERE table_name = :table_name AND column_name = :column_name
                """),
                {"table_name": table, "column_name": column},
            )
            actual_type = result.scalar()
            if actual_type != expected_type:
                issues.append(f"{table}.{column}: expected {expected_type}, got {actual_type}")
        if issues:
            logger.info(" ⚠️ Data type mismatches:")
            for issue in issues:
                logger.info(f" {issue}")
        else:
            logger.info(" ✅ All critical columns have correct types")

        # 9. Nullable ID columns — reported informationally, since some
        # FKs (e.g. optional relations) are nullable by design.
        logger.info("\n🛡️ 6. DATA INTEGRITY")
        logger.info("-" * 70)
        result = await conn.execute(text("""
            SELECT table_name, column_name
            FROM information_schema.columns
            WHERE table_schema = 'public'
                AND column_name IN ('id', 'user_id', 'product_id', 'service_id', 'order_id')
                AND is_nullable = 'YES'
            ORDER BY table_name, column_name
        """))
        nullable_fks = result.fetchall()
        if nullable_fks:
            logger.info(f" {len(nullable_fks)} nullable foreign/primary keys (by design):")
            for table, column in nullable_fks[:5]:
                logger.info(f" {table}.{column}")
        else:
            logger.info(" ✅ All IDs have appropriate nullability")

        # 10. Enum types and their label counts.
        logger.info("\n📋 7. ENUM TYPES")
        logger.info("-" * 70)
        result = await conn.execute(text("""
            SELECT
                t.typname AS enum_name,
                array_agg(e.enumlabel ORDER BY e.enumsortorder) AS values
            FROM pg_type t
            JOIN pg_enum e ON t.oid = e.enumtypid
            WHERE t.typnamespace::regnamespace::text = 'public'
            GROUP BY t.typname
        """))
        enums = result.fetchall()
        if enums:
            for enum_name, values in enums:
                logger.info(f"{enum_name}: {len(values)} values")
        else:
            logger.info(" ⚠️ No enum types found")

        # 11. Index scan statistics (best effort — the stats view may be
        # restricted or empty on some deployments).
        logger.info("\n📈 8. INDEX USAGE")
        logger.info("-" * 70)
        try:
            result = await conn.execute(text("""
                SELECT
                    schemaname,
                    relname,
                    indexrelname,
                    idx_scan,
                    idx_tup_read
                FROM pg_stat_user_indexes
                WHERE schemaname = 'public'
                ORDER BY idx_scan DESC
                LIMIT 10
            """))
            index_stats = result.fetchall()
            logger.info(f" {'Table':<20} {'Index':<30} {'Scans':>10}")
            logger.info(" " + "-" * 62)
            for schema, table, idx_name, scans, reads in index_stats:
                logger.info(f" {table:<20} {idx_name:<30} {scans:>10}")
        except Exception:
            logger.info(" Index statistics not available (requires pg_stat_statements)")

        # 12. Key server tuning settings. SHOW cannot take bind parameters;
        # the names come from a hard-coded list, so interpolation is safe.
        logger.info("\n⚙️ 9. DATABASE CONFIGURATION")
        logger.info("-" * 70)
        settings_to_check = [
            'max_connections',
            'shared_buffers',
            'effective_cache_size',
            'work_mem',
            'maintenance_work_mem'
        ]
        for setting in settings_to_check:
            try:
                result = await conn.execute(text(f"SHOW {setting}"))
                value = result.scalar()
                logger.info(f" {setting:<25} {value}")
            except Exception:
                # Setting may be absent on this server version — skip it.
                pass

        # Summary: each check is (passed?, description).
        logger.info("\n" + "=" * 70)
        logger.info("HEALTH CHECK SUMMARY")
        logger.info("=" * 70)
        checks = [
            (table_count == expected_tables, "Schema structure"),
            (fk_count > 0, "Foreign key relationships"),
            (index_count > 30, "Performance indexes"),
            (check_count > 0, "Data validation constraints"),
            (len(missing_indexes) == 0, "Foreign key indexing"),
            (len(issues) == 0, "Data type correctness"),
        ]
        passed = sum(1 for check, _ in checks if check)
        total = len(checks)
        logger.info(f"\n Passed: {passed}/{total} checks")
        logger.info("")
        for check, description in checks:
            # Same stripped-emoji restoration as the table-count marker above.
            status = "✅" if check else "⚠️ "
            logger.info(f" {status} {description}")
        if passed == total:
            logger.info("\n🎉 Database is healthy and optimized!")
        elif passed >= total * 0.8:
            logger.info("\n✅ Database is mostly healthy with minor issues")
        else:
            logger.info("\n⚠️ Database needs attention")
        logger.info("\n" + "=" * 70)
async def main():
    """Drive the health check; log any failure with its full traceback."""
    try:
        await check_schema()
    except Exception as exc:
        # Top-level boundary: report the error and dump the stack so a
        # cron/CI run still shows exactly where the check blew up.
        logger.error(f"\n❌ Health check failed: {str(exc)}")
        import traceback
        traceback.print_exc()
# Script entry point: spin up an event loop and run the health check once.
if __name__ == "__main__":
    asyncio.run(main())