Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions networking/ocr_endpoint_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ func (o *ocrEndpointV2) Start() error {
}

for oid := range o.streams {
oid := oid
o.subs.Go(func() {
o.runRecv(oid)
})
Expand Down Expand Up @@ -322,7 +321,6 @@ func (o *ocrEndpointV2) Broadcast(payload []byte) {
var subs subprocesses.Subprocesses
defer subs.Wait()
for oracleID := range o.peerMapping {
oracleID := oracleID
subs.Go(func() {
o.SendTo(payload, oracleID)
})
Expand Down
1 change: 0 additions & 1 deletion offchainreporting2plus/chains/evmutil/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func ContractConfigFromConfigSetEvent(changed ocr2aggregator.OCR2AggregatorConfi
}
signers := []types.OnchainPublicKey{}
for _, addr := range changed.Signers {
addr := addr
signers = append(signers, types.OnchainPublicKey(addr[:]))
}
return types.ContractConfig{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ func enprotoOffchainConfig(o offchainConfig) OffchainConfigProto {
}
offchainPublicKeys := make([][]byte, 0, len(o.OffchainPublicKeys))
for _, k := range o.OffchainPublicKeys {
k := k // have to copy or we append the same key over and over
offchainPublicKeys = append(offchainPublicKeys, k[:])
}
maxDurationInitializationNanoseconds := (*uint64)(nil)
Expand Down Expand Up @@ -202,7 +201,6 @@ func enprotoOffchainConfig(o offchainConfig) OffchainConfigProto {
func enprotoSharedSecretEncryptions(e config.SharedSecretEncryptions) SharedSecretEncryptionsProto {
encs := make([][]byte, 0, len(e.Encryptions))
for _, enc := range e.Encryptions {
enc := enc
encs = append(encs, enc[:])
}
return SharedSecretEncryptionsProto{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ func enprotoOffchainConfig(o offchainConfig) OffchainConfigProto {
}
offchainPublicKeys := make([][]byte, 0, len(o.OffchainPublicKeys))
for _, k := range o.OffchainPublicKeys {
k := k // have to copy or we append the same key over and over
offchainPublicKeys = append(offchainPublicKeys, k[:])
}
maxDurationInitializationNanoseconds := (*uint64)(nil)
Expand Down Expand Up @@ -205,7 +204,6 @@ func enprotoOffchainConfig(o offchainConfig) OffchainConfigProto {
func enprotoSharedSecretEncryptions(e config.SharedSecretEncryptions) SharedSecretEncryptionsProto {
encs := make([][]byte, 0, len(e.Encryptions))
for _, enc := range e.Encryptions {
enc := enc
encs = append(encs, enc[:])
}
return SharedSecretEncryptionsProto{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ type reportAttestationState[RI any] struct {

scheduler *scheduler.Scheduler[EventMissingOutcome[RI]]
chLocalEvent chan EventComputedReports[RI]
// reap() is used to prevent unbounded state growth of rounds
// reap() is used to prevent unbounded state growth of rounds.

rounds map[uint64]*round[RI]
// highest sequence number for which we have attested reports
highestAttestedSeqNr uint64
// highest sequence number for which we have received report signatures
// from each oracle

// Highest sequence number for which we know a certified commit exists.
// This is used for determining the window of rounds we keep in memory.
// Computed as select_largest(f+1, highestReportSignaturesSeqNr).
highWaterMark uint64
highestReportSignaturesSeqNr []uint64
}

Expand Down Expand Up @@ -125,6 +127,8 @@ func (repatt *reportAttestationState[RI]) messageReportSignatures(
msg MessageReportSignatures[RI],
sender commontypes.OracleID,
) {
repatt.tryReap(msg.SeqNr, sender)

if repatt.isBeyondExpiry(msg.SeqNr) {
repatt.logger.Debug("ignoring MessageReportSignatures for expired seqNr", commontypes.LogFields{
"seqNr": msg.SeqNr,
Expand All @@ -133,10 +137,6 @@ func (repatt *reportAttestationState[RI]) messageReportSignatures(
return
}

if repatt.highestReportSignaturesSeqNr[sender] < msg.SeqNr {
repatt.highestReportSignaturesSeqNr[sender] = msg.SeqNr
}

if repatt.isBeyondLookahead(msg.SeqNr) {
repatt.logger.Debug("ignoring MessageReportSignatures for seqNr beyond lookahead", commontypes.LogFields{
"seqNr": msg.SeqNr,
Expand Down Expand Up @@ -377,10 +377,6 @@ func (repatt *reportAttestationState[RI]) tryComplete(seqNr uint64) {
return
}

if repatt.highestAttestedSeqNr < seqNr {
repatt.highestAttestedSeqNr = seqNr
}

repatt.rounds[seqNr].complete = true

repatt.logger.Debug("sending attested reports to transmission protocol", commontypes.LogFields{
Expand All @@ -402,8 +398,6 @@ func (repatt *reportAttestationState[RI]) tryComplete(seqNr uint64) {
case <-repatt.ctx.Done():
}
}

repatt.reap()
}

func (repatt *reportAttestationState[RI]) verifySignatures(publicKey types.OnchainPublicKey, seqNr uint64, reportsPlus []ocr3types.ReportPlus[RI], signatures [][]byte) bool {
Expand All @@ -423,8 +417,6 @@ func (repatt *reportAttestationState[RI]) verifySignatures(publicKey types.Oncha
allValid := true

for k := 0; k < n; k++ {
k := k

go func() {
defer wg.Done()
for i := k; i < len(reportsPlus); i += n {
Expand Down Expand Up @@ -516,8 +508,9 @@ func (repatt *reportAttestationState[RI]) backgroundComputeReports(ctx context.C
func (repatt *reportAttestationState[RI]) eventComputedReports(ev EventComputedReports[RI]) {
if repatt.rounds[ev.SeqNr] == nil {
repatt.logger.Debug("discarding EventComputedReports from old round", commontypes.LogFields{
"evSeqNr": ev.SeqNr,
"highestAttestedSeqNr": repatt.highestAttestedSeqNr,
"evSeqNr": ev.SeqNr,
"highWaterMark": repatt.highWaterMark,
"expiryRounds": repatt.expiryRounds(),
})
return
}
Expand Down Expand Up @@ -550,35 +543,57 @@ func (repatt *reportAttestationState[RI]) eventComputedReports(ev EventComputedR
// no need to call tryComplete since receipt of our own MessageReportSignatures will do so
}

// reap expired rounds if there is a new high water mark
func (repatt *reportAttestationState[RI]) tryReap(seqNr uint64, sender commontypes.OracleID) {
if repatt.highestReportSignaturesSeqNr[sender] >= seqNr {
return
}

repatt.highestReportSignaturesSeqNr[sender] = seqNr

var newHighWaterMark uint64
{
highestReportSignaturesSeqNr := append([]uint64{}, repatt.highestReportSignaturesSeqNr...)
sort.Slice(highestReportSignaturesSeqNr, func(i, j int) bool {
return highestReportSignaturesSeqNr[i] > highestReportSignaturesSeqNr[j]
})
newHighWaterMark = highestReportSignaturesSeqNr[repatt.config.F] // (f+1)th largest seqNr
}

if repatt.highWaterMark >= newHighWaterMark {
return
}

repatt.highWaterMark = newHighWaterMark // (f+1)th largest seqNr
repatt.reap()
}

func (repatt *reportAttestationState[RI]) isBeyondExpiry(seqNr uint64) bool {
highest := repatt.highestAttestedSeqNr
expiry := uint64(repatt.expiryRounds())
if highest <= expiry {
if repatt.highWaterMark <= expiry {
return false
}
return seqNr < highest-expiry
return seqNr < repatt.highWaterMark-expiry
}

func (repatt *reportAttestationState[RI]) isBeyondLookahead(seqNr uint64) bool {
highestReportSignaturesSeqNr := append([]uint64{}, repatt.highestReportSignaturesSeqNr...)
sort.Slice(highestReportSignaturesSeqNr, func(i, j int) bool {
return highestReportSignaturesSeqNr[i] > highestReportSignaturesSeqNr[j]
})
highest := highestReportSignaturesSeqNr[repatt.config.F] // (f+1)th largest seqNr
lookahead := uint64(repatt.lookaheadRounds())
if seqNr <= lookahead {
return false
}
return highest < seqNr-lookahead
return repatt.highWaterMark < seqNr-lookahead
}

// reap expired entries from repatt.finalized to prevent unbounded state growth
// reap expired entries from repatt.rounds to prevent unbounded state growth
func (repatt *reportAttestationState[RI]) reap() {
maxActiveRoundCount := repatt.expiryRounds() + repatt.lookaheadRounds()
// only reap if more than ~ a third of the rounds can be discarded
// only reap if more than ~ a third of the rounds can potentially be discarded
if 3*len(repatt.rounds) <= 4*maxActiveRoundCount {
return
}

beforeRounds := len(repatt.rounds)

// A long time ago in a galaxy far, far away, Go used to leak memory when
// repeatedly adding and deleting from the same map without ever exceeding
// some maximum length. Fortunately, this is no longer the case
Expand All @@ -589,6 +604,12 @@ func (repatt *reportAttestationState[RI]) reap() {
delete(repatt.rounds, seqNr)
}
}

repatt.logger.Debug("reaped expired rounds", commontypes.LogFields{
"before": beforeRounds,
"after": len(repatt.rounds),
"highWaterMark": repatt.highWaterMark,
})
}

// The age (denoted in rounds) after which a report is considered expired and
Expand Down
1 change: 0 additions & 1 deletion ragep2p/ragep2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,6 @@ func (ho *Host) dialLoop() {
}
ho.peersMu.Unlock()
for _, p := range peers {
p := p // copy for goroutine
ds := dialStates[p.other]
dialProcesses.Go(func() {
p.connLifeCycleMu.Lock()
Expand Down
Loading