diff --git a/src/hooks.server.ts b/src/hooks.server.ts index 8114cd8..32b985e 100644 --- a/src/hooks.server.ts +++ b/src/hooks.server.ts @@ -4,10 +4,10 @@ import { sequence } from '@sveltejs/kit/hooks'; import { building } from '$app/environment'; import OTEL from '$lib/otel'; import { tryVerifyAPIToken, tryVerifyCookie } from '$lib/server/auth'; -import { QueueConnected, getQueues } from '$lib/server/bullmq'; +import { QueueConnected, closeAllConnections, getQueues } from '$lib/server/bullmq'; import { bullboardHandle } from '$lib/server/bullmq/BullBoard'; import { allWorkers } from '$lib/server/bullmq/BullMQ'; -import { DatabaseConnected } from '$lib/server/prisma'; +import { DatabaseConnected, closeDatabaseConnection } from '$lib/server/prisma'; const handleAPIRoute: Handle = async ({ event, resolve }) => { if (event.route.id === '/(api)/health') return resolve(event); @@ -35,10 +35,52 @@ if (!building) { // Likewise, initialize the Prisma connection heartbeat DatabaseConnected(); - // Graceful shutdown - process.on('sveltekit:shutdown', async () => { - OTEL.instance.logger.info('Shutting down gracefully...'); - await Promise.all(allWorkers.map((worker) => worker.worker?.close())); + // Graceful shutdown handler + const shutdown = async (signal: string) => { + OTEL.instance.logger.info(`Received ${signal}, shutting down gracefully...`); + try { + // Close all workers first + await Promise.all(allWorkers.map((worker) => worker.worker?.close())); + OTEL.instance.logger.info('All workers closed'); + + // Close all queue and Redis connections + await closeAllConnections(); + OTEL.instance.logger.info('All connections closed'); + + // Close database connection + await closeDatabaseConnection(); + OTEL.instance.logger.info('Database connection closed'); + + process.exit(0); + } catch (error) { + OTEL.instance.logger.error('Error during shutdown', { + error: error instanceof Error ? error.message : String(error) + }); + process.exit(1); + } + }; + + // Register shutdown handlers + process.on('sveltekit:shutdown', () => shutdown('sveltekit:shutdown')); + process.on('SIGTERM', () => shutdown('SIGTERM')); + process.on('SIGINT', () => shutdown('SIGINT')); + + // Handle uncaught errors + process.on('uncaughtException', async (error) => { + OTEL.instance.logger.error('Uncaught exception', { + error: error.message, + stack: error.stack + }); + console.error('Uncaught exception:', error); + await shutdown('uncaughtException'); + }); + + process.on('unhandledRejection', async (reason) => { + OTEL.instance.logger.error('Unhandled rejection', { + reason: reason instanceof Error ? reason.message : String(reason) + }); + console.error('Unhandled rejection:', reason); + await shutdown('unhandledRejection'); }); } diff --git a/src/lib/server/bullmq/index.ts b/src/lib/server/bullmq/index.ts index befc7f7..7e5fc71 100644 --- a/src/lib/server/bullmq/index.ts +++ b/src/lib/server/bullmq/index.ts @@ -1,2 +1,9 @@ -export { QueueConnected, getQueueConfig, getQueues, getWorkerConfig } from './queues'; +export { + QueueConnected, + closeAllConnections, + closeAllQueues, + getQueueConfig, + getQueues, + getWorkerConfig +} from './queues'; export * as BullMQ from './types'; diff --git a/src/lib/server/bullmq/queues.ts b/src/lib/server/bullmq/queues.ts index a1c675a..9b8958c 100644 --- a/src/lib/server/bullmq/queues.ts +++ b/src/lib/server/bullmq/queues.ts @@ -9,6 +9,7 @@ import OTEL from '$lib/otel'; class Connection { private conn: Redis; private connected: boolean; + private heartbeatInterval: NodeJS.Timeout | null = null; constructor(isQueueConnection = false, keyPrefix?: string) { this.conn = new Redis({ host: process.env.NODE_ENV === 'development' ? 'localhost' : process.env.VALKEY_HOST, @@ -41,7 +42,7 @@ class Connection { console.error('Valkey connection error', err); } }); - setInterval(() => { + this.heartbeatInterval = setInterval(() => { if (this.connected) { this.conn .ping() @@ -60,7 +61,9 @@ class Connection { } }); } - }, 10000).unref(); // Check every 10 seconds + }, 10000); + // Ensure the interval doesn't prevent Node from exiting + this.heartbeatInterval.unref(); } public IsConnected() { return this.connected; @@ -69,6 +72,15 @@ class Connection { public connection() { return this.conn; } + + public async close() { + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + this.heartbeatInterval = null; + } + this.connected = false; + await this.conn.quit(); + } } let _workerConnection: Connection | undefined = undefined; @@ -144,3 +156,33 @@ export function getQueues() { } return _queues; } + +export async function closeAllQueues() { + if (_queues) { + await Promise.all([ + _queues.Builds.close(), + _queues.S3.close(), + _queues.Releases.close(), + _queues.Polling.close(), + _queues.SystemStartup.close(), + _queues.SystemRecurring.close() + ]); + _queues = undefined; + } +} + +export async function closeAllConnections() { + await closeAllQueues(); + if (_workerConnection) { + await _workerConnection.close(); + _workerConnection = undefined; + } + if (_queueConnection) { + await _queueConnection.close(); + _queueConnection = undefined; + } + if (_authConnection) { + await _authConnection.close(); + _authConnection = undefined; + } +} diff --git a/src/lib/server/prisma.ts b/src/lib/server/prisma.ts index 0048006..1af6be1 100644 --- a/src/lib/server/prisma.ts +++ b/src/lib/server/prisma.ts @@ -5,10 +5,13 @@ export const prisma = new PrismaClient(); class ConnectionChecker { private connected: boolean; + private heartbeatInterval: NodeJS.Timeout | null = null; constructor() { this.connected = false; this.checkConnection(); - setInterval(async () => this.checkConnection(), 10000).unref(); // Check every 10 seconds + this.heartbeatInterval = setInterval(async () => this.checkConnection(), 10000); + // Ensure the interval doesn't prevent Node from exiting + this.heartbeatInterval.unref(); } private async checkConnection() { try { @@ -40,6 +43,13 @@ class ConnectionChecker { public IsConnected() { return this.connected; } + public close() { + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + this.heartbeatInterval = null; + } + this.connected = false; + } } let conn: ConnectionChecker | null = null; @@ -53,3 +63,11 @@ export const DatabaseConnected = () => { } return conn.IsConnected(); }; + +export async function closeDatabaseConnection() { + if (conn) { + conn.close(); + conn = null; + } + await prisma.$disconnect(); +}