From 19db4593677d11df0fe6a7fb2cc1f7cc79c30d59 Mon Sep 17 00:00:00 2001 From: Taylor Downs Date: Mon, 13 Apr 2026 14:43:44 -0400 Subject: [PATCH 1/5] cgroup --- packages/engine-multi/src/api/call-worker.ts | 3 + packages/engine-multi/src/engine.ts | 1 + packages/engine-multi/src/worker/cgroup.ts | 152 ++++++++++++++++++ packages/engine-multi/src/worker/pool.ts | 60 ++++++- .../engine-multi/test/worker/cgroup.test.ts | 98 +++++++++++ 5 files changed, 310 insertions(+), 4 deletions(-) create mode 100644 packages/engine-multi/src/worker/cgroup.ts create mode 100644 packages/engine-multi/test/worker/cgroup.test.ts diff --git a/packages/engine-multi/src/api/call-worker.ts b/packages/engine-multi/src/api/call-worker.ts index c3a062407..6e9f7aff0 100644 --- a/packages/engine-multi/src/api/call-worker.ts +++ b/packages/engine-multi/src/api/call-worker.ts @@ -10,6 +10,7 @@ export type WorkerEvent = { type WorkerOptions = { maxWorkers?: number; + maxWorkerMemoryMb?: number; // kernel-level memory limit per child process (cgroup v2) env?: any; timeout?: number; // ms memoryLimitMb?: number; @@ -26,6 +27,7 @@ export default function initWorkers( const { env = {}, maxWorkers = 5, + maxWorkerMemoryMb, memoryLimitMb, proxyStdout = false, } = options; @@ -34,6 +36,7 @@ export default function initWorkers( workerPath, { maxWorkers, + maxWorkerMemoryMb, env, memoryLimitMb, proxyStdout, diff --git a/packages/engine-multi/src/engine.ts b/packages/engine-multi/src/engine.ts index 9360d5c82..d7fa80d78 100644 --- a/packages/engine-multi/src/engine.ts +++ b/packages/engine-multi/src/engine.ts @@ -141,6 +141,7 @@ const createEngine = async ( { maxWorkers: options.maxWorkers, memoryLimitMb: defaultMemoryLimit, + maxWorkerMemoryMb: defaultMemoryLimit, proxyStdout: options.proxyStdout, }, options.logger diff --git a/packages/engine-multi/src/worker/cgroup.ts b/packages/engine-multi/src/worker/cgroup.ts new file mode 100644 index 000000000..76a60b47e --- /dev/null +++ b/packages/engine-multi/src/worker/cgroup.ts @@ -0,0 +1,152 @@ +import fs from 'node:fs'; +import path from 'node:path'; +import type { Logger } from '@openfn/logger'; + +type CgroupSupport = { + supported: boolean; + cgroupRoot: string | null; +}; + +let cachedResult: CgroupSupport | null = null; + +/** + * Detect whether cgroup v2 memory enforcement is available. + * Called once at pool creation; result is cached for the process lifetime. + */ +export function detectCgroupSupport(logger: Logger): CgroupSupport { + if (cachedResult) return cachedResult; + + try { + // Step 1: confirm cgroup v2 + if (!fs.existsSync('/sys/fs/cgroup/cgroup.controllers')) { + logger.debug('cgroup: v2 not available'); + cachedResult = { supported: false, cgroupRoot: null }; + return cachedResult; + } + + // Step 2: find our cgroup path + const procCgroup = fs.readFileSync('/proc/self/cgroup', 'utf-8').trim(); + const match = procCgroup.match(/^0::(.+)$/m); + if (!match) { + logger.debug('cgroup: could not parse /proc/self/cgroup'); + cachedResult = { supported: false, cgroupRoot: null }; + return cachedResult; + } + + const cgroupRoot = path.join('/sys/fs/cgroup', match[1]); + + // Step 3: ensure memory controller is delegated + const subtreeControl = fs + .readFileSync(path.join(cgroupRoot, 'cgroup.subtree_control'), 'utf-8') + .trim(); + + if (!subtreeControl.includes('memory')) { + // Try to enable memory delegation + try { + fs.writeFileSync( + path.join(cgroupRoot, 'cgroup.subtree_control'), + '+memory' + ); + } catch (e: any) { + if (e.code === 'EBUSY') { + // "no internal process" constraint — move ourselves to a child cgroup + const initPath = path.join(cgroupRoot, 'openfn-init'); + try { + fs.mkdirSync(initPath, { recursive: true }); + fs.writeFileSync( + path.join(initPath, 'cgroup.procs'), + String(process.pid) + ); + fs.writeFileSync( + path.join(cgroupRoot, 'cgroup.subtree_control'), + '+memory' + ); + } catch (inner: any) { + logger.warn('cgroup: failed to delegate memory controller:', inner.message); + cachedResult = { supported: false, cgroupRoot: null }; + return cachedResult; + } + } else { + logger.warn('cgroup: failed to enable memory controller:', e.message); + cachedResult = { supported: false, cgroupRoot: null }; + return cachedResult; + } + } + } + + // Step 4: smoke test — create and remove a probe cgroup + const probePath = path.join(cgroupRoot, `openfn-probe-${process.pid}`); + try { + fs.mkdirSync(probePath); + fs.writeFileSync(path.join(probePath, 'memory.max'), '1073741824'); // 1GB + fs.rmdirSync(probePath); + } catch (e: any) { + logger.warn('cgroup: smoke test failed:', e.message); + cachedResult = { supported: false, cgroupRoot: null }; + return cachedResult; + } + + logger.info('cgroup: memory enforcement available at', cgroupRoot); + cachedResult = { supported: true, cgroupRoot }; + return cachedResult; + } catch (e: any) { + logger.debug('cgroup: detection failed:', e.message); + cachedResult = { supported: false, cgroupRoot: null }; + return cachedResult; + } +} + +/** + * Create a cgroup for a child process and apply a memory limit. + * Returns the cgroup directory path, or null on failure. + */ +export function setupCgroup( + pid: number, + memoryLimitBytes: number, + cgroupRoot: string, + logger: Logger +): string | null { + const cgroupPath = path.join(cgroupRoot, `openfn-worker-${pid}`); + + try { + fs.mkdirSync(cgroupPath); + fs.writeFileSync( + path.join(cgroupPath, 'memory.max'), + String(memoryLimitBytes) + ); + fs.writeFileSync(path.join(cgroupPath, 'memory.swap.max'), '0'); + fs.writeFileSync(path.join(cgroupPath, 'cgroup.procs'), String(pid)); + + logger.debug( + `cgroup: worker ${pid} limited to ${Math.round(memoryLimitBytes / 1024 / 1024)}MB` + ); + return cgroupPath; + } catch (e: any) { + logger.warn(`cgroup: failed to set up cgroup for worker ${pid}:`, e.message); + // Clean up partial state + try { + fs.rmdirSync(cgroupPath); + } catch { + // ignore cleanup failure + } + return null; + } +} + +/** + * Remove a cgroup directory after the child process has exited. + */ +export function cleanupCgroup(cgroupPath: string, logger: Logger): void { + try { + fs.rmdirSync(cgroupPath); + } catch (e: any) { + if (e.code !== 'ENOENT') { + logger.warn('cgroup: cleanup failed for', cgroupPath, e.message); + } + } +} + +// Exported for testing only +export function _resetCache(): void { + cachedResult = null; +} diff --git a/packages/engine-multi/src/worker/pool.ts b/packages/engine-multi/src/worker/pool.ts index 48a268803..1189b2084 100644 --- a/packages/engine-multi/src/worker/pool.ts +++ b/packages/engine-multi/src/worker/pool.ts @@ -13,10 +13,12 @@ import { import { HANDLED_EXIT_CODE } from '../events'; import { Logger } from '@openfn/logger'; import type { PayloadLimits } from './thread/runtime'; +import { detectCgroupSupport, setupCgroup, cleanupCgroup } from './cgroup'; export type PoolOptions = { capacity?: number; // defaults to 5 maxWorkers?: number; // alias for capacity. Which is best? + maxWorkerMemoryMb?: number; // kernel-level memory limit per child process (cgroup v2) env?: Record; // default environment for workers memoryLimitMb?: number; // --max-old-space-size for child processes @@ -72,6 +74,15 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { logger.debug(`pool: Creating new child process pool | capacity: ${capacity}`); let destroyed = false; + const cgroupInfo = detectCgroupSupport(logger); + const cgroupPaths = new Map(); + + if (cgroupInfo.supported && options.maxWorkerMemoryMb) { + logger.info( + `pool: cgroup memory enforcement enabled | limit: ${options.maxWorkerMemoryMb}MB per child` + ); + } + // a pool of processes const pool: ChildProcessPool = new Array(capacity).fill(false); @@ -107,6 +118,21 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { logger.debug('pool: Created new child process', child.pid); allWorkers[child.pid!] = child; + + if (cgroupInfo.supported && options.maxWorkerMemoryMb) { + const limitBytes = Math.ceil( + (options.maxWorkerMemoryMb * 1.2 + 50) * 1024 * 1024 + ); + const cgPath = setupCgroup( + child.pid!, + limitBytes, + cgroupInfo.cgroupRoot!, + logger + ); + if (cgPath) { + cgroupPaths.set(child.pid!, cgPath); + } + } } else { child = maybeChild as ChildProcess; logger.debug('pool: Using existing child process', child.pid); @@ -114,6 +140,14 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { return child; }; + const maybeCleanupCgroup = (pid: number) => { + const cgPath = cgroupPaths.get(pid); + if (cgPath) { + cleanupCgroup(cgPath, logger); + cgroupPaths.delete(pid); + } + }; + const finish = (worker: ChildProcess | false) => { if (worker) { logger.debug('pool: finished task in worker', worker.pid); @@ -148,7 +182,20 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { const promise = new Promise(async (resolve, reject) => { // TODO what should we do if a process in the pool dies, perhaps due to OOM? - const onExit = async (code: number) => { + const onExit = async (code: number | null, signal: string | null) => { + // Kernel OOM kill: cgroup sends SIGKILL with null exit code + if (signal === 'SIGKILL' && code === null && !destroyed) { + logger.debug( + `pool: Worker ${worker.pid} killed by SIGKILL (probable OOM)` + ); + clearTimeout(timeout); + maybeCleanupCgroup(worker.pid!); + killWorker(worker); + finish(false); + reject(new OOMError()); + return; + } + if (code !== HANDLED_EXIT_CODE) { logger.debug(`pool: Worker exited unexpectedly with code ${code}`); clearTimeout(timeout); @@ -174,8 +221,9 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { } catch (e) { // do nothing } + maybeCleanupCgroup(worker.pid!); finish(worker); - reject(new ExitError(code)); + reject(new ExitError(code!)); } }; @@ -263,9 +311,11 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { const killWorker = (worker: ChildProcess | false) => { if (worker) { - logger.debug('pool: destroying worker ', worker.pid); + const pid = worker.pid!; + logger.debug('pool: destroying worker ', pid); worker.kill(); - delete allWorkers[worker.pid!]; + delete allWorkers[pid]; + worker.once('exit', () => maybeCleanupCgroup(pid)); } }; @@ -282,11 +332,13 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { const timeout = setTimeout(() => { logger.debug('pool: force killing worker', worker.pid); worker.kill('SIGKILL'); + maybeCleanupCgroup(worker.pid!); resolve(); }, forceKillTimeout); worker.once('exit', () => { clearTimeout(timeout); + maybeCleanupCgroup(worker.pid!); resolve(); }); diff --git a/packages/engine-multi/test/worker/cgroup.test.ts b/packages/engine-multi/test/worker/cgroup.test.ts new file mode 100644 index 000000000..f4bead0c6 --- /dev/null +++ b/packages/engine-multi/test/worker/cgroup.test.ts @@ -0,0 +1,98 @@ +import test from 'ava'; +import fs from 'node:fs'; +import path from 'node:path'; +import { createMockLogger } from '@openfn/logger'; + +import { + detectCgroupSupport, + setupCgroup, + cleanupCgroup, + _resetCache, +} from '../../src/worker/cgroup'; + +const logger = createMockLogger(); + +test.beforeEach(() => { + _resetCache(); +}); + +// On macOS / non-Linux, detection should return unsupported +test('detectCgroupSupport returns false when cgroup v2 is not available', (t) => { + // This test runs on any platform. On macOS, /sys/fs/cgroup doesn't exist. + // On Linux without cgroup v2, cgroup.controllers won't exist. + if (fs.existsSync('/sys/fs/cgroup/cgroup.controllers')) { + t.pass('cgroup v2 is available on this system — skipping negative test'); + return; + } + + const result = detectCgroupSupport(logger); + t.false(result.supported); + t.is(result.cgroupRoot, null); +}); + +test('detectCgroupSupport caches the result across calls', (t) => { + const result1 = detectCgroupSupport(logger); + const result2 = detectCgroupSupport(logger); + t.is(result1, result2); // same object reference +}); + +test('setupCgroup returns null when directory creation fails', (t) => { + // Use a nonexistent root so mkdirSync fails + const result = setupCgroup( + 99999, + 500 * 1024 * 1024, + '/nonexistent/cgroup/root', + logger + ); + t.is(result, null); +}); + +test('cleanupCgroup does not throw on ENOENT', (t) => { + t.notThrows(() => { + cleanupCgroup('/nonexistent/cgroup/path', logger); + }); +}); + +// Integration tests — only run on Linux with cgroup v2 and write access +const hasCgroupV2 = fs.existsSync('/sys/fs/cgroup/cgroup.controllers'); + +const cgroupTest = hasCgroupV2 ? test : test.skip; + +cgroupTest('detectCgroupSupport returns true on cgroup v2 system', (t) => { + const result = detectCgroupSupport(logger); + t.true(result.supported); + t.truthy(result.cgroupRoot); +}); + +cgroupTest('setupCgroup creates cgroup directory and cleanupCgroup removes it', (t) => { + const detection = detectCgroupSupport(logger); + if (!detection.supported) { + t.pass('cgroup not supported — skipping'); + return; + } + + // Use a fake PID that won't collide + const fakePid = 2147483640; + const limitBytes = 256 * 1024 * 1024; + + const cgPath = setupCgroup(fakePid, limitBytes, detection.cgroupRoot!, logger); + + if (!cgPath) { + t.pass('setupCgroup returned null — likely permission issue'); + return; + } + + t.true(fs.existsSync(cgPath)); + + // Verify memory.max was written + const memMax = fs.readFileSync(path.join(cgPath, 'memory.max'), 'utf-8').trim(); + t.is(memMax, String(limitBytes)); + + // Verify memory.swap.max was written + const swapMax = fs.readFileSync(path.join(cgPath, 'memory.swap.max'), 'utf-8').trim(); + t.is(swapMax, '0'); + + // Clean up + cleanupCgroup(cgPath, logger); + t.false(fs.existsSync(cgPath)); +}); From 6fff9875b02a677007759ca57d0655158b5ecaef Mon Sep 17 00:00:00 2001 From: Taylor Downs Date: Mon, 13 Apr 2026 14:58:10 -0400 Subject: [PATCH 2/5] fix tests --- packages/engine-multi/src/worker/cgroup.ts | 14 +++++++++++--- packages/engine-multi/test/worker/cgroup.test.ts | 8 ++++++-- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/packages/engine-multi/src/worker/cgroup.ts b/packages/engine-multi/src/worker/cgroup.ts index 76a60b47e..dd08380cb 100644 --- a/packages/engine-multi/src/worker/cgroup.ts +++ b/packages/engine-multi/src/worker/cgroup.ts @@ -62,7 +62,10 @@ export function detectCgroupSupport(logger: Logger): CgroupSupport { '+memory' ); } catch (inner: any) { - logger.warn('cgroup: failed to delegate memory controller:', inner.message); + logger.warn( + 'cgroup: failed to delegate memory controller:', + inner.message + ); cachedResult = { supported: false, cgroupRoot: null }; return cachedResult; } @@ -118,11 +121,16 @@ export function setupCgroup( fs.writeFileSync(path.join(cgroupPath, 'cgroup.procs'), String(pid)); logger.debug( - `cgroup: worker ${pid} limited to ${Math.round(memoryLimitBytes / 1024 / 1024)}MB` + `cgroup: worker ${pid} limited to ${Math.round( + memoryLimitBytes / 1024 / 1024 + )}MB` ); return cgroupPath; } catch (e: any) { - logger.warn(`cgroup: failed to set up cgroup for worker ${pid}:`, e.message); + logger.warn( + `cgroup: failed to set up cgroup for worker ${pid}:`, + e.message + ); // Clean up partial state try { fs.rmdirSync(cgroupPath); diff --git a/packages/engine-multi/test/worker/cgroup.test.ts b/packages/engine-multi/test/worker/cgroup.test.ts index f4bead0c6..e1442d216 100644 --- a/packages/engine-multi/test/worker/cgroup.test.ts +++ b/packages/engine-multi/test/worker/cgroup.test.ts @@ -58,9 +58,13 @@ const hasCgroupV2 = fs.existsSync('/sys/fs/cgroup/cgroup.controllers'); const cgroupTest = hasCgroupV2 ? test : test.skip; -cgroupTest('detectCgroupSupport returns true on cgroup v2 system', (t) => { +cgroupTest('detectCgroupSupport on cgroup v2 system', (t) => { const result = detectCgroupSupport(logger); - t.true(result.supported); + if (!result.supported) { + // cgroup v2 is present but we lack permissions to delegate — that's fine + t.pass('cgroup v2 present but not writable — detection correctly returned false'); + return; + } t.truthy(result.cgroupRoot); }); From 3f1e00a85d097da2126beae57ba21424710c0e02 Mon Sep 17 00:00:00 2001 From: Taylor Downs Date: Mon, 13 Apr 2026 15:15:19 -0400 Subject: [PATCH 3/5] rlimit --- Dockerfile | 2 +- packages/engine-multi/src/worker/cgroup.ts | 160 ------------------ packages/engine-multi/src/worker/pool.ts | 62 ++----- packages/engine-multi/src/worker/rlimit.ts | 55 ++++++ .../engine-multi/test/worker/cgroup.test.ts | 102 ----------- .../engine-multi/test/worker/rlimit.test.ts | 51 ++++++ 6 files changed, 122 insertions(+), 310 deletions(-) delete mode 100644 packages/engine-multi/src/worker/cgroup.ts create mode 100644 packages/engine-multi/src/worker/rlimit.ts delete mode 100644 packages/engine-multi/test/worker/cgroup.test.ts create mode 100644 packages/engine-multi/test/worker/rlimit.test.ts diff --git a/Dockerfile b/Dockerfile index 72805704c..028e2b1f1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,7 +18,7 @@ WORKDIR /app # TODO: remove simple build once prod optimized build is working --------------- FROM base AS ws-worker -RUN apk add --no-cache git +RUN apk add --no-cache git util-linux RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --frozen-lockfile RUN pnpm build WORKDIR /app/packages/ws-worker diff --git a/packages/engine-multi/src/worker/cgroup.ts b/packages/engine-multi/src/worker/cgroup.ts deleted file mode 100644 index dd08380cb..000000000 --- a/packages/engine-multi/src/worker/cgroup.ts +++ /dev/null @@ -1,160 +0,0 @@ -import fs from 'node:fs'; -import path from 'node:path'; -import type { Logger } from '@openfn/logger'; - -type CgroupSupport = { - supported: boolean; - cgroupRoot: string | null; -}; - -let cachedResult: CgroupSupport | null = null; - -/** - * Detect whether cgroup v2 memory enforcement is available. - * Called once at pool creation; result is cached for the process lifetime. - */ -export function detectCgroupSupport(logger: Logger): CgroupSupport { - if (cachedResult) return cachedResult; - - try { - // Step 1: confirm cgroup v2 - if (!fs.existsSync('/sys/fs/cgroup/cgroup.controllers')) { - logger.debug('cgroup: v2 not available'); - cachedResult = { supported: false, cgroupRoot: null }; - return cachedResult; - } - - // Step 2: find our cgroup path - const procCgroup = fs.readFileSync('/proc/self/cgroup', 'utf-8').trim(); - const match = procCgroup.match(/^0::(.+)$/m); - if (!match) { - logger.debug('cgroup: could not parse /proc/self/cgroup'); - cachedResult = { supported: false, cgroupRoot: null }; - return cachedResult; - } - - const cgroupRoot = path.join('/sys/fs/cgroup', match[1]); - - // Step 3: ensure memory controller is delegated - const subtreeControl = fs - .readFileSync(path.join(cgroupRoot, 'cgroup.subtree_control'), 'utf-8') - .trim(); - - if (!subtreeControl.includes('memory')) { - // Try to enable memory delegation - try { - fs.writeFileSync( - path.join(cgroupRoot, 'cgroup.subtree_control'), - '+memory' - ); - } catch (e: any) { - if (e.code === 'EBUSY') { - // "no internal process" constraint — move ourselves to a child cgroup - const initPath = path.join(cgroupRoot, 'openfn-init'); - try { - fs.mkdirSync(initPath, { recursive: true }); - fs.writeFileSync( - path.join(initPath, 'cgroup.procs'), - String(process.pid) - ); - fs.writeFileSync( - path.join(cgroupRoot, 'cgroup.subtree_control'), - '+memory' - ); - } catch (inner: any) { - logger.warn( - 'cgroup: failed to delegate memory controller:', - inner.message - ); - cachedResult = { supported: false, cgroupRoot: null }; - return cachedResult; - } - } else { - logger.warn('cgroup: failed to enable memory controller:', e.message); - cachedResult = { supported: false, cgroupRoot: null }; - return cachedResult; - } - } - } - - // Step 4: smoke test — create and remove a probe cgroup - const probePath = path.join(cgroupRoot, `openfn-probe-${process.pid}`); - try { - fs.mkdirSync(probePath); - fs.writeFileSync(path.join(probePath, 'memory.max'), '1073741824'); // 1GB - fs.rmdirSync(probePath); - } catch (e: any) { - logger.warn('cgroup: smoke test failed:', e.message); - cachedResult = { supported: false, cgroupRoot: null }; - return cachedResult; - } - - logger.info('cgroup: memory enforcement available at', cgroupRoot); - cachedResult = { supported: true, cgroupRoot }; - return cachedResult; - } catch (e: any) { - logger.debug('cgroup: detection failed:', e.message); - cachedResult = { supported: false, cgroupRoot: null }; - return cachedResult; - } -} - -/** - * Create a cgroup for a child process and apply a memory limit. - * Returns the cgroup directory path, or null on failure. - */ -export function setupCgroup( - pid: number, - memoryLimitBytes: number, - cgroupRoot: string, - logger: Logger -): string | null { - const cgroupPath = path.join(cgroupRoot, `openfn-worker-${pid}`); - - try { - fs.mkdirSync(cgroupPath); - fs.writeFileSync( - path.join(cgroupPath, 'memory.max'), - String(memoryLimitBytes) - ); - fs.writeFileSync(path.join(cgroupPath, 'memory.swap.max'), '0'); - fs.writeFileSync(path.join(cgroupPath, 'cgroup.procs'), String(pid)); - - logger.debug( - `cgroup: worker ${pid} limited to ${Math.round( - memoryLimitBytes / 1024 / 1024 - )}MB` - ); - return cgroupPath; - } catch (e: any) { - logger.warn( - `cgroup: failed to set up cgroup for worker ${pid}:`, - e.message - ); - // Clean up partial state - try { - fs.rmdirSync(cgroupPath); - } catch { - // ignore cleanup failure - } - return null; - } -} - -/** - * Remove a cgroup directory after the child process has exited. - */ -export function cleanupCgroup(cgroupPath: string, logger: Logger): void { - try { - fs.rmdirSync(cgroupPath); - } catch (e: any) { - if (e.code !== 'ENOENT') { - logger.warn('cgroup: cleanup failed for', cgroupPath, e.message); - } - } -} - -// Exported for testing only -export function _resetCache(): void { - cachedResult = null; -} diff --git a/packages/engine-multi/src/worker/pool.ts b/packages/engine-multi/src/worker/pool.ts index 1189b2084..06028e059 100644 --- a/packages/engine-multi/src/worker/pool.ts +++ b/packages/engine-multi/src/worker/pool.ts @@ -13,12 +13,12 @@ import { import { HANDLED_EXIT_CODE } from '../events'; import { Logger } from '@openfn/logger'; import type { PayloadLimits } from './thread/runtime'; -import { detectCgroupSupport, setupCgroup, cleanupCgroup } from './cgroup'; +import { detectPrlimitSupport, applyMemoryLimit } from './rlimit'; export type PoolOptions = { capacity?: number; // defaults to 5 maxWorkers?: number; // alias for capacity. Which is best? - maxWorkerMemoryMb?: number; // kernel-level memory limit per child process (cgroup v2) + maxWorkerMemoryMb?: number; // process-level memory limit via RLIMIT_AS env?: Record; // default environment for workers memoryLimitMb?: number; // --max-old-space-size for child processes @@ -74,12 +74,11 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { logger.debug(`pool: Creating new child process pool | capacity: ${capacity}`); let destroyed = false; - const cgroupInfo = detectCgroupSupport(logger); - const cgroupPaths = new Map(); + const hasPrlimit = detectPrlimitSupport(logger); - if (cgroupInfo.supported && options.maxWorkerMemoryMb) { + if (hasPrlimit && options.maxWorkerMemoryMb) { logger.info( - `pool: cgroup memory enforcement enabled | limit: ${options.maxWorkerMemoryMb}MB per child` + `pool: prlimit memory enforcement enabled | limit: ${options.maxWorkerMemoryMb}MB per child` ); } @@ -119,19 +118,14 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { logger.debug('pool: Created new child process', child.pid); allWorkers[child.pid!] = child; - if (cgroupInfo.supported && options.maxWorkerMemoryMb) { + if (hasPrlimit && options.maxWorkerMemoryMb) { + // RLIMIT_AS counts virtual address space, not RSS. + // Node/V8 reserve ~1-2GB virtual memory at startup, so we need + // generous headroom to avoid false positives. const limitBytes = Math.ceil( - (options.maxWorkerMemoryMb * 1.2 + 50) * 1024 * 1024 + (options.maxWorkerMemoryMb * 3 + 512) * 1024 * 1024 ); - const cgPath = setupCgroup( - child.pid!, - limitBytes, - cgroupInfo.cgroupRoot!, - logger - ); - if (cgPath) { - cgroupPaths.set(child.pid!, cgPath); - } + applyMemoryLimit(child.pid!, limitBytes, logger); } } else { child = maybeChild as ChildProcess; @@ -140,14 +134,6 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { return child; }; - const maybeCleanupCgroup = (pid: number) => { - const cgPath = cgroupPaths.get(pid); - if (cgPath) { - cleanupCgroup(cgPath, logger); - cgroupPaths.delete(pid); - } - }; - const finish = (worker: ChildProcess | false) => { if (worker) { logger.debug('pool: finished task in worker', worker.pid); @@ -182,20 +168,7 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { const promise = new Promise(async (resolve, reject) => { // TODO what should we do if a process in the pool dies, perhaps due to OOM? - const onExit = async (code: number | null, signal: string | null) => { - // Kernel OOM kill: cgroup sends SIGKILL with null exit code - if (signal === 'SIGKILL' && code === null && !destroyed) { - logger.debug( - `pool: Worker ${worker.pid} killed by SIGKILL (probable OOM)` - ); - clearTimeout(timeout); - maybeCleanupCgroup(worker.pid!); - killWorker(worker); - finish(false); - reject(new OOMError()); - return; - } - + const onExit = async (code: number) => { if (code !== HANDLED_EXIT_CODE) { logger.debug(`pool: Worker exited unexpectedly with code ${code}`); clearTimeout(timeout); @@ -221,9 +194,8 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { } catch (e) { // do nothing } - maybeCleanupCgroup(worker.pid!); finish(worker); - reject(new ExitError(code!)); + reject(new ExitError(code)); } }; @@ -311,11 +283,9 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { const killWorker = (worker: ChildProcess | false) => { if (worker) { - const pid = worker.pid!; - logger.debug('pool: destroying worker ', pid); + logger.debug('pool: destroying worker ', worker.pid); worker.kill(); - delete allWorkers[pid]; - worker.once('exit', () => maybeCleanupCgroup(pid)); + delete allWorkers[worker.pid!]; } }; @@ -332,13 +302,11 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { const timeout = setTimeout(() => { logger.debug('pool: force killing worker', worker.pid); worker.kill('SIGKILL'); - maybeCleanupCgroup(worker.pid!); resolve(); }, forceKillTimeout); worker.once('exit', () => { clearTimeout(timeout); - maybeCleanupCgroup(worker.pid!); resolve(); }); diff --git a/packages/engine-multi/src/worker/rlimit.ts b/packages/engine-multi/src/worker/rlimit.ts new file mode 100644 index 000000000..2966fa684 --- /dev/null +++ b/packages/engine-multi/src/worker/rlimit.ts @@ -0,0 +1,55 @@ +import { execFileSync } from 'node:child_process'; +import type { Logger } from '@openfn/logger'; + +let prlimitAvailable: boolean | null = null; + +/** + * Check if the prlimit command is available (Linux with util-linux). + * Result is cached for the process lifetime. + */ +export function detectPrlimitSupport(logger: Logger): boolean { + if (prlimitAvailable !== null) return prlimitAvailable; + + try { + execFileSync('prlimit', ['--version'], { stdio: 'ignore' }); + prlimitAvailable = true; + logger.info('prlimit: memory enforcement available'); + } catch { + prlimitAvailable = false; + logger.debug('prlimit: not available (util-linux not installed)'); + } + + return prlimitAvailable; +} + +/** + * Apply RLIMIT_AS (virtual address space limit) to a child process. + * When exceeded, mmap/brk fails with ENOMEM, causing the process to crash. + */ +export function applyMemoryLimit( + pid: number, + limitBytes: number, + logger: Logger +): boolean { + try { + execFileSync('prlimit', [ + '--pid', + String(pid), + `--as=${limitBytes}:${limitBytes}`, + ]); + logger.debug( + `prlimit: worker ${pid} RLIMIT_AS set to ${Math.round( + limitBytes / 1024 / 1024 + )}MB` + ); + return true; + } catch (e: any) { + logger.warn(`prlimit: failed to set limit for worker ${pid}:`, e.message); + return false; + } +} + +// Exported for testing only +export function _resetCache(): void { + prlimitAvailable = null; +} diff --git a/packages/engine-multi/test/worker/cgroup.test.ts b/packages/engine-multi/test/worker/cgroup.test.ts deleted file mode 100644 index e1442d216..000000000 --- a/packages/engine-multi/test/worker/cgroup.test.ts +++ /dev/null @@ -1,102 +0,0 @@ -import test from 'ava'; -import fs from 'node:fs'; -import path from 'node:path'; -import { createMockLogger } from '@openfn/logger'; - -import { - detectCgroupSupport, - setupCgroup, - cleanupCgroup, - _resetCache, -} from '../../src/worker/cgroup'; - -const logger = createMockLogger(); - -test.beforeEach(() => { - _resetCache(); -}); - -// On macOS / non-Linux, detection should return unsupported -test('detectCgroupSupport returns false when cgroup v2 is not available', (t) => { - // This test runs on any platform. On macOS, /sys/fs/cgroup doesn't exist. - // On Linux without cgroup v2, cgroup.controllers won't exist. - if (fs.existsSync('/sys/fs/cgroup/cgroup.controllers')) { - t.pass('cgroup v2 is available on this system — skipping negative test'); - return; - } - - const result = detectCgroupSupport(logger); - t.false(result.supported); - t.is(result.cgroupRoot, null); -}); - -test('detectCgroupSupport caches the result across calls', (t) => { - const result1 = detectCgroupSupport(logger); - const result2 = detectCgroupSupport(logger); - t.is(result1, result2); // same object reference -}); - -test('setupCgroup returns null when directory creation fails', (t) => { - // Use a nonexistent root so mkdirSync fails - const result = setupCgroup( - 99999, - 500 * 1024 * 1024, - '/nonexistent/cgroup/root', - logger - ); - t.is(result, null); -}); - -test('cleanupCgroup does not throw on ENOENT', (t) => { - t.notThrows(() => { - cleanupCgroup('/nonexistent/cgroup/path', logger); - }); -}); - -// Integration tests — only run on Linux with cgroup v2 and write access -const hasCgroupV2 = fs.existsSync('/sys/fs/cgroup/cgroup.controllers'); - -const cgroupTest = hasCgroupV2 ? test : test.skip; - -cgroupTest('detectCgroupSupport on cgroup v2 system', (t) => { - const result = detectCgroupSupport(logger); - if (!result.supported) { - // cgroup v2 is present but we lack permissions to delegate — that's fine - t.pass('cgroup v2 present but not writable — detection correctly returned false'); - return; - } - t.truthy(result.cgroupRoot); -}); - -cgroupTest('setupCgroup creates cgroup directory and cleanupCgroup removes it', (t) => { - const detection = detectCgroupSupport(logger); - if (!detection.supported) { - t.pass('cgroup not supported — skipping'); - return; - } - - // Use a fake PID that won't collide - const fakePid = 2147483640; - const limitBytes = 256 * 1024 * 1024; - - const cgPath = setupCgroup(fakePid, limitBytes, detection.cgroupRoot!, logger); - - if (!cgPath) { - t.pass('setupCgroup returned null — likely permission issue'); - return; - } - - t.true(fs.existsSync(cgPath)); - - // Verify memory.max was written - const memMax = fs.readFileSync(path.join(cgPath, 'memory.max'), 'utf-8').trim(); - t.is(memMax, String(limitBytes)); - - // Verify memory.swap.max was written - const swapMax = fs.readFileSync(path.join(cgPath, 'memory.swap.max'), 'utf-8').trim(); - t.is(swapMax, '0'); - - // Clean up - cleanupCgroup(cgPath, logger); - t.false(fs.existsSync(cgPath)); -}); diff --git a/packages/engine-multi/test/worker/rlimit.test.ts b/packages/engine-multi/test/worker/rlimit.test.ts new file mode 100644 index 000000000..71ade8b0e --- /dev/null +++ b/packages/engine-multi/test/worker/rlimit.test.ts @@ -0,0 +1,51 @@ +import test from 'ava'; +import { createMockLogger } from '@openfn/logger'; + +import { + detectPrlimitSupport, + applyMemoryLimit, + _resetCache, +} from '../../src/worker/rlimit'; + +const logger = createMockLogger(); + +test.beforeEach(() => { + _resetCache(); +}); + +test('detectPrlimitSupport caches the result across calls', (t) => { + const result1 = detectPrlimitSupport(logger); + const result2 = detectPrlimitSupport(logger); + t.is(result1, result2); +}); + +// On macOS, prlimit is not available +const isLinux = process.platform === 'linux'; + +if (!isLinux) { + test('detectPrlimitSupport returns false on non-Linux', (t) => { + const result = detectPrlimitSupport(logger); + t.false(result); + }); +} + +test('applyMemoryLimit returns false when prlimit is not available', (t) => { + if (detectPrlimitSupport(logger)) { + t.pass('prlimit is available — skipping negative test'); + return; + } + const result = applyMemoryLimit(99999, 500 * 1024 * 1024, logger); + t.false(result); +}); + +// Integration tests — only run on Linux with prlimit available +const hasPrlimit = isLinux && detectPrlimitSupport(createMockLogger()); +_resetCache(); // reset after the check so tests start clean + +const prlimitTest = hasPrlimit ? test : test.skip; + +prlimitTest('applyMemoryLimit succeeds on own process', (t) => { + // Apply a very generous limit to our own process (won't interfere with test) + const result = applyMemoryLimit(process.pid, 8 * 1024 * 1024 * 1024, logger); + t.true(result); +}); From c600b7833cc496bbfd011f97e974c931cc9e106e Mon Sep 17 00:00:00 2001 From: Taylor Downs Date: Mon, 13 Apr 2026 15:26:57 -0400 Subject: [PATCH 4/5] bigger --- packages/engine-multi/src/worker/pool.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/engine-multi/src/worker/pool.ts b/packages/engine-multi/src/worker/pool.ts index 06028e059..248926486 100644 --- a/packages/engine-multi/src/worker/pool.ts +++ b/packages/engine-multi/src/worker/pool.ts @@ -120,10 +120,11 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { if (hasPrlimit && options.maxWorkerMemoryMb) { // RLIMIT_AS counts virtual address space, not RSS. - // Node/V8 reserve ~1-2GB virtual memory at startup, so we need - // generous headroom to avoid false positives. + // Node/V8 routinely reserves 4-8GB of virtual memory at startup + // (page table entries are cheap on 64-bit). We set a generous limit + // that only catches truly runaway allocations. const limitBytes = Math.ceil( - (options.maxWorkerMemoryMb * 3 + 512) * 1024 * 1024 + (options.maxWorkerMemoryMb * 10 + 2048) * 1024 * 1024 ); applyMemoryLimit(child.pid!, limitBytes, logger); } From b25192ff7096dde4725c5730fe273d2a42b0430e Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 24 Jun 2026 16:02:06 +0100 Subject: [PATCH 5/5] refinind rlimit usage But turns out this completely won't work --- packages/engine-multi/src/api.ts | 4 +- packages/engine-multi/src/engine.ts | 5 + packages/engine-multi/src/util/memory.ts | 3 + packages/engine-multi/src/worker/limits.ts | 118 ++++++++++++++++++ packages/engine-multi/src/worker/pool.ts | 55 +++++--- packages/engine-multi/src/worker/rlimit.ts | 55 -------- .../engine-multi/test/util/memory.test.ts | 17 +++ .../worker/{rlimit.test.ts => limit.test.ts} | 19 ++- 8 files changed, 199 insertions(+), 77 deletions(-) create mode 100644 packages/engine-multi/src/util/memory.ts create mode 100644 packages/engine-multi/src/worker/limits.ts delete mode 100644 packages/engine-multi/src/worker/rlimit.ts create mode 100644 packages/engine-multi/test/util/memory.test.ts rename packages/engine-multi/test/worker/{rlimit.test.ts => limit.test.ts} (76%) diff --git a/packages/engine-multi/src/api.ts b/packages/engine-multi/src/api.ts index 29450b16a..3a5a7ec59 100644 --- a/packages/engine-multi/src/api.ts +++ b/packages/engine-multi/src/api.ts @@ -27,7 +27,7 @@ export type APIOptions = Partial>; const DEFAULT_REPO_DIR = path.join(os.homedir(), '.openfn/worker/repo'); -const DEFAULT_MEMORY_LIMIT = 500; +const DEFAULT_RUN_MEMORY_LIMIT = 500; // Create the engine and handle user-facing stuff, like options parsing // and defaulting @@ -57,7 +57,7 @@ const createAPI = async function ( autoinstall: options.autoinstall, maxWorkers: options.maxWorkers, - memoryLimitMb: options.memoryLimitMb || DEFAULT_MEMORY_LIMIT, + memoryLimitMb: options.memoryLimitMb || DEFAULT_RUN_MEMORY_LIMIT, runTimeoutMs: options.runTimeoutMs, statePropsToRemove: options.statePropsToRemove ?? [ diff --git a/packages/engine-multi/src/engine.ts b/packages/engine-multi/src/engine.ts index d7fa80d78..93a030225 100644 --- a/packages/engine-multi/src/engine.ts +++ b/packages/engine-multi/src/engine.ts @@ -73,6 +73,11 @@ export type EngineOptions = { // compile?: { skip?: boolean } // TODO no support yet logger: Logger; maxWorkers?: number; + + /** Sets the maximum total memory usable by the engine. Otherwise uses all available memory */ + maxEngineMemoryMb?: number; + + /** Memory limit per run */ memoryLimitMb?: number; payloadLimitMb?: number; logPayloadLimitMb?: number; diff --git a/packages/engine-multi/src/util/memory.ts b/packages/engine-multi/src/util/memory.ts new file mode 100644 index 000000000..ee813fbb8 --- /dev/null +++ b/packages/engine-multi/src/util/memory.ts @@ -0,0 +1,3 @@ +export const mb = (valueInBytes: number) => valueInBytes / 1024 / 1024; + +export const b = (valueInMb: number) => valueInMb * 1024 * 1024; diff --git a/packages/engine-multi/src/worker/limits.ts b/packages/engine-multi/src/worker/limits.ts new file mode 100644 index 000000000..8fd96ffdf --- /dev/null +++ b/packages/engine-multi/src/worker/limits.ts @@ -0,0 +1,118 @@ +import process from 'node:process'; +import os from 'node:os'; +import { execSync } from 'node:child_process'; +import type { Logger } from '@openfn/logger'; +import { ChildProcess } from 'node:child_process'; +import { b, mb } from '../util/memory'; +import type { PoolOptions } from './pool'; + +let prlimitAvailable: boolean | null = null; + +/** + * Check if the prlimit command is available (Linux with util-linux). + * Result is cached for the process lifetime. + */ +export function detectPrlimitSupport(): boolean { + if (prlimitAvailable === null) { + try { + execSync('prlimit', ['--version'], { stdio: 'ignore' }); + prlimitAvailable = true; + } catch { + prlimitAvailable = false; + } + } + + return prlimitAvailable; +} + +export const getAvailableMemory = (options: PoolOptions): number => { + if (options.totalMemoryMb) { + return b(options.totalMemoryMb); + } + const total = os.totalmem(); + const constrained = process.constrainedMemory?.() ?? total; + return Math.floor(Math.min(total, constrained)); +}; + +export const calculateLimits = ( + totalMemory_bytes: number, + mainProcessOverhead_bytes: number, + capacity: number +): number => { + return Math.floor((totalMemory_bytes - mainProcessOverhead_bytes) / capacity); +}; + +export function setHardMemoryLimit( + child: ChildProcess, + limit_bytes: number, + logger: Logger +) { + if (prlimitAvailable) { + logger.debug( + `pool: setting hard limit on pid ${child.pid} to ${Math.floor( + mb(limit_bytes) + )}mb` + ); + const roundLimit = Math.round(limit_bytes); + + // this will set the hard and soft limits at once + // The hard limit is the absolute maximum we can possibly set + // (but doesn't actually allocate memory, just sets a ceiling) + // The soft limit can be changed per run + execSync(`prlimit --pid=${child.pid} --as=${roundLimit}`); + + // Pretty print the result + const out = execSync( + `prlimit --pid=${child.pid} --as --noheadings -o "SOFT,HARD,UNITS"` + ); + logger.debug(' > ', out.toString()); + } +} + +// TODO before each run starst, we should set the soft limit +// to that run's limit +/** + * Set a memory limit on a child process + * If prlmit is available, this will set the soft limit + * + * Apply RLIMIT_AS (virtual address space limit) to a child process. + * When exceeded, mmap/brk fails with ENOMEM, causing the process to crash. + */ + +// TODO: it's plausible that du to bad config the hard limit +// is lower than the actual run memory limit +// if this true we'll get an error here +// we should probably od a check and raise a warning, then use the smallest of soft and hard limit +// most runs should be quite happy to run in this +export function applyMemoryLimit( + child: ChildProcess, + limitBytes: number, + logger: Logger +): boolean { + if (prlimitAvailable) { + const pid = child.pid; + try { + console.log({ hardLimit }); + console.log(`prlimit --pid ${child.pid} --as=${limitBytes}:`); + // note we still have to pass the hard limit here + const out = execSync(`prlimit --pid ${child.pid} --as=${limitBytes}:`); + console.log({ out: out.toString() }); + logger.debug( + `Soft memory limit on worker ${pid} set to ${Math.round( + mb(limitBytes) + )}MB` + ); + return true; + } catch (e: any) { + logger.warn(`Failed to set soft for worker ${pid}:`, e.message); + return false; + } + } + return false; +} + +// TODO rename this. maybe make it setprlimit rather than a global reset +// Exported for testing only +export function _resetCache(): void { + prlimitAvailable = null; +} diff --git a/packages/engine-multi/src/worker/pool.ts b/packages/engine-multi/src/worker/pool.ts index 248926486..565f40a16 100644 --- a/packages/engine-multi/src/worker/pool.ts +++ b/packages/engine-multi/src/worker/pool.ts @@ -13,18 +13,30 @@ import { import { HANDLED_EXIT_CODE } from '../events'; import { Logger } from '@openfn/logger'; import type { PayloadLimits } from './thread/runtime'; -import { detectPrlimitSupport, applyMemoryLimit } from './rlimit'; +import { + detectPrlimitSupport, + getAvailableMemory, + calculateLimits, + setHardMemoryLimit, + applyMemoryLimit, +} from './limits'; +import { b, mb } from '../util/memory'; export type PoolOptions = { capacity?: number; // defaults to 5 maxWorkers?: number; // alias for capacity. Which is best? - maxWorkerMemoryMb?: number; // process-level memory limit via RLIMIT_AS + + /* Total memory available to the whole engine (will auto-detect if not set) */ + totalMemoryMb?: number; env?: Record; // default environment for workers memoryLimitMb?: number; // --max-old-space-size for child processes proxyStdout?: boolean; // print internal stdout to console }; +// TODO set through options +const ENGINE_MAIN_OVERHEAD_MB = 400; + type RunTaskEvent = { type: typeof ENGINE_RUN_TASK; task: string; @@ -74,15 +86,22 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { logger.debug(`pool: Creating new child process pool | capacity: ${capacity}`); let destroyed = false; - const hasPrlimit = detectPrlimitSupport(logger); + const hasPrlimit = detectPrlimitSupport(); + let hardMemCap_bytes; - if (hasPrlimit && options.maxWorkerMemoryMb) { + if (hasPrlimit) { + hardMemCap_bytes = calculateLimits( + getAvailableMemory(options), + ENGINE_MAIN_OVERHEAD_MB, + capacity + ); logger.info( - `pool: prlimit memory enforcement enabled | limit: ${options.maxWorkerMemoryMb}MB per child` + `Memory enforcement enabled | hard limit: ${Math.floor( + mb(hardMemCap_bytes) + )}mb per child` ); } - // a pool of processes const pool: ChildProcessPool = new Array(capacity).fill(false); const queue: QueuedTask[] = []; @@ -96,6 +115,8 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { // create a new child process and load the module script into it const execArgv = ['--experimental-vm-modules', '--no-warnings']; if (options.memoryLimitMb) { + // TODO this is just a fallback value - prlimit is the primary + // mechanism for controlling memory use execArgv.push(`--max-old-space-size=${options.memoryLimitMb}`); } @@ -115,23 +136,20 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { }); } + setHardMemoryLimit(child, hardMemCap_bytes!, logger); + logger.debug('pool: Created new child process', child.pid); allWorkers[child.pid!] = child; - - if (hasPrlimit && options.maxWorkerMemoryMb) { - // RLIMIT_AS counts virtual address space, not RSS. - // Node/V8 routinely reserves 4-8GB of virtual memory at startup - // (page table entries are cheap on 64-bit). We set a generous limit - // that only catches truly runaway allocations. - const limitBytes = Math.ceil( - (options.maxWorkerMemoryMb * 10 + 2048) * 1024 * 1024 - ); - applyMemoryLimit(child.pid!, limitBytes, logger); - } } else { child = maybeChild as ChildProcess; logger.debug('pool: Using existing child process', child.pid); } + + // TODO: memory limit is currently set for all runs in the worker according to config + // We could now set this per-run by passing a run option + if (options.memoryLimitMb) { + applyMemoryLimit(child, b(options.memoryLimitMb), logger); + } return child; }; @@ -169,7 +187,8 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { const promise = new Promise(async (resolve, reject) => { // TODO what should we do if a process in the pool dies, perhaps due to OOM? - const onExit = async (code: number) => { + const onExit = async (code: number, e) => { + console.log(code, e); if (code !== HANDLED_EXIT_CODE) { logger.debug(`pool: Worker exited unexpectedly with code ${code}`); clearTimeout(timeout); diff --git a/packages/engine-multi/src/worker/rlimit.ts b/packages/engine-multi/src/worker/rlimit.ts deleted file mode 100644 index 2966fa684..000000000 --- a/packages/engine-multi/src/worker/rlimit.ts +++ /dev/null @@ -1,55 +0,0 @@ -import { execFileSync } from 'node:child_process'; -import type { Logger } from '@openfn/logger'; - -let prlimitAvailable: boolean | null = null; - -/** - * Check if the prlimit command is available (Linux with util-linux). - * Result is cached for the process lifetime. - */ -export function detectPrlimitSupport(logger: Logger): boolean { - if (prlimitAvailable !== null) return prlimitAvailable; - - try { - execFileSync('prlimit', ['--version'], { stdio: 'ignore' }); - prlimitAvailable = true; - logger.info('prlimit: memory enforcement available'); - } catch { - prlimitAvailable = false; - logger.debug('prlimit: not available (util-linux not installed)'); - } - - return prlimitAvailable; -} - -/** - * Apply RLIMIT_AS (virtual address space limit) to a child process. - * When exceeded, mmap/brk fails with ENOMEM, causing the process to crash. - */ -export function applyMemoryLimit( - pid: number, - limitBytes: number, - logger: Logger -): boolean { - try { - execFileSync('prlimit', [ - '--pid', - String(pid), - `--as=${limitBytes}:${limitBytes}`, - ]); - logger.debug( - `prlimit: worker ${pid} RLIMIT_AS set to ${Math.round( - limitBytes / 1024 / 1024 - )}MB` - ); - return true; - } catch (e: any) { - logger.warn(`prlimit: failed to set limit for worker ${pid}:`, e.message); - return false; - } -} - -// Exported for testing only -export function _resetCache(): void { - prlimitAvailable = null; -} diff --git a/packages/engine-multi/test/util/memory.test.ts b/packages/engine-multi/test/util/memory.test.ts new file mode 100644 index 000000000..d28079d60 --- /dev/null +++ b/packages/engine-multi/test/util/memory.test.ts @@ -0,0 +1,17 @@ +import test from 'ava'; +import { b, mb } from '../../src/util/memory'; + +test('mb converts bytes to megabytes', (t) => { + t.is(mb(1024 * 1024), 1); + t.is(mb(512 * 1024 * 1024), 512); +}); + +test('b converts megabytes to bytes', (t) => { + t.is(b(1), 1024 * 1024); + t.is(b(512), 512 * 1024 * 1024); +}); + +test('b and mb are inverses', (t) => { + t.is(mb(b(256)), 256); + t.is(b(mb(256 * 1024 * 1024)), 256 * 1024 * 1024); +}); diff --git a/packages/engine-multi/test/worker/rlimit.test.ts b/packages/engine-multi/test/worker/limit.test.ts similarity index 76% rename from packages/engine-multi/test/worker/rlimit.test.ts rename to packages/engine-multi/test/worker/limit.test.ts index 71ade8b0e..68d227dce 100644 --- a/packages/engine-multi/test/worker/rlimit.test.ts +++ b/packages/engine-multi/test/worker/limit.test.ts @@ -2,17 +2,32 @@ import test from 'ava'; import { createMockLogger } from '@openfn/logger'; import { + calculateLimits, detectPrlimitSupport, applyMemoryLimit, _resetCache, -} from '../../src/worker/rlimit'; - +} from '../../src/worker/limits'; const logger = createMockLogger(); test.beforeEach(() => { _resetCache(); }); +test('calculateLimits divides available memory evenly across workers', (t) => { + const result = calculateLimits(100, 20, 2); + t.is(result, 40); +}); + +test('calculateLimits floors the result', (t) => { + const result = calculateLimits(100, 20, 3); + t.is(result, 26); +}); + +test('calculateLimits with capacity of 1', (t) => { + const result = calculateLimits(100, 20, 1); + t.is(result, 80); +}); + test('detectPrlimitSupport caches the result across calls', (t) => { const result1 = detectPrlimitSupport(logger); const result2 = detectPrlimitSupport(logger);