Skip to content

Commit 92c195d

Browse files
Copilotcschleiden
andcommitted
Move WorkerName to common backend options
Co-authored-by: cschleiden <[email protected]>
1 parent 465ada4 commit 92c195d

File tree

9 files changed

+37
-35
lines changed

9 files changed

+37
-35
lines changed

backend/mysql/mysql.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -977,8 +977,8 @@ func scheduleActivity(ctx context.Context, tx *sql.Tx, queue workflow.Queue, ins
977977

978978
// getWorkerName returns the worker name from options, or generates a UUID-based name if not set.
979979
func getWorkerName(options *options) string {
980-
if options.WorkerName != "" {
981-
return options.WorkerName
980+
if options.Options.WorkerName != "" {
981+
return options.Options.WorkerName
982982
}
983983
return fmt.Sprintf("worker-%v", uuid.NewString())
984984
}

backend/mysql/mysql_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,9 @@ func Test_MysqlBackend_WorkerName(t *testing.T) {
171171
t.Run("DefaultWorkerName", func(t *testing.T) {
172172
// Create a backend without specifying worker name
173173
// Since we can't connect to MySQL without it being available, we'll test the getWorkerName function directly
174-
options := &options{}
174+
options := &options{
175+
Options: backend.ApplyOptions(),
176+
}
175177
workerName := getWorkerName(options)
176178

177179
// The default worker name should be in the format "worker-<uuid>"
@@ -185,7 +187,9 @@ func Test_MysqlBackend_WorkerName(t *testing.T) {
185187

186188
t.Run("CustomWorkerName", func(t *testing.T) {
187189
customWorkerName := "test-worker-123"
188-
options := &options{WorkerName: customWorkerName}
190+
options := &options{
191+
Options: backend.ApplyOptions(backend.WithWorkerName(customWorkerName)),
192+
}
189193
workerName := getWorkerName(options)
190194

191195
if workerName != customWorkerName {
@@ -194,7 +198,9 @@ func Test_MysqlBackend_WorkerName(t *testing.T) {
194198
})
195199

196200
t.Run("EmptyWorkerNameUsesDefault", func(t *testing.T) {
197-
options := &options{WorkerName: ""}
201+
options := &options{
202+
Options: backend.ApplyOptions(backend.WithWorkerName("")),
203+
}
198204
workerName := getWorkerName(options)
199205

200206
// Empty worker name should fall back to UUID generation

backend/mysql/options.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,10 @@ type options struct {
1313

1414
// ApplyMigrations automatically applies database migrations on startup.
1515
ApplyMigrations bool
16-
17-
// WorkerName allows setting a custom worker name. If not set, a random UUID will be generated.
18-
WorkerName string
1916
}
2017

2118
type option func(*options)
2219

23-
// WithWorkerName sets a custom worker name for the MySQL backend.
24-
// If not provided, a random UUID will be generated.
25-
func WithWorkerName(workerName string) option {
26-
return func(o *options) {
27-
o.WorkerName = workerName
28-
}
29-
}
30-
3120
// WithApplyMigrations automatically applies database migrations on startup.
3221
func WithApplyMigrations(applyMigrations bool) option {
3322
return func(o *options) {

backend/options.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ type Options struct {
4646

4747
// MaxHistorySize is the maximum size of a workflow history. If a workflow exceeds this size, it will be failed.
4848
MaxHistorySize int64
49+
50+
// WorkerName allows setting a custom worker name. If not set, backends will generate a default name.
51+
WorkerName string
4952
}
5053

5154
var DefaultOptions Options = Options{
@@ -115,6 +118,12 @@ func WithMaxHistorySize(size int64) BackendOption {
115118
}
116119
}
117120

121+
func WithWorkerName(workerName string) BackendOption {
122+
return func(o *Options) {
123+
o.WorkerName = workerName
124+
}
125+
}
126+
118127
func ApplyOptions(opts ...BackendOption) *Options {
119128
options := DefaultOptions
120129

backend/redis/queue.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,25 @@ type KeyInfo struct {
4545
}
4646

4747
func newTaskQueue[T any](ctx context.Context, rdb redis.UniversalClient, keyPrefix string, tasktype string) (*taskQueue[T], error) {
48+
return newTaskQueueWithWorkerName[T](ctx, rdb, keyPrefix, tasktype, "")
49+
}
50+
51+
func newTaskQueueWithWorkerName[T any](ctx context.Context, rdb redis.UniversalClient, keyPrefix string, tasktype string, workerName string) (*taskQueue[T], error) {
4852
// Ensure the key prefix ends with a colon
4953
if keyPrefix != "" && keyPrefix[len(keyPrefix)-1] != ':' {
5054
keyPrefix += ":"
5155
}
5256

57+
// Use provided worker name or generate UUID if empty
58+
if workerName == "" {
59+
workerName = uuid.NewString()
60+
}
61+
5362
tq := &taskQueue[T]{
5463
keyPrefix: keyPrefix,
5564
tasktype: tasktype,
5665
groupName: "task-workers",
57-
workerName: uuid.NewString(),
66+
workerName: workerName,
5867
queueSetKey: fmt.Sprintf("%s%s:queues", keyPrefix, tasktype),
5968
}
6069

backend/redis/redis.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
4242

4343
ctx := context.Background()
4444

45-
workflowQueue, err := newTaskQueue[workflowData](ctx, client, options.KeyPrefix, "workflows")
45+
workflowQueue, err := newTaskQueueWithWorkerName[workflowData](ctx, client, options.KeyPrefix, "workflows", options.WorkerName)
4646
if err != nil {
4747
return nil, fmt.Errorf("creating workflow task queue: %w", err)
4848
}
4949

50-
activityQueue, err := newTaskQueue[activityData](ctx, client, options.KeyPrefix, "activities")
50+
activityQueue, err := newTaskQueueWithWorkerName[activityData](ctx, client, options.KeyPrefix, "activities", options.WorkerName)
5151
if err != nil {
5252
return nil, fmt.Errorf("creating activity task queue: %w", err)
5353
}

backend/sqlite/options.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,10 @@ type options struct {
99

1010
// ApplyMigrations automatically applies database migrations on startup.
1111
ApplyMigrations bool
12-
13-
// WorkerName allows setting a custom worker name. If not set, a random UUID will be generated.
14-
WorkerName string
1512
}
1613

1714
type option func(*options)
1815

19-
// WithWorkerName sets a custom worker name for the SQLite backend.
20-
// If not provided, a random UUID will be generated.
21-
func WithWorkerName(workerName string) option {
22-
return func(o *options) {
23-
o.WorkerName = workerName
24-
}
25-
}
26-
2716
// WithApplyMigrations automatically applies database migrations on startup.
2817
func WithApplyMigrations(applyMigrations bool) option {
2918
return func(o *options) {

backend/sqlite/sqlite.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -865,8 +865,8 @@ func (sb *sqliteBackend) ExtendActivityTask(ctx context.Context, task *backend.A
865865

866866
// getWorkerName returns the worker name from options, or generates a UUID-based name if not set.
867867
func getWorkerName(options *options) string {
868-
if options.WorkerName != "" {
869-
return options.WorkerName
868+
if options.Options.WorkerName != "" {
869+
return options.Options.WorkerName
870870
}
871871
return fmt.Sprintf("worker-%v", uuid.NewString())
872872
}

backend/sqlite/sqlite_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,14 @@ func Test_SqliteBackend_WorkerName(t *testing.T) {
5454

5555
t.Run("CustomWorkerName", func(t *testing.T) {
5656
customWorkerName := "test-worker-123"
57-
backend := NewInMemoryBackend(WithWorkerName(customWorkerName))
57+
backend := NewInMemoryBackend(WithBackendOptions(backend.WithWorkerName(customWorkerName)))
5858
defer backend.Close()
5959

6060
require.Equal(t, customWorkerName, backend.workerName)
6161
})
6262

6363
t.Run("EmptyWorkerNameUsesDefault", func(t *testing.T) {
64-
backend := NewInMemoryBackend(WithWorkerName(""))
64+
backend := NewInMemoryBackend(WithBackendOptions(backend.WithWorkerName("")))
6565
defer backend.Close()
6666

6767
// Empty worker name should fall back to UUID generation
@@ -71,7 +71,7 @@ func Test_SqliteBackend_WorkerName(t *testing.T) {
7171

7272
t.Run("CustomWorkerNameIsUsedInDatabase", func(t *testing.T) {
7373
customWorkerName := "integration-test-worker"
74-
backend := NewInMemoryBackend(WithWorkerName(customWorkerName))
74+
backend := NewInMemoryBackend(WithBackendOptions(backend.WithWorkerName(customWorkerName)))
7575
defer backend.Close()
7676

7777
// Verify the worker name is stored correctly

0 commit comments

Comments
 (0)