diff --git a/.changeset/chilly-hotels-juggle.md b/.changeset/chilly-hotels-juggle.md new file mode 100644 index 000000000..7edce5f93 --- /dev/null +++ b/.changeset/chilly-hotels-juggle.md @@ -0,0 +1,5 @@ +--- +'@openfn/cli': patch +--- + +Support workflow.yaml/json files without a top workflow key diff --git a/.changeset/lucky-words-kick.md b/.changeset/lucky-words-kick.md new file mode 100644 index 000000000..641d0af4c --- /dev/null +++ b/.changeset/lucky-words-kick.md @@ -0,0 +1,5 @@ +--- +'@openfn/runtime': patch +--- + +Support a start key on a workflow (different from a start option) diff --git a/.changeset/poor-mangos-tease.md b/.changeset/poor-mangos-tease.md new file mode 100644 index 000000000..796d486dd --- /dev/null +++ b/.changeset/poor-mangos-tease.md @@ -0,0 +1,5 @@ +--- +'@openfn/runtime': patch +--- + +Tweak error messaging when state objects exceed size limit diff --git a/integration-tests/cli/test/execute-workflow.test.ts b/integration-tests/cli/test/execute-workflow.test.ts index 5d5adacbc..9e69d8465 100644 --- a/integration-tests/cli/test/execute-workflow.test.ts +++ b/integration-tests/cli/test/execute-workflow.test.ts @@ -83,6 +83,41 @@ test.serial( } ); +test.serial( + `openfn ${jobsPath}/wf-array.yaml -S "{ \\"data\\": { \\"items\\": [\\"z\\"] } }"`, + async (t) => { + const { err } = await run(t.title); + t.falsy(err); + + const out = getJSON(); + t.is(out.data.items.length, 4); + t.deepEqual(out.data.items, ['z', 'a', 'b', 'c']); + } +); + +test.serial( + `openfn ${jobsPath}/wf-array-legacy.yaml -S "{ \\"data\\": { \\"items\\": [\\"z\\"] } }"`, + async (t) => { + const { stdout, err } = await run(t.title); + t.falsy(err); + + const out = getJSON(); + t.is(out.data.items.length, 3); + t.deepEqual(out.data.items, ['z', 'b', 'c']); + } +); + +test.serial( + `openfn ${jobsPath}/wf-array-legacy.json -S "{ \\"data\\": { \\"items\\": [\\"z\\"] } }"`, + async (t) => { + const { err } = await run(t.title); + t.falsy(err); + const out = getJSON(); + 
t.is(out.data.items.length, 3); + t.deepEqual(out.data.items, ['z', 'b', 'c']); + } +); + // special start step test.serial( `openfn ${jobsPath}/wf-array.json --start b -S "{ \\"data\\": { \\"items\\": [] } }"`, @@ -96,6 +131,17 @@ test.serial( t.true(out.data.items.includes('c')); } ); +test.serial( + `openfn ${jobsPath}/wf-array-legacy.json --start c -S "{ \\"data\\": { \\"items\\": [] } }"`, + async (t) => { + const { err } = await run(t.title); + t.falsy(err); + + const out = getJSON(); + t.is(out.data.items.length, 1); + t.true(out.data.items.includes('c')); + } +); // only step test.serial( diff --git a/integration-tests/cli/test/fixtures/wf-array-legacy.json b/integration-tests/cli/test/fixtures/wf-array-legacy.json new file mode 100644 index 000000000..3a823ee8f --- /dev/null +++ b/integration-tests/cli/test/fixtures/wf-array-legacy.json @@ -0,0 +1,26 @@ +{ + "workflow": { + "steps": [ + { + "id": "a", + "adaptor": "common", + "expression": "fn((state) => { if (!state.data.items) { state.data.items = []; } state.data.items.push('a'); return state; });", + "next": { "b": true } + }, + { + "id": "b", + "adaptor": "common", + "expression": "fn((state) => { state.data.items.push('b'); return state; });", + "next": { "c": true } + }, + { + "id": "c", + "adaptor": "common", + "expression": "fn((state) => { state.data.items.push('c'); return state; });" + } + ] + }, + "options": { + "start": "b" + } +} diff --git a/integration-tests/cli/test/fixtures/wf-array-legacy.yaml b/integration-tests/cli/test/fixtures/wf-array-legacy.yaml new file mode 100644 index 000000000..c1592c8be --- /dev/null +++ b/integration-tests/cli/test/fixtures/wf-array-legacy.yaml @@ -0,0 +1,17 @@ +workflow: + steps: + - id: a + adaptor: common + expression: fn((state) => { if (!state.data.items) { state.data.items = []; } state.data.items.push('a'); return state; }); + next: + b: true + - id: b + adaptor: common + expression: fn((state) => { state.data.items.push('b'); return state; }); + 
next: + c: true + - id: c + adaptor: common + expression: fn((state) => { state.data.items.push('c'); return state; }); +options: + start: b diff --git a/integration-tests/cli/test/fixtures/wf-array.json b/integration-tests/cli/test/fixtures/wf-array.json index 06e949844..589bf7033 100644 --- a/integration-tests/cli/test/fixtures/wf-array.json +++ b/integration-tests/cli/test/fixtures/wf-array.json @@ -1,23 +1,21 @@ { - "workflow": { - "steps": [ - { - "id": "a", - "adaptor": "common", - "expression": "fn((state) => { if (!state.data.items) { state.data.items = []; } state.data.items.push('a'); return state; });", - "next": { "b": true } - }, - { - "id": "b", - "adaptor": "common", - "expression": "fn((state) => { state.data.items.push('b'); return state; });", - "next": { "c": true } - }, - { - "id": "c", - "adaptor": "common", - "expression": "fn((state) => { state.data.items.push('c'); return state; });" - } - ] - } + "steps": [ + { + "id": "a", + "adaptor": "common", + "expression": "fn((state) => { if (!state.data.items) { state.data.items = []; } state.data.items.push('a'); return state; });", + "next": { "b": true } + }, + { + "id": "b", + "adaptor": "common", + "expression": "fn((state) => { state.data.items.push('b'); return state; });", + "next": { "c": true } + }, + { + "id": "c", + "adaptor": "common", + "expression": "fn((state) => { state.data.items.push('c'); return state; });" + } + ] } diff --git a/integration-tests/cli/test/fixtures/wf-array.yaml b/integration-tests/cli/test/fixtures/wf-array.yaml new file mode 100644 index 000000000..099c9fbb4 --- /dev/null +++ b/integration-tests/cli/test/fixtures/wf-array.yaml @@ -0,0 +1,14 @@ +steps: + - id: a + adaptor: common + expression: fn((state) => { if (!state.data.items) { state.data.items = []; } state.data.items.push('a'); return state; }); + next: + b: true + - id: b + adaptor: common + expression: fn((state) => { state.data.items.push('b'); return state; }); + next: + c: true + - id: c + adaptor: 
common + expression: fn((state) => { state.data.items.push('c'); return state; }); diff --git a/integration-tests/cli/test/project-v1.test.ts b/integration-tests/cli/test/project-v1.test.ts index 13007e429..1f584d0ae 100644 --- a/integration-tests/cli/test/project-v1.test.ts +++ b/integration-tests/cli/test/project-v1.test.ts @@ -131,6 +131,7 @@ test.serial('Checkout a project', async (t) => { workflowYaml, `id: my-workflow name: my workflow +start: trigger-webhook options: {} steps: - id: trigger diff --git a/packages/cli/src/options.ts b/packages/cli/src/options.ts index 0d050644e..acf6403ba 100644 --- a/packages/cli/src/options.ts +++ b/packages/cli/src/options.ts @@ -373,7 +373,7 @@ export const inputPath: CLIOption = { }, ensure: (opts) => { const { path: basePath } = opts; - if (basePath?.endsWith('.json')) { + if (basePath?.match(/.(json|ya?ml)$/)) { opts.planPath = basePath; } else if (basePath?.endsWith('.js')) { opts.expressionPath = basePath; diff --git a/packages/cli/src/util/load-plan.ts b/packages/cli/src/util/load-plan.ts index 00dba22db..1bfe3a682 100644 --- a/packages/cli/src/util/load-plan.ts +++ b/packages/cli/src/util/load-plan.ts @@ -32,15 +32,20 @@ const loadPlan = async ( // so many more input formats const { workflowPath, planPath, expressionPath } = options; + let workflowObj; if (options.path && /ya?ml$/.test(options.path)) { const content = await fs.readFile(path.resolve(options.path), 'utf-8'); - const workflow = yamlToJson(content); options.baseDir = dirname(options.path); - return loadXPlan({ workflow }, options, logger); + workflowObj = yamlToJson(content); + const { options: o, ...rest } = workflowObj; + // restructure the workflow so that options are not on the workflow object, + // but part of the execution plan options instead + if (!workflowObj.workflow && workflowObj.options) { + workflowObj = { workflow: rest, options: o }; + } } - // Run a workflow from a project, with a path and workflow name - if (options.path && 
options.workflow) { + else if (options.path && options.workflow) { options.baseDir = options.path; return fromProject(options.path, options.workflow, options, logger); } @@ -48,6 +53,7 @@ const loadPlan = async ( // Run a workflow from a project in the current working dir // (no expression or workflow path, and no file extension) if ( + !workflowObj && !expressionPath && !workflowPath && !/\.(js|json|yaml)+$/.test(options.path || '') && @@ -59,7 +65,7 @@ const loadPlan = async ( return fromProject(path.resolve('.'), workflow!, options, logger); } - if (expressionPath) { + if (!workflowObj && expressionPath) { return loadExpression(options, logger); } @@ -69,13 +75,24 @@ const loadPlan = async ( options.baseDir = path.dirname(jsonPath!); } - const json = await loadJson(jsonPath!, logger); - const defaultName = path.parse(jsonPath!).name; + workflowObj = workflowObj ?? (await loadJson(jsonPath!, logger)); + const defaultName = workflowObj.name || path.parse(jsonPath ?? '').name; - if (json.workflow) { - return loadXPlan(json, options, logger, defaultName); + // Support very old workflow formats + if (workflowObj.jobs) { + return loadOldWorkflow(workflowObj, options, logger, defaultName); + } + // support workflow saved like { workflow, options } + else if (workflowObj.workflow) { + return loadXPlan( + workflowObj, + Object.assign({}, workflowObj.options, options), + logger, + defaultName + ); } else { - return loadOldWorkflow(json, options, logger, defaultName); + // This is the main route now - just load the workflow from the file + return loadXPlan({ workflow: workflowObj }, options, logger, defaultName); } }; diff --git a/packages/cli/test/projects/checkout.test.ts b/packages/cli/test/projects/checkout.test.ts index 06947f829..3ffd1df28 100644 --- a/packages/cli/test/projects/checkout.test.ts +++ b/packages/cli/test/projects/checkout.test.ts @@ -450,6 +450,7 @@ workspace: id: 'simple-workflow', name: 'Simple Workflow', options: {}, + start: 'trigger-webhook', 
steps: [ { id: 'trigger', diff --git a/packages/cli/test/projects/fetch.test.ts b/packages/cli/test/projects/fetch.test.ts index 51ee74351..8c8b4d8de 100644 --- a/packages/cli/test/projects/fetch.test.ts +++ b/packages/cli/test/projects/fetch.test.ts @@ -450,6 +450,7 @@ test.serial( }, }, ], + start: 'trigger-webhook', openfn: { uuid: '72ca3eb0-042c-47a0-a2a1-a545ed4a8406', inserted_at: '2025-04-23T11:19:32Z', diff --git a/packages/cli/test/projects/fixtures.ts b/packages/cli/test/projects/fixtures.ts index 02682ea53..9cb301162 100644 --- a/packages/cli/test/projects/fixtures.ts +++ b/packages/cli/test/projects/fixtures.ts @@ -100,4 +100,5 @@ workflows: inserted_at: 2025-04-23T11:19:32Z updated_at: 2025-04-23T11:19:32Z lock_version: 1 - id: my-workflow`; + id: my-workflow + start: trigger-webhook`; diff --git a/packages/cli/test/util/load-plan.test.ts b/packages/cli/test/util/load-plan.test.ts index 6e50dde63..720e27348 100644 --- a/packages/cli/test/util/load-plan.test.ts +++ b/packages/cli/test/util/load-plan.test.ts @@ -4,11 +4,7 @@ import { createMockLogger } from '@openfn/logger'; import type { Job } from '@openfn/lexicon'; import loadPlan from '../../src/util/load-plan'; -import { - collectionsEndpoint, - collectionsVersion, - Opts, -} from '../../src/options'; +import { Opts } from '../../src/options'; const logger = createMockLogger(undefined, { level: 'debug' }); @@ -38,6 +34,7 @@ test.beforeEach(() => { jobs: [{ id: 'a', expression: 'x()' }], }), 'test/wf.json': JSON.stringify(sampleXPlan), + 'test/wf-flat.json': JSON.stringify(sampleXPlan.workflow), 'test/wf-err.json': '!!!', }); }); @@ -163,7 +160,7 @@ test.serial( } ); -test.serial('xplan: load a plan from workflow path', async (t) => { +test.serial('xplan: load an old-style plan from workflow path', async (t) => { const opts = { workflowPath: 'test/wf.json', expandAdaptors: true, @@ -176,6 +173,22 @@ test.serial('xplan: load a plan from workflow path', async (t) => { t.deepEqual(plan, sampleXPlan); 
}); +test.serial('xplan: load a new flat plan from workflow path', async (t) => { + const opts = { + workflowPath: 'test/wf-flat.json', + expandAdaptors: true, + plan: {}, + }; + + const plan = await loadPlan(opts, logger); + + t.truthy(plan); + t.deepEqual(plan, { + options: {}, // no options here! + workflow: sampleXPlan.workflow, + }); +}); + test.serial('xplan: expand adaptors', async (t) => { const opts = { workflowPath: 'test/wf.json', @@ -429,3 +442,79 @@ test.serial('xplan: append collections', async (t) => { collections_token: opts.apiKey, }); }); + +test.serial( + 'xplan: load a workflow.yaml without top workflow key', + async (t) => { + mock({ + 'test/wf.yaml': ` +name: wf +steps: + - id: a + adaptors: [] + expression: x() +`, + }); + const opts = { + path: 'test/wf.yaml', + }; + + const plan = await loadPlan(opts, logger); + + t.truthy(plan); + // Note that options are lost in this design! + t.deepEqual(plan, { workflow: sampleXPlan.workflow, options: {} }); + } +); + +test.serial( + 'xplan: load a workflow.yaml without top workflow key and options', + async (t) => { + mock({ + 'test/wf.yaml': ` +name: wf +steps: + - id: a + adaptors: [] + expression: x() +options: + start: x +`, + }); + const opts = { + path: 'test/wf.yaml', + }; + + const plan = await loadPlan(opts, logger); + + t.truthy(plan); + // Top-level options in the file are lifted into the plan options 
+ t.deepEqual(plan, { + workflow: sampleXPlan.workflow, + options: { start: 'x' }, + }); + } +); + +test.serial('xplan: load a workflow.yaml with top workflow key', async (t) => { + mock({ + 'test/wf.yaml': ` +workflow: + name: wf + steps: + - id: a + adaptors: [] + expression: x() +options: + start: a +`, + }); + const opts = { + path: 'test/wf.yaml', + }; + + const plan = await loadPlan(opts, logger); + + t.truthy(plan); + t.deepEqual(plan, sampleXPlan); +}); diff --git a/packages/lexicon/core.d.ts b/packages/lexicon/core.d.ts index 15a198974..f25e05601 100644 --- a/packages/lexicon/core.d.ts +++ b/packages/lexicon/core.d.ts @@ -161,6 +161,9 @@ export type Workflow = { // holds history information of a workflow history?: string[]; + + /** The default start node - the one the workflow was designed for (the trigger) */ + start?: string; }; export type StepId = string; diff --git a/packages/project/src/Workflow.ts b/packages/project/src/Workflow.ts index 2a5e3c98b..28b8c982d 100644 --- a/packages/project/src/Workflow.ts +++ b/packages/project/src/Workflow.ts @@ -30,7 +30,15 @@ class Workflow { // history needs to be on workflow object. this.workflow.history = workflow.history?.length ? 
workflow.history : []; - const { id, name, openfn, steps, history, ...options } = workflow; + const { + id, + name, + openfn, + steps, + history, + start: _start, + ...options + } = workflow; if (!(id || name)) { throw new Error('A Workflow MUST have a name or id'); } @@ -54,6 +62,14 @@ class Workflow { return this.workflow.steps; } + get start(): string | undefined { + return this.workflow.start; + } + + set start(s: string) { + this.workflow.start = s; + } + _buildIndex() { for (const step of this.workflow.steps) { const s = step as any; diff --git a/packages/project/src/parse/from-app-state.ts b/packages/project/src/parse/from-app-state.ts index 5224d6fc3..9f08e83c2 100644 --- a/packages/project/src/parse/from-app-state.ts +++ b/packages/project/src/parse/from-app-state.ts @@ -107,6 +107,10 @@ export const mapWorkflow = (workflow: Provisioner.Workflow) => { workflow.triggers.forEach((trigger: Provisioner.Trigger) => { const { type, ...otherProps } = trigger; + if (!mapped.start) { + mapped.start = `trigger-${type}`; + } + const connectedEdges = edges.filter( (e) => e.source_trigger_id === trigger.id ); diff --git a/packages/project/src/serialize/to-fs.ts b/packages/project/src/serialize/to-fs.ts index ca9f59956..e934ad624 100644 --- a/packages/project/src/serialize/to-fs.ts +++ b/packages/project/src/serialize/to-fs.ts @@ -48,6 +48,7 @@ export const extractWorkflow = (project: Project, workflowId: string) => { const wf = { id: workflow.id, name: workflow.name, + start: workflow.start, // Note: if no options are defined, options will serialize to an empty object // Not crazy about this - maybe we should do something better? Or do we like the consistency? 
options: workflow.options, diff --git a/packages/project/test/fixtures/sample-v2-project.ts b/packages/project/test/fixtures/sample-v2-project.ts index 049ccb27e..05e9be6fb 100644 --- a/packages/project/test/fixtures/sample-v2-project.ts +++ b/packages/project/test/fixtures/sample-v2-project.ts @@ -31,6 +31,7 @@ export const json: SerializedProject = { id: 'workflow', openfn: { uuid: 1 }, history: [], + start: 'trigger', }, ], }; @@ -67,4 +68,5 @@ workflows: openfn: uuid: 1 history: [] + start: trigger `; diff --git a/packages/project/test/parse/from-app-state.test.ts b/packages/project/test/parse/from-app-state.test.ts index 6f509bc04..699853cb4 100644 --- a/packages/project/test/parse/from-app-state.test.ts +++ b/packages/project/test/parse/from-app-state.test.ts @@ -86,6 +86,7 @@ test('should create a Project from prov state with a workflow', (t) => { id: 'my-workflow', name: 'My Workflow', history: [], + start: 'trigger-webhook', steps: [ { id: 'trigger', diff --git a/packages/project/test/parse/from-project.test.ts b/packages/project/test/parse/from-project.test.ts index d6c57644d..4da3625ae 100644 --- a/packages/project/test/parse/from-project.test.ts +++ b/packages/project/test/parse/from-project.test.ts @@ -86,6 +86,7 @@ test('import from a v2 project as JSON', async (t) => { uuid: 1, }, history: [], + start: 'trigger', steps: [ { name: 'b', @@ -140,6 +141,7 @@ test('import from a v2 project as YAML', async (t) => { openfn: { uuid: 1, }, + start: 'trigger', history: [], steps: [ { diff --git a/packages/project/test/serialize/to-app-state.test.ts b/packages/project/test/serialize/to-app-state.test.ts index 75a4d195a..68a022140 100644 --- a/packages/project/test/serialize/to-app-state.test.ts +++ b/packages/project/test/serialize/to-app-state.test.ts @@ -256,6 +256,38 @@ test('should handle credentials', (t) => { t.is(job.project_credential_id, 'p'); }); +test('should ignore workflow start keys', (t) => { + const data = { + id: 'my-project', + workflows: [ + 
{ + id: 'wf', + start: 'step', + steps: [ + { + id: 'trigger', + type: 'webhook', + next: { + step: {}, + }, + }, + { + id: 'step', + expression: '.', + configuration: 'p', + openfn: { + keychain_credential_id: 'k', + }, + }, + ], + }, + ], + }; + + const state = toAppState(new Project(data), { format: 'json' }); + t.falsy(state.workflows[0].start); +}); + test.todo('handle edge labels'); test('serialize steps and trigger in alphabetical order', (t) => { diff --git a/packages/project/test/serialize/to-fs.test.ts b/packages/project/test/serialize/to-fs.test.ts index 6f558ae20..eb4063592 100644 --- a/packages/project/test/serialize/to-fs.test.ts +++ b/packages/project/test/serialize/to-fs.test.ts @@ -20,6 +20,7 @@ test('extractWorkflow: single simple workflow (yaml by default)', (t) => { id: 'my-workflow', name: 'My Workflow', steps: [step], + start: 'step', // should be ignored because this lives in the project file openfn: { id: '72ca3eb0-042c-47a0-a2a1-a545ed4a8406', @@ -31,11 +32,11 @@ test('extractWorkflow: single simple workflow (yaml by default)', (t) => { const { path, content } = extractWorkflow(project, 'my-workflow'); t.is(path, 'workflows/my-workflow/my-workflow.yaml'); - // TODO is the empty options object correct here?? 
t.deepEqual( content, `id: my-workflow name: My Workflow +start: step options: {} steps: - id: step diff --git a/packages/project/test/serialize/to-project.test.ts b/packages/project/test/serialize/to-project.test.ts index 533350059..299356cb4 100644 --- a/packages/project/test/serialize/to-project.test.ts +++ b/packages/project/test/serialize/to-project.test.ts @@ -31,6 +31,7 @@ const createProject = () => { }); // hack delete proj.workflows[0].steps[0].name; + proj.workflows[0].start = 'trigger'; return proj; }; diff --git a/packages/runtime/src/execute/plan.ts b/packages/runtime/src/execute/plan.ts index 33914d746..e04ebed28 100644 --- a/packages/runtime/src/execute/plan.ts +++ b/packages/runtime/src/execute/plan.ts @@ -54,7 +54,7 @@ const executePlan = async ( } const queue: Array<{ stepName: string; input: any }> = [ - { stepName: options.start, input }, + { stepName: options.start ?? workflow.start, input }, ]; // count how many times each step has been called diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index d701ab337..819c35ade 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -90,8 +90,8 @@ const prepareFinalState = async ( if (state) { try { await ensureStateSize(state, stateLimit_mb); - } catch (e) { - logger.error('Critical error processing state:'); + } catch (e: any) { + logger.error('Critical error processing state: ', e.message); throw e; } diff --git a/packages/runtime/src/types.ts b/packages/runtime/src/types.ts index 4a9ac5929..957696233 100644 --- a/packages/runtime/src/types.ts +++ b/packages/runtime/src/types.ts @@ -44,8 +44,11 @@ export type CompiledExecutionPlan = { globals?: string; steps: Record; credentials?: Record; + /** The default start node - the one the workflow was designed for (the trigger) */ + start?: StepId; }; options: WorkflowOptions & { + /** User-specified start node */ start: StepId; }; }; diff --git 
a/packages/runtime/test/execute/plan.test.ts b/packages/runtime/test/execute/plan.test.ts index 4666e29c0..3efc6c3a1 100644 --- a/packages/runtime/test/execute/plan.test.ts +++ b/packages/runtime/test/execute/plan.test.ts @@ -589,7 +589,7 @@ test('only execute one job in a two-job execution plan', async (t) => { t.is(result.data.x, 1); }); -test('execute a two-job execution plan with custom start', async (t) => { +test('execute a two-job execution plan with an option start', async (t) => { const plan = createPlan( [ { @@ -609,6 +609,24 @@ test('execute a two-job execution plan with custom start', async (t) => { t.is(result.data.result, 11); }); +test('execute a two-job execution plan with a custom start', async (t) => { + const plan = createPlan([ + { + id: 'job1', + expression: 'export default [() => ({ data: { result: 11 } }) ]', + }, + { + id: 'job2', + expression: 'export default [() => ({ data: { result: 1 } }) ]', + next: { job1: true }, + }, + ]); + plan.workflow.start = 'job2'; + + const result: any = await executePlan(plan, {}, {}, mockLogger); + t.is(result.data.result, 11); +}); + test('Return when there are no more edges', async (t) => { const plan = createPlan( [