Skip to content

Commit 53e1663

Browse files
committed
fix: delay resuming workflows after restart
The `go-workflows` backend locks workflows and activities while they're being executed. After a crash or restart, the stale lock prevented the worker from resuming the workflow until the lock's TTL expired. This fix makes a few changes to resolve this issue:
- Uses a stable ID for the worker ID
- Adds a random "worker instance" ID
- Adds the new IDs to the lock entities
- Incorporates the new IDs into the lock checks
If the worker ID matches the lock's worker ID but the worker instance ID does not match, we know that the lock belonged to an old instance of the same worker and that the lock can be reassigned to the new instance. PLAT-99
1 parent 8305da3 commit 53e1663

File tree

6 files changed

+86
-32
lines changed

6 files changed

+86
-32
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
kind: Fixed
2+
body: Delay when resuming workflows after a restart.
3+
time: 2025-05-25T12:34:21.713839-04:00

server/internal/workflows/backend/etcd/activity_lock/store.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@ type Value struct {
1414
WorkflowInstanceID string `json:"workflow_instance_id"`
1515
EventID string `json:"event_id"`
1616
CreatedAt time.Time `json:"created_at"`
17+
WorkerID string `json:"worker_id"`
18+
WorkerInstanceID string `json:"worker_instance_id"`
19+
}
20+
21+
func (v *Value) CanBeReassignedTo(workerID, workerInstanceID string) bool {
22+
// CanBeReassignedTo reports whether this lock was left by an old instance of
23+
// the given worker: same worker ID, but a different worker instance ID.
24+
return v.WorkerID == workerID && v.WorkerInstanceID != workerInstanceID
1725
}
1826

