@@ -64,12 +64,14 @@ type reportAttestationState[RI any] struct {
6464
6565 scheduler * scheduler.Scheduler [EventMissingOutcome [RI ]]
6666 chLocalEvent chan EventComputedReports [RI ]
67- // reap() is used to prevent unbounded state growth of rounds
67+ // reap() is used to prevent unbounded state growth of rounds.
68+
6869 rounds map [uint64 ]* round [RI ]
69- // highest sequence number for which we have attested reports
70- highestAttestedSeqNr uint64
71- // highest sequence number for which we have received report signatures
72- // from each oracle
70+
71+ // Highest sequence number for which we know a certified commit exists.
72+ // This is used for determining the window of rounds we keep in memory.
73+ // Computed as select_largest(f+1, highestReportSignaturesSeqNr).
74+ highWaterMark uint64
7375 highestReportSignaturesSeqNr []uint64
7476}
7577
@@ -125,6 +127,8 @@ func (repatt *reportAttestationState[RI]) messageReportSignatures(
125127 msg MessageReportSignatures [RI ],
126128 sender commontypes.OracleID ,
127129) {
130+ repatt .tryReap (msg .SeqNr , sender )
131+
128132 if repatt .isBeyondExpiry (msg .SeqNr ) {
129133 repatt .logger .Debug ("ignoring MessageReportSignatures for expired seqNr" , commontypes.LogFields {
130134 "seqNr" : msg .SeqNr ,
@@ -133,10 +137,6 @@ func (repatt *reportAttestationState[RI]) messageReportSignatures(
133137 return
134138 }
135139
136- if repatt .highestReportSignaturesSeqNr [sender ] < msg .SeqNr {
137- repatt .highestReportSignaturesSeqNr [sender ] = msg .SeqNr
138- }
139-
140140 if repatt .isBeyondLookahead (msg .SeqNr ) {
141141 repatt .logger .Debug ("ignoring MessageReportSignatures for seqNr beyond lookahead" , commontypes.LogFields {
142142 "seqNr" : msg .SeqNr ,
@@ -377,10 +377,6 @@ func (repatt *reportAttestationState[RI]) tryComplete(seqNr uint64) {
377377 return
378378 }
379379
380- if repatt .highestAttestedSeqNr < seqNr {
381- repatt .highestAttestedSeqNr = seqNr
382- }
383-
384380 repatt .rounds [seqNr ].complete = true
385381
386382 repatt .logger .Debug ("sending attested reports to transmission protocol" , commontypes.LogFields {
@@ -402,8 +398,6 @@ func (repatt *reportAttestationState[RI]) tryComplete(seqNr uint64) {
402398 case <- repatt .ctx .Done ():
403399 }
404400 }
405-
406- repatt .reap ()
407401}
408402
409403func (repatt * reportAttestationState [RI ]) verifySignatures (publicKey types.OnchainPublicKey , seqNr uint64 , reportsPlus []ocr3types.ReportPlus [RI ], signatures [][]byte ) bool {
@@ -423,8 +417,6 @@ func (repatt *reportAttestationState[RI]) verifySignatures(publicKey types.Oncha
423417 allValid := true
424418
425419 for k := 0 ; k < n ; k ++ {
426- k := k
427-
428420 go func () {
429421 defer wg .Done ()
430422 for i := k ; i < len (reportsPlus ); i += n {
@@ -516,8 +508,9 @@ func (repatt *reportAttestationState[RI]) backgroundComputeReports(ctx context.C
516508func (repatt * reportAttestationState [RI ]) eventComputedReports (ev EventComputedReports [RI ]) {
517509 if repatt .rounds [ev .SeqNr ] == nil {
518510 repatt .logger .Debug ("discarding EventComputedReports from old round" , commontypes.LogFields {
519- "evSeqNr" : ev .SeqNr ,
520- "highestAttestedSeqNr" : repatt .highestAttestedSeqNr ,
511+ "evSeqNr" : ev .SeqNr ,
512+ "highWaterMark" : repatt .highWaterMark ,
513+ "expiryRounds" : repatt .expiryRounds (),
521514 })
522515 return
523516 }
@@ -550,35 +543,57 @@ func (repatt *reportAttestationState[RI]) eventComputedReports(ev EventComputedR
550543 // no need to call tryComplete since receipt of our own MessageReportSignatures will do so
551544}
552545
546+ // reap expired rounds if there is a new high water mark
547+ func (repatt * reportAttestationState [RI ]) tryReap (seqNr uint64 , sender commontypes.OracleID ) {
548+ if repatt .highestReportSignaturesSeqNr [sender ] >= seqNr {
549+ return
550+ }
551+
552+ repatt .highestReportSignaturesSeqNr [sender ] = seqNr
553+
554+ var newHighWaterMark uint64
555+ {
556+ highestReportSignaturesSeqNr := append ([]uint64 {}, repatt .highestReportSignaturesSeqNr ... )
557+ sort .Slice (highestReportSignaturesSeqNr , func (i , j int ) bool {
558+ return highestReportSignaturesSeqNr [i ] > highestReportSignaturesSeqNr [j ]
559+ })
560+ newHighWaterMark = highestReportSignaturesSeqNr [repatt .config .F ] // (f+1)th largest seqNr
561+ }
562+
563+ if repatt .highWaterMark >= newHighWaterMark {
564+ return
565+ }
566+
567+ repatt .highWaterMark = newHighWaterMark // (f+1)th largest seqNr
568+ repatt .reap ()
569+ }
570+
553571func (repatt * reportAttestationState [RI ]) isBeyondExpiry (seqNr uint64 ) bool {
554- highest := repatt .highestAttestedSeqNr
555572 expiry := uint64 (repatt .expiryRounds ())
556- if highest <= expiry {
573+ if repatt . highWaterMark <= expiry {
557574 return false
558575 }
559- return seqNr < highest - expiry
576+ return seqNr < repatt . highWaterMark - expiry
560577}
561578
562579func (repatt * reportAttestationState [RI ]) isBeyondLookahead (seqNr uint64 ) bool {
563- highestReportSignaturesSeqNr := append ([]uint64 {}, repatt .highestReportSignaturesSeqNr ... )
564- sort .Slice (highestReportSignaturesSeqNr , func (i , j int ) bool {
565- return highestReportSignaturesSeqNr [i ] > highestReportSignaturesSeqNr [j ]
566- })
567- highest := highestReportSignaturesSeqNr [repatt .config .F ] // (f+1)th largest seqNr
568580 lookahead := uint64 (repatt .lookaheadRounds ())
569581 if seqNr <= lookahead {
570582 return false
571583 }
572- return highest < seqNr - lookahead
584+ return repatt . highWaterMark < seqNr - lookahead
573585}
574586
575- // reap expired entries from repatt.finalized to prevent unbounded state growth
587+ // reap expired entries from repatt.rounds to prevent unbounded state growth
576588func (repatt * reportAttestationState [RI ]) reap () {
577589 maxActiveRoundCount := repatt .expiryRounds () + repatt .lookaheadRounds ()
578- // only reap if more than ~ a third of the rounds can be discarded
590+ // only reap if more than ~ a third of the rounds can potentially be discarded
579591 if 3 * len (repatt .rounds ) <= 4 * maxActiveRoundCount {
580592 return
581593 }
594+
595+ beforeRounds := len (repatt .rounds )
596+
582597 // A long time ago in a galaxy far, far away, Go used to leak memory when
583598 // repeatedly adding and deleting from the same map without ever exceeding
584599 // some maximum length. Fortunately, this is no longer the case
@@ -589,6 +604,12 @@ func (repatt *reportAttestationState[RI]) reap() {
589604 delete (repatt .rounds , seqNr )
590605 }
591606 }
607+
608+ repatt .logger .Debug ("reaped expired rounds" , commontypes.LogFields {
609+ "before" : beforeRounds ,
610+ "after" : len (repatt .rounds ),
611+ "highWaterMark" : repatt .highWaterMark ,
612+ })
592613}
593614
594615// The age (denoted in rounds) after which a report is considered expired and
0 commit comments