Skip to content

Commit 238e7d5

Browse files
authored
fix(wfe): should notify when finished + add state info (medusajs#12982)
1 parent 89a57ed commit 238e7d5

File tree

4 files changed

+47
-28
lines changed

4 files changed

+47
-28
lines changed

.changeset/rude-windows-boil.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@medusajs/workflow-engine-inmemory": patch
3+
"@medusajs/workflow-engine-redis": patch
4+
---
5+
6+
fix(wfe): should notify when finished + add state info

packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ type NotifyOptions = {
5151
eventType: keyof DistributedTransactionEvents
5252
workflowId: string
5353
transactionId?: string
54+
state?: TransactionState
5455
step?: TransactionStep
5556
response?: unknown
5657
result?: unknown
@@ -269,9 +270,6 @@ export class WorkflowOrchestratorService {
269270
throw new Error(`Workflow with id "${workflowId}" not found.`)
270271
}
271272

272-
const originalOnFinishHandler = events.onFinish!
273-
delete events.onFinish
274-
275273
const transaction = await this.getRunningTransaction(
276274
workflowId,
277275
transactionId,
@@ -307,12 +305,11 @@ export class WorkflowOrchestratorService {
307305
const metadata = ret.transaction.getFlow().metadata
308306
const { parentStepIdempotencyKey } = metadata ?? {}
309307

310-
const hasFailed = [TransactionState.FAILED].includes(
311-
ret.transaction.getFlow().state
312-
)
308+
const transactionState = ret.transaction.getFlow().state
309+
const hasFailed = [TransactionState.FAILED].includes(transactionState)
313310

314311
const acknowledgement = {
315-
transactionId: context.transactionId,
312+
transactionId: transaction.transactionId,
316313
workflowId: workflowId,
317314
parentStepIdempotencyKey,
318315
hasFinished,
@@ -323,8 +320,11 @@ export class WorkflowOrchestratorService {
323320
if (hasFinished) {
324321
const { result, errors } = ret
325322

326-
await originalOnFinishHandler({
327-
transaction: ret.transaction,
323+
this.notify({
324+
eventType: "onFinish",
325+
workflowId,
326+
transactionId: transaction.transactionId,
327+
state: transactionState as TransactionState,
328328
result,
329329
errors,
330330
})
@@ -423,6 +423,7 @@ export class WorkflowOrchestratorService {
423423
eventType: "onFinish",
424424
workflowId,
425425
transactionId,
426+
state: ret.transaction.getFlow().state as TransactionState,
426427
result,
427428
errors,
428429
})
@@ -493,6 +494,7 @@ export class WorkflowOrchestratorService {
493494
eventType: "onFinish",
494495
workflowId,
495496
transactionId,
497+
state: ret.transaction.getFlow().state as TransactionState,
496498
result,
497499
errors,
498500
})
@@ -598,6 +600,7 @@ export class WorkflowOrchestratorService {
598600
result,
599601
step,
600602
response,
603+
state,
601604
} = options
602605

603606
const subscribers: TransactionSubscribers =
@@ -613,6 +616,7 @@ export class WorkflowOrchestratorService {
613616
response,
614617
result,
615618
errors,
619+
state,
616620
})
617621
})
618622
}
@@ -641,12 +645,14 @@ export class WorkflowOrchestratorService {
641645
result,
642646
response,
643647
errors,
648+
state,
644649
}: {
645650
eventType: keyof DistributedTransactionEvents
646651
step?: TransactionStep
647652
response?: unknown
648653
result?: unknown
649654
errors?: unknown[]
655+
state?: TransactionState
650656
}) => {
651657
this.notify({
652658
workflowId,
@@ -656,6 +662,7 @@ export class WorkflowOrchestratorService {
656662
step,
657663
result,
658664
errors,
665+
state,
659666
})
660667
}
661668

packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -841,6 +841,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
841841

842842
void workflowOrcModule.subscribe({
843843
workflowId: "wf-when",
844+
transactionId: "trx_123_when",
844845
subscriber: (event) => {
845846
if (event.eventType === "onFinish") {
846847
done()

packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type NotifyOptions = {
6363
response?: unknown
6464
result?: unknown
6565
errors?: unknown[]
66+
state?: TransactionState
6667
}
6768

6869
type WorkflowId = string
@@ -318,9 +319,6 @@ export class WorkflowOrchestratorService {
318319
throw new Error(`Workflow with id "${workflowId}" not found.`)
319320
}
320321

321-
const originalOnFinishHandler = events.onFinish!
322-
delete events.onFinish
323-
324322
const transaction = await this.getRunningTransaction(
325323
workflowId,
326324
transactionId,
@@ -352,12 +350,11 @@ export class WorkflowOrchestratorService {
352350
const metadata = ret.transaction.getFlow().metadata
353351
const { parentStepIdempotencyKey } = metadata ?? {}
354352

355-
const hasFailed = [TransactionState.FAILED].includes(
356-
ret.transaction.getFlow().state
357-
)
353+
const transactionState = ret.transaction.getFlow().state
354+
const hasFailed = [TransactionState.FAILED].includes(transactionState)
358355

359356
const acknowledgement = {
360-
transactionId: context.transactionId,
357+
transactionId: transaction.transactionId,
361358
workflowId: workflowId,
362359
parentStepIdempotencyKey,
363360
hasFinished,
@@ -368,8 +365,11 @@ export class WorkflowOrchestratorService {
368365
if (hasFinished) {
369366
const { result, errors } = ret
370367

371-
await originalOnFinishHandler({
372-
transaction: ret.transaction,
368+
this.notify({
369+
eventType: "onFinish",
370+
workflowId,
371+
transactionId: transaction.transactionId,
372+
state: transactionState as TransactionState,
373373
result,
374374
errors,
375375
})
@@ -449,9 +449,6 @@ export class WorkflowOrchestratorService {
449449
workflowId,
450450
})
451451

452-
const originalOnFinishHandler = events.onFinish!
453-
delete events.onFinish
454-
455452
const ret = await exportedWorkflow.registerStepSuccess({
456453
idempotencyKey: idempotencyKey_,
457454
context,
@@ -466,8 +463,11 @@ export class WorkflowOrchestratorService {
466463
if (ret.transaction.hasFinished()) {
467464
const { result, errors } = ret
468465

469-
await originalOnFinishHandler({
470-
transaction: ret.transaction,
466+
this.notify({
467+
eventType: "onFinish",
468+
workflowId,
469+
transactionId,
470+
state: ret.transaction.getFlow().state as TransactionState,
471471
result,
472472
errors,
473473
})
@@ -520,9 +520,6 @@ export class WorkflowOrchestratorService {
520520
workflowId,
521521
})
522522

523-
const originalOnFinishHandler = events.onFinish!
524-
delete events.onFinish
525-
526523
const ret = await exportedWorkflow.registerStepFailure({
527524
idempotencyKey: idempotencyKey_,
528525
context,
@@ -537,8 +534,11 @@ export class WorkflowOrchestratorService {
537534
if (ret.transaction.hasFinished()) {
538535
const { result, errors } = ret
539536

540-
await originalOnFinishHandler({
541-
transaction: ret.transaction,
537+
this.notify({
538+
eventType: "onFinish",
539+
workflowId,
540+
transactionId,
541+
state: ret.transaction.getFlow().state as TransactionState,
542542
result,
543543
errors,
544544
})
@@ -677,6 +677,7 @@ export class WorkflowOrchestratorService {
677677
result,
678678
step,
679679
response,
680+
state,
680681
} = options
681682

682683
const subscribers: TransactionSubscribers =
@@ -692,6 +693,7 @@ export class WorkflowOrchestratorService {
692693
response,
693694
result,
694695
errors,
696+
state,
695697
}
696698
const isPromise = "then" in handler
697699
if (isPromise) {
@@ -737,12 +739,14 @@ export class WorkflowOrchestratorService {
737739
result,
738740
response,
739741
errors,
742+
state,
740743
}: {
741744
eventType: keyof DistributedTransactionEvents
742745
step?: TransactionStep
743746
response?: unknown
744747
result?: unknown
745748
errors?: unknown[]
749+
state?: TransactionState
746750
}) => {
747751
await this.notify({
748752
workflowId,
@@ -752,6 +756,7 @@ export class WorkflowOrchestratorService {
752756
step,
753757
result,
754758
errors,
759+
state,
755760
})
756761
}
757762

0 commit comments

Comments
 (0)