Skip to content

Commit f473bcf

Browse files
committed
fix: prevent concurrent step data updates
1 parent 6f77bc2 commit f473bcf

File tree

4 files changed

+44
-8
lines changed

4 files changed

+44
-8
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
"exit-hook": "^5.0.1",
4545
"groupmq": "^1.1.0",
4646
"ioredis": "^5.9.2",
47+
"p-mutex": "^1.0.0",
4748
"p-retry": "^7.1.1",
4849
"superjson": "^2.2.6",
4950
"type-fest": "^5.4.2"

pnpm-lock.yaml

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/step.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { Span } from '@opentelemetry/api'
22
import type { Options } from 'p-retry'
33
import type { WorkflowQueueInternal } from './types'
44
import { setTimeout } from 'node:timers/promises'
5+
import Mutex from 'p-mutex'
56
import pRetry from 'p-retry'
67
import { deserialize, serialize } from './serializer'
78
import { Settings } from './settings'
@@ -24,6 +25,7 @@ export class WorkflowStep {
2425
private queue
2526
private workflowJobId
2627
private stepNamePrefix
28+
private updateStepDataMutex = new Mutex()
2729

2830
constructor(opts: {
2931
queue: WorkflowQueueInternal<unknown>
@@ -171,13 +173,17 @@ export class WorkflowStep {
171173
return stepData as Extract<WorkflowStepData, { type: T }> | undefined
172174
}
173175

176+
// TODO maybe update this to use lua script for atomic operation
174177
private async updateStepData(stepName: string, data: WorkflowStepData) {
175-
const job = await this.getWorkflowJob()
178+
// prevent concurrent updates to step data which could lead to race conditions and lost updates
179+
await this.updateStepDataMutex.withLock(async () => {
180+
const job = await this.getWorkflowJob()
176181

177-
const jobData = deserialize(job.data)
178-
jobData.stepData[stepName] = data
182+
const jobData = deserialize(job.data)
183+
jobData.stepData[stepName] = data
179184

180-
await job.updateData(serialize(jobData))
185+
await job.updateData(serialize(jobData))
186+
})
181187
}
182188

183189
private async getWorkflowJob() {

tests/workflow.test.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { beforeAll, describe, expect, test, vi } from 'vitest'
44
import { createRedis, Settings, Workflow } from '../src'
55

66
beforeAll(() => {
7+
Settings.logger = console
78
Settings.defaultConnection = async () =>
89
createRedis({
910
host: 'localhost',
@@ -57,12 +58,21 @@ describe('input', () => {
5758

5859
describe('step', () => {
5960
test('only runs once', async () => {
60-
const stepHandler = vi.fn()
61+
const stepHandler1 = vi.fn()
62+
const stepHandler2 = vi.fn()
63+
const stepHandler3 = vi.fn()
6164
const handler = vi.fn()
6265
const workflow = new Workflow({
6366
id: randomUUID(),
67+
queueOptions: {
68+
maxAttempts: 10,
69+
},
6470
run: async ({ step }) => {
65-
await step.do('test-step', stepHandler)
71+
await Promise.all([
72+
step.do('test-step1', stepHandler1),
73+
step.do('test-step2', stepHandler2),
74+
step.do('test-step3', stepHandler3),
75+
])
6676
await handler()
6777
throw new Error('error')
6878
},
@@ -74,8 +84,10 @@ describe('step', () => {
7484
await workflow.run(undefined)
7585

7686
await vi.waitFor(() => {
77-
expect(stepHandler).toHaveBeenCalledOnce()
78-
expect(handler).toHaveBeenCalledTimes(3)
87+
expect(stepHandler1).toHaveBeenCalledOnce()
88+
expect(stepHandler2).toHaveBeenCalledOnce()
89+
expect(stepHandler3).toHaveBeenCalledOnce()
90+
expect(handler).toHaveBeenCalledTimes(10)
7991
})
8092
})
8193
test('caches output', async () => {

0 commit comments

Comments
 (0)