54 changes: 48 additions & 6 deletions src/hooks.server.ts
@@ -4,10 +4,10 @@ import { sequence } from '@sveltejs/kit/hooks';
import { building } from '$app/environment';
import OTEL from '$lib/otel';
import { tryVerifyAPIToken, tryVerifyCookie } from '$lib/server/auth';
import { QueueConnected, getQueues } from '$lib/server/bullmq';
import { QueueConnected, closeAllConnections, getQueues } from '$lib/server/bullmq';
import { bullboardHandle } from '$lib/server/bullmq/BullBoard';
import { allWorkers } from '$lib/server/bullmq/BullMQ';
import { DatabaseConnected } from '$lib/server/prisma';
import { DatabaseConnected, closeDatabaseConnection } from '$lib/server/prisma';

const handleAPIRoute: Handle = async ({ event, resolve }) => {
if (event.route.id === '/(api)/health') return resolve(event);
@@ -35,10 +35,52 @@ if (!building) {
// Likewise, initialize the Prisma connection heartbeat
DatabaseConnected();

// Graceful shutdown
process.on('sveltekit:shutdown', async () => {
OTEL.instance.logger.info('Shutting down gracefully...');
await Promise.all(allWorkers.map((worker) => worker.worker?.close()));
// Graceful shutdown handler
const shutdown = async (signal: string) => {
OTEL.instance.logger.info(`Received ${signal}, shutting down gracefully...`);
try {
// Close all workers first
await Promise.all(allWorkers.map((worker) => worker.worker?.close()));
OTEL.instance.logger.info('All workers closed');

// Close all queue and Redis connections
await closeAllConnections();
OTEL.instance.logger.info('All connections closed');

// Close database connection
await closeDatabaseConnection();
OTEL.instance.logger.info('Database connection closed');

process.exit(0);
} catch (error) {
OTEL.instance.logger.error('Error during shutdown', {
error: error instanceof Error ? error.message : String(error)
});
process.exit(1);
}
Comment on lines +39 to +60

🩺 Stability & Availability | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

ls -la
fd -t f -a "hooks.server.ts" . || true
fd -t f "hooks.server.ts" . -x echo "found {}" || true

# Print the relevant section with line numbers
echo "---- src/hooks.server.ts (around shutdown) ----"
sed -n '1,140p' src/hooks.server.ts | cat -n

# Find shutdown definition/usages
echo "---- rg shutdown ----"
rg -n "shutdown\s*\(" src/hooks.server.ts
rg -n "sveltekit:shutdown|SIGTERM|SIGINT|uncaughtException|unhandledRejection|process\.exit" src/hooks.server.ts

# Inspect helper functions if they are imported from elsewhere
echo "---- Imports in src/hooks.server.ts ----"
sed -n '1,80p' src/hooks.server.ts | cat -n

# Locate definitions of closeAllConnections / closeDatabaseConnection
echo "---- locating closeAllConnections ----"
rg -n "closeAllConnections" -S .

echo "---- locating closeDatabaseConnection ----"
rg -n "closeDatabaseConnection" -S .

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 12970


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "---- closeAllConnections implementation ----"
sed -n '130,230p' src/lib/server/bullmq/queues.ts | cat -n

echo "---- closeDatabaseConnection implementation ----"
sed -n '1,140p' src/lib/server/prisma.ts | cat -n

echo "---- allWorkers definition / worker.close usage ----"
rg -n "export const allWorkers|allWorkers\s*=" src/lib/server/bullmq -S
rg -n "worker\?\.close|close\(\)" src/lib/server/bullmq -S

# show the file that defines allWorkers
ls -la src/lib/server/bullmq
fd -t f "BullMQ.*|BullMQ.ts|BullMQ" src/lib/server/bullmq -a || true
fd -t f "BullMQ" src/lib/server/bullmq -a

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 6471


🏁 Script executed:

#!/bin/bash
set -euo pipefail
# Print candidate BullMQ file(s) content around where allWorkers is declared
for f in $(fd -t f "BullMQ" src/lib/server/bullmq -a); do
  echo "---- file: $f ----"
  wc -l "$f"
  sed -n '1,220p' "$f" | cat -n
done

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 615


🏁 Script executed:

#!/bin/bash
set -euo pipefail
# Additionally check if shutdown is guarded elsewhere (shouldn't be, but search)
rg -n "const shutdown\s*=|function shutdown|shutdown\(" src/hooks.server.ts src -S

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 967


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "---- Bull worker class close() (src/lib/server/bullmq/queues.ts around public async close) ----"
sed -n '40,120p' src/lib/server/bullmq/queues.ts | cat -n

echo "---- BullWorker close in src/lib/server/bullmq/BullWorker.ts (if present) ----"
rg -n "async close|close\(" src/lib/server/bullmq/BullWorker.ts
sed -n '1,220p' src/lib/server/bullmq/BullWorker.ts | cat -n

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 9636


Don’t abort shutdown on the first cleanup error.
In src/hooks.server.ts, shutdown() closes the workers with await Promise.all(...). If any worker.close() rejects, control jumps to the catch block, which calls process.exit(1) immediately, so closeAllConnections() and closeDatabaseConnection() never run.

Suggested patch
   const shutdown = async (signal: string) => {
     OTEL.instance.logger.info(`Received ${signal}, shutting down gracefully...`);
-    try {
-      // Close all workers first
-      await Promise.all(allWorkers.map((worker) => worker.worker?.close()));
-      OTEL.instance.logger.info('All workers closed');
-      
-      // Close all queue and Redis connections
-      await closeAllConnections();
-      OTEL.instance.logger.info('All connections closed');
-      
-      // Close database connection
-      await closeDatabaseConnection();
-      OTEL.instance.logger.info('Database connection closed');
-      
-      process.exit(0);
-    } catch (error) {
-      OTEL.instance.logger.error('Error during shutdown', {
-        error: error instanceof Error ? error.message : String(error)
-      });
-      process.exit(1);
-    }
+    const failures: unknown[] = [];
+
+    const workerResults = await Promise.allSettled(allWorkers.map((worker) => worker.worker?.close()));
+    for (const result of workerResults) {
+      if (result.status === 'rejected') failures.push(result.reason);
+    }
+
+    try {
+      await closeAllConnections();
+    } catch (error) {
+      failures.push(error);
+    }
+
+    try {
+      await closeDatabaseConnection();
+    } catch (error) {
+      failures.push(error);
+    }
+
+    if (failures.length > 0) {
+      OTEL.instance.logger.error('Error during shutdown', { failureCount: failures.length });
+      process.exit(1);
+    }
+    process.exit(0);
   };
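
The best-effort pattern in the patch can also be tried in isolation. The following is a minimal sketch, not code from this repository: `runCleanupSteps`, `closeGroup`, and their types are hypothetical names introduced for illustration. It runs each named step in order, keeps going after a failure, and records which step failed, so the final log can say more than a failure count.

```typescript
// Hypothetical helpers, not part of the PR.
type CleanupStep = { name: string; run: () => Promise<unknown> };
type CleanupFailure = { name: string; message: string };

// Run each named cleanup step in order; a failing step is recorded and the
// remaining steps still run.
async function runCleanupSteps(steps: CleanupStep[]): Promise<CleanupFailure[]> {
  const failures: CleanupFailure[] = [];
  for (const step of steps) {
    try {
      await step.run();
    } catch (error) {
      failures.push({
        name: step.name,
        message: error instanceof Error ? error.message : String(error)
      });
    }
  }
  return failures;
}

// Close a group in parallel. Promise.allSettled waits for every close to
// finish, then one aggregated error is thrown if any of them rejected.
async function closeGroup(closers: Array<() => Promise<unknown>>): Promise<void> {
  const results = await Promise.allSettled(closers.map((close) => close()));
  const rejected = results.filter((result) => result.status === 'rejected');
  if (rejected.length > 0) {
    throw new Error(`${rejected.length} of ${results.length} closes failed`);
  }
}
```

In a real shutdown handler the steps would wrap the worker, queue, and database closes, and the exit code would be chosen from whether the returned array is empty.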

Make shutdown idempotent to avoid overlapping closes.
shutdown() is wired to multiple independent process.on(...) events with no isShuttingDown/promise guard, so repeated signals/errors can trigger concurrent runs and double-close behavior.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/hooks.server.ts` around lines 39 - 60, The shutdown function currently
stops on the first rejection from Promise.all and can run concurrently on
repeated signals; change shutdown (the async shutdown function) to be idempotent
by adding an isShuttingDown flag or a shutdownPromise guard so repeated
invocations await the same promise and return immediately, and replace the
single await Promise.all(allWorkers.map(...)) with per-step guarded execution
that attempts each cleanup step independently (iterate allWorkers and close each
worker catching/logging each error, then call closeAllConnections() and
closeDatabaseConnection() each in their own try/catch) aggregating/logging all
errors to OTEL.instance.logger.error but still proceeding to run all cleanup
steps before finally exiting with 0 if no errors or 1 if any errors occurred.

};
Comment on lines +38 to +61

// Register shutdown handlers
process.on('sveltekit:shutdown', () => shutdown('sveltekit:shutdown'));
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));

// Handle uncaught errors
process.on('uncaughtException', async (error) => {
OTEL.instance.logger.error('Uncaught exception', {
error: error.message,
stack: error.stack
});
console.error('Uncaught exception:', error);
await shutdown('uncaughtException');
});

process.on('unhandledRejection', async (reason) => {
OTEL.instance.logger.error('Unhandled rejection', {
reason: reason instanceof Error ? reason.message : String(reason)
});
console.error('Unhandled rejection:', reason);
await shutdown('unhandledRejection');
});
Comment on lines +63 to 84

🩺 Stability & Availability | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Locate and print the relevant section of src/hooks.server.ts (with line numbers)
FILE="src/hooks.server.ts"
if [ ! -f "$FILE" ]; then
  echo "Missing: $FILE"
  exit 1
fi

echo "---- $FILE (lines 1-140) ----"
nl -ba "$FILE" | sed -n '1,140p'

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 185


🏁 Script executed:

#!/bin/bash
set -euo pipefail
# Find any existing shutdown definition/usages to see whether it is already guarded/idempotent.
rg -n --hidden --no-ignore-vcs "shutdown\s*[:=]|function\s+shutdown|const\s+shutdown|shutdown\(" src/hooks.server.ts src -S || true

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 967


🏁 Script executed:

#!/bin/bash
set -euo pipefail
# Check for other shutdown/idempotency patterns and where process.on/once handlers are registered in the repo.
rg -n --hidden --no-ignore-vcs "process\.on\(|process\.once\(|uncaughtException|unhandledRejection|sveltekit:shutdown|SIGTERM|SIGINT" src -S || true

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 607


🏁 Script executed:

#!/bin/bash
set -euo pipefail
FILE="src/hooks.server.ts"
# Print shutdown implementation area with line numbers (using awk since nl isn't available)
awk 'NR>=25 && NR<=110 {printf "%d:%s\n", NR, $0}' "$FILE" | sed -n '1,120p'

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 3091


🏁 Script executed:

#!/bin/bash
set -euo pipefail
FILE="src/hooks.server.ts"
# Also quickly show where shutdown is defined/called to confirm there isn't a different wrapper elsewhere in the file
rg -n --hidden --no-ignore-vcs "shutdown\s*[:=]|const\s+shutdown|process\.on\('sveltekit:shutdown'|process\.on\('SIGTERM'|process\.on\('SIGINT'|uncaughtException|unhandledRejection" "$FILE"

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 517


🏁 Script executed:

#!/bin/bash
set -euo pipefail
# Find the implementations/usages of closeAllConnections and closeDatabaseConnection
rg -n --hidden --no-ignore-vcs "function\s+closeAllConnections|const\s+closeAllConnections|closeAllConnections\s*=" src -S
rg -n --hidden --no-ignore-vcs "function\s+closeDatabaseConnection|const\s+closeDatabaseConnection|closeDatabaseConnection\s*=" src -S
rg -n --hidden --no-ignore-vcs "allWorkers" src/hooks.server.ts src -S

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 664


🏁 Script executed:

#!/bin/bash
set -euo pipefail
FILE="src/lib/server/bullmq/queues.ts"
# Print around closeAllConnections
awk 'NR>=130 && NR<=240 {printf "%d:%s\n", NR, $0}' "$FILE"

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 2095


🏁 Script executed:

#!/bin/bash
set -euo pipefail
FILE="src/lib/server/prisma.ts"
# Print around closeDatabaseConnection
awk 'NR>=1 && NR<=140 {printf "%d:%s\n", NR, $0}' "$FILE"

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 2410


🏁 Script executed:

#!/bin/bash
set -euo pipefail
FILE="src/lib/server/bullmq/BullMQ.ts"
# Print around allWorkers definition
awk 'NR>=1 && NR<=120 {printf "%d:%s\n", NR, $0}' "$FILE"

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 433


🏁 Script executed:

#!/bin/bash
set -euo pipefail
# Inspect the top portion of queues.ts for shared state (_queues, connections)
FILE="src/lib/server/bullmq/queues.ts"
awk 'NR>=1 && NR<=140 {printf "%d:%s\n", NR, $0}' "$FILE"

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 5067


🏁 Script executed:

#!/bin/bash
set -euo pipefail
# Find BullWorker implementations and inspect close behavior
rg -n --hidden --no-ignore-vcs "class\s+|export class|close\(" src/lib/server/bullmq/BullWorker* src/lib/server/bullmq -S

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 2560


🏁 Script executed:

#!/bin/bash
set -euo pipefail
# Show BullWorker files that exist
fd -t f "BullWorker*" src/lib/server/bullmq

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 115


🏁 Script executed:

#!/bin/bash
set -euo pipefail
FILE="src/lib/server/bullmq/BullWorker.ts"
awk 'NR>=1 && NR<=240 {printf "%d:%s\n", NR, $0}' "$FILE"

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 5872


🏁 Script executed:

#!/bin/bash
set -euo pipefail
# Look for any guards like "if (this.worker)" or "closed" in BullWorker
rg -n "close\(\)|closed|closing|if\s*\(this\.|worker\?" src/lib/server/bullmq/BullWorker.ts

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 252


Make shutdown() idempotent (and avoid multi-signal double invocation).

src/hooks.server.ts registers several process.on(...) handlers (SIGTERM, SIGINT, sveltekit:shutdown, uncaughtException, and unhandledRejection), any of which can call shutdown() while an earlier run is still in progress. The current implementation has no guard, so overlapping runs can try to close the same workers, queues, and connections, and call prisma.$disconnect(), more than once.

Suggested patch
+  let shutdownPromise: Promise<void> | null = null;
+
-  const shutdown = async (signal: string) => {
+  const shutdown = (signal: string) => {
+    if (shutdownPromise) return shutdownPromise;
+    shutdownPromise = (async () => {
       OTEL.instance.logger.info(`Received ${signal}, shutting down gracefully...`);
-      ...
-  };
+      ...
+    })();
+    return shutdownPromise;
+  };
 
-  process.on('sveltekit:shutdown', () => shutdown('sveltekit:shutdown'));
-  process.on('SIGTERM', () => shutdown('SIGTERM'));
-  process.on('SIGINT', () => shutdown('SIGINT'));
+  process.once('sveltekit:shutdown', () => void shutdown('sveltekit:shutdown'));
+  process.once('SIGTERM', () => void shutdown('SIGTERM'));
+  process.once('SIGINT', () => void shutdown('SIGINT'));
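
The `...` in the patch elides the cleanup body, so here is the guard on its own as a minimal sketch. `createShutdown` and its `cleanup` callback are hypothetical names, and `process.exit` is left out so the behavior can be observed. Every call made while a shutdown is in flight, or after it has finished, gets the same promise back, so the cleanup body runs once.

```typescript
// Hypothetical factory, not part of the PR: wraps a cleanup routine so that
// concurrent or repeated calls share a single run.
function createShutdown(cleanup: (signal: string) => Promise<void>) {
  let shutdownPromise: Promise<void> | null = null;

  return (signal: string): Promise<void> => {
    // A run has already started (or finished): reuse its promise.
    if (shutdownPromise) return shutdownPromise;
    // The async wrapper turns a synchronous throw in cleanup into a rejection.
    shutdownPromise = (async () => {
      await cleanup(signal);
    })();
    return shutdownPromise;
  };
}

// Stand-in for closing workers, queues, and the database.
const signalsHandled: string[] = [];
const shutdown = createShutdown(async (signal) => {
  signalsHandled.push(signal);
  await new Promise<void>((resolve) => setTimeout(resolve, 10));
});
```

One design point to weigh with `process.once`: the listener is removed after it fires, so a second SIGINT or SIGTERM falls back to Node's default behavior and terminates the process without waiting for cleanup. That can be a useful escape hatch, but it should be a deliberate choice.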
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/hooks.server.ts` around lines 63 - 84, Multiple process handlers may call
shutdown() concurrently causing duplicate resource teardown
(workers/queues/connections and prisma.$disconnect()); make shutdown idempotent
by adding a module-scoped state guard (e.g., isShuttingDown boolean or a Promise
like shutdownPromise) inside the same file so subsequent calls return early or
await the in-progress shutdown, ensure cleanup steps (closing workers/queues,
connections, and prisma.$disconnect()) run only once, and have all
signal/exception handlers call the single guarded shutdown entry (refer to
shutdown, prisma.$disconnect, and the process.on(...) handlers to locate where
to add the guard).

}

9 changes: 8 additions & 1 deletion src/lib/server/bullmq/index.ts
@@ -1,2 +1,9 @@
export { QueueConnected, getQueueConfig, getQueues, getWorkerConfig } from './queues';
export {
QueueConnected,
closeAllConnections,
closeAllQueues,
getQueueConfig,
getQueues,
getWorkerConfig
} from './queues';
export * as BullMQ from './types';
46 changes: 44 additions & 2 deletions src/lib/server/bullmq/queues.ts
@@ -9,6 +9,7 @@ import OTEL from '$lib/otel';
class Connection {
private conn: Redis;
private connected: boolean;
private heartbeatInterval: NodeJS.Timeout | null = null;
constructor(isQueueConnection = false, keyPrefix?: string) {
this.conn = new Redis({
host: process.env.NODE_ENV === 'development' ? 'localhost' : process.env.VALKEY_HOST,
@@ -41,7 +42,7 @@ class Connection {
console.error('Valkey connection error', err);
}
});
setInterval(() => {
this.heartbeatInterval = setInterval(() => {
if (this.connected) {
this.conn
.ping()
@@ -60,7 +61,9 @@ class Connection {
}
});
}
}, 10000).unref(); // Check every 10 seconds
}, 10000);
// Ensure the interval doesn't prevent Node from exiting
this.heartbeatInterval.unref();
}
public IsConnected() {
return this.connected;
@@ -69,6 +72,15 @@ class Connection {
public connection() {
return this.conn;
}

public async close() {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
this.connected = false;
await this.conn.quit();
}
Comment on lines +79 to +83
}

let _workerConnection: Connection | undefined = undefined;
@@ -144,3 +156,33 @@ export function getQueues() {
}
return _queues;
}

export async function closeAllQueues() {
if (_queues) {
await Promise.all([
_queues.Builds.close(),
_queues.S3.close(),
_queues.Releases.close(),
_queues.Polling.close(),
_queues.SystemStartup.close(),
_queues.SystemRecurring.close()
]);
_queues = undefined;
}
}
Comment on lines +160 to +172

export async function closeAllConnections() {
await closeAllQueues();
if (_workerConnection) {
await _workerConnection.close();
_workerConnection = undefined;
}
if (_queueConnection) {
await _queueConnection.close();
_queueConnection = undefined;
}
if (_authConnection) {
await _authConnection.close();
_authConnection = undefined;
}
}
Comment on lines +160 to +188

🩺 Stability & Availability | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

file="src/lib/server/bullmq/queues.ts"

# Basic sanity: file exists and print the relevant region with line numbers
ls -l "$file"
echo "---- closeAllQueues / closeAllConnections region ----"
nl -ba "$file" | sed -n '130,220p'

echo "---- find singleton declarations/initialization ----"
rg -n "let _queues|_queues\s*=|_workerConnection|_queueConnection|_authConnection" "$file" || true

echo "---- find usage of closeAllQueues/closeAllConnections ----"
rg -n "closeAllQueues\(|closeAllConnections\(" -S . || true

echo "---- find where queues/connections are created (likely _queues/worker/queue/auth assignment) ----"
rg -n "_queues\s*=\s*|_workerConnection\s*=\s*|_queueConnection\s*=\s*|_authConnection\s*=\s*" "$file" || true

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 275


🏁 Script executed:

#!/bin/bash
set -euo pipefail

file="src/lib/server/bullmq/queues.ts"

echo "---- file exists ----"
ls -l "$file"

echo "---- locate functions ----"
rg -n "export async function closeAllQueues|export async function closeAllConnections" "$file"

echo "---- print relevant lines (approx 145-205) ----"
cat -n "$file" | sed -n '140,210p'

echo "---- singleton declarations/initialization in this file ----"
rg -n "(_queues\b|_workerConnection\b|_queueConnection\b|_authConnection\b)" "$file"

echo "---- find where closeAll* are called in repo ----"
rg -n "closeAllQueues\(|closeAllConnections\(" -S . || true

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 3802


🏁 Script executed:

#!/bin/bash
set -euo pipefail
file="src/lib/server/bullmq/queues.ts"

echo "---- file header/imports ----"
sed -n '1,140p' "$file"

echo "---- inspect closeAllQueues/closeAllConnections call site ----"
sed -n '1,120p' src/hooks.server.ts

Repository: sillsdev/appbuilder-buildengine-api

Length of output: 8871


Make shutdown teardown best-effort and always clear cached BullMQ/Redis singletons.

src/lib/server/bullmq/queues.ts uses Promise.all in closeAllQueues() (lines ~161-169) and sequential awaits in closeAllConnections() (lines ~176-186). If any close() rejects, the singleton being closed at that point and every one after it (_queues, _workerConnection, _queueConnection, _authConnection) stay cached, and the remaining close steps never run. In src/hooks.server.ts, the same rejection lands in the catch block of shutdown(), which skips the rest of teardown, including the database close.

Update teardown to keep going when one close fails: for example, use Promise.allSettled plus try/finally so the cached singletons are always cleared, and make sure closeAllConnections() still closes the Redis connections even if closeAllQueues() fails.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/lib/server/bullmq/queues.ts` around lines 160 - 188, The shutdown
currently uses Promise.all and sequential awaits so any rejected close() aborts
remaining teardowns and leaves cached singletons set; update closeAllQueues to
use Promise.allSettled (or individually try/catch each _queues.*.close()) and
wrap the close attempts in a try/finally that always sets _queues = undefined,
and update closeAllConnections to call closeAllQueues inside a try/catch (or use
allSettled) but always attempt to close/clear _workerConnection,
_queueConnection, and _authConnection (use try/finally per connection or
aggregate allSettled results) so that all Redis/BullMQ connections are attempted
and their singleton vars (_workerConnection, _queueConnection, _authConnection)
are cleared even if some close() calls fail; optionally collect and rethrow or
log aggregated errors after all attempts.
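
One way to read this suggestion, as a minimal sketch rather than the repository's actual code: `Closable`, `cache`, and `closeCached` are hypothetical stand-ins for the BullMQ queues and the `_queues` singleton. The `finally` block clears the cached reference whether or not every `close()` succeeded, and failures are reported only after all closes have settled.

```typescript
// Hypothetical stand-ins, not the real BullMQ types or module state.
interface Closable {
  close(): Promise<void>;
}

let cache: Record<string, Closable> | undefined;

async function closeCached(): Promise<void> {
  if (!cache) return;
  const entries = Object.entries(cache);
  try {
    // Wait for every close to settle instead of rejecting on the first failure.
    const results = await Promise.allSettled(entries.map(([, item]) => item.close()));
    const failed = entries
      .filter((_, index) => results[index].status === 'rejected')
      .map(([name]) => name);
    if (failed.length > 0) {
      throw new Error(`Failed to close: ${failed.join(', ')}`);
    }
  } finally {
    // Always drop the cached reference so a later call starts clean.
    cache = undefined;
  }
}
```

The same shape would apply per connection in closeAllConnections(): attempt the close, clear the variable in `finally`, and let the caller decide whether to log or rethrow the collected errors.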

20 changes: 19 additions & 1 deletion src/lib/server/prisma.ts
@@ -5,10 +5,13 @@ export const prisma = new PrismaClient();

class ConnectionChecker {
private connected: boolean;
private heartbeatInterval: NodeJS.Timeout | null = null;
constructor() {
this.connected = false;
this.checkConnection();
setInterval(async () => this.checkConnection(), 10000).unref(); // Check every 10 seconds
this.heartbeatInterval = setInterval(async () => this.checkConnection(), 10000);
// Ensure the interval doesn't prevent Node from exiting
this.heartbeatInterval.unref();
}
Comment on lines 11 to 15
private async checkConnection() {
try {
@@ -40,6 +43,13 @@ class ConnectionChecker {
public IsConnected() {
return this.connected;
}
public close() {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
this.connected = false;
}
}

let conn: ConnectionChecker | null = null;
@@ -53,3 +63,11 @@ export const DatabaseConnected = () => {
}
return conn.IsConnected();
};

export async function closeDatabaseConnection() {
if (conn) {
conn.close();
conn = null;
}
await prisma.$disconnect();
}