Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changes/unreleased/Fixed-20250525-123421.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
kind: Fixed
body: Delay when resuming workflows after a restart.
time: 2025-05-25T12:34:21.713839-04:00
8 changes: 8 additions & 0 deletions server/internal/workflows/backend/etcd/activity_lock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ type Value struct {
WorkflowInstanceID string `json:"workflow_instance_id"`
EventID string `json:"event_id"`
CreatedAt time.Time `json:"created_at"`
WorkerID string `json:"worker_id"`
WorkerInstanceID string `json:"worker_instance_id"`
}

func (v *Value) CanBeReassignedTo(workerID, workerInstanceID string) bool {
// This lock can be reassigned if it belongs to an old instance of the given
// worker.
return v.WorkerID == workerID && v.WorkerInstanceID != workerInstanceID
}

type Store struct {
Expand Down
93 changes: 62 additions & 31 deletions server/internal/workflows/backend/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,18 @@ import (
var _ backend.Backend = (*Backend)(nil)

type Backend struct {
store *Store
options *backend.Options
workerID string
store *Store
options *backend.Options
workerID string
workerInstanceID string
}

func NewBackend(store *Store, options *backend.Options) *Backend {
func NewBackend(store *Store, options *backend.Options, workerID string) *Backend {
return &Backend{
store: store,
options: options,
workerID: uuid.NewString(),
store: store,
options: options,
workerID: workerID,
workerInstanceID: uuid.NewString(),
}
}

Expand Down Expand Up @@ -252,15 +254,35 @@ func (b *Backend) GetWorkflowTask(ctx context.Context, queues []workflow.Queue)
instanceID := item.WorkflowInstance.InstanceID
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The GetWorkflowTask method looks a bit large and complex with 3 nested loops. It might be a good idea to refactor. What do you think?

Copy link
Member Author

@jason-lynch jason-lynch Jun 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this package could absolutely be refactored. But, I would prefer to do that as its own ticket, because changes to this package need to be made very carefully. This ticket is just focused on fixing a bug.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True

executionID := item.WorkflowInstance.ExecutionID

locked, err := b.store.WorkflowInstanceLock.
ExistsByKey(item.WorkflowInstance.InstanceID, item.WorkflowInstance.ExecutionID).
lock, err := b.store.WorkflowInstanceLock.
GetByKey(item.WorkflowInstance.InstanceID, item.WorkflowInstance.ExecutionID).
Exec(ctx)
if err != nil {
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return nil, fmt.Errorf("failed to check for lock: %w", err)
}
if locked {
now := time.Now()
var lockOp storage.TxnOperation
switch {
case lock == nil:
lockOp = b.store.WorkflowInstanceLock.
Create(&workflow_instance_lock.Value{
WorkflowInstanceID: instanceID,
WorkflowExecutionID: executionID,
CreatedAt: now,
WorkerID: b.workerID,
WorkerInstanceID: b.workerInstanceID,
}).
WithTTL(b.options.WorkflowLockTimeout)
case lock.CanBeReassignedTo(b.workerID, b.workerInstanceID):
lock.WorkerID = b.workerID
lock.WorkerInstanceID = b.workerInstanceID
lockOp = b.store.WorkflowInstanceLock.
Update(lock).
WithTTL(b.options.WorkflowLockTimeout)
default:
continue
}

sticky, err := b.store.WorkflowInstanceSticky.
GetByKey(item.WorkflowInstance.InstanceID).
Exec(ctx)
Expand All @@ -278,7 +300,6 @@ func (b *Backend) GetWorkflowTask(ctx context.Context, queues []workflow.Queue)
}
sortPendingEvents(pendingEvents)

now := time.Now()
var newEvents []*history.Event
for _, event := range pendingEvents {
// Skip events that aren't visible yet.
Expand All @@ -297,17 +318,11 @@ func (b *Backend) GetWorkflowTask(ctx context.Context, queues []workflow.Queue)
item.UpdateLastLocked()

err = b.store.Txn(
b.store.WorkflowInstanceLock.
Create(&workflow_instance_lock.Value{
WorkflowInstanceID: instanceID,
WorkflowExecutionID: executionID,
CreatedAt: time.Now(),
}).
WithTTL(b.options.WorkflowLockTimeout),
lockOp,
b.store.WorkflowInstanceSticky.
Put(&workflow_instance_sticky.Value{
WorkflowInstanceID: instanceID,
CreatedAt: time.Now(),
CreatedAt: now,
WorkerID: b.workerID,
}).
WithTTL(b.options.StickyTimeout),
Expand Down Expand Up @@ -356,6 +371,8 @@ func (b *Backend) ExtendWorkflowTask(ctx context.Context, task *backend.Workflow
WorkflowInstanceID: instanceID,
WorkflowExecutionID: executionID,
CreatedAt: time.Now(),
WorkerID: b.workerID,
WorkerInstanceID: b.workerInstanceID,
}
}

Expand Down Expand Up @@ -570,13 +587,31 @@ func (b *Backend) GetActivityTask(ctx context.Context, queues []workflow.Queue)
for _, item := range items {
instanceID := item.WorkflowInstanceID
executionID := item.WorkflowExecutionID
locked, err := b.store.ActivityLock.
ExistsByKey(instanceID, item.Event.ID).
lock, err := b.store.ActivityLock.
GetByKey(instanceID, item.Event.ID).
Exec(ctx)
if err != nil {
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return nil, fmt.Errorf("failed to check for lock: %w", err)
}
if locked {
var lockOp storage.TxnOperation
switch {
case lock == nil:
lockOp = b.store.ActivityLock.
Create(&activity_lock.Value{
WorkflowInstanceID: instanceID,
EventID: item.Event.ID,
CreatedAt: time.Now(),
WorkerID: b.workerID,
WorkerInstanceID: b.workerInstanceID,
}).
WithTTL(b.options.ActivityLockTimeout)
case lock.CanBeReassignedTo(b.workerID, b.workerInstanceID):
lock.WorkerID = b.workerID
lock.WorkerInstanceID = b.workerInstanceID
lockOp = b.store.ActivityLock.
Update(lock).
WithTTL(b.options.WorkflowLockTimeout)
default:
continue
}

Expand All @@ -585,13 +620,7 @@ func (b *Backend) GetActivityTask(ctx context.Context, queues []workflow.Queue)
item.UpdateLastLocked()

err = b.store.Txn(
b.store.ActivityLock.
Create(&activity_lock.Value{
WorkflowInstanceID: instanceID,
EventID: item.Event.ID,
CreatedAt: time.Now(),
}).
WithTTL(b.options.ActivityLockTimeout),
lockOp,
b.store.ActivityQueueItem.Update(item),
).Commit(ctx)
if err != nil {
Expand Down Expand Up @@ -628,6 +657,8 @@ func (b *Backend) ExtendActivityTask(ctx context.Context, task *backend.Activity
WorkflowInstanceID: task.WorkflowInstance.InstanceID,
EventID: task.Event.ID,
CreatedAt: time.Now(),
WorkerID: b.workerID,
WorkerInstanceID: b.workerInstanceID,
}
}

Expand Down
4 changes: 2 additions & 2 deletions server/internal/workflows/backend/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func Test_EtcdBackend(t *testing.T) {

test.BackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
opts := backend.ApplyOptions(options...)
return NewBackend(NewStore(client, uuid.NewString()), opts)
return NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString())
}, nil)
}

Expand All @@ -33,7 +33,7 @@ func Test_EtcdBackendE2E(t *testing.T) {

test.EndToEndBackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
opts := backend.ApplyOptions(options...)
return NewBackend(NewStore(client, uuid.NewString()), opts)
return NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString())
}, nil)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ type Value struct {
WorkflowInstanceID string `json:"workflow_instance_id"`
WorkflowExecutionID string `json:"workflow_execution_id"`
CreatedAt time.Time `json:"created_at"`
WorkerID string `json:"worker_id"`
WorkerInstanceID string `json:"worker_instance_id"`
}

func (v *Value) CanBeReassignedTo(workerID, workerInstanceID string) bool {
// This lock can be reassigned if it belongs to an old instance of the given
// worker.
return v.WorkerID == workerID && v.WorkerInstanceID != workerInstanceID
}

type Store struct {
Expand Down
6 changes: 5 additions & 1 deletion server/internal/workflows/provide.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ func provideClient(i *do.Injector) {

func provideBackend(i *do.Injector) {
do.Provide(i, func(i *do.Injector) (backend.Backend, error) {
cfg, err := do.Invoke[config.Config](i)
if err != nil {
return nil, err
}
store, err := do.Invoke[*etcd.Store](i)
if err != nil {
return nil, err
Expand All @@ -83,7 +87,7 @@ func provideBackend(i *do.Injector) {
backendOpts := backend.ApplyOptions(
backend.WithLogger(logger),
)
return etcd.NewBackend(store, backendOpts), nil
return etcd.NewBackend(store, backendOpts, cfg.HostID.String()), nil
})
}

Expand Down