diff --git a/integration-tests/worker/CHANGELOG.md b/integration-tests/worker/CHANGELOG.md index 0dde8b32c..c9890bd50 100644 --- a/integration-tests/worker/CHANGELOG.md +++ b/integration-tests/worker/CHANGELOG.md @@ -1,5 +1,18 @@ # @openfn/integration-tests-worker +## 1.1.0 + +### Minor Changes + +- 8ebc086: Add support for state memory limit via worker + +### Patch Changes + +- Updated dependencies [8ebc086] + - @openfn/engine-multi@1.12.0 + - @openfn/ws-worker@1.27.0 + - @openfn/lightning-mock@2.4.20 + ## 1.0.96 ### Patch Changes diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index 7b88e3a83..01cd47ca2 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-worker", "private": true, - "version": "1.0.96", + "version": "1.1.0", "description": "Lightning WOrker integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/integration-tests/worker/test/exit-reasons.test.ts b/integration-tests/worker/test/exit-reasons.test.ts index f378c6ecd..d2bc91fa6 100644 --- a/integration-tests/worker/test/exit-reasons.test.ts +++ b/integration-tests/worker/test/exit-reasons.test.ts @@ -226,6 +226,92 @@ test.serial('kill: oom (large, kill vm)', async (t) => { t.is(error_message, 'Run exceeded maximum memory usage'); }); +test.serial('kill: state exceeds the configured state limit', async (t) => { + const attempt = { + id: crypto.randomUUID(), + jobs: [ + { + adaptor: '@openfn/language-common@latest', + // ~2mb string for a 1mb limit + body: `fn((s) => { + s.data = new Array(2 * 1024 * 1024).fill('a').join(''); + return s; + })`, + }, + ], + options: { + state_limit_mb: 1, + }, + }; + + const result = await run(attempt); + + const { reason, error_type, error_message } = result; + t.is(reason, 'kill'); + t.is(error_type, 'StateTooLargeError'); + t.regex(error_message, /State exceeds the limit of 1mb/); +}); + +test.serial( + 'kill: state limit is enforced between jobs (downstream job does not run)', + async (t) => { + const jobOne = { + id: crypto.randomUUID(), + adaptor: '@openfn/language-common@latest', + // ~2mb state, over the 1mb limit set below + body: `fn((s) => { + s.data = new Array(2 * 1024 * 1024).fill('a').join(''); + return s; + })`, + }; + + // not expected to run because the first job is expected to trigger state size crash + const jobTwo = { + id: crypto.randomUUID(), + adaptor: '@openfn/language-common@latest', + body: `fn(() => ({ data: 'ok' }))`, + }; + + const attempt = { + id: crypto.randomUUID(), + jobs: [jobOne, jobTwo], + edges: [ + { + id: crypto.randomUUID(), + source_job_id: jobOne.id, + target_job_id: jobTwo.id, + condition: 'always', + }, + ], + options: { + state_limit_mb: 1, + }, + }; + + const startedJobs: string[] = []; + const unsubscribe = lightning.onSocketEvent( + 'step:start', + attempt.id, + (evt) => { + if (evt.runId === attempt.id) { + startedJobs.push(evt.payload.job_id); + } + }, + false + ); + + const result = await run(attempt); + unsubscribe(); + + const { reason, error_type, error_message } = result; + t.is(reason, 'kill'); + t.is(error_type, 'StateTooLargeError'); + t.regex(error_message, /State exceeds the limit of 1mb/); + + t.deepEqual(startedJobs, [jobOne.id]); + } +); + test.serial('crash: process.exit() triggered by postgres', async (t) => { const attempt = { id: crypto.randomUUID(), diff --git a/packages/engine-multi/CHANGELOG.md b/packages/engine-multi/CHANGELOG.md index 7687e6f44..287d80d6b 100644 --- a/packages/engine-multi/CHANGELOG.md +++ b/packages/engine-multi/CHANGELOG.md @@ -1,5 +1,17 @@ # engine-multi +## 1.12.0 + +### Minor Changes + +- 8ebc086: Add support for state memory limit via worker +- reduce default size of state objects + +### Patch Changes + +- Updated dependencies [8ebc086] + - @openfn/lexicon@2.3.0 + ## 1.11.4 ### Patch Changes diff --git a/packages/engine-multi/package.json b/packages/engine-multi/package.json index 1e4a1f6c4..35ef12543 100644 --- a/packages/engine-multi/package.json +++ b/packages/engine-multi/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/engine-multi", - "version": "1.11.4", + "version": "1.12.0", "description": "Multi-process runtime engine", "main": "dist/index.js", "type": "module", diff --git a/packages/engine-multi/src/api/execute.ts b/packages/engine-multi/src/api/execute.ts index 4281eae9f..57f5fa561 100644 --- a/packages/engine-multi/src/api/execute.ts +++ b/packages/engine-multi/src/api/execute.ts @@ -50,7 +50,7 @@ const execute = async (context: ExecutionContext) => { // This must be fairly high to prevent crashes stateLimitMb: options.stateLimitMb ?? - Math.max((options.memoryLimitMb ?? 1000) * 0.25), + Math.max(50, options.memoryLimitMb ?? 1000 * 0.15), } as RunOptions; logger.debug( diff --git a/packages/engine-multi/src/engine.ts b/packages/engine-multi/src/engine.ts index 9360d5c82..38467b97c 100644 --- a/packages/engine-multi/src/engine.ts +++ b/packages/engine-multi/src/engine.ts @@ -74,6 +74,7 @@ export type EngineOptions = { logger: Logger; maxWorkers?: number; memoryLimitMb?: number; + stateLimitMb?: number; payloadLimitMb?: number; logPayloadLimitMb?: number; repoDir: string; @@ -169,7 +170,7 @@ const createEngine = async ( callWorker, options: { ...options, - stateLimitMb: opts.stateLimitMb, + stateLimitMb: opts.stateLimitMb ?? options.stateLimitMb, sanitize: opts.sanitize, resolvers: opts.resolvers, runTimeoutMs: opts.runTimeoutMs ?? defaultTimeout, diff --git a/packages/lexicon/CHANGELOG.md b/packages/lexicon/CHANGELOG.md index a7a38c52e..85715c9ec 100644 --- a/packages/lexicon/CHANGELOG.md +++ b/packages/lexicon/CHANGELOG.md @@ -1,5 +1,11 @@ # lexicon +## 2.3.0 + +### Minor Changes + +- 8ebc086: Add support for state memory limit via worker + ## 2.2.1 ### Patch Changes diff --git a/packages/lexicon/lightning.d.ts b/packages/lexicon/lightning.d.ts index aa93a7304..8554c0fb6 100644 --- a/packages/lexicon/lightning.d.ts +++ b/packages/lexicon/lightning.d.ts @@ -59,6 +59,7 @@ export type LightningPlanOptions = { output_dataclips?: boolean; run_memory_limit_mb?: number; + state_limit_mb?: number; payload_limit_mb?: number; log_payload_limit_mb?: number; job_log_level?: LogLevel; diff --git a/packages/lexicon/package.json b/packages/lexicon/package.json index b8671c947..7ea4cc990 100644 --- a/packages/lexicon/package.json +++ b/packages/lexicon/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lexicon", - "version": "2.2.1", + "version": "2.3.0", "description": "Central repo of names and type definitions", "author": "Open Function Group ", "license": "ISC", diff --git a/packages/lightning-mock/CHANGELOG.md b/packages/lightning-mock/CHANGELOG.md index bef369a56..b26ec2745 100644 --- a/packages/lightning-mock/CHANGELOG.md +++ b/packages/lightning-mock/CHANGELOG.md @@ -1,5 +1,13 @@ # @openfn/lightning-mock +## 2.4.20 + +### Patch Changes + +- Updated dependencies [8ebc086] + - @openfn/engine-multi@1.12.0 + - @openfn/lexicon@2.3.0 + ## 2.4.19 ### Patch Changes diff --git a/packages/lightning-mock/package.json b/packages/lightning-mock/package.json index 649bfbb18..6f92f16ed 100644 --- a/packages/lightning-mock/package.json +++ b/packages/lightning-mock/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lightning-mock", - "version": "2.4.19", + "version": "2.4.20", "private": true, "description": "A mock Lightning server", "main": "dist/index.js", diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index e5ef79a52..1b80106ba 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -91,7 +91,7 @@ const prepareFinalState = async ( if (isNullState(state)) return undefined; if (state) { try { - await ensureStateSize(state, stateLimit_mb); + await ensureStateSize(state, stateLimit_mb, logger); } catch (e: any) { logger.error('Critical error processing state: ', e.message); throw e; diff --git a/packages/runtime/src/util/ensure-state-size.ts b/packages/runtime/src/util/ensure-state-size.ts index 84ab9744b..98096a9e6 100644 --- a/packages/runtime/src/util/ensure-state-size.ts +++ b/packages/runtime/src/util/ensure-state-size.ts @@ -1,5 +1,6 @@ import { JsonStreamStringify } from 'json-stream-stringify'; import { StateTooLargeError } from '../errors'; +import { Logger } from '@openfn/logger'; const replacer = (_key: string, value: any) => { // Ignore non serializable keys @@ -15,7 +16,7 @@ const replacer = (_key: string, value: any) => { }; // throws if state exceeds a particular size limit -export default async (value: any, limit_mb: number = 500) => { +export default async (value: any, limit_mb: number = 500, logger?: Logger) => { if (value && !isNaN(limit_mb) && limit_mb > 0) { const limitBytes = limit_mb * 1024 * 1024; let size_bytes = 0; @@ -25,8 +26,20 @@ export default async (value: any, limit_mb: number = 500) => { size_bytes += Buffer.byteLength(chunk, 'utf8'); if (size_bytes > limitBytes) { + logger?.info( + `state object exceeds limit ${limit_mb} (${ + size_bytes / 1024 / 1024 + }mb)` + ); throw new StateTooLargeError(limit_mb); } } + if (size_bytes < 1024 * 1024) { + logger?.debug(`State object serializes to less than 1mb`); + } else { + logger?.debug( + `State object serializes to ${(size_bytes / 1024 / 1024).toFixed(2)}mb` + ); + } } }; diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 479a569e1..aad53806e 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,18 @@ # ws-worker +## 1.27.0 + +### Minor Changes + +- 8ebc086: Add support for state memory limit via worker +- reduce default size of state objects + +### Patch Changes + +- Updated dependencies [8ebc086] + - @openfn/engine-multi@1.12.0 + - @openfn/lexicon@2.3.0 + ## 1.26.1 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 8739662d3..affd051da 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.26.1", + "version": "1.27.0", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/packages/ws-worker/src/start.ts b/packages/ws-worker/src/start.ts index 9d901c7af..4e08e2601 100644 --- a/packages/ws-worker/src/start.ts +++ b/packages/ws-worker/src/start.ts @@ -111,6 +111,7 @@ if (args.mock) { const engineOptions = { repoDir: args.repoDir, memoryLimitMb: args.runMemory, + stateLimitMb: args.stateMemory, maxWorkers: effectiveCapacity, statePropsToRemove: args.statePropsToRemove, runTimeoutMs: args.maxRunDurationSeconds * 1000, diff --git a/packages/ws-worker/src/util/cli.ts b/packages/ws-worker/src/util/cli.ts index ebae7ccb5..f2850a383 100644 --- a/packages/ws-worker/src/util/cli.ts +++ b/packages/ws-worker/src/util/cli.ts @@ -38,6 +38,7 @@ type Args = { profilePollIntervalMs?: number; repoDir?: string; runMemory?: number; + stateMemory?: number; secret?: string; sentryDsn?: string; sentryEnv?: string; @@ -91,6 +92,7 @@ export default function parseArgs(argv: string[]): Args { WORKER_MAX_LOG_PAYLOAD_MB, WORKER_MAX_RUN_DURATION_SECONDS, WORKER_MAX_RUN_MEMORY_MB, + WORKER_MAX_STATE_MEMORY_MB, WORKER_MESSAGE_TIMEOUT_SECONDS, WORKER_PORT, WORKER_PROFILE_POLL_INTERVAL_MS, @@ -219,6 +221,11 @@ export default function parseArgs(argv: string[]): Args { 'Maximum memory allocated to a single run, in mb. Env: WORKER_MAX_RUN_MEMORY_MB', type: 'number', }) + .option('state-memory', { + description: + 'Maximum size of the state object returned by each step, in mb. Defaults to 25% of run-memory. Env: WORKER_MAX_STATE_MEMORY_MB', + type: 'number', + }) .option('payload-memory', { description: 'Maximum memory allocated to a single run, in mb. Env: WORKER_MAX_PAYLOAD_MB', @@ -326,6 +333,12 @@ export default function parseArgs(argv: string[]): Args { ['configuration', 'response'] ), runMemory: setArg(args.runMemory, WORKER_MAX_RUN_MEMORY_MB, 500), + // No default: when unset the engine derives the limit from run-memory (25%) + stateMemory: + args.stateMemory ?? + (WORKER_MAX_STATE_MEMORY_MB + ? parseInt(WORKER_MAX_STATE_MEMORY_MB, 10) + : undefined), payloadMemory: setArg(args.payloadMemory, WORKER_MAX_PAYLOAD_MB, 10), logPayloadMemory: setArg( args.logPayloadMemory, diff --git a/packages/ws-worker/src/util/convert-lightning-plan.ts b/packages/ws-worker/src/util/convert-lightning-plan.ts index 4307e7dbe..f9fe2180b 100644 --- a/packages/ws-worker/src/util/convert-lightning-plan.ts +++ b/packages/ws-worker/src/util/convert-lightning-plan.ts @@ -146,6 +146,9 @@ export default ( if ('run_memory_limit_mb' in run.options) { engineOpts.memoryLimitMb = run.options.run_memory_limit_mb; } + if ('state_limit_mb' in run.options) { + engineOpts.stateLimitMb = run.options.state_limit_mb; + } if ('sanitize' in run.options) { engineOpts.sanitize = run.options.sanitize; } diff --git a/packages/ws-worker/test/api/execute.test.ts b/packages/ws-worker/test/api/execute.test.ts index 8d0e70686..1e33e615d 100644 --- a/packages/ws-worker/test/api/execute.test.ts +++ b/packages/ws-worker/test/api/execute.test.ts @@ -301,8 +301,11 @@ test('loadDataclip report to sentry on fail', async (t) => { } catch (e) {} const reports = await waitForSentryReport(testkit); - t.is(reports.length, 1); - t.is(reports[0].error.name, 'LightningSocketError'); + const dataclip_report = reports.find((r: any) => + /fetch\:dataclip.+not_found/i.test(r.error.message) + ); + t.truthy(dataclip_report); + t.is(dataclip_report.error.name, 'LightningSocketError'); }); test('loadCredential should fetch a credential', async (t) => { diff --git a/packages/ws-worker/test/util/cli.test.ts b/packages/ws-worker/test/util/cli.test.ts index 698008dc8..be156dd72 100644 --- a/packages/ws-worker/test/util/cli.test.ts +++ b/packages/ws-worker/test/util/cli.test.ts @@ -62,6 +62,7 @@ test('cli should set default values for unspecified options', (t) => { t.falsy(args.sentryDsn); t.deepEqual(args.statePropsToRemove, ['configuration', 'response']); t.is(args.runMemory, 500); + t.is(args.stateMemory, undefined); t.is(args.maxRunDurationSeconds, 300); t.is(args.engineValidationRetries, 3); t.is(args.engineValidationTimeoutMs, 5000); diff --git a/packages/ws-worker/test/util/convert-lightning-plan.test.ts b/packages/ws-worker/test/util/convert-lightning-plan.test.ts index 12a11a0e4..224855125 100644 --- a/packages/ws-worker/test/util/convert-lightning-plan.test.ts +++ b/packages/ws-worker/test/util/convert-lightning-plan.test.ts @@ -123,6 +123,25 @@ test('convert a single job with options', (t) => { }); }); +test('convert a single job with state_limit_mb', (t) => { + const run: Partial = { + id: 'w', + jobs: [createNode()], + triggers: [], + edges: [], + options: { + run_memory_limit_mb: 500, + state_limit_mb: 50, + }, + }; + const { options } = convertPlan(run as LightningPlan); + + t.deepEqual(options, { + memoryLimitMb: 500, + stateLimitMb: 50, + }); +}); + test('convert a single job with log_payload_limit_mb', (t) => { const run: Partial = { id: 'w',