Skip to content

Commit 7d36bba

Browse files
authored
chore(queue): update to latest version (#24)
1 parent 778a164 commit 7d36bba

File tree

4 files changed

+24
-158
lines changed

4 files changed

+24
-158
lines changed

go.mod

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ module github.com/golang-queue/redisdb
33
go 1.18
44

55
require (
6-
github.com/go-redis/redis/v8 v8.11.5
7-
github.com/golang-queue/queue v0.1.3
6+
github.com/go-redis/redis/v9 v9.0.0-rc.2
7+
github.com/golang-queue/queue v0.1.4-0.20221210024521-cb8720b0c721
88
github.com/stretchr/testify v1.8.1
99
go.uber.org/goleak v1.2.0
1010
)
@@ -13,8 +13,7 @@ require (
1313
github.com/cespare/xxhash/v2 v2.1.2 // indirect
1414
github.com/davecgh/go-spew v1.1.1 // indirect
1515
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
16-
github.com/goccy/go-json v0.9.7 // indirect
17-
github.com/kr/text v0.2.0 // indirect
16+
github.com/goccy/go-json v0.10.0 // indirect
1817
github.com/pmezard/go-difflib v1.0.0 // indirect
1918
gopkg.in/yaml.v3 v3.0.1 // indirect
2019
)

go.sum

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,24 @@
11
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
22
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
3-
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
43
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
54
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
65
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
76
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
87
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
98
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
10-
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
11-
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
12-
github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM=
13-
github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
14-
github.com/golang-queue/queue v0.1.3 h1:FGIrn8e0fN8EmL3glP0rFEcYVtWUGMEeqX4h4nnzh40=
15-
github.com/golang-queue/queue v0.1.3/go.mod h1:h/PhaoMwT5Jc4sQNus7APgWBUItm6QC9k6JtmwrsRos=
9+
github.com/go-redis/redis/v9 v9.0.0-rc.2 h1:IN1eI8AvJJeWHjMW/hlFAv2sAfvTun2DVksDDJ3a6a0=
10+
github.com/go-redis/redis/v9 v9.0.0-rc.2/go.mod h1:cgBknjwcBJa2prbnuHH/4k/Mlj4r0pWNV2HBanHujfY=
11+
github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA=
12+
github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
13+
github.com/golang-queue/queue v0.1.4-0.20221210024521-cb8720b0c721 h1:sYfb8dgd4PQkbDNUFAeVI/RkxCgFJPx49hTILog/CNU=
14+
github.com/golang-queue/queue v0.1.4-0.20221210024521-cb8720b0c721/go.mod h1:8P7IgwdxwKh0/W1I9yCuQQGI8OHIuc7fIHi4OYr1COU=
1615
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
16+
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
1717
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
1818
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
19-
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
2019
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
2120
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
22-
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
21+
github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E=
2322
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
2423
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
2524
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -32,14 +31,13 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
3231
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
3332
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
3433
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
35-
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0=
36-
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
37-
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
34+
golang.org/x/net v0.2.0 h1:sZfSu1wtKLGlWI4ZZayP0ck9Y73K1ynO6gqzTdBVdPU=
35+
golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=
36+
golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
3837
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
3938
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
4039
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
4140
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
42-
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
4341
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
4442
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
4543
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

redis.go

Lines changed: 7 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ import (
88
"sync/atomic"
99
"time"
1010

11-
"github.com/go-redis/redis/v8"
1211
"github.com/golang-queue/queue"
1312
"github.com/golang-queue/queue/core"
13+
"github.com/golang-queue/queue/job"
14+
15+
"github.com/go-redis/redis/v9"
1416
)
1517

1618
var _ core.Worker = (*Worker)(nil)
@@ -86,51 +88,9 @@ func NewWorker(opts ...Option) *Worker {
8688
return w
8789
}
8890

89-
func (w *Worker) handle(job *queue.Job) error {
90-
// create channel with buffer size 1 to avoid goroutine leak
91-
done := make(chan error, 1)
92-
panicChan := make(chan interface{}, 1)
93-
startTime := time.Now()
94-
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
95-
defer func() {
96-
cancel()
97-
}()
98-
99-
// run the job
100-
go func() {
101-
// handle panic issue
102-
defer func() {
103-
if p := recover(); p != nil {
104-
panicChan <- p
105-
}
106-
}()
107-
108-
// run custom process function
109-
done <- w.opts.runFunc(ctx, job)
110-
}()
111-
112-
select {
113-
case p := <-panicChan:
114-
panic(p)
115-
case <-ctx.Done(): // timeout reached
116-
return ctx.Err()
117-
case <-w.stop: // shutdown service
118-
// cancel job
119-
cancel()
120-
121-
leftTime := job.Timeout - time.Since(startTime)
122-
// wait job
123-
select {
124-
case <-time.After(leftTime):
125-
return context.DeadlineExceeded
126-
case err := <-done: // job finish
127-
return err
128-
case p := <-panicChan:
129-
panic(p)
130-
}
131-
case err := <-done: // job finish
132-
return err
133-
}
91+
// Run to execute new task
92+
func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error {
93+
return w.opts.runFunc(ctx, task)
13494
}
13595

13696
// Shutdown worker
@@ -169,17 +129,6 @@ func (w *Worker) Queue(job core.QueuedMessage) error {
169129
return nil
170130
}
171131

172-
// Run start the worker
173-
func (w *Worker) Run(task core.QueuedMessage) error {
174-
data, _ := task.(*queue.Job)
175-
176-
if err := w.handle(data); err != nil {
177-
return err
178-
}
179-
180-
return nil
181-
}
182-
183132
// Request a new task
184133
func (w *Worker) Request() (core.QueuedMessage, error) {
185134
clock := 0
@@ -190,7 +139,7 @@ loop:
190139
if !ok {
191140
return nil, queue.ErrQueueHasBeenClosed
192141
}
193-
var data queue.Job
142+
var data job.Message
194143
_ = json.Unmarshal([]byte(task.Payload), &data)
195144
return &data, nil
196145
case <-time.After(1 * time.Second):

redis_test.go

Lines changed: 3 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/golang-queue/queue"
1414
"github.com/golang-queue/queue/core"
15+
"github.com/golang-queue/queue/job"
1516

1617
"github.com/stretchr/testify/assert"
1718
"go.uber.org/goleak"
@@ -182,7 +183,7 @@ func TestJobReachTimeout(t *testing.T) {
182183
assert.NoError(t, err)
183184
q.Start()
184185
time.Sleep(50 * time.Millisecond)
185-
assert.NoError(t, q.QueueWithTimeout(20*time.Millisecond, m))
186+
assert.NoError(t, q.Queue(m, job.WithTimeout(20*time.Millisecond)))
186187
time.Sleep(2 * time.Second)
187188
q.Shutdown()
188189
q.Wait()
@@ -220,7 +221,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
220221
assert.NoError(t, err)
221222
q.Start()
222223
time.Sleep(50 * time.Millisecond)
223-
assert.NoError(t, q.QueueWithTimeout(3*time.Second, m))
224+
assert.NoError(t, q.Queue(m, job.WithTimeout(3*time.Second)))
224225
time.Sleep(2 * time.Second)
225226
q.Shutdown()
226227
q.Wait()
@@ -296,84 +297,3 @@ func TestGoroutinePanic(t *testing.T) {
296297
assert.Error(t, q.Queue(m))
297298
q.Wait()
298299
}
299-
300-
func TestHandleTimeout(t *testing.T) {
301-
job := &queue.Job{
302-
Timeout: 100 * time.Millisecond,
303-
Payload: []byte("foo"),
304-
}
305-
w := NewWorker(
306-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
307-
time.Sleep(200 * time.Millisecond)
308-
return nil
309-
}),
310-
)
311-
312-
err := w.handle(job)
313-
assert.Error(t, err)
314-
assert.Equal(t, context.DeadlineExceeded, err)
315-
assert.NoError(t, w.Shutdown())
316-
317-
job = &queue.Job{
318-
Timeout: 150 * time.Millisecond,
319-
Payload: []byte("foo"),
320-
}
321-
322-
w = NewWorker(
323-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
324-
time.Sleep(200 * time.Millisecond)
325-
return nil
326-
}),
327-
)
328-
329-
done := make(chan error)
330-
go func() {
331-
done <- w.handle(job)
332-
}()
333-
334-
assert.NoError(t, w.Shutdown())
335-
336-
err = <-done
337-
assert.Error(t, err)
338-
assert.Equal(t, context.DeadlineExceeded, err)
339-
}
340-
341-
func TestJobComplete(t *testing.T) {
342-
job := &queue.Job{
343-
Timeout: 100 * time.Millisecond,
344-
Payload: []byte("foo"),
345-
}
346-
w := NewWorker(
347-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
348-
return errors.New("job completed")
349-
}),
350-
)
351-
352-
err := w.handle(job)
353-
assert.Error(t, err)
354-
assert.Equal(t, errors.New("job completed"), err)
355-
assert.NoError(t, w.Shutdown())
356-
357-
job = &queue.Job{
358-
Timeout: 250 * time.Millisecond,
359-
Payload: []byte("foo"),
360-
}
361-
362-
w = NewWorker(
363-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
364-
time.Sleep(200 * time.Millisecond)
365-
return errors.New("job completed")
366-
}),
367-
)
368-
369-
done := make(chan error)
370-
go func() {
371-
done <- w.handle(job)
372-
}()
373-
374-
assert.NoError(t, w.Shutdown())
375-
376-
err = <-done
377-
assert.Error(t, err)
378-
assert.Equal(t, errors.New("job completed"), err)
379-
}

0 commit comments

Comments
 (0)