diff --git a/integration-tests/cli/CHANGELOG.md b/integration-tests/cli/CHANGELOG.md index cf293004a..ab29a5f65 100644 --- a/integration-tests/cli/CHANGELOG.md +++ b/integration-tests/cli/CHANGELOG.md @@ -1,5 +1,13 @@ # @openfn/integration-tests-cli +## 1.0.8 + +### Patch Changes + +- Updated dependencies [f089f8d] + - @openfn/project@0.10.1 + - @openfn/lightning-mock@2.4.1 + ## 1.0.7 ### Patch Changes diff --git a/integration-tests/cli/package.json b/integration-tests/cli/package.json index 7d3708edc..551e1b0c6 100644 --- a/integration-tests/cli/package.json +++ b/integration-tests/cli/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-cli", "private": true, - "version": "1.0.7", + "version": "1.0.8", "description": "CLI integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/integration-tests/cli/test/project-v1.test.ts b/integration-tests/cli/test/project-v1.test.ts index cb899a77b..13007e429 100644 --- a/integration-tests/cli/test/project-v1.test.ts +++ b/integration-tests/cli/test/project-v1.test.ts @@ -138,7 +138,7 @@ steps: next: transform-data: disabled: false - condition: true + condition: always - id: transform-data name: Transform data adaptor: "@openfn/language-common@latest" diff --git a/integration-tests/worker/src/factories.ts b/integration-tests/worker/src/factories.ts index 2b0251f0a..ee65c6ced 100644 --- a/integration-tests/worker/src/factories.ts +++ b/integration-tests/worker/src/factories.ts @@ -19,7 +19,7 @@ export const createTrigger = () => ({ id: crypto.randomUUID(), }); -export const createEdge = (a: any, b: any, condition?: string) => { +export const createEdge = (a: any, b: any, extra?: any) => { const edge: any = { id: crypto.randomUUID(), target_job_id: b.id, @@ -29,6 +29,7 @@ export const createEdge = (a: any, b: any, condition?: string) => { } else { edge.source_job_id = a.id; } + Object.assign(edge, extra); return edge; }; diff --git a/integration-tests/worker/test/runs.test.ts b/integration-tests/worker/test/runs.test.ts index 3b031c320..f834029ba 100644 --- a/integration-tests/worker/test/runs.test.ts +++ b/integration-tests/worker/test/runs.test.ts @@ -283,3 +283,24 @@ test.serial('Run with collections', async (t) => { { key: 'c', value: { id: 'c' } }, ]); }); + +test.serial('Run with edge conditions', async (t) => { + const job1 = createJob({ + body: `fn((s) => s)`, + }); + const job2 = createJob({ + body: `fn((s) => { + s.didExecuteStep2 = true + return s; + })`, + }); + const edge = createEdge(job1, job2, { + // I would prefer this ran on failure, but that's a lot + // harder to do the way these tests are set up + condition: 'on_job_success', + }); + const attempt = createRun([], [job1, job2], [edge]); + + const state = await run(t, attempt); + t.true(state.didExecuteStep2); +}); diff --git a/packages/cli/CHANGELOG.md b/packages/cli/CHANGELOG.md index ebd0f73ad..b01cfaf3e 100644 --- a/packages/cli/CHANGELOG.md +++ b/packages/cli/CHANGELOG.md @@ -1,5 +1,19 @@ # @openfn/cli +## 1.22.0 + +### Minor Changes + +- f089f8d: Fix edge conditions in pulled workflows + +### Patch Changes + +- Updated dependencies [f089f8d] +- Updated dependencies [f089f8d] +- Updated dependencies [064933d] + - @openfn/runtime@1.8.0 + - @openfn/project@0.10.1 + ## 1.21.0 ### Minor Changes diff --git a/packages/cli/package.json b/packages/cli/package.json index 3db8ed66b..84482bf4d 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/cli", - "version": "1.21.0", + "version": "1.22.0", "description": "CLI devtools for the OpenFn toolchain", "engines": { "node": ">=18", diff --git a/packages/cli/test/projects/checkout.test.ts b/packages/cli/test/projects/checkout.test.ts index cfe2a911d..06947f829 100644 --- a/packages/cli/test/projects/checkout.test.ts +++ b/packages/cli/test/projects/checkout.test.ts @@ -457,7 +457,7 @@ workspace: next: { 'transform-data-to-fhir-standard': { disabled: false, - condition: true, + condition: 'always', }, }, }, diff --git a/packages/cli/test/projects/fetch.test.ts b/packages/cli/test/projects/fetch.test.ts index 0072984f5..51ee74351 100644 --- a/packages/cli/test/projects/fetch.test.ts +++ b/packages/cli/test/projects/fetch.test.ts @@ -442,7 +442,7 @@ test.serial( next: { 'transform-data': { disabled: false, - condition: true, + condition: 'always', openfn: { uuid: 'a9a3adef-b394-4405-814d-3ac4323f4b4b', }, diff --git a/packages/cli/test/projects/fixtures.ts b/packages/cli/test/projects/fixtures.ts index a8013466a..02682ea53 100644 --- a/packages/cli/test/projects/fixtures.ts +++ b/packages/cli/test/projects/fixtures.ts @@ -90,7 +90,7 @@ workflows: next: transform-data: disabled: false - condition: true + condition: always openfn: uuid: a9a3adef-b394-4405-814d-3ac4323f4b4b history: diff --git a/packages/engine-multi/CHANGELOG.md b/packages/engine-multi/CHANGELOG.md index 8b5785c28..55ef919fe 100644 --- a/packages/engine-multi/CHANGELOG.md +++ b/packages/engine-multi/CHANGELOG.md @@ -1,5 +1,19 @@ # engine-multi +## 1.10.0 + +### Minor Changes + +- 064933d: Measure the size of state objects at the end of each step, and throw if they exceed a limit + + In the Worker, this limit is set to 25% of the available runtime memory. + +### Patch Changes + +- Updated dependencies [f089f8d] +- Updated dependencies [064933d] + - @openfn/runtime@1.8.0 + ## 1.9.1 ### Patch Changes diff --git a/packages/engine-multi/package.json b/packages/engine-multi/package.json index 9b6af17f1..a831f1d70 100644 --- a/packages/engine-multi/package.json +++ b/packages/engine-multi/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/engine-multi", - "version": "1.9.1", + "version": "1.10.0", "description": "Multi-process runtime engine", "main": "dist/index.js", "type": "module", diff --git a/packages/engine-multi/src/api/execute.ts b/packages/engine-multi/src/api/execute.ts index fabdf4b11..9e6cd0ea4 100644 --- a/packages/engine-multi/src/api/execute.ts +++ b/packages/engine-multi/src/api/execute.ts @@ -46,8 +46,17 @@ const execute = async (context: ExecutionContext) => { repoDir: options.repoDir, profile: context.options.profile, profilePollInteval: context.options.profilePollInterval, + // work out the max size of the state object at the end of each step + // This must be fairly high to prevent crashes + stateLimitMb: + options.stateLimitMb ?? + Math.max((options.memoryLimitMb ?? 1000) * 0.25), } as RunOptions; + logger.debug( + `${state.plan.id} setting runtime state limit to ${runOptions.stateLimitMb}mb` + ); + // Construct the payload limits object const payloadLimits: PayloadLimits = {}; if (options.payloadLimitMb !== undefined) { diff --git a/packages/engine-multi/src/engine.ts b/packages/engine-multi/src/engine.ts index adf2bb88b..630e334ab 100644 --- a/packages/engine-multi/src/engine.ts +++ b/packages/engine-multi/src/engine.ts @@ -167,6 +167,7 @@ const createEngine = async ( callWorker, options: { ...options, + stateLimitMb: opts.stateLimitMb, sanitize: opts.sanitize, resolvers: opts.resolvers, runTimeoutMs: opts.runTimeoutMs ?? defaultTimeout, diff --git a/packages/engine-multi/src/types.ts b/packages/engine-multi/src/types.ts index e42255b1b..ae2a44b7a 100644 --- a/packages/engine-multi/src/types.ts +++ b/packages/engine-multi/src/types.ts @@ -45,6 +45,7 @@ export type ExecutionContextConstructor = { }; export type ExecuteOptions = { + stateLimitMb?: number; payloadLimitMb?: number; logPayloadLimitMb?: number; memoryLimitMb?: number; @@ -72,7 +73,7 @@ export interface RuntimeEngine { execute( plan: ExecutionPlan, input: State, - options?: Partial + options?: ExecuteOptions ): Pick; destroy(instant?: boolean): Promise; diff --git a/packages/engine-multi/src/worker/thread/run.ts b/packages/engine-multi/src/worker/thread/run.ts index 8d54d9a72..a388faf0f 100644 --- a/packages/engine-multi/src/worker/thread/run.ts +++ b/packages/engine-multi/src/worker/thread/run.ts @@ -20,6 +20,7 @@ export type RunOptions = { jobLogLevel?: LogLevel; profile?: boolean; profilePollInterval?: number; + stateLimitMb?: number; }; const eventMap = { @@ -39,6 +40,7 @@ register({ jobLogLevel, profile, profilePollInterval, + stateLimitMb, } = runOptions; const { logger, jobLogger, adaptorLogger } = createLoggers( plan.id!, @@ -54,9 +56,8 @@ register({ console = adaptorLogger; // Leave console.debug for local debugging - // This goes to stdout but not the adapator logger + // This goes to stdout but not the adaptor logger console.debug = debug; - // TODO I would like to pull these options out of here const options = { // disable the runtime's own timeout @@ -72,6 +73,7 @@ register({ profile, profilePollInterval, statePropsToRemove, + stateLimitMb, callbacks: { // TODO: this won't actually work across the worker boundary // For now I am preloading credentials diff --git a/packages/engine-multi/test/errors.test.ts b/packages/engine-multi/test/errors.test.ts index a206b37d2..15781753f 100644 --- a/packages/engine-multi/test/errors.test.ts +++ b/packages/engine-multi/test/errors.test.ts @@ -165,6 +165,35 @@ test.serial.skip('vm oom error', (t) => { }); }); +test.serial('state object too big', async (t) => { + return new Promise((done) => { + const plan = { + id: 'x', + workflow: { + steps: [ + { + expression: `export default [(s) => { + s.data = new Array(1024 * 1024).fill("x").join(""); + return s; + }]`, + }, + ], + }, + }; + + const options = { + stateLimitMb: 0.1, + }; + + engine.execute(plan, {}, options).on(WORKFLOW_ERROR, (evt) => { + t.is(evt.type, 'StateTooLargeError'); + t.is(evt.severity, 'kill'); + t.is(evt.message, 'State exceeds the limit of 0.1mb'); + done(); + }); + }); +}); + test.serial('execution error from async code', (t) => { return new Promise((done) => { const plan = { diff --git a/packages/lightning-mock/CHANGELOG.md b/packages/lightning-mock/CHANGELOG.md index e9c6b0ad4..b1a421891 100644 --- a/packages/lightning-mock/CHANGELOG.md +++ b/packages/lightning-mock/CHANGELOG.md @@ -1,5 +1,14 @@ # @openfn/lightning-mock +## 2.4.1 + +### Patch Changes + +- Updated dependencies [f089f8d] +- Updated dependencies [064933d] + - @openfn/runtime@1.8.0 + - @openfn/engine-multi@1.10.0 + ## 2.4.0 ### Minor Changes diff --git a/packages/lightning-mock/package.json b/packages/lightning-mock/package.json index 8a46ae650..813ad64b0 100644 --- a/packages/lightning-mock/package.json +++ b/packages/lightning-mock/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lightning-mock", - "version": "2.4.0", + "version": "2.4.1", "private": true, "description": "A mock Lightning server", "main": "dist/index.js", diff --git a/packages/project/CHANGELOG.md b/packages/project/CHANGELOG.md index 452c31c2a..f12c59f0b 100644 --- a/packages/project/CHANGELOG.md +++ b/packages/project/CHANGELOG.md @@ -1,5 +1,11 @@ # @openfn/project +## 0.10.1 + +### Patch Changes + +- f089f8d: Map edge conditions so that they are compatible with CLI + ## 0.10.0 ### Minor Changes diff --git a/packages/project/package.json b/packages/project/package.json index d06678098..a0c148af5 100644 --- a/packages/project/package.json +++ b/packages/project/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/project", - "version": "0.10.0", + "version": "0.10.1", "description": "Read, serialize, replicate and sync OpenFn projects", "scripts": { "test": "pnpm ava", diff --git a/packages/project/src/parse/from-app-state.ts b/packages/project/src/parse/from-app-state.ts index 24d246c93..5224d6fc3 100644 --- a/packages/project/src/parse/from-app-state.ts +++ b/packages/project/src/parse/from-app-state.ts @@ -62,22 +62,28 @@ export default ( return new Project(proj as l.Project, config); }; -const mapTriggerEdgeCondition = (edge: Provisioner.Edge) => { +// TODO maybe this is a util and moved out of this file +export const mapEdge = (edge: Provisioner.Edge) => { const e: any = { disabled: !edge.enabled, }; - if (edge.condition_type === 'always') { - e.condition = true; - } else if (edge.condition_type === 'never') { - e.condition = false; - } else { + + if (edge.condition_type === 'js_expression') { e.condition = edge.condition_expression; + } else if (edge.condition_type) { + e.condition = edge.condition_type; + } + + if (edge.condition_label) { + e.name = edge.condition_label; } // Do this last so that it serializes last - e.openfn = { - uuid: edge.id, - }; + if (edge.id) { + e.openfn = { + uuid: edge.id, + }; + } return e; }; @@ -114,7 +120,7 @@ export const mapWorkflow = (workflow: Provisioner.Workflow) => { throw new Error(`Failed to find ${edge.target_job_id}`); } // we use the name, not the id, to reference - obj[slugify(target.name)] = mapTriggerEdgeCondition(edge); + obj[slugify(target.name)] = mapEdge(edge); return obj; }, {}), } as l.Trigger); @@ -148,7 +154,7 @@ export const mapWorkflow = (workflow: Provisioner.Workflow) => { s.next = outboundEdges.reduce((next, edge) => { const target = jobs.find((j) => j.id === edge.target_job_id); // @ts-ignore - next[slugify(target.name)] = mapTriggerEdgeCondition(edge); + next[slugify(target.name)] = mapEdge(edge); return next; }, {}); } diff --git a/packages/project/src/serialize/to-app-state.ts b/packages/project/src/serialize/to-app-state.ts index 6886ff424..a08786466 100644 --- a/packages/project/src/serialize/to-app-state.ts +++ b/packages/project/src/serialize/to-app-state.ts @@ -141,12 +141,19 @@ const mapWorkflow = (workflow: Workflow) => { e.source_job_id = node.id; } - if (rules.condition === true) { - e.condition_type = 'always'; - } else if (rules.condition === false) { - e.condition_type = 'never'; - } else if (typeof rules.condition === 'string') { - // TODO conditional + if (rules.condition) { + if (typeof rules.condition === 'boolean') { + e.condition_type = rules.condition ? 'always' : 'never'; + } else if ( + rules.condition.match( + /^(always|never|on_job_success|on_job_failure)$/ + ) + ) { + e.condition_type = rules.condition; + } else { + e.condition_type = 'js_expression'; + e.condition_expression = rules.condition; + } } wfState.edges.push(e); }); diff --git a/packages/project/test/parse/from-app-state.test.ts b/packages/project/test/parse/from-app-state.test.ts index fe696dec8..6f509bc04 100644 --- a/packages/project/test/parse/from-app-state.test.ts +++ b/packages/project/test/parse/from-app-state.test.ts @@ -1,5 +1,8 @@ import test from 'ava'; -import fromAppState, { mapWorkflow } from '../../src/parse/from-app-state'; +import fromAppState, { + mapEdge, + mapWorkflow, +} from '../../src/parse/from-app-state'; import { clone, cloneDeep } from 'lodash-es'; import state, { withCreds } from '../fixtures/sample-v1-project'; @@ -90,7 +93,7 @@ test('should create a Project from prov state with a workflow', (t) => { openfn: { enabled: true, uuid: '4a06289c-15aa-4662-8dc6-f0aaacd8a058' }, next: { 'transform-data': { - condition: true, + condition: 'always', disabled: false, openfn: { uuid: 'a9a3adef-b394-4405-814d-3ac4323f4b4b', @@ -130,7 +133,7 @@ test('mapWorkflow: map a simple trigger', (t) => { type: 'webhook', next: { 'transform-data': { - condition: true, + condition: 'always', disabled: false, openfn: { uuid: 'a9a3adef-b394-4405-814d-3ac4323f4b4b', @@ -219,6 +222,155 @@ test('mapWorkflow: map a job with projcet credentials onto job.configuration', ( }); }); +test('mapEdge: map enabled state', (t) => { + let e; + + e = mapEdge({} as any); + t.deepEqual(e, { + disabled: true, + }); + + e = mapEdge({ + enabled: true, + } as any); + t.deepEqual(e, { + disabled: false, + }); + + e = mapEdge({ + enabled: false, + } as any); + t.deepEqual(e, { + disabled: true, + }); +}); + +test('mapEdge: map UUID', (t) => { + const e = mapEdge({ + id: 'abc', + } as any); + t.deepEqual(e, { + disabled: true, + openfn: { + uuid: 'abc', + }, + }); +}); + +test('mapEdge: map label', (t) => { + const e = mapEdge({ + condition_label: 'abc', + } as any); + t.deepEqual(e, { + disabled: true, + name: 'abc', + }); +}); + +test('mapEdge: map conditions', (t) => { + let e; + + // basically any condition type should just map + e = mapEdge({ + condition_type: 'always', + } as any); + t.deepEqual(e, { + disabled: true, + condition: 'always', + }); + + e = mapEdge({ + condition_type: 'on_job_success', + } as any); + t.deepEqual(e, { + disabled: true, + condition: 'on_job_success', + }); + + e = mapEdge({ + condition_type: 'jam', + } as any); + t.deepEqual(e, { + disabled: true, + condition: 'jam', + }); + + // But js expression should override + e = mapEdge({ + condition_type: 'js_expression', + condition_expression: 'abc', + } as any); + t.deepEqual(e, { + disabled: true, + condition: 'abc', + }); +}); + +// TODO the workflow yaml is not a project yaml +// so this test doesn't work +// I'll need to pull the project yaml, with uuids, to get this to work +test.skip('mapWorkflow: map edge conditions', (t) => { + // TODO for yaml like this: + const yaml = ` +workflows: + - name: Edge Conditions + jobs: + - Transform-data: + name: Transform data + adaptor: "@openfn/language-common@latest" + body: assert($.ok) + - sucess: + name: sucess + adaptor: "@openfn/language-common@latest" + body: log('All ok!') + - fail: + name: fail + adaptor: "@openfn/language-common@latest" + body: log('everything is terrible') + - custom: + name: custom + adaptor: "@openfn/language-common@latest" + body: | + // Check out the Job Writing Guide for help getting started: + // https://docs.openfn.org/documentation/jobs/job-writing-guide + triggers: + - webhook: + type: webhook + enabled: true + edges: + - webhook->Transform-data: + condition_type: always + enabled: true + target_job: Transform-data + source_trigger: webhook + - Transform-data->sucess: + condition_type: on_job_success + enabled: true + target_job: sucess + source_job: Transform-data + - Transform-data->fail: + condition_type: on_job_failure + enabled: true + target_job: fail + source_job: Transform-data + - Transform-data->custom: + condition_type: js_expression + enabled: true + target_job: custom + source_job: Transform-data + condition_expression: state.ok == 22 + +`; + const project = fromAppState(yaml, meta, { + format: 'yaml', + }); + console.log(project.workflows[0].steps); + const { next } = project.workflows[0].steps[1]; + console.log({ next }); + // make sure that the condition_types get mapped to condition + // also make sure that custom conditions work (both ways) +}); + test('should create a Project from prov state yaml', (t) => { const yaml = `id: e16c5f09-f0cb-4ba7-a4c2-73fcb2f29d00 name: aaa diff --git a/packages/project/test/serialize/to-app-state.test.ts b/packages/project/test/serialize/to-app-state.test.ts index e0c3e6613..75a4d195a 100644 --- a/packages/project/test/serialize/to-app-state.test.ts +++ b/packages/project/test/serialize/to-app-state.test.ts @@ -277,61 +277,35 @@ c-p t.deepEqual(edges, [3, 6, 9]); }); -/** - * Stumbled on something difficult here - * - * lightning saves special edge conditions, like on_job_success - * We need to convert that two ways - * - * I think I need to make the RUNTIME support the special strings - * in order to make this sync properly. Else it's too hard. - * - * See https://github.com/OpenFn/kit/issues/1123 - */ -test.skip('should handle edge conditions', (t) => { +test('should handle edge conditions', (t) => { const wf = ` -a-(condition=true)-b -a-(condition=false)-c -a-(condition=false)-c -a--b +a-(condition=always)-b +a-(condition="on_job_success")-c +a-(condition="on_job_failure")-d +a-(condition=never)-e +a-(condition=x)-f `; - const project = generateProject('p'); - const data = { - id: 'my-project', - workflows: [ - { - id: 'wf', - steps: [ - { - id: 'trigger', - type: 'webhook', - next: { - step: {}, - }, - }, - { - id: 'a', - expression: '.', - }, - { - id: 'b', - expression: '.', - }, - { - id: 'c', - expression: '.', - }, - { - id: 'd', - expression: '.', - }, - ], - }, - ], - }; + const project = generateProject('p', [wf], { + uuidSeed: 1, // ensure predictable UUIDS + }); - const state = toAppState(project); - // TODO + const state = toAppState(project, { format: 'json' }); + const [a_b, a_c, a_d, a_e, a_f] = state.workflows[0].edges; + + t.is(a_b.condition_type, 'always'); + t.falsy(a_b.condition_expression); + + t.is(a_c.condition_type, 'on_job_success'); + t.falsy(a_c.condition_expression); + + t.is(a_d.condition_type, 'on_job_failure'); + t.falsy(a_d.condition_expression); + + t.is(a_e.condition_type, 'never'); + t.falsy(a_e.condition_expression); + + t.is(a_f.condition_type, 'js_expression'); + t.is(a_f.condition_expression, 'x'); }); test('should convert a project back to app state in json', (t) => { diff --git a/packages/runtime/CHANGELOG.md b/packages/runtime/CHANGELOG.md index f38f7500d..aaec325b2 100644 --- a/packages/runtime/CHANGELOG.md +++ b/packages/runtime/CHANGELOG.md @@ -1,5 +1,17 @@ # @openfn/runtime +## 1.8.0 + +### Minor Changes + +- f089f8d: Support special condition strings `never`, `always`, `on_job_success` and `on_job_fail`. + + These used to be mapped from Lightning workflows by the Worker, but by supporting them in the runtime directly we get much better compatibility across platforms + +- 064933d: Measure the size of state objects at the end of each step, and throw if they exceed a limit + + In the Worker, this limit is set to 25% of the available runtime memory. + ## 1.7.7 ### Patch Changes diff --git a/packages/runtime/package.json b/packages/runtime/package.json index 3bcad74bc..9e6e62aa9 100644 --- a/packages/runtime/package.json +++ b/packages/runtime/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/runtime", - "version": "1.7.7", + "version": "1.8.0", "description": "Job processing runtime.", "type": "module", "exports": { @@ -48,6 +48,7 @@ "dependencies": { "@openfn/logger": "workspace:*", "fast-safe-stringify": "^2.1.1", + "json-stream-stringify": "^3.1.6", "semver": "^7.7.2", "source-map": "^0.7.6" } diff --git a/packages/runtime/src/errors.ts b/packages/runtime/src/errors.ts index c31ec5a57..2279df992 100644 --- a/packages/runtime/src/errors.ts +++ b/packages/runtime/src/errors.ts @@ -91,6 +91,15 @@ export const extractStackTrace = (e: Error) => { } }; +export class StateTooLargeError extends Error { + name = 'StateTooLargeError'; + severity = 'kill'; + constructor(limit_mb: number) { + super(); + this.message = `State exceeds the limit of ${limit_mb}mb`; + } +} + // Abstract error supertype export class RTError extends Error { source = 'runtime'; diff --git a/packages/runtime/src/execute/compile-plan.ts b/packages/runtime/src/execute/compile-plan.ts index c78cc34c3..e6f8048b0 100644 --- a/packages/runtime/src/execute/compile-plan.ts +++ b/packages/runtime/src/execute/compile-plan.ts @@ -9,6 +9,17 @@ import { conditionContext, Context } from './context'; import { ExecutionPlan, Job, StepEdge, Workflow } from '@openfn/lexicon'; import { getNameAndVersion } from '../modules/repo'; +// map special condition strings to JS expressions +// The special strings are generated by lightning, and are useful convenience for local dev +export const conditions: Record = { + on_job_success: 'Boolean(!state?.errors?.[upstreamStepId] ?? true)', + on_job_failure: 'Boolean(state?.errors && state.errors[upstreamStepId])', + always: 'true', +}; +// create a couple of aliases for future reference +conditions.on_upstream_success = conditions.on_job_success; +conditions.on_upstream_fail = conditions.on_job_failure; + const compileEdges = ( from: string, edges: string | Record, @@ -19,6 +30,9 @@ const compileEdges = ( } const errs = []; + const mapCondition = (condition: string) => + conditions[condition] ?? condition; + const result = {} as Record; for (const edgeId in edges) { try { @@ -26,13 +40,21 @@ const compileEdges = ( if (typeof edge === 'boolean') { result[edgeId] = edge; } else if (typeof edge === 'string') { - result[edgeId] = { condition: compileFunction(edge, context) }; + result[edgeId] = { + condition: compileFunction(mapCondition(edge), context, [ + 'upstreamStepId', + ]), + }; } else { const newEdge = { ...edge, }; if (typeof edge.condition === 'string') { - (newEdge as any).condition = compileFunction(edge.condition, context); + (newEdge as any).condition = compileFunction( + mapCondition(edge.condition), + context, + ['upstreamStepId'] + ); } result[edgeId] = newEdge as CompiledEdge; } diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index 1e72168ea..d701ab337 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -20,6 +20,7 @@ import { import { isNullState } from '../util/null-state'; import sourcemapErrors from '../util/sourcemap-errors'; import createProfiler from '../util/profile-memory'; +import ensureStateSize from '../util/ensure-state-size'; const loadCredentials = async ( job: Job, @@ -56,7 +57,7 @@ const calculateNext = (job: CompiledStep, result: any, logger: Logger) => { } if (typeof edge.condition === 'function') { try { - if (!edge.condition(result)) { + if (!edge.condition(result, job.id)) { logger.debug( `Edge condition returned false; ${nextJobId} will NOT be executed` ); @@ -79,13 +80,21 @@ const calculateNext = (job: CompiledStep, result: any, logger: Logger) => { // TODO this is suboptimal and may be slow on large objects // (especially as the result get stringified again downstream) -const prepareFinalState = ( +const prepareFinalState = async ( state: any, logger: Logger, - statePropsToRemove?: string[] + statePropsToRemove?: string[], + stateLimit_mb?: number ) => { if (isNullState(state)) return undefined; if (state) { + try { + await ensureStateSize(state, stateLimit_mb); + } catch (e) { + logger.error('Critical error processing state:'); + throw e; + } + if (!statePropsToRemove) { // As a strict default, remove the configuration key // tbh this should happen higher up in the stack but it causes havoc in unit testing @@ -190,7 +199,12 @@ const executeStep = async ( const duration = logger.timer(timerId); logger.error(`${jobName} aborted with error (${duration})`); - state = prepareFinalState(state, logger, ctx.opts.statePropsToRemove); + state = await prepareFinalState( + state, + logger, + ctx.opts.statePropsToRemove, + ctx.opts.stateLimitMb + ); // Whatever the final state was, save that as the initial state to the next thing result = state; @@ -219,7 +233,12 @@ const executeStep = async ( if (!didError) { const humanDuration = logger.timer(timerId); logger.success(`${jobName} completed in ${humanDuration}`); - result = prepareFinalState(result, logger, ctx.opts.statePropsToRemove); + result = await prepareFinalState( + result, + logger, + ctx.opts.statePropsToRemove, + ctx.opts.stateLimitMb + ); // Take a memory snapshot // IMPORTANT: this runs _after_ the state object has been serialized diff --git a/packages/runtime/src/modules/compile-function.ts b/packages/runtime/src/modules/compile-function.ts index 6a7fa5892..8266a11c0 100644 --- a/packages/runtime/src/modules/compile-function.ts +++ b/packages/runtime/src/modules/compile-function.ts @@ -1,7 +1,7 @@ import vm, { Context } from './experimental-vm'; -export default (expression: string, context: Context) => { - return vm.compileFunction(`return ${expression}`, ['state'], { +export default (expression: string, context: Context, args: string[] = []) => { + return vm.compileFunction(`return ${expression}`, ['state'].concat(args), { parsingContext: context, }); }; diff --git a/packages/runtime/src/runtime.ts b/packages/runtime/src/runtime.ts index 11b60f6c0..ba47d0327 100644 --- a/packages/runtime/src/runtime.ts +++ b/packages/runtime/src/runtime.ts @@ -38,6 +38,8 @@ export type Options = { /** Optional name for the expression (if passed a string) */ defaultStepId?: string; + + stateLimitMb?: number; }; type RawOptions = Omit & { diff --git a/packages/runtime/src/util/clone.ts b/packages/runtime/src/util/clone.ts index 0920f7106..e1f3f9add 100644 --- a/packages/runtime/src/util/clone.ts +++ b/packages/runtime/src/util/clone.ts @@ -1,6 +1,4 @@ import type { State } from '@openfn/lexicon'; import stringify from 'fast-safe-stringify'; -// TODO I'm in the market for the best solution here - immer? deep-clone? -// What should we do if functions are in the state? export default (state: State) => JSON.parse(stringify(state)); diff --git a/packages/runtime/src/util/ensure-state-size.ts b/packages/runtime/src/util/ensure-state-size.ts new file mode 100644 index 000000000..f884d1d44 --- /dev/null +++ b/packages/runtime/src/util/ensure-state-size.ts @@ -0,0 +1,34 @@ +import { JsonStreamStringify } from 'json-stream-stringify'; +import { StateTooLargeError } from '../errors'; + +const replacer = (_key: string, value: any) => { + // Ignore non serializable keys + if ( + value === undefined || + typeof value === 'function' || + value?.constructor?.name === 'Promise' + ) { + return undefined; + } + + return value; +}; + +// throws if state exceeds a particular size limit +export default async (value: any, limit_mb: number = 500) => { + if (value && !isNaN(limit_mb) && limit_mb > 0) { + const limitBytes = limit_mb * 1024 * 1024; + let size_bytes = 0; + const stream = new JsonStreamStringify(value, replacer, 0, true); + for await (const chunk of stream) { + // Each chunk is a string token from the JSON output + size_bytes += Buffer.byteLength(chunk, 'utf8'); + + if (size_bytes > limitBytes) { + stream.destroy(); + throw new StateTooLargeError(limit_mb); + } + } + stream.destroy(); + } +}; diff --git a/packages/runtime/test/execute/compile-plan.test.ts b/packages/runtime/test/execute/compile-plan.test.ts index 091e99bb3..b1ed9efa0 100644 --- a/packages/runtime/test/execute/compile-plan.test.ts +++ b/packages/runtime/test/execute/compile-plan.test.ts @@ -359,6 +359,82 @@ test('should compile a falsy edge', (t) => { t.false(result); }); +test('should compile an edge with special string: on_job_success (and return true)', (t) => { + const plan = planWithEdge({ condition: 'on_job_success' }); + + const { workflow } = compilePlan(plan); + + const state = { + errors: {}, // no errors! + }; + + // @ts-ignore + const result = workflow.steps.a.next!.b.condition(state, 'x'); + t.true(result); +}); + +test('should compile an edge with special string: on_job_success (and return false)', (t) => { + const plan = planWithEdge({ condition: 'on_job_success' }); + + const { workflow } = compilePlan(plan); + + const state = { + errors: { + x: {}, // upstream job failed + }, + }; + + // @ts-ignore + const result = workflow.steps.a.next!.b.condition(state, 'x'); + t.false(result); +}); + +test('should compile an edge with special string: on_job_failure (and return true)', (t) => { + const plan = planWithEdge({ condition: 'on_job_failure' }); + + const { workflow } = compilePlan(plan); + + const state = { + errors: { + x: {}, + }, + }; + + // @ts-ignore + const result = workflow.steps.a.next!.b.condition(state, 'x'); + t.true(result); +}); + +test('should compile an edge with special string: on_job_failure (and return false)', (t) => { + const plan = planWithEdge({ condition: 'on_job_failure' }); + + const { workflow } = compilePlan(plan); + + const state = { + errors: {}, + }; + + // @ts-ignore + const result = workflow.steps.a.next!.b.condition(state, 'x'); + t.false(result); +}); + +test('should compile an edge with special string: always', (t) => { + const plan = planWithEdge({ condition: 'always' }); + + const { workflow } = compilePlan(plan); + + const state = { + errors: { + x: {}, + }, + }; + + // @ts-ignore + const result = workflow.steps.a.next!.b.condition(state, 'x'); + t.true(result); +}); + test('should compile an edge with arithmetic', (t) => { const plan = planWithEdge({ condition: '1 + 1' }); diff --git a/packages/runtime/test/runtime.test.ts b/packages/runtime/test/runtime.test.ts index 209fdf865..5af219bf4 100644 --- a/packages/runtime/test/runtime.test.ts +++ b/packages/runtime/test/runtime.test.ts @@ -852,6 +852,66 @@ test('run from an adaptor with error', async (t) => { t.truthy(result.errors['job-1']); }); +test('run a workflow with special on_job_success condition', async (t) => { + const plan: ExecutionPlan = { + workflow: { + steps: [ + { + expression: + 'export default [(s) =>{ if (s.err) throw new Error("e"); return s}]', + next: { + b: { + condition: 'on_job_success', + }, + }, + }, + { + id: 'b', + expression: 'export default [(s) => { s.data.b = true; return s}]', + }, + ], + }, + }; + + // The first should execute step b + const result1: any = await run(plan, {}); + t.true(result1.data.b); + + // The second should NOT execute b + const result2: any = await run(plan, { err: true }); + t.falsy(result2.data.b); +}); + +test('run a workflow with special on_job_failure condition', async (t) => { + const plan: ExecutionPlan = { + workflow: { + steps: [ + { + expression: + 'export default [(s) =>{ if (s.err) throw new Error("e"); return s}]', + next: { + b: { + condition: 'on_job_failure', + }, + }, + }, + { + id: 'b', + expression: 'export default [(s) => { s.data.b = true; return s}]', + }, + ], + }, + }; + + // The first should NOT execute step b + const result1: any = await run(plan, {}); + t.falsy(result1.data.b); + + // The second should execute b + const result2: any = await run(plan, { err: true }); + t.true(result2.data.b); +}); + test('accept a whitelist as a regex', async (t) => { const expression = ` import { call } from 'blah'; @@ -899,3 +959,40 @@ test('accept a whitelist as a string', async (t) => { t.is(error.message, 'module blacklisted: blah'); } }); + +test('do not enforce state size limit if state is small enough', async (t) => { + const plan: ExecutionPlan = { + workflow: { + steps: [ + { + expression: + 'export default [(s) => { s.data.large = new Array(1024).fill("z").join(""); return s; }]', + }, + ], + }, + }; + + await run(plan, {}, {}); + t.pass('did not fail'); +}); + +test('enforce state size limit from runtime option', async (t) => { + const plan: ExecutionPlan = { + workflow: { + steps: [ + { + expression: + 'export default [(s) => { s.data.large = new Array(1024).fill("z").join(""); return s; }]', + }, + ], + }, + }; + + try { + await run(plan, {}, { stateLimitMb: 1 / 1024 }); + t.fail('Should have thrown StateTooLargeError'); + } catch (error: any) { + t.is(error.name, 'StateTooLargeError'); + t.regex(error.message, /State exceeds the limit/); + } +}); diff --git a/packages/runtime/test/util/ensure-state-size.test.ts b/packages/runtime/test/util/ensure-state-size.test.ts new file mode 100644 index 000000000..dfb5616e3 --- /dev/null +++ b/packages/runtime/test/util/ensure-state-size.test.ts @@ -0,0 +1,108 @@ +import test from 'ava'; +import ensureStateSize from '../../src/util/ensure-state-size'; +import { StateTooLargeError } from '../../src/errors'; + +test('do not throw for limit 0kb', async (t) => { + const state = { data: new Array(1024).fill('z').join('') }; + await t.notThrowsAsync(() => ensureStateSize(state, 0)); +}); + +test('throw for limit 1kb, payload 1kb', async (t) => { + const state = { data: new Array(1024).fill('z').join('') }; + await t.throwsAsync(() => ensureStateSize(state, 1 / 1024), { + instanceOf: StateTooLargeError, + message: 'State exceeds the limit of 0.0009765625mb', + }); +}); + +test('ok for limit 2kb, payload 1kb', async (t) => { + const state = { data: new Array(1024).fill('z').join('') }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('allow circular references', async (t) => { + const state: any = { data: 'test' }; + state.self = state; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('allow circular references 2', async (t) => { + const arr = [{ id: 1 }, { id: 2 }, { id: 3 }]; + const state: any = { data: arr, arr }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle promise in state', async (t) => { + const state = { + data: 'test', + promise: new Promise((r) => r), + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle function in state', async (t) => { + const state = { + data: 'test', + fn: () => 'hello', + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle undefined in state', async (t) => { + const state = { + data: 'test', + undef: undefined, + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle symbol in state', async (t) => { + const state = { + data: 'test', + sym: Symbol('test'), + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle BigInt in state', async (t) => { + const state = { + data: 'test', + big: BigInt(123456789), + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle Date in state', async (t) => { + const state = { + data: 'test', + date: new Date('2024-01-01'), + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle RegExp in state', async (t) => { + const state = { + data: 'test', + regex: /test/gi, + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle Map in state', async (t) => { + const map = new Map(); + map.set('key', 'value'); + const state = { + data: 'test', + map: map, + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); + +test('handle Set in state', async (t) => { + const set = new Set([1, 2, 3]); + const state = { + data: 'test', + set: set, + }; + await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); +}); diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 11bb3186d..9deb376e3 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,21 @@ # ws-worker +## 1.21.0 + +### Minor Changes + +- 064933d: Measure the size of state objects at the end of each step, and throw if they exceed a limit + + In the Worker, this limit is set to 25% of the available runtime memory. + +### Patch Changes + +- f089f8d: Remove special edge condition mapping (this is now handled by the runtime) +- Updated dependencies [f089f8d] +- Updated dependencies [064933d] + - @openfn/runtime@1.8.0 + - @openfn/engine-multi@1.10.0 + ## 1.20.2 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 2b71b2908..149ab78b0 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.20.2", + "version": "1.21.0", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/packages/ws-worker/src/mock/runtime-engine.ts b/packages/ws-worker/src/mock/runtime-engine.ts index 282168b7e..2d76b0b74 100644 --- a/packages/ws-worker/src/mock/runtime-engine.ts +++ b/packages/ws-worker/src/mock/runtime-engine.ts @@ -74,7 +74,7 @@ async function createMock() { for (const step of steps) { const job = step as Job; if (typeof job.configuration === 'string') { - // Call the crendtial callback, but don't do anything with it + // Call the credential callback, but don't do anything with it job.configuration = await options.resolvers?.credential?.( job.configuration ); diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index 6e8cd7c39..88c09bdb4 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -312,6 +312,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { if (!('logPayloadLimitMb' in options)) { options.logPayloadLimitMb = app.options.logPayloadLimitMb; } + options.timeoutRetryCount = app.options.timeoutRetryCount; options.timeoutRetryDelay = app.options.timeoutRetryDelayMs ?? app.options.socketTimeoutSeconds; diff --git a/packages/ws-worker/src/util/convert-lightning-plan.ts b/packages/ws-worker/src/util/convert-lightning-plan.ts index e284c6cad..7723e2a0b 100644 --- a/packages/ws-worker/src/util/convert-lightning-plan.ts +++ b/packages/ws-worker/src/util/convert-lightning-plan.ts @@ -16,24 +16,6 @@ import { LightningPlan, LightningEdge } from '@openfn/lexicon/lightning'; import { ExecuteOptions } from '@openfn/engine-multi'; import { getNameAndVersion } from '@openfn/runtime'; -export const conditions: Record string | null> = - { - on_job_success: (upstreamId: string) => - `Boolean(!state?.errors?.["${upstreamId}"] ?? true)`, - on_job_failure: (upstreamId: string) => - `Boolean(state?.errors && state.errors["${upstreamId}"])`, - always: (_upstreamId: string) => null, - }; - -const mapEdgeCondition = (edge: LightningEdge) => { - const { condition } = edge; - if (condition && condition in conditions) { - const upstream = (edge.source_job_id || edge.source_trigger_id) as string; - return conditions[condition](upstream); - } - return condition; -}; - const mapTriggerEdgeCondition = (edge: LightningEdge) => { const { condition } = edge; // This handles cron triggers with undefined conditions and the 'always' string. @@ -205,9 +187,8 @@ export default ( .reduce((obj, edge) => { const newEdge: StepEdge = {}; - const condition = mapEdgeCondition(edge); - if (condition) { - newEdge.condition = condition; + if (edge.condition) { + newEdge.condition = edge.condition; } if (edge.enabled === false) { newEdge.disabled = true; diff --git a/packages/ws-worker/test/lightning.test.ts b/packages/ws-worker/test/lightning.test.ts index 3f2deeb4a..a16f2b5fa 100644 --- a/packages/ws-worker/test/lightning.test.ts +++ b/packages/ws-worker/test/lightning.test.ts @@ -181,14 +181,14 @@ test.serial(`should reset backoff after claim`, (t) => { }); lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => { - // set this articially high - if there are no more claims, the test will fail + // set this artificially high - if there are no more claims, the test will fail lastClaimDiff = 10000; // When the run is finished, the claims should resume // but with a smaller backoff setTimeout(() => { t.log('Backoff after run:', lastClaimDiff); - t.true(lastClaimDiff < 5); + t.true(lastClaimDiff < 10); done(); }, 10); }); diff --git a/packages/ws-worker/test/mock/runtime-engine.test.ts b/packages/ws-worker/test/mock/runtime-engine.test.ts index 28842769e..a876baf07 100644 --- a/packages/ws-worker/test/mock/runtime-engine.test.ts +++ b/packages/ws-worker/test/mock/runtime-engine.test.ts @@ -1,15 +1,16 @@ import test from 'ava'; import type { ExecutionPlan } from '@openfn/lexicon'; +import create from '../../src/mock/runtime-engine'; +import { waitForEvent, clone, createPlan } from '../util'; + import type { JobCompletePayload, JobStartPayload, WorkflowCompletePayload, WorkflowStartPayload, } from '@openfn/engine-multi'; -import create from '../../src/mock/runtime-engine'; -import { waitForEvent, clone, createPlan } from '../util'; -import { WorkflowErrorPayload } from '@openfn/engine-multi'; +import type { WorkflowErrorPayload } from '@openfn/engine-multi'; const sampleWorkflow = { id: 'w1', @@ -117,21 +118,30 @@ test.serial('wait function', async (t) => { test.serial( 'resolve credential before job-start if credential is a string', async (t) => { - const wf = clone(sampleWorkflow); - wf.id = t.title; - wf.workflow.steps[0].configuration = 'x'; - - let didCallCredentials; - const credential = async () => { - didCallCredentials = true; - return {}; - }; - - // @ts-ignore - engine.execute(wf, {}, { resolvers: { credential } }); - - await waitForEvent(engine, 'job-start'); - t.true(didCallCredentials); + t.plan(1); + return new Promise((resolve) => { + const wf = clone(sampleWorkflow); + wf.id = t.title; + wf.workflow.steps[0].configuration = 'x'; + + let didCallCredentials = false; + const credential = async () => { + didCallCredentials = true; + return {}; + }; + + engine.listen(wf.id, { + 'job-start': () => { + t.true(didCallCredentials); + }, + 'workflow-complete': () => { + resolve(); + }, + }); + + // @ts-ignore + engine.execute(wf, {}, { resolvers: { credential } }); + }); } ); @@ -145,7 +155,7 @@ test.serial('listen to events', async (t) => { }; const wf = createPlan({ - id: 'j1', + id: t.title, adaptors: ['common@1.0.0'], expression: 'export default [() => { console.log("x"); }]', }); diff --git a/packages/ws-worker/test/reasons.test.ts b/packages/ws-worker/test/reasons.test.ts index d44430d28..0e1bc797d 100644 --- a/packages/ws-worker/test/reasons.test.ts +++ b/packages/ws-worker/test/reasons.test.ts @@ -279,6 +279,26 @@ test('kill: timeout', async (t) => { t.is(reason.error_message, 'Workflow failed to return within 100ms'); }); +test('exception: state too large', async (t) => { + const plan = createPlan({ + id: 'x', + expression: `export default [(s) => { + s.data = new Array(1024 * 1024 * 10).fill("x").join(""); + return s; + }]`, + }); + + const options = { + stateLimitMb: 0.1, + }; + + const { reason } = await execute(plan, {}, options); + t.is(reason.reason, 'kill'); + t.is(reason.error_type, 'StateTooLargeError'); + t.truthy(reason.error_message); + t.regex(reason.error_message!, /State exceeds the limit/); +}); + test.todo('crash: workflow validation error'); test.todo('fail: adaptor error'); test.todo('crash: import error'); diff --git a/packages/ws-worker/test/util/convert-lightning-plan.test.ts b/packages/ws-worker/test/util/convert-lightning-plan.test.ts index dfe0b1dcc..7cf1e3132 100644 --- a/packages/ws-worker/test/util/convert-lightning-plan.test.ts +++ b/packages/ws-worker/test/util/convert-lightning-plan.test.ts @@ -4,8 +4,8 @@ import type { LightningJob, LightningTrigger, } from '@openfn/lexicon/lightning'; -import convertPlan, { conditions } from '../../src/util/convert-lightning-plan'; -import { ConditionalStepEdge, Job } from '@openfn/lexicon'; +import convertPlan from '../../src/util/convert-lightning-plan'; +import { Job } from '@openfn/lexicon'; // Creates a lightning node (job or trigger) const createNode = (props = {}) => @@ -41,11 +41,6 @@ const createJob = (props = {}) => ({ ...props, }); -const testEdgeCondition = (expr: string, state: any) => { - const fn = new Function('state', 'return ' + expr); - return fn(state); -}; - test('convert a single job', (t) => { const run: Partial = { id: 'w', @@ -452,175 +447,6 @@ test('convert two linked jobs with a disabled edge', (t) => { }); }); -test('on_job_success condition: return true if no errors', (t) => { - const condition = conditions.on_job_success('a'); - - const state = {}; - const result = testEdgeCondition(condition!, state); - - t.is(result, true); -}); - -// You can argue this both ways, but a job which returned no state is technically not in error -// Mostly I dont want it to blow up -test('on_job_success condition: return true if state is undefined', (t) => { - const condition = conditions.on_job_success('a'); - - const state = undefined; - const result = testEdgeCondition(condition!, state); - - t.is(result, true); -}); - -test('on_job_success condition: return true if unconnected upstream errors', (t) => { - const condition = conditions.on_job_success('a'); - - const state = { - errors: { - c: { - // some error that occured upstream - }, - }, - }; - const result = testEdgeCondition(condition!, state); - - t.is(result, true); -}); - -test('on_job_success condition: return false if the upstream job errored', (t) => { - const condition = conditions.on_job_success('a'); - - const state = { - errors: { - a: { - // some error that occured upstream - }, - }, - }; - const result = testEdgeCondition(condition!, state); - - t.is(result, false); -}); - -test('on_job_failure condition: return true if error immediately upstream', (t) => { - const condition = conditions.on_job_failure('a'); - - const state = { - errors: { - a: { - // some error that occured upstream - }, - }, - }; - const result = testEdgeCondition(condition!, state); - - t.is(result, true); -}); - -test('on_job_failure condition: return false if unrelated error upstream', (t) => { - const condition = conditions.on_job_failure('a'); - - const state = { - errors: { - b: { - // some error that occured upstream - }, - }, - }; - const result = testEdgeCondition(condition!, state); - - t.is(result, false); -}); - -test('on_job_failure condition: return false if no errors', (t) => { - const condition = conditions.on_job_failure('a'); - - const state = {}; - const result = testEdgeCondition(condition!, state); - - t.is(result, false); -}); - -test('on_job_failure condition: return false if state is undefined', (t) => { - const condition = conditions.on_job_failure('a'); - - const state = undefined; - const result = testEdgeCondition(condition!, state); - - t.is(result, false); -}); - -test('convert edge condition on_job_success', (t) => { - const run: Partial = { - id: 'w', - jobs: [createNode({ id: 'a' }), createNode({ id: 'b' })], - triggers: [], - edges: [createEdge('a', 'b', { condition: 'on_job_success' })], - }; - const { plan } = convertPlan(run as LightningPlan); - - const [job] = plan.workflow.steps as Job[]; - const edge = job.next as Record; - - t.truthy(edge.b); - t.is(edge.b.condition!, conditions.on_job_success('a')!); - t.true(testEdgeCondition(edge.b.condition!, {})); -}); - -test('convert edge condition on_job_failure', (t) => { - const run: Partial = { - id: 'w', - jobs: [createNode({ id: 'a' }), createNode({ id: 'b' })], - triggers: [], - edges: [createEdge('a', 'b', { condition: 'on_job_failure' })], - }; - const { plan } = convertPlan(run as LightningPlan); - - const [job] = plan.workflow.steps as Job[]; - const edge = job.next as Record; - - t.truthy(edge.b); - t.is(edge.b.condition!, conditions.on_job_failure('a')!); - // Check that this is valid js - t.true( - testEdgeCondition(edge.b.condition!, { - errors: { a: {} }, - }) - ); -}); - -test('convert edge condition on_job_success with a funky id', (t) => { - const id_a = 'a-b-c@ # {} !£'; - const run: Partial = { - id: 'w', - jobs: [createNode({ id: id_a }), createNode({ id: 'b' })], - triggers: [], - edges: [createEdge(id_a, 'b', { condition: 'on_job_success' })], - }; - const { plan } = convertPlan(run as LightningPlan); - const [job] = plan.workflow.steps as Job[]; - const edge = job.next as Record; - - t.truthy(edge.b); - t.is(edge.b.condition!, conditions.on_job_success(id_a)!); - // Check that this is valid js - t.true(testEdgeCondition(edge.b.condition!, {})); -}); - -test('convert edge condition always', (t) => { - const run: Partial = { - id: 'w', - jobs: [createNode({ id: 'a' }), createNode({ id: 'b' })], - triggers: [], - edges: [createEdge('a', 'b', { condition: 'always' })], - }; - const { plan } = convertPlan(run as LightningPlan); - - const [job] = plan.workflow.steps as Job[]; - const edge = job.next as Record; - t.false(edge.b.hasOwnProperty('condition')); -}); - test('append the collections adaptor to jobs that use it', (t) => { const run: Partial = { id: 'w', diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 34a8e6953..7066d0636 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -695,6 +695,9 @@ importers: fast-safe-stringify: specifier: ^2.1.1 version: 2.1.1 + json-stream-stringify: + specifier: ^3.1.6 + version: 3.1.6 semver: specifier: ^7.7.2 version: 7.7.2