Skip to content

Commit 569c00a

Browse files
authored
chore: refactor redis stream (#5048)
Signed-off-by: Kevin Wan <[email protected]>
1 parent 9da76fb commit 569c00a

File tree

2 files changed

+197
-120
lines changed

2 files changed

+197
-120
lines changed

core/stores/redis/redis.go

Lines changed: 150 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ type (
6565
// RedisNode interface represents a redis node.
6666
RedisNode interface {
6767
red.Cmdable
68-
red.BitMapCmdable
6968
}
7069

7170
// GeoLocation is used with GeoAdd to add geospatial location.
@@ -1285,10 +1284,12 @@ func (s *Redis) RpushCtx(ctx context.Context, key string, values ...any) (int, e
12851284
return int(v), nil
12861285
}
12871286

1287+
// RPopLPush atomically removes the last element from source list and prepends it to destination list.
12881288
func (s *Redis) RPopLPush(source string, destination string) (string, error) {
12891289
return s.RPopLPushCtx(context.Background(), source, destination)
12901290
}
12911291

1292+
// RPopLPushCtx is the context-aware version of RPopLPush.
12921293
func (s *Redis) RPopLPushCtx(ctx context.Context, source string, destination string) (string, error) {
12931294
conn, err := getRedis(s)
12941295
if err != nil {
@@ -1695,14 +1696,17 @@ func (s *Redis) TtlCtx(ctx context.Context, key string) (int, error) {
16951696
return int(duration), nil
16961697
}
16971698

1699+
// TxPipeline returns a Redis transaction pipeline for executing multiple commands atomically.
16981700
func (s *Redis) TxPipeline() (pipe Pipeliner, err error) {
16991701
conn, err := getRedis(s)
17001702
if err != nil {
17011703
return nil, err
17021704
}
1705+
17031706
return conn.TxPipeline(), nil
17041707
}
17051708

1709+
// Unlink is similar to Del but removes keys asynchronously in a separate thread.
17061710
func (s *Redis) Unlink(keys ...string) (int64, error) {
17071711
return s.UnlinkCtx(context.Background(), keys...)
17081712
}
@@ -1712,9 +1716,154 @@ func (s *Redis) UnlinkCtx(ctx context.Context, keys ...string) (int64, error) {
17121716
if err != nil {
17131717
return 0, err
17141718
}
1719+
17151720
return conn.Unlink(ctx, keys...).Result()
17161721
}
17171722

1723+
// XAck acknowledges one or more messages in a Redis stream consumer group.
1724+
// It marks the specified messages as successfully processed.
1725+
func (s *Redis) XAck(stream string, group string, ids ...string) (int64, error) {
1726+
return s.XAckCtx(context.Background(), stream, group, ids...)
1727+
}
1728+
1729+
// XAckCtx is the context-aware version of XAck.
1730+
func (s *Redis) XAckCtx(ctx context.Context, stream string, group string, ids ...string) (int64, error) {
1731+
conn, err := getRedis(s)
1732+
if err != nil {
1733+
return 0, err
1734+
}
1735+
1736+
return conn.XAck(ctx, stream, group, ids...).Result()
1737+
}
1738+
1739+
// XAdd adds a new entry to a Redis stream with the specified ID and field-value pairs.
1740+
// If noMkStream is true, the command will fail if the stream doesn't exist.
1741+
func (s *Redis) XAdd(stream string, noMkStream bool, id string, values any) (string, error) {
1742+
return s.XAddCtx(context.Background(), stream, noMkStream, id, values)
1743+
}
1744+
1745+
// XAddCtx is the context-aware version of XAdd.
1746+
func (s *Redis) XAddCtx(ctx context.Context, stream string, noMkStream bool, id string, values any) (
1747+
string, error) {
1748+
conn, err := getRedis(s)
1749+
if err != nil {
1750+
return "", err
1751+
}
1752+
1753+
return conn.XAdd(ctx, &red.XAddArgs{
1754+
Stream: stream,
1755+
ID: id,
1756+
Values: values,
1757+
NoMkStream: noMkStream,
1758+
}).Result()
1759+
}
1760+
1761+
// XGroupCreateMkStream creates a consumer group for a Redis stream.
1762+
// If the stream doesn't exist, it will be created automatically.
1763+
func (s *Redis) XGroupCreateMkStream(stream string, group string, start string) (string, error) {
1764+
return s.XGroupCreateMkStreamCtx(context.Background(), stream, group, start)
1765+
}
1766+
1767+
// XGroupCreateMkStreamCtx is the context-aware version of XGroupCreateMkStream.
1768+
func (s *Redis) XGroupCreateMkStreamCtx(ctx context.Context, stream string, group string,
1769+
start string) (string, error) {
1770+
conn, err := getRedis(s)
1771+
if err != nil {
1772+
return "", err
1773+
}
1774+
1775+
return conn.XGroupCreateMkStream(ctx, stream, group, start).Result()
1776+
}
1777+
1778+
// XGroupCreate creates a consumer group for a Redis stream.
1779+
// The stream must already exist, otherwise the command will fail.
1780+
func (s *Redis) XGroupCreate(stream string, group string, start string) (string, error) {
1781+
return s.XGroupCreateCtx(context.Background(), stream, group, start)
1782+
}
1783+
1784+
// XGroupCreateCtx is the context-aware version of XGroupCreate.
1785+
func (s *Redis) XGroupCreateCtx(ctx context.Context, stream string, group string, start string) (
1786+
string, error) {
1787+
conn, err := getRedis(s)
1788+
if err != nil {
1789+
return "", err
1790+
}
1791+
1792+
return conn.XGroupCreate(ctx, stream, group, start).Result()
1793+
}
1794+
1795+
// XInfoConsumers returns information about consumers in a Redis stream consumer group.
1796+
func (s *Redis) XInfoConsumers(stream string, group string) ([]red.XInfoConsumer, error) {
1797+
return s.XInfoConsumersCtx(context.Background(), stream, group)
1798+
}
1799+
1800+
// XInfoConsumersCtx is the context-aware version of XInfoConsumers.
1801+
func (s *Redis) XInfoConsumersCtx(ctx context.Context, stream string, group string) (
1802+
[]red.XInfoConsumer, error) {
1803+
conn, err := getRedis(s)
1804+
if err != nil {
1805+
return nil, err
1806+
}
1807+
1808+
return conn.XInfoConsumers(ctx, stream, group).Result()
1809+
}
1810+
1811+
// XInfoGroups returns information about consumer groups for a Redis stream.
1812+
func (s *Redis) XInfoGroups(stream string) ([]red.XInfoGroup, error) {
1813+
return s.XInfoGroupsCtx(context.Background(), stream)
1814+
}
1815+
1816+
// XInfoGroupsCtx is the context-aware version of XInfoGroups.
1817+
func (s *Redis) XInfoGroupsCtx(ctx context.Context, stream string) ([]red.XInfoGroup, error) {
1818+
conn, err := getRedis(s)
1819+
if err != nil {
1820+
return nil, err
1821+
}
1822+
1823+
return conn.XInfoGroups(ctx, stream).Result()
1824+
}
1825+
1826+
// XInfoStream returns general information about a Redis stream.
1827+
func (s *Redis) XInfoStream(stream string) (*red.XInfoStream, error) {
1828+
return s.XInfoStreamCtx(context.Background(), stream)
1829+
}
1830+
1831+
// XInfoStreamCtx is the context-aware version of XInfoStream.
1832+
func (s *Redis) XInfoStreamCtx(ctx context.Context, stream string) (*red.XInfoStream, error) {
1833+
conn, err := getRedis(s)
1834+
if err != nil {
1835+
return nil, err
1836+
}
1837+
1838+
return conn.XInfoStream(ctx, stream).Result()
1839+
}
1840+
1841+
// XReadGroup reads messages from Redis streams as part of a consumer group.
1842+
// It allows for distributed processing of stream messages with automatic message delivery semantics.
1843+
// Doesn't benefit from pooling redis connections of blocking queries.
1844+
func (s *Redis) XReadGroup(node RedisNode, group string, consumerId string, count int64,
1845+
block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
1846+
return s.XReadGroupCtx(context.Background(), node, group, consumerId, count, block, noAck, streams...)
1847+
}
1848+
1849+
// XReadGroupCtx is the context-aware version of XReadGroup.
1850+
// Doesn't benefit from pooling redis connections of blocking queries.
1851+
func (s *Redis) XReadGroupCtx(ctx context.Context, node RedisNode, group string, consumerId string,
1852+
count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
1853+
if node == nil {
1854+
return nil, ErrNilNode
1855+
}
1856+
1857+
return node.XReadGroup(ctx, &red.XReadGroupArgs{
1858+
Group: group,
1859+
Consumer: consumerId,
1860+
Count: count,
1861+
Block: block,
1862+
NoAck: noAck,
1863+
Streams: streams,
1864+
}).Result()
1865+
}
1866+
17181867
// Zadd is the implementation of redis zadd command.
17191868
func (s *Redis) Zadd(key string, score int64, value string) (bool, error) {
17201869
return s.ZaddCtx(context.Background(), key, score, value)
@@ -2402,117 +2551,6 @@ func (s *Redis) ZunionstoreCtx(ctx context.Context, dest string, store *ZStore)
24022551
return conn.ZUnionStore(ctx, dest, store).Result()
24032552
}
24042553

2405-
func (s *Redis) XGroupCreateMkStream(stream string, group string, start string) (string, error) {
2406-
return s.XGroupCreateMkStreamCtx(context.Background(), stream, group, start)
2407-
}
2408-
2409-
func (s *Redis) XGroupCreateMkStreamCtx(ctx context.Context, stream string, group string, start string) (string, error) {
2410-
conn, err := getRedis(s)
2411-
if err != nil {
2412-
return "", err
2413-
}
2414-
return conn.XGroupCreateMkStream(ctx, stream, group, start).Result()
2415-
}
2416-
2417-
func (s *Redis) XGroupCreate(stream string, group string, start string) (string, error) {
2418-
return s.XGroupCreateCtx(context.Background(), stream, group, start)
2419-
}
2420-
2421-
func (s *Redis) XGroupCreateCtx(ctx context.Context, stream string, group string, start string) (string, error) {
2422-
conn, err := getRedis(s)
2423-
if err != nil {
2424-
return "", err
2425-
}
2426-
return conn.XGroupCreate(ctx, stream, group, start).Result()
2427-
}
2428-
2429-
func (s *Redis) XInfoConsumers(stream string, group string) ([]red.XInfoConsumer, error) {
2430-
return s.XInfoConsumersCtx(context.Background(), stream, group)
2431-
}
2432-
2433-
func (s *Redis) XInfoConsumersCtx(ctx context.Context, stream string, group string) ([]red.XInfoConsumer, error) {
2434-
conn, err := getRedis(s)
2435-
if err != nil {
2436-
return nil, err
2437-
}
2438-
return conn.XInfoConsumers(ctx, stream, group).Result()
2439-
}
2440-
2441-
func (s *Redis) XInfoGroups(stream string) ([]red.XInfoGroup, error) {
2442-
return s.XInfoGroupsCtx(context.Background(), stream)
2443-
}
2444-
2445-
func (s *Redis) XInfoGroupsCtx(ctx context.Context, stream string) ([]red.XInfoGroup, error) {
2446-
conn, err := getRedis(s)
2447-
if err != nil {
2448-
return nil, err
2449-
}
2450-
return conn.XInfoGroups(ctx, stream).Result()
2451-
}
2452-
2453-
func (s *Redis) XInfoStream(stream string) (*red.XInfoStream, error) {
2454-
return s.XInfoStreamCtx(context.Background(), stream)
2455-
}
2456-
2457-
func (s *Redis) XInfoStreamCtx(ctx context.Context, stream string) (*red.XInfoStream, error) {
2458-
conn, err := getRedis(s)
2459-
if err != nil {
2460-
return nil, err
2461-
}
2462-
return conn.XInfoStream(ctx, stream).Result()
2463-
}
2464-
2465-
func (s *Redis) XAdd(stream string, noMkStream bool, id string, values interface{}) (string, error) {
2466-
return s.XAddCtx(context.Background(), stream, noMkStream, id, values)
2467-
}
2468-
2469-
func (s *Redis) XAddCtx(ctx context.Context, stream string, noMkStream bool, id string, values interface{}) (string, error) {
2470-
conn, err := getRedis(s)
2471-
if err != nil {
2472-
return "", err
2473-
}
2474-
return conn.XAdd(ctx, &red.XAddArgs{
2475-
Stream: stream,
2476-
ID: id,
2477-
Values: values,
2478-
NoMkStream: noMkStream,
2479-
}).Result()
2480-
}
2481-
2482-
func (s *Redis) XAck(stream string, group string, ids ...string) (int64, error) {
2483-
return s.XAckCtx(context.Background(), stream, group, ids...)
2484-
}
2485-
2486-
func (s *Redis) XAckCtx(ctx context.Context, stream string, group string, ids ...string) (int64, error) {
2487-
conn, err := getRedis(s)
2488-
if err != nil {
2489-
return 0, err
2490-
}
2491-
return conn.XAck(ctx, stream, group, ids...).Result()
2492-
}
2493-
2494-
/**
2495-
* streams: list of streams and ids, e.g. stream1 stream2 id1 id2
2496-
*/
2497-
func (s *Redis) XReadGroup(group string, consumerId string, count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
2498-
return s.XReadGroupCtx(context.Background(), group, consumerId, count, block, noAck, streams...)
2499-
}
2500-
2501-
func (s *Redis) XReadGroupCtx(ctx context.Context, group string, consumerId string, count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
2502-
conn, err := getRedis(s)
2503-
if err != nil {
2504-
return nil, err
2505-
}
2506-
return conn.XReadGroup(ctx, &red.XReadGroupArgs{
2507-
Group: group,
2508-
Consumer: consumerId,
2509-
Count: count,
2510-
Block: block,
2511-
NoAck: noAck,
2512-
Streams: streams,
2513-
}).Result()
2514-
}
2515-
25162554
func (s *Redis) checkConnection(pingTimeout time.Duration) error {
25172555
conn, err := getRedis(s)
25182556
if err != nil {

0 commit comments

Comments
 (0)