Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions integration-tests/cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# @openfn/integration-tests-cli

## 1.0.8

### Patch Changes

- Updated dependencies [f089f8d]
- @openfn/[email protected]
- @openfn/[email protected]

## 1.0.7

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-cli",
"private": true,
"version": "1.0.7",
"version": "1.0.8",
"description": "CLI integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/cli/test/project-v1.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ steps:
next:
transform-data:
disabled: false
condition: true
condition: always
- id: transform-data
name: Transform data
adaptor: "@openfn/language-common@latest"
Expand Down
3 changes: 2 additions & 1 deletion integration-tests/worker/src/factories.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export const createTrigger = () => ({
id: crypto.randomUUID(),
});

export const createEdge = (a: any, b: any, condition?: string) => {
export const createEdge = (a: any, b: any, extra?: any) => {
const edge: any = {
id: crypto.randomUUID(),
target_job_id: b.id,
Expand All @@ -29,6 +29,7 @@ export const createEdge = (a: any, b: any, condition?: string) => {
} else {
edge.source_job_id = a.id;
}
Object.assign(edge, extra);
return edge;
};

Expand Down
21 changes: 21 additions & 0 deletions integration-tests/worker/test/runs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,3 +283,24 @@ test.serial('Run with collections', async (t) => {
{ key: 'c', value: { id: 'c' } },
]);
});

test.serial('Run with edge conditions', async (t) => {
const job1 = createJob({
body: `fn((s) => s)`,
});
const job2 = createJob({
body: `fn((s) => {
s.didExecuteStep2 = true
return s;
})`,
});
const edge = createEdge(job1, job2, {
// I would prefer this ran on failure, but that's a lot
// harder to do the way these tests are set up
condition: 'on_job_success',
});
const attempt = createRun([], [job1, job2], [edge]);

const state = await run(t, attempt);
t.true(state.didExecuteStep2);
});
14 changes: 14 additions & 0 deletions packages/cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# @openfn/cli

## 1.22.0

### Minor Changes

- f089f8d: Fix edge conditions in pulled workflows

### Patch Changes

- Updated dependencies [f089f8d]
- Updated dependencies [f089f8d]
- Updated dependencies [064933d]
- @openfn/[email protected]
- @openfn/[email protected]

## 1.21.0

