Skip to content

Commit 8dd18cd

Browse files
committed
add more tests
1 parent 71751b9 commit 8dd18cd

File tree

4 files changed

+125
-54
lines changed

4 files changed

+125
-54
lines changed

delayqueue.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,6 @@ func (q *DelayQueue) ack(idStr string) error {
514514

515515
func (q *DelayQueue) nack(idStr string) error {
516516
atomic.AddInt32(&q.fetchCount, -1)
517-
// update retry time as now, unack2Retry will move it to retry immediately
518517
err := q.redisCli.ZAdd(q.unAckKey, map[string]float64{
519518
idStr: float64(time.Now().Add(q.nackRedeliveryDelay).Unix()),
520519
})

delayqueue_test.go

Lines changed: 3 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ func TestDelayQueueOnCluster(t *testing.T) {
102102
}
103103
queue.afterConsume()
104104
}
105+
queue.garbageCollect()
105106
if succeed != size {
106107
t.Error("msg not consumed")
107108
}
@@ -126,7 +127,8 @@ func TestDelayQueue_ConcurrentConsume(t *testing.T) {
126127
WithFetchInterval(time.Millisecond * 50).
127128
WithMaxConsumeDuration(0).
128129
WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)).
129-
WithConcurrent(4)
130+
WithConcurrent(4).
131+
WithScriptPreload(false)
130132

131133
for i := 0; i < size; i++ {
132134
err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour))
@@ -341,56 +343,6 @@ func TestDelayQueue_FetchLimit(t *testing.T) {
341343
}
342344
}
343345

