Skip to content

Commit 17a9b6d

Browse files
committed
chore: update task message handling and dependencies
- Update `github.com/golang-queue/queue` dependency from v0.2.1 to v0.3.0 - Remove indirect dependencies `github.com/vmihailenco/msgpack/v5` and `github.com/vmihailenco/tagparser/v2` - Change `QueuedMessage` to `TaskMessage` in function signatures in `options.go` - Change `QueuedMessage` to `TaskMessage` in function signatures in `redis.go` - Change `QueuedMessage` to `TaskMessage` in test function signatures in `redis_test.go` Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent cbbac46 commit 17a9b6d

File tree

5 files changed

+15
-21
lines changed

5 files changed

+15
-21
lines changed

go.mod

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.22
44

55
require (
66
github.com/appleboy/com v0.2.1
7-
github.com/golang-queue/queue v0.2.1
7+
github.com/golang-queue/queue v0.3.0
88
github.com/redis/go-redis/v9 v9.7.0
99
github.com/stretchr/testify v1.10.0
1010
github.com/testcontainers/testcontainers-go v0.35.0
@@ -53,8 +53,6 @@ require (
5353
github.com/sirupsen/logrus v1.9.3 // indirect
5454
github.com/tklauser/go-sysconf v0.3.12 // indirect
5555
github.com/tklauser/numcpus v0.6.1 // indirect
56-
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
57-
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
5856
github.com/yusufpapurcu/wmi v1.2.3 // indirect
5957
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
6058
go.opentelemetry.io/otel v1.24.0 // indirect

go.sum

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
5050
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
5151
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
5252
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
53-
github.com/golang-queue/queue v0.2.1 h1:W0JeHniILD4oxvs+E/hDuV0zlnQh0qeTy73BAjrigCw=
54-
github.com/golang-queue/queue v0.2.1/go.mod h1:eUZ3HH9GbhoEKQSlxCBQ4pPXeadbJ7QKBMZ0kIZNDHI=
53+
github.com/golang-queue/queue v0.3.0 h1:gyBLNT9EDOsChazYScp8iLiwLfG0SdnCDmNUybcHig4=
54+
github.com/golang-queue/queue v0.3.0/go.mod h1:SkjMwz1TjxZOrF7kABvbar1CagcMxwRtXt5Tx00wb4g=
5555
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
5656
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
5757
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
@@ -125,10 +125,6 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA
125125
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
126126
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
127127
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
128-
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
129-
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
130-
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
131-
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
132128
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
133129
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
134130
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=

options.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
type Option func(*options)
1414

1515
type options struct {
16-
runFunc func(context.Context, core.QueuedMessage) error
16+
runFunc func(context.Context, core.TaskMessage) error
1717
logger queue.Logger
1818
addr string
1919
db int
@@ -110,7 +110,7 @@ func WithConnectionString(connectionString string) Option {
110110
}
111111

112112
// WithRunFunc setup the run func of queue
113-
func WithRunFunc(fn func(context.Context, core.QueuedMessage) error) Option {
113+
func WithRunFunc(fn func(context.Context, core.TaskMessage) error) Option {
114114
return func(w *options) {
115115
w.runFunc = fn
116116
}
@@ -157,7 +157,7 @@ func newOptions(opts ...Option) options {
157157
group: "golang-queue",
158158
consumer: "golang-queue",
159159
logger: queue.NewLogger(),
160-
runFunc: func(context.Context, core.QueuedMessage) error {
160+
runFunc: func(context.Context, core.TaskMessage) error {
161161
return nil
162162
},
163163
blockTime: 60 * time.Second,

redis.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (w *Worker) queue(data interface{}) error {
176176
}
177177

178178
// Queue send notification to queue
179-
func (w *Worker) Queue(task core.QueuedMessage) error {
179+
func (w *Worker) Queue(task core.TaskMessage) error {
180180
if atomic.LoadInt32(&w.stopFlag) == 1 {
181181
return queue.ErrQueueShutdown
182182
}
@@ -185,12 +185,12 @@ func (w *Worker) Queue(task core.QueuedMessage) error {
185185
}
186186

187187
// Run start the worker
188-
func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error {
188+
func (w *Worker) Run(ctx context.Context, task core.TaskMessage) error {
189189
return w.opts.runFunc(ctx, task)
190190
}
191191

192192
// Request a new task
193-
func (w *Worker) Request() (core.QueuedMessage, error) {
193+
func (w *Worker) Request() (core.TaskMessage, error) {
194194
clock := 0
195195
w.startConsumer()
196196
loop:

redis_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func TestCustomFuncAndWait(t *testing.T) {
141141
w := NewWorker(
142142
WithAddr(endpoint),
143143
WithStreamName("test3"),
144-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
144+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
145145
time.Sleep(500 * time.Millisecond)
146146
return nil
147147
}),
@@ -189,7 +189,7 @@ func TestRedisCluster(t *testing.T) {
189189
WithAddr(strings.Join(hosts, ",")),
190190
WithStreamName("testCluster"),
191191
WithCluster(),
192-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
192+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
193193
time.Sleep(500 * time.Millisecond)
194194
return nil
195195
}),
@@ -243,7 +243,7 @@ func TestJobReachTimeout(t *testing.T) {
243243
w := NewWorker(
244244
WithAddr(endpoint),
245245
WithStreamName("timeout"),
246-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
246+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
247247
for {
248248
select {
249249
case <-ctx.Done():
@@ -286,7 +286,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
286286
WithAddr(endpoint),
287287
WithStreamName("cancel"),
288288
WithLogger(queue.NewLogger()),
289-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
289+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
290290
for {
291291
select {
292292
case <-ctx.Done():
@@ -329,7 +329,7 @@ func TestGoroutineLeak(t *testing.T) {
329329
WithAddr(endpoint),
330330
WithStreamName("GoroutineLeak"),
331331
WithLogger(queue.NewEmptyLogger()),
332-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
332+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
333333
for {
334334
select {
335335
case <-ctx.Done():
@@ -376,7 +376,7 @@ func TestGoroutinePanic(t *testing.T) {
376376
w := NewWorker(
377377
WithAddr(endpoint),
378378
WithStreamName("GoroutinePanic"),
379-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
379+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
380380
panic("missing something")
381381
}),
382382
)

0 commit comments

Comments
 (0)