Skip to content

Commit 0625f76

Browse files
chore(workflow-engine): export cancel method (medusajs#11844)
What: * Workflow engine exports the method `cancel` to revert a workflow.
1 parent 3db146c commit 0625f76

File tree

14 files changed

+300
-83
lines changed

14 files changed

+300
-83
lines changed

.changeset/good-wolves-travel.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"@medusajs/workflow-engine-inmemory": patch
3+
"@medusajs/workflow-engine-redis": patch
4+
"@medusajs/orchestration": patch
5+
"@medusajs/types": patch
6+
---
7+
8+
chore(workflow-engine): expose cancel method

packages/core/orchestration/src/transaction/transaction-orchestrator.ts

Lines changed: 58 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
import {
2121
isDefined,
2222
isErrorLike,
23+
isObject,
2324
MedusaError,
2425
promiseAll,
2526
serializeError,
@@ -188,6 +189,7 @@ export class TransactionOrchestrator extends EventEmitter {
188189
TransactionStepState.DORMANT,
189190
TransactionStepState.SKIPPED,
190191
]
192+
191193
const siblings = step.next.map((sib) => flow.steps[sib])
192194
return (
193195
siblings.length === 0 ||
@@ -1208,70 +1210,72 @@ export class TransactionOrchestrator extends EventEmitter {
12081210
while (queue.length > 0) {
12091211
const { obj, level } = queue.shift()
12101212

1211-
for (const key of Object.keys(obj)) {
1212-
if (typeof obj[key] === "object" && obj[key] !== null) {
1213-
queue.push({ obj: obj[key], level: [...level] })
1214-
} else if (key === "action") {
1215-
if (actionNames.has(obj.action)) {
1216-
throw new Error(
1217-
`Step ${obj.action} is already defined in workflow.`
1218-
)
1219-
}
1213+
if (obj.action) {
1214+
if (actionNames.has(obj.action)) {
1215+
throw new Error(`Step ${obj.action} is already defined in workflow.`)
1216+
}
12201217

1221-
actionNames.add(obj.action)
1222-
level.push(obj.action)
1223-
const id = level.join(".")
1224-
const parent = level.slice(0, level.length - 1).join(".")
1218+
actionNames.add(obj.action)
1219+
level.push(obj.action)
1220+
const id = level.join(".")
1221+
const parent = level.slice(0, level.length - 1).join(".")
12251222

1226-
if (!existingSteps || parent === TransactionOrchestrator.ROOT_STEP) {
1227-
states[parent].next?.push(id)
1228-
}
1223+
if (!existingSteps || parent === TransactionOrchestrator.ROOT_STEP) {
1224+
states[parent].next?.push(id)
1225+
}
12291226

1230-
const definitionCopy = { ...obj }
1231-
delete definitionCopy.next
1227+
const definitionCopy = { ...obj }
1228+
delete definitionCopy.next
12321229

1233-
if (definitionCopy.async) {
1234-
features.hasAsyncSteps = true
1235-
}
1230+
if (definitionCopy.async) {
1231+
features.hasAsyncSteps = true
1232+
}
12361233

1237-
if (definitionCopy.timeout) {
1238-
features.hasStepTimeouts = true
1239-
}
1234+
if (definitionCopy.timeout) {
1235+
features.hasStepTimeouts = true
1236+
}
12401237

1241-
if (
1242-
definitionCopy.retryInterval ||
1243-
definitionCopy.retryIntervalAwaiting
1244-
) {
1245-
features.hasRetriesTimeout = true
1246-
}
1238+
if (
1239+
definitionCopy.retryInterval ||
1240+
definitionCopy.retryIntervalAwaiting
1241+
) {
1242+
features.hasRetriesTimeout = true
1243+
}
1244+
1245+
if (definitionCopy.nested) {
1246+
features.hasNestedTransactions = true
1247+
}
12471248

1248-
if (definitionCopy.nested) {
1249-
features.hasNestedTransactions = true
1249+
states[id] = Object.assign(
1250+
new TransactionStep(),
1251+
existingSteps?.[id] || {
1252+
id,
1253+
uuid: definitionCopy.uuid,
1254+
depth: level.length - 1,
1255+
definition: definitionCopy,
1256+
saveResponse: definitionCopy.saveResponse ?? true,
1257+
invoke: {
1258+
state: TransactionStepState.NOT_STARTED,
1259+
status: TransactionStepStatus.IDLE,
1260+
},
1261+
compensate: {
1262+
state: TransactionStepState.DORMANT,
1263+
status: TransactionStepStatus.IDLE,
1264+
},
1265+
attempts: 0,
1266+
failures: 0,
1267+
lastAttempt: null,
1268+
next: [],
12501269
}
1270+
)
1271+
}
12511272

1252-
states[id] = Object.assign(
1253-
new TransactionStep(),
1254-
existingSteps?.[id] || {
1255-
id,
1256-
uuid: definitionCopy.uuid,
1257-
depth: level.length - 1,
1258-
definition: definitionCopy,
1259-
saveResponse: definitionCopy.saveResponse ?? true,
1260-
invoke: {
1261-
state: TransactionStepState.NOT_STARTED,
1262-
status: TransactionStepStatus.IDLE,
1263-
},
1264-
compensate: {
1265-
state: TransactionStepState.DORMANT,
1266-
status: TransactionStepStatus.IDLE,
1267-
},
1268-
attempts: 0,
1269-
failures: 0,
1270-
lastAttempt: null,
1271-
next: [],
1272-
}
1273-
)
1273+
if (Array.isArray(obj.next)) {
1274+
for (const next of obj.next) {
1275+
queue.push({ obj: next, level: [...level] })
12741276
}
1277+
} else if (isObject(obj.next)) {
1278+
queue.push({ obj: obj.next, level: [...level] })
12751279
}
12761280
}
12771281

packages/core/types/src/workflows-sdk/service.ts

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { FindConfig } from "../common"
2-
import { IModuleService } from "../modules-sdk"
2+
import { ContainerLike, IModuleService } from "../modules-sdk"
33
import { Context } from "../shared-context"
44
import {
55
FilterableWorkflowExecutionProps,
@@ -28,6 +28,15 @@ export interface WorkflowOrchestratorRunDTO<T = unknown>
2828
transactionId?: string
2929
}
3030

31+
export interface WorkflowOrchestratorCancelOptionsDTO {
32+
transactionId: string
33+
context?: Context
34+
throwOnError?: boolean
35+
logOnError?: boolean
36+
events?: Record<string, Function>
37+
container?: ContainerLike
38+
}
39+
3140
export type IdempotencyKeyParts = {
3241
workflowId: string
3342
transactionId: string
@@ -63,17 +72,11 @@ export interface IWorkflowEngineService extends IModuleService {
6372
workflowId: string,
6473
options?: WorkflowOrchestratorRunDTO,
6574
sharedContext?: Context
66-
): Promise<{
67-
errors: Error[]
68-
transaction: object
69-
result: any
70-
acknowledgement: Acknowledgement
71-
}>
75+
)
7276

7377
getRunningTransaction(
7478
workflowId: string,
7579
transactionId: string,
76-
options?: Record<string, any>,
7780
sharedContext?: Context
7881
): Promise<unknown>
7982

@@ -121,4 +124,10 @@ export interface IWorkflowEngineService extends IModuleService {
121124
},
122125
sharedContext?: Context
123126
)
127+
128+
cancel(
129+
workflowId: string,
130+
options: WorkflowOrchestratorCancelOptionsDTO,
131+
sharedContext?: Context
132+
)
124133
}

packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ export * from "./workflow_async"
44
export * from "./workflow_conditional_step"
55
export * from "./workflow_idempotent"
66
export * from "./workflow_step_timeout"
7+
export * from "./workflow_sync"
78
export * from "./workflow_transaction_timeout"
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import {
2+
createStep,
3+
createWorkflow,
4+
StepResponse,
5+
WorkflowResponse,
6+
} from "@medusajs/framework/workflows-sdk"
7+
8+
const step_1 = createStep(
9+
"step_1",
10+
jest.fn((input) => {
11+
input.test = "test"
12+
return new StepResponse(input, { compensate: 123 })
13+
}),
14+
jest.fn((compensateInput) => {
15+
if (!compensateInput) {
16+
return
17+
}
18+
19+
return new StepResponse({
20+
reverted: true,
21+
})
22+
})
23+
)
24+
25+
const step_2 = createStep(
26+
"step_2",
27+
jest.fn((input, context) => {
28+
if (input) {
29+
return new StepResponse({ notAsyncResponse: input.hey })
30+
}
31+
}),
32+
jest.fn((_, context) => {
33+
return new StepResponse({
34+
step: context.metadata.action,
35+
idempotency_key: context.metadata.idempotency_key,
36+
reverted: true,
37+
})
38+
})
39+
)
40+
41+
const step_3 = createStep(
42+
"step_3",
43+
jest.fn((res) => {
44+
return new StepResponse({
45+
done: {
46+
inputFromSyncStep: res.notAsyncResponse,
47+
},
48+
})
49+
})
50+
)
51+
52+
createWorkflow(
53+
{
54+
name: "workflow_sync",
55+
idempotent: true,
56+
},
57+
function (input) {
58+
step_1(input)
59+
60+
const ret2 = step_2({ hey: "oh" })
61+
62+
return new WorkflowResponse(step_3(ret2))
63+
}
64+
)

packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,26 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
300300
expect(onFinish).toHaveBeenCalledTimes(0)
301301
})
302302

303+
it("should cancel and revert a completed workflow", async () => {
304+
const workflowId = "workflow_sync"
305+
306+
const { acknowledgement, transaction: trx } =
307+
await workflowOrcModule.run(workflowId, {
308+
input: {
309+
value: "123",
310+
},
311+
})
312+
313+
expect(trx.getFlow().state).toEqual("done")
314+
expect(acknowledgement.hasFinished).toBe(true)
315+
316+
const { transaction } = await workflowOrcModule.cancel(workflowId, {
317+
transactionId: acknowledgement.transactionId,
318+
})
319+
320+
expect(transaction.getFlow().state).toEqual("reverted")
321+
})
322+
303323
it("should run conditional steps if condition is true", (done) => {
304324
void workflowOrcModule.subscribe({
305325
workflowId: "workflow_conditional_step",

packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@ import {
66
TransactionStep,
77
WorkflowScheduler,
88
} from "@medusajs/framework/orchestration"
9-
import { ContainerLike, MedusaContainer } from "@medusajs/framework/types"
9+
import {
10+
ContainerLike,
11+
Context,
12+
MedusaContainer,
13+
} from "@medusajs/framework/types"
1014
import {
1115
isString,
1216
MedusaError,
@@ -18,9 +22,9 @@ import {
1822
resolveValue,
1923
ReturnWorkflow,
2024
} from "@medusajs/framework/workflows-sdk"
25+
import { WorkflowOrchestratorCancelOptions } from "@types"
2126
import { ulid } from "ulid"
2227
import { InMemoryDistributedTransactionStorage } from "../utils"
23-
import { WorkflowOrchestratorCancelOptions } from "@types"
2428

2529
export type WorkflowOrchestratorRunOptions<T> = Omit<
2630
FlowRunOptions<T>,
@@ -319,10 +323,8 @@ export class WorkflowOrchestratorService {
319323
async getRunningTransaction(
320324
workflowId: string,
321325
transactionId: string,
322-
options?: WorkflowOrchestratorRunOptions<unknown>
326+
context?: Context
323327
): Promise<DistributedTransactionType> {
324-
let { context, container } = options ?? {}
325-
326328
if (!workflowId) {
327329
throw new Error("Workflow ID is required")
328330
}
@@ -339,9 +341,7 @@ export class WorkflowOrchestratorService {
339341
throw new Error(`Workflow with id "${workflowId}" not found.`)
340342
}
341343

342-
const flow = exportedWorkflow(
343-
(container as MedusaContainer) ?? this.container_
344-
)
344+
const flow = exportedWorkflow()
345345

346346
const transaction = await flow.getRunningTransaction(transactionId, context)
347347

packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import type {
1818
import { SqlEntityManager } from "@mikro-orm/postgresql"
1919
import { WorkflowExecution } from "@models"
2020
import { WorkflowOrchestratorService } from "@services"
21+
import { WorkflowOrchestratorCancelOptions } from "@types"
2122

2223
type InjectedDependencies = {
2324
manager: SqlEntityManager
@@ -185,4 +186,16 @@ export class WorkflowsModuleService<
185186
updated_at <= (CURRENT_TIMESTAMP - INTERVAL '1 second' * retention_time);
186187
`)
187188
}
189+
190+
@InjectSharedContext()
191+
async cancel<TWorkflow extends string | ReturnWorkflow<any, any, any>>(
192+
workflowIdOrWorkflow: TWorkflow,
193+
options: WorkflowOrchestratorCancelOptions,
194+
@MedusaContext() context: Context = {}
195+
) {
196+
return this.workflowOrchestratorService_.cancel(
197+
workflowIdOrWorkflow,
198+
options
199+
)
200+
}
188201
}

packages/modules/workflow-engine-inmemory/src/types/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ export type InitializeModuleInjectableDependencies = {
88

99
export type WorkflowOrchestratorCancelOptions = Omit<
1010
FlowCancelOptions,
11-
"transaction" | "container"
11+
"transaction" | "transactionId" | "container"
1212
> & {
13+
transactionId: string
1314
container?: ContainerLike
1415
}

packages/modules/workflow-engine-redis/integration-tests/__fixtures__/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
export * from "./workflow_1"
22
export * from "./workflow_2"
33
export * from "./workflow_async"
4+
export * from "./workflow_async_compensate"
45
export * from "./workflow_step_timeout"
6+
export * from "./workflow_sync"
57
export * from "./workflow_transaction_timeout"
68
export * from "./workflow_when"
7-
export * from "./workflow_async_compensate"

0 commit comments

Comments
 (0)