Commit bad0858

fix: prevent jobId collisions on workflow step retries (medusajs#13786)
## Summary

**What**: What changes are introduced in this PR?

This PR fixes a bug where async workflow steps with retry intervals would get stuck after the first retry attempt. Bull queue jobId collisions prevented the retry jobs from executing.

**Why**: Why are these changes relevant or necessary?

Workflows using async steps with retry configurations (e.g., `retryInterval: 1`, `maxRetries: 5`) would fail once and schedule a retry, but the retry job would never execute, causing the workflow to hang indefinitely.

**How**: How have these changes been implemented?

**Root Cause:** Bull queue was rejecting retry jobs because they had the same jobIds as the async execution jobs that had already completed. Both used the format `retry:workflow:transaction:step_id:attempts`.

**Solution:** Modified `getJobId()` in `workflow-orchestrator-storage.ts` to append a `:retry` suffix when `interval > 0`, creating unique jobIds:

- Async execution (interval=0): `retry:...:step_id:1`
- Retry scheduling (interval>0): `retry:...:step_id:1:retry`

Updated methods: `getJobId()`, `scheduleRetry()`, `removeJob()`, and `clearRetry()` to pass and handle the interval parameter.

**Testing**: How have these changes been tested, or how can the reviewer test the feature?

Added integration test `retry-interval.spec.ts` that verifies:

1. Step with `retryInterval: 1` and `maxRetries: 3` executes 3 times
2. Retry intervals are approximately 1 second between attempts
3. Workflow completes successfully after retries
4. Uses proper async workflow completion pattern with `subscribe()` and `onFinish` event

---

## Examples

```ts
import { createStep } from "@medusajs/framework/workflows-sdk"

// Counts invocations of the step handler across retries
let attempts = 0

// Example workflow step that would previously get stuck
export const testRetryStep = createStep(
  {
    name: "test-retry-step",
    async: true,
    retryInterval: 1, // 1 second retry interval
    maxRetries: 3,
  },
  async (input: any) => {
    attempts++

    // Simulate failure on first 2 attempts
    if (attempts < 3) {
      throw new Error("Temporary failure - will retry")
    }
    return { success: true }
  }
)

// Before fix: Step would fail once, schedule retry, but retry job never fired (jobId collision)
// After fix: Step properly retries up to 3 times with 1-second intervals
```

---

## Checklist

Please ensure the following before requesting a review:

- [ ] I have added a **changeset** for this PR
  - Every non-breaking change should be marked as a **patch**
  - To add a changeset, run `yarn changeset` and follow the prompts
- [ ] The changes are covered by relevant **tests**
- [ ] I have verified the code works as intended locally
- [ ] I have linked the related issue(s) if applicable

---

## Additional Context

Co-authored-by: Carlos R. L. Rodrigues <[email protected]>
1 parent 5df903f commit bad0858
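
To make the root cause in the summary concrete, here is a minimal sketch of how the job ID is assembled with and without the new suffix. `buildJobId` and `StepInfo` are hypothetical stand-ins for the storage class's private `getJobId()` and `TransactionStep`; the `:` separator and the literal `"retry"` job type are assumed from the ID format quoted in the summary.

```typescript
// Hypothetical, simplified stand-in for the private getJobId() method.
type StepInfo = { id: string; attempts: number; compensating?: boolean }

function buildJobId(
  type: string,
  workflowId: string,
  transactionId: string,
  step?: StepInfo,
  interval?: number
): string {
  const key: string[] = [type, workflowId, transactionId]

  if (step) {
    key.push(step.id, String(step.attempts))

    // Delayed retries get their own suffix so they cannot share an ID
    // with the immediate async execution job for the same attempt.
    if (type === "retry" && interval !== undefined && interval > 0) {
      key.push("retry")
    }

    if (step.compensating) {
      key.push("compensate")
    }
  }

  // Assumption: segments are joined with ":" as in the quoted format.
  return key.join(":")
}

const step: StepInfo = { id: "step_1", attempts: 1 }

// Before the fix, both calls ignored the interval and produced the same ID.
const asyncExecutionId = buildJobId("retry", "wf", "tx", step, 0)
const delayedRetryId = buildJobId("retry", "wf", "tx", step, 1)

console.log(asyncExecutionId) // retry:wf:tx:step_1:1
console.log(delayedRetryId) // retry:wf:tx:step_1:1:retry
```

Because the suffix depends on the interval, any caller that later looks up or removes the job has to supply the same interval to arrive at the same ID.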

7 files changed: +209 -7 lines changed

.changeset/chilled-chicken-bow.md

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+---
+"@medusajs/workflow-engine-redis": patch
+"@medusajs/orchestration": patch
+---
+
+fix: avoid jobId collisions on retry

packages/core/orchestration/src/transaction/transaction-step.ts

Lines changed: 1 addition & 1 deletion
@@ -177,7 +177,7 @@ export class TransactionStep {
       !!(
         this.lastAttempt &&
         this.definition.retryInterval &&
-        Date.now() - this.lastAttempt > this.definition.retryInterval * 1e3
+        Date.now() - this.lastAttempt >= this.definition.retryInterval * 1e3
       )
     )
   }
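
The one-character change above only matters at the boundary, when the elapsed time equals the retry interval exactly. The sketch below isolates that comparison in a hypothetical free function, `retryIntervalElapsed`, which is not part of the codebase; the `inclusive` flag exists only to compare the old and new operators side by side.

```typescript
// Hypothetical free-function version of the elapsed-interval comparison.
function retryIntervalElapsed(
  lastAttempt: number | null,
  retryIntervalSeconds: number | undefined,
  now: number,
  inclusive: boolean
): boolean {
  if (!lastAttempt || !retryIntervalSeconds) {
    return false
  }

  const elapsedMs = now - lastAttempt
  const thresholdMs = retryIntervalSeconds * 1e3

  // inclusive = true mirrors the new ">=", false mirrors the old ">".
  return inclusive ? elapsedMs >= thresholdMs : elapsedMs > thresholdMs
}

const lastAttemptAt = 10000
const exactlyOneSecondLater = 11000

console.log(retryIntervalElapsed(lastAttemptAt, 1, exactlyOneSecondLater, false)) // false
console.log(retryIntervalElapsed(lastAttemptAt, 1, exactlyOneSecondLater, true)) // true
```

With a delayed job that fires at the scheduled millisecond, the strict comparison could report that the interval had not yet elapsed; the inclusive one does not.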

packages/modules/workflow-engine-redis/integration-tests/__fixtures__/index.ts

Lines changed: 1 addition & 0 deletions
@@ -7,3 +7,4 @@ export * from "./workflow_sync"
 export * from "./workflow_transaction_timeout"
 export * from "./workflow_when"
 export * from "./workflow_not_idempotent_with_retention"
+export * from "./workflow_retry_interval"
Lines changed: 76 additions & 0 deletions
@@ -0,0 +1,76 @@
+/**
+ * Test fixture for workflow with retry interval
+ * Tests that steps with retry intervals properly retry after failures
+ */
+
+import {
+  createStep,
+  createWorkflow,
+  StepResponse,
+  WorkflowResponse,
+} from "@medusajs/framework/workflows-sdk"
+
+// Mock counters to track execution attempts
+export const retryIntervalStep1InvokeMock = jest.fn()
+export const retryIntervalStep2InvokeMock = jest.fn()
+
+// Step 1: Fails first 2 times, succeeds on 3rd attempt
+const step_1_retry_interval = createStep(
+  {
+    name: "step_1_retry_interval",
+    async: true,
+    retryInterval: 1, // 1 second retry interval
+    maxRetries: 3,
+  },
+  async (input: { attemptToSucceedOn: number }) => {
+    const attemptCount = retryIntervalStep1InvokeMock.mock.calls.length + 1
+    retryIntervalStep1InvokeMock(input)
+
+    // Fail until we reach the target attempt
+    if (attemptCount < input.attemptToSucceedOn) {
+      throw new Error(
+        `Step 1 failed on attempt ${attemptCount}, will retry`
+      )
+    }
+
+    return new StepResponse({
+      success: true,
+      attempts: attemptCount,
+      step: "step_1",
+    })
+  }
+)
+
+// Step 2: Always succeeds (to verify workflow continues after retry)
+const step_2_after_retry = createStep(
+  {
+    name: "step_2_after_retry",
+    async: true,
+  },
+  async (input) => {
+    retryIntervalStep2InvokeMock(input)
+
+    return new StepResponse({
+      success: true,
+      step: "step_2",
+    })
+  }
+)
+
+export const workflowRetryIntervalId = "workflow_retry_interval_test"
+
+createWorkflow(
+  {
+    name: workflowRetryIntervalId,
+    retentionTime: 600, // Keep for 10 minutes for debugging
+  },
+  function (input: { attemptToSucceedOn: number }) {
+    const step1Result = step_1_retry_interval(input)
+    const step2Result = step_2_after_retry({ step1Result })
+
+    return new WorkflowResponse({
+      step1: step1Result,
+      step2: step2Result,
+    })
+  }
+)
Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
+/**
+ * Integration test for workflow step retry intervals
+ *
+ * This test verifies the fix for the bug where steps with retry intervals
+ * would get stuck after the first retry attempt due to retryRescheduledAt
+ * not being properly cleared.
+ */
+
+import { IWorkflowEngineService } from "@medusajs/framework/types"
+import { Modules } from "@medusajs/framework/utils"
+import { moduleIntegrationTestRunner } from "@medusajs/test-utils"
+import { setTimeout } from "timers/promises"
+import {
+  retryIntervalStep1InvokeMock,
+  retryIntervalStep2InvokeMock,
+  workflowRetryIntervalId,
+} from "../__fixtures__/workflow_retry_interval"
+import { TestDatabase } from "../utils"
+
+jest.setTimeout(60000) // Increase timeout for async retries
+
+moduleIntegrationTestRunner<IWorkflowEngineService>({
+  moduleName: Modules.WORKFLOW_ENGINE,
+  resolve: __dirname + "/../..",
+  moduleOptions: {
+    redis: {
+      url: "localhost:6379",
+    },
+  },
+  testSuite: ({ service: workflowOrcModule }) => {
+    describe("Workflow Retry Interval", function () {
+      beforeEach(async () => {
+        await TestDatabase.clearTables()
+        jest.clearAllMocks()
+      })
+
+      afterEach(async () => {
+        await TestDatabase.clearTables()
+      })
+
+      it("should properly retry async step with retry interval after failures", async () => {
+        // Configure step to succeed on 3rd attempt
+        const attemptToSucceedOn = 3
+
+        // Track when each attempt happens
+        const attemptTimestamps: number[] = []
+        retryIntervalStep1InvokeMock.mockImplementation(() => {
+          attemptTimestamps.push(Date.now())
+        })
+
+        // Create promise to wait for workflow completion
+        const workflowCompletion = new Promise<{ result: any; errors: any }>((resolve) => {
+          workflowOrcModule.subscribe({
+            workflowId: workflowRetryIntervalId,
+            subscriber: async (data) => {
+              if (data.eventType === "onFinish") {
+                resolve({
+                  result: data.result,
+                  errors: data.errors,
+                })
+              }
+            },
+          })
+        })
+
+        // Execute workflow
+        await workflowOrcModule.run(
+          workflowRetryIntervalId,
+          {
+            input: { attemptToSucceedOn },
+            throwOnError: false,
+          }
+        )
+
+        // Wait for async workflow to complete
+        const { result, errors } = await workflowCompletion
+
+        // Assertions
+        // Step 1 should have been called 3 times (2 failures + 1 success)
+        expect(retryIntervalStep1InvokeMock).toHaveBeenCalledTimes(3)
+
+        // Step 2 should have been called once (after step 1 succeeded)
+        expect(retryIntervalStep2InvokeMock).toHaveBeenCalledTimes(1)
+
+        // Workflow should complete successfully
+        expect(errors === undefined || errors.length === 0).toBe(true)
+        expect(result).toBeDefined()
+        expect(result.step1).toBeDefined()
+        expect(result.step1.success).toBe(true)
+        expect(result.step1.attempts).toBe(3)
+
+        // Verify retry intervals are approximately 1 second (with some tolerance)
+        if (attemptTimestamps.length >= 2) {
+          const firstRetryInterval = attemptTimestamps[1] - attemptTimestamps[0]
+          expect(firstRetryInterval).toBeGreaterThan(800) // At least 800ms
+          expect(firstRetryInterval).toBeLessThan(2000) // Less than 2s
+        }
+
+        if (attemptTimestamps.length >= 3) {
+          const secondRetryInterval =
+            attemptTimestamps[2] - attemptTimestamps[1]
+          expect(secondRetryInterval).toBeGreaterThan(800)
+          expect(secondRetryInterval).toBeLessThan(2000)
+        }
+      })
+    })
+  },
+})
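
The two timing checks at the end of the test repeat the same window comparison. As an illustration only, the comparison could be factored into small pure helpers; `gapsBetween` and `allWithin` are hypothetical names that do not exist in the repository, and the sample timestamps are invented for the demonstration.

```typescript
// Hypothetical helpers: compute the gaps between consecutive timestamps
// and check that every gap falls strictly inside a tolerance window.
function gapsBetween(timestamps: number[]): number[] {
  const gaps: number[] = []
  for (let i = 1; i < timestamps.length; i++) {
    gaps.push(timestamps[i] - timestamps[i - 1])
  }
  return gaps
}

function allWithin(gaps: number[], minMs: number, maxMs: number): boolean {
  return gaps.every((gap) => gap > minMs && gap < maxMs)
}

// Invented sample: three attempts roughly one second apart.
const sampleTimestamps = [0, 1040, 2095]
const sampleGaps = gapsBetween(sampleTimestamps)

console.log(sampleGaps) // [ 1040, 1055 ]
console.log(allWithin(sampleGaps, 800, 2000)) // true
```

Note that `allWithin` returns `true` for an empty list, so a helper like this would need to be paired with an explicit check on the number of recorded attempts.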

packages/modules/workflow-engine-redis/package.json

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@
     "resolve:aliases": "tsc --showConfig -p tsconfig.json > tsconfig.resolved.json && tsc-alias -p tsconfig.resolved.json && rimraf tsconfig.resolved.json",
     "build": "rimraf dist && tsc --build && npm run resolve:aliases",
     "test": "jest --passWithNoTests --bail --forceExit -- src",
-    "test:integration": "jest --forceExit --runInBand -- integration-tests/**/__tests__/index.spec.ts && jest --forceExit --runInBand -- integration-tests/**/__tests__/race.spec.ts",
+    "test:integration": "jest --forceExit -- integration-tests/**/__tests__/index.spec.ts && jest --forceExit -- integration-tests/**/__tests__/race.spec.ts && jest --forceExit -- integration-tests/**/__tests__/retry-interval.spec.ts",
     "migration:initial": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:create --initial",
     "migration:create": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:create",
     "migration:up": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:up",

packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts

Lines changed: 16 additions & 5 deletions
@@ -16,6 +16,7 @@ import {
 } from "@medusajs/framework/orchestration"
 import { Logger, ModulesSdkTypes } from "@medusajs/framework/types"
 import {
+  isDefined,
   isPresent,
   MedusaError,
   promiseAll,
@@ -594,7 +595,7 @@ export class RedisDistributedTransactionStorage
       },
       {
         delay: interval > 0 ? interval * 1000 : undefined,
-        jobId: this.getJobId(JobType.RETRY, transaction, step),
+        jobId: this.getJobId(JobType.RETRY, transaction, step, interval),
         removeOnComplete: true,
       }
     )
@@ -604,7 +605,9 @@ export class RedisDistributedTransactionStorage
     transaction: DistributedTransactionType,
     step: TransactionStep
   ): Promise<void> {
-    await this.removeJob(JobType.RETRY, transaction, step)
+    // Pass retry interval to ensure we remove the correct job (with -retry suffix if interval > 0)
+    const interval = step.definition.retryInterval || 0
+    await this.removeJob(JobType.RETRY, transaction, step, interval)
   }
 
   async scheduleTransactionTimeout(
@@ -665,12 +668,19 @@ export class RedisDistributedTransactionStorage
   private getJobId(
     type: JobType,
     transaction: DistributedTransactionType,
-    step?: TransactionStep
+    step?: TransactionStep,
+    interval?: number
   ) {
     const key = [type, transaction.modelId, transaction.transactionId]
 
     if (step) {
       key.push(step.id, step.attempts + "")
+
+      // Add suffix for retry scheduling (interval > 0) to avoid collision with async execution (interval = 0)
+      if (type === JobType.RETRY && isDefined(interval) && interval > 0) {
+        key.push("retry")
+      }
+
       if (step.isCompensating()) {
         key.push("compensate")
       }
@@ -682,9 +692,10 @@ export class RedisDistributedTransactionStorage
   private async removeJob(
     type: JobType,
     transaction: DistributedTransactionType,
-    step?: TransactionStep
+    step?: TransactionStep,
+    interval?: number
   ) {
-    const jobId = this.getJobId(type, transaction, step)
+    const jobId = this.getJobId(type, transaction, step, interval)
 
     if (type === JobType.SCHEDULE) {
       const job = await this.jobQueue?.getJob(jobId)
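
The hunks above thread `interval` through both scheduling and removal. The sketch below illustrates why the two sides must agree, using an in-memory stand-in for the queue. `FakeQueue` and `retryJobId` are hypothetical and are not the Bull or BullMQ API; the sketch assumes a queue that silently drops an `add` whose jobId is already present, which is how the PR description characterizes the collision.

```typescript
// Hypothetical in-memory queue: an add() with an existing jobId is dropped.
class FakeQueue {
  private jobs = new Map<string, { delayMs: number }>()

  add(jobId: string, delayMs: number): boolean {
    if (this.jobs.has(jobId)) {
      return false // duplicate ID: the new job is never enqueued
    }
    this.jobs.set(jobId, { delayMs })
    return true
  }

  remove(jobId: string): boolean {
    return this.jobs.delete(jobId)
  }

  has(jobId: string): boolean {
    return this.jobs.has(jobId)
  }
}

// Hypothetical ID builder following the format from the PR description.
function retryJobId(stepId: string, attempts: number, interval: number): string {
  const key = ["retry", "wf", "tx", stepId, String(attempts)]
  if (interval > 0) {
    key.push("retry")
  }
  return key.join(":")
}

const queue = new FakeQueue()

// Immediate async execution and delayed retry for the same attempt.
const asyncAccepted = queue.add(retryJobId("step_1", 1, 0), 0)
const retryAccepted = queue.add(retryJobId("step_1", 1, 1), 1000)
console.log(asyncAccepted, retryAccepted) // true true

// Removing without the interval targets the async job's ID,
// so the delayed retry job stays in the queue.
queue.remove(retryJobId("step_1", 1, 0))
const retryStillQueued = queue.has(retryJobId("step_1", 1, 1))
console.log(retryStillQueued) // true

// Removing with the same interval used for scheduling finds the job.
const retryRemoved = queue.remove(retryJobId("step_1", 1, 1))
console.log(retryRemoved) // true
```

This is the reason `clearRetry()` reads `step.definition.retryInterval` before calling `removeJob()`: the removal must rebuild the exact ID that scheduling produced.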
