Skip to content

Commit 14fc9e6

Browse files
committed
chore(queue): upgrade to latest version
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent bec2d79 commit 14fc9e6

File tree

5 files changed

+22
-17
lines changed

5 files changed

+22
-17
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/golang-queue/nats
33
go 1.18
44

55
require (
6-
github.com/golang-queue/queue v0.0.13-0.20220403053548-d431277d570f
6+
github.com/golang-queue/queue v0.0.13-0.20220423025512-c4a8df54c917
77
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d
88
github.com/stretchr/testify v1.7.1
99
go.uber.org/goleak v1.1.12

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
22
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
33
github.com/golang-queue/queue v0.0.13-0.20220403053548-d431277d570f h1:Wioq3g97ssizNPQsPwdL61DIjePabPaq+XYo7z2t2Oc=
44
github.com/golang-queue/queue v0.0.13-0.20220403053548-d431277d570f/go.mod h1:KD9age1s6nk8Evz3tfKHsk8k4LwA0htxQ7MS7rJPJzA=
5+
github.com/golang-queue/queue v0.0.13-0.20220423025512-c4a8df54c917 h1:+khkGHxQPsad/mfmgizoTc3Jh5UVdCp1OfUkDfW+uDQ=
6+
github.com/golang-queue/queue v0.0.13-0.20220423025512-c4a8df54c917/go.mod h1:g1yxxDl8JMo4gUfxt11fjjU3SXU1ah61EvwshmDoSIs=
57
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
68
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
79
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=

nats.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@ import (
88
"time"
99

1010
"github.com/golang-queue/queue"
11+
"github.com/golang-queue/queue/core"
1112

1213
"github.com/nats-io/nats.go"
1314
)
1415

15-
var _ queue.Worker = (*Worker)(nil)
16+
var _ core.Worker = (*Worker)(nil)
1617

1718
// Worker for NSQ
1819
type Worker struct {
@@ -119,7 +120,7 @@ func (w *Worker) handle(job queue.Job) error {
119120
}
120121

121122
// Run start the worker
122-
func (w *Worker) Run(task queue.QueuedMessage) error {
123+
func (w *Worker) Run(task core.QueuedMessage) error {
123124
data, _ := task.(queue.Job)
124125

125126
if err := w.handle(data); err != nil {
@@ -149,7 +150,7 @@ func (w *Worker) Shutdown() error {
149150
}
150151

151152
// Queue send notification to queue
152-
func (w *Worker) Queue(job queue.QueuedMessage) error {
153+
func (w *Worker) Queue(job core.QueuedMessage) error {
153154
if atomic.LoadInt32(&w.stopFlag) == 1 {
154155
return queue.ErrQueueShutdown
155156
}
@@ -163,7 +164,7 @@ func (w *Worker) Queue(job queue.QueuedMessage) error {
163164
}
164165

165166
// Request a new task
166-
func (w *Worker) Request() (queue.QueuedMessage, error) {
167+
func (w *Worker) Request() (core.QueuedMessage, error) {
167168
clock := 0
168169
loop:
169170
for {

nats_test.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"time"
1111

1212
"github.com/golang-queue/queue"
13+
"github.com/golang-queue/queue/core"
1314

1415
"github.com/stretchr/testify/assert"
1516
"go.uber.org/goleak"
@@ -77,7 +78,7 @@ func TestNATSCustomFuncAndWait(t *testing.T) {
7778
WithAddr(host+":4222"),
7879
WithSubj("test"),
7980
WithQueue("test"),
80-
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
81+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
8182
log.Println("show message: " + string(m.Bytes()))
8283
time.Sleep(500 * time.Millisecond)
8384
return nil
@@ -130,7 +131,7 @@ func TestJobReachTimeout(t *testing.T) {
130131
WithAddr(host+":4222"),
131132
WithSubj("JobReachTimeout"),
132133
WithQueue("test"),
133-
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
134+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
134135
for {
135136
select {
136137
case <-ctx.Done():
@@ -169,7 +170,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
169170
WithSubj("CancelJob"),
170171
WithQueue("test"),
171172
WithLogger(queue.NewLogger()),
172-
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
173+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
173174
for {
174175
select {
175176
case <-ctx.Done():
@@ -208,7 +209,7 @@ func TestGoroutineLeak(t *testing.T) {
208209
WithSubj("GoroutineLeak"),
209210
WithQueue("test"),
210211
WithLogger(queue.NewEmptyLogger()),
211-
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
212+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
212213
for {
213214
select {
214215
case <-ctx.Done():
@@ -252,7 +253,7 @@ func TestGoroutinePanic(t *testing.T) {
252253
w := NewWorker(
253254
WithAddr(host+":4222"),
254255
WithSubj("GoroutinePanic"),
255-
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
256+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
256257
panic("missing something")
257258
}),
258259
)
@@ -278,7 +279,7 @@ func TestHandleTimeout(t *testing.T) {
278279
}
279280
w := NewWorker(
280281
WithAddr(host+":4222"),
281-
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
282+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
282283
time.Sleep(200 * time.Millisecond)
283284
return nil
284285
}),
@@ -296,7 +297,7 @@ func TestHandleTimeout(t *testing.T) {
296297

297298
w = NewWorker(
298299
WithAddr(host+":4222"),
299-
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
300+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
300301
time.Sleep(200 * time.Millisecond)
301302
return nil
302303
}),
@@ -321,7 +322,7 @@ func TestJobComplete(t *testing.T) {
321322
}
322323
w := NewWorker(
323324
WithAddr(host+":4222"),
324-
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
325+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
325326
return errors.New("job completed")
326327
}),
327328
)
@@ -338,7 +339,7 @@ func TestJobComplete(t *testing.T) {
338339

339340
w = NewWorker(
340341
WithAddr(host+":4222"),
341-
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
342+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
342343
time.Sleep(200 * time.Millisecond)
343344
return errors.New("job completed")
344345
}),

options.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@ import (
44
"context"
55

66
"github.com/golang-queue/queue"
7+
"github.com/golang-queue/queue/core"
78
)
89

910
// Option for queue system
1011
type Option func(*options)
1112

1213
type options struct {
13-
runFunc func(context.Context, queue.QueuedMessage) error
14+
runFunc func(context.Context, core.QueuedMessage) error
1415
logger queue.Logger
1516
addr string
1617
subj string
@@ -40,7 +41,7 @@ func WithQueue(queue string) Option {
4041
}
4142

4243
// WithRunFunc setup the run func of queue
43-
func WithRunFunc(fn func(context.Context, queue.QueuedMessage) error) Option {
44+
func WithRunFunc(fn func(context.Context, core.QueuedMessage) error) Option {
4445
return func(w *options) {
4546
w.runFunc = fn
4647
}
@@ -66,7 +67,7 @@ func newOptions(opts ...Option) options {
6667
subj: "foobar",
6768
queue: "foobar",
6869
logger: queue.NewLogger(),
69-
runFunc: func(context.Context, queue.QueuedMessage) error {
70+
runFunc: func(context.Context, core.QueuedMessage) error {
7071
return nil
7172
},
7273
}

0 commit comments

Comments
 (0)