344-
func TestDelayQueue_ScriptPreload(t *testing.T) {
345-
redisCli := redis.NewClient(&redis.Options{
346-
Addr: "127.0.0.1:6379",
347-
})
348-
redisCli.FlushDB(context.Background())
349-
size := 101 // use a prime number may found some hidden bugs ^_^
350-
retryCount := 3
351-
mu := sync.Mutex{}
352-
deliveryCount := make(map[string]int)
353-
cb := func(s string) bool {
354-
mu.Lock()
355-
deliveryCount[s]++
356-
mu.Unlock()
357-
return true
358-
}
359-
queue := NewQueue("test", redisCli, cb).
360-
WithFetchInterval(time.Millisecond * 50).
361-
WithMaxConsumeDuration(0).
362-
WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)).
363-
WithConcurrent(4).
364-
WithScriptPreload(true)
365-
366-
for i := 0; i < size; i++ {
367-
err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour))
368-
if err != nil {
369-
t.Error(err)
370-
}
371-
}
372-
for i := 0; i < 2*size; i++ {
373-
if i == 2 {
374-
// random clean script cache
375-
redisCli.ScriptFlush(context.Background())
376-
}
377-
ids, err := queue.beforeConsume()
378-
if err != nil {
379-
t.Errorf("consume error: %v", err)
380-
return
381-
}
382-
for _, id := range ids {
383-
queue.callback(id)
384-
}
385-
queue.afterConsume()
386-
}
387-
for k, v := range deliveryCount {
388-
if v != 1 {
389-
t.Errorf("expect 1 delivery, actual %d. key: %s", v, k)
390-
}
391-
}
392-
}
393-
394346
func TestDelayQueue_NackRedeliveryDelay(t *testing.T) {
395347
redisCli := redis.NewClient(&redis.Options{
396348
Addr: "127.0.0.1:6379",

monitor.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,22 @@ func NewMonitor0(name string, cli RedisCli, opts ...interface{}) *Monitor {
1818
}
1919
}
2020

21-
// NewPublisher creates a new Publisher by a *redis.Client
21+
// NewMonitor creates a new Monitor by a *redis.Client
2222
func NewMonitor(name string, cli *redis.Client, opts ...interface{}) *Monitor {
2323
rc := &redisV9Wrapper{
2424
inner: cli,
2525
}
2626
return NewMonitor0(name, rc, opts...)
2727
}
2828

29+
// NewMonitor creates a new Monitor by a *redis.ClusterClient
30+
func NewMonitorOnCluster(name string, cli *redis.ClusterClient, opts ...interface{}) *Monitor {
31+
rc := &redisClusterWrapper{
32+
inner: cli,
33+
}
34+
return NewMonitor0(name, rc, opts...)
35+
}
36+
2937
// WithLogger customizes logger for queue
3038
func (m *Monitor) WithLogger(logger *log.Logger) *Monitor {
3139
m.inner.logger = logger

monitor_test.go

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/redis/go-redis/v9"
1111
)
1212

13-
func TestMonitor_get_status(t *testing.T) {
13+
func TestMonitor_GetStatus(t *testing.T) {
1414
redisCli := redis.NewClient(&redis.Options{
1515
Addr: "127.0.0.1:6379",
1616
})
@@ -72,6 +72,72 @@ func TestMonitor_get_status(t *testing.T) {
7272
}
7373
}
7474

75+
func TestMonitor_Cluster_GetStatus(t *testing.T) {
76+
redisCli := redis.NewClusterClient(&redis.ClusterOptions{
77+
Addrs: []string{
78+
"127.0.0.1:7000",
79+
"127.0.0.1:7001",
80+
"127.0.0.1:7002",
81+
},
82+
})
83+
redisCli.FlushDB(context.Background())
84+
size := 1000
85+
cb := func(s string) bool {
86+
return true
87+
}
88+
logger := log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)
89+
queue := NewQueueOnCluster("test", redisCli, cb)
90+
monitor := NewMonitorOnCluster("test", redisCli).WithLogger(logger)
91+
92+
for i := 0; i < size; i++ {
93+
err := queue.SendDelayMsg(strconv.Itoa(i), 0)
94+
if err != nil {
95+
t.Error(err)
96+
}
97+
}
98+
99+
// test pengding count
100+
pending, err := monitor.GetPendingCount()
101+
if err != nil {
102+
t.Error(err)
103+
return
104+
}
105+
if int(pending) != size {
106+
t.Errorf("execting %d, got %d", int(pending), size)
107+
return
108+
}
109+
110+
// test ready count
111+
err = queue.pending2Ready()
112+
if err != nil {
113+
t.Errorf("consume error: %v", err)
114+
return
115+
}
116+
ready, err := monitor.GetReadyCount()
117+
if err != nil {
118+
t.Error(err)
119+
return
120+
}
121+
if int(ready) != size {
122+
t.Errorf("execting %d, got %d", int(pending), size)
123+
return
124+
}
125+
126+
// test processing count
127+
for i := 0; i < size/2; i++ {
128+
_, _ = queue.ready2Unack()
129+
}
130+
processing, err := monitor.GetProcessingCount()
131+
if err != nil {
132+
t.Error(err)
133+
return
134+
}
135+
if int(processing) != size/2 {
136+
t.Errorf("execting %d, got %d", int(pending), size/2)
137+
return
138+
}
139+
}
140+
75141
type MyProfiler struct {
76142
ProduceCount int
77143
DeliverCount int
@@ -137,6 +203,52 @@ func TestMonitor_listener1(t *testing.T) {
137203
}
138204
}
139205

206+
func TestMonitor_Cluster_listener1(t *testing.T) {
207+
redisCli := redis.NewClusterClient(&redis.ClusterOptions{
208+
Addrs: []string{
209+
"127.0.0.1:7000",
210+
"127.0.0.1:7001",
211+
"127.0.0.1:7002",
212+
},
213+
})
214+
redisCli.FlushDB(context.Background())
215+
size := 1000
216+
cb := func(s string) bool {
217+
return true
218+
}
219+
queue := NewQueueOnCluster("test", redisCli, cb)
220+
queue.EnableReport()
221+
monitor := NewMonitorOnCluster("test", redisCli)
222+
profile := &MyProfiler{}
223+
monitor.ListenEvent(profile)
224+
225+
for i := 0; i < size; i++ {
226+
err := queue.SendDelayMsg(strconv.Itoa(i), 0)
227+
if err != nil {
228+
t.Error(err)
229+
}
230+
}
231+
ids, err := queue.beforeConsume()
232+
if err != nil {
233+
t.Errorf("consume error: %v", err)
234+
return
235+
}
236+
for _, id := range ids {
237+
queue.callback(id)
238+
}
239+
queue.afterConsume()
240+
241+
if profile.ProduceCount != size {
242+
t.Error("wrong produce count")
243+
}
244+
if profile.DeliverCount != size {
245+
t.Error("wrong deliver count")
246+
}
247+
if profile.ConsumeCount != size {
248+
t.Error("wrong consume count")
249+
}
250+
}
251+
140252
func TestMonitor_listener2(t *testing.T) {
141253
redisCli := redis.NewClient(&redis.Options{
142254
Addr: "127.0.0.1:6379",

0 commit comments

Comments
 (0)