Skip to content

Commit 059995d

Browse files
committed
Skip noisy warnings "Failed to process announcement from reconcile" / "peer is not an oracle in any of our jobs"
OCR3.1 polishing
1 parent babe0ec commit 059995d

File tree

6 files changed

+67
-36
lines changed

6 files changed

+67
-36
lines changed

networking/ragedisco/discovery_protocol.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -542,7 +542,8 @@ func (p *discoveryProtocol) lockedProcessAnnouncement(ann Announcement) error {
542542
}
543543

544544
if p.locked.numGroupsByOracle[pid] == 0 {
545-
return fmt.Errorf("peer %s is not an oracle in any of our jobs; perhaps whoever sent this is running a job that includes us and this peer, but we are not running that job", pid)
545+
return nil
546+
546547
}
547548

548549
err = ann.verify()

offchainreporting2plus/internal/ocr3_1/protocol/oracle.go

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -167,6 +167,7 @@ func (o *oracleState[RI]) run() {
167167
chNetToStatePersistence := make(chan MessageToStatePersistenceWithSender[RI])
168168
o.chNetToStatePersistence = chNetToStatePersistence
169169

170+
chOutcomeGenerationToStatePersistence := make(chan EventToStatePersistence[RI])
170171
chReportAttestationToStatePersistence := make(chan EventToStatePersistence[RI])
171172

172173
chNetToBlobExchange := make(chan MessageToBlobExchangeWithSender[RI])
@@ -243,6 +244,7 @@ func (o *oracleState[RI]) run() {
243244
chPacemakerToOutcomeGeneration,
244245
chOutcomeGenerationToPacemaker,
245246
chOutcomeGenerationToReportAttestation,
247+
chOutcomeGenerationToStatePersistence,
246248
&blobEndpoint,
247249
o.config,
248250
o.database,
@@ -282,6 +284,7 @@ func (o *oracleState[RI]) run() {
282284
o.childCtx,
283285

284286
chNetToStatePersistence,
287+
chOutcomeGenerationToStatePersistence,
285288
chReportAttestationToStatePersistence,
286289
o.config,
287290
o.database,

offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation.go

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@ func RunOutcomeGeneration[RI any](
3333
chPacemakerToOutcomeGeneration <-chan EventToOutcomeGeneration[RI],
3434
chOutcomeGenerationToPacemaker chan<- EventToPacemaker[RI],
3535
chOutcomeGenerationToReportAttestation chan<- EventToReportAttestation[RI],
36+
chOutcomeGenerationToStatePersistence chan<- EventToStatePersistence[RI],
3637
blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher,
3738
config ocr3config.SharedConfig,
3839
database Database,
@@ -58,6 +59,7 @@ func RunOutcomeGeneration[RI any](
5859
chPacemakerToOutcomeGeneration: chPacemakerToOutcomeGeneration,
5960
chOutcomeGenerationToPacemaker: chOutcomeGenerationToPacemaker,
6061
chOutcomeGenerationToReportAttestation: chOutcomeGenerationToReportAttestation,
62+
chOutcomeGenerationToStatePersistence: chOutcomeGenerationToStatePersistence,
6163
blobBroadcastFetcher: blobBroadcastFetcher,
6264
config: config,
6365
database: database,
@@ -83,6 +85,7 @@ type outcomeGenerationState[RI any] struct {
8385
chPacemakerToOutcomeGeneration <-chan EventToOutcomeGeneration[RI]
8486
chOutcomeGenerationToPacemaker chan<- EventToPacemaker[RI]
8587
chOutcomeGenerationToReportAttestation chan<- EventToReportAttestation[RI]
88+
chOutcomeGenerationToStatePersistence chan<- EventToStatePersistence[RI]
8689
blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher
8790
config ocr3config.SharedConfig
8891
database Database

offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation_follower.go

Lines changed: 46 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -83,6 +83,12 @@ func (outgen *outcomeGenerationState[RI]) messageEpochStart(msg MessageEpochStar
8383

8484
outgen.refreshCommittedSeqNrAndCert()
8585
if !outgen.ensureHighestCertifiedIsCompatible(msg.EpochStartProof.HighestCertified, "MessageEpochStart") {
86+
select {
87+
case outgen.chOutcomeGenerationToStatePersistence <- EventStateSyncRequest[RI]{
88+
msg.EpochStartProof.HighestCertified.SeqNr(),
89+
}:
90+
case <-outgen.ctx.Done():
91+
}
8692
return
8793
}
8894

@@ -154,6 +160,7 @@ func (outgen *outcomeGenerationState[RI]) messageEpochStart(msg MessageEpochStar
154160
prepareQc.ReportsPlusPrecursor,
155161
reportPlusPrecursorDigest,
156162
}
163+
157164
outgen.logger.Debug("broadcasting MessagePrepare (reproposal where prepareQcSeqNr == committedSeqNr)", commontypes.LogFields{
158165
"seqNr": outgen.sharedState.seqNr,
159166
})
@@ -914,38 +921,33 @@ func (outgen *outcomeGenerationState[RI]) tryProcessCommitPool() {
914921
return
915922
}
916923

917-
if outgen.followerState.openKVTxn == nil {
918-
outgen.logger.Critical("assumption violation, open kv transaction must exist in this phase", commontypes.LogFields{
919-
"seqNr": outgen.sharedState.seqNr,
920-
"phase": outgen.followerState.phase,
921-
})
922-
panic("")
923-
}
924-
err := outgen.followerState.openKVTxn.Commit()
925-
outgen.followerState.openKVTxn.Discard()
926-
outgen.followerState.openKVTxn = nil
927-
if err != nil {
928-
outgen.logger.Warn("failed to commit kv transaction", commontypes.LogFields{
929-
"seqNr": outgen.sharedState.seqNr,
930-
"error": err,
931-
})
932-
933-
{
934-
kvSeqNr, err := outgen.kvStore.HighestCommittedSeqNr()
935-
if err != nil {
936-
outgen.logger.Error("failed to validate kv commit post-condition, upon kv commit failure", commontypes.LogFields{
937-
"seqNr": outgen.sharedState.seqNr,
938-
"error": err,
939-
})
940-
return
941-
}
924+
if outgen.followerState.openKVTxn != nil {
925+
err := outgen.followerState.openKVTxn.Commit()
926+
outgen.followerState.openKVTxn.Discard()
927+
outgen.followerState.openKVTxn = nil
928+
if err != nil {
929+
outgen.logger.Warn("failed to commit kv transaction", commontypes.LogFields{
930+
"seqNr": outgen.sharedState.seqNr,
931+
"error": err,
932+
})
942933

943-
if kvSeqNr < outgen.sharedState.seqNr {
944-
outgen.logger.Error("kv commit failed and post-condition (seqNr <= kvSeqNr) is not satisfied", commontypes.LogFields{
945-
"seqNr": outgen.sharedState.seqNr,
946-
"kvSeqNr": kvSeqNr,
947-
})
948-
return
934+
{
935+
kvSeqNr, err := outgen.kvStore.HighestCommittedSeqNr()
936+
if err != nil {
937+
outgen.logger.Error("failed to validate kv commit post-condition, upon kv commit failure", commontypes.LogFields{
938+
"seqNr": outgen.sharedState.seqNr,
939+
"error": err,
940+
})
941+
return
942+
}
943+
944+
if kvSeqNr < outgen.sharedState.seqNr {
945+
outgen.logger.Error("kv commit failed and post-condition (seqNr <= kvSeqNr) is not satisfied", commontypes.LogFields{
946+
"seqNr": outgen.sharedState.seqNr,
947+
"kvSeqNr": kvSeqNr,
948+
})
949+
return
950+
}
949951
}
950952
}
951953
}
@@ -1252,6 +1254,7 @@ func (outgen *outcomeGenerationState[RI]) persistCommitAsBlock(commit *Certified
12521254
}
12531255

12541256
func (outgen *outcomeGenerationState[RI]) refreshCommittedSeqNrAndCert() {
1257+
12551258
preRefreshCommittedSeqNr := outgen.sharedState.committedSeqNr
12561259

12571260
postRefreshCommittedSeqNr, err := outgen.kvStore.HighestCommittedSeqNr()
@@ -1269,9 +1272,20 @@ func (outgen *outcomeGenerationState[RI]) refreshCommittedSeqNrAndCert() {
12691272
})
12701273

12711274
if postRefreshCommittedSeqNr == preRefreshCommittedSeqNr {
1275+
return
1276+
} else if postRefreshCommittedSeqNr+1 == preRefreshCommittedSeqNr {
1277+
1278+
logger.Warn("last kv transaction commit failed, requesting state sync", nil)
1279+
select {
1280+
case outgen.chOutcomeGenerationToStatePersistence <- EventStateSyncRequest[RI]{
1281+
preRefreshCommittedSeqNr,
1282+
}:
1283+
case <-outgen.ctx.Done():
1284+
}
1285+
12721286
return
12731287
} else if postRefreshCommittedSeqNr < preRefreshCommittedSeqNr {
1274-
logger.Critical("assumption violation, kv is behind what outgen knows as committed", nil)
1288+
logger.Critical("assumption violation, kv is way behind what outgen knows as committed", nil)
12751289
panic("")
12761290
}
12771291

offchainreporting2plus/internal/ocr3_1/protocol/state_persistence.go

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@ func RunStatePersistence[RI any](
1717
ctx context.Context,
1818

1919
chNetToStatePersistence <-chan MessageToStatePersistenceWithSender[RI],
20+
chOutcomeGenerationToStatePersistence <-chan EventToStatePersistence[RI],
2021
chReportAttestationToStatePersistence <-chan EventToStatePersistence[RI],
2122
config ocr3config.SharedConfig,
2223
database Database,
@@ -32,6 +33,7 @@ func RunStatePersistence[RI any](
3233
defer sched.Close()
3334

3435
newStatePersistenceState(ctx, chNetToStatePersistence,
36+
chOutcomeGenerationToStatePersistence,
3537
chReportAttestationToStatePersistence,
3638
config, database, id, kvStore, logger, netSender, reportingPlugin, sched).run(restoredState)
3739
}
@@ -42,6 +44,7 @@ type statePersistenceState[RI any] struct {
4244
ctx context.Context
4345

4446
chNetToStatePersistence <-chan MessageToStatePersistenceWithSender[RI]
47+
chOutcomeGenerationToStatePersistence <-chan EventToStatePersistence[RI]
4548
chReportAttestationToStatePersistence <-chan EventToStatePersistence[RI]
4649
tTryReplay <-chan time.Time
4750
config ocr3config.SharedConfig
@@ -70,6 +73,8 @@ func (state *statePersistenceState[RI]) run(restoredState StatePersistenceState)
7073
select {
7174
case msg := <-state.chNetToStatePersistence:
7275
msg.msg.processStatePersistence(state, msg.sender)
76+
case ev := <-state.chOutcomeGenerationToStatePersistence:
77+
ev.processStatePersistence(state)
7378
case ev := <-state.chReportAttestationToStatePersistence:
7479
ev.processStatePersistence(state)
7580
case ev := <-state.blockSyncState.scheduler.Scheduled():
@@ -180,6 +185,7 @@ func (state *statePersistenceState[RI]) eventStateSyncRequest(ev EventStateSyncR
180185
state.logger.Debug("received EventStateSyncRequest", commontypes.LogFields{
181186
"heardSeqNr": ev.SeqNr,
182187
})
188+
state.tTryReplay = time.After(0)
183189
state.heardSeqNr(ev.SeqNr)
184190
}
185191

@@ -319,7 +325,7 @@ func (state *statePersistenceState[RI]) eventReadyToSendNextBlockSyncRequest(ev
319325
func newStatePersistenceState[RI any](
320326
ctx context.Context,
321327
chNetToStatePersistence <-chan MessageToStatePersistenceWithSender[RI],
322-
328+
chOutcomeGenerationToStatePersistence <-chan EventToStatePersistence[RI],
323329
chReportAttestationToStatePersistence <-chan EventToStatePersistence[RI],
324330
config ocr3config.SharedConfig,
325331
database Database,
@@ -348,6 +354,7 @@ func newStatePersistenceState[RI any](
348354
ctx,
349355

350356
chNetToStatePersistence,
357+
chOutcomeGenerationToStatePersistence,
351358
chReportAttestationToStatePersistence,
352359
tTryReplay,
353360
config,

offchainreporting2plus/internal/ocr3_1/serialization/serialization.go

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1188,7 +1188,10 @@ func (fpm *fromProtoMessage[RI]) chunkDigests(pbcds [][]byte) ([]protocol.BlobCh
11881188
}
11891189
cds := make([]protocol.BlobChunkDigest, 0, len(pbcds))
11901190
for _, pbcd := range pbcds {
1191-
cds = append(cds, protocol.BlobChunkDigest(pbcd))
1191+
var blockChunkDigest protocol.BlobChunkDigest
1192+
copy(blockChunkDigest[:], pbcd)
1193+
1194+
cds = append(cds, blockChunkDigest)
11921195
}
11931196
return cds, nil
11941197
}
@@ -1217,7 +1220,7 @@ func (fpm *fromProtoMessage[RI]) messageBlobChunkResponse(m *MessageBlobChunkRes
12171220
copy(blobDigest[:], m.BlobDigest)
12181221

12191222
return protocol.MessageBlobChunkResponse[RI]{
1220-
fpm.requestHandle,
1223+
nil, // TODO: consider using a sentinel value here, e.g. "EmptyRequestHandleForInboundResponse"
12211224
blobDigest,
12221225
m.ChunkIndex,
12231226
m.Chunk,

0 commit comments

Comments
 (0)