Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# @openfn/integration-tests-worker

## 1.1.0

### Minor Changes

- 8ebc086: Add support for state memory limit via worker

### Patch Changes

- Updated dependencies [8ebc086]
- @openfn/engine-multi@1.12.0
- @openfn/ws-worker@1.27.0
- @openfn/lightning-mock@2.4.20

## 1.0.96

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.96",
"version": "1.1.0",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <admin@openfn.org>",
"license": "ISC",
Expand Down
86 changes: 86 additions & 0 deletions integration-tests/worker/test/exit-reasons.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,92 @@ test.serial('kill: oom (large, kill vm)', async (t) => {
t.is(error_message, 'Run exceeded maximum memory usage');
});

// A single step whose returned state (~2mb) is over the run's 1mb state limit
// should end the run with a 'kill' reason and a StateTooLargeError.
test.serial('kill: state exceeds the configured state limit', async (t) => {
  // Job expression that writes a ~2mb string onto state (limit below is 1mb)
  const oversizedBody = `fn((s) => {
s.data = new Array(2 * 1024 * 1024).fill('a').join('');
return s;
})`;

  const job = {
    adaptor: '@openfn/language-common@latest',
    body: oversizedBody,
  };

  const attempt = {
    id: crypto.randomUUID(),
    jobs: [job],
    options: { state_limit_mb: 1 },
  };

  const result = await run(attempt);

  t.is(result.reason, 'kill');
  t.is(result.error_type, 'StateTooLargeError');
  t.regex(result.error_message, /State exceeds the limit of 1mb/);
});

// Two-step workflow: the first step returns ~2mb of state against a 1mb limit.
// The run should be killed with StateTooLargeError and only the first step
// should ever emit a step:start event.
test.serial(
  'kill: state limit is enforced between jobs (downstream job does not run)',
  async (t) => {
    const adaptor = '@openfn/language-common@latest';

    // Upstream step: produces ~2mb of state, over the 1mb limit configured below
    const jobOne = {
      id: crypto.randomUUID(),
      adaptor,
      body: `fn((s) => {
s.data = new Array(2 * 1024 * 1024).fill('a').join('');
return s;
})`,
    };

    // Downstream step: should never start, since the upstream step is
    // expected to fail the state size check first
    const jobTwo = {
      id: crypto.randomUUID(),
      adaptor,
      body: `fn(() => ({ data: 'ok' }))`,
    };

    const edge = {
      id: crypto.randomUUID(),
      source_job_id: jobOne.id,
      target_job_id: jobTwo.id,
      condition: 'always',
    };

    const attempt = {
      id: crypto.randomUUID(),
      jobs: [jobOne, jobTwo],
      edges: [edge],
      options: { state_limit_mb: 1 },
    };

    // Collect the job id of every step:start event seen for this run
    const startedJobs: string[] = [];
    const recordStart = (evt) => {
      if (evt.runId !== attempt.id) {
        return;
      }
      startedJobs.push(evt.payload.job_id);
    };
    const unsubscribe = lightning.onSocketEvent(
      'step:start',
      attempt.id,
      recordStart,
      false
    );

    const result = await run(attempt);
    unsubscribe();

    t.is(result.reason, 'kill');
    t.is(result.error_type, 'StateTooLargeError');
    t.regex(result.error_message, /State exceeds the limit of 1mb/);

    // Only the upstream job may have started
    t.deepEqual(startedJobs, [jobOne.id]);
  }
);

