Skip to content

Commit e8e7c9e

Browse files
Copilotcschleiden
andcommitted
Address PR feedback: merge newTaskQueue functions and simplify test
Co-authored-by: cschleiden <[email protected]>
1 parent 92c195d commit e8e7c9e

File tree

4 files changed

+10
-53
lines changed

4 files changed

+10
-53
lines changed

backend/redis/queue.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,7 @@ type KeyInfo struct {
4444
SetKey string
4545
}
4646

47-
func newTaskQueue[T any](ctx context.Context, rdb redis.UniversalClient, keyPrefix string, tasktype string) (*taskQueue[T], error) {
48-
return newTaskQueueWithWorkerName[T](ctx, rdb, keyPrefix, tasktype, "")
49-
}
50-
51-
func newTaskQueueWithWorkerName[T any](ctx context.Context, rdb redis.UniversalClient, keyPrefix string, tasktype string, workerName string) (*taskQueue[T], error) {
47+
func newTaskQueue[T any](ctx context.Context, rdb redis.UniversalClient, keyPrefix string, tasktype string, workerName string) (*taskQueue[T], error) {
5248
// Ensure the key prefix ends with a colon
5349
if keyPrefix != "" && keyPrefix[len(keyPrefix)-1] != ':' {
5450
keyPrefix += ":"

backend/redis/queue_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func Test_TaskQueue(t *testing.T) {
106106

107107
ctx := context.Background()
108108

109-
q, err := newTaskQueue[foo](context.Background(), client, "prefix", taskType)
109+
q, err := newTaskQueue[foo](context.Background(), client, "prefix", taskType, "")
110110
require.NoError(t, err)
111111

112112
_, err = client.Pipelined(ctx, func(p redis.Pipeliner) error {
@@ -135,7 +135,7 @@ func Test_TaskQueue(t *testing.T) {
135135
})
136136
require.NoError(t, err)
137137

138-
q2, _ := newTaskQueue[any](context.Background(), client, "prefix", taskType)
138+
q2, _ := newTaskQueue[any](context.Background(), client, "prefix", taskType, "")
139139
require.NoError(t, err)
140140

141141
// Dequeue using second worker
@@ -148,7 +148,7 @@ func Test_TaskQueue(t *testing.T) {
148148
{
149149
name: "Complete removes task",
150150
f: func(t *testing.T, q *taskQueue[any]) {
151-
q2, _ := newTaskQueue[any](context.Background(), client, "prefix", taskType)
151+
q2, _ := newTaskQueue[any](context.Background(), client, "prefix", taskType, "")
152152

153153
ctx := context.Background()
154154

@@ -182,7 +182,7 @@ func Test_TaskQueue(t *testing.T) {
182182
type taskData struct {
183183
Count int `json:"count"`
184184
}
185-
q, _ := newTaskQueue[taskData](context.Background(), client, "prefix", taskType)
185+
q, _ := newTaskQueue[taskData](context.Background(), client, "prefix", taskType, "")
186186

187187
ctx := context.Background()
188188

@@ -193,7 +193,7 @@ func Test_TaskQueue(t *testing.T) {
193193
})
194194
require.NoError(t, err)
195195

196-
q2, _ := newTaskQueue[taskData](context.Background(), client, "prefix", taskType)
196+
q2, _ := newTaskQueue[taskData](context.Background(), client, "prefix", taskType, "")
197197
require.NoError(t, err)
198198

199199
task, err := q2.Dequeue(ctx, client, []workflow.Queue{workflow.QueueDefault}, lockTimeout, blockTimeout)
@@ -221,7 +221,7 @@ func Test_TaskQueue(t *testing.T) {
221221
require.NoError(t, err)
222222

223223
// Create second worker (with different name)
224-
q2, _ := newTaskQueue[any](context.Background(), client, "prefix", taskType)
224+
q2, _ := newTaskQueue[any](context.Background(), client, "prefix", taskType, "")
225225
require.NoError(t, err)
226226

227227
task, err := q2.Dequeue(ctx, client, []workflow.Queue{workflow.QueueDefault}, lockTimeout, blockTimeout)
@@ -281,7 +281,7 @@ func Test_TaskQueue(t *testing.T) {
281281

282282
ctx := context.Background()
283283

284-
q, err := newTaskQueue[any](ctx, client, "prefix", taskType)
284+
q, err := newTaskQueue[any](ctx, client, "prefix", taskType, "")
285285
require.NoError(t, err)
286286

287287
q.Prepare(ctx, client, []workflow.Queue{workflow.QueueDefault})

backend/redis/redis.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
4242

4343
ctx := context.Background()
4444

45-
workflowQueue, err := newTaskQueueWithWorkerName[workflowData](ctx, client, options.KeyPrefix, "workflows", options.WorkerName)
45+
workflowQueue, err := newTaskQueue[workflowData](ctx, client, options.KeyPrefix, "workflows", options.WorkerName)
4646
if err != nil {
4747
return nil, fmt.Errorf("creating workflow task queue: %w", err)
4848
}
4949

50-
activityQueue, err := newTaskQueueWithWorkerName[activityData](ctx, client, options.KeyPrefix, "activities", options.WorkerName)
50+
activityQueue, err := newTaskQueue[activityData](ctx, client, options.KeyPrefix, "activities", options.WorkerName)
5151
if err != nil {
5252
return nil, fmt.Errorf("creating activity task queue: %w", err)
5353
}

backend/sqlite/sqlite_test.go

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,10 @@
11
package sqlite
22

33
import (
4-
"context"
54
"testing"
6-
"time"
75

86
"github.com/cschleiden/go-workflows/backend"
9-
"github.com/cschleiden/go-workflows/backend/history"
107
"github.com/cschleiden/go-workflows/backend/test"
11-
"github.com/cschleiden/go-workflows/core"
12-
"github.com/cschleiden/go-workflows/workflow"
138
"github.com/stretchr/testify/require"
149
)
1510

@@ -76,39 +71,5 @@ func Test_SqliteBackend_WorkerName(t *testing.T) {
7671

7772
// Verify the worker name is stored correctly
7873
require.Equal(t, customWorkerName, backend.workerName)
79-
80-
// Create a workflow instance and task to ensure the worker name is actually used
81-
ctx := context.Background()
82-
instance := core.NewWorkflowInstance("test-instance", "test-execution")
83-
84-
event := history.NewPendingEvent(
85-
time.Now(),
86-
history.EventType_WorkflowExecutionStarted,
87-
&history.ExecutionStartedAttributes{
88-
Queue: "test-queue",
89-
Metadata: &workflow.Metadata{},
90-
},
91-
)
92-
93-
// Create workflow instance
94-
err := backend.CreateWorkflowInstance(ctx, instance, event)
95-
require.NoError(t, err)
96-
97-
// Get a workflow task (this should lock it with our custom worker name)
98-
task, err := backend.GetWorkflowTask(ctx, []workflow.Queue{"test-queue"})
99-
require.NoError(t, err)
100-
require.NotNil(t, task)
101-
102-
// Query the database to verify our custom worker name is used
103-
rows, err := backend.db.Query("SELECT worker FROM instances WHERE id = ? AND execution_id = ?",
104-
instance.InstanceID, instance.ExecutionID)
105-
require.NoError(t, err)
106-
defer rows.Close()
107-
108-
var workerNameFromDB string
109-
require.True(t, rows.Next())
110-
err = rows.Scan(&workerNameFromDB)
111-
require.NoError(t, err)
112-
require.Equal(t, customWorkerName, workerNameFromDB)
11374
})
11475
}

0 commit comments

Comments
 (0)