Skip to content

Commit 9166097

Browse files
committed
fix: delay when resuming workflows after restart
The `go-workflows` backend locks workflows and activities while they're being executed. In the event of a crash or restart, the stale lock would prevent the restarted worker from resuming the workflow or activity until the lock expired. This fix makes a few changes to resolve this issue:
- Uses a stable ID for the worker ID
- Adds a random "worker instance" ID that is generated each time the backend is created
- Adds the new IDs to the lock entities
- Incorporates the new IDs into the lock checks

If the worker ID matches the lock's worker ID but the worker instance ID does not match, we know that the lock belonged to an old instance of the worker and that the lock can be reassigned to the new instance.

PLAT-99
1 parent 8305da3 commit 9166097

File tree

6 files changed

+98
-32
lines changed

6 files changed

+98
-32
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
kind: Fixed
2+
body: Delay when resuming workflows after a restart.
3+
time: 2025-05-25T12:34:21.713839-04:00

server/internal/workflows/backend/etcd/activity_lock/store.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@ type Value struct {
1414
WorkflowInstanceID string `json:"workflow_instance_id"`
1515
EventID string `json:"event_id"`
1616
CreatedAt time.Time `json:"created_at"`
17+
WorkerID string `json:"worker_id"`
18+
WorkerInstanceID string `json:"worker_instance_id"`
19+
}
20+
21+
func (v *Value) CanBeReassignedTo(workerID, workerInstanceID string) bool {
22+
// This lock can be reassigned if it belongs to an old instance of the given
23+
// worker.
24+
return v.WorkerID == workerID && v.WorkerInstanceID != workerInstanceID
1725
}
1826

