From 4adb886f61aca99a1afae03c030c49d9331c97c9 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Mon, 5 Jan 2026 16:30:08 +0000 Subject: [PATCH 01/11] runtime: map special condition strings --- packages/runtime/src/execute/compile-plan.ts | 26 ++++++- packages/runtime/src/execute/step.ts | 2 +- .../runtime/src/modules/compile-function.ts | 4 +- .../runtime/test/execute/compile-plan.test.ts | 76 +++++++++++++++++++ 4 files changed, 103 insertions(+), 5 deletions(-) diff --git a/packages/runtime/src/execute/compile-plan.ts b/packages/runtime/src/execute/compile-plan.ts index c78cc34c3..e6f8048b0 100644 --- a/packages/runtime/src/execute/compile-plan.ts +++ b/packages/runtime/src/execute/compile-plan.ts @@ -9,6 +9,17 @@ import { conditionContext, Context } from './context'; import { ExecutionPlan, Job, StepEdge, Workflow } from '@openfn/lexicon'; import { getNameAndVersion } from '../modules/repo'; +// map special condition strings to JS expressions +// The special strings are generated by lightning, and are useful convenience for local dev +export const conditions: Record = { + on_job_success: 'Boolean(!state?.errors?.[upstreamStepId] ?? true)', + on_job_failure: 'Boolean(state?.errors && state.errors[upstreamStepId])', + always: 'true', +}; +// create a couple of aliases for future reference +conditions.on_upstream_success = conditions.on_job_success; +conditions.on_upstream_fail = conditions.on_job_failure; + const compileEdges = ( from: string, edges: string | Record, @@ -19,6 +30,9 @@ const compileEdges = ( } const errs = []; + const mapCondition = (condition: string) => + conditions[condition] ?? condition; + const result = {} as Record; for (const edgeId in edges) { try { @@ -26,13 +40,21 @@ const compileEdges = ( if (typeof edge === 'boolean') { result[edgeId] = edge; } else if (typeof edge === 'string') { - result[edgeId] = { condition: compileFunction(edge, context) }; + result[edgeId] = { + condition: compileFunction(mapCondition(edge), context, [ + 'upstreamStepId', + ]), + }; } else { const newEdge = { ...edge, }; if (typeof edge.condition === 'string') { - (newEdge as any).condition = compileFunction(edge.condition, context); + (newEdge as any).condition = compileFunction( + mapCondition(edge.condition), + context, + ['upstreamStepId'] + ); } result[edgeId] = newEdge as CompiledEdge; } diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index 1e72168ea..0156f7399 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -56,7 +56,7 @@ const calculateNext = (job: CompiledStep, result: any, logger: Logger) => { } if (typeof edge.condition === 'function') { try { - if (!edge.condition(result)) { + if (!edge.condition(result, job.id)) { logger.debug( `Edge condition returned false; ${nextJobId} will NOT be executed` ); diff --git a/packages/runtime/src/modules/compile-function.ts b/packages/runtime/src/modules/compile-function.ts index 6a7fa5892..8266a11c0 100644 --- a/packages/runtime/src/modules/compile-function.ts +++ b/packages/runtime/src/modules/compile-function.ts @@ -1,7 +1,7 @@ import vm, { Context } from './experimental-vm'; -export default (expression: string, context: Context) => { - return vm.compileFunction(`return ${expression}`, ['state'], { +export default (expression: string, context: Context, args: string[] = []) => { + return vm.compileFunction(`return ${expression}`, ['state'].concat(args), { parsingContext: context, }); }; diff --git a/packages/runtime/test/execute/compile-plan.test.ts b/packages/runtime/test/execute/compile-plan.test.ts index 091e99bb3..b1ed9efa0 100644 --- a/packages/runtime/test/execute/compile-plan.test.ts +++ b/packages/runtime/test/execute/compile-plan.test.ts @@ -359,6 +359,82 @@ test('should compile a falsy edge', (t) => { t.false(result); }); +test('should compile an edge with special string: on_job_success (and return true)', (t) => { + const plan = planWithEdge({ condition: 'on_job_success' }); + + const { workflow } = compilePlan(plan); + + const state = { + errors: {}, // no errors! + }; + + // @ts-ignore + const result = workflow.steps.a.next!.b.condition(state, 'x'); + t.true(result); +}); + +test('should compile an edge with special string: on_job_success (and return false)', (t) => { + const plan = planWithEdge({ condition: 'on_job_success' }); + + const { workflow } = compilePlan(plan); + + const state = { + errors: { + x: {}, // upstream job failed + }, + }; + + // @ts-ignore + const result = workflow.steps.a.next!.b.condition(state, 'x'); + t.false(result); +}); + +test('should compile an edge with special string: on_job_failure (and return true)', (t) => { + const plan = planWithEdge({ condition: 'on_job_failure' }); + + const { workflow } = compilePlan(plan); + + const state = { + errors: { + x: {}, + }, + }; + + // @ts-ignore + const result = workflow.steps.a.next!.b.condition(state, 'x'); + t.true(result); +}); + +test('should compile an edge with special string: on_job_failure (and return false)', (t) => { + const plan = planWithEdge({ condition: 'on_job_failure' }); + + const { workflow } = compilePlan(plan); + + const state = { + errors: {}, + }; + + // @ts-ignore + const result = workflow.steps.a.next!.b.condition(state, 'x'); + t.false(result); +}); + +test('should compile an edge with special string: always', (t) => { + const plan = planWithEdge({ condition: 'always' }); + + const { workflow } = compilePlan(plan); + + const state = { + errors: { + x: {}, + }, + }; + + // @ts-ignore + const result = workflow.steps.a.next!.b.condition(state, 'x'); + t.true(result); +}); + test('should compile an edge with arithmetic', (t) => { const plan = planWithEdge({ condition: '1 + 1' }); From dad44b54dbd914cf6944085d3ffed34d24d14724 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Mon, 5 Jan 2026 16:35:30 +0000 Subject: [PATCH 02/11] pass upstream id to conditions --- packages/runtime/test/runtime.test.ts | 60 +++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/packages/runtime/test/runtime.test.ts b/packages/runtime/test/runtime.test.ts index 209fdf865..63cd502c4 100644 --- a/packages/runtime/test/runtime.test.ts +++ b/packages/runtime/test/runtime.test.ts @@ -852,6 +852,66 @@ test('run from an adaptor with error', async (t) => { t.truthy(result.errors['job-1']); }); +test('run a workflow with special on_job_success condition', async (t) => { + const plan: ExecutionPlan = { + workflow: { + steps: [ + { + expression: + 'export default [(s) =>{ if (s.err) throw new Error("e"); return s}]', + next: { + b: { + condition: 'on_job_success', + }, + }, + }, + { + id: 'b', + expression: 'export default [(s) => { s.data.b = true; return s}]', + }, + ], + }, + }; + + // The first should execute step b + const result1: any = await run(plan, {}); + t.true(result1.data.b); + + // The second should NOT execute b + const result2: any = await run(plan, { err: true }); + t.falsy(result2.data.b); +}); + +test('run a workflow with special on_job_failure condition', async (t) => { + const plan: ExecutionPlan = { + workflow: { + steps: [ + { + expression: + 'export default [(s) =>{ if (s.err) throw new Error("e"); return s}]', + next: { + b: { + condition: 'on_job_failure', + }, + }, + }, + { + id: 'b', + expression: 'export default [(s) => { s.data.b = true; return s}]', + }, + ], + }, + }; + + // The first should NOT execute step b + const result1: any = await run(plan, {}); + t.falsy(result1.data.b); + + // The second should execute b + const result2: any = await run(plan, { err: true }); + t.true(result2.data.b); +}); + test('accept a whitelist as a regex', async (t) => { const expression = ` import { call } from 'blah'; From a20442ca9d90dabfc48fa4a803d1ec14b1a48930 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Mon, 5 Jan 2026 17:27:29 +0000 Subject: [PATCH 03/11] project: support edge conditions in v1 state Still need to apply mappings in v2 --- .changeset/loud-numbers-sell.md | 5 + packages/project/src/parse/from-app-state.ts | 28 ++-- .../project/src/serialize/to-app-state.ts | 19 ++- .../project/test/parse/from-app-state.test.ts | 158 +++++++++++++++++- .../test/serialize/to-app-state.test.ts | 78 +++------ 5 files changed, 216 insertions(+), 72 deletions(-) create mode 100644 .changeset/loud-numbers-sell.md diff --git a/.changeset/loud-numbers-sell.md b/.changeset/loud-numbers-sell.md new file mode 100644 index 000000000..d43b9765b --- /dev/null +++ b/.changeset/loud-numbers-sell.md @@ -0,0 +1,5 @@ +--- +'@openfn/project': patch +--- + +Map edge conditions so that they are compatible with CLI diff --git a/packages/project/src/parse/from-app-state.ts b/packages/project/src/parse/from-app-state.ts index 24d246c93..5224d6fc3 100644 --- a/packages/project/src/parse/from-app-state.ts +++ b/packages/project/src/parse/from-app-state.ts @@ -62,22 +62,28 @@ export default ( return new Project(proj as l.Project, config); }; -const mapTriggerEdgeCondition = (edge: Provisioner.Edge) => { +// TODO maybe this is a util and moved out of this file +export const mapEdge = (edge: Provisioner.Edge) => { const e: any = { disabled: !edge.enabled, }; - if (edge.condition_type === 'always') { - e.condition = true; - } else if (edge.condition_type === 'never') { - e.condition = false; - } else { + + if (edge.condition_type === 'js_expression') { e.condition = edge.condition_expression; + } else if (edge.condition_type) { + e.condition = edge.condition_type; + } + + if (edge.condition_label) { + e.name = edge.condition_label; } // Do this last so that it serializes last - e.openfn = { - uuid: edge.id, - }; + if (edge.id) { + e.openfn = { + uuid: edge.id, + }; + } return e; }; @@ -114,7 +120,7 @@ export const mapWorkflow = (workflow: Provisioner.Workflow) => { throw new Error(`Failed to find ${edge.target_job_id}`); } // we use the name, not the id, to reference - obj[slugify(target.name)] = mapTriggerEdgeCondition(edge); + obj[slugify(target.name)] = mapEdge(edge); return obj; }, {}), } as l.Trigger); @@ -148,7 +154,7 @@ export const mapWorkflow = (workflow: Provisioner.Workflow) => { s.next = outboundEdges.reduce((next, edge) => { const target = jobs.find((j) => j.id === edge.target_job_id); // @ts-ignore - next[slugify(target.name)] = mapTriggerEdgeCondition(edge); + next[slugify(target.name)] = mapEdge(edge); return next; }, {}); } diff --git a/packages/project/src/serialize/to-app-state.ts b/packages/project/src/serialize/to-app-state.ts index 6886ff424..a08786466 100644 --- a/packages/project/src/serialize/to-app-state.ts +++ b/packages/project/src/serialize/to-app-state.ts @@ -141,12 +141,19 @@ const mapWorkflow = (workflow: Workflow) => { e.source_job_id = node.id; } - if (rules.condition === true) { - e.condition_type = 'always'; - } else if (rules.condition === false) { - e.condition_type = 'never'; - } else if (typeof rules.condition === 'string') { - // TODO conditional + if (rules.condition) { + if (typeof rules.condition === 'boolean') { + e.condition_type = rules.condition ? 'always' : 'never'; + } else if ( + rules.condition.match( + /^(always|never|on_job_success|on_job_failure)$/ + ) + ) { + e.condition_type = rules.condition; + } else { + e.condition_type = 'js_expression'; + e.condition_expression = rules.condition; + } } wfState.edges.push(e); }); diff --git a/packages/project/test/parse/from-app-state.test.ts b/packages/project/test/parse/from-app-state.test.ts index fe696dec8..6f509bc04 100644 --- a/packages/project/test/parse/from-app-state.test.ts +++ b/packages/project/test/parse/from-app-state.test.ts @@ -1,5 +1,8 @@ import test from 'ava'; -import fromAppState, { mapWorkflow } from '../../src/parse/from-app-state'; +import fromAppState, { + mapEdge, + mapWorkflow, +} from '../../src/parse/from-app-state'; import { clone, cloneDeep } from 'lodash-es'; import state, { withCreds } from '../fixtures/sample-v1-project'; @@ -90,7 +93,7 @@ test('should create a Project from prov state with a workflow', (t) => { openfn: { enabled: true, uuid: '4a06289c-15aa-4662-8dc6-f0aaacd8a058' }, next: { 'transform-data': { - condition: true, + condition: 'always', disabled: false, openfn: { uuid: 'a9a3adef-b394-4405-814d-3ac4323f4b4b', @@ -130,7 +133,7 @@ test('mapWorkflow: map a simple trigger', (t) => { type: 'webhook', next: { 'transform-data': { - condition: true, + condition: 'always', disabled: false, openfn: { uuid: 'a9a3adef-b394-4405-814d-3ac4323f4b4b', @@ -219,6 +222,155 @@ test('mapWorkflow: map a job with projcet credentials onto job.configuration', ( }); }); +test('mapEdge: map enabled state', (t) => { + let e; + + e = mapEdge({} as any); + t.deepEqual(e, { + disabled: true, + }); + + e = mapEdge({ + enabled: true, + } as any); + t.deepEqual(e, { + disabled: false, + }); + + e = mapEdge({ + enabled: false, + } as any); + t.deepEqual(e, { + disabled: true, + }); +}); + +test('mapEdge: map UUID', (t) => { + const e = mapEdge({ + id: 'abc', + } as any); + t.deepEqual(e, { + disabled: true, + openfn: { + uuid: 'abc', + }, + }); +}); + +test('mapEdge: map label', (t) => { + const e = mapEdge({ + condition_label: 'abc', + } as any); + t.deepEqual(e, { + disabled: true, + name: 'abc', + }); +}); + +test('mapEdge: map conditions', (t) => { + let e; + + // basically any condition type should just map + e = mapEdge({ + condition_type: 'always', + } as any); + t.deepEqual(e, { + disabled: true, + condition: 'always', + }); + + e = mapEdge({ + condition_type: 'on_job_success', + } as any); + t.deepEqual(e, { + disabled: true, + condition: 'on_job_success', + }); + + e = mapEdge({ + condition_type: 'jam', + } as any); + t.deepEqual(e, { + disabled: true, + condition: 'jam', + }); + + // But js expression should override + e = mapEdge({ + condition_type: 'js_expression', + condition_expression: 'abc', + } as any); + t.deepEqual(e, { + disabled: true, + condition: 'abc', + }); +}); + +// TODO the workflow yaml is not a project yaml +// so this test doesn't work +// I'll need to pull the project yaml, with uuids, to get this to work +test.skip('mapWorkflow: map edge conditions', (t) => { + // TODO for yaml like this: + const yaml = ` +workflows: + - name: Edge Conditions + jobs: + - Transform-data: + name: Transform data + adaptor: "@openfn/language-common@latest" + body: assert($.ok) + - sucess: + name: sucess + adaptor: "@openfn/language-common@latest" + body: log('All ok!') + - fail: + name: fail + adaptor: "@openfn/language-common@latest" + body: log('everything is terrible') + - custom: + name: custom + adaptor: "@openfn/language-common@latest" + body: | + // Check out the Job Writing Guide for help getting started: + // https://docs.openfn.org/documentation/jobs/job-writing-guide + triggers: + - webhook: + type: webhook + enabled: true + edges: + - webhook->Transform-data: + condition_type: always + enabled: true + target_job: Transform-data + source_trigger: webhook + - Transform-data->sucess: + condition_type: on_job_success + enabled: true + target_job: sucess + source_job: Transform-data + - Transform-data->fail: + condition_type: on_job_failure + enabled: true + target_job: fail + source_job: Transform-data + - Transform-data->custom: + condition_type: js_expression + enabled: true + target_job: custom + source_job: Transform-data + condition_expression: state.ok == 22 + +`; + const project = fromAppState(yaml, meta, { + format: 'yaml', + }); + console.log(project.workflows[0].steps); + const { next } = project.workflows[0].steps[1]; + console.log({ next }); + // make sure that the condition_types get mapped to condition + // also make sure that custom conditions work (both ways) +}); + test('should create a Project from prov state yaml', (t) => { const yaml = `id: e16c5f09-f0cb-4ba7-a4c2-73fcb2f29d00 name: aaa diff --git a/packages/project/test/serialize/to-app-state.test.ts b/packages/project/test/serialize/to-app-state.test.ts index e0c3e6613..75a4d195a 100644 --- a/packages/project/test/serialize/to-app-state.test.ts +++ b/packages/project/test/serialize/to-app-state.test.ts @@ -277,61 +277,35 @@ c-p t.deepEqual(edges, [3, 6, 9]); }); -/** - * Stumbled on something difficult here - * - * lightning saves special edge conditions, like on_job_success - * We need to convert that two ways - * - * I think I need to make the RUNTIME support the special strings - * in order to make this sync properly. Else it's too hard. - * - * See https://github.com/OpenFn/kit/issues/1123 - */ -test.skip('should handle edge conditions', (t) => { +test('should handle edge conditions', (t) => { const wf = ` -a-(condition=true)-b -a-(condition=false)-c -a-(condition=false)-c -a--b +a-(condition=always)-b +a-(condition="on_job_success")-c +a-(condition="on_job_failure")-d +a-(condition=never)-e +a-(condition=x)-f `; - const project = generateProject('p'); - const data = { - id: 'my-project', - workflows: [ - { - id: 'wf', - steps: [ - { - id: 'trigger', - type: 'webhook', - next: { - step: {}, - }, - }, - { - id: 'a', - expression: '.', - }, - { - id: 'b', - expression: '.', - }, - { - id: 'c', - expression: '.', - }, - { - id: 'd', - expression: '.', - }, - ], - }, - ], - }; + const project = generateProject('p', [wf], { + uuidSeed: 1, // ensure predictable UUIDS + }); - const state = toAppState(project); - // TODO + const state = toAppState(project, { format: 'json' }); + const [a_b, a_c, a_d, a_e, a_f] = state.workflows[0].edges; + + t.is(a_b.condition_type, 'always'); + t.falsy(a_b.condition_expression); + + t.is(a_c.condition_type, 'on_job_success'); + t.falsy(a_c.condition_expression); + + t.is(a_d.condition_type, 'on_job_failure'); + t.falsy(a_d.condition_expression); + + t.is(a_e.condition_type, 'never'); + t.falsy(a_e.condition_expression); + + t.is(a_f.condition_type, 'js_expression'); + t.is(a_f.condition_expression, 'x'); }); test('should convert a project back to app state in json', (t) => { From 0016804360750a4ea9a3edc45d9f350121cfe467 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 6 Jan 2026 12:06:29 +0000 Subject: [PATCH 04/11] add integration test --- integration-tests/worker/src/factories.ts | 3 ++- integration-tests/worker/test/runs.test.ts | 21 +++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/integration-tests/worker/src/factories.ts b/integration-tests/worker/src/factories.ts index 2b0251f0a..ee65c6ced 100644 --- a/integration-tests/worker/src/factories.ts +++ b/integration-tests/worker/src/factories.ts @@ -19,7 +19,7 @@ export const createTrigger = () => ({ id: crypto.randomUUID(), }); -export const createEdge = (a: any, b: any, condition?: string) => { +export const createEdge = (a: any, b: any, extra?: any) => { const edge: any = { id: crypto.randomUUID(), target_job_id: b.id, @@ -29,6 +29,7 @@ export const createEdge = (a: any, b: any, condition?: string) => { } else { edge.source_job_id = a.id; } + Object.assign(edge, extra); return edge; }; diff --git a/integration-tests/worker/test/runs.test.ts b/integration-tests/worker/test/runs.test.ts index 3b031c320..f834029ba 100644 --- a/integration-tests/worker/test/runs.test.ts +++ b/integration-tests/worker/test/runs.test.ts @@ -283,3 +283,24 @@ test.serial('Run with collections', async (t) => { { key: 'c', value: { id: 'c' } }, ]); }); + +test.serial('Run with edge conditions', async (t) => { + const job1 = createJob({ + body: `fn((s) => s)`, + }); + const job2 = createJob({ + body: `fn((s) => { + s.didExecuteStep2 = true + return s; + })`, + }); + const edge = createEdge(job1, job2, { + // I would prefer this ran on failure, but that's a lot + // harder to do the way these tests are set up + condition: 'on_job_success', + }); + const attempt = createRun([], [job1, job2], [edge]); + + const state = await run(t, attempt); + t.true(state.didExecuteStep2); +}); From 965631856f489ee0ae357ca4db0d1f8ddfabed15 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 6 Jan 2026 13:51:14 +0000 Subject: [PATCH 05/11] remove special worker handling of conditions --- .../src/util/convert-lightning-plan.ts | 23 +-- .../test/util/convert-lightning-plan.test.ts | 173 +----------------- 2 files changed, 4 insertions(+), 192 deletions(-) diff --git a/packages/ws-worker/src/util/convert-lightning-plan.ts b/packages/ws-worker/src/util/convert-lightning-plan.ts index e284c6cad..7723e2a0b 100644 --- a/packages/ws-worker/src/util/convert-lightning-plan.ts +++ b/packages/ws-worker/src/util/convert-lightning-plan.ts @@ -16,24 +16,6 @@ import { LightningPlan, LightningEdge } from '@openfn/lexicon/lightning'; import { ExecuteOptions } from '@openfn/engine-multi'; import { getNameAndVersion } from '@openfn/runtime'; -export const conditions: Record string | null> = - { - on_job_success: (upstreamId: string) => - `Boolean(!state?.errors?.["${upstreamId}"] ?? true)`, - on_job_failure: (upstreamId: string) => - `Boolean(state?.errors && state.errors["${upstreamId}"])`, - always: (_upstreamId: string) => null, - }; - -const mapEdgeCondition = (edge: LightningEdge) => { - const { condition } = edge; - if (condition && condition in conditions) { - const upstream = (edge.source_job_id || edge.source_trigger_id) as string; - return conditions[condition](upstream); - } - return condition; -}; - const mapTriggerEdgeCondition = (edge: LightningEdge) => { const { condition } = edge; // This handles cron triggers with undefined conditions and the 'always' string. @@ -205,9 +187,8 @@ export default ( .reduce((obj, edge) => { const newEdge: StepEdge = {}; - const condition = mapEdgeCondition(edge); - if (condition) { - newEdge.condition = condition; + if (edge.condition) { + newEdge.condition = edge.condition; } if (edge.enabled === false) { newEdge.disabled = true; diff --git a/packages/ws-worker/test/util/convert-lightning-plan.test.ts b/packages/ws-worker/test/util/convert-lightning-plan.test.ts index dfe0b1dcc..cad8982f9 100644 --- a/packages/ws-worker/test/util/convert-lightning-plan.test.ts +++ b/packages/ws-worker/test/util/convert-lightning-plan.test.ts @@ -4,8 +4,8 @@ import type { LightningJob, LightningTrigger, } from '@openfn/lexicon/lightning'; -import convertPlan, { conditions } from '../../src/util/convert-lightning-plan'; -import { ConditionalStepEdge, Job } from '@openfn/lexicon'; +import convertPlan from '../../src/util/convert-lightning-plan'; +import { Job } from '@openfn/lexicon'; // Creates a lightning node (job or trigger) const createNode = (props = {}) => @@ -452,175 +452,6 @@ test('convert two linked jobs with a disabled edge', (t) => { }); }); -test('on_job_success condition: return true if no errors', (t) => { - const condition = conditions.on_job_success('a'); - - const state = {}; - const result = testEdgeCondition(condition!, state); - - t.is(result, true); -}); - -// You can argue this both ways, but a job which returned no state is technically not in error -// Mostly I dont want it to blow up -test('on_job_success condition: return true if state is undefined', (t) => { - const condition = conditions.on_job_success('a'); - - const state = undefined; - const result = testEdgeCondition(condition!, state); - - t.is(result, true); -}); - -test('on_job_success condition: return true if unconnected upstream errors', (t) => { - const condition = conditions.on_job_success('a'); - - const state = { - errors: { - c: { - // some error that occured upstream - }, - }, - }; - const result = testEdgeCondition(condition!, state); - - t.is(result, true); -}); - -test('on_job_success condition: return false if the upstream job errored', (t) => { - const condition = conditions.on_job_success('a'); - - const state = { - errors: { - a: { - // some error that occured upstream - }, - }, - }; - const result = testEdgeCondition(condition!, state); - - t.is(result, false); -}); - -test('on_job_failure condition: return true if error immediately upstream', (t) => { - const condition = conditions.on_job_failure('a'); - - const state = { - errors: { - a: { - // some error that occured upstream - }, - }, - }; - const result = testEdgeCondition(condition!, state); - - t.is(result, true); -}); - -test('on_job_failure condition: return false if unrelated error upstream', (t) => { - const condition = conditions.on_job_failure('a'); - - const state = { - errors: { - b: { - // some error that occured upstream - }, - }, - }; - const result = testEdgeCondition(condition!, state); - - t.is(result, false); -}); - -test('on_job_failure condition: return false if no errors', (t) => { - const condition = conditions.on_job_failure('a'); - - const state = {}; - const result = testEdgeCondition(condition!, state); - - t.is(result, false); -}); - -test('on_job_failure condition: return false if state is undefined', (t) => { - const condition = conditions.on_job_failure('a'); - - const state = undefined; - const result = testEdgeCondition(condition!, state); - - t.is(result, false); -}); - -test('convert edge condition on_job_success', (t) => { - const run: Partial = { - id: 'w', - jobs: [createNode({ id: 'a' }), createNode({ id: 'b' })], - triggers: [], - edges: [createEdge('a', 'b', { condition: 'on_job_success' })], - }; - const { plan } = convertPlan(run as LightningPlan); - - const [job] = plan.workflow.steps as Job[]; - const edge = job.next as Record; - - t.truthy(edge.b); - t.is(edge.b.condition!, conditions.on_job_success('a')!); - t.true(testEdgeCondition(edge.b.condition!, {})); -}); - -test('convert edge condition on_job_failure', (t) => { - const run: Partial = { - id: 'w', - jobs: [createNode({ id: 'a' }), createNode({ id: 'b' })], - triggers: [], - edges: [createEdge('a', 'b', { condition: 'on_job_failure' })], - }; - const { plan } = convertPlan(run as LightningPlan); - - const [job] = plan.workflow.steps as Job[]; - const edge = job.next as Record; - - t.truthy(edge.b); - t.is(edge.b.condition!, conditions.on_job_failure('a')!); - // Check that this is valid js - t.true( - testEdgeCondition(edge.b.condition!, { - errors: { a: {} }, - }) - ); -}); - -test('convert edge condition on_job_success with a funky id', (t) => { - const id_a = 'a-b-c@ # {} !£'; - const run: Partial = { - id: 'w', - jobs: [createNode({ id: id_a }), createNode({ id: 'b' })], - triggers: [], - edges: [createEdge(id_a, 'b', { condition: 'on_job_success' })], - }; - const { plan } = convertPlan(run as LightningPlan); - const [job] = plan.workflow.steps as Job[]; - const edge = job.next as Record; - - t.truthy(edge.b); - t.is(edge.b.condition!, conditions.on_job_success(id_a)!); - // Check that this is valid js - t.true(testEdgeCondition(edge.b.condition!, {})); -}); - -test('convert edge condition always', (t) => { - const run: Partial = { - id: 'w', - jobs: [createNode({ id: 'a' }), createNode({ id: 'b' })], - triggers: [], - edges: [createEdge('a', 'b', { condition: 'always' })], - }; - const { plan } = convertPlan(run as LightningPlan); - - const [job] = plan.workflow.steps as Job[]; - const edge = job.next as Record; - t.false(edge.b.hasOwnProperty('condition')); -}); - test('append the collections adaptor to jobs that use it', (t) => { const run: Partial = { id: 'w', From 19a8db21d83de92b4124189dbcabd6341d4ff90a Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 6 Jan 2026 13:55:12 +0000 Subject: [PATCH 06/11] types --- packages/ws-worker/test/util/convert-lightning-plan.test.ts | 5 ----- 1 file changed, 5 deletions(-) diff --git a/packages/ws-worker/test/util/convert-lightning-plan.test.ts b/packages/ws-worker/test/util/convert-lightning-plan.test.ts index cad8982f9..7cf1e3132 100644 --- a/packages/ws-worker/test/util/convert-lightning-plan.test.ts +++ b/packages/ws-worker/test/util/convert-lightning-plan.test.ts @@ -41,11 +41,6 @@ const createJob = (props = {}) => ({ ...props, }); -const testEdgeCondition = (expr: string, state: any) => { - const fn = new Function('state', 'return ' + expr); - return fn(state); -}; - test('convert a single job', (t) => { const run: Partial = { id: 'w', From cdcc1cd851468abee90743b6f2bbb356d423bd9b Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 6 Jan 2026 14:25:17 +0000 Subject: [PATCH 07/11] update tests --- packages/cli/test/projects/checkout.test.ts | 2 +- packages/cli/test/projects/fetch.test.ts | 2 +- packages/cli/test/projects/fixtures.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/cli/test/projects/checkout.test.ts b/packages/cli/test/projects/checkout.test.ts index cfe2a911d..06947f829 100644 --- a/packages/cli/test/projects/checkout.test.ts +++ b/packages/cli/test/projects/checkout.test.ts @@ -457,7 +457,7 @@ workspace: next: { 'transform-data-to-fhir-standard': { disabled: false, - condition: true, + condition: 'always', }, }, }, diff --git a/packages/cli/test/projects/fetch.test.ts b/packages/cli/test/projects/fetch.test.ts index 0072984f5..51ee74351 100644 --- a/packages/cli/test/projects/fetch.test.ts +++ b/packages/cli/test/projects/fetch.test.ts @@ -442,7 +442,7 @@ test.serial( next: { 'transform-data': { disabled: false, - condition: true, + condition: 'always', openfn: { uuid: 'a9a3adef-b394-4405-814d-3ac4323f4b4b', }, diff --git a/packages/cli/test/projects/fixtures.ts b/packages/cli/test/projects/fixtures.ts index a8013466a..02682ea53 100644 --- a/packages/cli/test/projects/fixtures.ts +++ b/packages/cli/test/projects/fixtures.ts @@ -90,7 +90,7 @@ workflows: next: transform-data: disabled: false - condition: true + condition: always openfn: uuid: a9a3adef-b394-4405-814d-3ac4323f4b4b history: From 543523577f4f721708f24d4f062021bf16e48815 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 6 Jan 2026 14:31:33 +0000 Subject: [PATCH 08/11] tests --- integration-tests/cli/test/project-v1.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/cli/test/project-v1.test.ts b/integration-tests/cli/test/project-v1.test.ts index cb899a77b..13007e429 100644 --- a/integration-tests/cli/test/project-v1.test.ts +++ b/integration-tests/cli/test/project-v1.test.ts @@ -138,7 +138,7 @@ steps: next: transform-data: disabled: false - condition: true + condition: always - id: transform-data name: Transform data adaptor: "@openfn/language-common@latest" From 5261f9c325d2de32da68ac3bd6b048a4e6bfac8f Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 6 Jan 2026 14:47:03 +0000 Subject: [PATCH 09/11] changeset for cli --- .changeset/three-lamps-double.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/three-lamps-double.md diff --git a/.changeset/three-lamps-double.md b/.changeset/three-lamps-double.md new file mode 100644 index 000000000..77b21e1df --- /dev/null +++ b/.changeset/three-lamps-double.md @@ -0,0 +1,5 @@ +--- +'@openfn/cli': minor +--- + +Fix edge conditions in pulled workflows From 208ed62dc2aff3484d84bd666b6ca08372f90a92 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 6 Jan 2026 14:48:43 +0000 Subject: [PATCH 10/11] changeset for runtime --- .changeset/great-facts-wish.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/great-facts-wish.md diff --git a/.changeset/great-facts-wish.md b/.changeset/great-facts-wish.md new file mode 100644 index 000000000..8882d629e --- /dev/null +++ b/.changeset/great-facts-wish.md @@ -0,0 +1,7 @@ +--- +'@openfn/runtime': minor +--- + +Support special condition strings `never`, `always`, `on_job_success` and `on_job_fail`. + +These used to be mapped from Lightning workflows by the Worker, but by supporting them in the runtime directly we get much better compatibility across platforms From 78680fd08b601339404c0c25914319c4c11d0f94 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 6 Jan 2026 14:49:24 +0000 Subject: [PATCH 11/11] changeset for worker --- .changeset/breezy-banks-tell.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/breezy-banks-tell.md diff --git a/.changeset/breezy-banks-tell.md b/.changeset/breezy-banks-tell.md new file mode 100644 index 000000000..e95c364cd --- /dev/null +++ b/.changeset/breezy-banks-tell.md @@ -0,0 +1,5 @@ +--- +'@openfn/ws-worker': patch +--- + +Remove special edge condition mapping (this is now handled by the runtime)