### Minor Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/cli",
"version": "1.21.0",
"version": "1.22.0",
"description": "CLI devtools for the OpenFn toolchain",
"engines": {
"node": ">=18",
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/test/projects/checkout.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ workspace:
next: {
'transform-data-to-fhir-standard': {
disabled: false,
condition: true,
condition: 'always',
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/test/projects/fetch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ test.serial(
next: {
'transform-data': {
disabled: false,
condition: true,
condition: 'always',
openfn: {
uuid: 'a9a3adef-b394-4405-814d-3ac4323f4b4b',
},
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/test/projects/fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ workflows:
next:
transform-data:
disabled: false
condition: true
condition: always
openfn:
uuid: a9a3adef-b394-4405-814d-3ac4323f4b4b
history:
Expand Down
14 changes: 14 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# engine-multi

## 1.10.0

### Minor Changes

- 064933d: Measure the size of state objects at the end of each step, and throw if they exceed a limit

In the Worker, this limit is set to 25% of the available runtime memory.

### Patch Changes

- Updated dependencies [f089f8d]
- Updated dependencies [064933d]
- @openfn/[email protected]

## 1.9.1

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "1.9.1",
"version": "1.10.0",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
9 changes: 9 additions & 0 deletions packages/engine-multi/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,17 @@ const execute = async (context: ExecutionContext) => {
repoDir: options.repoDir,
profile: context.options.profile,
profilePollInteval: context.options.profilePollInterval,
// work out the max size of the state object at the end of each step
// This must be fairly high to prevent crashes
stateLimitMb:
options.stateLimitMb ??
Math.max((options.memoryLimitMb ?? 1000) * 0.25),
} as RunOptions;

logger.debug(
`${state.plan.id} setting runtime state limit to ${runOptions.stateLimitMb}mb`
);

// Construct the payload limits object
const payloadLimits: PayloadLimits = {};
if (options.payloadLimitMb !== undefined) {
Expand Down
1 change: 1 addition & 0 deletions packages/engine-multi/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ const createEngine = async (
callWorker,
options: {
...options,
stateLimitMb: opts.stateLimitMb,
sanitize: opts.sanitize,
resolvers: opts.resolvers,
runTimeoutMs: opts.runTimeoutMs ?? defaultTimeout,
Expand Down
3 changes: 2 additions & 1 deletion packages/engine-multi/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export type ExecutionContextConstructor = {
};

export type ExecuteOptions = {
stateLimitMb?: number;
payloadLimitMb?: number;
logPayloadLimitMb?: number;
memoryLimitMb?: number;
Expand Down Expand Up @@ -72,7 +73,7 @@ export interface RuntimeEngine {
execute(
plan: ExecutionPlan,
input: State,
options?: Partial<EngineOptions>
options?: ExecuteOptions
): Pick<EventEmitter, 'on' | 'off' | 'once'>;

destroy(instant?: boolean): Promise<void>;
Expand Down
6 changes: 4 additions & 2 deletions packages/engine-multi/src/worker/thread/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export type RunOptions = {
jobLogLevel?: LogLevel;
profile?: boolean;
profilePollInterval?: number;
stateLimitMb?: number;
};

const eventMap = {
Expand All @@ -39,6 +40,7 @@ register({
jobLogLevel,
profile,
profilePollInterval,
stateLimitMb,
} = runOptions;
const { logger, jobLogger, adaptorLogger } = createLoggers(
plan.id!,
Expand All @@ -54,9 +56,8 @@ register({
console = adaptorLogger;

// Leave console.debug for local debugging
// This goes to stdout but not the adapator logger
// This goes to stdout but not the adaptor logger
console.debug = debug;

// TODO I would like to pull these options out of here
const options = {
// disable the runtime's own timeout
Expand All @@ -72,6 +73,7 @@ register({
profile,
profilePollInterval,
statePropsToRemove,
stateLimitMb,
callbacks: {
// TODO: this won't actually work across the worker boundary
// For now I am preloading credentials
Expand Down
29 changes: 29 additions & 0 deletions packages/engine-multi/test/errors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,35 @@ test.serial.skip('vm oom error', (t) => {
});
});

test.serial('state object too big', async (t) => {
return new Promise((done) => {
const plan = {
id: 'x',
workflow: {
steps: [
{
expression: `export default [(s) => {
s.data = new Array(1024 * 1024).fill("x").join("");
return s;
}]`,
},
],
},
};

const options = {
stateLimitMb: 0.1,
};

engine.execute(plan, {}, options).on(WORKFLOW_ERROR, (evt) => {
t.is(evt.type, 'StateTooLargeError');
t.is(evt.severity, 'kill');
t.is(evt.message, 'State exceeds the limit of 0.1mb');
done();
});
});
});

test.serial('execution error from async code', (t) => {
return new Promise((done) => {
const plan = {
Expand Down
9 changes: 9 additions & 0 deletions packages/lightning-mock/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# @openfn/lightning-mock

## 2.4.1

### Patch Changes

- Updated dependencies [f089f8d]
- Updated dependencies [064933d]
- @openfn/[email protected]
- @openfn/[email protected]

## 2.4.0

### Minor Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/lightning-mock/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lightning-mock",
"version": "2.4.0",
"version": "2.4.1",
"private": true,
"description": "A mock Lightning server",
"main": "dist/index.js",
Expand Down
6 changes: 6 additions & 0 deletions packages/project/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# @openfn/project

## 0.10.1

### Patch Changes

- f089f8d: Map edge conditions so that they are compatible with CLI

## 0.10.0

### Minor Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/project/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/project",
"version": "0.10.0",
"version": "0.10.1",
"description": "Read, serialize, replicate and sync OpenFn projects",
"scripts": {
"test": "pnpm ava",
Expand Down
28 changes: 17 additions & 11 deletions packages/project/src/parse/from-app-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,28 @@ export default (
return new Project(proj as l.Project, config);
};

const mapTriggerEdgeCondition = (edge: Provisioner.Edge) => {
// TODO maybe this is a util and moved out of this file
export const mapEdge = (edge: Provisioner.Edge) => {
const e: any = {
disabled: !edge.enabled,
};
if (edge.condition_type === 'always') {
e.condition = true;
} else if (edge.condition_type === 'never') {
e.condition = false;
} else {

if (edge.condition_type === 'js_expression') {
e.condition = edge.condition_expression;
} else if (edge.condition_type) {
e.condition = edge.condition_type;
}

if (edge.condition_label) {
e.name = edge.condition_label;
}

// Do this last so that it serializes last
e.openfn = {
uuid: edge.id,
};
if (edge.id) {
e.openfn = {
uuid: edge.id,
};
}
return e;
};

Expand Down Expand Up @@ -114,7 +120,7 @@ export const mapWorkflow = (workflow: Provisioner.Workflow) => {
throw new Error(`Failed to find ${edge.target_job_id}`);
}
// we use the name, not the id, to reference
obj[slugify(target.name)] = mapTriggerEdgeCondition(edge);
obj[slugify(target.name)] = mapEdge(edge);
return obj;
}, {}),
} as l.Trigger);
Expand Down Expand Up @@ -148,7 +154,7 @@ export const mapWorkflow = (workflow: Provisioner.Workflow) => {
s.next = outboundEdges.reduce((next, edge) => {
const target = jobs.find((j) => j.id === edge.target_job_id);
// @ts-ignore
next[slugify(target.name)] = mapTriggerEdgeCondition(edge);
next[slugify(target.name)] = mapEdge(edge);
return next;
}, {});
}
Expand Down
19 changes: 13 additions & 6 deletions packages/project/src/serialize/to-app-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,19 @@ const mapWorkflow = (workflow: Workflow) => {
e.source_job_id = node.id;
}

if (rules.condition === true) {
e.condition_type = 'always';
} else if (rules.condition === false) {
e.condition_type = 'never';
} else if (typeof rules.condition === 'string') {
// TODO conditional
if (rules.condition) {
if (typeof rules.condition === 'boolean') {
e.condition_type = rules.condition ? 'always' : 'never';
} else if (
rules.condition.match(
/^(always|never|on_job_success|on_job_failure)$/
)
) {
e.condition_type = rules.condition;
} else {
e.condition_type = 'js_expression';
e.condition_expression = rules.condition;
}
}
wfState.edges.push(e);
});
Expand Down
Loading