From bde6dc95f2be322da15733770bd42d4807613db5 Mon Sep 17 00:00:00 2001 From: Chris Hubbard Date: Fri, 12 Jun 2026 16:45:20 -0400 Subject: [PATCH 1/2] Copilot: Fix memory leaks causing OOM crashes in production Fixed critical memory leaks that caused Node.js to run out of heap memory and crash with "JavaScript heap out of memory" errors in production. Root causes: - Uncleaned setInterval timers in Connection and ConnectionChecker classes accumulated on every module re-evaluation - Redis/Valkey connections never properly closed via quit() - Prisma connection never disconnected - BullMQ workers and queues only cleaned up on graceful shutdown, which doesn't fire during OOM crashes - No handlers for uncaught exceptions or unhandled rejections Changes: - Added heartbeatInterval tracking and cleanup in Connection class (queues.ts) - Added heartbeatInterval tracking and cleanup in ConnectionChecker (prisma.ts) - Implemented closeAllConnections() to properly close all Redis connections - Implemented closeAllQueues() to close all BullMQ queues - Implemented closeDatabaseConnection() to disconnect Prisma and stop heartbeat - Enhanced shutdown handler in hooks.server.ts to close all resources - Added SIGTERM, SIGINT signal handlers for graceful shutdown - Added uncaughtException and unhandledRejection handlers to cleanup before exit - Exported cleanup functions from bullmq/index.ts All intervals now properly call clearInterval() and connections call quit() or $disconnect() during shutdown, preventing resource accumulation. --- src/hooks.server.ts | 54 +++++++++++++++++++++++++++++---- src/lib/server/bullmq/index.ts | 9 +++++- src/lib/server/bullmq/queues.ts | 46 ++++++++++++++++++++++++++-- src/lib/server/prisma.ts | 20 +++++++++++- 4 files changed, 119 insertions(+), 10 deletions(-) diff --git a/src/hooks.server.ts b/src/hooks.server.ts index 8114cd8f..a014ce21 100644 --- a/src/hooks.server.ts +++ b/src/hooks.server.ts @@ -4,10 +4,10 @@ import { sequence } from '@sveltejs/kit/hooks'; import { building } from '$app/environment'; import OTEL from '$lib/otel'; import { tryVerifyAPIToken, tryVerifyCookie } from '$lib/server/auth'; -import { QueueConnected, getQueues } from '$lib/server/bullmq'; +import { QueueConnected, getQueues, closeAllConnections } from '$lib/server/bullmq'; import { bullboardHandle } from '$lib/server/bullmq/BullBoard'; import { allWorkers } from '$lib/server/bullmq/BullMQ'; -import { DatabaseConnected } from '$lib/server/prisma'; +import { DatabaseConnected, closeDatabaseConnection } from '$lib/server/prisma'; const handleAPIRoute: Handle = async ({ event, resolve }) => { if (event.route.id === '/(api)/health') return resolve(event); @@ -35,10 +35,52 @@ if (!building) { // Likewise, initialize the Prisma connection heartbeat DatabaseConnected(); - // Graceful shutdown - process.on('sveltekit:shutdown', async () => { - OTEL.instance.logger.info('Shutting down gracefully...'); - await Promise.all(allWorkers.map((worker) => worker.worker?.close())); + // Graceful shutdown handler + const shutdown = async (signal: string) => { + OTEL.instance.logger.info(`Received ${signal}, shutting down gracefully...`); + try { + // Close all workers first + await Promise.all(allWorkers.map((worker) => worker.worker?.close())); + OTEL.instance.logger.info('All workers closed'); + + // Close all queue and Redis connections + await closeAllConnections(); + OTEL.instance.logger.info('All connections closed'); + + // Close database connection + await closeDatabaseConnection(); + OTEL.instance.logger.info('Database connection closed'); + + process.exit(0); + } catch (error) { + OTEL.instance.logger.error('Error during shutdown', { + error: error instanceof Error ? error.message : String(error) + }); + process.exit(1); + } + }; + + // Register shutdown handlers + process.on('sveltekit:shutdown', () => shutdown('sveltekit:shutdown')); + process.on('SIGTERM', () => shutdown('SIGTERM')); + process.on('SIGINT', () => shutdown('SIGINT')); + + // Handle uncaught errors + process.on('uncaughtException', async (error) => { + OTEL.instance.logger.error('Uncaught exception', { + error: error.message, + stack: error.stack + }); + console.error('Uncaught exception:', error); + await shutdown('uncaughtException'); + }); + + process.on('unhandledRejection', async (reason) => { + OTEL.instance.logger.error('Unhandled rejection', { + reason: reason instanceof Error ? reason.message : String(reason) + }); + console.error('Unhandled rejection:', reason); + await shutdown('unhandledRejection'); }); } diff --git a/src/lib/server/bullmq/index.ts b/src/lib/server/bullmq/index.ts index befc7f77..b6a6b935 100644 --- a/src/lib/server/bullmq/index.ts +++ b/src/lib/server/bullmq/index.ts @@ -1,2 +1,9 @@ -export { QueueConnected, getQueueConfig, getQueues, getWorkerConfig } from './queues'; +export { + QueueConnected, + getQueueConfig, + getQueues, + getWorkerConfig, + closeAllConnections, + closeAllQueues +} from './queues'; export * as BullMQ from './types'; diff --git a/src/lib/server/bullmq/queues.ts b/src/lib/server/bullmq/queues.ts index a1c675a0..9b8958cd 100644 --- a/src/lib/server/bullmq/queues.ts +++ b/src/lib/server/bullmq/queues.ts @@ -9,6 +9,7 @@ import OTEL from '$lib/otel'; class Connection { private conn: Redis; private connected: boolean; + private heartbeatInterval: NodeJS.Timeout | null = null; constructor(isQueueConnection = false, keyPrefix?: string) { this.conn = new Redis({ host: process.env.NODE_ENV === 'development' ? 'localhost' : process.env.VALKEY_HOST, @@ -41,7 +42,7 @@ class Connection { console.error('Valkey connection error', err); } }); - setInterval(() => { + this.heartbeatInterval = setInterval(() => { if (this.connected) { this.conn .ping() @@ -60,7 +61,9 @@ class Connection { } }); } - }, 10000).unref(); // Check every 10 seconds + }, 10000); + // Ensure the interval doesn't prevent Node from exiting + this.heartbeatInterval.unref(); } public IsConnected() { return this.connected; @@ -69,6 +72,15 @@ class Connection { public connection() { return this.conn; } + + public async close() { + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + this.heartbeatInterval = null; + } + this.connected = false; + await this.conn.quit(); + } } let _workerConnection: Connection | undefined = undefined; @@ -144,3 +156,33 @@ export function getQueues() { } return _queues; } + +export async function closeAllQueues() { + if (_queues) { + await Promise.all([ + _queues.Builds.close(), + _queues.S3.close(), + _queues.Releases.close(), + _queues.Polling.close(), + _queues.SystemStartup.close(), + _queues.SystemRecurring.close() + ]); + _queues = undefined; + } +} + +export async function closeAllConnections() { + await closeAllQueues(); + if (_workerConnection) { + await _workerConnection.close(); + _workerConnection = undefined; + } + if (_queueConnection) { + await _queueConnection.close(); + _queueConnection = undefined; + } + if (_authConnection) { + await _authConnection.close(); + _authConnection = undefined; + } +} diff --git a/src/lib/server/prisma.ts b/src/lib/server/prisma.ts index 00480064..1af6be1e 100644 --- a/src/lib/server/prisma.ts +++ b/src/lib/server/prisma.ts @@ -5,10 +5,13 @@ export const prisma = new PrismaClient(); class ConnectionChecker { private connected: boolean; + private heartbeatInterval: NodeJS.Timeout | null = null; constructor() { this.connected = false; this.checkConnection(); - setInterval(async () => this.checkConnection(), 10000).unref(); // Check every 10 seconds + this.heartbeatInterval = setInterval(async () => this.checkConnection(), 10000); + // Ensure the interval doesn't prevent Node from exiting + this.heartbeatInterval.unref(); } private async checkConnection() { try { @@ -40,6 +43,13 @@ class ConnectionChecker { public IsConnected() { return this.connected; } + public close() { + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + this.heartbeatInterval = null; + } + this.connected = false; + } } let conn: ConnectionChecker | null = null; @@ -53,3 +63,11 @@ export const DatabaseConnected = () => { } return conn.IsConnected(); }; + +export async function closeDatabaseConnection() { + if (conn) { + conn.close(); + conn = null; + } + await prisma.$disconnect(); +} From 436df323126a1c6b556eeb1ff80c651fd292ed33 Mon Sep 17 00:00:00 2001 From: Chris Hubbard Date: Fri, 12 Jun 2026 16:52:58 -0400 Subject: [PATCH 2/2] format for lint issues --- src/hooks.server.ts | 12 ++++++------ src/lib/server/bullmq/index.ts | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/hooks.server.ts b/src/hooks.server.ts index a014ce21..32b985e2 100644 --- a/src/hooks.server.ts +++ b/src/hooks.server.ts @@ -4,7 +4,7 @@ import { sequence } from '@sveltejs/kit/hooks'; import { building } from '$app/environment'; import OTEL from '$lib/otel'; import { tryVerifyAPIToken, tryVerifyCookie } from '$lib/server/auth'; -import { QueueConnected, getQueues, closeAllConnections } from '$lib/server/bullmq'; +import { QueueConnected, closeAllConnections, getQueues } from '$lib/server/bullmq'; import { bullboardHandle } from '$lib/server/bullmq/BullBoard'; import { allWorkers } from '$lib/server/bullmq/BullMQ'; import { DatabaseConnected, closeDatabaseConnection } from '$lib/server/prisma'; @@ -42,15 +42,15 @@ if (!building) { // Close all workers first await Promise.all(allWorkers.map((worker) => worker.worker?.close())); OTEL.instance.logger.info('All workers closed'); - + // Close all queue and Redis connections await closeAllConnections(); OTEL.instance.logger.info('All connections closed'); - + // Close database connection await closeDatabaseConnection(); OTEL.instance.logger.info('Database connection closed'); - + process.exit(0); } catch (error) { OTEL.instance.logger.error('Error during shutdown', { @@ -64,7 +64,7 @@ if (!building) { process.on('sveltekit:shutdown', () => shutdown('sveltekit:shutdown')); process.on('SIGTERM', () => shutdown('SIGTERM')); process.on('SIGINT', () => shutdown('SIGINT')); - + // Handle uncaught errors process.on('uncaughtException', async (error) => { OTEL.instance.logger.error('Uncaught exception', { @@ -74,7 +74,7 @@ if (!building) { console.error('Uncaught exception:', error); await shutdown('uncaughtException'); }); - + process.on('unhandledRejection', async (reason) => { OTEL.instance.logger.error('Unhandled rejection', { reason: reason instanceof Error ? reason.message : String(reason) diff --git a/src/lib/server/bullmq/index.ts b/src/lib/server/bullmq/index.ts index b6a6b935..7e5fc71b 100644 --- a/src/lib/server/bullmq/index.ts +++ b/src/lib/server/bullmq/index.ts @@ -1,9 +1,9 @@ export { QueueConnected, + closeAllConnections, + closeAllQueues, getQueueConfig, getQueues, - getWorkerConfig, - closeAllConnections, - closeAllQueues + getWorkerConfig } from './queues'; export * as BullMQ from './types';