diff --git a/.changeset/ready-items-shout.md b/.changeset/ready-items-shout.md new file mode 100644 index 000000000..e96b78718 --- /dev/null +++ b/.changeset/ready-items-shout.md @@ -0,0 +1,9 @@ +--- +'@openfn/engine-multi': minor +'@openfn/ws-worker': minor +'@openfn/runtime': minor +--- + +Measure the size of state objects at the end of each step, and throw if they exceed a limit + +In the Worker, this limit is set to 25% of the available runtime memory. diff --git a/packages/engine-multi/src/api/execute.ts b/packages/engine-multi/src/api/execute.ts index fabdf4b11..9e6cd0ea4 100644 --- a/packages/engine-multi/src/api/execute.ts +++ b/packages/engine-multi/src/api/execute.ts @@ -46,8 +46,17 @@ const execute = async (context: ExecutionContext) => { repoDir: options.repoDir, profile: context.options.profile, profilePollInteval: context.options.profilePollInterval, + // work out the max size of the state object at the end of each step + // This must be fairly high to prevent crashes + stateLimitMb: + options.stateLimitMb ?? + Math.max((options.memoryLimitMb ?? 1000) * 0.25), } as RunOptions; + logger.debug( + `${state.plan.id} setting runtime state limit to ${runOptions.stateLimitMb}mb` + ); + // Construct the payload limits object const payloadLimits: PayloadLimits = {}; if (options.payloadLimitMb !== undefined) { diff --git a/packages/engine-multi/src/engine.ts b/packages/engine-multi/src/engine.ts index adf2bb88b..630e334ab 100644 --- a/packages/engine-multi/src/engine.ts +++ b/packages/engine-multi/src/engine.ts @@ -167,6 +167,7 @@ const createEngine = async ( callWorker, options: { ...options, + stateLimitMb: opts.stateLimitMb, sanitize: opts.sanitize, resolvers: opts.resolvers, runTimeoutMs: opts.runTimeoutMs ?? defaultTimeout, diff --git a/packages/engine-multi/src/types.ts b/packages/engine-multi/src/types.ts index e42255b1b..ae2a44b7a 100644 --- a/packages/engine-multi/src/types.ts +++ b/packages/engine-multi/src/types.ts @@ -45,6 +45,7 @@ export type ExecutionContextConstructor = { }; export type ExecuteOptions = { + stateLimitMb?: number; payloadLimitMb?: number; logPayloadLimitMb?: number; memoryLimitMb?: number; @@ -72,7 +73,7 @@ export interface RuntimeEngine { execute( plan: ExecutionPlan, input: State, - options?: Partial + options?: ExecuteOptions ): Pick; destroy(instant?: boolean): Promise; diff --git a/packages/engine-multi/src/worker/thread/run.ts b/packages/engine-multi/src/worker/thread/run.ts index 8d54d9a72..a388faf0f 100644 --- a/packages/engine-multi/src/worker/thread/run.ts +++ b/packages/engine-multi/src/worker/thread/run.ts @@ -20,6 +20,7 @@ export type RunOptions = { jobLogLevel?: LogLevel; profile?: boolean; profilePollInterval?: number; + stateLimitMb?: number; }; const eventMap = { @@ -39,6 +40,7 @@ register({ jobLogLevel, profile, profilePollInterval, + stateLimitMb, } = runOptions; const { logger, jobLogger, adaptorLogger } = createLoggers( plan.id!, @@ -54,9 +56,8 @@ register({ console = adaptorLogger; // Leave console.debug for local debugging - // This goes to stdout but not the adapator logger + // This goes to stdout but not the adaptor logger console.debug = debug; - // TODO I would like to pull these options out of here const options = { // disable the runtime's own timeout @@ -72,6 +73,7 @@ register({ profile, profilePollInterval, statePropsToRemove, + stateLimitMb, callbacks: { // TODO: this won't actually work across the worker boundary // For now I am preloading credentials diff --git a/packages/engine-multi/test/errors.test.ts b/packages/engine-multi/test/errors.test.ts index a206b37d2..15781753f 100644 --- a/packages/engine-multi/test/errors.test.ts +++ b/packages/engine-multi/test/errors.test.ts @@ -165,6 +165,35 @@ test.serial.skip('vm oom error', (t) => { }); }); +test.serial('state object too big', async (t) => { + return new Promise((done) => { + const plan = { + id: 'x', + workflow: { + steps: [ + { + expression: `export default [(s) => { + s.data = new Array(1024 * 1024).fill("x").join(""); + return s; + }]`, + }, + ], + }, + }; + + const options = { + stateLimitMb: 0.1, + }; + + engine.execute(plan, {}, options).on(WORKFLOW_ERROR, (evt) => { + t.is(evt.type, 'StateTooLargeError'); + t.is(evt.severity, 'kill'); + t.is(evt.message, 'State exceeds the limit of 0.1mb'); + done(); + }); + }); +}); + test.serial('execution error from async code', (t) => { return new Promise((done) => { const plan = { diff --git a/packages/runtime/package.json b/packages/runtime/package.json index 3bcad74bc..1e87fa818 100644 --- a/packages/runtime/package.json +++ b/packages/runtime/package.json @@ -48,6 +48,7 @@ "dependencies": { "@openfn/logger": "workspace:*", "fast-safe-stringify": "^2.1.1", + "json-stream-stringify": "^3.1.6", "semver": "^7.7.2", "source-map": "^0.7.6" } diff --git a/packages/runtime/src/errors.ts b/packages/runtime/src/errors.ts index c31ec5a57..2279df992 100644 --- a/packages/runtime/src/errors.ts +++ b/packages/runtime/src/errors.ts @@ -91,6 +91,15 @@ export const extractStackTrace = (e: Error) => { } }; +export class StateTooLargeError extends Error { + name = 'StateTooLargeError'; + severity = 'kill'; + constructor(limit_mb: number) { + super(); + this.message = `State exceeds the limit of ${limit_mb}mb`; + } +} + // Abstract error supertype export class RTError extends Error { source = 'runtime'; diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index 1e72168ea..e30912270 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -20,6 +20,7 @@ import { import { isNullState } from '../util/null-state'; import sourcemapErrors from '../util/sourcemap-errors'; import createProfiler from '../util/profile-memory'; +import ensureStateSize from '../util/ensure-state-size'; const loadCredentials = async ( job: Job, @@ -79,13 +80,21 @@ const calculateNext = (job: CompiledStep, result: any, logger: Logger) => { // TODO this is suboptimal and may be slow on large objects // (especially as the result get stringified again downstream) -const prepareFinalState = ( +const prepareFinalState = async ( state: any, logger: Logger, - statePropsToRemove?: string[] + statePropsToRemove?: string[], + stateLimit_mb?: number ) => { if (isNullState(state)) return undefined; if (state) { + try { + await ensureStateSize(state, stateLimit_mb); + } catch (e) { + logger.error('Critical error processing state:'); + throw e; + } + if (!statePropsToRemove) { // As a strict default, remove the configuration key // tbh this should happen higher up in the stack but it causes havoc in unit testing @@ -190,7 +199,12 @@ const executeStep = async ( const duration = logger.timer(timerId); logger.error(`${jobName} aborted with error (${duration})`); - state = prepareFinalState(state, logger, ctx.opts.statePropsToRemove); + state = await prepareFinalState( + state, + logger, + ctx.opts.statePropsToRemove, + ctx.opts.stateLimitMb + ); // Whatever the final state was, save that as the initial state to the next thing result = state; @@ -219,7 +233,12 @@ const executeStep = async ( if (!didError) { const humanDuration = logger.timer(timerId); logger.success(`${jobName} completed in ${humanDuration}`); - result = prepareFinalState(result, logger, ctx.opts.statePropsToRemove); + result = await prepareFinalState( + result, + logger, + ctx.opts.statePropsToRemove, + ctx.opts.stateLimitMb + ); // Take a memory snapshot // IMPORTANT: this runs _after_ the state object has been serialized diff --git a/packages/runtime/src/runtime.ts b/packages/runtime/src/runtime.ts index 11b60f6c0..ba47d0327 100644 --- a/packages/runtime/src/runtime.ts +++ b/packages/runtime/src/runtime.ts @@ -38,6 +38,8 @@ export type Options = { /** Optional name for the expression (if passed a string) */ defaultStepId?: string; + + stateLimitMb?: number; }; type RawOptions = Omit & { diff --git a/packages/runtime/src/util/clone.ts b/packages/runtime/src/util/clone.ts index 0920f7106..e1f3f9add 100644 --- a/packages/runtime/src/util/clone.ts +++ b/packages/runtime/src/util/clone.ts @@ -1,6 +1,4 @@ import type { State } from '@openfn/lexicon'; import stringify from 'fast-safe-stringify'; -// TODO I'm in the market for the best solution here - immer? deep-clone? -// What should we do if functions are in the state? export default (state: State) => JSON.parse(stringify(state)); diff --git a/packages/runtime/src/util/ensure-state-size.ts b/packages/runtime/src/util/ensure-state-size.ts new file mode 100644 index 000000000..f884d1d44 --- /dev/null +++ b/packages/runtime/src/util/ensure-state-size.ts @@ -0,0 +1,34 @@ +import { JsonStreamStringify } from 'json-stream-stringify'; +import { StateTooLargeError } from '../errors'; + +const replacer = (_key: string, value: any) => { + // Ignore non serializable keys + if ( + value === undefined || + typeof value === 'function' || + value?.constructor?.name === 'Promise' + ) { + return undefined; + } + + return value; +}; + +// throws if state exceeds a particular size limit +export default async (value: any, limit_mb: number = 500) => { + if (value && !isNaN(limit_mb) && limit_mb > 0) { + const limitBytes = limit_mb * 1024 * 1024; + let size_bytes = 0; + const stream = new JsonStreamStringify(value, replacer, 0, true); + for await (const chunk of stream) { + // Each chunk is a string token from the JSON output + size_bytes += Buffer.byteLength(chunk, 'utf8'); + + if (size_bytes > limitBytes) { + stream.destroy(); + throw new StateTooLargeError(limit_mb); + } + } + stream.destroy(); + } +}; diff --git a/packages/runtime/test/runtime.test.ts b/packages/runtime/test/runtime.test.ts index 209fdf865..adcf52bed 100644 --- a/packages/runtime/test/runtime.test.ts +++ b/packages/runtime/test/runtime.test.ts @@ -899,3 +899,40 @@ test('accept a whitelist as a string', async (t) => { t.is(error.message, 'module blacklisted: blah'); } }); + +test('do not enforce state size limit if state is small enough', async (t) => { + const plan: ExecutionPlan = { + workflow: { + steps: [ + { + expression: + 'export default [(s) => { s.data.large = new Array(1024).fill("z").join(""); return s; }]', + }, + ], + }, + }; + + await run(plan, {}, {}); + t.pass('did not fail'); +}); + +test('enforce state size limit from runtime option', async (t) => { + const plan: ExecutionPlan = { + workflow: { + steps: [ + { + expression: + 'export default [(s) => { s.data.large = new Array(1024).fill("z").join(""); return s; }]', + }, + ], + }, + }; + + try { + await run(plan, {}, { stateLimitMb: 1 / 1024 }); + t.fail('Should have thrown StateTooLargeError'); + } catch (error: any) { + t.is(error.name, 'StateTooLargeError'); + t.regex(error.message, /State exceeds the limit/); + } +}); diff --git a/packages/runtime/test/util/ensure-state-size.test.ts b/packages/runtime/test/util/ensure-state-size.test.ts new file mode 100644 index 000000000..dfb5616e3 --- /dev/null +++ b/packages/runtime/test/util/ensure-state-size.test.ts @@ -0,0 +1,108 @@ +import test from 'ava'; +import ensureStateSize from '../../src/util/ensure-state-size'; +import { StateTooLargeError } from '../../src/errors'; + +test('do not throw for limit 0kb', async (t) => { + const state = { data: new Array(1024).fill('z').join('') }; + await t.notThrowsAsync(() => ensureStateSize(state, 0)); +}); + +test('throw for limit 1kb, payload 1kb', async (t) => { + const state = { data: new Array(1024).fill('z').join('') }; + await t.throwsAsync(() => ensureStateSize(state, 1 / 1024), { + instanceOf: StateTooLargeError, + message: 'State exceeds the limit of 0.0009765625mb', + }); +}); + +test('ok for limit 2kb, payload 1kb', async (t) => { + const state = { data: new Array(1024).fill('z').join('') }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('allow circular references', async (t) => { + const state: any = { data: 'test' }; + state.self = state; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('allow circular references 2', async (t) => { + const arr = [{ id: 1 }, { id: 2 }, { id: 3 }]; + const state: any = { data: arr, arr }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle promise in state', async (t) => { + const state = { + data: 'test', + promise: new Promise((r) => r), + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle function in state', async (t) => { + const state = { + data: 'test', + fn: () => 'hello', + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle undefined in state', async (t) => { + const state = { + data: 'test', + undef: undefined, + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle symbol in state', async (t) => { + const state = { + data: 'test', + sym: Symbol('test'), + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle BigInt in state', async (t) => { + const state = { + data: 'test', + big: BigInt(123456789), + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle Date in state', async (t) => { + const state = { + data: 'test', + date: new Date('2024-01-01'), + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle RegExp in state', async (t) => { + const state = { + data: 'test', + regex: /test/gi, + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle Map in state', async (t) => { + const map = new Map(); + map.set('key', 'value'); + const state = { + data: 'test', + map: map, + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle Set in state', async (t) => { + const set = new Set([1, 2, 3]); + const state = { + data: 'test', + set: set, + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); diff --git a/packages/ws-worker/src/mock/runtime-engine.ts b/packages/ws-worker/src/mock/runtime-engine.ts index 282168b7e..2d76b0b74 100644 --- a/packages/ws-worker/src/mock/runtime-engine.ts +++ b/packages/ws-worker/src/mock/runtime-engine.ts @@ -74,7 +74,7 @@ async function createMock() { for (const step of steps) { const job = step as Job; if (typeof job.configuration === 'string') { - // Call the crendtial callback, but don't do anything with it + // Call the credential callback, but don't do anything with it job.configuration = await options.resolvers?.credential?.( job.configuration ); diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index 6e8cd7c39..88c09bdb4 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -312,6 +312,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { if (!('logPayloadLimitMb' in options)) { options.logPayloadLimitMb = app.options.logPayloadLimitMb; } + options.timeoutRetryCount = app.options.timeoutRetryCount; options.timeoutRetryDelay = app.options.timeoutRetryDelayMs ?? app.options.socketTimeoutSeconds; diff --git a/packages/ws-worker/test/lightning.test.ts b/packages/ws-worker/test/lightning.test.ts index 3f2deeb4a..a16f2b5fa 100644 --- a/packages/ws-worker/test/lightning.test.ts +++ b/packages/ws-worker/test/lightning.test.ts @@ -181,14 +181,14 @@ test.serial(`should reset backoff after claim`, (t) => { }); lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => { - // set this articially high - if there are no more claims, the test will fail + // set this artificially high - if there are no more claims, the test will fail lastClaimDiff = 10000; // When the run is finished, the claims should resume // but with a smaller backoff setTimeout(() => { t.log('Backoff after run:', lastClaimDiff); - t.true(lastClaimDiff < 5); + t.true(lastClaimDiff < 10); done(); }, 10); }); diff --git a/packages/ws-worker/test/mock/runtime-engine.test.ts b/packages/ws-worker/test/mock/runtime-engine.test.ts index 28842769e..a876baf07 100644 --- a/packages/ws-worker/test/mock/runtime-engine.test.ts +++ b/packages/ws-worker/test/mock/runtime-engine.test.ts @@ -1,15 +1,16 @@ import test from 'ava'; import type { ExecutionPlan } from '@openfn/lexicon'; +import create from '../../src/mock/runtime-engine'; +import { waitForEvent, clone, createPlan } from '../util'; + import type { JobCompletePayload, JobStartPayload, WorkflowCompletePayload, WorkflowStartPayload, } from '@openfn/engine-multi'; -import create from '../../src/mock/runtime-engine'; -import { waitForEvent, clone, createPlan } from '../util'; -import { WorkflowErrorPayload } from '@openfn/engine-multi'; +import type { WorkflowErrorPayload } from '@openfn/engine-multi'; const sampleWorkflow = { id: 'w1', @@ -117,21 +118,30 @@ test.serial('wait function', async (t) => { test.serial( 'resolve credential before job-start if credential is a string', async (t) => { - const wf = clone(sampleWorkflow); - wf.id = t.title; - wf.workflow.steps[0].configuration = 'x'; - - let didCallCredentials; - const credential = async () => { - didCallCredentials = true; - return {}; - }; - - // @ts-ignore - engine.execute(wf, {}, { resolvers: { credential } }); - - await waitForEvent(engine, 'job-start'); - t.true(didCallCredentials); + t.plan(1); + return new Promise((resolve) => { + const wf = clone(sampleWorkflow); + wf.id = t.title; + wf.workflow.steps[0].configuration = 'x'; + + let didCallCredentials = false; + const credential = async () => { + didCallCredentials = true; + return {}; + }; + + engine.listen(wf.id, { + 'job-start': () => { + t.true(didCallCredentials); + }, + 'workflow-complete': () => { + resolve(); + }, + }); + + // @ts-ignore + engine.execute(wf, {}, { resolvers: { credential } }); + }); } ); @@ -145,7 +155,7 @@ test.serial('listen to events', async (t) => { }; const wf = createPlan({ - id: 'j1', + id: t.title, adaptors: ['common@1.0.0'], expression: 'export default [() => { console.log("x"); }]', }); diff --git a/packages/ws-worker/test/reasons.test.ts b/packages/ws-worker/test/reasons.test.ts index d44430d28..0e1bc797d 100644 --- a/packages/ws-worker/test/reasons.test.ts +++ b/packages/ws-worker/test/reasons.test.ts @@ -279,6 +279,26 @@ test('kill: timeout', async (t) => { t.is(reason.error_message, 'Workflow failed to return within 100ms'); }); +test('exception: state too large', async (t) => { + const plan = createPlan({ + id: 'x', + expression: `export default [(s) => { + s.data = new Array(1024 * 1024 * 10).fill("x").join(""); + return s; + }]`, + }); + + const options = { + stateLimitMb: 0.1, + }; + + const { reason } = await execute(plan, {}, options); + t.is(reason.reason, 'kill'); + t.is(reason.error_type, 'StateTooLargeError'); + t.truthy(reason.error_message); + t.regex(reason.error_message!, /State exceeds the limit/); +}); + test.todo('crash: workflow validation error'); test.todo('fail: adaptor error'); test.todo('crash: import error'); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 34a8e6953..7066d0636 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -695,6 +695,9 @@ importers: fast-safe-stringify: specifier: ^2.1.1 version: 2.1.1 + json-stream-stringify: + specifier: ^3.1.6 + version: 3.1.6 semver: specifier: ^7.7.2 version: 7.7.2