Skip to content

Commit 187ba96

Browse files
committed
chore: implement sub message in redis
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 5b6137e commit 187ba96

File tree

4 files changed

+126
-9
lines changed

4 files changed

+126
-9
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ go 1.16
55
require (
66
github.com/go-redis/redis/v8 v8.11.3
77
github.com/golang-queue/queue v0.0.7
8+
github.com/stretchr/testify v1.7.0
89
)

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
9292
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
9393
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
9494
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
95+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
9596
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
9697
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
9798
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=

redis.go

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package redisdb
22

33
import (
44
"context"
5+
"encoding/json"
6+
"fmt"
57
"sync"
68
"sync/atomic"
79
"time"
@@ -20,7 +22,7 @@ type Option func(*Worker)
2022
type Worker struct {
2123
// redis config
2224
rdb *redis.Client
23-
pubsub *redis.Pubsub
25+
pubsub *redis.PubSub
2426
addr string
2527
db int
2628
connectionString string
@@ -229,14 +231,8 @@ func (s *Worker) Queue(job queue.QueuedMessage) error {
229231

230232
ctx := context.Background()
231233

232-
// Wait for confirmation that subscription is created before publishing anything.
233-
_, err := s.pubsub.Receive(ctx)
234-
if err != nil {
235-
return err
236-
}
237-
238234
// Publish a message.
239-
err = s.rdb.Publish(ctx, s.channel, job).Err()
235+
err := s.rdb.Publish(ctx, s.channel, job.Bytes()).Err()
240236
if err != nil {
241237
return err
242238
}
@@ -253,5 +249,37 @@ func (s *Worker) Run() error {
253249
default:
254250
}
255251

256-
return nil
252+
var options []redis.ChannelOption
253+
ctx := context.Background()
254+
255+
if s.channelSize > 1 {
256+
options = append(options, redis.WithChannelSize(s.channelSize))
257+
}
258+
259+
ch := s.pubsub.Channel(options...)
260+
// make sure the connection is successful
261+
err := s.pubsub.Ping(ctx)
262+
if err != nil {
263+
return err
264+
}
265+
266+
for {
267+
select {
268+
case m, ok := <-ch:
269+
if !ok {
270+
return fmt.Errorf("redis pubsub: channel=%s closed", s.channel)
271+
}
272+
273+
var data queue.Job
274+
if err := json.Unmarshal([]byte(m.Payload), &data); err != nil {
275+
s.logger.Error("json unmarshal error: ", err)
276+
continue
277+
}
278+
if err := s.handle(data); err != nil {
279+
s.logger.Error("handle job error: ", err)
280+
}
281+
case <-s.stop:
282+
return nil
283+
}
284+
}
257285
}

redis_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,88 @@
11
package redisdb
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/golang-queue/queue"
9+
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
var host = "127.0.0.1"
14+
15+
type mockMessage struct {
16+
Message string
17+
}
18+
19+
func (m mockMessage) Bytes() []byte {
20+
return []byte(m.Message)
21+
}
22+
23+
func TestRedisDefaultFlow(t *testing.T) {
24+
m := &mockMessage{
25+
Message: "foo",
26+
}
27+
w := NewWorker(
28+
WithAddr(host+":6379"),
29+
WithChannel("test"),
30+
)
31+
q, err := queue.NewQueue(
32+
queue.WithWorker(w),
33+
queue.WithWorkerCount(2),
34+
)
35+
assert.NoError(t, err)
36+
q.Start()
37+
time.Sleep(100 * time.Millisecond)
38+
assert.NoError(t, q.Queue(m))
39+
m.Message = "bar"
40+
assert.NoError(t, q.Queue(m))
41+
q.Shutdown()
42+
q.Wait()
43+
}
44+
45+
func TestNSQShutdown(t *testing.T) {
46+
w := NewWorker(
47+
WithAddr(host+":6379"),
48+
WithChannel("test2"),
49+
)
50+
q, err := queue.NewQueue(
51+
queue.WithWorker(w),
52+
queue.WithWorkerCount(2),
53+
)
54+
assert.NoError(t, err)
55+
q.Start()
56+
time.Sleep(1 * time.Second)
57+
q.Shutdown()
58+
// check shutdown once
59+
assert.Error(t, w.Shutdown())
60+
assert.Equal(t, queue.ErrQueueShutdown, w.Shutdown())
61+
q.Wait()
62+
}
63+
64+
func TestGoroutinePanic(t *testing.T) {
65+
m := mockMessage{
66+
Message: "foo",
67+
}
68+
w := NewWorker(
69+
WithAddr(host+":6379"),
70+
WithChannel("GoroutinePanic"),
71+
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
72+
panic("missing something")
73+
}),
74+
)
75+
q, err := queue.NewQueue(
76+
queue.WithWorker(w),
77+
queue.WithWorkerCount(2),
78+
)
79+
assert.NoError(t, err)
80+
q.Start()
81+
time.Sleep(50 * time.Millisecond)
82+
assert.NoError(t, q.Queue(m))
83+
assert.NoError(t, q.Queue(m))
84+
time.Sleep(200 * time.Millisecond)
85+
q.Shutdown()
86+
assert.Error(t, q.Queue(m))
87+
q.Wait()
88+
}

0 commit comments

Comments
 (0)