1927
type Store struct {

server/internal/workflows/backend/etcd/etcd.go

Lines changed: 60 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,18 @@ import (
2929
var _ backend.Backend = (*Backend)(nil)
3030

3131
type Backend struct {
32-
store *Store
33-
options *backend.Options
34-
workerID string
32+
store *Store
33+
options *backend.Options
34+
workerID string
35+
workerInstanceID string
3536
}
3637

37-
func NewBackend(store *Store, options *backend.Options) *Backend {
38+
func NewBackend(store *Store, options *backend.Options, workerID string) *Backend {
3839
return &Backend{
39-
store: store,
40-
options: options,
41-
workerID: uuid.NewString(),
40+
store: store,
41+
options: options,
42+
workerID: workerID,
43+
workerInstanceID: uuid.NewString(),
4244
}
4345
}
4446

@@ -252,15 +254,34 @@ func (b *Backend) GetWorkflowTask(ctx context.Context, queues []workflow.Queue)
252254
instanceID := item.WorkflowInstance.InstanceID
253255
executionID := item.WorkflowInstance.ExecutionID
254256

255-
locked, err := b.store.WorkflowInstanceLock.
256-
ExistsByKey(item.WorkflowInstance.InstanceID, item.WorkflowInstance.ExecutionID).
257+
lock, err := b.store.WorkflowInstanceLock.
258+
GetByKey(item.WorkflowInstance.InstanceID, item.WorkflowInstance.ExecutionID).
257259
Exec(ctx)
258-
if err != nil {
260+
if err != nil && !errors.Is(err, storage.ErrNotFound) {
259261
return nil, fmt.Errorf("failed to check for lock: %w", err)
260262
}
261-
if locked {
263+
var lockOp storage.TxnOperation
264+
switch {
265+
case lock == nil:
266+
lockOp = b.store.WorkflowInstanceLock.
267+
Create(&workflow_instance_lock.Value{
268+
WorkflowInstanceID: instanceID,
269+
WorkflowExecutionID: executionID,
270+
CreatedAt: time.Now(),
271+
WorkerID: b.workerID,
272+
WorkerInstanceID: b.workerInstanceID,
273+
}).
274+
WithTTL(b.options.WorkflowLockTimeout)
275+
case lock.CanBeReassignedTo(b.workerID, b.workerInstanceID):
276+
lock.WorkerID = b.workerID
277+
lock.WorkerInstanceID = b.workerInstanceID
278+
lockOp = b.store.WorkflowInstanceLock.
279+
Update(lock).
280+
WithTTL(b.options.WorkflowLockTimeout)
281+
default:
262282
continue
263283
}
284+
264285
sticky, err := b.store.WorkflowInstanceSticky.
265286
GetByKey(item.WorkflowInstance.InstanceID).
266287
Exec(ctx)
@@ -297,13 +318,7 @@ func (b *Backend) GetWorkflowTask(ctx context.Context, queues []workflow.Queue)
297318
item.UpdateLastLocked()
298319

299320
err = b.store.Txn(
300-
b.store.WorkflowInstanceLock.
301-
Create(&workflow_instance_lock.Value{
302-
WorkflowInstanceID: instanceID,
303-
WorkflowExecutionID: executionID,
304-
CreatedAt: time.Now(),
305-
}).
306-
WithTTL(b.options.WorkflowLockTimeout),
321+
lockOp,
307322
b.store.WorkflowInstanceSticky.
308323
Put(&workflow_instance_sticky.Value{
309324
WorkflowInstanceID: instanceID,
@@ -356,6 +371,8 @@ func (b *Backend) ExtendWorkflowTask(ctx context.Context, task *backend.Workflow
356371
WorkflowInstanceID: instanceID,
357372
WorkflowExecutionID: executionID,
358373
CreatedAt: time.Now(),
374+
WorkerID: b.workerID,
375+
WorkerInstanceID: b.workerInstanceID,
359376
}
360377
}
361378

@@ -570,13 +587,31 @@ func (b *Backend) GetActivityTask(ctx context.Context, queues []workflow.Queue)
570587
for _, item := range items {
571588
instanceID := item.WorkflowInstanceID
572589
executionID := item.WorkflowExecutionID
573-
locked, err := b.store.ActivityLock.
574-
ExistsByKey(instanceID, item.Event.ID).
590+
lock, err := b.store.ActivityLock.
591+
GetByKey(instanceID, item.Event.ID).
575592
Exec(ctx)
576-
if err != nil {
593+
if err != nil && !errors.Is(err, storage.ErrNotFound) {
577594
return nil, fmt.Errorf("failed to check for lock: %w", err)
578595
}
579-
if locked {
596+
var lockOp storage.TxnOperation
597+
switch {
598+
case lock == nil:
599+
lockOp = b.store.ActivityLock.
600+
Create(&activity_lock.Value{
601+
WorkflowInstanceID: instanceID,
602+
EventID: item.Event.ID,
603+
CreatedAt: time.Now(),
604+
WorkerID: b.workerID,
605+
WorkerInstanceID: b.workerInstanceID,
606+
}).
607+
WithTTL(b.options.ActivityLockTimeout)
608+
case lock.CanBeReassignedTo(b.workerID, b.workerInstanceID):
609+
lock.WorkerID = b.workerID
610+
lock.WorkerInstanceID = b.workerInstanceID
611+
lockOp = b.store.ActivityLock.
612+
Update(lock).
613+
WithTTL(b.options.ActivityLockTimeout) // activity locks use the activity timeout, same as the Create branch
614+
default:
580615
continue
581616
}
582617

@@ -585,13 +620,7 @@ func (b *Backend) GetActivityTask(ctx context.Context, queues []workflow.Queue)
585620
item.UpdateLastLocked()
586621

587622
err = b.store.Txn(
588-
b.store.ActivityLock.
589-
Create(&activity_lock.Value{
590-
WorkflowInstanceID: instanceID,
591-
EventID: item.Event.ID,
592-
CreatedAt: time.Now(),
593-
}).
594-
WithTTL(b.options.ActivityLockTimeout),
623+
lockOp,
595624
b.store.ActivityQueueItem.Update(item),
596625
).Commit(ctx)
597626
if err != nil {
@@ -628,6 +657,8 @@ func (b *Backend) ExtendActivityTask(ctx context.Context, task *backend.Activity
628657
WorkflowInstanceID: task.WorkflowInstance.InstanceID,
629658
EventID: task.Event.ID,
630659
CreatedAt: time.Now(),
660+
WorkerID: b.workerID,
661+
WorkerInstanceID: b.workerInstanceID,
631662
}
632663
}
633664

server/internal/workflows/backend/etcd/etcd_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func Test_EtcdBackend(t *testing.T) {
2323

2424
test.BackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
2525
opts := backend.ApplyOptions(options...)
26-
return NewBackend(NewStore(client, uuid.NewString()), opts)
26+
return NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString())
2727
}, nil)
2828
}
2929

@@ -33,7 +33,7 @@ func Test_EtcdBackendE2E(t *testing.T) {
3333

3434
test.EndToEndBackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
3535
opts := backend.ApplyOptions(options...)
36-
return NewBackend(NewStore(client, uuid.NewString()), opts)
36+
return NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString())
3737
}, nil)
3838
}
3939

server/internal/workflows/backend/etcd/workflow_instance_lock/store.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@ type Value struct {
1414
WorkflowInstanceID string `json:"workflow_instance_id"`
1515
WorkflowExecutionID string `json:"workflow_execution_id"`
1616
CreatedAt time.Time `json:"created_at"`
17+
WorkerID string `json:"worker_id"`
18+
WorkerInstanceID string `json:"worker_instance_id"`
19+
}
20+
21+
func (v *Value) CanBeReassignedTo(workerID, workerInstanceID string) bool {
22+
// CanBeReassignedTo reports whether this lock was left by an old instance of
23+
// the given worker: same worker ID, but a different worker instance ID.
24+
return v.WorkerID == workerID && v.WorkerInstanceID != workerInstanceID
1725
}
1826

1927
type Store struct {

server/internal/workflows/provide.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ func provideClient(i *do.Injector) {
7070

7171
func provideBackend(i *do.Injector) {
7272
do.Provide(i, func(i *do.Injector) (backend.Backend, error) {
73+
cfg, err := do.Invoke[config.Config](i)
74+
if err != nil {
75+
return nil, err
76+
}
7377
store, err := do.Invoke[*etcd.Store](i)
7478
if err != nil {
7579
return nil, err
@@ -82,7 +86,7 @@ func provideBackend(i *do.Injector) {
8286
backendOpts := backend.ApplyOptions(
8387
backend.WithLogger(logger),
8488
)
85-
return etcd.NewBackend(store, backendOpts), nil
89+
return etcd.NewBackend(store, backendOpts, cfg.HostID.String()), nil
8690
})
8791
}
8892

0 commit comments

Comments
 (0)