test.serial('crash: process.exit() triggered by postgres', async (t) => {
const attempt = {
id: crypto.randomUUID(),
Expand Down
12 changes: 12 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# engine-multi

## 1.12.0

### Minor Changes

- 8ebc086: Add support for state memory limit via worker
- reduce default size of state objects

### Patch Changes

- Updated dependencies [8ebc086]
- @openfn/lexicon@2.3.0

## 1.11.4

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "1.11.4",
"version": "1.12.0",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const execute = async (context: ExecutionContext) => {
// This must be fairly high to prevent crashes
stateLimitMb:
options.stateLimitMb ??
Math.max((options.memoryLimitMb ?? 1000) * 0.25),
// Default state limit: 15% of the run memory limit (1000mb when unset),
// never below 50mb. The fallback must be parenthesised: `*` binds tighter
// than `??`, so `memoryLimitMb ?? 1000 * 0.15` would use a configured
// memoryLimitMb unscaled (i.e. 100% of run memory instead of 15%).
Math.max(50, (options.memoryLimitMb ?? 1000) * 0.15),
} as RunOptions;

logger.debug(
Expand Down
3 changes: 2 additions & 1 deletion packages/engine-multi/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export type EngineOptions = {
logger: Logger;
maxWorkers?: number;
memoryLimitMb?: number;
stateLimitMb?: number;
payloadLimitMb?: number;
logPayloadLimitMb?: number;
repoDir: string;
Expand Down Expand Up @@ -169,7 +170,7 @@ const createEngine = async (
callWorker,
options: {
...options,
stateLimitMb: opts.stateLimitMb,
stateLimitMb: opts.stateLimitMb ?? options.stateLimitMb,
sanitize: opts.sanitize,
resolvers: opts.resolvers,
runTimeoutMs: opts.runTimeoutMs ?? defaultTimeout,
Expand Down
6 changes: 6 additions & 0 deletions packages/lexicon/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# lexicon

## 2.3.0

### Minor Changes

- 8ebc086: Add support for state memory limit via worker

## 2.2.1

### Patch Changes
Expand Down
1 change: 1 addition & 0 deletions packages/lexicon/lightning.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export type LightningPlanOptions = {
output_dataclips?: boolean;

run_memory_limit_mb?: number;
state_limit_mb?: number;
payload_limit_mb?: number;
log_payload_limit_mb?: number;
job_log_level?: LogLevel;
Expand Down
2 changes: 1 addition & 1 deletion packages/lexicon/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lexicon",
"version": "2.2.1",
"version": "2.3.0",
"description": "Central repo of names and type definitions",
"author": "Open Function Group <admin@openfn.org>",
"license": "ISC",
Expand Down
8 changes: 8 additions & 0 deletions packages/lightning-mock/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# @openfn/lightning-mock

## 2.4.20

### Patch Changes

- Updated dependencies [8ebc086]
- @openfn/engine-multi@1.12.0
- @openfn/lexicon@2.3.0

## 2.4.19

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/lightning-mock/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lightning-mock",
"version": "2.4.19",
"version": "2.4.20",
"private": true,
"description": "A mock Lightning server",
"main": "dist/index.js",
Expand Down
2 changes: 1 addition & 1 deletion packages/runtime/src/execute/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ const prepareFinalState = async (
if (isNullState(state)) return undefined;
if (state) {
try {
await ensureStateSize(state, stateLimit_mb);
await ensureStateSize(state, stateLimit_mb, logger);
} catch (e: any) {
logger.error('Critical error processing state: ', e.message);
throw e;
Expand Down
15 changes: 14 additions & 1 deletion packages/runtime/src/util/ensure-state-size.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { JsonStreamStringify } from 'json-stream-stringify';
import { StateTooLargeError } from '../errors';
import { Logger } from '@openfn/logger';

const replacer = (_key: string, value: any) => {
// Ignore non serializable keys
Expand All @@ -15,7 +16,7 @@ const replacer = (_key: string, value: any) => {
};

// throws if state exceeds a particular size limit
export default async (value: any, limit_mb: number = 500) => {
export default async (value: any, limit_mb: number = 500, logger?: Logger) => {
if (value && !isNaN(limit_mb) && limit_mb > 0) {
const limitBytes = limit_mb * 1024 * 1024;
let size_bytes = 0;
Expand All @@ -25,8 +26,20 @@ export default async (value: any, limit_mb: number = 500) => {
size_bytes += Buffer.byteLength(chunk, 'utf8');

if (size_bytes > limitBytes) {
logger?.info(
`state object exceeds limit ${limit_mb} (${
size_bytes / 1024 / 1024
}mb)`
);
throw new StateTooLargeError(limit_mb);
}
}
if (size_bytes < 1024 * 1024) {
logger?.debug(`State object serializes to less than 1mb`);
} else {
logger?.debug(
`State object serializes to ${(size_bytes / 1024 / 1024).toFixed(2)}mb`
);
}
}
};
13 changes: 13 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# ws-worker

## 1.27.0

### Minor Changes

- 8ebc086: Add support for state memory limit via worker
- reduce default size of state objects

### Patch Changes

- Updated dependencies [8ebc086]
- @openfn/engine-multi@1.12.0
- @openfn/lexicon@2.3.0

## 1.26.1

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.26.1",
"version": "1.27.0",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
1 change: 1 addition & 0 deletions packages/ws-worker/src/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ if (args.mock) {
const engineOptions = {
repoDir: args.repoDir,
memoryLimitMb: args.runMemory,
stateLimitMb: args.stateMemory,
maxWorkers: effectiveCapacity,
statePropsToRemove: args.statePropsToRemove,
runTimeoutMs: args.maxRunDurationSeconds * 1000,
Expand Down
13 changes: 13 additions & 0 deletions packages/ws-worker/src/util/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Args = {
profilePollIntervalMs?: number;
repoDir?: string;
runMemory?: number;
stateMemory?: number;
secret?: string;
sentryDsn?: string;
sentryEnv?: string;
Expand Down Expand Up @@ -91,6 +92,7 @@ export default function parseArgs(argv: string[]): Args {
WORKER_MAX_LOG_PAYLOAD_MB,
WORKER_MAX_RUN_DURATION_SECONDS,
WORKER_MAX_RUN_MEMORY_MB,
WORKER_MAX_STATE_MEMORY_MB,
WORKER_MESSAGE_TIMEOUT_SECONDS,
WORKER_PORT,
WORKER_PROFILE_POLL_INTERVAL_MS,
Expand Down Expand Up @@ -219,6 +221,11 @@ export default function parseArgs(argv: string[]): Args {
'Maximum memory allocated to a single run, in mb. Env: WORKER_MAX_RUN_MEMORY_MB',
type: 'number',
})
.option('state-memory', {
description:
'Maximum size of the state object returned by each step, in mb. Defaults to 25% of run-memory. Env: WORKER_MAX_STATE_MEMORY_MB',

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to think about this before merging

I'd missed/forgotten that we're doing this already. Which means the limit in prod right now isn't 500mb (as is the default in the function), it's more like 250mb. And that might be big enough to cause problems still.

Should we reduce the default to 10% of working memory? Is that too strict and small?

Also should we be adding some logging around this? For users or GCP?

I do see a logger.debug line which prints the state size limit but I can't see that being logged in production - what's that all about?

type: 'number',
})
.option('payload-memory', {
description:
'Maximum memory allocated to a single run, in mb. Env: WORKER_MAX_PAYLOAD_MB',
Expand Down Expand Up @@ -326,6 +333,12 @@ export default function parseArgs(argv: string[]): Args {
['configuration', 'response']
),
runMemory: setArg(args.runMemory, WORKER_MAX_RUN_MEMORY_MB, 500),
// No default here: when unset, the engine derives the state limit from run-memory
stateMemory:
args.stateMemory ??
(WORKER_MAX_STATE_MEMORY_MB
? parseInt(WORKER_MAX_STATE_MEMORY_MB, 10)
: undefined),
payloadMemory: setArg(args.payloadMemory, WORKER_MAX_PAYLOAD_MB, 10),
logPayloadMemory: setArg(
args.logPayloadMemory,
Expand Down
3 changes: 3 additions & 0 deletions packages/ws-worker/src/util/convert-lightning-plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ export default (
if ('run_memory_limit_mb' in run.options) {
engineOpts.memoryLimitMb = run.options.run_memory_limit_mb;
}
if ('state_limit_mb' in run.options) {
engineOpts.stateLimitMb = run.options.state_limit_mb;
}
if ('sanitize' in run.options) {
engineOpts.sanitize = run.options.sanitize;
}
Expand Down
7 changes: 5 additions & 2 deletions packages/ws-worker/test/api/execute.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,11 @@ test('loadDataclip report to sentry on fail', async (t) => {
} catch (e) {}

const reports = await waitForSentryReport(testkit);
t.is(reports.length, 1);
t.is(reports[0].error.name, 'LightningSocketError');
const dataclip_report = reports.find((r: any) =>
/fetch\:dataclip.+not_found/i.test(r.error.message)
);
t.truthy(dataclip_report);
t.is(dataclip_report.error.name, 'LightningSocketError');
});

test('loadCredential should fetch a credential', async (t) => {
Expand Down
1 change: 1 addition & 0 deletions packages/ws-worker/test/util/cli.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ test('cli should set default values for unspecified options', (t) => {
t.falsy(args.sentryDsn);
t.deepEqual(args.statePropsToRemove, ['configuration', 'response']);
t.is(args.runMemory, 500);
t.is(args.stateMemory, undefined);
t.is(args.maxRunDurationSeconds, 300);
t.is(args.engineValidationRetries, 3);
t.is(args.engineValidationTimeoutMs, 5000);
Expand Down
Loading