298 lines
7.7 KiB
JavaScript
298 lines
7.7 KiB
JavaScript
const { Pool } = require("pg");
|
|
const crypto = require("crypto");
|
|
const logger = require("./logger");
|
|
require("dotenv").config();
|
|
|
|
const pool = new Pool({
|
|
host: process.env.DB_HOST || "localhost",
|
|
port: process.env.DB_PORT || 5432,
|
|
database: process.env.DB_NAME || "skyartshop",
|
|
user: process.env.DB_USER || "skyartapp",
|
|
password: process.env.DB_PASSWORD,
|
|
max: 30, // Increased to 30 for higher concurrency
|
|
min: 10, // Keep 10 connections warm for instant response
|
|
idleTimeoutMillis: 60000,
|
|
connectionTimeoutMillis: 3000,
|
|
application_name: "skyartshop-api",
|
|
keepAlive: true, // TCP keepalive
|
|
keepAliveInitialDelayMillis: 10000,
|
|
statement_timeout: 30000, // 30s query timeout
|
|
query_timeout: 30000, // SAFEGUARD: Force query timeout at pool level
|
|
});
|
|
|
|
// SAFEGUARD: Track pool health
|
|
let poolConnected = false;
|
|
let connectionAttempts = 0;
|
|
const MAX_CONNECTION_ATTEMPTS = 3;
|
|
|
|
pool.on("connect", (client) => {
|
|
poolConnected = true;
|
|
connectionAttempts = 0;
|
|
logger.info("✓ PostgreSQL connected", {
|
|
total: pool.totalCount,
|
|
idle: pool.idleCount,
|
|
waiting: pool.waitingCount,
|
|
});
|
|
});
|
|
|
|
pool.on("error", (err, client) => {
|
|
poolConnected = false;
|
|
connectionAttempts++;
|
|
logger.error("💥 PostgreSQL pool error", {
|
|
error: err.message,
|
|
code: err.code,
|
|
attempts: connectionAttempts,
|
|
pool: {
|
|
total: pool.totalCount,
|
|
idle: pool.idleCount,
|
|
waiting: pool.waitingCount,
|
|
},
|
|
});
|
|
|
|
// SAFEGUARD: Critical failure detection
|
|
if (connectionAttempts >= MAX_CONNECTION_ATTEMPTS) {
|
|
logger.error(
|
|
"🚨 Database connection critically unstable - manual intervention required"
|
|
);
|
|
}
|
|
});
|
|
|
|
pool.on("acquire", (client) => {
|
|
logger.debug("Pool client acquired", {
|
|
total: pool.totalCount,
|
|
idle: pool.idleCount,
|
|
});
|
|
});
|
|
|
|
pool.on("release", (err, client) => {
|
|
if (err) {
|
|
logger.warn("Client released with error", { error: err.message });
|
|
}
|
|
});
|
|
|
|
// Query cache for SELECT statements with crypto-based keys
|
|
const queryCache = new Map();
|
|
const queryCacheOrder = []; // LRU tracking
|
|
const QUERY_CACHE_TTL = 15000; // 15 seconds (increased)
|
|
const QUERY_CACHE_MAX_SIZE = 500; // 500 cached queries (increased)
|
|
const SLOW_QUERY_THRESHOLD = 50; // 50ms threshold (stricter)
|
|
const QUERY_TIMEOUT = 35000; // SAFEGUARD: 35s query timeout (slightly higher than pool's 30s)
|
|
|
|
// Generate fast cache key using crypto hash
|
|
const getCacheKey = (text, params) => {
|
|
const hash = crypto.createHash("md5");
|
|
hash.update(text);
|
|
if (params) hash.update(JSON.stringify(params));
|
|
return hash.digest("hex");
|
|
};
|
|
|
|
const query = async (text, params) => {
|
|
const start = Date.now();
|
|
const isSelect = text.trim().toUpperCase().startsWith("SELECT");
|
|
|
|
// Check cache for SELECT queries
|
|
if (isSelect) {
|
|
const cacheKey = getCacheKey(text, params);
|
|
const cached = queryCache.get(cacheKey);
|
|
|
|
if (cached && Date.now() - cached.timestamp < QUERY_CACHE_TTL) {
|
|
logger.debug("Query cache hit", { duration: Date.now() - start });
|
|
return cached.data;
|
|
}
|
|
}
|
|
|
|
try {
|
|
// SAFEGUARD: Add query timeout wrapper
|
|
const queryPromise = pool.query(text, params);
|
|
const timeoutPromise = new Promise((_, reject) => {
|
|
setTimeout(() => {
|
|
reject(
|
|
new Error(
|
|
`Query timeout after ${QUERY_TIMEOUT}ms: ${text.substring(
|
|
0,
|
|
50
|
|
)}...`
|
|
)
|
|
);
|
|
}, QUERY_TIMEOUT);
|
|
});
|
|
|
|
const res = await Promise.race([queryPromise, timeoutPromise]);
|
|
const duration = Date.now() - start;
|
|
|
|
// Cache SELECT queries with LRU eviction
|
|
if (isSelect) {
|
|
const cacheKey = getCacheKey(text, params);
|
|
|
|
// LRU eviction
|
|
if (queryCache.size >= QUERY_CACHE_MAX_SIZE) {
|
|
const oldestKey = queryCacheOrder.shift();
|
|
if (oldestKey) queryCache.delete(oldestKey);
|
|
}
|
|
|
|
queryCache.set(cacheKey, { data: res, timestamp: Date.now() });
|
|
queryCacheOrder.push(cacheKey);
|
|
}
|
|
|
|
// Log slow queries
|
|
if (duration > SLOW_QUERY_THRESHOLD) {
|
|
logger.warn("Slow query", {
|
|
duration,
|
|
text: text.substring(0, 100),
|
|
rows: res.rowCount,
|
|
params: params?.length || 0,
|
|
});
|
|
}
|
|
|
|
return res;
|
|
} catch (error) {
|
|
const duration = Date.now() - start;
|
|
logger.error("Query error", {
|
|
error: error.message,
|
|
code: error.code,
|
|
duration,
|
|
text: text.substring(0, 100),
|
|
});
|
|
|
|
// SAFEGUARD: Clear potentially corrupted cache entry
|
|
if (isSelect) {
|
|
const cacheKey = getCacheKey(text, params);
|
|
queryCache.delete(cacheKey);
|
|
const index = queryCacheOrder.indexOf(cacheKey);
|
|
if (index > -1) {
|
|
queryCacheOrder.splice(index, 1);
|
|
}
|
|
}
|
|
|
|
throw error;
|
|
}
|
|
};
|
|
|
|
// Transaction helper
|
|
const transaction = async (callback) => {
|
|
const client = await pool.connect();
|
|
try {
|
|
await client.query("BEGIN");
|
|
const result = await callback(client);
|
|
await client.query("COMMIT");
|
|
return result;
|
|
} catch (error) {
|
|
await client.query("ROLLBACK");
|
|
logger.error("Transaction rolled back:", error);
|
|
throw error;
|
|
} finally {
|
|
client.release();
|
|
}
|
|
};
|
|
|
|
// Batch query execution for parallel operations
|
|
const batchQuery = async (queries) => {
|
|
try {
|
|
const results = await Promise.all(
|
|
queries.map(({ text, params }) => query(text, params))
|
|
);
|
|
return results;
|
|
} catch (error) {
|
|
logger.error("Batch query error:", error);
|
|
throw error;
|
|
}
|
|
};
|
|
|
|
// Clear query cache (useful for cache invalidation)
|
|
const clearQueryCache = (pattern) => {
|
|
if (pattern) {
|
|
// Clear specific pattern
|
|
for (const key of queryCache.keys()) {
|
|
if (key.includes(pattern)) {
|
|
queryCache.delete(key);
|
|
}
|
|
}
|
|
} else {
|
|
// Clear all
|
|
queryCache.clear();
|
|
queryCacheOrder.length = 0;
|
|
}
|
|
logger.info("Query cache cleared", { pattern: pattern || "all" });
|
|
};
|
|
|
|
// Health check with pool metrics
|
|
const healthCheck = async (timeoutMs = 5000) => {
|
|
// SAFEGUARD: Wrap health check in timeout promise
|
|
const healthPromise = (async () => {
|
|
try {
|
|
const result = await query(
|
|
"SELECT NOW() as time, current_database() as database"
|
|
);
|
|
return {
|
|
healthy: true,
|
|
database: result.rows[0].database,
|
|
timestamp: result.rows[0].time,
|
|
pool: {
|
|
total: pool.totalCount,
|
|
idle: pool.idleCount,
|
|
waiting: pool.waitingCount,
|
|
connected: poolConnected,
|
|
},
|
|
cache: {
|
|
size: queryCache.size,
|
|
maxSize: QUERY_CACHE_MAX_SIZE,
|
|
},
|
|
};
|
|
} catch (error) {
|
|
logger.error("Database health check failed:", error);
|
|
return {
|
|
healthy: false,
|
|
error: error.message,
|
|
pool: {
|
|
total: pool.totalCount,
|
|
idle: pool.idleCount,
|
|
waiting: pool.waitingCount,
|
|
connected: poolConnected,
|
|
},
|
|
};
|
|
}
|
|
})();
|
|
|
|
// SAFEGUARD: Add timeout protection
|
|
const timeoutPromise = new Promise((_, reject) => {
|
|
setTimeout(
|
|
() => reject(new Error(`Health check timeout after ${timeoutMs}ms`)),
|
|
timeoutMs
|
|
);
|
|
});
|
|
|
|
return Promise.race([healthPromise, timeoutPromise]);
|
|
};
|
|
|
|
// SAFEGUARD: Graceful pool shutdown for scripts/testing
|
|
const closePool = async () => {
|
|
try {
|
|
await pool.end();
|
|
logger.info("Database pool closed gracefully");
|
|
return true;
|
|
} catch (error) {
|
|
logger.error("Error closing database pool:", error);
|
|
return false;
|
|
}
|
|
};
|
|
|
|
// SAFEGUARD: Get pool status for monitoring
|
|
const getPoolStatus = () => ({
|
|
total: pool.totalCount,
|
|
idle: pool.idleCount,
|
|
waiting: pool.waitingCount,
|
|
connected: poolConnected,
|
|
cacheSize: queryCache.size,
|
|
});
|
|
|
|
module.exports = {
|
|
pool,
|
|
query,
|
|
transaction,
|
|
batchQuery,
|
|
clearQueryCache,
|
|
healthCheck,
|
|
closePool,
|
|
getPoolStatus,
|
|
};
|