@@ -2,6 +2,8 @@ package ethereum
22
33import (
44 "context"
5+ "crypto/rand"
6+ "math/big"
57 "sync"
68 "time"
79
@@ -64,14 +66,16 @@ type topicManager struct {
6466
6567 // Attestation subnet tracking.
6668 advertisedSubnets []int
69+ selectedSubnet int // The randomly selected subnet to forward events for
6770 seenSubnets map [uint64 ]bool
6871 trackingStartSlot phase0.Slot
6972
7073 // Configuration.
71- detectionWindow int
72- mismatchThreshold int
73- cooldownPeriod time.Duration
74- highWaterMark int
74+ detectionWindow int
75+ mismatchThreshold int
76+ cooldownPeriod time.Duration
77+ highWaterMark int
78+ attestationMaxSubnets int
7579
7680 // Mismatch tracking.
7781 mismatchCount int
@@ -99,18 +103,20 @@ func NewTopicManager(log logrus.FieldLogger, config *TopicConfig) TopicManager {
99103 }
100104
101105 return & topicManager {
102- log : log .WithField ("component" , "topic_manager" ),
103- allTopics : cfg .AllTopics ,
104- conditions : make (map [string ]TopicCondition ),
105- optInTopics : optInMap ,
106- excludedTopics : make (map [string ]bool ),
107- seenSubnets : make (map [uint64 ]bool ),
108- detectionWindow : cfg .MismatchDetectionWindow ,
109- mismatchThreshold : cfg .MismatchThreshold ,
110- cooldownPeriod : cfg .MismatchCooldown ,
111- highWaterMark : cfg .SubnetHighWaterMark ,
112- mismatchEnabled : cfg .AttestationEnabled , // Mismatch detection is enabled when attestation is enabled
113- reconnectChan : make (chan struct {}),
106+ log : log .WithField ("component" , "topic_manager" ),
107+ allTopics : cfg .AllTopics ,
108+ conditions : make (map [string ]TopicCondition ),
109+ optInTopics : optInMap ,
110+ excludedTopics : make (map [string ]bool ),
111+ selectedSubnet : - 1 , // -1 indicates no subnet selected yet
112+ seenSubnets : make (map [uint64 ]bool ),
113+ detectionWindow : cfg .MismatchDetectionWindow ,
114+ mismatchThreshold : cfg .MismatchThreshold ,
115+ cooldownPeriod : cfg .MismatchCooldown ,
116+ highWaterMark : cfg .SubnetHighWaterMark ,
117+ attestationMaxSubnets : cfg .AttestationMaxSubnets ,
118+ mismatchEnabled : cfg .AttestationEnabled , // Mismatch detection is enabled when attestation is enabled
119+ reconnectChan : make (chan struct {}),
114120 }
115121}
116122
@@ -191,7 +197,33 @@ func (tm *topicManager) SetAdvertisedSubnets(subnets []int) {
191197 defer tm .mu .Unlock ()
192198
193199 tm .advertisedSubnets = subnets
194- tm .log .WithField ("subnets" , subnets ).Info ("Set advertised attestation subnets" )
200+
201+ // Only select a random subnet if attestations will be enabled
202+ // (i.e., when subnet count is within the max threshold)
203+ attestationsWillBeEnabled := tm .attestationMaxSubnets > 0 && len (subnets ) <= tm .attestationMaxSubnets
204+
205+ if len (subnets ) > 0 {
206+ if attestationsWillBeEnabled {
207+ // Randomly select one subnet to forward events for
208+ n , err := rand .Int (rand .Reader , big .NewInt (int64 (len (subnets ))))
209+ if err != nil {
210+ tm .selectedSubnet = subnets [0 ]
211+ tm .log .WithError (err ).Warn ("Failed to randomly select subnet, using first subnet" )
212+ } else {
213+ tm .selectedSubnet = subnets [n .Int64 ()]
214+ }
215+
216+ tm .log .WithFields (logrus.Fields {
217+ "selected_subnet" : tm .selectedSubnet ,
218+ }).Info ("Selected random subnet for forwarding" )
219+ } else {
220+ // Attestations won't be enabled due to too many subnets
221+ tm .selectedSubnet = - 1
222+ }
223+ } else {
224+ tm .selectedSubnet = - 1
225+ tm .log .WithField ("subnets" , subnets ).Warn ("Missing advertised attestation subnets" )
226+ }
195227}
196228
197229// RecordAttestation records an attestation for subnet tracking.
@@ -233,6 +265,7 @@ func (tm *topicManager) RecordAttestation(subnetID uint64, slot phase0.Slot) {
233265 "mismatch_count" : tm .mismatchCount ,
234266 "threshold" : tm .mismatchThreshold ,
235267 "advertised_subnets" : tm .advertisedSubnets ,
268+ "selected_subnet" : tm .selectedSubnet ,
236269 "seen_subnets" : tm .getSeenSubnetsList (),
237270 "slot" : slot ,
238271 }).Warn ("Subnet mismatch detected" )
@@ -252,13 +285,8 @@ func (tm *topicManager) IsActiveSubnet(subnetID uint64) bool {
252285 tm .mu .RLock ()
253286 defer tm .mu .RUnlock ()
254287
255- for _ , subnet := range tm .advertisedSubnets {
256- if uint64 (subnet ) == subnetID { //nolint:gosec // conversion safe.
257- return true
258- }
259- }
260-
261- return false
288+ // Only the selected subnet is considered active for forwarding
289+ return tm .selectedSubnet >= 0 && uint64 (tm .selectedSubnet ) == subnetID
262290}
263291
264292// NeedsReconnection returns a channel that signals when reconnection is needed.
@@ -297,7 +325,10 @@ func (tm *topicManager) checkForMismatch() bool {
297325 }
298326
299327 // Count non-advertised subnets from existing seenSubnets map
328+ // We only count subnets that are NOT advertised at all, excluding those that
329+ // are advertised but not selected for forwarding
300330 nonAdvertisedCount := 0
331+ advertisedButNotSelected := 0
301332
302333 for seenSubnet := range tm .seenSubnets {
303334 isAdvertised := false
@@ -306,6 +337,11 @@ func (tm *topicManager) checkForMismatch() bool {
306337 if uint64 (advertised ) == seenSubnet { //nolint:gosec // conversion safe.
307338 isAdvertised = true
308339
340+ // Count advertised but not selected subnets
341+ if tm .selectedSubnet >= 0 && uint64 (tm .selectedSubnet ) != seenSubnet {
342+ advertisedButNotSelected ++
343+ }
344+
309345 break
310346 }
311347 }
@@ -319,13 +355,16 @@ func (tm *topicManager) checkForMismatch() bool {
319355 warningThreshold := int (float64 (tm .highWaterMark ) * 0.8 )
320356 if nonAdvertisedCount >= warningThreshold && nonAdvertisedCount <= tm .highWaterMark {
321357 tm .log .WithFields (logrus.Fields {
322- "non_advertised_count" : nonAdvertisedCount ,
323- "high_water_mark" : tm .highWaterMark ,
324- "advertised_subnets" : tm .advertisedSubnets ,
358+ "non_advertised_count" : nonAdvertisedCount ,
359+ "advertised_but_not_selected" : advertisedButNotSelected ,
360+ "high_water_mark" : tm .highWaterMark ,
361+ "advertised_subnets" : tm .advertisedSubnets ,
362+ "selected_subnet" : tm .selectedSubnet ,
325363 }).Warn ("Approaching subnet high water mark threshold" )
326364 }
327365
328- // Only mismatch if we exceed high water mark
366+ // Only mismatch if we exceed high water mark with truly non-advertised subnets
367+ // Advertised-but-not-selected subnets are NOT counted towards the high water mark
329368 return nonAdvertisedCount > tm .highWaterMark
330369}
331370
@@ -394,6 +433,31 @@ func (tm *topicManager) StartSubnetRefresh(ctx context.Context, refreshInterval
394433 }).Info ("Advertised subnets changed, updating" )
395434
396435 tm .advertisedSubnets = newSubnets
436+
437+ // Re-select a random subnet when advertised subnets change
438+ attestationsWillBeEnabled := tm .attestationMaxSubnets > 0 && len (newSubnets ) <= tm .attestationMaxSubnets
439+
440+ if len (newSubnets ) > 0 && attestationsWillBeEnabled {
441+ n , err := rand .Int (rand .Reader , big .NewInt (int64 (len (newSubnets ))))
442+ if err != nil {
443+ tm .selectedSubnet = newSubnets [0 ]
444+ tm .log .WithError (err ).Warn ("Failed to randomly select subnet, using first subnet" )
445+ } else {
446+ tm .selectedSubnet = newSubnets [n .Int64 ()]
447+ }
448+
449+ tm .log .WithFields (logrus.Fields {
450+ "selected_subnet" : tm .selectedSubnet ,
451+ }).Info ("Re-selected random subnet for forwarding" )
452+ } else {
453+ tm .selectedSubnet = - 1
454+ if len (newSubnets ) > 0 && ! attestationsWillBeEnabled {
455+ tm .log .WithFields (logrus.Fields {
456+ "subnet_count" : len (newSubnets ),
457+ "max_subnets" : tm .attestationMaxSubnets ,
458+ }).Debug ("Attestations disabled due to subnet count exceeding threshold" )
459+ }
460+ }
397461 }
398462
399463 tm .mu .Unlock ()
0 commit comments