Skip to content

Commit c10f59e

Browse files
committed
Redis: recover subscription if group or stream deleted (dapr#3221)
Signed-off-by: Bernd Verst <[email protected]>
1 parent 934e86c commit c10f59e

File tree

1 file changed

+17
-4
lines changed

1 file changed

+17
-4
lines changed

pubsub/redis/redis.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"errors"
1919
"fmt"
2020
"reflect"
21+
"strings"
2122
"sync"
2223
"sync/atomic"
2324
"time"
@@ -108,15 +109,22 @@ func (r *redisStreams) Publish(ctx context.Context, req *pubsub.PublishRequest)
108109
return nil
109110
}
110111

112+
func (r *redisStreams) CreateConsumerGroup(ctx context.Context, stream string) error {
113+
err := r.client.XGroupCreateMkStream(ctx, stream, r.clientSettings.ConsumerID, "0")
114+
// Ignore BUSYGROUP errors
115+
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
116+
r.logger.Errorf("redis streams: %s", err)
117+
return err
118+
}
119+
return nil
120+
}
121+
111122
func (r *redisStreams) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
112123
if r.closed.Load() {
113124
return errors.New("component is closed")
114125
}
115126

116-
err := r.client.XGroupCreateMkStream(ctx, req.Topic, r.clientSettings.ConsumerID, "0")
117-
// Ignore BUSYGROUP errors
118-
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
119-
r.logger.Errorf("redis streams: %s", err)
127+
if err := r.CreateConsumerGroup(ctx, req.Topic); err != nil {
120128
return err
121129
}
122130

@@ -242,6 +250,11 @@ func (r *redisStreams) pollNewMessagesLoop(ctx context.Context, stream string, h
242250
streams, err := r.client.XReadGroupResult(ctx, r.clientSettings.ConsumerID, r.clientSettings.ConsumerID, []string{stream, ">"}, int64(r.clientSettings.QueueDepth), time.Duration(r.clientSettings.ReadTimeout))
243251
if err != nil {
244252
if !errors.Is(err, r.client.GetNilValueError()) && err != context.Canceled {
253+
if strings.Contains(err.Error(), "NOGROUP") {
254+
r.logger.Warnf("redis streams: consumer group %s does not exist for stream %s. This could mean the server experienced data loss, or the group/stream was deleted.", r.clientSettings.ConsumerID, stream)
255+
r.logger.Warnf("redis streams: recreating group %s for stream %s", r.clientSettings.ConsumerID, stream)
256+
r.CreateConsumerGroup(ctx, stream)
257+
}
245258
r.logger.Errorf("redis streams: error reading from stream %s: %s", stream, err)
246259
}
247260
continue

0 commit comments

Comments
 (0)