Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ WORKDIR /app

# TODO: remove simple build once prod optimized build is working ---------------
FROM base AS ws-worker
RUN apk add --no-cache git
RUN apk add --no-cache git util-linux
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --frozen-lockfile
RUN pnpm build
WORKDIR /app/packages/ws-worker
Expand Down
4 changes: 2 additions & 2 deletions packages/engine-multi/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export type APIOptions = Partial<Omit<EngineOptions, 'whitelist'>>;

const DEFAULT_REPO_DIR = path.join(os.homedir(), '.openfn/worker/repo');

const DEFAULT_MEMORY_LIMIT = 500;
const DEFAULT_RUN_MEMORY_LIMIT = 500;

// Create the engine and handle user-facing stuff, like options parsing
// and defaulting
Expand Down Expand Up @@ -57,7 +57,7 @@ const createAPI = async function (
autoinstall: options.autoinstall,

maxWorkers: options.maxWorkers,
memoryLimitMb: options.memoryLimitMb || DEFAULT_MEMORY_LIMIT,
memoryLimitMb: options.memoryLimitMb || DEFAULT_RUN_MEMORY_LIMIT,
runTimeoutMs: options.runTimeoutMs,

statePropsToRemove: options.statePropsToRemove ?? [
Expand Down
3 changes: 3 additions & 0 deletions packages/engine-multi/src/api/call-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export type WorkerEvent = {

type WorkerOptions = {
maxWorkers?: number;
maxWorkerMemoryMb?: number; // kernel-level memory limit per child process (cgroup v2)
env?: any;
timeout?: number; // ms
memoryLimitMb?: number;
Expand All @@ -26,6 +27,7 @@ export default function initWorkers(
const {
env = {},
maxWorkers = 5,
maxWorkerMemoryMb,
memoryLimitMb,
proxyStdout = false,
} = options;
Expand All @@ -34,6 +36,7 @@ export default function initWorkers(
workerPath,
{
maxWorkers,
maxWorkerMemoryMb,
env,
memoryLimitMb,
proxyStdout,
Expand Down
6 changes: 6 additions & 0 deletions packages/engine-multi/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ export type EngineOptions = {
// compile?: { skip?: boolean } // TODO no support yet
logger: Logger;
maxWorkers?: number;

/** Sets the maximum total memory usable by the engine. Otherwise uses all available memory */
maxEngineMemoryMb?: number;

/** Memory limit per run */
memoryLimitMb?: number;
payloadLimitMb?: number;
logPayloadLimitMb?: number;
Expand Down Expand Up @@ -141,6 +146,7 @@ const createEngine = async (
{
maxWorkers: options.maxWorkers,
memoryLimitMb: defaultMemoryLimit,
maxWorkerMemoryMb: defaultMemoryLimit,
proxyStdout: options.proxyStdout,
},
options.logger
Expand Down
3 changes: 3 additions & 0 deletions packages/engine-multi/src/util/memory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/**
 * Convert a size in bytes to megabytes (1 MB = 1024 * 1024 bytes).
 * The result is not rounded, so it may be fractional.
 */
export const mb = (valueInBytes: number): number => {
  const kilobytes = valueInBytes / 1024;
  return kilobytes / 1024;
};

/**
 * Convert a size in megabytes to bytes (1 MB = 1024 * 1024 bytes).
 */
export const b = (valueInMb: number): number => {
  const kilobytes = valueInMb * 1024;
  return kilobytes * 1024;
};
118 changes: 118 additions & 0 deletions packages/engine-multi/src/worker/limits.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import process from 'node:process';
import os from 'node:os';
import { execSync } from 'node:child_process';
import type { Logger } from '@openfn/logger';
import { ChildProcess } from 'node:child_process';
import { b, mb } from '../util/memory';
import type { PoolOptions } from './pool';

// Cached result of the prlimit probe. null means "not probed yet".
let prlimitAvailable: boolean | null = null;

/**
 * Check if the prlimit command is available (Linux with util-linux).
 * Result is cached for the process lifetime.
 *
 * @returns true if `prlimit --version` could be executed successfully
 */
export function detectPrlimitSupport(): boolean {
  if (prlimitAvailable === null) {
    try {
      // execSync takes (command, options): arguments must be part of the
      // command string, they cannot be passed as a separate array
      execSync('prlimit --version', { stdio: 'ignore' });
      prlimitAvailable = true;
    } catch {
      // command missing or exited non-zero: treat as unsupported
      prlimitAvailable = false;
    }
  }

  return prlimitAvailable;
}

/**
 * Work out how much memory (in bytes) the engine may use in total.
 *
 * If options.totalMemoryMb is set, that value is used as-is (converted to bytes).
 * Otherwise the smaller of the system's total memory and any OS-imposed
 * constraint reported by process.constrainedMemory() is used.
 *
 * @param options - pool options; only totalMemoryMb is read
 * @returns the available memory in whole bytes
 */
export const getAvailableMemory = (options: PoolOptions): number => {
  if (options.totalMemoryMb) {
    return b(options.totalMemoryMb);
  }

  const total = os.totalmem();

  // constrainedMemory() does not exist on older Node versions, and returns 0
  // when there is no constraint (or the constraint is unknown). Neither case
  // must be allowed to drag the result down to 0.
  const constrained = process.constrainedMemory?.() ?? 0;
  const usable = constrained > 0 ? Math.min(total, constrained) : total;

  return Math.floor(usable);
};

/**
 * Split the memory left over after the main process's overhead evenly
 * between the workers.
 *
 * All memory values are in bytes. The result is rounded down.
 *
 * @param totalMemory_bytes - total memory available to the engine
 * @param mainProcessOverhead_bytes - memory set aside for the main process
 * @param capacity - number of workers sharing the remainder
 * @returns the per-worker share in bytes
 */
export const calculateLimits = (
  totalMemory_bytes: number,
  mainProcessOverhead_bytes: number,
  capacity: number
): number => {
  const sharedBudget = totalMemory_bytes - mainProcessOverhead_bytes;
  const perWorker = sharedBudget / capacity;
  return Math.floor(perWorker);
};

/**
 * Set the hard memory ceiling (RLIMIT_AS) for a child process using prlimit.
 *
 * This sets the hard and soft limits at once. The hard limit is the absolute
 * maximum we can possibly set (it doesn't actually allocate memory, it just
 * sets a ceiling). The soft limit can be changed per run.
 *
 * Does nothing if prlimit is not available.
 *
 * @param child - the child process to limit
 * @param limit_bytes - the limit in bytes (rounded to the nearest integer)
 * @param logger - receives debug output describing the applied limit
 * @throws if the child has no pid, the limit is not a positive finite number,
 *   or the prlimit command fails
 */
export function setHardMemoryLimit(
  child: ChildProcess,
  limit_bytes: number,
  logger: Logger
) {
  // Ask the (cached) probe rather than reading the raw cache value, which is
  // still null if nothing has called detectPrlimitSupport() yet
  if (!detectPrlimitSupport()) {
    return;
  }

  const pid = child.pid;
  if (pid === undefined) {
    throw new Error(
      'pool: cannot set hard memory limit: child process has no pid'
    );
  }

  // Validate before building a shell command out of the value
  const roundLimit = Math.round(limit_bytes);
  if (!Number.isFinite(roundLimit) || roundLimit <= 0) {
    throw new RangeError(
      `pool: invalid hard memory limit for pid ${pid}: ${limit_bytes}`
    );
  }

  logger.debug(
    `pool: setting hard limit on pid ${pid} to ${Math.floor(
      mb(limit_bytes)
    )}mb`
  );

  // A single value sets both the soft and the hard limit
  execSync(`prlimit --pid=${pid} --as=${roundLimit}`);

  // Pretty print the result
  const out = execSync(
    `prlimit --pid=${pid} --as --noheadings -o "SOFT,HARD,UNITS"`
  );
  logger.debug(' > ', out.toString());
}

// TODO before each run starts, we should set the soft limit
// to that run's limit

// TODO: it's plausible that due to bad config the hard limit
// is lower than the actual run memory limit.
// If this is true we'll get an error here.
// We should probably do a check and raise a warning, then use the smaller
// of the soft and hard limits - most runs should be quite happy to run in this

/**
 * Set a soft memory limit on a child process.
 *
 * If prlimit is available, this sets the soft RLIMIT_AS (virtual address
 * space limit) of the process and leaves its hard limit untouched.
 * When the limit is exceeded, mmap/brk fails with ENOMEM, causing the process
 * to crash.
 *
 * This is best-effort: failures are logged as warnings and reported through
 * the return value, never thrown.
 *
 * @param child - the child process to limit (a raw pid is also accepted)
 * @param limitBytes - the soft limit in bytes (rounded down to an integer)
 * @param logger - receives a debug message on success and a warning on failure
 * @returns true if the soft limit was set; false if prlimit is unavailable
 *   or the limit could not be applied
 */
export function applyMemoryLimit(
  child: ChildProcess | number,
  limitBytes: number,
  logger: Logger
): boolean {
  // Ask the (cached) probe rather than reading the raw cache value, which is
  // still null if nothing has called detectPrlimitSupport() yet
  if (!detectPrlimitSupport()) {
    return false;
  }

  const pid = typeof child === 'number' ? child : child.pid;
  try {
    if (pid === undefined) {
      throw new Error('process has no pid');
    }

    // Validate before building a shell command out of the value
    const softLimit = Math.floor(limitBytes);
    if (!Number.isFinite(softLimit) || softLimit <= 0) {
      throw new RangeError(`invalid limit: ${limitBytes}`);
    }

    // "<soft>:" (trailing colon, no hard value) changes only the soft limit.
    // stdio is piped so prlimit's own error output ends up in the thrown
    // error's message rather than on our stderr
    execSync(`prlimit --pid ${pid} --as=${softLimit}:`, { stdio: 'pipe' });

    logger.debug(
      `Soft memory limit on worker ${pid} set to ${Math.round(
        mb(limitBytes)
      )}MB`
    );
    return true;
  } catch (e: unknown) {
    const message = e instanceof Error ? e.message : String(e);
    logger.warn(`Failed to set soft memory limit for worker ${pid}:`, message);
    return false;
  }
}

// TODO rename this. maybe make it setprlimit rather than a global reset
/**
 * Clear the cached prlimit detection result, so that the next call to
 * detectPrlimitSupport() probes for the command again.
 *
 * Exported for testing only.
 */
export function _resetCache(): void {
prlimitAvailable = null;
}
44 changes: 42 additions & 2 deletions packages/engine-multi/src/worker/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,30 @@ import {
import { HANDLED_EXIT_CODE } from '../events';
import { Logger } from '@openfn/logger';
import type { PayloadLimits } from './thread/runtime';
import {
detectPrlimitSupport,
getAvailableMemory,
calculateLimits,
setHardMemoryLimit,
applyMemoryLimit,
} from './limits';
import { b, mb } from '../util/memory';

/** Options accepted by createPool */
export type PoolOptions = {
capacity?: number; // defaults to 5
maxWorkers?: number; // alias for capacity. Which is best?

/* Total memory available to the whole engine, in MB (will auto-detect if not set) */
totalMemoryMb?: number;
env?: Record<string, string>; // default environment for workers
// In MB. Passed to child processes as --max-old-space-size, and also
// applied to each child through applyMemoryLimit
memoryLimitMb?: number; // --max-old-space-size for child processes

proxyStdout?: boolean; // print internal stdout to console
};

// TODO set through options
// Memory set aside for the engine's main process when working out the
// per-child hard limit.
// NOTE(review): createPool passes this value straight to calculateLimits,
// whose second parameter is named mainProcessOverhead_bytes - it looks like
// it should be converted with b() first. Confirm.
const ENGINE_MAIN_OVERHEAD_MB = 400;

type RunTaskEvent = {
type: typeof ENGINE_RUN_TASK;
task: string;
Expand Down Expand Up @@ -72,7 +86,22 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
logger.debug(`pool: Creating new child process pool | capacity: ${capacity}`);
let destroyed = false;

// a pool of processes
const hasPrlimit = detectPrlimitSupport();
let hardMemCap_bytes;

if (hasPrlimit) {
hardMemCap_bytes = calculateLimits(
getAvailableMemory(options),
ENGINE_MAIN_OVERHEAD_MB,
capacity
);
logger.info(
`Memory enforcement enabled | hard limit: ${Math.floor(
mb(hardMemCap_bytes)
)}mb per child`
);
}

const pool: ChildProcessPool = new Array(capacity).fill(false);

const queue: QueuedTask[] = [];
Expand All @@ -86,6 +115,8 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
// create a new child process and load the module script into it
const execArgv = ['--experimental-vm-modules', '--no-warnings'];
if (options.memoryLimitMb) {
// TODO this is just a fallback value - prlimit is the primary
// mechanism for controlling memory use
execArgv.push(`--max-old-space-size=${options.memoryLimitMb}`);
}

Expand All @@ -105,12 +136,20 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
});
}

setHardMemoryLimit(child, hardMemCap_bytes!, logger);

logger.debug('pool: Created new child process', child.pid);
allWorkers[child.pid!] = child;
} else {
child = maybeChild as ChildProcess;
logger.debug('pool: Using existing child process', child.pid);
}

// TODO: memory limit is currently set for all runs in the worker according to config
// We could now set this per-run by passing a run option
if (options.memoryLimitMb) {
applyMemoryLimit(child, b(options.memoryLimitMb), logger);
}
return child;
};

Expand Down Expand Up @@ -148,7 +187,8 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {

const promise = new Promise<T>(async (resolve, reject) => {
// TODO what should we do if a process in the pool dies, perhaps due to OOM?
const onExit = async (code: number) => {
const onExit = async (code: number, e) => {
console.log(code, e);
if (code !== HANDLED_EXIT_CODE) {
logger.debug(`pool: Worker exited unexpectedly with code ${code}`);
clearTimeout(timeout);
Expand Down
17 changes: 17 additions & 0 deletions packages/engine-multi/test/util/memory.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import test from 'ava';
import { b, mb } from '../../src/util/memory';

// Number of bytes in one megabyte, as used by the helpers under test
const BYTES_PER_MB = 1024 * 1024;

// bytes -> megabytes
test('mb converts bytes to megabytes', (t) => {
  const one = mb(BYTES_PER_MB);
  t.is(one, 1);

  const fiveTwelve = mb(512 * BYTES_PER_MB);
  t.is(fiveTwelve, 512);
});

// megabytes -> bytes
test('b converts megabytes to bytes', (t) => {
  const one = b(1);
  t.is(one, BYTES_PER_MB);

  const fiveTwelve = b(512);
  t.is(fiveTwelve, 512 * BYTES_PER_MB);
});

// a round trip in either direction returns the starting value
test('b and mb are inverses', (t) => {
  const sizeMb = 256;
  t.is(mb(b(sizeMb)), sizeMb);

  const sizeBytes = 256 * BYTES_PER_MB;
  t.is(b(mb(sizeBytes)), sizeBytes);
});
66 changes: 66 additions & 0 deletions packages/engine-multi/test/worker/limit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import test from 'ava';
import { createMockLogger } from '@openfn/logger';

import {
calculateLimits,
detectPrlimitSupport,
applyMemoryLimit,
_resetCache,
} from '../../src/worker/limits';
const logger = createMockLogger();

// applyMemoryLimit is declared to take a child process but only needs its pid,
// so the tests hand it a minimal stand-in rather than spawning a process
type ChildArg = Parameters<typeof applyMemoryLimit>[0];
const childWithPid = (pid: number) => ({ pid } as ChildArg);

// Each test starts with an empty prlimit detection cache
test.beforeEach(() => {
  _resetCache();
});

test('calculateLimits divides available memory evenly across workers', (t) => {
  const result = calculateLimits(100, 20, 2);
  t.is(result, 40);
});

test('calculateLimits floors the result', (t) => {
  const result = calculateLimits(100, 20, 3);
  t.is(result, 26);
});

test('calculateLimits with capacity of 1', (t) => {
  const result = calculateLimits(100, 20, 1);
  t.is(result, 80);
});

// detectPrlimitSupport takes no arguments
test('detectPrlimitSupport caches the result across calls', (t) => {
  const result1 = detectPrlimitSupport();
  const result2 = detectPrlimitSupport();
  t.is(result1, result2);
});

// On macOS, prlimit is not available
const isLinux = process.platform === 'linux';

if (!isLinux) {
  test('detectPrlimitSupport returns false on non-Linux', (t) => {
    const result = detectPrlimitSupport();
    t.false(result);
  });
}

test('applyMemoryLimit returns false when prlimit is not available', (t) => {
  if (detectPrlimitSupport()) {
    t.pass('prlimit is available — skipping negative test');
    return;
  }
  const result = applyMemoryLimit(
    childWithPid(99999),
    500 * 1024 * 1024,
    logger
  );
  t.false(result);
});

// Integration tests — only run on Linux with prlimit available
const hasPrlimit = isLinux && detectPrlimitSupport();
_resetCache(); // reset after the check so tests start clean

const prlimitTest = hasPrlimit ? test : test.skip;

prlimitTest('applyMemoryLimit succeeds on own process', (t) => {
  // beforeEach cleared the detection cache, so probe again before applying
  detectPrlimitSupport();

  // Apply a very generous limit to our own process (won't interfere with test)
  const result = applyMemoryLimit(
    childWithPid(process.pid),
    8 * 1024 * 1024 * 1024,
    logger
  );
  t.true(result);
});