diff --git a/.changeset/dirty-teeth-invent.md b/.changeset/dirty-teeth-invent.md
new file mode 100644
index 000000000..b1ee11bf4
--- /dev/null
+++ b/.changeset/dirty-teeth-invent.md
@@ -0,0 +1,7 @@
+---
+'@openfn/runtime': minor
+---
+
+Use async serialization on state objects at the end of each step.
+
+This may result in slightly different handling of state objects at the end of each step. It should add stability by ensuring that oversized state objects fail with a graceful error, rather than triggering an OOM kill that blows up the wrapping worker.
diff --git a/packages/runtime/package.json b/packages/runtime/package.json
index 3bcad74bc..1e87fa818 100644
--- a/packages/runtime/package.json
+++ b/packages/runtime/package.json
@@ -48,6 +48,7 @@
   "dependencies": {
     "@openfn/logger": "workspace:*",
     "fast-safe-stringify": "^2.1.1",
+    "json-stream-stringify": "^3.1.6",
     "semver": "^7.7.2",
     "source-map": "^0.7.6"
   }
diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts
index 1e72168ea..74cde31b7 100644
--- a/packages/runtime/src/execute/step.ts
+++ b/packages/runtime/src/execute/step.ts
@@ -2,7 +2,7 @@
 import type { Job, State, StepId } from '@openfn/lexicon';
 import type { Logger } from '@openfn/logger';
 import executeExpression, { ExecutionErrorWrapper } from './expression';
-import clone from '../util/clone';
+import clone, { asyncClone } from '../util/clone';
 import assembleState from '../util/assemble-state';
 import type {
   CompiledStep,
@@ -79,7 +79,7 @@ const calculateNext = (job: CompiledStep, result: any, logger: Logger) => {
 
 // TODO this is suboptimal and may be slow on large objects
 // (especially as the result get stringified again downstream)
-const prepareFinalState = (
+const prepareFinalState = async (
   state: any,
   logger: Logger,
   statePropsToRemove?: string[]
@@ -104,7 +104,22 @@ const prepareFinalState = (
       `Cleaning up state. Removing keys: ${removedProps.join(', ')}`
     );
 
-    return clone(state);
+    // Technically this should restrict all state objects
+    // to the dataclip size limit
+    // But that's likely to be a breaking change
+    // So for now, just set a very high dataclip size limit here
+    // In practice, any state objects this large are likely to trigger
+    // an OOM kill, so the runtime behaviour is a bit academic
+    const stateLimit_mb = 1000;
+    try {
+      return await asyncClone(state, stateLimit_mb);
+    } catch (e) {
+      // If the clone failed, we're in trouble
+      logger.error(
+        `Error: State object exceeds size limit of ${stateLimit_mb}MB. An empty state object will be returned from this step.`
+      );
+      return {};
+    }
   }
   return state;
 };
@@ -190,7 +205,11 @@ const executeStep = async (
       const duration = logger.timer(timerId);
       logger.error(`${jobName} aborted with error (${duration})`);
 
-      state = prepareFinalState(state, logger, ctx.opts.statePropsToRemove);
+      state = await prepareFinalState(
+        state,
+        logger,
+        ctx.opts.statePropsToRemove
+      );
 
       // Whatever the final state was, save that as the initial state to the next thing
       result = state;
@@ -219,7 +238,11 @@
   if (!didError) {
     const humanDuration = logger.timer(timerId);
     logger.success(`${jobName} completed in ${humanDuration}`);
-    result = prepareFinalState(result, logger, ctx.opts.statePropsToRemove);
+    result = await prepareFinalState(
+      result,
+      logger,
+      ctx.opts.statePropsToRemove
+    );
 
     // Take a memory snapshot
     // IMPORTANT: this runs _after_ the state object has been serialized
diff --git a/packages/runtime/src/util/clone.ts b/packages/runtime/src/util/clone.ts
index 0920f7106..82fce28d9 100644
--- a/packages/runtime/src/util/clone.ts
+++ b/packages/runtime/src/util/clone.ts
@@ -1,6 +1,71 @@
 import type { State } from '@openfn/lexicon';
 import stringify from 'fast-safe-stringify';
+import { JsonStreamStringify } from 'json-stream-stringify';
 
 // TODO I'm in the market for the best solution here - immer? deep-clone?
 // What should we do if functions are in the state?
 export default (state: State) => JSON.parse(stringify(state));
+
+const replacer = (_key: string, value: any) => {
+  // Ignore non-serializable values
+  if (
+    value === undefined ||
+    typeof value === 'function' ||
+    value?.constructor?.name === 'Promise'
+  ) {
+    return undefined;
+  }
+
+  return value;
+};
+
+export const asyncClone = async (
+  state: State,
+  limit_mb = 1000
+): Promise<State> => {
+  const limit_bytes = limit_mb * 1024 * 1024;
+  let size_bytes = 0;
+  let jsonString = '';
+
+  // One big worry with this approach is that JsonStreamStringify
+  // does not behave quite the same as fast-safe-stringify,
+  // eg in how it handles functions
+  const stream = new JsonStreamStringify(state, replacer, undefined, true);
+
+  try {
+    for await (const chunk of stream) {
+      // Each chunk is a string token from the JSON output
+      const chunkSize = Buffer.byteLength(chunk, 'utf8');
+      size_bytes += chunkSize;
+
+      if (size_bytes > limit_bytes) {
+        stream.destroy();
+        throw new Error(
+          `State size exceeds limit: ${(size_bytes / 1024 / 1024).toFixed(
+            2
+          )}MB > ${limit_mb}MB`
+        );
+      }
+
+      jsonString += chunk;
+    }
+
+    // Re-parse the stringified JSON back into an object
+    // Use a reviver to convert circular reference markers to '[Circular]'
+    return JSON.parse(jsonString, (_key, value) => {
+      if (
+        value &&
+        typeof value === 'object' &&
+        value.$ref &&
+        Object.keys(value).length === 1
+      ) {
+        return '[Circular]';
+      }
+      return value;
+    });
+  } catch (error) {
+    stream.destroy();
+    throw error;
+  }
+};
diff --git a/packages/runtime/test/execute/plan.test.ts b/packages/runtime/test/execute/plan.test.ts
index 4666e29c0..c267cbba5 100644
--- a/packages/runtime/test/execute/plan.test.ts
+++ b/packages/runtime/test/execute/plan.test.ts
@@ -1071,7 +1071,6 @@ test('steps can write circular references to state without blowing up downstream
   ]);
 
   const result: any = await executePlan(plan, {}, {}, mockLogger);
-  t.notThrows(() => JSON.stringify(result));
 
   t.deepEqual(result, {
     data: {
diff --git a/packages/runtime/test/util/clone.test.ts b/packages/runtime/test/util/clone.test.ts
new file mode 100644
index 000000000..e4c86b6ba
--- /dev/null
+++ b/packages/runtime/test/util/clone.test.ts
@@ -0,0 +1,75 @@
+import test from 'ava';
+import { asyncClone } from '../../src/util/clone';
+
+test('asyncClone: should clone a simple object', async (t) => {
+  const obj = {
+    a: 1,
+    b: 'hello',
+    c: true,
+    d: { nested: 'value' },
+  };
+  const result = await asyncClone(obj);
+  t.deepEqual(result, obj);
+  t.not(result, obj); // ensure it's a new object
+  t.not(result.d, obj.d); // ensure nested objects are cloned
+});
+
+test('asyncClone: should remove undefined values', async (t) => {
+  const obj = {
+    a: 1,
+    b: undefined,
+    c: 'hello',
+  };
+  const result = await asyncClone(obj);
+  t.deepEqual(result, { a: 1, c: 'hello' });
+  t.false('b' in result);
+});
+
+test('asyncClone: should remove functions', async (t) => {
+  const obj = {
+    a: 1,
+    b: () => 'test',
+    c: 'hello',
+  };
+  const result = await asyncClone(obj);
+  t.deepEqual(result, { a: 1, c: 'hello' });
+  t.false('b' in result);
+});
+
+test('asyncClone: should handle arrays', async (t) => {
+  const obj = {
+    items: [1, 2, 3, { nested: 'value' }],
+  };
+  const result = await asyncClone(obj);
+  t.deepEqual(result, obj);
+  t.not(result, obj);
+  t.not(result.items, obj.items);
+});
+
+test('asyncClone: should handle circular references', async (t) => {
+  const inner: any = { value: 42 };
+  const obj: any = {
+    a: 1,
+    inner: inner,
+  };
+  inner.parent = obj; // create circular reference
+
+  const result = await asyncClone(obj);
+  t.is(result.a, 1);
+  t.is(result.inner.value, 42);
+  // Circular reference should be handled without throwing
+  t.truthy(result.inner.parent);
+  t.is(result.inner.parent, '[Circular]');
+});
+
+test('asyncClone: should throw error when size exceeds limit', async (t) => {
+  // Create an object larger than the 0.1MB limit (~0.2MB of string data)
+  const largeArray = new Array(1000).fill('x'.repeat(200));
+  const obj = { items: largeArray };
+
+  const error = await t.throwsAsync(
+    async () => await asyncClone(obj, 0.1), // 0.1MB limit
+    { instanceOf: Error }
+  );
+  t.regex(error.message, /State size exceeds limit/);
+});
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 34a8e6953..7066d0636 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -695,6 +695,9 @@ importers:
       fast-safe-stringify:
         specifier: ^2.1.1
         version: 2.1.1
+      json-stream-stringify:
+        specifier: ^3.1.6
+        version: 3.1.6
       semver:
         specifier: ^7.7.2
         version: 7.7.2
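
Illustrative usage note (not part of the diff): the sketch below shows how `asyncClone`, as added in `packages/runtime/src/util/clone.ts` above, is expected to be consumed. The promise must be awaited inside the try block so that the size-limit error can be caught and an empty state substituted, which is what the `prepareFinalState` change does. The wrapper name, import path, and 1MB limit are assumptions for this example; only `asyncClone(state, limit_mb)` and its error message come from the diff.

```ts
import { asyncClone } from './util/clone';

// Hypothetical wrapper mirroring prepareFinalState: clone the step's final
// state with a size cap, falling back to an empty object if the limit trips.
const cloneWithLimit = async (state: any, limitMb = 1) => {
  try {
    // await is required here; returning the bare promise would bypass the catch
    return await asyncClone(state, limitMb);
  } catch (e) {
    // Oversized state: log and substitute an empty object,
    // eg "State size exceeds limit: 2.00MB > 1MB"
    console.error((e as Error).message);
    return {};
  }
};

// Example: a ~2MB state object exceeds the 1MB cap and resolves to {}
cloneWithLimit({ data: 'x'.repeat(2 * 1024 * 1024) }).then((result) => {
  console.log(result);
});
```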