1927
type Store struct {

server/internal/workflows/backend/etcd/etcd.go

Lines changed: 72 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,18 @@ import (
2929
var _ backend.Backend = (*Backend)(nil)
3030

3131
type Backend struct {
32-
store *Store
33-
options *backend.Options
34-
workerID string
32+
store *Store
33+
options *backend.Options
34+
workerID string
35+
workerInstanceID string
3536
}
3637

37-
func NewBackend(store *Store, options *backend.Options) *Backend {
38+
func NewBackend(store *Store, options *backend.Options, workerID string) *Backend {
3839
return &Backend{
39-
store: store,
40-
options: options,
41-
workerID: uuid.NewString(),
40+
store: store,
41+
options: options,
42+
workerID: workerID,
43+
workerInstanceID: uuid.NewString(),
4244
}
4345
}
4446

@@ -252,15 +254,46 @@ func (b *Backend) GetWorkflowTask(ctx context.Context, queues []workflow.Queue)
252254
instanceID := item.WorkflowInstance.InstanceID
253255
executionID := item.WorkflowInstance.ExecutionID
254256

255-
locked, err := b.store.WorkflowInstanceLock.
256-
ExistsByKey(item.WorkflowInstance.InstanceID, item.WorkflowInstance.ExecutionID).
257+
lock, err := b.store.WorkflowInstanceLock.
258+
GetByKey(item.WorkflowInstance.InstanceID, item.WorkflowInstance.ExecutionID).
257259
Exec(ctx)
258-
if err != nil {
260+
if err != nil && !errors.Is(err, storage.ErrNotFound) {
259261
return nil, fmt.Errorf("failed to check for lock: %w", err)
260262
}
261-
if locked {
263+
var lockOp storage.TxnOperation
264+
switch {
265+
case lock == nil:
266+
lockOp = b.store.WorkflowInstanceLock.
267+
Create(&workflow_instance_lock.Value{
268+
WorkflowInstanceID: instanceID,
269+
WorkflowExecutionID: executionID,
270+
CreatedAt: time.Now(),
271+
WorkerID: b.workerID,
272+
WorkerInstanceID: b.workerInstanceID,
273+
}).
274+
WithTTL(b.options.WorkflowLockTimeout)
275+
case lock.CanBeReassignedTo(b.workerID, b.workerInstanceID):
276+
lock.WorkerID = b.workerID
277+
lock.WorkerInstanceID = b.workerInstanceID
278+
lockOp = b.store.WorkflowInstanceLock.
279+
Update(lock).
280+
WithTTL(b.options.WorkflowLockTimeout)
281+
default:
262282
continue
263283
}
284+
285+
// if lock != nil {
286+
// if lock.CanBeReassignedTo(b.workerID, b.workerInstanceID) {
287+
// lock.WorkerID = b.workerID
288+
// lock.WorkerInstanceID = b.workerInstanceID
289+
// lockOp = b.store.WorkflowInstanceLock.Update(lock)
290+
// } else {
291+
// continue
292+
// }
293+
// }
294+
// if lock != nil && !lock.CanBeReassignedTo(b.workerID, b.workerInstanceID) {
295+
// continue
296+
// }
264297
sticky, err := b.store.WorkflowInstanceSticky.
265298
GetByKey(item.WorkflowInstance.InstanceID).
266299
Exec(ctx)
@@ -297,13 +330,7 @@ func (b *Backend) GetWorkflowTask(ctx context.Context, queues []workflow.Queue)
297330
item.UpdateLastLocked()
298331

299332
err = b.store.Txn(
300-
b.store.WorkflowInstanceLock.
301-
Create(&workflow_instance_lock.Value{
302-
WorkflowInstanceID: instanceID,
303-
WorkflowExecutionID: executionID,
304-
CreatedAt: time.Now(),
305-
}).
306-
WithTTL(b.options.WorkflowLockTimeout),
333+
lockOp,
307334
b.store.WorkflowInstanceSticky.
308335
Put(&workflow_instance_sticky.Value{
309336
WorkflowInstanceID: instanceID,
@@ -356,6 +383,8 @@ func (b *Backend) ExtendWorkflowTask(ctx context.Context, task *backend.Workflow
356383
WorkflowInstanceID: instanceID,
357384
WorkflowExecutionID: executionID,
358385
CreatedAt: time.Now(),
386+
WorkerID: b.workerID,
387+
WorkerInstanceID: b.workerInstanceID,
359388
}
360389
}
361390

@@ -570,13 +599,31 @@ func (b *Backend) GetActivityTask(ctx context.Context, queues []workflow.Queue)
570599
for _, item := range items {
571600
instanceID := item.WorkflowInstanceID
572601
executionID := item.WorkflowExecutionID
573-
locked, err := b.store.ActivityLock.
574-
ExistsByKey(instanceID, item.Event.ID).
602+
lock, err := b.store.ActivityLock.
603+
GetByKey(instanceID, item.Event.ID).
575604
Exec(ctx)
576-
if err != nil {
605+
if err != nil && !errors.Is(err, storage.ErrNotFound) {
577606
return nil, fmt.Errorf("failed to check for lock: %w", err)
578607
}
579-
if locked {
608+
var lockOp storage.TxnOperation
609+
switch {
610+
case lock == nil:
611+
lockOp = b.store.ActivityLock.
612+
Create(&activity_lock.Value{
613+
WorkflowInstanceID: instanceID,
614+
EventID: item.Event.ID,
615+
CreatedAt: time.Now(),
616+
WorkerID: b.workerID,
617+
WorkerInstanceID: b.workerInstanceID,
618+
}).
619+
WithTTL(b.options.ActivityLockTimeout)
620+
case lock.CanBeReassignedTo(b.workerID, b.workerInstanceID):
621+
lock.WorkerID = b.workerID
622+
lock.WorkerInstanceID = b.workerInstanceID
623+
lockOp = b.store.ActivityLock.
624+
Update(lock).
625+
WithTTL(b.options.ActivityLockTimeout) // activity locks use the activity timeout, same as the Create branch above
626+
default:
580627
continue
581628
}
582629

@@ -585,13 +632,7 @@ func (b *Backend) GetActivityTask(ctx context.Context, queues []workflow.Queue)
585632
item.UpdateLastLocked()
586633

587634
err = b.store.Txn(
588-
b.store.ActivityLock.
589-
Create(&activity_lock.Value{
590-
WorkflowInstanceID: instanceID,
591-
EventID: item.Event.ID,
592-
CreatedAt: time.Now(),
593-
}).
594-
WithTTL(b.options.ActivityLockTimeout),
635+
lockOp,
595636
b.store.ActivityQueueItem.Update(item),
596637
).Commit(ctx)
597638
if err != nil {
@@ -628,6 +669,8 @@ func (b *Backend) ExtendActivityTask(ctx context.Context, task *backend.Activity
628669
WorkflowInstanceID: task.WorkflowInstance.InstanceID,
629670
EventID: task.Event.ID,
630671
CreatedAt: time.Now(),
672+
WorkerID: b.workerID,
673+
WorkerInstanceID: b.workerInstanceID,
631674
}
632675
}
633676

server/internal/workflows/backend/etcd/etcd_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func Test_EtcdBackend(t *testing.T) {
2323

2424
test.BackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
2525
opts := backend.ApplyOptions(options...)
26-
return NewBackend(NewStore(client, uuid.NewString()), opts)
26+
return NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString())
2727
}, nil)
2828
}
2929

@@ -33,7 +33,7 @@ func Test_EtcdBackendE2E(t *testing.T) {
3333

3434
test.EndToEndBackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
3535
opts := backend.ApplyOptions(options...)
36-
return NewBackend(NewStore(client, uuid.NewString()), opts)
36+
return NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString())
3737
}, nil)
3838
}
3939

server/internal/workflows/backend/etcd/workflow_instance_lock/store.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@ type Value struct {
1414
WorkflowInstanceID string `json:"workflow_instance_id"`
1515
WorkflowExecutionID string `json:"workflow_execution_id"`
1616
CreatedAt time.Time `json:"created_at"`
17+
WorkerID string `json:"worker_id"`
18+
WorkerInstanceID string `json:"worker_instance_id"`
19+
}
20+
21+
func (v *Value) CanBeReassignedTo(workerID, workerInstanceID string) bool {
22+
// This lock can be reassigned if it belongs to an old instance of the given
23+
// worker.
24+
return v.WorkerID == workerID && v.WorkerInstanceID != workerInstanceID
1725
}
1826

1927
type Store struct {

server/internal/workflows/provide.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ func provideClient(i *do.Injector) {
7070

7171
func provideBackend(i *do.Injector) {
7272
do.Provide(i, func(i *do.Injector) (backend.Backend, error) {
73+
cfg, err := do.Invoke[config.Config](i)
74+
if err != nil {
75+
return nil, err
76+
}
7377
store, err := do.Invoke[*etcd.Store](i)
7478
if err != nil {
7579
return nil, err
@@ -82,7 +86,7 @@ func provideBackend(i *do.Injector) {
8286
backendOpts := backend.ApplyOptions(
8387
backend.WithLogger(logger),
8488
)
85-
return etcd.NewBackend(store, backendOpts), nil
89+
return etcd.NewBackend(store, backendOpts, cfg.HostID.String()), nil
8690
})
8791
}
8892

0 commit comments

Comments
 (0)