Skip to content

Commit 6c3ec52

Browse files
authored
fix(): Workflow engine redis worker instance in worker mode (medusajs#14099)
* fix(): Workflow engine redis worker instance in worker mode * Create wicked-tips-buy.md
1 parent f67bfb9 commit 6c3ec52

File tree

2 files changed

+32
-27
lines changed

2 files changed

+32
-27
lines changed

.changeset/wicked-tips-buy.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@medusajs/workflow-engine-redis": patch
3+
---
4+
5+
fix(): Workflow engine redis worker instance in worker mode

packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -145,37 +145,37 @@ export class RedisDistributedTransactionStorage
145145
// Remove all repeatable jobs from the old queue since now we have a queue dedicated to scheduled jobs
146146
await this.removeAllRepeatableJobs(this.queue)
147147

148-
this.worker = new Worker(
149-
this.queueName,
150-
async (job) => {
151-
this.logger_.debug(
152-
`executing job ${job.name} from queue ${
153-
this.queueName
154-
} with the following data: ${JSON.stringify(job.data)}`
155-
)
156-
if (allowedJobs.includes(job.name as JobType)) {
157-
try {
158-
await this.executeTransaction(
159-
job.data.workflowId,
160-
job.data.transactionId,
161-
job.data.transactionMetadata
162-
)
163-
} catch (error) {
164-
if (!SkipExecutionError.isSkipExecutionError(error)) {
165-
throw error
148+
if (this.#isWorkerMode) {
149+
this.worker = new Worker(
150+
this.queueName,
151+
async (job) => {
152+
this.logger_.debug(
153+
`executing job ${job.name} from queue ${
154+
this.queueName
155+
} with the following data: ${JSON.stringify(job.data)}`
156+
)
157+
if (allowedJobs.includes(job.name as JobType)) {
158+
try {
159+
await this.executeTransaction(
160+
job.data.workflowId,
161+
job.data.transactionId,
162+
job.data.transactionMetadata
163+
)
164+
} catch (error) {
165+
if (!SkipExecutionError.isSkipExecutionError(error)) {
166+
throw error
167+
}
166168
}
167169
}
168-
}
169170

170-
if (job.name === JobType.SCHEDULE) {
171-
// Remove repeatable job from the old queue since now we have a queue dedicated to scheduled jobs
172-
await this.remove(job.data.jobId)
173-
}
174-
},
175-
workerOptions
176-
)
171+
if (job.name === JobType.SCHEDULE) {
172+
// Remove repeatable job from the old queue since now we have a queue dedicated to scheduled jobs
173+
await this.remove(job.data.jobId)
174+
}
175+
},
176+
workerOptions
177+
)
177178

178-
if (this.#isWorkerMode) {
179179
this.jobWorker = new Worker(
180180
this.jobQueueName,
181181
async (job) => {

0 commit comments

Comments
 (0)