Skip to content

Commit d3d82fa

Browse files
authored
feat:rocketmq support msg delay level (#31)
Co-authored-by: Peter Wu <wuyukun@qudian.com>
1 parent 4c34be8 commit d3d82fa

File tree

9 files changed

+265
-174
lines changed

9 files changed

+265
-174
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## v0.1.29(2023-08-03)
2+
3+
### New Features
4+
- 社区版rocketmq支持延时消息
5+
-
16
## v0.1.23(2020-11-13)
27

38
### New Features

cache/rediscache/redis_cache.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ type RedisCache struct {
1818
client *goredis.Client
1919
}
2020

21-
//实例模式
21+
// 实例模式
2222
func newRedisCache(diName string) cache.Cache {
2323
m := new(RedisCache)
2424
m.client = redis.GetRedis(diName)
2525
return m
2626
}
2727

28-
//单例模式
28+
// 单例模式
2929
func GetRedisCache(diName string) cache.Cache {
3030
key := diName
3131
mu.RLock()

go.mod

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,52 +5,51 @@ go 1.12
55
require (
66
github.com/aliyun/aliyun-mns-go-sdk v1.0.2
77
github.com/aliyunmq/mq-http-go-sdk v1.0.3
8-
github.com/andybalholm/brotli v1.0.4 // indirect
9-
github.com/apache/rocketmq-client-go/v2 v2.1.0
8+
github.com/apache/rocketmq-client-go/v2 v2.1.1
9+
github.com/bytedance/sonic v1.10.0-rc3 // indirect
10+
github.com/cespare/xxhash/v2 v2.2.0 // indirect
11+
github.com/emirpasic/gods v1.18.1 // indirect
1012
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
1113
github.com/fsnotify/fsnotify v1.5.1 // indirect
1214
github.com/fvbock/endless v0.0.0-20170109170031-447134032cb6
13-
github.com/gin-gonic/gin v1.7.7
14-
github.com/go-playground/validator/v10 v10.9.0 // indirect
15-
github.com/go-redis/redis/v8 v8.11.4
16-
github.com/go-sql-driver/mysql v1.6.0
17-
github.com/goccy/go-json v0.8.1 // indirect
15+
github.com/gin-gonic/gin v1.9.1
16+
github.com/go-playground/validator/v10 v10.14.1 // indirect
17+
github.com/go-redis/redis/v8 v8.11.5
18+
github.com/go-sql-driver/mysql v1.7.1
1819
github.com/gogap/errors v0.0.0-20210818113853-edfbba0ddea9
1920
github.com/gogap/stack v0.0.0-20150131034635-fef68dddd4f8 // indirect
2021
github.com/golang/mock v1.6.0 // indirect
21-
github.com/golang/snappy v0.0.4 // indirect
2222
github.com/google/uuid v1.3.0
2323
github.com/gopherjs/gopherjs v0.0.0-20211111143520-d0d5ecc1a356 // indirect
2424
github.com/hetiansu5/accesslog v1.0.0
2525
github.com/hetiansu5/cores v1.0.0
2626
github.com/jonboulle/clockwork v0.2.2 // indirect
27-
github.com/json-iterator/go v1.1.12 // indirect
28-
github.com/klauspost/compress v1.13.6 // indirect
27+
github.com/klauspost/compress v1.16.7 // indirect
28+
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
29+
github.com/kr/pretty v0.3.0 // indirect
2930
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
3031
github.com/lestrrat-go/strftime v1.0.5 // indirect
31-
github.com/mattn/go-isatty v0.0.14 // indirect
32-
github.com/onsi/ginkgo v1.16.5 // indirect
33-
github.com/onsi/gomega v1.17.0 // indirect
32+
github.com/pelletier/go-toml/v2 v2.0.9 // indirect
3433
github.com/pkg/errors v0.9.1 // indirect
3534
github.com/qit-team/work v0.3.11
3635
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5
3736
github.com/robfig/cron v1.2.0
38-
github.com/sirupsen/logrus v1.8.1
39-
github.com/smartystreets/assertions v1.2.1 // indirect
37+
github.com/rogpeppe/go-internal v1.8.0 // indirect
38+
github.com/sirupsen/logrus v1.9.3
4039
github.com/smartystreets/goconvey v1.7.2 // indirect
41-
github.com/tidwall/gjson v1.12.1 // indirect
42-
github.com/ugorji/go v1.2.6 // indirect
43-
github.com/valyala/fasthttp v1.31.0 // indirect
40+
github.com/tidwall/gjson v1.15.0 // indirect
41+
github.com/tidwall/pretty v1.2.1 // indirect
42+
github.com/ugorji/go v1.2.11 // indirect
43+
github.com/valyala/fasthttp v1.48.0 // indirect
4444
github.com/valyala/fasttemplate v1.2.1 // indirect
45-
go.uber.org/atomic v1.9.0 // indirect
46-
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b // indirect
47-
golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
48-
golang.org/x/sys v0.0.0-20211213223007-03aa0b5f6827 // indirect
49-
golang.org/x/tools v0.1.8 // indirect
50-
google.golang.org/protobuf v1.27.1 // indirect
45+
go.uber.org/atomic v1.11.0 // indirect
46+
golang.org/x/arch v0.4.0 // indirect
47+
golang.org/x/net v0.13.0 // indirect
48+
google.golang.org/protobuf v1.31.0 // indirect
49+
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
5150
modernc.org/ccgo/v3 v3.12.95 // indirect
52-
modernc.org/sqlite v1.14.2 // indirect
5351
modernc.org/tcl v1.9.2 // indirect
52+
xorm.io/builder v0.3.13 // indirect
5453
xorm.io/core v0.7.3
55-
xorm.io/xorm v1.2.5
54+
xorm.io/xorm v1.3.2
5655
)

go.sum

Lines changed: 156 additions & 92 deletions
Large diffs are not rendered by default.

queue/alimnsqueue/alimns_queue.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ func (m *MnsQueue) Enqueue(ctx context.Context, key string, message string, args
7575

7676
/**
7777
* 队列消息出队
78+
* args[0] 消息下次可见时间
7879
* return 第一个参数是消息 第二个参数是mns的ReceiptHandle命名为token,通过token确定消息是否从队列删除
7980
*/
8081
func (m *MnsQueue) Dequeue(ctx context.Context, key string, args ...interface{}) (message string, tag string, token string, dequeueCount int64, err error) {
@@ -91,8 +92,16 @@ func (m *MnsQueue) Dequeue(ctx context.Context, key string, args ...interface{})
9192

9293
select {
9394
case resp := <-respChan:
95+
visibilityTimeout := DefaultVisibilityTimeout
96+
l := len(args)
97+
if l > 0 {
98+
vt, ok := args[0].(int64)
99+
if ok {
100+
visibilityTimeout = vt
101+
}
102+
}
94103
//代表N秒内其他并发队列不可见这条消息
95-
if ret, err1 := queueClient.ChangeMessageVisibility(resp.ReceiptHandle, DefaultVisibilityTimeout); err1 != nil {
104+
if ret, err1 := queueClient.ChangeMessageVisibility(resp.ReceiptHandle, visibilityTimeout); err1 != nil {
96105
err = err1
97106
} else {
98107
//处理resp.MessageBody 阿里这什么sdk 也不说明各个函数作用。。。暂时就按照demo例子里用到的函数写了

queue/alirocketqueue/alirocket_queue.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,15 @@ type AliyunMq struct {
2626
client mq_http_sdk.MQClient
2727
}
2828

29-
//new实例
29+
// new实例
3030
func newAliyunMq(diName string) queue.Queue {
3131
m := new(AliyunMq)
3232
m.client = aliyunmq.GetAliyunMq(diName)
3333

3434
return m
3535
}
3636

37-
//单例模式
37+
// 单例模式
3838
func GetAliyunRocketQueue(diName string) queue.Queue {
3939
key := diName
4040
mu.RLock()
@@ -66,7 +66,6 @@ func (m *AliyunMq) Enqueue(ctx context.Context, key string, message string, args
6666
mqMsg := mq_http_sdk.PublishMessageRequest{
6767
MessageBody: message,
6868
}
69-
7069
_, err := mqProducer.PublishMessage(mqMsg)
7170
if err != nil {
7271
return false, err

queue/rocketqueue/rocket_queue.go

Lines changed: 49 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ type RocketQueue struct {
3535
producerOnce sync.Once
3636
}
3737

38-
func (rq *RocketQueue) initProducer(ctx context.Context) error {
38+
func (m *RocketQueue) initProducer(ctx context.Context) error {
3939
var err error
40-
rq.producerOnce.Do(
40+
m.producerOnce.Do(
4141
func() {
42-
err = rq.Producer.Start()
42+
err = m.Producer.Start()
4343
if err != nil {
4444
logger.Fatal(ctx, "RocketQueue:Producer:Start", err.Error())
4545
return
@@ -48,11 +48,11 @@ func (rq *RocketQueue) initProducer(ctx context.Context) error {
4848
return err
4949
}
5050

51-
func (rq *RocketQueue) initConsumer(ctx context.Context, topic, messageTag string, num int) error {
51+
func (m *RocketQueue) initConsumer(ctx context.Context, topic, messageTag string, num int) error {
5252
var err error
53-
rq.consumerOnce.Do(
53+
m.consumerOnce.Do(
5454
func() {
55-
rq.consumerMessageChan = make(chan *primitive.MessageExt, num)
55+
m.consumerMessageChan = make(chan *primitive.MessageExt, num)
5656

5757
var selector consumer.MessageSelector
5858
if len(messageTag) > 0 {
@@ -61,10 +61,10 @@ func (rq *RocketQueue) initConsumer(ctx context.Context, topic, messageTag strin
6161
Expression: messageTag,
6262
}
6363
}
64-
err = rq.Consumer.Subscribe(topic, selector, func(ctx context.Context, messages ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
64+
err = m.Consumer.Subscribe(topic, selector, func(ctx context.Context, messages ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
6565
// 取到的消息放入管道,交给下游处理
6666
for _, msg := range messages {
67-
rq.consumerMessageChan <- msg
67+
m.consumerMessageChan <- msg
6868
}
6969

7070
return consumer.ConsumeSuccess, nil
@@ -74,7 +74,7 @@ func (rq *RocketQueue) initConsumer(ctx context.Context, topic, messageTag strin
7474
return
7575
}
7676

77-
err = rq.Consumer.Start()
77+
err = m.Consumer.Start()
7878
if err != nil {
7979
logger.Fatal(ctx, "RocketQueue:Start", err.Error())
8080
return
@@ -95,8 +95,8 @@ func (rq *RocketQueue) initConsumer(ctx context.Context, topic, messageTag strin
9595
sig := <-c //blocked
9696
switch sig {
9797
case syscall.SIGINT, syscall.SIGTERM:
98-
close(rq.consumerMessageChan)
99-
err = rq.Consumer.Shutdown()
98+
close(m.consumerMessageChan)
99+
err = m.Consumer.Shutdown()
100100
if err != nil {
101101
logger.Error(ctx, "Shutdown.Failure", err.Error())
102102
return
@@ -116,7 +116,7 @@ func (rq *RocketQueue) initConsumer(ctx context.Context, topic, messageTag strin
116116
return nil
117117
}
118118

119-
//new实例
119+
// new实例
120120
func newRocketQueue(diName string) queue.Queue {
121121
m := new(RocketQueue)
122122
client := rkmq.GetRocketMq(diName)
@@ -127,7 +127,9 @@ func newRocketQueue(diName string) queue.Queue {
127127
return m
128128
}
129129

130-
//单例模式
130+
// GetRocketQueue
131+
//
132+
// 单例模式
131133
func GetRocketQueue(diName string) queue.Queue {
132134
key := diName
133135
mu.RLock()
@@ -145,18 +147,15 @@ func GetRocketQueue(diName string) queue.Queue {
145147
return q
146148
}
147149

148-
/**
149-
* 队列消息入队
150-
* args[0] instanceId
151-
*/
150+
// Enqueue 队列消息入队
151+
//
152+
// args[0] instanceId
152153
func (m *RocketQueue) Enqueue(ctx context.Context, key string, message string, args ...interface{}) (bool, error) {
153-
154154
err := m.initProducer(ctx)
155155
if err != nil {
156156
return false, err
157157
}
158-
159-
_, _, messageTag := getOption(args...)
158+
_, _, messageTag, timeLevel := getOption(args...)
160159
log.Printf("messageTag: %v", messageTag)
161160
if len(messageTag) > 0 {
162161
tags := strings.Split(messageTag, "||")
@@ -167,6 +166,10 @@ func (m *RocketQueue) Enqueue(ctx context.Context, key string, message string, a
167166
Body: []byte(message),
168167
}
169168
msg.WithTag(tag)
169+
// https://rocketmq.apache.org/docs/4.x/producer/04message3/
170+
if timeLevel > 0 && timeLevel <= 18 {
171+
msg.WithDelayTimeLevel(timeLevel)
172+
}
170173
log.Printf("send for tag: %v", tag)
171174
res, err := m.Producer.SendSync(context.Background(), msg)
172175
if err != nil {
@@ -180,6 +183,10 @@ func (m *RocketQueue) Enqueue(ctx context.Context, key string, message string, a
180183
Topic: key,
181184
Body: []byte(message),
182185
}
186+
// https://rocketmq.apache.org/docs/4.x/producer/04message3/
187+
if timeLevel > 0 && timeLevel <= 18 {
188+
msg.WithDelayTimeLevel(timeLevel)
189+
}
183190
res, err := m.Producer.SendSync(ctx, msg)
184191
if err != nil {
185192
return false, err
@@ -191,13 +198,13 @@ func (m *RocketQueue) Enqueue(ctx context.Context, key string, message string, a
191198
return true, nil
192199
}
193200

194-
/**
195-
* 队列消息出队
196-
* param 第二个参数是队列名称,args[0]是instanceId,args[1]是groupId,目前只有rocketmq需要groupId
197-
* return 第一个参数是消息 第二个参数是aliyunmq的ReceiptHandle命名为token,通过token确定消息是否从队列删除,第三个参数为消费次数
198-
*/
201+
// Dequeue 队列消息出队
202+
//
203+
// param 第二个参数是队列名称,args[0]是instanceId,args[1]是groupId,目前只有rocketmq需要groupId
204+
//
205+
// return 第一个参数是消息 第二个参数是aliyunmq的ReceiptHandle命名为token,通过token确定消息是否从队列删除,第三个参数为消费次数
199206
func (m *RocketQueue) Dequeue(ctx context.Context, key string, args ...interface{}) (message string, tag string, token string, dequeueCount int64, err error) {
200-
_, _, messageTag := getOption(args...)
207+
_, _, messageTag, _ := getOption(args...)
201208

202209
err = m.initConsumer(ctx, key, messageTag, 5)
203210
if err != nil {
@@ -213,11 +220,9 @@ func (m *RocketQueue) Dequeue(ctx context.Context, key string, args ...interface
213220
}
214221
}
215222

216-
/**
217-
* 队列消息批量入队
218-
* args[0] instanceId
219-
* 注:rocket其实没有批量函数,所以循环调用publishMsg方法
220-
*/
223+
// BatchEnqueue 队列消息批量入队
224+
// args[0] instanceId
225+
// 注:rocket其实没有批量函数,所以循环调用publishMsg方法
221226
func (m *RocketQueue) BatchEnqueue(ctx context.Context, key string, messageList []string, args ...interface{}) (bool, error) {
222227
if len(messageList) == 0 {
223228
return false, errors.New("messageList is empty")
@@ -233,21 +238,20 @@ func (m *RocketQueue) BatchEnqueue(ctx context.Context, key string, messageList
233238
return true, nil
234239
}
235240

236-
/**
237-
* 确认消息接收
238-
* args[0]是instanceId,args[1]是groupId,args[2]是messageTag
239-
*/
241+
// AckMsg 确认消息接收
242+
// args[0]是instanceId,args[1]是groupId,args[2]是messageTag, args[2]是delayTimeLevel
240243
func (m *RocketQueue) AckMsg(ctx context.Context, key string, token string, args ...interface{}) (bool, error) {
241244
return true, nil
242245
}
243246

244-
// 缺省参数统一获取
245-
// args[0]是instanceId,args[1]是groupId,args[2]是messageTag
246-
func getOption(args ...interface{}) (instanceId, groupId, messageTag string) {
247+
// getOption 缺省参数统一获取
248+
//
249+
// args[0]是instanceId,args[1]是groupId,args[2]是messageTag, args[3]是delayTimeLevel
250+
func getOption(args ...interface{}) (instanceId, groupId, messageTag string, delayTimeLevel int) {
247251
instanceId = ""
248252
groupId = ""
249253
messageTag = ""
250-
254+
delayTimeLevel = 0
251255
l := len(args)
252256
if l > 0 {
253257
tempInstance, ok := args[0].(string)
@@ -266,6 +270,12 @@ func getOption(args ...interface{}) (instanceId, groupId, messageTag string) {
266270
messageTag = tempTag
267271
}
268272
}
273+
if l > 3 {
274+
tempDelayTimeLevel, ok := args[3].(int)
275+
if ok {
276+
delayTimeLevel = tempDelayTimeLevel
277+
}
278+
}
269279
}
270280
return
271281
}

queue/rocketqueue/rocket_queue_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@ package rocketqueue
33
import (
44
"context"
55
"fmt"
6-
"github.com/qit-team/snow-core/log/logger"
76
"io/ioutil"
87
"log"
98
"strings"
109
"testing"
1110

11+
"github.com/qit-team/snow-core/log/logger"
12+
1213
"github.com/qit-team/snow-core/config"
1314
"github.com/qit-team/snow-core/queue"
1415
"github.com/qit-team/snow-core/rocketmq"
@@ -175,7 +176,7 @@ func TestBatchEnqueueEmpty(t *testing.T) {
175176
}
176177

177178
func Test_getOption(t *testing.T) {
178-
instanceId, groupId, _ := getOption("", "GID-SNOW-TOPIC-TEST")
179+
instanceId, groupId, _, _ := getOption("", "GID-SNOW-TOPIC-TEST")
179180
if instanceId != "" {
180181
t.Errorf("delay is not equal 1. %s", instanceId)
181182
} else if groupId != "GID-SNOW-TOPIC-TEST" {

0 commit comments

Comments
 (0)