Skip to content

Commit 554b879

Browse files
authored
chore(queue): refactor queue package (#18)
1 parent 57842b7 commit 554b879

File tree

5 files changed

+17
-152
lines changed

5 files changed

+17
-152
lines changed

.github/workflows/go.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ jobs:
1414
- name: Setup golangci-lint
1515
uses: golangci/golangci-lint-action@v3
1616
with:
17-
version: v1.50.1
17+
version: latest
1818
args: --verbose
1919

2020
# Label of the container job

go.mod

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,16 @@ go 1.18
44

55
require (
66
github.com/go-redis/redis/v9 v9.0.0-rc.2
7-
github.com/golang-queue/queue v0.1.3
7+
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98
88
github.com/stretchr/testify v1.8.1
99
go.uber.org/goleak v1.2.0
1010
)
1111

1212
require (
13-
github.com/cespare/xxhash/v2 v2.1.2 // indirect
13+
github.com/cespare/xxhash/v2 v2.2.0 // indirect
1414
github.com/davecgh/go-spew v1.1.1 // indirect
1515
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
16-
github.com/goccy/go-json v0.9.7 // indirect
17-
github.com/kr/text v0.2.0 // indirect
16+
github.com/goccy/go-json v0.10.0 // indirect
1817
github.com/pmezard/go-difflib v1.0.0 // indirect
1918
gopkg.in/yaml.v3 v3.0.1 // indirect
2019
)

go.sum

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
2-
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
3-
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
1+
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
2+
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
43
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
54
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
65
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -9,15 +8,14 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu
98
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
109
github.com/go-redis/redis/v9 v9.0.0-rc.2 h1:IN1eI8AvJJeWHjMW/hlFAv2sAfvTun2DVksDDJ3a6a0=
1110
github.com/go-redis/redis/v9 v9.0.0-rc.2/go.mod h1:cgBknjwcBJa2prbnuHH/4k/Mlj4r0pWNV2HBanHujfY=
12-
github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM=
13-
github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
14-
github.com/golang-queue/queue v0.1.3 h1:FGIrn8e0fN8EmL3glP0rFEcYVtWUGMEeqX4h4nnzh40=
15-
github.com/golang-queue/queue v0.1.3/go.mod h1:h/PhaoMwT5Jc4sQNus7APgWBUItm6QC9k6JtmwrsRos=
11+
github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA=
12+
github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
13+
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98 h1:T2DoUcMWZr6uSUQAr5wCEzOiwHB1zJOiATAZ4BUAefg=
14+
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98/go.mod h1:8P7IgwdxwKh0/W1I9yCuQQGI8OHIuc7fIHi4OYr1COU=
1615
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
1716
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
1817
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
1918
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
20-
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
2119
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
2220
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
2321
github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E=

redis.go

Lines changed: 4 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/golang-queue/queue"
1212
"github.com/golang-queue/queue/core"
13+
"github.com/golang-queue/queue/job"
1314

1415
"github.com/go-redis/redis/v9"
1516
)
@@ -130,53 +131,6 @@ func (w *Worker) fetchTask() {
130131
}
131132
}
132133

