@@ -54,11 +54,10 @@ type (
 	tRebalance *time.Timer
 
 	// KIP-848 consumer group fields
-	assignorName           string
-	consumerMembers        map[string]*consumerMember
-	partitionEpochs        map[uuid]map[int32]int32 // (topicID, partition) -> owning member's epoch; -1 or absent means free
-	targetAssignmentsStale bool                     // session timeout cannot snapshot metadata, defers to next heartbeat
-	lastTopicMeta          topicMetaSnap            // last snapshot received, for deferred recomputation
+	assignorName    string
+	consumerMembers map[string]*consumerMember
+	partitionEpochs map[uuid]map[int32]int32 // (topicID, partition) -> owning member's epoch; -1 or absent means free
+	lastTopicMeta   topicMetaSnap            // last snapshot received, for recomputation on member removal
 
 	quit   sync.Once
 	quitCh chan struct{}
@@ -181,23 +180,31 @@ func (c *Cluster) snapshotTopicMeta() topicMetaSnap {
 	return snap
 }
 
-// notifyTopicChange marks all 848 consumer groups as needing
-// recomputation after a topic is created, deleted, or has partitions
-// added. We notify all groups rather than filtering by subscription
-// because group member state is owned by the group's manage goroutine
-// and cannot be read safely from here.
+// notifyTopicChange recomputes target assignments for all 848 consumer
+// groups after a topic is created, deleted, or has partitions added. We
+// capture a fresh metadata snapshot here (in the cluster run loop, where
+// c.data is safe to read) and pass it to the manage goroutine so that
+// the recomputation always sees the latest topics. This avoids a race
+// where the manage goroutine could recompute using a stale snapshot
+// from a heartbeat that was enqueued before the topic change.
+//
+// This blocks the cluster run loop until each group's manage goroutine
+// processes the notification. That is safe: the manage goroutine never
+// calls c.admin(), and replies go to cc.respCh (drained by the
+// connection write goroutine, not the run loop).
 func (c *Cluster) notifyTopicChange() {
+	snap := c.snapshotTopicMeta()
 	for _, g := range c.groups.gs {
 		select {
 		case g.controlCh <- func() {
 			if len(g.consumerMembers) > 0 {
-				g.targetAssignmentsStale = true
+				g.lastTopicMeta = snap
+				g.computeTargetAssignment(snap)
 				g.updateConsumerStateField()
 			}
 		}:
-		default:
-			// Buffer full - a notification is already pending, the
-			// next heartbeat will pick up fresh metadata anyway.
+		case <-g.quitCh:
+		case <-g.c.die:
 		}
 	}
 }
@@ -753,10 +760,6 @@ func (g *group) manage(detachNew func()) {
 			kresp = g.handleOffsetDelete(creq)
 		case *kmsg.ConsumerGroupHeartbeatRequest:
 			g.lastTopicMeta = creq.topicMeta
-			if g.targetAssignmentsStale {
-				g.targetAssignmentsStale = false
-				g.computeTargetAssignment(creq.topicMeta)
-			}
 			kresp = g.handleConsumerHeartbeat(creq)
 			firstJoin(true)
 		}
@@ -2132,7 +2135,7 @@ func (g *group) atConsumerSessionTimeout(m *consumerMember) {
 			g.fenceConsumerMember(m)
 			delete(g.consumerMembers, m.memberID)
 			g.generation++
-			g.targetAssignmentsStale = true
+			g.computeTargetAssignment(g.lastTopicMeta)
 			g.updateConsumerStateField()
 		}
 	})
@@ -2158,7 +2161,7 @@ func (g *group) scheduleConsumerRebalanceTimeout(m *consumerMember) {
 		g.fenceConsumerMember(cur)
 		delete(g.consumerMembers, memberID)
 		g.generation++
-		g.targetAssignmentsStale = true
+		g.computeTargetAssignment(g.lastTopicMeta)
 		g.updateConsumerStateField()
 	})
 }
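
The safety argument in the new notifyTopicChange comment boils down to a standard Go handoff pattern: the goroutine that owns the data takes the snapshot itself, then hands a closure to the worker goroutine over an unbuffered control channel, selecting on the worker's quit channels so the sender cannot block forever on a goroutine that is shutting down. The sketch below is a minimal, self-contained illustration of that pattern, not the actual kfake code; the snapshot, worker, and notify names are invented for the example.

package main

import (
	"fmt"
	"time"
)

// snapshot stands in for topicMetaSnap: data captured by its owner before handoff.
type snapshot struct{ topics []string }

// worker stands in for a group's manage goroutine: it owns its own state and
// only ever touches it from run().
type worker struct {
	controlCh chan func()
	quitCh    chan struct{}
}

func (w *worker) run() {
	for {
		select {
		case fn := <-w.controlCh:
			fn() // executes on the worker goroutine, so it may touch worker-owned state
		case <-w.quitCh:
			return
		}
	}
}

// notify mirrors the shape of notifyTopicChange: the caller captures the
// snapshot while it is safe to read, then blocks until each worker either
// accepts the closure or is known to be quitting.
func notify(workers []*worker, snap snapshot) {
	for _, w := range workers {
		select {
		case w.controlCh <- func() {
			fmt.Println("recomputing with", len(snap.topics), "topics")
		}:
		case <-w.quitCh:
			// Worker is shutting down; dropping the notification is fine.
		}
	}
}

func main() {
	w := &worker{controlCh: make(chan func()), quitCh: make(chan struct{})}
	go w.run()
	notify([]*worker{w}, snapshot{topics: []string{"foo", "bar"}})
	close(w.quitCh)
	time.Sleep(10 * time.Millisecond) // demo only: give run() a moment to observe quitCh
}

The select on the quit channels is what keeps the blocking send safe: if the worker has already exited, the sender falls through instead of deadlocking, which is exactly the role g.quitCh and g.c.die play in the patched notifyTopicChange.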