diff --git a/networking/ocr_endpoint_v2.go b/networking/ocr_endpoint_v2.go index 9fa9272a..5d81a71f 100644 --- a/networking/ocr_endpoint_v2.go +++ b/networking/ocr_endpoint_v2.go @@ -190,7 +190,6 @@ func (o *ocrEndpointV2) Start() error { } for oid := range o.streams { - oid := oid o.subs.Go(func() { o.runRecv(oid) }) @@ -322,7 +321,6 @@ func (o *ocrEndpointV2) Broadcast(payload []byte) { var subs subprocesses.Subprocesses defer subs.Wait() for oracleID := range o.peerMapping { - oracleID := oracleID subs.Go(func() { o.SendTo(payload, oracleID) }) diff --git a/offchainreporting2plus/chains/evmutil/evm.go b/offchainreporting2plus/chains/evmutil/evm.go index 1de028f9..9b169937 100644 --- a/offchainreporting2plus/chains/evmutil/evm.go +++ b/offchainreporting2plus/chains/evmutil/evm.go @@ -35,7 +35,6 @@ func ContractConfigFromConfigSetEvent(changed ocr2aggregator.OCR2AggregatorConfi } signers := []types.OnchainPublicKey{} for _, addr := range changed.Signers { - addr := addr signers = append(signers, types.OnchainPublicKey(addr[:])) } return types.ContractConfig{ diff --git a/offchainreporting2plus/internal/config/ocr2config/serialize.go b/offchainreporting2plus/internal/config/ocr2config/serialize.go index 8c560d06..1e62549b 100644 --- a/offchainreporting2plus/internal/config/ocr2config/serialize.go +++ b/offchainreporting2plus/internal/config/ocr2config/serialize.go @@ -164,7 +164,6 @@ func enprotoOffchainConfig(o offchainConfig) OffchainConfigProto { } offchainPublicKeys := make([][]byte, 0, len(o.OffchainPublicKeys)) for _, k := range o.OffchainPublicKeys { - k := k // have to copy or we append the same key over and over offchainPublicKeys = append(offchainPublicKeys, k[:]) } maxDurationInitializationNanoseconds := (*uint64)(nil) @@ -202,7 +201,6 @@ func enprotoOffchainConfig(o offchainConfig) OffchainConfigProto { func enprotoSharedSecretEncryptions(e config.SharedSecretEncryptions) SharedSecretEncryptionsProto { encs := make([][]byte, 0, len(e.Encryptions)) for _, enc := range e.Encryptions { - enc := enc encs = append(encs, enc[:]) } return SharedSecretEncryptionsProto{ diff --git a/offchainreporting2plus/internal/config/ocr3config/serialize.go b/offchainreporting2plus/internal/config/ocr3config/serialize.go index cb041a41..e57c368b 100644 --- a/offchainreporting2plus/internal/config/ocr3config/serialize.go +++ b/offchainreporting2plus/internal/config/ocr3config/serialize.go @@ -166,7 +166,6 @@ func enprotoOffchainConfig(o offchainConfig) OffchainConfigProto { } offchainPublicKeys := make([][]byte, 0, len(o.OffchainPublicKeys)) for _, k := range o.OffchainPublicKeys { - k := k // have to copy or we append the same key over and over offchainPublicKeys = append(offchainPublicKeys, k[:]) } maxDurationInitializationNanoseconds := (*uint64)(nil) @@ -205,7 +204,6 @@ func enprotoOffchainConfig(o offchainConfig) OffchainConfigProto { func enprotoSharedSecretEncryptions(e config.SharedSecretEncryptions) SharedSecretEncryptionsProto { encs := make([][]byte, 0, len(e.Encryptions)) for _, enc := range e.Encryptions { - enc := enc encs = append(encs, enc[:]) } return SharedSecretEncryptionsProto{ diff --git a/offchainreporting2plus/internal/ocr3/protocol/report_attestation.go b/offchainreporting2plus/internal/ocr3/protocol/report_attestation.go index f0a760ce..7d67bb7c 100644 --- a/offchainreporting2plus/internal/ocr3/protocol/report_attestation.go +++ b/offchainreporting2plus/internal/ocr3/protocol/report_attestation.go @@ -64,12 +64,14 @@ type reportAttestationState[RI any] struct { scheduler *scheduler.Scheduler[EventMissingOutcome[RI]] chLocalEvent chan EventComputedReports[RI] - // reap() is used to prevent unbounded state growth of rounds + // reap() is used to prevent unbounded state growth of rounds. + rounds map[uint64]*round[RI] - // highest sequence number for which we have attested reports - highestAttestedSeqNr uint64 - // highest sequence number for which we have received report signatures - // from each oracle + + // Highest sequence number for which we know a certified commit exists. + // This is used for determining the window of rounds we keep in memory. + // Computed as select_largest(f+1, highestReportSignaturesSeqNr). + highWaterMark uint64 highestReportSignaturesSeqNr []uint64 } @@ -125,6 +127,8 @@ func (repatt *reportAttestationState[RI]) messageReportSignatures( msg MessageReportSignatures[RI], sender commontypes.OracleID, ) { + repatt.tryReap(msg.SeqNr, sender) + if repatt.isBeyondExpiry(msg.SeqNr) { repatt.logger.Debug("ignoring MessageReportSignatures for expired seqNr", commontypes.LogFields{ "seqNr": msg.SeqNr, @@ -133,10 +137,6 @@ func (repatt *reportAttestationState[RI]) messageReportSignatures( return } - if repatt.highestReportSignaturesSeqNr[sender] < msg.SeqNr { - repatt.highestReportSignaturesSeqNr[sender] = msg.SeqNr - } - if repatt.isBeyondLookahead(msg.SeqNr) { repatt.logger.Debug("ignoring MessageReportSignatures for seqNr beyond lookahead", commontypes.LogFields{ "seqNr": msg.SeqNr, @@ -377,10 +377,6 @@ func (repatt *reportAttestationState[RI]) tryComplete(seqNr uint64) { return } - if repatt.highestAttestedSeqNr < seqNr { - repatt.highestAttestedSeqNr = seqNr - } - repatt.rounds[seqNr].complete = true repatt.logger.Debug("sending attested reports to transmission protocol", commontypes.LogFields{ @@ -402,8 +398,6 @@ func (repatt *reportAttestationState[RI]) tryComplete(seqNr uint64) { case <-repatt.ctx.Done(): } } - - repatt.reap() } func (repatt *reportAttestationState[RI]) verifySignatures(publicKey types.OnchainPublicKey, seqNr uint64, reportsPlus []ocr3types.ReportPlus[RI], signatures [][]byte) bool { @@ -423,8 +417,6 @@ func (repatt *reportAttestationState[RI]) verifySignatures(publicKey types.Oncha allValid := true for k := 0; k < n; k++ { - k := k - go func() { defer wg.Done() for i := k; i < len(reportsPlus); i += n { @@ -516,8 +508,9 @@ func (repatt *reportAttestationState[RI]) backgroundComputeReports(ctx context.C func (repatt *reportAttestationState[RI]) eventComputedReports(ev EventComputedReports[RI]) { if repatt.rounds[ev.SeqNr] == nil { repatt.logger.Debug("discarding EventComputedReports from old round", commontypes.LogFields{ - "evSeqNr": ev.SeqNr, - "highestAttestedSeqNr": repatt.highestAttestedSeqNr, + "evSeqNr": ev.SeqNr, + "highWaterMark": repatt.highWaterMark, + "expiryRounds": repatt.expiryRounds(), }) return } @@ -550,35 +543,57 @@ func (repatt *reportAttestationState[RI]) eventComputedReports(ev EventComputedR // no need to call tryComplete since receipt of our own MessageReportSignatures will do so } +// reap expired rounds if there is a new high water mark +func (repatt *reportAttestationState[RI]) tryReap(seqNr uint64, sender commontypes.OracleID) { + if repatt.highestReportSignaturesSeqNr[sender] >= seqNr { + return + } + + repatt.highestReportSignaturesSeqNr[sender] = seqNr + + var newHighWaterMark uint64 + { + highestReportSignaturesSeqNr := append([]uint64{}, repatt.highestReportSignaturesSeqNr...) + sort.Slice(highestReportSignaturesSeqNr, func(i, j int) bool { + return highestReportSignaturesSeqNr[i] > highestReportSignaturesSeqNr[j] + }) + newHighWaterMark = highestReportSignaturesSeqNr[repatt.config.F] // (f+1)th largest seqNr + } + + if repatt.highWaterMark >= newHighWaterMark { + return + } + + repatt.highWaterMark = newHighWaterMark // (f+1)th largest seqNr + repatt.reap() +} + func (repatt *reportAttestationState[RI]) isBeyondExpiry(seqNr uint64) bool { - highest := repatt.highestAttestedSeqNr expiry := uint64(repatt.expiryRounds()) - if highest <= expiry { + if repatt.highWaterMark <= expiry { return false } - return seqNr < highest-expiry + return seqNr < repatt.highWaterMark-expiry } func (repatt *reportAttestationState[RI]) isBeyondLookahead(seqNr uint64) bool { - highestReportSignaturesSeqNr := append([]uint64{}, repatt.highestReportSignaturesSeqNr...) - sort.Slice(highestReportSignaturesSeqNr, func(i, j int) bool { - return highestReportSignaturesSeqNr[i] > highestReportSignaturesSeqNr[j] - }) - highest := highestReportSignaturesSeqNr[repatt.config.F] // (f+1)th largest seqNr lookahead := uint64(repatt.lookaheadRounds()) if seqNr <= lookahead { return false } - return highest < seqNr-lookahead + return repatt.highWaterMark < seqNr-lookahead } -// reap expired entries from repatt.finalized to prevent unbounded state growth +// reap expired entries from repatt.rounds to prevent unbounded state growth func (repatt *reportAttestationState[RI]) reap() { maxActiveRoundCount := repatt.expiryRounds() + repatt.lookaheadRounds() - // only reap if more than ~ a third of the rounds can be discarded + // only reap if more than ~ a third of the rounds can potentially be discarded if 3*len(repatt.rounds) <= 4*maxActiveRoundCount { return } + + beforeRounds := len(repatt.rounds) + // A long time ago in a galaxy far, far away, Go used to leak memory when // repeatedly adding and deleting from the same map without ever exceeding // some maximum length. Fortunately, this is no longer the case @@ -589,6 +604,12 @@ func (repatt *reportAttestationState[RI]) reap() { delete(repatt.rounds, seqNr) } } + + repatt.logger.Debug("reaped expired rounds", commontypes.LogFields{ + "before": beforeRounds, + "after": len(repatt.rounds), + "highWaterMark": repatt.highWaterMark, + }) } // The age (denoted in rounds) after which a report is considered expired and diff --git a/ragep2p/ragep2p.go b/ragep2p/ragep2p.go index 1a7dc08a..a1cccb35 100644 --- a/ragep2p/ragep2p.go +++ b/ragep2p/ragep2p.go @@ -597,7 +597,6 @@ func (ho *Host) dialLoop() { } ho.peersMu.Unlock() for _, p := range peers { - p := p // copy for goroutine ds := dialStates[p.other] dialProcesses.Go(func() { p.connLifeCycleMu.Lock()