Skip to content

Commit dafdb36

Browse files
Copilotcschleiden
andcommitted
Add configurable worker names for SQL backends
Co-authored-by: cschleiden <[email protected]>
1 parent 7cd54b1 commit dafdb36

File tree

7 files changed

+160
-2
lines changed

7 files changed

+160
-2
lines changed

backend/mysql/mysql.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func NewMysqlBackend(host string, port int, user, password, database string, opt
5555
b := &mysqlBackend{
5656
dsn: dsn,
5757
db: db,
58-
workerName: fmt.Sprintf("worker-%v", uuid.NewString()),
58+
workerName: getWorkerName(options),
5959
options: options,
6060
}
6161

@@ -974,3 +974,11 @@ func scheduleActivity(ctx context.Context, tx *sql.Tx, queue workflow.Queue, ins
974974

975975
return err
976976
}
977+
978+
// getWorkerName returns the worker name from options, or generates a UUID-based name if not set.
979+
func getWorkerName(options *options) string {
980+
if options.WorkerName != "" {
981+
return options.WorkerName
982+
}
983+
return fmt.Sprintf("worker-%v", uuid.NewString())
984+
}

backend/mysql/mysql_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,3 +162,47 @@ func (mb *mysqlBackend) GetFutureEvents(ctx context.Context) ([]*history.Event,
162162

163163
return f, nil
164164
}
165+
166+
func Test_MysqlBackend_WorkerName(t *testing.T) {
167+
if testing.Short() {
168+
t.Skip()
169+
}
170+
171+
t.Run("DefaultWorkerName", func(t *testing.T) {
172+
// Create a backend without specifying worker name
173+
// Since we can't connect to MySQL without it being available, we'll test the getWorkerName function directly
174+
options := &options{}
175+
workerName := getWorkerName(options)
176+
177+
// The default worker name should be in the format "worker-<uuid>"
178+
if !strings.Contains(workerName, "worker-") {
179+
t.Errorf("Expected worker name to contain 'worker-', got: %s", workerName)
180+
}
181+
if len(workerName) != 43 { // "worker-" (7) + UUID (36)
182+
t.Errorf("Expected worker name length to be 43, got: %d", len(workerName))
183+
}
184+
})
185+
186+
t.Run("CustomWorkerName", func(t *testing.T) {
187+
customWorkerName := "test-worker-123"
188+
options := &options{WorkerName: customWorkerName}
189+
workerName := getWorkerName(options)
190+
191+
if workerName != customWorkerName {
192+
t.Errorf("Expected worker name to be '%s', got: %s", customWorkerName, workerName)
193+
}
194+
})
195+
196+
t.Run("EmptyWorkerNameUsesDefault", func(t *testing.T) {
197+
options := &options{WorkerName: ""}
198+
workerName := getWorkerName(options)
199+
200+
// Empty worker name should fall back to UUID generation
201+
if !strings.Contains(workerName, "worker-") {
202+
t.Errorf("Expected worker name to contain 'worker-', got: %s", workerName)
203+
}
204+
if len(workerName) != 43 { // "worker-" (7) + UUID (36)
205+
t.Errorf("Expected worker name length to be 43, got: %d", len(workerName))
206+
}
207+
})
208+
}

backend/mysql/options.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,21 @@ type options struct {
1313

1414
// ApplyMigrations automatically applies database migrations on startup.
1515
ApplyMigrations bool
16+
17+
// WorkerName allows setting a custom worker name. If not set, a random UUID will be generated.
18+
WorkerName string
1619
}
1720

1821
type option func(*options)
1922

23+
// WithWorkerName sets a custom worker name for the MySQL backend.
24+
// If not provided, a random UUID will be generated.
25+
func WithWorkerName(workerName string) option {
26+
return func(o *options) {
27+
o.WorkerName = workerName
28+
}
29+
}
30+
2031
// WithApplyMigrations automatically applies database migrations on startup.
2132
func WithApplyMigrations(applyMigrations bool) option {
2233
return func(o *options) {

backend/sqlite/options.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,21 @@ type options struct {
99

1010
// ApplyMigrations automatically applies database migrations on startup.
1111
ApplyMigrations bool
12+
13+
// WorkerName allows setting a custom worker name. If not set, a random UUID will be generated.
14+
WorkerName string
1215
}
1316

1417
type option func(*options)
1518

19+
// WithWorkerName sets a custom worker name for the SQLite backend.
20+
// If not provided, a random UUID will be generated.
21+
func WithWorkerName(workerName string) option {
22+
return func(o *options) {
23+
o.WorkerName = workerName
24+
}
25+
}
26+
1627
// WithApplyMigrations automatically applies database migrations on startup.
1728
func WithApplyMigrations(applyMigrations bool) option {
1829
return func(o *options) {

backend/sqlite/sqlite.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func newSqliteBackend(dsn string, opts ...option) *sqliteBackend {
7676

7777
b := &sqliteBackend{
7878
db: db,
79-
workerName: fmt.Sprintf("worker-%v", uuid.NewString()),
79+
workerName: getWorkerName(options),
8080
options: options,
8181
}
8282

@@ -862,3 +862,11 @@ func (sb *sqliteBackend) ExtendActivityTask(ctx context.Context, task *backend.A
862862

863863
return tx.Commit()
864864
}
865+
866+
// getWorkerName returns the worker name from options, or generates a UUID-based name if not set.
867+
func getWorkerName(options *options) string {
868+
if options.WorkerName != "" {
869+
return options.WorkerName
870+
}
871+
return fmt.Sprintf("worker-%v", uuid.NewString())
872+
}

backend/sqlite/sqlite_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
package sqlite
22

33
import (
4+
"context"
45
"testing"
6+
"time"
57

68
"github.com/cschleiden/go-workflows/backend"
9+
"github.com/cschleiden/go-workflows/backend/history"
710
"github.com/cschleiden/go-workflows/backend/test"
11+
"github.com/cschleiden/go-workflows/core"
12+
"github.com/cschleiden/go-workflows/workflow"
813
"github.com/stretchr/testify/require"
914
)
1015

@@ -36,3 +41,74 @@ func Test_EndToEndSqliteBackend(t *testing.T) {
3641
require.NoError(t, b.Close())
3742
})
3843
}
44+
45+
func Test_SqliteBackend_WorkerName(t *testing.T) {
46+
t.Run("DefaultWorkerName", func(t *testing.T) {
47+
backend := NewInMemoryBackend()
48+
defer backend.Close()
49+
50+
// The default worker name should be in the format "worker-<uuid>"
51+
require.Contains(t, backend.workerName, "worker-")
52+
require.Len(t, backend.workerName, 43) // "worker-" (7) + UUID (36)
53+
})
54+
55+
t.Run("CustomWorkerName", func(t *testing.T) {
56+
customWorkerName := "test-worker-123"
57+
backend := NewInMemoryBackend(WithWorkerName(customWorkerName))
58+
defer backend.Close()
59+
60+
require.Equal(t, customWorkerName, backend.workerName)
61+
})
62+
63+
t.Run("EmptyWorkerNameUsesDefault", func(t *testing.T) {
64+
backend := NewInMemoryBackend(WithWorkerName(""))
65+
defer backend.Close()
66+
67+
// Empty worker name should fall back to UUID generation
68+
require.Contains(t, backend.workerName, "worker-")
69+
require.Len(t, backend.workerName, 43) // "worker-" (7) + UUID (36)
70+
})
71+
72+
t.Run("CustomWorkerNameIsUsedInDatabase", func(t *testing.T) {
73+
customWorkerName := "integration-test-worker"
74+
backend := NewInMemoryBackend(WithWorkerName(customWorkerName))
75+
defer backend.Close()
76+
77+
// Verify the worker name is stored correctly
78+
require.Equal(t, customWorkerName, backend.workerName)
79+
80+
// Create a workflow instance and task to ensure the worker name is actually used
81+
ctx := context.Background()
82+
instance := core.NewWorkflowInstance("test-instance", "test-execution")
83+
84+
event := history.NewPendingEvent(
85+
time.Now(),
86+
history.EventType_WorkflowExecutionStarted,
87+
&history.ExecutionStartedAttributes{
88+
Queue: "test-queue",
89+
Metadata: &workflow.Metadata{},
90+
},
91+
)
92+
93+
// Create workflow instance
94+
err := backend.CreateWorkflowInstance(ctx, instance, event)
95+
require.NoError(t, err)
96+
97+
// Get a workflow task (this should lock it with our custom worker name)
98+
task, err := backend.GetWorkflowTask(ctx, []workflow.Queue{"test-queue"})
99+
require.NoError(t, err)
100+
require.NotNil(t, task)
101+
102+
// Query the database to verify our custom worker name is used
103+
rows, err := backend.db.Query("SELECT worker FROM instances WHERE id = ? AND execution_id = ?",
104+
instance.InstanceID, instance.ExecutionID)
105+
require.NoError(t, err)
106+
defer rows.Close()
107+
108+
var workerNameFromDB string
109+
require.True(t, rows.Next())
110+
err = rows.Scan(&workerNameFromDB)
111+
require.NoError(t, err)
112+
require.Equal(t, customWorkerName, workerNameFromDB)
113+
})
114+
}

workflow.db

84 KB
Binary file not shown.

0 commit comments

Comments
 (0)