Skip to content

Commit 2136351

Browse files
authored
Merge branch 'master' into feat-add-rpc-name
2 parents c5b96d0 + 569c00a commit 2136351

File tree

2 files changed

+290
-1
lines changed

2 files changed

+290
-1
lines changed

core/stores/redis/redis.go

Lines changed: 150 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ type (
6565
// RedisNode interface represents a redis node.
6666
RedisNode interface {
6767
red.Cmdable
68-
red.BitMapCmdable
6968
}
7069

7170
// GeoLocation is used with GeoAdd to add geospatial location.
@@ -1285,10 +1284,12 @@ func (s *Redis) RpushCtx(ctx context.Context, key string, values ...any) (int, e
12851284
return int(v), nil
12861285
}
12871286

1287+
// RPopLPush atomically removes the last element from source list and prepends it to destination list.
12881288
func (s *Redis) RPopLPush(source string, destination string) (string, error) {
12891289
return s.RPopLPushCtx(context.Background(), source, destination)
12901290
}
12911291

1292+
// RPopLPushCtx is the context-aware version of RPopLPush.
12921293
func (s *Redis) RPopLPushCtx(ctx context.Context, source string, destination string) (string, error) {
12931294
conn, err := getRedis(s)
12941295
if err != nil {
@@ -1695,14 +1696,17 @@ func (s *Redis) TtlCtx(ctx context.Context, key string) (int, error) {
16951696
return int(duration), nil
16961697
}
16971698

1699+
// TxPipeline returns a Redis transaction pipeline for executing multiple commands atomically.
16981700
func (s *Redis) TxPipeline() (pipe Pipeliner, err error) {
16991701
conn, err := getRedis(s)
17001702
if err != nil {
17011703
return nil, err
17021704
}
1705+
17031706
return conn.TxPipeline(), nil
17041707
}
17051708

1709+
// Unlink is similar to Del but removes keys asynchronously in a separate thread.
17061710
func (s *Redis) Unlink(keys ...string) (int64, error) {
17071711
return s.UnlinkCtx(context.Background(), keys...)
17081712
}
@@ -1712,9 +1716,154 @@ func (s *Redis) UnlinkCtx(ctx context.Context, keys ...string) (int64, error) {
17121716
if err != nil {
17131717
return 0, err
17141718
}
1719+
17151720
return conn.Unlink(ctx, keys...).Result()
17161721
}
17171722

1723+
// XAck acknowledges one or more messages in a Redis stream consumer group.
1724+
// It marks the specified messages as successfully processed.
1725+
func (s *Redis) XAck(stream string, group string, ids ...string) (int64, error) {
1726+
return s.XAckCtx(context.Background(), stream, group, ids...)
1727+
}
1728+
1729+
// XAckCtx is the context-aware version of XAck.
1730+
func (s *Redis) XAckCtx(ctx context.Context, stream string, group string, ids ...string) (int64, error) {
1731+
conn, err := getRedis(s)
1732+
if err != nil {
1733+
return 0, err
1734+
}
1735+
1736+
return conn.XAck(ctx, stream, group, ids...).Result()
1737+
}
1738+
1739+
// XAdd adds a new entry to a Redis stream with the specified ID and field-value pairs.
1740+
// If noMkStream is true, the command will fail if the stream doesn't exist.
1741+
func (s *Redis) XAdd(stream string, noMkStream bool, id string, values any) (string, error) {
1742+
return s.XAddCtx(context.Background(), stream, noMkStream, id, values)
1743+
}
1744+
1745+
// XAddCtx is the context-aware version of XAdd.
1746+
func (s *Redis) XAddCtx(ctx context.Context, stream string, noMkStream bool, id string, values any) (
1747+
string, error) {
1748+
conn, err := getRedis(s)
1749+
if err != nil {
1750+
return "", err
1751+
}
1752+
1753+
return conn.XAdd(ctx, &red.XAddArgs{
1754+
Stream: stream,
1755+
ID: id,
1756+
Values: values,
1757+
NoMkStream: noMkStream,
1758+
}).Result()
1759+
}
1760+
1761+
// XGroupCreateMkStream creates a consumer group for a Redis stream.
1762+
// If the stream doesn't exist, it will be created automatically.
1763+
func (s *Redis) XGroupCreateMkStream(stream string, group string, start string) (string, error) {
1764+
return s.XGroupCreateMkStreamCtx(context.Background(), stream, group, start)
1765+
}
1766+
1767+
// XGroupCreateMkStreamCtx is the context-aware version of XGroupCreateMkStream.
1768+
func (s *Redis) XGroupCreateMkStreamCtx(ctx context.Context, stream string, group string,
1769+
start string) (string, error) {
1770+
conn, err := getRedis(s)
1771+
if err != nil {
1772+
return "", err
1773+
}
1774+
1775+
return conn.XGroupCreateMkStream(ctx, stream, group, start).Result()
1776+
}
1777+
1778+
// XGroupCreate creates a consumer group for a Redis stream.
1779+
// The stream must already exist, otherwise the command will fail.
1780+
func (s *Redis) XGroupCreate(stream string, group string, start string) (string, error) {
1781+
return s.XGroupCreateCtx(context.Background(), stream, group, start)
1782+
}
1783+
1784+
// XGroupCreateCtx is the context-aware version of XGroupCreate.
1785+
func (s *Redis) XGroupCreateCtx(ctx context.Context, stream string, group string, start string) (
1786+
string, error) {
1787+
conn, err := getRedis(s)
1788+
if err != nil {
1789+
return "", err
1790+
}
1791+
1792+
return conn.XGroupCreate(ctx, stream, group, start).Result()
1793+
}
1794+
1795+
// XInfoConsumers returns information about consumers in a Redis stream consumer group.
1796+
func (s *Redis) XInfoConsumers(stream string, group string) ([]red.XInfoConsumer, error) {
1797+
return s.XInfoConsumersCtx(context.Background(), stream, group)
1798+
}
1799+
1800+
// XInfoConsumersCtx is the context-aware version of XInfoConsumers.
1801+
func (s *Redis) XInfoConsumersCtx(ctx context.Context, stream string, group string) (
1802+
[]red.XInfoConsumer, error) {
1803+
conn, err := getRedis(s)
1804+
if err != nil {
1805+
return nil, err
1806+
}
1807+
1808+
return conn.XInfoConsumers(ctx, stream, group).Result()
1809+
}
1810+
1811+
// XInfoGroups returns information about consumer groups for a Redis stream.
1812+
func (s *Redis) XInfoGroups(stream string) ([]red.XInfoGroup, error) {
1813+
return s.XInfoGroupsCtx(context.Background(), stream)
1814+
}
1815+
1816+
// XInfoGroupsCtx is the context-aware version of XInfoGroups.
1817+
func (s *Redis) XInfoGroupsCtx(ctx context.Context, stream string) ([]red.XInfoGroup, error) {
1818+
conn, err := getRedis(s)
1819+
if err != nil {
1820+
return nil, err
1821+
}
1822+
1823+
return conn.XInfoGroups(ctx, stream).Result()
1824+
}
1825+
1826+
// XInfoStream returns general information about a Redis stream.
1827+
func (s *Redis) XInfoStream(stream string) (*red.XInfoStream, error) {
1828+
return s.XInfoStreamCtx(context.Background(), stream)
1829+
}
1830+
1831+
// XInfoStreamCtx is the context-aware version of XInfoStream.
1832+
func (s *Redis) XInfoStreamCtx(ctx context.Context, stream string) (*red.XInfoStream, error) {
1833+
conn, err := getRedis(s)
1834+
if err != nil {
1835+
return nil, err
1836+
}
1837+
1838+
return conn.XInfoStream(ctx, stream).Result()
1839+
}
1840+
1841+
// XReadGroup reads messages from Redis streams as part of a consumer group.
1842+
// It allows for distributed processing of stream messages with automatic message delivery semantics.
1843+
// Doesn't benefit from pooling redis connections of blocking queries.
1844+
func (s *Redis) XReadGroup(node RedisNode, group string, consumerId string, count int64,
1845+
block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
1846+
return s.XReadGroupCtx(context.Background(), node, group, consumerId, count, block, noAck, streams...)
1847+
}
1848+
1849+
// XReadGroupCtx is the context-aware version of XReadGroup.
1850+
// Doesn't benefit from pooling redis connections of blocking queries.
1851+
func (s *Redis) XReadGroupCtx(ctx context.Context, node RedisNode, group string, consumerId string,
1852+
count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
1853+
if node == nil {
1854+
return nil, ErrNilNode
1855+
}
1856+
1857+
return node.XReadGroup(ctx, &red.XReadGroupArgs{
1858+
Group: group,
1859+
Consumer: consumerId,
1860+
Count: count,
1861+
Block: block,
1862+
NoAck: noAck,
1863+
Streams: streams,
1864+
}).Result()
1865+
}
1866+
17181867
// Zadd is the implementation of redis zadd command.
17191868
func (s *Redis) Zadd(key string, score int64, value string) (bool, error) {
17201869
return s.ZaddCtx(context.Background(), key, score, value)

core/stores/redis/redis_test.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -916,6 +916,11 @@ func TestRedis_Ping(t *testing.T) {
916916
ok := client.Ping()
917917
assert.True(t, ok)
918918
})
919+
920+
runOnRedisWithError(t, func(client *Redis) {
921+
ok := client.Ping()
922+
assert.False(t, ok)
923+
})
919924
})
920925
}
921926

