Skip to content

Commit bd0d1c2

Browse files
authored
Add support for BLMPOP (#2442)
* Add support for BLMPOP
1 parent d2c53bd commit bd0d1c2

File tree

2 files changed

+91
-0
lines changed

2 files changed

+91
-0
lines changed

commands.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ type Cmdable interface {
218218
HRandFieldWithValues(ctx context.Context, key string, count int) *KeyValueSliceCmd
219219

220220
BLPop(ctx context.Context, timeout time.Duration, keys ...string) *StringSliceCmd
221+
BLMPop(ctx context.Context, timeout time.Duration, direction string, count int64, keys ...string) *KeyValuesCmd
221222
BRPop(ctx context.Context, timeout time.Duration, keys ...string) *StringSliceCmd
222223
BRPopLPush(ctx context.Context, source, destination string, timeout time.Duration) *StringCmd
223224
LIndex(ctx context.Context, key string, index int64) *StringCmd
@@ -1432,6 +1433,21 @@ func (c cmdable) BLPop(ctx context.Context, timeout time.Duration, keys ...strin
14321433
return cmd
14331434
}
14341435

1436+
func (c cmdable) BLMPop(ctx context.Context, timeout time.Duration, direction string, count int64, keys ...string) *KeyValuesCmd {
1437+
args := make([]interface{}, 3+len(keys), 6+len(keys))
1438+
args[0] = "blmpop"
1439+
args[1] = formatSec(ctx, timeout)
1440+
args[2] = len(keys)
1441+
for i, key := range keys {
1442+
args[3+i] = key
1443+
}
1444+
args = append(args, strings.ToLower(direction), "count", count)
1445+
cmd := NewKeyValuesCmd(ctx, args...)
1446+
cmd.setReadTimeout(timeout)
1447+
_ = c(ctx, cmd)
1448+
return cmd
1449+
}
1450+
14351451
func (c cmdable) BRPop(ctx context.Context, timeout time.Duration, keys ...string) *StringSliceCmd {
14361452
args := make([]interface{}, 1+len(keys)+1)
14371453
args[0] = "brpop"

commands_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2312,6 +2312,81 @@ var _ = Describe("Commands", func() {
23122312
Expect(err).To(HaveOccurred())
23132313
})
23142314

2315+
It("should BLMPop", func() {
2316+
err := client.LPush(ctx, "list1", "one", "two", "three", "four", "five").Err()
2317+
Expect(err).NotTo(HaveOccurred())
2318+
2319+
err = client.LPush(ctx, "list2", "a", "b", "c", "d", "e").Err()
2320+
Expect(err).NotTo(HaveOccurred())
2321+
2322+
key, val, err := client.BLMPop(ctx, 0, "left", 3, "list1", "list2").Result()
2323+
Expect(err).NotTo(HaveOccurred())
2324+
Expect(key).To(Equal("list1"))
2325+
Expect(val).To(Equal([]string{"five", "four", "three"}))
2326+
2327+
key, val, err = client.BLMPop(ctx, 0, "right", 3, "list1", "list2").Result()
2328+
Expect(err).NotTo(HaveOccurred())
2329+
Expect(key).To(Equal("list1"))
2330+
Expect(val).To(Equal([]string{"one", "two"}))
2331+
2332+
key, val, err = client.BLMPop(ctx, 0, "left", 1, "list1", "list2").Result()
2333+
Expect(err).NotTo(HaveOccurred())
2334+
Expect(key).To(Equal("list2"))
2335+
Expect(val).To(Equal([]string{"e"}))
2336+
2337+
key, val, err = client.BLMPop(ctx, 0, "right", 10, "list1", "list2").Result()
2338+
Expect(err).NotTo(HaveOccurred())
2339+
Expect(key).To(Equal("list2"))
2340+
Expect(val).To(Equal([]string{"a", "b", "c", "d"}))
2341+
2342+
})
2343+
2344+
It("should BLMPopBlocks", func() {
2345+
started := make(chan bool)
2346+
done := make(chan bool)
2347+
go func() {
2348+
defer GinkgoRecover()
2349+
2350+
started <- true
2351+
key, val, err := client.BLMPop(ctx, 0, "left", 1, "list_list").Result()
2352+
Expect(err).NotTo(HaveOccurred())
2353+
Expect(key).To(Equal("list_list"))
2354+
Expect(val).To(Equal([]string{"a"}))
2355+
done <- true
2356+
}()
2357+
<-started
2358+
2359+
select {
2360+
case <-done:
2361+
Fail("BLMPop is not blocked")
2362+
case <-time.After(time.Second):
2363+
//ok
2364+
}
2365+
2366+
_, err := client.LPush(ctx, "list_list", "a").Result()
2367+
Expect(err).NotTo(HaveOccurred())
2368+
2369+
select {
2370+
case <-done:
2371+
//ok
2372+
case <-time.After(time.Second):
2373+
Fail("BLMPop is still blocked")
2374+
}
2375+
})
2376+
2377+
It("should BLMPop timeout", func() {
2378+
_, val, err := client.BLMPop(ctx, time.Second, "left", 1, "list1").Result()
2379+
Expect(err).To(Equal(redis.Nil))
2380+
Expect(val).To(BeNil())
2381+
2382+
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
2383+
2384+
stats := client.PoolStats()
2385+
Expect(stats.Hits).To(Equal(uint32(2)))
2386+
Expect(stats.Misses).To(Equal(uint32(1)))
2387+
Expect(stats.Timeouts).To(Equal(uint32(0)))
2388+
})
2389+
23152390
It("should LLen", func() {
23162391
lPush := client.LPush(ctx, "list", "World")
23172392
Expect(lPush.Err()).NotTo(HaveOccurred())

0 commit comments

Comments
 (0)