Skip to content

Commit d769210

Browse files
authored
chore(orchestration): add support for autoRetry, maxAwaitingRetries, retryStep (medusajs#13391)
RESOLVES CORE-1163 RESOLVES CORE-1164 **What** ### Add support for non auto retryable steps. When a step is configured with `maxRetries` and fails, it is marked as a temporary failure and then retried automatically. That's the default behaviour. If you now add `autoRetry: false`, a failing step is still marked as a temporary failure but is not retried automatically; you can then call the workflow engine's `run` to resume the workflow from the failing step so that it is retried. ### Add support for `maxAwaitingRetries` When `retryIntervalAwaiting` is set, a step that is awaiting is retried after the specified interval, with no maximum number of retries. You can now set `maxAwaitingRetries` to enforce a maximum number of awaiting retries. ### Add support to manually retry an awaiting step In some scenarios, either a machine dies while a step is executing or a step takes longer than expected. You can now call `retryStep` on the workflow engine to force a retry of a step that is presumably stuck.
1 parent ac5e23b commit d769210

File tree

28 files changed

+1366
-64
lines changed

28 files changed

+1366
-64
lines changed

.changeset/many-paws-tease.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"@medusajs/workflow-engine-inmemory": patch
3+
"@medusajs/workflow-engine-redis": patch
4+
"@medusajs/orchestration": patch
5+
"@medusajs/types": patch
6+
---
7+
8+
chore(orchestration): add support for autoRetry step configuration

packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,116 @@ describe("Transaction Orchestrator", () => {
648648
)
649649
})
650650

