Skip to content

Commit 9da76fb

Browse files
jk2KCopilot
andauthored
feat: redis support consumer groups (#4912)
Co-authored-by: Copilot <[email protected]>
1 parent b69db5e commit 9da76fb

File tree

2 files changed

+212
-0
lines changed

2 files changed

+212
-0
lines changed

core/stores/redis/redis.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2402,6 +2402,117 @@ func (s *Redis) ZunionstoreCtx(ctx context.Context, dest string, store *ZStore)
24022402
return conn.ZUnionStore(ctx, dest, store).Result()
24032403
}
24042404

2405+
func (s *Redis) XGroupCreateMkStream(stream string, group string, start string) (string, error) {
2406+
return s.XGroupCreateMkStreamCtx(context.Background(), stream, group, start)
2407+
}
2408+
2409+
func (s *Redis) XGroupCreateMkStreamCtx(ctx context.Context, stream string, group string, start string) (string, error) {
2410+
conn, err := getRedis(s)
2411+
if err != nil {
2412+
return "", err
2413+
}
2414+
return conn.XGroupCreateMkStream(ctx, stream, group, start).Result()
2415+
}
2416+
2417+
func (s *Redis) XGroupCreate(stream string, group string, start string) (string, error) {
2418+
return s.XGroupCreateCtx(context.Background(), stream, group, start)
2419+
}
2420+
2421+
func (s *Redis) XGroupCreateCtx(ctx context.Context, stream string, group string, start string) (string, error) {
2422+
conn, err := getRedis(s)
2423+
if err != nil {
2424+
return "", err
2425+
}
2426+
return conn.XGroupCreate(ctx, stream, group, start).Result()
2427+
}
2428+
2429+
func (s *Redis) XInfoConsumers(stream string, group string) ([]red.XInfoConsumer, error) {
2430+
return s.XInfoConsumersCtx(context.Background(), stream, group)
2431+
}
2432+
2433+
func (s *Redis) XInfoConsumersCtx(ctx context.Context, stream string, group string) ([]red.XInfoConsumer, error) {
2434+
conn, err := getRedis(s)
2435+
if err != nil {
2436+
return nil, err
2437+
}
2438+
return conn.XInfoConsumers(ctx, stream, group).Result()
2439+
}
2440+
2441+
func (s *Redis) XInfoGroups(stream string) ([]red.XInfoGroup, error) {
2442+
return s.XInfoGroupsCtx(context.Background(), stream)
2443+
}
2444+
2445+
func (s *Redis) XInfoGroupsCtx(ctx context.Context, stream string) ([]red.XInfoGroup, error) {
2446+
conn, err := getRedis(s)
2447+
if err != nil {
2448+
return nil, err
2449+
}
2450+
return conn.XInfoGroups(ctx, stream).Result()
2451+
}
2452+
2453+
func (s *Redis) XInfoStream(stream string) (*red.XInfoStream, error) {
2454+
return s.XInfoStreamCtx(context.Background(), stream)
2455+
}
2456+
2457+
func (s *Redis) XInfoStreamCtx(ctx context.Context, stream string) (*red.XInfoStream, error) {
2458+
conn, err := getRedis(s)
2459+
if err != nil {
2460+
return nil, err
2461+
}
2462+
return conn.XInfoStream(ctx, stream).Result()
2463+
}
2464+
2465+
func (s *Redis) XAdd(stream string, noMkStream bool, id string, values interface{}) (string, error) {
2466+
return s.XAddCtx(context.Background(), stream, noMkStream, id, values)
2467+
}
2468+
2469+
func (s *Redis) XAddCtx(ctx context.Context, stream string, noMkStream bool, id string, values interface{}) (string, error) {
2470+
conn, err := getRedis(s)
2471+
if err != nil {
2472+
return "", err
2473+
}
2474+
return conn.XAdd(ctx, &red.XAddArgs{
2475+
Stream: stream,
2476+
ID: id,
2477+
Values: values,
2478+
NoMkStream: noMkStream,
2479+
}).Result()
2480+
}
2481+
2482+
func (s *Redis) XAck(stream string, group string, ids ...string) (int64, error) {
2483+
return s.XAckCtx(context.Background(), stream, group, ids...)
2484+
}
2485+
2486+
func (s *Redis) XAckCtx(ctx context.Context, stream string, group string, ids ...string) (int64, error) {
2487+
conn, err := getRedis(s)
2488+
if err != nil {
2489+
return 0, err
2490+
}
2491+
return conn.XAck(ctx, stream, group, ids...).Result()
2492+
}
2493+
2494+
/**
2495+
* streams: list of streams and ids, e.g. stream1 stream2 id1 id2
2496+
*/
2497+
func (s *Redis) XReadGroup(group string, consumerId string, count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
2498+
return s.XReadGroupCtx(context.Background(), group, consumerId, count, block, noAck, streams...)
2499+
}
2500+
2501+
func (s *Redis) XReadGroupCtx(ctx context.Context, group string, consumerId string, count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
2502+
conn, err := getRedis(s)
2503+
if err != nil {
2504+
return nil, err
2505+
}
2506+
return conn.XReadGroup(ctx, &red.XReadGroupArgs{
2507+
Group: group,
2508+
Consumer: consumerId,
2509+
Count: count,
2510+
Block: block,
2511+
NoAck: noAck,
2512+
Streams: streams,
2513+
}).Result()
2514+
}
2515+
24052516
func (s *Redis) checkConnection(pingTimeout time.Duration) error {
24062517
conn, err := getRedis(s)
24072518
if err != nil {

core/stores/redis/redis_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2175,3 +2175,104 @@ func TestRedisTxPipeline(t *testing.T) {
21752175
assert.Equal(t, hashValue, value)
21762176
})
21772177
}
2178+
2179+
func TestRedisXGroupCreate(t *testing.T) {
2180+
runOnRedis(t, func(client *Redis) {
2181+
_, err := newRedis(client.Addr, badType()).XGroupCreate("Source", "Destination", "0")
2182+
assert.NotNil(t, err)
2183+
2184+
redisCli := newRedis(client.Addr)
2185+
2186+
_, err = redisCli.XGroupCreate("aa", "bb", "0")
2187+
assert.NotNil(t, err)
2188+
2189+
_, err = newRedis(client.Addr, badType()).XGroupCreateMkStream("Source", "Destination", "0")
2190+
assert.NotNil(t, err)
2191+
2192+
_, err = redisCli.XGroupCreateMkStream("aa", "bb", "0")
2193+
assert.Nil(t, err)
2194+
2195+
_, err = redisCli.XGroupCreate("aa", "cc", "0")
2196+
assert.Nil(t, err)
2197+
})
2198+
}
2199+
2200+
func TestRedisXInfo(t *testing.T) {
2201+
runOnRedis(t, func(client *Redis) {
2202+
_, err := newRedis(client.Addr, badType()).XInfoStream("Source")
2203+
assert.NotNil(t, err)
2204+
_, err = newRedis(client.Addr, badType()).XInfoGroups("Source")
2205+
assert.NotNil(t, err)
2206+
_, err = newRedis(client.Addr, badType()).XReadGroup("aa", "consumer", 1, 2000, false, "ss", ">")
2207+
assert.NotNil(t, err)
2208+
_, err = newRedis(client.Addr, badType()).XInfoConsumers("aa", "bb")
2209+
assert.NotNil(t, err)
2210+
2211+
redisCli := newRedis(client.Addr)
2212+
2213+
stream := "aa"
2214+
group := "bb"
2215+
2216+
_, err = redisCli.XGroupCreateMkStream(stream, group, "$")
2217+
assert.Nil(t, err)
2218+
2219+
_, err = redisCli.XAdd(stream, true, "*", []string{"key1", "value1", "key2", "value2"})
2220+
assert.Nil(t, err)
2221+
2222+
infoStream, err := redisCli.XInfoStream(stream)
2223+
assert.Nil(t, err)
2224+
assert.Equal(t, int64(1), infoStream.Length)
2225+
2226+
infoGroups, err := redisCli.XInfoGroups(stream)
2227+
assert.Nil(t, err)
2228+
assert.Equal(t, int64(1), infoGroups[0].Lag)
2229+
assert.Equal(t, group, infoGroups[0].Name)
2230+
2231+
streamRes, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, ">")
2232+
assert.Nil(t, err)
2233+
assert.Equal(t, 1, len(streamRes))
2234+
assert.Equal(t, "value1", streamRes[0].Messages[0].Values["key1"])
2235+
2236+
infoConsumers, err := redisCli.XInfoConsumers(stream, group)
2237+
assert.Nil(t, err)
2238+
assert.Equal(t, 1, len(infoConsumers))
2239+
})
2240+
}
2241+
2242+
func TestRedisXReadGroup(t *testing.T) {
2243+
runOnRedis(t, func(client *Redis) {
2244+
_, err := newRedis(client.Addr, badType()).XAdd("bb", true, "*", []string{"key1", "value1", "key2", "value2"})
2245+
assert.NotNil(t, err)
2246+
_, err = newRedis(client.Addr, badType()).XAck("bb", "aa", "123")
2247+
assert.NotNil(t, err)
2248+
2249+
redisCli := newRedis(client.Addr)
2250+
2251+
stream := "aa"
2252+
group := "bb"
2253+
2254+
_, err = redisCli.XGroupCreateMkStream(stream, group, "$")
2255+
assert.Nil(t, err)
2256+
2257+
_, err = redisCli.XAdd(stream, true, "*", []string{"key1", "value1", "key2", "value2"})
2258+
assert.Nil(t, err)
2259+
2260+
streamRes, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, ">")
2261+
assert.Nil(t, err)
2262+
assert.Equal(t, 1, len(streamRes))
2263+
assert.Equal(t, "value1", streamRes[0].Messages[0].Values["key1"])
2264+
2265+
streamRes1, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, "0")
2266+
assert.Nil(t, err)
2267+
assert.Equal(t, 1, len(streamRes1))
2268+
assert.Equal(t, "value1", streamRes1[0].Messages[0].Values["key1"])
2269+
2270+
_, err = redisCli.XAck(stream, group, streamRes[0].Messages[0].ID)
2271+
assert.Nil(t, err)
2272+
2273+
streamRes2, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, "0")
2274+
assert.Nil(t, err)
2275+
assert.Greater(t, len(streamRes2), 0, "streamRes2 is empty")
2276+
assert.Equal(t, 0, len(streamRes2[0].Messages))
2277+
})
2278+
}

0 commit comments

Comments
 (0)