@@ -55,11 +55,14 @@ type snapshotMap map[string]*cachev3.Snapshot
5555
5656type nodeInfoMap map [int64 ]* corev3.Node
5757
58+ type nodeFrequencyMap map [string ]int
59+
5860type streamDurationMap map [int64 ]time.Time
5961
6062type snapshotCache struct {
6163 cachev3.SnapshotCache
6264 streamIDNodeInfo nodeInfoMap
65+ nodeFrequency nodeFrequencyMap
6366 streamDuration streamDurationMap
6467 deltaStreamDuration streamDurationMap
6568 snapshotVersion int64
@@ -133,6 +136,7 @@ func NewSnapshotCache(ads bool, logger logging.Logger) SnapshotCacheWithCallback
133136 log : wrappedLogger ,
134137 lastSnapshot : make (snapshotMap ),
135138 streamIDNodeInfo : make (nodeInfoMap ),
139+ nodeFrequency : make (nodeFrequencyMap ),
136140 streamDuration : make (streamDurationMap ),
137141 deltaStreamDuration : make (streamDurationMap ),
138142 }
@@ -179,6 +183,15 @@ func (s *snapshotCache) OnStreamClosed(streamID int64, node *corev3.Node) {
179183
180184 delete (s .streamIDNodeInfo , streamID )
181185 delete (s .streamDuration , streamID )
186+
187+ s .nodeFrequency [node .Id ] -= 1
188+ if s .nodeFrequency [node .Id ] <= 0 {
189+ delete (s .nodeFrequency , node .Id )
190+
191+ // Only snapshots for nodes with active connections are updated, we need to clear
192+ // the snapshot for this node so it doesn't get stale data when it reconnects.
193+ s .ClearSnapshot (node .Id )
194+ }
182195}
183196
184197func (s * snapshotCache ) OnStreamRequest (streamID int64 , req * discoveryv3.DiscoveryRequest ) error {
@@ -196,6 +209,7 @@ func (s *snapshotCache) OnStreamRequest(streamID int64, req *discoveryv3.Discove
196209 }
197210 s .log .Debugf ("First discovery request on stream %d, got nodeID %s" , streamID , req .Node .Id )
198211 s .streamIDNodeInfo [streamID ] = req .Node
212+ s .nodeFrequency [req .Node .Id ] += 1
199213 }
200214 nodeID := s .streamIDNodeInfo [streamID ].Id
201215 cluster := s .streamIDNodeInfo [streamID ].Cluster
@@ -286,6 +300,15 @@ func (s *snapshotCache) OnDeltaStreamClosed(streamID int64, node *corev3.Node) {
286300
287301 delete (s .streamIDNodeInfo , streamID )
288302 delete (s .deltaStreamDuration , streamID )
303+
304+ s .nodeFrequency [node .Id ] -= 1
305+ if s .nodeFrequency [node .Id ] <= 0 {
306+ delete (s .nodeFrequency , node .Id )
307+
308+ // Only snapshots for nodes with active connections are updated, we need to clear
309+ // the snapshot for this node so it doesn't get stale data when it reconnects.
310+ s .ClearSnapshot (node .Id )
311+ }
289312}
290313
291314func (s * snapshotCache ) OnStreamDeltaRequest (streamID int64 , req * discoveryv3.DeltaDiscoveryRequest ) error {
@@ -308,6 +331,7 @@ func (s *snapshotCache) OnStreamDeltaRequest(streamID int64, req *discoveryv3.De
308331 }
309332 s .log .Debugf ("First incremental discovery request on stream %d, got nodeID %s" , streamID , req .Node .Id )
310333 s .streamIDNodeInfo [streamID ] = req .Node
334+ s .nodeFrequency [req .Node .Id ] += 1
311335 }
312336 nodeID := s .streamIDNodeInfo [streamID ].Id
313337 cluster := s .streamIDNodeInfo [streamID ].Cluster
0 commit comments