Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/ready-items-shout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@openfn/engine-multi': minor
'@openfn/ws-worker': minor
'@openfn/runtime': minor
---

Measure the size of state objects at the end of each step, and throw if they exceed a limit

In the Worker, this limit is set to 25% of the available runtime memory.
9 changes: 9 additions & 0 deletions packages/engine-multi/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,17 @@ const execute = async (context: ExecutionContext) => {
repoDir: options.repoDir,
profile: context.options.profile,
profilePollInteval: context.options.profilePollInterval,
// work out the max size of the state object at the end of each step
// This must be fairly high to prevent crashes
stateLimitMb:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is currently set in the engine and worker with different rules!

Maybe the worker just passes in a fixed value, if it has one, and the engine defaults to a % of runtime memory. That's probably cleanest.

options.stateLimitMb ??
Math.max((options.memoryLimitMb ?? 1000) * 0.25),
} as RunOptions;

logger.debug(
`${state.plan.id} setting runtime state limit to ${runOptions.stateLimitMb}mb`
);

// Construct the payload limits object
const payloadLimits: PayloadLimits = {};
if (options.payloadLimitMb !== undefined) {
Expand Down
1 change: 1 addition & 0 deletions packages/engine-multi/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ const createEngine = async (
callWorker,
options: {
...options,
stateLimitMb: opts.stateLimitMb,
sanitize: opts.sanitize,
resolvers: opts.resolvers,
runTimeoutMs: opts.runTimeoutMs ?? defaultTimeout,
Expand Down
3 changes: 2 additions & 1 deletion packages/engine-multi/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export type ExecutionContextConstructor = {
};

export type ExecuteOptions = {
stateLimitMb?: number;
payloadLimitMb?: number;
logPayloadLimitMb?: number;
memoryLimitMb?: number;
Expand Down Expand Up @@ -72,7 +73,7 @@ export interface RuntimeEngine {
execute(
plan: ExecutionPlan,
input: State,
options?: Partial<EngineOptions>
options?: ExecuteOptions
): Pick<EventEmitter, 'on' | 'off' | 'once'>;

destroy(instant?: boolean): Promise<void>;
Expand Down
6 changes: 4 additions & 2 deletions packages/engine-multi/src/worker/thread/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export type RunOptions = {
jobLogLevel?: LogLevel;
profile?: boolean;
profilePollInterval?: number;
stateLimitMb?: number;
};

const eventMap = {
Expand All @@ -39,6 +40,7 @@ register({
jobLogLevel,
profile,
profilePollInterval,
stateLimitMb,
} = runOptions;
const { logger, jobLogger, adaptorLogger } = createLoggers(
plan.id!,
Expand All @@ -54,9 +56,8 @@ register({
console = adaptorLogger;

// Leave console.debug for local debugging
// This goes to stdout but not the adapator logger
// This goes to stdout but not the adaptor logger
console.debug = debug;

// TODO I would like to pull these options out of here
const options = {
// disable the runtime's own timeout
Expand All @@ -72,6 +73,7 @@ register({
profile,
profilePollInterval,
statePropsToRemove,
stateLimitMb,
callbacks: {
// TODO: this won't actually work across the worker boundary
// For now I am preloading credentials
Expand Down
29 changes: 29 additions & 0 deletions packages/engine-multi/test/errors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,35 @@ test.serial.skip('vm oom error', (t) => {
});
});

// A step that returns a very large string in state.data, run with a
// stateLimitMb of 0.1, is expected to emit a workflow error describing
// the oversized state.
test.serial('state object too big', async (t) => {
  // Single-step workflow whose expression writes a 1024 * 1024 character string
  const plan = {
    id: 'x',
    workflow: {
      steps: [
        {
          expression: `export default [(s) => {
s.data = new Array(1024 * 1024).fill("x").join("");
return s;
}]`,
        },
      ],
    },
  };

  const options = { stateLimitMb: 0.1 };

  // The promise only settles once the workflow error event has been checked
  return new Promise((resolve) => {
    const events = engine.execute(plan, {}, options);

    events.on(WORKFLOW_ERROR, (event) => {
      t.is(event.type, 'StateTooLargeError');
      t.is(event.severity, 'kill');
      t.is(event.message, 'State exceeds the limit of 0.1mb');
      resolve();
    });
  });
});

test.serial('execution error from async code', (t) => {
return new Promise((done) => {
const plan = {
Expand Down
1 change: 1 addition & 0 deletions packages/runtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"dependencies": {
"@openfn/logger": "workspace:*",
"fast-safe-stringify": "^2.1.1",
"json-stream-stringify": "^3.1.6",
"semver": "^7.7.2",
"source-map": "^0.7.6"
}
Expand Down
9 changes: 9 additions & 0 deletions packages/runtime/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ export const extractStackTrace = (e: Error) => {
}
};

/**
 * Error describing a state object that is larger than a size limit.
 *
 * Besides the standard Error fields it carries a `severity` of 'kill'.
 */
export class StateTooLargeError extends Error {
  name = 'StateTooLargeError';
  severity = 'kill';

  /**
   * @param limit_mb - the limit (in mb) that was exceeded; it is written
   * into the error message unchanged
   */
  constructor(limit_mb: number) {
    super();
    // The message is assigned after construction rather than passed to super()
    const text = 'State exceeds the limit of ' + limit_mb + 'mb';
    this.message = text;
  }
}

// Abstract error supertype
export class RTError extends Error {
source = 'runtime';
Expand Down
27 changes: 23 additions & 4 deletions packages/runtime/src/execute/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
import { isNullState } from '../util/null-state';
import sourcemapErrors from '../util/sourcemap-errors';
import createProfiler from '../util/profile-memory';
import ensureStateSize from '../util/ensure-state-size';

const loadCredentials = async (
job: Job,
Expand Down Expand Up @@ -79,13 +80,21 @@ const calculateNext = (job: CompiledStep, result: any, logger: Logger) => {

// TODO this is suboptimal and may be slow on large objects
// (especially as the result gets stringified again downstream)
const prepareFinalState = (
const prepareFinalState = async (
state: any,
logger: Logger,
statePropsToRemove?: string[]
statePropsToRemove?: string[],
stateLimit_mb?: number
) => {
if (isNullState(state)) return undefined;
if (state) {
try {
await ensureStateSize(state, stateLimit_mb);
} catch (e) {
logger.error('Critical error processing state:');
throw e;
}

if (!statePropsToRemove) {
// As a strict default, remove the configuration key
// tbh this should happen higher up in the stack but it causes havoc in unit testing
Expand Down Expand Up @@ -190,7 +199,12 @@ const executeStep = async (
const duration = logger.timer(timerId);
logger.error(`${jobName} aborted with error (${duration})`);

state = prepareFinalState(state, logger, ctx.opts.statePropsToRemove);
state = await prepareFinalState(
state,
logger,
ctx.opts.statePropsToRemove,
ctx.opts.stateLimitMb
);
// Whatever the final state was, save that as the initial state to the next thing
result = state;

Expand Down Expand Up @@ -219,7 +233,12 @@ const executeStep = async (
if (!didError) {
const humanDuration = logger.timer(timerId);
logger.success(`${jobName} completed in ${humanDuration}`);
result = prepareFinalState(result, logger, ctx.opts.statePropsToRemove);
result = await prepareFinalState(
result,
logger,
ctx.opts.statePropsToRemove,
ctx.opts.stateLimitMb
);

// Take a memory snapshot
// IMPORTANT: this runs _after_ the state object has been serialized
Expand Down
2 changes: 2 additions & 0 deletions packages/runtime/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ export type Options = {

/** Optional name for the expression (if passed a string) */
defaultStepId?: string;

stateLimitMb?: number;
};

type RawOptions = Omit<Options, 'linker'> & {
Expand Down
2 changes: 0 additions & 2 deletions packages/runtime/src/util/clone.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import type { State } from '@openfn/lexicon';
import stringify from 'fast-safe-stringify';

// TODO I'm in the market for the best solution here - immer? deep-clone?
// What should we do if functions are in the state?
export default (state: State) => JSON.parse(stringify(state));
34 changes: 34 additions & 0 deletions packages/runtime/src/util/ensure-state-size.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { JsonStreamStringify } from 'json-stream-stringify';
import { StateTooLargeError } from '../errors';

/**
 * Replacer callback handed to the JSON stringifier.
 *
 * Returns undefined for values that are undefined, functions, or whose
 * constructor is named 'Promise'; every other value is returned untouched.
 */
const replacer = (_key: string, value: any) => {
  if (value === undefined) {
    return undefined;
  }

  if (typeof value === 'function') {
    return undefined;
  }

  // Promises are detected by constructor name rather than instanceof
  if (value?.constructor?.name === 'Promise') {
    return undefined;
  }

  return value;
};

/**
 * Throws if the serialized size of a state value exceeds a size limit.
 *
 * The value is serialized through a JsonStreamStringify stream and the UTF-8
 * byte length of every emitted chunk is added to a running total. As soon as
 * the total is greater than the limit, the stream is destroyed and a
 * StateTooLargeError is thrown.
 *
 * Nothing is measured when the value is falsy, or when the limit is NaN or
 * not greater than zero.
 *
 * @param value - the state to measure
 * @param limit_mb - the limit in mb (converted to bytes as mb * 1024 * 1024);
 * defaults to 500
 * @throws StateTooLargeError when the accumulated size exceeds the limit
 */
export default async (value: any, limit_mb: number = 500) => {
  // Bail out unless there is something to measure and a usable limit
  if (!value || isNaN(limit_mb) || !(limit_mb > 0)) {
    return;
  }

  const maxBytes = limit_mb * 1024 * 1024;

  // NOTE(review): the trailing `0, true` arguments are presumably the
  // indentation and cycle-handling settings - confirm against the
  // json-stream-stringify documentation
  const serializer = new JsonStreamStringify(value, replacer, 0, true);

  let totalBytes = 0;
  for await (const piece of serializer) {
    // Count each chunk of JSON output as it is produced
    totalBytes += Buffer.byteLength(piece, 'utf8');

    if (totalBytes > maxBytes) {
      // Stop serializing as soon as the limit has been passed
      serializer.destroy();
      throw new StateTooLargeError(limit_mb);
    }
  }

  serializer.destroy();
};
37 changes: 37 additions & 0 deletions packages/runtime/test/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -899,3 +899,40 @@ test('accept a whitelist as a string', async (t) => {
t.is(error.message, 'module blacklisted: blah');
}
});

// Runs a step that writes a 1024-character string into state, passing an
// empty options object (no stateLimitMb). The run is expected to finish
// without throwing.
test('do not enforce state size limit if state is small enough', async (t) => {
  const expression =
    'export default [(s) => { s.data.large = new Array(1024).fill("z").join(""); return s; }]';

  const plan: ExecutionPlan = {
    workflow: {
      steps: [{ expression }],
    },
  };

  await run(plan, {}, {});
  t.pass('did not fail');
});

// Runs a step that writes a 1024-character string into state while passing
// a stateLimitMb of 1 / 1024. The run is expected to reject with a
// StateTooLargeError.
test('enforce state size limit from runtime option', async (t) => {
  const expression =
    'export default [(s) => { s.data.large = new Array(1024).fill("z").join(""); return s; }]';

  const plan: ExecutionPlan = {
    workflow: {
      steps: [{ expression }],
    },
  };

  const options = { stateLimitMb: 1 / 1024 };

  try {
    await run(plan, {}, options);
    // Reaching this line means the limit was not enforced
    t.fail('Should have thrown StateTooLargeError');
  } catch (error: any) {
    t.is(error.name, 'StateTooLargeError');
    t.regex(error.message, /State exceeds the limit/);
  }
});
Loading