651+
it("Should not retry steps X times automatically when flag 'autoRetry' is set to false and then compensate steps afterward", async () => {
652+
const mocks = {
653+
one: jest.fn().mockImplementation((payload) => {
654+
return payload
655+
}),
656+
compensateOne: jest.fn().mockImplementation((payload) => {
657+
return payload
658+
}),
659+
two: jest.fn().mockImplementation((payload) => {
660+
throw new Error()
661+
}),
662+
compensateTwo: jest.fn().mockImplementation((payload) => {
663+
return payload
664+
}),
665+
}
666+
667+
async function handler(
668+
actionId: string,
669+
functionHandlerType: TransactionHandlerType,
670+
payload: TransactionPayload
671+
) {
672+
const command = {
673+
firstMethod: {
674+
[TransactionHandlerType.INVOKE]: () => {
675+
mocks.one(payload)
676+
},
677+
[TransactionHandlerType.COMPENSATE]: () => {
678+
mocks.compensateOne(payload)
679+
},
680+
},
681+
secondMethod: {
682+
[TransactionHandlerType.INVOKE]: () => {
683+
mocks.two(payload)
684+
},
685+
[TransactionHandlerType.COMPENSATE]: () => {
686+
mocks.compensateTwo(payload)
687+
},
688+
},
689+
}
690+
691+
return command[actionId][functionHandlerType](payload)
692+
}
693+
694+
const flow: TransactionStepsDefinition = {
695+
next: {
696+
action: "firstMethod",
697+
maxRetries: 3,
698+
autoRetry: false,
699+
next: {
700+
action: "secondMethod",
701+
maxRetries: 3,
702+
autoRetry: false,
703+
},
704+
},
705+
}
706+
707+
const strategy = new TransactionOrchestrator({
708+
id: "transaction-name",
709+
definition: flow,
710+
})
711+
712+
const transaction = await strategy.beginTransaction({
713+
transactionId: "transaction_id_123",
714+
handler,
715+
})
716+
717+
await strategy.resume(transaction)
718+
719+
expect(transaction.transactionId).toBe("transaction_id_123")
720+
721+
expect(mocks.one).toHaveBeenCalledTimes(1)
722+
expect(mocks.two).toHaveBeenCalledTimes(1)
723+
724+
await strategy.resume(transaction)
725+
726+
expect(mocks.one).toHaveBeenCalledTimes(1)
727+
expect(mocks.two).toHaveBeenCalledTimes(2)
728+
729+
await strategy.resume(transaction)
730+
731+
expect(mocks.one).toHaveBeenCalledTimes(1)
732+
expect(mocks.two).toHaveBeenCalledTimes(3)
733+
734+
await strategy.resume(transaction)
735+
736+
expect(mocks.one).toHaveBeenCalledTimes(1)
737+
expect(mocks.two).toHaveBeenCalledTimes(4)
738+
739+
expect(transaction.getState()).toBe(TransactionState.REVERTED)
740+
expect(mocks.compensateOne).toHaveBeenCalledTimes(1)
741+
742+
expect(mocks.two).nthCalledWith(
743+
1,
744+
expect.objectContaining({
745+
metadata: expect.objectContaining({
746+
attempt: 1,
747+
}),
748+
})
749+
)
750+
751+
expect(mocks.two).nthCalledWith(
752+
4,
753+
expect.objectContaining({
754+
metadata: expect.objectContaining({
755+
attempt: 4,
756+
}),
757+
})
758+
)
759+
})
760+
651761
it("Should fail a transaction if any step fails after retrying X time to compensate it", async () => {
652762
const mocks = {
653763
one: jest.fn().mockImplementation((payload) => {

packages/core/orchestration/src/transaction/errors.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,22 @@ export class SkipExecutionError extends Error {
9999
}
100100
}
101101

102+
/**
 * Error type named "SkipStepAlreadyFinishedError".
 *
 * TransactionOrchestrator.isExpectedError recognises this error through the
 * static type guard below, alongside SkipExecutionError and
 * SkipCancelledExecutionError.
 * NOTE(review): presumably raised when a step has already finished and its
 * execution should be skipped — the throw site is not visible here; confirm.
 */
export class SkipStepAlreadyFinishedError extends Error {
103+
/**
 * Type guard for SkipStepAlreadyFinishedError.
 *
 * Matches either a real instance of this class or any error whose `name`
 * equals "SkipStepAlreadyFinishedError", so the check still succeeds when
 * `instanceof` does not (presumably for errors that were serialized or
 * created by another copy of this module — confirm).
 *
 * @param error - The error to inspect.
 * @returns true when the error should be treated as a SkipStepAlreadyFinishedError.
 */
static isSkipStepAlreadyFinishedError(
104+
error: Error
105+
): error is SkipStepAlreadyFinishedError {
106+
return (
107+
error instanceof SkipStepAlreadyFinishedError ||
108+
error?.name === "SkipStepAlreadyFinishedError"
109+
)
110+
}
111+
112+
/**
 * @param message - Optional message forwarded to the base Error constructor.
 */
constructor(message?: string) {
113+
super(message)
114+
// Set the name explicitly: the static type guard above falls back to comparing it.
this.name = "SkipStepAlreadyFinishedError"
115+
}
116+
}
117+
102118
export class SkipCancelledExecutionError extends Error {
103119
static isSkipCancelledExecutionError(
104120
error: Error

packages/core/orchestration/src/transaction/transaction-orchestrator.ts

Lines changed: 106 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import {
3333
PermanentStepFailureError,
3434
SkipCancelledExecutionError,
3535
SkipExecutionError,
36+
SkipStepAlreadyFinishedError,
3637
SkipStepResponse,
3738
TransactionStepTimeoutError,
3839
TransactionTimeoutError,
@@ -117,7 +118,8 @@ export class TransactionOrchestrator extends EventEmitter {
117118
private static isExpectedError(error: Error): boolean {
118119
return (
119120
SkipCancelledExecutionError.isSkipCancelledExecutionError(error) ||
120-
SkipExecutionError.isSkipExecutionError(error)
121+
SkipExecutionError.isSkipExecutionError(error) ||
122+
SkipStepAlreadyFinishedError.isSkipStepAlreadyFinishedError(error)
121123
)
122124
}
123125

@@ -415,10 +417,24 @@ export class TransactionOrchestrator extends EventEmitter {
415417
stepDef.definition.retryIntervalAwaiting!
416418
)
417419
}
420+
} else if (stepDef.retryRescheduledAt) {
421+
// The step is not configured for awaiting retry but is manually forced to retry
422+
stepDef.retryRescheduledAt = null
423+
nextSteps.push(stepDef)
418424
}
419425

420426
continue
421427
} else if (curState.status === TransactionStepStatus.TEMPORARY_FAILURE) {
428+
if (
429+
!stepDef.temporaryFailedAt &&
430+
stepDef.definition.autoRetry === false
431+
) {
432+
stepDef.temporaryFailedAt = Date.now()
433+
continue
434+
}
435+
436+
stepDef.temporaryFailedAt = null
437+
422438
currentSteps.push(stepDef)
423439

424440
if (!stepDef.canRetry()) {
@@ -565,6 +581,27 @@ export class TransactionOrchestrator extends EventEmitter {
565581
}
566582
}
567583

584+
/**
 * Flags a step for a forced retry, persists the transaction and schedules the retry.
 *
 * @param transaction - The transaction that owns the step.
 * @param step - The step to retry.
 * @throws Rethrows any error from saving the checkpoint or scheduling the retry
 * unless TransactionOrchestrator.isExpectedError accepts it, in which case the
 * error is swallowed.
 */
private static async retryStep(
585+
transaction: DistributedTransactionType,
586+
step: TransactionStep
587+
): Promise<void> {
588+
// Only stamp the step when no reschedule timestamp is set yet, so an
// existing timestamp is never overwritten.
if (!step.retryRescheduledAt) {
589+
step.hasScheduledRetry = true
590+
step.retryRescheduledAt = Date.now()
591+
}
592+
593+
// Mark the flow as having waiting steps before it is persisted below.
transaction.getFlow().hasWaitingSteps = true
594+
595+
try {
596+
// Persist the updated step/flow flags first, then schedule the retry.
await transaction.saveCheckpoint()
597+
// NOTE(review): the second argument is presumably the retry delay (0 = immediately) — confirm against scheduleRetry.
await transaction.scheduleRetry(step, 0)
598+
} catch (error) {
599+
// Expected errors (skip / cancelled / already-finished) are ignored; anything else propagates.
if (!TransactionOrchestrator.isExpectedError(error)) {
600+
throw error
601+
}
602+
}
603+
}
604+
568605
private static async skipStep({
569606
transaction,
570607
step,
@@ -677,10 +714,13 @@ export class TransactionOrchestrator extends EventEmitter {
677714
stopExecution: boolean
678715
transactionIsCancelling?: boolean
679716
}> {
717+
const result = {
718+
stopExecution: false,
719+
transactionIsCancelling: false,
720+
}
721+
680722
if (SkipExecutionError.isSkipExecutionError(error)) {
681-
return {
682-
stopExecution: false,
683-
}
723+
return result
684724
}
685725

686726
step.failures++
@@ -773,22 +813,33 @@ export class TransactionOrchestrator extends EventEmitter {
773813
if (step.hasTimeout()) {
774814
cleaningUp.push(transaction.clearStepTimeout(step))
775815
}
816+
} else {
817+
const isAsync = step.isCompensating()
818+
? step.definition.compensateAsync
819+
: step.definition.async
820+
821+
if (
822+
step.getStates().status === TransactionStepStatus.TEMPORARY_FAILURE &&
823+
step.definition.autoRetry === false &&
824+
isAsync
825+
) {
826+
step.temporaryFailedAt = Date.now()
827+
result.stopExecution = true
828+
}
776829
}
777830

778-
let transactionIsCancelling = false
779-
let shouldEmit = true
780831
try {
781832
await transaction.saveCheckpoint()
782833
} catch (error) {
783834
if (!TransactionOrchestrator.isExpectedError(error)) {
784835
throw error
785836
}
786837

787-
transactionIsCancelling =
838+
result.transactionIsCancelling =
788839
SkipCancelledExecutionError.isSkipCancelledExecutionError(error)
789840

790841
if (SkipExecutionError.isSkipExecutionError(error)) {
791-
shouldEmit = false
842+
result.stopExecution = true
792843
}
793844
}
794845

@@ -798,16 +849,16 @@ export class TransactionOrchestrator extends EventEmitter {
798849

799850
await promiseAll(cleaningUp)
800851

801-
if (shouldEmit) {
852+
if (!result.stopExecution) {
802853
const eventName = step.isCompensating()
803854
? DistributedTransactionEvent.COMPENSATE_STEP_FAILURE
804855
: DistributedTransactionEvent.STEP_FAILURE
805856
transaction.emit(eventName, { step, transaction })
806857
}
807858

808859
return {
809-
stopExecution: !shouldEmit,
810-
transactionIsCancelling,
860+
stopExecution: result.stopExecution,
861+
transactionIsCancelling: result.transactionIsCancelling,
811862
}
812863
}
813864

@@ -1732,6 +1783,50 @@ export class TransactionOrchestrator extends EventEmitter {
17321783
return curTransaction
17331784
}
17341785

1786+
/**
1787+
* Manually force a step to retry even if it is still in awaiting status.
* Only a step whose current status is WAITING can be retried; any other
* status is rejected with a NOT_ALLOWED error.
1788+
* @param responseIdempotencyKey - The idempotency key for the step
1789+
* @param handler - The handler function to execute the step
1790+
* @param transaction - The current transaction. If not provided it will be loaded based on the responseIdempotencyKey
* @param onLoad - Optional callback awaited with the resolved transaction before the step status is checked
* @returns The transaction the step belongs to
* @throws MedusaError of type NOT_ALLOWED when the step status is not WAITING
1791+
*/
1792+
public async retryStep({
1793+
responseIdempotencyKey,
1794+
handler,
1795+
transaction,
1796+
onLoad,
1797+
}: {
1798+
responseIdempotencyKey: string
1799+
handler?: TransactionStepHandler
1800+
transaction?: DistributedTransactionType
1801+
onLoad?: (transaction: DistributedTransactionType) => Promise<void> | void
1802+
}): Promise<DistributedTransactionType> {
1803+
// Resolve both the transaction and the targeted step from the idempotency key.
const [curTransaction, step] =
1804+
await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey(
1805+
responseIdempotencyKey,
1806+
handler,
1807+
transaction
1808+
)
1809+
1810+
if (onLoad) {
1811+
await onLoad(curTransaction)
1812+
}
1813+
1814+
if (step.getStates().status === TransactionStepStatus.WAITING) {
1815+
// Emit RESUME before delegating to the static helper that flags and schedules the retry.
this.emit(DistributedTransactionEvent.RESUME, {
1816+
transaction: curTransaction,
1817+
})
1818+
1819+
await TransactionOrchestrator.retryStep(curTransaction, step)
1820+
} else {
1821+
// Any status other than WAITING cannot be force-retried.
throw new MedusaError(
1822+
MedusaError.Types.NOT_ALLOWED,
1823+
`Cannot retry step when status is ${step.getStates().status}`
1824+
)
1825+
}
1826+
1827+
return curTransaction
1828+
}
1829+
17351830
/** Register a step success for a specific transaction and step
17361831
* @param responseIdempotencyKey - The idempotency key for the step
17371832
* @param handler - The handler function to execute the step

packages/core/orchestration/src/transaction/transaction-step.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ export class TransactionStep {
5454
}
5555
attempts: number
5656
failures: number
57+
temporaryFailedAt: number | null
5758
lastAttempt: number | null
5859
retryRescheduledAt: number | null
5960
hasScheduledRetry: boolean
@@ -189,7 +190,9 @@ export class TransactionStep {
189190
this.hasAwaitingRetry() &&
190191
this.lastAttempt &&
191192
Date.now() - this.lastAttempt >
192-
this.definition.retryIntervalAwaiting! * 1e3
193+
this.definition.retryIntervalAwaiting! * 1e3 &&
194+
(!("maxAwaitingRetries" in this.definition) ||
195+
this.attempts < this.definition.maxAwaitingRetries!)
193196
)
194197
}
195198

@@ -199,7 +202,8 @@ export class TransactionStep {
199202
(!this.isCompensating() &&
200203
state === TransactionStepState.NOT_STARTED &&
201204
flowState === TransactionState.INVOKING) ||
202-
status === TransactionStepStatus.TEMPORARY_FAILURE
205+
(status === TransactionStepStatus.TEMPORARY_FAILURE &&
206+
!this.temporaryFailedAt)
203207
)
204208
}
205209

0 commit comments

Comments
 (0)