Skip to content

Commit 8b586d6

Browse files
authored
feat(worker): add request method (#9)
1 parent 8700f0e commit 8b586d6

File tree

6 files changed

+67
-53
lines changed

6 files changed

+67
-53
lines changed

.github/workflows/go.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ jobs:
2222
strategy:
2323
matrix:
2424
os: [ubuntu-latest]
25-
go: [1.13, 1.14, 1.15, 1.16, 1.17, 1.18]
25+
go: [1.17, 1.18]
2626
include:
2727
- os: ubuntu-latest
2828
go-build: ~/.cache/go-build

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,19 @@ go 1.18
44

55
require (
66
github.com/golang-queue/queue v0.0.13-0.20220403053548-d431277d570f
7-
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d
7+
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d
88
github.com/stretchr/testify v1.7.1
99
go.uber.org/goleak v1.1.12
1010
)
1111

1212
require (
1313
github.com/davecgh/go-spew v1.1.0 // indirect
1414
github.com/golang/protobuf v1.5.2 // indirect
15-
github.com/nats-io/nats-server/v2 v2.7.1 // indirect
15+
github.com/nats-io/nats-server/v2 v2.7.4 // indirect
1616
github.com/nats-io/nkeys v0.3.0 // indirect
1717
github.com/nats-io/nuid v1.0.1 // indirect
1818
github.com/pmezard/go-difflib v1.0.0 // indirect
1919
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect
20-
google.golang.org/protobuf v1.27.1 // indirect
20+
google.golang.org/protobuf v1.28.0 // indirect
2121
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
2222
)

go.sum

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,18 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
66
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
77
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
88
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
9-
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
9+
github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
1010
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
1111
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
1212
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
1313
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
1414
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
15-
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
15+
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
1616
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
17-
github.com/nats-io/nats-server/v2 v2.7.1 h1:SDj8R0PJPVekw3EgHxGtTfJUuMbsuaul1nwWFI3xTyk=
18-
github.com/nats-io/nats-server/v2 v2.7.1/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
19-
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d h1:GRSmEJutHkdoxKsRypP575IIdoXe7Bm6yHQF6GcDBnA=
20-
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
17+
github.com/nats-io/nats-server/v2 v2.7.4 h1:c+BZJ3rGzUKCBIM4IXO8uNT2u1vajGbD1kPA6wqCEaM=
18+
github.com/nats-io/nats-server/v2 v2.7.4/go.mod h1:1vZ2Nijh8tcyNe8BDVyTviCd9NYzRbubQYiEHsvOQWc=
19+
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d h1:zJf4l8Kp67RIZhoVeniSLZs69SHNgjLHz0aNsqPPlx8=
20+
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
2121
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
2222
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
2323
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
@@ -67,8 +67,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
6767
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
6868
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
6969
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
70-
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
71-
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
70+
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
71+
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
7272
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
7373
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
7474
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

nats.go

Lines changed: 47 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,45 @@ type Worker struct {
2121
stopFlag int32
2222
stopOnce sync.Once
2323
opts options
24+
tasks chan *nats.Msg
2425
}
2526

2627
// NewWorker for struc
2728
func NewWorker(opts ...Option) *Worker {
2829
var err error
2930
w := &Worker{
30-
opts: newOptions(opts...),
31-
stop: make(chan struct{}),
31+
opts: newOptions(opts...),
32+
stop: make(chan struct{}),
33+
tasks: make(chan *nats.Msg, 1),
3234
}
3335

3436
w.client, err = nats.Connect(w.opts.addr)
3537
if err != nil {
3638
panic(err)
3739
}
3840

41+
if err := w.startConsumer(); err != nil {
42+
panic(err)
43+
}
44+
3945
return w
4046
}
4147

48+
func (w *Worker) startConsumer() error {
49+
_, err := w.client.QueueSubscribe(w.opts.subj, w.opts.queue, func(msg *nats.Msg) {
50+
select {
51+
case w.tasks <- msg:
52+
case <-w.stop:
53+
if msg != nil {
54+
// re-queue the job if worker has been shutdown.
55+
w.opts.logger.Info("re-queue the old job")
56+
}
57+
}
58+
})
59+
60+
return err
61+
}
62+
4263
func (w *Worker) handle(job queue.Job) error {
4364
// create channel with buffer size 1 to avoid goroutine leak
4465
done := make(chan error, 1)
@@ -88,38 +109,12 @@ func (w *Worker) handle(job queue.Job) error {
88109

89110
// Run start the worker
90111
func (w *Worker) Run(task queue.QueuedMessage) error {
91-
wg := &sync.WaitGroup{}
92-
panicChan := make(chan interface{}, 1)
93-
_, err := w.client.QueueSubscribe(w.opts.subj, w.opts.queue, func(m *nats.Msg) {
94-
wg.Add(1)
95-
defer func() {
96-
wg.Done()
97-
if p := recover(); p != nil {
98-
panicChan <- p
99-
}
100-
}()
112+
data, _ := task.(queue.Job)
101113

102-
var data queue.Job
103-
_ = json.Unmarshal(m.Data, &data)
104-
105-
if err := w.handle(data); err != nil {
106-
w.opts.logger.Error(err)
107-
}
108-
})
109-
if err != nil {
114+
if err := w.handle(data); err != nil {
110115
return err
111116
}
112117

113-
// wait close signal
114-
select {
115-
case <-w.stop:
116-
case err := <-panicChan:
117-
w.opts.logger.Error(err)
118-
}
119-
120-
// wait job completed
121-
wg.Wait()
122-
123118
return nil
124119
}
125120

@@ -130,8 +125,9 @@ func (w *Worker) Shutdown() error {
130125
}
131126

132127
w.stopOnce.Do(func() {
133-
w.client.Close()
134128
close(w.stop)
129+
w.client.Close()
130+
close(w.tasks)
135131
})
136132
return nil
137133
}
@@ -152,5 +148,24 @@ func (w *Worker) Queue(job queue.QueuedMessage) error {
152148

153149
// Request a new task
154150
func (w *Worker) Request() (queue.QueuedMessage, error) {
155-
return nil, nil
151+
clock := 0
152+
loop:
153+
for {
154+
select {
155+
case task, ok := <-w.tasks:
156+
if !ok {
157+
return nil, queue.ErrQueueHasBeenClosed
158+
}
159+
var data queue.Job
160+
_ = json.Unmarshal(task.Data, &data)
161+
return data, nil
162+
case <-time.After(1 * time.Second):
163+
if clock == 5 {
164+
break loop
165+
}
166+
clock += 1
167+
}
168+
}
169+
170+
return nil, queue.ErrNoTaskInQueue
156171
}

nats_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,14 @@ func TestNATSDefaultFlow(t *testing.T) {
4040
)
4141
q, err := queue.NewQueue(
4242
queue.WithWorker(w),
43-
queue.WithWorkerCount(2),
43+
queue.WithWorkerCount(1),
4444
)
4545
assert.NoError(t, err)
46-
q.Start()
47-
time.Sleep(100 * time.Millisecond)
4846
assert.NoError(t, q.Queue(m))
49-
m.Message = "new message"
5047
assert.NoError(t, q.Queue(m))
51-
q.Shutdown()
52-
q.Wait()
48+
q.Start()
49+
time.Sleep(500 * time.Millisecond)
50+
q.Release()
5351
}
5452

5553
func TestNATSShutdown(t *testing.T) {

options.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@ func WithLogger(l queue.Logger) Option {
5454

5555
func newOptions(opts ...Option) options {
5656
defaultOpts := options{
57-
addr: "127.0.0.1:4222",
58-
subj: "foobar",
59-
queue: "foobar",
57+
addr: "127.0.0.1:4222",
58+
subj: "foobar",
59+
queue: "foobar",
60+
logger: queue.NewLogger(),
6061
runFunc: func(context.Context, queue.QueuedMessage) error {
6162
return nil
6263
},

0 commit comments

Comments
 (0)