Skip to content

Commit 922babb

Browse files
authored
Alerting: Add mutex to Redis HA subs (grafana#89870)
1 parent 81935a3 commit 922babb

File tree

1 file changed

+41
-12
lines changed

1 file changed

+41
-12
lines changed

pkg/services/ngalert/notifier/redis_peer.go

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ type redisPeer struct {
6464
states map[string]alertingCluster.State
6565
subs map[string]*redis.PubSub
6666
statesMtx sync.RWMutex
67+
subsMtx sync.RWMutex
6768

6869
readyc chan struct{}
6970
shutdownc chan struct{}
@@ -225,8 +226,10 @@ func newRedisPeer(cfg redisConfig, logger log.Logger, reg prometheus.Registerer,
225226
p.nodePingDuration = nodePingDuration
226227
p.nodePingFailures = nodePingFailures
227228

229+
p.subsMtx.Lock()
228230
p.subs[fullStateChannel] = p.redis.Subscribe(context.Background(), p.withPrefix(fullStateChannel))
229231
p.subs[fullStateChannelReq] = p.redis.Subscribe(context.Background(), p.withPrefix(fullStateChannelReq))
232+
p.subsMtx.Unlock()
230233

231234
go p.heartbeatLoop()
232235
go p.membersSyncLoop()
@@ -461,7 +464,9 @@ func (p *redisPeer) AddState(key string, state alertingCluster.State, _ promethe
461464
// As we also want to get the state from other nodes, we subscribe to the key.
462465
sub := p.redis.Subscribe(context.Background(), p.withPrefix(key))
463466
go p.receiveLoop(sub)
467+
p.subsMtx.Lock()
464468
p.subs[key] = sub
469+
p.subsMtx.Unlock()
465470
return newRedisChannel(p, key, p.withPrefix(key), update)
466471
}
467472

@@ -507,17 +512,29 @@ func (p *redisPeer) fullStateReqReceiveLoop() {
507512
select {
508513
case <-p.shutdownc:
509514
return
510-
case data := <-p.subs[fullStateChannelReq].Channel():
511-
// The payload of a full state request is the name of the peer that is
512-
// requesting the full state. In case we received our own request, we
513-
// can just ignore it. Redis pub/sub fanouts to all clients, regardless
514-
// if a client was also the publisher.
515-
if data.Payload == p.name {
515+
default:
516+
p.subsMtx.RLock()
517+
sub, ok := p.subs[fullStateChannelReq]
518+
p.subsMtx.RUnlock()
519+
520+
if !ok {
521+
time.Sleep(waitForMsgIdle)
516522
continue
517523
}
518-
p.fullStateSyncPublish()
519-
default:
520-
time.Sleep(waitForMsgIdle)
524+
525+
select {
526+
case data := <-sub.Channel():
527+
// The payload of a full state request is the name of the peer that is
528+
// requesting the full state. In case we received our own request, we
529+
// can just ignore it. Redis pub/sub fanouts to all clients, regardless
530+
// if a client was also the publisher.
531+
if data.Payload == p.name {
532+
continue
533+
}
534+
p.fullStateSyncPublish()
535+
default:
536+
time.Sleep(waitForMsgIdle)
537+
}
521538
}
522539
}
523540
}
@@ -527,10 +544,22 @@ func (p *redisPeer) fullStateSyncReceiveLoop() {
527544
select {
528545
case <-p.shutdownc:
529546
return
530-
case data := <-p.subs[fullStateChannel].Channel():
531-
p.mergeFullState([]byte(data.Payload))
532547
default:
533-
time.Sleep(waitForMsgIdle)
548+
p.subsMtx.RLock()
549+
sub, ok := p.subs[fullStateChannel]
550+
p.subsMtx.RUnlock()
551+
552+
if !ok {
553+
time.Sleep(waitForMsgIdle)
554+
continue
555+
}
556+
557+
select {
558+
case data := <-sub.Channel():
559+
p.mergeFullState([]byte(data.Payload))
560+
default:
561+
time.Sleep(waitForMsgIdle)
562+
}
534563
}
535564
}
536565
}

0 commit comments

Comments
 (0)