Skip to content

Commit f40b42c

Browse files
committed
feat(worker): support XReadGroup read data
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent a58b8d0 commit f40b42c

File tree

4 files changed

+52
-9
lines changed

4 files changed

+52
-9
lines changed

_example/server-client/client/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ func main() {
3434
w := redisdb.NewWorker(
3535
redisdb.WithAddr("127.0.0.1:6379"),
3636
redisdb.WithStreamName("foobar"),
37+
redisdb.WithGroup("foobar"),
3738
redisdb.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
3839
var v *job
3940
if err := json.Unmarshal(m.Bytes(), &v); err != nil {

_example/server-client/server/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ func main() {
2929
w := redisdb.NewWorker(
3030
redisdb.WithAddr("127.0.0.1:6379"),
3131
redisdb.WithStreamName("foobar"),
32+
redisdb.WithDisableConsumer(),
3233
)
3334

3435
// define the queue
@@ -48,7 +49,7 @@ func main() {
4849
}(i)
4950
}
5051

51-
time.Sleep(1 * time.Second)
52+
time.Sleep(2 * time.Second)
5253
// shutdown the service and notify all the worker
5354
q.Release()
5455
}

options.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ type options struct {
1919
password string
2020
streamName string
2121
cluster bool
22+
group string
23+
consumer string
24+
disableConsumer bool
2225
}
2326

2427
// WithAddr setup the addr of redis
@@ -49,6 +52,27 @@ func WithStreamName(name string) Option {
4952
}
5053
}
5154

55+
// WithGroup group name
56+
func WithGroup(name string) Option {
57+
return func(w *options) {
58+
w.group = name
59+
}
60+
}
61+
62+
// WithConsumer consumer name
63+
func WithConsumer(name string) Option {
64+
return func(w *options) {
65+
w.consumer = name
66+
}
67+
}
68+
69+
// WithDisableConsumer disable consumer
70+
func WithDisableConsumer() Option {
71+
return func(w *options) {
72+
w.disableConsumer = true
73+
}
74+
}
75+
5276
// WithPassword redis password
5377
func WithPassword(passwd string) Option {
5478
return func(w *options) {
@@ -80,7 +104,9 @@ func WithLogger(l queue.Logger) Option {
80104
func newOptions(opts ...Option) options {
81105
defaultOpts := options{
82106
addr: "127.0.0.1:6379",
83-
streamName: "queue",
107+
streamName: "golang-queue",
108+
group: "golang-queue",
109+
consumer: "golang-queue",
84110
logger: queue.NewLogger(),
85111
runFunc: func(context.Context, core.QueuedMessage) error {
86112
return nil

redis.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,20 @@ func NewWorker(opts ...Option) *Worker {
6464
w.opts.logger.Fatal(err)
6565
}
6666

67-
go w.fetchTask()
67+
if !w.opts.disableConsumer {
68+
err = w.rdb.XGroupCreateMkStream(
69+
context.Background(),
70+
w.opts.streamName,
71+
w.opts.group,
72+
"$",
73+
).Err()
74+
75+
if err != nil {
76+
w.opts.logger.Fatal(err)
77+
}
78+
79+
go w.fetchTask()
80+
}
6881

6982
return w
7083
}
@@ -78,8 +91,10 @@ func (w *Worker) fetchTask() {
7891
}
7992

8093
ctx := context.Background()
81-
data, err := w.rdb.XRead(ctx, &redis.XReadArgs{
82-
Streams: []string{w.opts.streamName, "0"},
94+
data, err := w.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
95+
Group: w.opts.group,
96+
Consumer: w.opts.consumer,
97+
Streams: []string{w.opts.streamName, ">"},
8398
// count is number of entries we want to read from redis
8499
Count: 1,
85100
// we use the block command to make sure if no entry is found we wait
@@ -95,12 +110,12 @@ func (w *Worker) fetchTask() {
95110
for _, message := range result.Messages {
96111
select {
97112
case w.tasks <- message:
98-
// delete message
99-
result := w.rdb.XDel(ctx, w.opts.streamName, message.ID)
100-
if result.Val() != 1 {
101-
w.opts.logger.Errorf("can't delete message: %s", message.ID)
113+
if err := w.rdb.XAck(ctx, w.opts.streamName, w.opts.group, message.ID).Err(); err != nil {
114+
w.opts.logger.Errorf("can't ack message: %s", message.ID)
102115
}
103116
case <-w.stop:
117+
// Todo: re-queue the task
118+
w.opts.logger.Info("re-queue the task: ", message.ID)
104119
return
105120
}
106121
}

0 commit comments

Comments
 (0)