Skip to content

Commit cf97e00

Browse files
committed
Fix continue as new race condition for sql backends
1 parent 761745b commit cf97e00

File tree

2 files changed

+14
-12
lines changed

2 files changed

+14
-12
lines changed

backend/mysql/mysql.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -332,16 +332,17 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
332332
FROM instances i
333333
INNER JOIN pending_events pe ON i.instance_id = pe.instance_id
334334
WHERE
335-
i.completed_at IS NULL
335+
state = ? AND i.completed_at IS NULL
336336
AND (pe.visible_at IS NULL OR pe.visible_at <= ?)
337337
AND (i.locked_until IS NULL OR i.locked_until < ?)
338338
AND (i.sticky_until IS NULL OR i.sticky_until < ? OR i.worker = ?)
339339
LIMIT 1
340340
FOR UPDATE OF i SKIP LOCKED`,
341-
now, // event.visible_at
342-
now, // locked_until
343-
now, // sticky_until
344-
b.workerName, // worker
341+
core.WorkflowInstanceStateActive, // state
342+
now, // event.visible_at
343+
now, // locked_until
344+
now, // sticky_until
345+
b.workerName, // worker
345346
)
346347

347348
var id int
@@ -484,7 +485,7 @@ func (b *mysqlBackend) CompleteWorkflowTask(
484485

485486
// Unlock instance, but keep it sticky to the current worker
486487
var completedAt *time.Time
487-
if state == core.WorkflowInstanceStateFinished {
488+
if state == core.WorkflowInstanceStateContinuedAsNew || state == core.WorkflowInstanceStateFinished {
488489
t := time.Now()
489490
completedAt = &t
490491
}

backend/sqlite/sqlite.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, e
288288
WHERE
289289
(locked_until IS NULL OR locked_until < ?)
290290
AND (sticky_until IS NULL OR sticky_until < ? OR worker = ?)
291-
AND completed_at IS NULL
291+
AND state = ? AND i.completed_at IS NULL
292292
AND EXISTS (
293293
SELECT 1
294294
FROM pending_events
@@ -298,10 +298,11 @@ func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, e
298298
) RETURNING id, execution_id, parent_instance_id, parent_execution_id, parent_schedule_event_id, metadata, sticky_until`,
299299
now.Add(sb.options.WorkflowLockTimeout), // new locked_until
300300
sb.workerName,
301-
now, // locked_until
302-
now, // sticky_until
303-
sb.workerName, // worker
304-
now, // event.visible_at
301+
now, // locked_until
302+
now, // sticky_until
303+
sb.workerName, // worker
304+
core.WorkflowInstanceStateActive, // state
305+
now, // pending_event.visible_at
305306
)
306307

307308
var instanceID, executionID string
@@ -383,7 +384,7 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
383384
defer tx.Rollback()
384385

385386
var completedAt *time.Time
386-
if state == core.WorkflowInstanceStateFinished || state == core.WorkflowInstanceStateContinuedAsNew {
387+
if state == core.WorkflowInstanceStateContinuedAsNew || state == core.WorkflowInstanceStateFinished {
387388
t := time.Now()
388389
completedAt = &t
389390
}

0 commit comments

Comments
 (0)