Skip to content

Commit 064933d

Browse files
authored
Limit state size in runtime (#1209)
* runtime: add function to ensure that state size does not exceed a particular limit * add new stateLimit_mb option * engine: support state limit mb as an option I don't think this is the right option in the right place - I'll check into that later * refactor rutime option * tidying up * fix failing test in mock * more generous timeout on flaky test * add debug log * changeset * tweak memory limit Drop the payload size thing - runtime memory will always be higher * refine options * runtime: type safety
1 parent f089f8d commit 064933d

File tree

20 files changed

+324
-31
lines changed

20 files changed

+324
-31
lines changed

.changeset/ready-items-shout.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
'@openfn/engine-multi': minor
3+
'@openfn/ws-worker': minor
4+
'@openfn/runtime': minor
5+
---
6+
7+
Measure the size of state objects at the end of each step, and throw if they exceed a limit
8+
9+
In the Worker, this limit is set to 25% of the available runtime memory.

packages/engine-multi/src/api/execute.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,17 @@ const execute = async (context: ExecutionContext) => {
4646
repoDir: options.repoDir,
4747
profile: context.options.profile,
4848
profilePollInteval: context.options.profilePollInterval,
49+
// work out the max size of the state object at the end of each step
50+
// This must be fairly high to prevent crashes
51+
stateLimitMb:
52+
options.stateLimitMb ??
53+
Math.max((options.memoryLimitMb ?? 1000) * 0.25),
4954
} as RunOptions;
5055

56+
logger.debug(
57+
`${state.plan.id} setting runtime state limit to ${runOptions.stateLimitMb}mb`
58+
);
59+
5160
// Construct the payload limits object
5261
const payloadLimits: PayloadLimits = {};
5362
if (options.payloadLimitMb !== undefined) {

packages/engine-multi/src/engine.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ const createEngine = async (
167167
callWorker,
168168
options: {
169169
...options,
170+
stateLimitMb: opts.stateLimitMb,
170171
sanitize: opts.sanitize,
171172
resolvers: opts.resolvers,
172173
runTimeoutMs: opts.runTimeoutMs ?? defaultTimeout,

packages/engine-multi/src/types.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export type ExecutionContextConstructor = {
4545
};
4646

4747
export type ExecuteOptions = {
48+
stateLimitMb?: number;
4849
payloadLimitMb?: number;
4950
logPayloadLimitMb?: number;
5051
memoryLimitMb?: number;
@@ -72,7 +73,7 @@ export interface RuntimeEngine {
7273
execute(
7374
plan: ExecutionPlan,
7475
input: State,
75-
options?: Partial<EngineOptions>
76+
options?: ExecuteOptions
7677
): Pick<EventEmitter, 'on' | 'off' | 'once'>;
7778

7879
destroy(instant?: boolean): Promise<void>;

packages/engine-multi/src/worker/thread/run.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export type RunOptions = {
2020
jobLogLevel?: LogLevel;
2121
profile?: boolean;
2222
profilePollInterval?: number;
23+
stateLimitMb?: number;
2324
};
2425

2526
const eventMap = {
@@ -39,6 +40,7 @@ register({
3940
jobLogLevel,
4041
profile,
4142
profilePollInterval,
43+
stateLimitMb,
4244
} = runOptions;
4345
const { logger, jobLogger, adaptorLogger } = createLoggers(
4446
plan.id!,
@@ -54,9 +56,8 @@ register({
5456
console = adaptorLogger;
5557

5658
// Leave console.debug for local debugging
57-
// This goes to stdout but not the adapator logger
59+
// This goes to stdout but not the adaptor logger
5860
console.debug = debug;
59-
6061
// TODO I would like to pull these options out of here
6162
const options = {
6263
// disable the runtime's own timeout
@@ -72,6 +73,7 @@ register({
7273
profile,
7374
profilePollInterval,
7475
statePropsToRemove,
76+
stateLimitMb,
7577
callbacks: {
7678
// TODO: this won't actually work across the worker boundary
7779
// For now I am preloading credentials

packages/engine-multi/test/errors.test.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,35 @@ test.serial.skip('vm oom error', (t) => {
165165
});
166166
});
167167

168+
test.serial('state object too big', async (t) => {
169+
return new Promise((done) => {
170+
const plan = {
171+
id: 'x',
172+
workflow: {
173+
steps: [
174+
{
175+
expression: `export default [(s) => {
176+
s.data = new Array(1024 * 1024).fill("x").join("");
177+
return s;
178+
}]`,
179+
},
180+
],
181+
},
182+
};
183+
184+
const options = {
185+
stateLimitMb: 0.1,
186+
};
187+
188+
engine.execute(plan, {}, options).on(WORKFLOW_ERROR, (evt) => {
189+
t.is(evt.type, 'StateTooLargeError');
190+
t.is(evt.severity, 'kill');
191+
t.is(evt.message, 'State exceeds the limit of 0.1mb');
192+
done();
193+
});
194+
});
195+
});
196+
168197
test.serial('execution error from async code', (t) => {
169198
return new Promise((done) => {
170199
const plan = {

packages/runtime/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
"dependencies": {
4949
"@openfn/logger": "workspace:*",
5050
"fast-safe-stringify": "^2.1.1",
51+
"json-stream-stringify": "^3.1.6",
5152
"semver": "^7.7.2",
5253
"source-map": "^0.7.6"
5354
}

packages/runtime/src/errors.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,15 @@ export const extractStackTrace = (e: Error) => {
9191
}
9292
};
9393

94+
export class StateTooLargeError extends Error {
95+
name = 'StateTooLargeError';
96+
severity = 'kill';
97+
constructor(limit_mb: number) {
98+
super();
99+
this.message = `State exceeds the limit of ${limit_mb}mb`;
100+
}
101+
}
102+
94103
// Abstract error supertype
95104
export class RTError extends Error {
96105
source = 'runtime';

packages/runtime/src/execute/step.ts

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
import { isNullState } from '../util/null-state';
2121
import sourcemapErrors from '../util/sourcemap-errors';
2222
import createProfiler from '../util/profile-memory';
23+
import ensureStateSize from '../util/ensure-state-size';
2324

2425
const loadCredentials = async (
2526
job: Job,
@@ -79,13 +80,21 @@ const calculateNext = (job: CompiledStep, result: any, logger: Logger) => {
7980

8081
// TODO this is suboptimal and may be slow on large objects
8182
// (especially as the result get stringified again downstream)
82-
const prepareFinalState = (
83+
const prepareFinalState = async (
8384
state: any,
8485
logger: Logger,
85-
statePropsToRemove?: string[]
86+
statePropsToRemove?: string[],
87+
stateLimit_mb?: number
8688
) => {
8789
if (isNullState(state)) return undefined;
8890
if (state) {
91+
try {
92+
await ensureStateSize(state, stateLimit_mb);
93+
} catch (e) {
94+
logger.error('Critical error processing state:');
95+
throw e;
96+
}
97+
8998
if (!statePropsToRemove) {
9099
// As a strict default, remove the configuration key
91100
// tbh this should happen higher up in the stack but it causes havoc in unit testing
@@ -190,7 +199,12 @@ const executeStep = async (
190199
const duration = logger.timer(timerId);
191200
logger.error(`${jobName} aborted with error (${duration})`);
192201

193-
state = prepareFinalState(state, logger, ctx.opts.statePropsToRemove);
202+
state = await prepareFinalState(
203+
state,
204+
logger,
205+
ctx.opts.statePropsToRemove,
206+
ctx.opts.stateLimitMb
207+
);
194208
// Whatever the final state was, save that as the initial state to the next thing
195209
result = state;
196210

@@ -219,7 +233,12 @@ const executeStep = async (
219233
if (!didError) {
220234
const humanDuration = logger.timer(timerId);
221235
logger.success(`${jobName} completed in ${humanDuration}`);
222-
result = prepareFinalState(result, logger, ctx.opts.statePropsToRemove);
236+
result = await prepareFinalState(
237+
result,
238+
logger,
239+
ctx.opts.statePropsToRemove,
240+
ctx.opts.stateLimitMb
241+
);
223242

224243
// Take a memory snapshot
225244
// IMPORTANT: this runs _after_ the state object has been serialized

packages/runtime/src/runtime.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ export type Options = {
3838

3939
/** Optional name for the expression (if passed a string) */
4040
defaultStepId?: string;
41+
42+
stateLimitMb?: number;
4143
};
4244

4345
type RawOptions = Omit<Options, 'linker'> & {

0 commit comments

Comments
 (0)