@@ -2029,6 +2034,16 @@ func TestRedis_WithUserPass(t *testing.T) {
20292034
err := newRedis(client.Addr, WithUser("any"), WithPass("any")).Ping()
20302035
assert.NotNil(t, err)
20312036
})
2037+
2038+
runOnRedisWithAccount(t, "foo", "bar", func(client *Redis) {
2039+
err := client.Set("key1", "value1")
2040+
assert.Nil(t, err)
2041+
_, err = newRedis(client.Addr, badType()).Keys("*")
2042+
assert.NotNil(t, err)
2043+
keys, err := client.Keys("*")
2044+
assert.Nil(t, err)
2045+
assert.ElementsMatch(t, []string{"key1"}, keys)
2046+
})
20322047
}
20332048

20342049
func TestRedis_checkConnection(t *testing.T) {
@@ -2057,6 +2072,19 @@ func runOnRedis(t *testing.T, fn func(client *Redis)) {
20572072
}))
20582073
}
20592074

2075+
func runOnRedisWithAccount(t *testing.T, user, pass string, fn func(client *Redis)) {
2076+
logx.Disable()
2077+
2078+
s := miniredis.RunT(t)
2079+
s.RequireUserAuth(user, pass)
2080+
fn(MustNewRedis(RedisConf{
2081+
Host: s.Addr(),
2082+
Type: NodeType,
2083+
User: user,
2084+
Pass: pass,
2085+
}))
2086+
}
2087+
20602088
func runOnRedisWithError(t *testing.T, fn func(client *Redis)) {
20612089
logx.Disable()
20622090

@@ -2175,3 +2203,115 @@ func TestRedisTxPipeline(t *testing.T) {
21752203
assert.Equal(t, hashValue, value)
21762204
})
21772205
}
2206+
2207+
func TestRedisXGroupCreate(t *testing.T) {
2208+
runOnRedis(t, func(client *Redis) {
2209+
_, err := newRedis(client.Addr, badType()).XGroupCreate("Source", "Destination", "0")
2210+
assert.NotNil(t, err)
2211+
2212+
redisCli := newRedis(client.Addr)
2213+
2214+
_, err = redisCli.XGroupCreate("aa", "bb", "0")
2215+
assert.NotNil(t, err)
2216+
2217+
_, err = newRedis(client.Addr, badType()).XGroupCreateMkStream("Source", "Destination", "0")
2218+
assert.NotNil(t, err)
2219+
2220+
_, err = redisCli.XGroupCreateMkStream("aa", "bb", "0")
2221+
assert.Nil(t, err)
2222+
2223+
_, err = redisCli.XGroupCreate("aa", "cc", "0")
2224+
assert.Nil(t, err)
2225+
})
2226+
}
2227+
2228+
func TestRedisXInfo(t *testing.T) {
2229+
runOnRedis(t, func(client *Redis) {
2230+
_, err := newRedis(client.Addr, badType()).XInfoStream("Source")
2231+
assert.NotNil(t, err)
2232+
_, err = newRedis(client.Addr, badType()).XInfoGroups("Source")
2233+
assert.NotNil(t, err)
2234+
2235+
redisCli := newRedis(client.Addr)
2236+
2237+
stream := "aa"
2238+
group := "bb"
2239+
2240+
_, err = redisCli.XGroupCreateMkStream(stream, group, "$")
2241+
assert.Nil(t, err)
2242+
2243+
_, err = redisCli.XAdd(stream, true, "*", []string{"key1", "value1", "key2", "value2"})
2244+
assert.Nil(t, err)
2245+
2246+
infoStream, err := redisCli.XInfoStream(stream)
2247+
assert.Nil(t, err)
2248+
assert.Equal(t, int64(1), infoStream.Length)
2249+
2250+
infoGroups, err := redisCli.XInfoGroups(stream)
2251+
assert.Nil(t, err)
2252+
assert.Equal(t, int64(1), infoGroups[0].Lag)
2253+
assert.Equal(t, group, infoGroups[0].Name)
2254+
2255+
node, err := getRedis(redisCli)
2256+
assert.NoError(t, err)
2257+
redisCli.XAdd(stream, true, "*", []string{"key1", "value1", "key2", "value2"})
2258+
streamRes, err := redisCli.XReadGroup(node, group, "consumer", 1, 2000, false, stream, ">")
2259+
assert.Nil(t, err)
2260+
assert.Equal(t, 1, len(streamRes))
2261+
assert.Equal(t, "value1", streamRes[0].Messages[0].Values["key1"])
2262+
2263+
infoConsumers, err := redisCli.XInfoConsumers(stream, group)
2264+
assert.Nil(t, err)
2265+
assert.Equal(t, 1, len(infoConsumers))
2266+
2267+
_, err = newRedis(client.Addr, badType()).XInfoConsumers(stream, group)
2268+
assert.NotNil(t, err)
2269+
})
2270+
}
2271+
2272+
func TestRedisXReadGroup(t *testing.T) {
2273+
runOnRedis(t, func(client *Redis) {
2274+
_, err := newRedis(client.Addr, badType()).XAdd("bb", true, "*", []string{"key1", "value1", "key2", "value2"})
2275+
assert.NotNil(t, err)
2276+
_, err = newRedis(client.Addr, badType()).XAck("bb", "aa", "123")
2277+
assert.NotNil(t, err)
2278+
2279+
redisCli := newRedis(client.Addr)
2280+
2281+
stream := "aa"
2282+
group := "bb"
2283+
2284+
_, err = redisCli.XGroupCreateMkStream(stream, group, "$")
2285+
assert.Nil(t, err)
2286+
2287+
_, err = redisCli.XAdd(stream, true, "*", []string{"key1", "value1", "key2", "value2"})
2288+
assert.Nil(t, err)
2289+
2290+
node, err := getRedis(redisCli)
2291+
assert.NoError(t, err)
2292+
redisCli.XAdd(stream, true, "*", []string{"key1", "value1", "key2", "value2"})
2293+
_, err = redisCli.XReadGroup(nil, group, "consumer", 1, 2000, false, stream, ">")
2294+
assert.Error(t, err)
2295+
streamRes, err := redisCli.XReadGroup(node, group, "consumer", 1, 2000, false, stream, ">")
2296+
assert.Nil(t, err)
2297+
assert.Equal(t, 1, len(streamRes))
2298+
assert.Equal(t, "value1", streamRes[0].Messages[0].Values["key1"])
2299+
2300+
_, err = redisCli.XReadGroup(nil, group, "consumer", 1, 2000, false, stream, "0")
2301+
assert.Error(t, err)
2302+
streamRes1, err := redisCli.XReadGroup(node, group, "consumer", 1, 2000, false, stream, "0")
2303+
assert.Nil(t, err)
2304+
assert.Equal(t, 1, len(streamRes1))
2305+
assert.Equal(t, "value1", streamRes1[0].Messages[0].Values["key1"])
2306+
2307+
_, err = redisCli.XAck(stream, group, streamRes[0].Messages[0].ID)
2308+
assert.Nil(t, err)
2309+
2310+
_, err = redisCli.XReadGroup(nil, group, "consumer", 1, 2000, false, stream, "0")
2311+
assert.Error(t, err)
2312+
streamRes2, err := redisCli.XReadGroup(node, group, "consumer", 1, 2000, false, stream, "0")
2313+
assert.Nil(t, err)
2314+
assert.Greater(t, len(streamRes2), 0, "streamRes2 is empty")
2315+
assert.Equal(t, 0, len(streamRes2[0].Messages))
2316+
})
2317+
}

0 commit comments

Comments
 (0)