"""
|
||
Database Health Check Script
|
||
Verifies schema correctness, relationship integrity, and performance
|
||
"""
|
||
|
||
import asyncio
|
||
from sqlalchemy import text, inspect
|
||
from database import async_engine
|
||
import logging
|
||
|
||
logging.basicConfig(level=logging.INFO, format='%(message)s')
|
||
logger = logging.getLogger(__name__)
|
||
|
||
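
# Usage sketch (assumption): run directly, e.g. `python db_health_check.py`
# (filename illustrative); database.py is expected to expose a configured
# SQLAlchemy AsyncEngine named async_engine.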


async def check_schema():
    """Verify database schema matches models"""
    async with async_engine.connect() as conn:
        logger.info("=" * 70)
        logger.info("DATABASE HEALTH CHECK")
        logger.info("=" * 70)

        # 1. Table Count
        logger.info("\n📊 1. SCHEMA VERIFICATION")
        logger.info("-" * 70)

        result = await conn.execute(text("""
            SELECT COUNT(*) FROM information_schema.tables
            WHERE table_schema = 'public'
              AND table_type = 'BASE TABLE'  -- information_schema.tables also lists views
        """))
        table_count = result.scalar()

        expected_tables = 13  # Based on models.py
        status = "✅" if table_count == expected_tables else "⚠️ "
        logger.info(f" Tables: {table_count}/{expected_tables} {status}")

        # 2. Foreign Keys
        logger.info("\n🔗 2. RELATIONSHIPS & CONSTRAINTS")
        logger.info("-" * 70)

        result = await conn.execute(text("""
            SELECT COUNT(*)
            FROM information_schema.table_constraints
            WHERE constraint_type = 'FOREIGN KEY' AND table_schema = 'public'
        """))
        fk_count = result.scalar()
        logger.info(f" Foreign Keys: {fk_count} ✅")

        # 3. Indexes (backslash escapes the LIKE wildcard so '_pkey' matches literally)
        result = await conn.execute(text(r"""
            SELECT COUNT(*)
            FROM pg_indexes
            WHERE schemaname = 'public' AND indexname NOT LIKE '%\_pkey'
        """))
        index_count = result.scalar()
        logger.info(f" Indexes (non-PK): {index_count} ✅")

        # 4. Check Constraints (contype = 'c' marks CHECK constraints in pg_constraint)
        result = await conn.execute(text("""
            SELECT COUNT(*)
            FROM pg_constraint
            WHERE contype = 'c' AND connamespace::regnamespace::text = 'public'
        """))
        check_count = result.scalar()
        logger.info(f" Check Constraints: {check_count} ✅")

        # 5. Verify all foreign keys have indexes
        logger.info("\n⚡ 3. PERFORMANCE VALIDATION")
        logger.info("-" * 70)
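
        # NOTE (heuristic): an FK column counts as "indexed" if it appears
        # anywhere in any index on its table, not necessarily as the leading
        # column, so this check can miss indexes that are unusable for the FK.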
        result = await conn.execute(text("""
            SELECT
                c.conrelid::regclass AS table_name,
                a.attname AS column_name
            FROM pg_constraint c
            JOIN pg_attribute a ON a.attnum = ANY(c.conkey) AND a.attrelid = c.conrelid
            LEFT JOIN pg_index i ON i.indrelid = c.conrelid
                AND a.attnum = ANY(i.indkey)
            WHERE c.contype = 'f' AND i.indexrelid IS NULL
        """))
        missing_indexes = result.fetchall()

        if missing_indexes:
            logger.info(f" ⚠️ {len(missing_indexes)} foreign keys without indexes:")
            for table, column in missing_indexes:
                logger.info(f" {table}.{column}")
        else:
            logger.info(" ✅ All foreign keys are indexed")

        # 6. Check for duplicate indexes
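        # NOTE: array_agg below orders columns by attnum (table column order),
        # not by position within the index, so indexes that differ only in
        # column order are grouped together; treat reported duplicates as
        # candidates to verify, not certainties.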
        result = await conn.execute(text("""
            SELECT
                t.relname AS table_name,
                i.relname AS index_name,
                array_agg(a.attname ORDER BY a.attnum) AS columns
            FROM pg_class t
            JOIN pg_index ix ON t.oid = ix.indrelid
            JOIN pg_class i ON i.oid = ix.indexrelid
            JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
            WHERE t.relkind = 'r'
              AND t.relnamespace::regnamespace::text = 'public'
            GROUP BY t.relname, i.relname
        """))
        indexes = result.fetchall()

        # Group by (table, column tuple) to find duplicate index definitions
        index_map = {}
        for table, idx_name, columns in indexes:
            key = (table, tuple(columns))
            index_map.setdefault(key, []).append(idx_name)

        duplicates = [(k, v) for k, v in index_map.items() if len(v) > 1]
        if duplicates:
            logger.info(f" ⚠️ {len(duplicates)} duplicate index patterns found")
            for (table, cols), names in duplicates[:3]:
                logger.info(f" {table}{list(cols)}: {', '.join(names)}")
        else:
            logger.info(" ✅ No duplicate indexes found")

        # 7. Table sizes and row counts
        logger.info("\n📦 4. TABLE STATISTICS")
        logger.info("-" * 70)
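
        # NOTE: reltuples is a planner estimate refreshed by ANALYZE/autovacuum,
        # so row counts may lag reality; pg_total_relation_size includes
        # indexes and TOAST data, not just the heap.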
        result = await conn.execute(text("""
            SELECT
                c.relname AS table_name,
                pg_size_pretty(pg_total_relation_size(c.oid)) AS size,
                c.reltuples::bigint AS estimated_rows
            FROM pg_class c
            JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE c.relkind = 'r' AND n.nspname = 'public'
            ORDER BY pg_total_relation_size(c.oid) DESC
            LIMIT 10
        """))
        sizes = result.fetchall()

        logger.info(f" {'Table':<25} {'Size':>10} {'Rows':>10}")
        logger.info(" " + "-" * 48)
        for table, size, rows in sizes:
            logger.info(f" {table:<25} {size:>10} {rows:>10}")

        # 8. Check column data types alignment
        logger.info("\n🔍 5. DATA TYPE VERIFICATION")
        logger.info("-" * 70)
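
        # Expected types assume models.py maps Float -> double precision and
        # String -> character varying; adjust if the models use Numeric or Text.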
        critical_columns = [
            ('users', 'email', 'character varying'),
            ('products', 'price', 'double precision'),
            ('services', 'price', 'double precision'),
            ('orders', 'total', 'double precision'),
            ('reviews', 'rating', 'integer'),
        ]

        issues = []
        for table, column, expected_type in critical_columns:
            # Bound parameters instead of f-string interpolation keep the SQL safe
            result = await conn.execute(
                text("""
                    SELECT data_type
                    FROM information_schema.columns
                    WHERE table_name = :table AND column_name = :column
                """),
                {"table": table, "column": column},
            )
            actual_type = result.scalar()

            if actual_type != expected_type:
                issues.append(f"{table}.{column}: expected {expected_type}, got {actual_type}")

        if issues:
            logger.info(" ⚠️ Data type mismatches:")
            for issue in issues:
                logger.info(f" {issue}")
        else:
            logger.info(" ✅ All critical columns have correct types")

        # 9. Check for missing NOT NULL constraints
        logger.info("\n🛡️ 6. DATA INTEGRITY")
        logger.info("-" * 70)
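
        # 'id' primary keys are implicitly NOT NULL, so any rows returned here
        # are nullable foreign-key columns, which may be intentional (optional
        # relationships); hence the informational status below.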
        result = await conn.execute(text("""
            SELECT table_name, column_name
            FROM information_schema.columns
            WHERE table_schema = 'public'
              AND column_name IN ('id', 'user_id', 'product_id', 'service_id', 'order_id')
              AND is_nullable = 'YES'
            ORDER BY table_name, column_name
        """))
        nullable_fks = result.fetchall()

        if nullable_fks:
            logger.info(f" ℹ️ {len(nullable_fks)} nullable foreign/primary keys (by design):")
            for table, column in nullable_fks[:5]:
                logger.info(f" {table}.{column}")
        else:
            logger.info(" ✅ All IDs have appropriate nullability")

        # 10. Enum types verification
        logger.info("\n📋 7. ENUM TYPES")
        logger.info("-" * 70)

        result = await conn.execute(text("""
            SELECT
                t.typname AS enum_name,
                array_agg(e.enumlabel ORDER BY e.enumsortorder) AS values
            FROM pg_type t
            JOIN pg_enum e ON t.oid = e.enumtypid
            WHERE t.typnamespace::regnamespace::text = 'public'
            GROUP BY t.typname
        """))
        enums = result.fetchall()

        if enums:
            for enum_name, values in enums:
                logger.info(f" ✅ {enum_name}: {len(values)} values")
        else:
            logger.info(" ⚠️ No enum types found")

        # 11. Index usage statistics
        logger.info("\n📈 8. INDEX USAGE")
        logger.info("-" * 70)
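
        # pg_stat_user_indexes is a built-in statistics view; idx_scan
        # accumulates since the last stats reset, so persistently zero scans
        # can flag unused indexes on a long-running database.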
        try:
            result = await conn.execute(text("""
                SELECT
                    schemaname,
                    relname,
                    indexrelname,
                    idx_scan,
                    idx_tup_read
                FROM pg_stat_user_indexes
                WHERE schemaname = 'public'
                ORDER BY idx_scan DESC
                LIMIT 10
            """))
            index_stats = result.fetchall()

            logger.info(f" {'Table':<20} {'Index':<30} {'Scans':>10}")
            logger.info(" " + "-" * 62)
            for _schema, table, idx_name, scans, _reads in index_stats:
                logger.info(f" {table:<20} {idx_name:<30} {scans:>10}")
        except Exception:
            logger.info(" ℹ️ Index usage statistics not available")

        # 12. Connection settings
        logger.info("\n⚙️ 9. DATABASE CONFIGURATION")
        logger.info("-" * 70)
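
        # SHOW returns display-formatted values (e.g. '4GB'); the f-string in
        # the loop below is safe because names come from this fixed allowlist.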
        settings_to_check = [
            'max_connections',
            'shared_buffers',
            'effective_cache_size',
            'work_mem',
            'maintenance_work_mem'
        ]

        for setting in settings_to_check:
            try:
                result = await conn.execute(text(f"SHOW {setting}"))
                value = result.scalar()
                logger.info(f" {setting:<25} {value}")
            except Exception:
                # Setting may not exist on this server version; skip it
                continue

        # Summary
        logger.info("\n" + "=" * 70)
        logger.info("HEALTH CHECK SUMMARY")
        logger.info("=" * 70)

        checks = [
            (table_count == expected_tables, "Schema structure"),
            (fk_count > 0, "Foreign key relationships"),
            (index_count > 30, "Performance indexes"),
            (check_count > 0, "Data validation constraints"),
            (len(missing_indexes) == 0, "Foreign key indexing"),
            (len(issues) == 0, "Data type correctness"),
        ]

        passed = sum(1 for check, _ in checks if check)
        total = len(checks)

        logger.info(f"\n Passed: {passed}/{total} checks")
        logger.info("")

        for check, description in checks:
            status = "✅" if check else "⚠️ "
            logger.info(f" {status} {description}")

        if passed == total:
            logger.info("\n🎉 Database is healthy and optimized!")
        elif passed >= total * 0.8:
            logger.info("\n✅ Database is mostly healthy with minor issues")
        else:
            logger.info("\n⚠️ Database needs attention")

        logger.info("\n" + "=" * 70)


async def main():
    try:
        await check_schema()
    except Exception:
        # logger.exception appends the full traceback to the error message
        logger.exception("\n❌ Health check failed")
    finally:
        # Dispose of the engine so pooled connections close before the loop exits
        await async_engine.dispose()


if __name__ == "__main__":
    asyncio.run(main())