7 changes: 7 additions & 0 deletions .changeset/dirty-teeth-invent.md
@@ -0,0 +1,7 @@
---
'@openfn/runtime': minor
---

Use async serialization on state objects at the end of each step.

This may result in slightly different handling of state objects at the end of each step. It should improve stability by ensuring that oversized state objects fail with a graceful error, rather than triggering an OOM kill that takes down the wrapping worker.
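
For illustration, a minimal sketch of the new behaviour (the `finalizeState` wrapper below is hypothetical; `asyncClone` and the 1000MB cap come from this change):

```ts
import { asyncClone } from './util/clone'; // internal to @openfn/runtime

// Hypothetical wrapper mirroring prepareFinalState in step.ts
const finalizeState = async (state: any) => {
  try {
    // Streams the state through a JSON stringifier, aborting once
    // the serialized output exceeds the limit (in MB)
    return await asyncClone(state, 1000);
  } catch (e) {
    // Oversized state fails gracefully with an empty object,
    // rather than OOM-killing the wrapping worker
    return {};
  }
};
```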
1 change: 1 addition & 0 deletions packages/runtime/package.json
@@ -48,6 +48,7 @@
"dependencies": {
"@openfn/logger": "workspace:*",
"fast-safe-stringify": "^2.1.1",
"json-stream-stringify": "^3.1.6",
"semver": "^7.7.2",
"source-map": "^0.7.6"
}
34 changes: 29 additions & 5 deletions packages/runtime/src/execute/step.ts
@@ -2,7 +2,7 @@ import type { Job, State, StepId } from '@openfn/lexicon';
import type { Logger } from '@openfn/logger';

import executeExpression, { ExecutionErrorWrapper } from './expression';
import clone from '../util/clone';
import clone, { asyncClone } from '../util/clone';
import assembleState from '../util/assemble-state';
import type {
CompiledStep,
@@ -79,7 +79,7 @@ const calculateNext = (job: CompiledStep, result: any, logger: Logger) => {

// TODO this is suboptimal and may be slow on large objects
// (especially as the result gets stringified again downstream)
const prepareFinalState = (
const prepareFinalState = async (
state: any,
logger: Logger,
statePropsToRemove?: string[]
@@ -104,7 +104,23 @@ const prepareFinalState = (
`Cleaning up state. Removing keys: ${removedProps.join(', ')}`
);

return clone(state);
// Technically this should restrict all state objects
// to the dataclip size limit, but that would likely be a breaking change
// So for now, just set a very high size limit here
// In practice, any state object this large is likely to trigger
// an OOM kill anyway, so the runtime behaviour is a bit academic
const stateLimit_mb = 1000;
try {
// await here so that a rejected clone is caught by this block
return await asyncClone(state, stateLimit_mb);
} catch (e) {
// If the clone failed, we're in trouble
logger.error(
`Error: State object exceeds size limit of ${stateLimit_mb}MB. An empty state object will be returned from this step.`
);
return {};
}
}
return state;
};
@@ -190,7 +206,11 @@ const executeStep = async (
const duration = logger.timer(timerId);
logger.error(`${jobName} aborted with error (${duration})`);

state = prepareFinalState(state, logger, ctx.opts.statePropsToRemove);
state = await prepareFinalState(
state,
logger,
ctx.opts.statePropsToRemove
);
// Whatever the final state was, save that as the initial state to the next thing
result = state;

@@ -219,7 +239,11 @@
if (!didError) {
const humanDuration = logger.timer(timerId);
logger.success(`${jobName} completed in ${humanDuration}`);
result = prepareFinalState(result, logger, ctx.opts.statePropsToRemove);
result = await prepareFinalState(
result,
logger,
ctx.opts.statePropsToRemove
);

// Take a memory snapshot
// IMPORTANT: this runs _after_ the state object has been serialized
65 changes: 65 additions & 0 deletions packages/runtime/src/util/clone.ts
@@ -1,6 +1,71 @@
import type { State } from '@openfn/lexicon';
import stringify from 'fast-safe-stringify';

import { JsonStreamStringify } from 'json-stream-stringify';

// TODO I'm in the market for the best solution here - immer? deep-clone?
// What should we do if functions are in the state?
export default (state: State) => JSON.parse(stringify(state));

const replacer = (_key: string, value: any) => {
// Ignore non-serializable values
// (optional chaining guards against null, whose typeof is 'object')
if (
value === undefined ||
typeof value === 'function' ||
value?.constructor?.name === 'Promise'
) {
return undefined;
}

return value;
};

export const asyncClone = async (
state: State,
limit_mb = 1000
): Promise<State> => {
const limit_bytes = limit_mb * 1024 * 1024;
let size_bytes = 0;
let jsonString = '';

// One concern with this approach is that JsonStreamStringify
// does not behave exactly like fast-safe-stringify
// (e.g. in how it handles functions), hence the replacer above
const stream = new JsonStreamStringify(state, replacer, undefined, true);

try {
for await (const chunk of stream) {
// Each chunk is a string token from the JSON output
const chunkSize = Buffer.byteLength(chunk, 'utf8');
size_bytes += chunkSize;

if (size_bytes > limit_bytes) {
stream.destroy();
throw new Error(
`State size exceeds limit: ${(size_bytes / 1024 / 1024).toFixed(
2
)}MB > ${limit_mb}MB`
);
}

jsonString += chunk;
}

// Re-parse the stringified JSON back into an object
// Use a reviver to convert circular reference markers to '[Circular]'
return JSON.parse(jsonString, (_key, value) => {
if (
value &&
typeof value === 'object' &&
value.$ref &&
Object.keys(value).length === 1
) {
return '[Circular]';
}
return value;
});
} catch (error) {
stream.destroy();
throw error;
}
};
1 change: 0 additions & 1 deletion packages/runtime/test/execute/plan.test.ts
@@ -1071,7 +1071,6 @@ test('steps can write circular references to state without blowing up downstream
]);

const result: any = await executePlan(plan, {}, {}, mockLogger);

t.notThrows(() => JSON.stringify(result));
t.deepEqual(result, {
data: {
75 changes: 75 additions & 0 deletions packages/runtime/test/util/clone.test.ts
@@ -0,0 +1,75 @@
import test from 'ava';
import { asyncClone } from '../../src/util/clone';

test('asyncClone: should clone a simple object', async (t) => {
const obj = {
a: 1,
b: 'hello',
c: true,
d: { nested: 'value' },
};
const result = await asyncClone(obj);
t.deepEqual(result, obj);
t.not(result, obj); // ensure it's a new object
t.not(result.d, obj.d); // ensure nested objects are cloned
});

test('asyncClone: should remove undefined values', async (t) => {
const obj = {
a: 1,
b: undefined,
c: 'hello',
};
const result = await asyncClone(obj);
t.deepEqual(result, { a: 1, c: 'hello' });
t.false('b' in result);
});

test('asyncClone: should remove functions', async (t) => {
const obj = {
a: 1,
b: () => 'test',
c: 'hello',
};
const result = await asyncClone(obj);
t.deepEqual(result, { a: 1, c: 'hello' });
t.false('b' in result);
});

test('asyncClone: should handle arrays', async (t) => {
const obj = {
items: [1, 2, 3, { nested: 'value' }],
};
const result = await asyncClone(obj);
t.deepEqual(result, obj);
t.not(result, obj);
t.not(result.items, obj.items);
});

test('asyncClone: should handle circular references', async (t) => {
const inner: any = { value: 42 };
const obj: any = {
a: 1,
inner: inner,
};
inner.parent = obj; // create circular reference

const result = await asyncClone(obj);
t.is(result.a, 1);
t.is(result.inner.value, 42);
// Circular reference should be handled without throwing
t.truthy(result.inner.parent);
t.is(result.inner.parent, '[Circular]');
});

test('asyncClone: should throw error when size exceeds limit', async (t) => {
// Create an object of roughly 0.2MB, about double the 0.1MB limit
const largeArray = new Array(1000).fill('x'.repeat(200));
const obj = { items: largeArray };

const error = await t.throwsAsync(
async () => await asyncClone(obj, 0.1), // 0.1MB limit
{ instanceOf: Error }
);
t.regex(error.message, /State size exceeds limit/);
});
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default.