133-
func (w *Worker) handle(job *queue.Job) error {
134-
// create channel with buffer size 1 to avoid goroutine leak
135-
done := make(chan error, 1)
136-
panicChan := make(chan interface{}, 1)
137-
startTime := time.Now()
138-
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
139-
defer func() {
140-
cancel()
141-
}()
142-
143-
// run the job
144-
go func() {
145-
// handle panic issue
146-
defer func() {
147-
if p := recover(); p != nil {
148-
panicChan <- p
149-
}
150-
}()
151-
152-
// run custom process function
153-
done <- w.opts.runFunc(ctx, job)
154-
}()
155-
156-
select {
157-
case p := <-panicChan:
158-
panic(p)
159-
case <-ctx.Done(): // timeout reached
160-
return ctx.Err()
161-
case <-w.stop: // shutdown service
162-
// cancel job
163-
cancel()
164-
165-
leftTime := job.Timeout - time.Since(startTime)
166-
// wait job
167-
select {
168-
case <-time.After(leftTime):
169-
return context.DeadlineExceeded
170-
case err := <-done: // job finish
171-
return err
172-
case p := <-panicChan:
173-
panic(p)
174-
}
175-
case err := <-done: // job finish
176-
return err
177-
}
178-
}
179-
180134
// Shutdown worker
181135
func (w *Worker) Shutdown() error {
182136
if !atomic.CompareAndSwapInt32(&w.stopFlag, 0, 1) {
@@ -226,14 +180,8 @@ func (w *Worker) Queue(task core.QueuedMessage) error {
226180
}
227181

228182
// Run start the worker
229-
func (w *Worker) Run(task core.QueuedMessage) error {
230-
data, _ := task.(*queue.Job)
231-
232-
if err := w.handle(data); err != nil {
233-
return err
234-
}
235-
236-
return nil
183+
func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error {
184+
return w.opts.runFunc(ctx, task)
237185
}
238186

239187
// Request a new task
@@ -247,7 +195,7 @@ loop:
247195
if !ok {
248196
return nil, queue.ErrQueueHasBeenClosed
249197
}
250-
var data queue.Job
198+
var data job.Message
251199
_ = json.Unmarshal(StrToBytes(task.Values["body"].(string)), &data)
252200
return &data, nil
253201
case <-time.After(1 * time.Second):

redis_test.go

Lines changed: 3 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/golang-queue/queue"
1414
"github.com/golang-queue/queue/core"
15+
"github.com/golang-queue/queue/job"
1516

1617
"github.com/stretchr/testify/assert"
1718
"go.uber.org/goleak"
@@ -179,7 +180,7 @@ func TestJobReachTimeout(t *testing.T) {
179180
assert.NoError(t, err)
180181
q.Start()
181182
time.Sleep(50 * time.Millisecond)
182-
assert.NoError(t, q.QueueWithTimeout(20*time.Millisecond, m))
183+
assert.NoError(t, q.Queue(m, job.WithTimeout(20*time.Millisecond)))
183184
time.Sleep(2 * time.Second)
184185
q.Shutdown()
185186
q.Wait()
@@ -217,7 +218,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
217218
assert.NoError(t, err)
218219
q.Start()
219220
time.Sleep(50 * time.Millisecond)
220-
assert.NoError(t, q.QueueWithTimeout(3*time.Second, m))
221+
assert.NoError(t, q.Queue(m, job.WithTimeout(3*time.Second)))
221222
time.Sleep(2 * time.Second)
222223
q.Shutdown()
223224
q.Wait()
@@ -293,84 +294,3 @@ func TestGoroutinePanic(t *testing.T) {
293294
assert.Error(t, q.Queue(m))
294295
q.Wait()
295296
}
296-
297-
func TestHandleTimeout(t *testing.T) {
298-
job := &queue.Job{
299-
Timeout: 100 * time.Millisecond,
300-
Payload: []byte("foo"),
301-
}
302-
w := NewWorker(
303-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
304-
time.Sleep(200 * time.Millisecond)
305-
return nil
306-
}),
307-
)
308-
309-
err := w.handle(job)
310-
assert.Error(t, err)
311-
assert.Equal(t, context.DeadlineExceeded, err)
312-
assert.NoError(t, w.Shutdown())
313-
314-
job = &queue.Job{
315-
Timeout: 150 * time.Millisecond,
316-
Payload: []byte("foo"),
317-
}
318-
319-
w = NewWorker(
320-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
321-
time.Sleep(200 * time.Millisecond)
322-
return nil
323-
}),
324-
)
325-
326-
done := make(chan error)
327-
go func() {
328-
done <- w.handle(job)
329-
}()
330-
331-
assert.NoError(t, w.Shutdown())
332-
333-
err = <-done
334-
assert.Error(t, err)
335-
assert.Equal(t, context.DeadlineExceeded, err)
336-
}
337-
338-
func TestJobComplete(t *testing.T) {
339-
job := &queue.Job{
340-
Timeout: 100 * time.Millisecond,
341-
Payload: []byte("foo"),
342-
}
343-
w := NewWorker(
344-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
345-
return errors.New("job completed")
346-
}),
347-
)
348-
349-
err := w.handle(job)
350-
assert.Error(t, err)
351-
assert.Equal(t, errors.New("job completed"), err)
352-
assert.NoError(t, w.Shutdown())
353-
354-
job = &queue.Job{
355-
Timeout: 250 * time.Millisecond,
356-
Payload: []byte("foo"),
357-
}
358-
359-
w = NewWorker(
360-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
361-
time.Sleep(200 * time.Millisecond)
362-
return errors.New("job completed")
363-
}),
364-
)
365-
366-
done := make(chan error)
367-
go func() {
368-
done <- w.handle(job)
369-
}()
370-
371-
assert.NoError(t, w.Shutdown())
372-
373-
err = <-done
374-
assert.Error(t, err)
375-
assert.Equal(t, errors.New("job completed"), err)
376-
}

0 commit comments

Comments
 (0)