Skip to content

Commit 29aed1f

Browse files
authored
chore(queue): refactor queue package (#32)
1 parent 0b25dda commit 29aed1f

File tree

6 files changed

+16
-153
lines changed

6 files changed

+16
-153
lines changed

.github/workflows/go.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ jobs:
1414
- name: Setup golangci-lint
1515
uses: golangci/golangci-lint-action@v3
1616
with:
17-
version: v1.45.0
17+
version: latest
1818
args: --verbose
1919

2020
# Label of the container job

.golangci.yml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ linters:
44
fast: false
55
enable:
66
- bodyclose
7-
- deadcode
87
- depguard
98
- dogsled
109
- dupl
@@ -27,15 +26,12 @@ linters:
2726
- nakedret
2827
- noctx
2928
- nolintlint
30-
- rowserrcheck
3129
- staticcheck
32-
- structcheck
3330
- stylecheck
3431
- typecheck
3532
- unconvert
3633
- unparam
3734
- unused
38-
- varcheck
3935
- whitespace
4036
- gofumpt
4137

go.mod

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,16 @@ module github.com/golang-queue/nsq
33
go 1.18
44

55
require (
6-
github.com/golang-queue/queue v0.1.3
6+
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98
77
github.com/nsqio/go-nsq v1.1.0
88
github.com/stretchr/testify v1.8.1
99
go.uber.org/goleak v1.2.0
1010
)
1111

1212
require (
1313
github.com/davecgh/go-spew v1.1.1 // indirect
14-
github.com/goccy/go-json v0.9.11 // indirect
14+
github.com/goccy/go-json v0.10.0 // indirect
1515
github.com/golang/snappy v0.0.4 // indirect
16-
github.com/kr/text v0.2.0 // indirect
1716
github.com/pmezard/go-difflib v1.0.0 // indirect
1817
gopkg.in/yaml.v3 v3.0.1 // indirect
1918
)

go.sum

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
1-
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
21
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
32
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
43
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
5-
github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk=
6-
github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
7-
github.com/golang-queue/queue v0.1.3 h1:FGIrn8e0fN8EmL3glP0rFEcYVtWUGMEeqX4h4nnzh40=
8-
github.com/golang-queue/queue v0.1.3/go.mod h1:h/PhaoMwT5Jc4sQNus7APgWBUItm6QC9k6JtmwrsRos=
4+
github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA=
5+
github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
6+
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98 h1:T2DoUcMWZr6uSUQAr5wCEzOiwHB1zJOiATAZ4BUAefg=
7+
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98/go.mod h1:8P7IgwdxwKh0/W1I9yCuQQGI8OHIuc7fIHi4OYr1COU=
98
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
109
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
1110
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
1211
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
1312
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
1413
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
15-
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
1614
github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
1715
github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
1816
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=

nsq.go

Lines changed: 6 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@ import (
44
"context"
55
"encoding/json"
66
"sync"
7-
"sync/atomic"
7+
"sync/atomic" //nolint:typecheck,nolintlint
88
"time"
99

1010
"github.com/golang-queue/queue"
1111
"github.com/golang-queue/queue/core"
12+
"github.com/golang-queue/queue/job"
1213

13-
"github.com/nsqio/go-nsq"
14+
nsq "github.com/nsqio/go-nsq"
1415
)
1516

1617
var _ core.Worker = (*Worker)(nil)
@@ -96,62 +97,9 @@ func (w *Worker) startConsumer() (err error) {
9697
return err
9798
}
9899

99-
func (w *Worker) handle(job *queue.Job) error {
100-
// create channel with buffer size 1 to avoid goroutine leak
101-
done := make(chan error, 1)
102-
panicChan := make(chan interface{}, 1)
103-
startTime := time.Now()
104-
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
105-
defer func() {
106-
cancel()
107-
}()
108-
109-
// run the job
110-
go func() {
111-
// handle panic issue
112-
defer func() {
113-
if p := recover(); p != nil {
114-
panicChan <- p
115-
}
116-
}()
117-
118-
// run custom process function
119-
done <- w.opts.runFunc(ctx, job)
120-
}()
121-
122-
select {
123-
case p := <-panicChan:
124-
panic(p)
125-
case <-ctx.Done(): // timeout reached
126-
return ctx.Err()
127-
case <-w.stop: // shutdown service
128-
// cancel job
129-
cancel()
130-
131-
leftTime := job.Timeout - time.Since(startTime)
132-
// wait job
133-
select {
134-
case <-time.After(leftTime):
135-
return context.DeadlineExceeded
136-
case err := <-done: // job finish
137-
return err
138-
case p := <-panicChan:
139-
panic(p)
140-
}
141-
case err := <-done: // job finish
142-
return err
143-
}
144-
}
145-
146100
// Run start the worker
147-
func (w *Worker) Run(task core.QueuedMessage) error {
148-
data, _ := task.(*queue.Job)
149-
150-
if err := w.handle(data); err != nil {
151-
return err
152-
}
153-
154-
return nil
101+
func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error {
102+
return w.opts.runFunc(ctx, task)
155103
}
156104

157105
// Shutdown worker
@@ -200,7 +148,7 @@ loop:
200148
if !ok {
201149
return nil, queue.ErrQueueHasBeenClosed
202150
}
203-
var data queue.Job
151+
var data job.Message
204152
_ = json.Unmarshal(task.Body, &data)
205153
return &data, nil
206154
case <-time.After(1 * time.Second):

nsq_test.go

Lines changed: 3 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/golang-queue/queue"
1313
"github.com/golang-queue/queue/core"
14+
"github.com/golang-queue/queue/job"
1415

1516
"github.com/stretchr/testify/assert"
1617
"go.uber.org/goleak"
@@ -154,7 +155,7 @@ func TestJobReachTimeout(t *testing.T) {
154155
assert.NoError(t, err)
155156
q.Start()
156157
time.Sleep(400 * time.Millisecond)
157-
assert.NoError(t, q.QueueWithTimeout(20*time.Millisecond, m))
158+
assert.NoError(t, q.Queue(m, job.WithTimeout(20*time.Millisecond)))
158159
time.Sleep(2 * time.Second)
159160
q.Release()
160161
}
@@ -191,7 +192,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
191192
assert.NoError(t, err)
192193
q.Start()
193194
time.Sleep(400 * time.Millisecond)
194-
assert.NoError(t, q.QueueWithTimeout(3*time.Second, m))
195+
assert.NoError(t, q.Queue(m, job.WithTimeout(3*time.Second)))
195196
time.Sleep(2 * time.Second)
196197
q.Release()
197198
}
@@ -267,85 +268,6 @@ func TestGoroutinePanic(t *testing.T) {
267268
q.Wait()
268269
}
269270

270-
func TestHandleTimeout(t *testing.T) {
271-
job := &queue.Job{
272-
Timeout: 100 * time.Millisecond,
273-
Payload: []byte("foo"),
274-
}
275-
w := NewWorker(
276-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
277-
time.Sleep(200 * time.Millisecond)
278-
return nil
279-
}),
280-
)
281-
282-
err := w.handle(job)
283-
assert.Error(t, err)
284-
assert.Equal(t, context.DeadlineExceeded, err)
285-
286-
job = &queue.Job{
287-
Timeout: 150 * time.Millisecond,
288-
Payload: []byte("foo"),
289-
}
290-
291-
w = NewWorker(
292-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
293-
time.Sleep(200 * time.Millisecond)
294-
return nil
295-
}),
296-
)
297-
298-
done := make(chan error)
299-
go func() {
300-
done <- w.handle(job)
301-
}()
302-
303-
assert.NoError(t, w.Shutdown())
304-
305-
err = <-done
306-
assert.Error(t, err)
307-
assert.Equal(t, context.DeadlineExceeded, err)
308-
}
309-
310-
func TestJobComplete(t *testing.T) {
311-
job := &queue.Job{
312-
Timeout: 100 * time.Millisecond,
313-
Payload: []byte("foo"),
314-
}
315-
w := NewWorker(
316-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
317-
return errors.New("job completed")
318-
}),
319-
)
320-
321-
err := w.handle(job)
322-
assert.Error(t, err)
323-
assert.Equal(t, errors.New("job completed"), err)
324-
325-
job = &queue.Job{
326-
Timeout: 250 * time.Millisecond,
327-
Payload: []byte("foo"),
328-
}
329-
330-
w = NewWorker(
331-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
332-
time.Sleep(200 * time.Millisecond)
333-
return errors.New("job completed")
334-
}),
335-
)
336-
337-
done := make(chan error)
338-
go func() {
339-
done <- w.handle(job)
340-
}()
341-
342-
assert.NoError(t, w.Shutdown())
343-
344-
err = <-done
345-
assert.Error(t, err)
346-
assert.Equal(t, errors.New("job completed"), err)
347-
}
348-
349271
func TestNSQStatsinQueue(t *testing.T) {
350272
m := mockMessage{
351273
Message: "foo",

0 commit comments

Comments
 (0)