Skip to content

Commit 88bb5d9

Browse files
kaleofdutygtklockerstchrysa
committed
Improvements:
- Support DON-to-DON messaging using PeerGroup - OCR3.1 progress - Fix log messages - Add config digests for DON-to-DON messaging. - Add peeridhelper for safe reuse of PeerID for arbitrary message signing Based on 01cc9044e71de1fb6d75056d532641421a34c779 Co-authored-by: Kostis Karantias <[email protected]> Co-authored-by: stchrysa <[email protected]>
1 parent babe0ec commit 88bb5d9

File tree

11 files changed

+320
-112
lines changed

11 files changed

+320
-112
lines changed

networking/peer_group.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,25 @@ import (
1313
ragetypes "github.com/smartcontractkit/libocr/ragep2p/types"
1414
)
1515

16-
func peerGroupStreamNamePrefix(configDigestPrefix ocr2types.ConfigDigestPrefix) (streamNamePrefix string, ok bool) {
16+
func peerGroupStreamNamePrefix(configDigest ocr2types.ConfigDigest) (streamNamePrefix string, err error) {
17+
configDigestPrefix := ocr2types.ConfigDigestPrefixFromConfigDigest(configDigest)
18+
1719
switch configDigestPrefix { // nolint:exhaustive
1820
case ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo:
19-
return "ccip-rmn/", true
21+
return "ccip-rmn/", nil
22+
case ocr2types.ConfigDigestPrefixDONToDONMessagingGroup:
23+
// We include the full config digest in the stream name prefix to have clean
24+
// namespacing between different PeerGroups.
25+
return fmt.Sprintf("don-to-don/%s/", configDigest), nil
26+
case ocr2types.ConfigDigestPrefixDONToDONDiscoveryGroup:
27+
// We include the full config digest in the stream name prefix to have clean
28+
// namespacing between different PeerGroups.
29+
// Based on the current don-to-don design, we do not expect to have any streams
30+
// with this config digest prefix, but we nevertheless need to allowlist it
31+
// to enable creation of corresponding PeerGroups.
32+
return fmt.Sprintf("don-to-don-discovery/%s/", configDigest), nil
2033
default:
21-
return "", false
34+
return "", fmt.Errorf("config digest prefix %s is not allowed", configDigestPrefix)
2235
}
2336
}
2437

@@ -84,10 +97,9 @@ func (f *peerGroupFactory) NewPeerGroup(
8497
peerIDs []string,
8598
bootstrappers []commontypes.BootstrapperLocator,
8699
) (PeerGroup, error) {
87-
configDigestPrefix := ocr2types.ConfigDigestPrefixFromConfigDigest(configDigest)
88-
streamNamePrefix, ok := peerGroupStreamNamePrefix(configDigestPrefix)
89-
if !ok {
90-
return nil, fmt.Errorf("config digest prefix %s is not allowed", configDigestPrefix)
100+
streamNamePrefix, err := peerGroupStreamNamePrefix(configDigest)
101+
if err != nil {
102+
return nil, fmt.Errorf("could not get stream name prefix: %w", err)
91103
}
92104

93105
decodedv2PeerIDs, err := decodev2PeerIDs(peerIDs)

offchainreporting/internal/managed/track_config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func (state *trackConfigState) checkLatestConfigDetails() (
147147
defer configCancel()
148148
contractConfig, err := state.configTracker.ConfigFromLogs(configCtx, changedInBlock)
149149
if err != nil {
150-
state.logger.ErrorIfNotCanceled("TrackConfig: error during LatestConfigDetails()", configCtx, commontypes.LogFields{
150+
state.logger.ErrorIfNotCanceled("TrackConfig: error during ConfigFromLogs()", configCtx, commontypes.LogFields{
151151
"error": err,
152152
})
153153
return nil, true

offchainreporting2plus/internal/managed/track_config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (state *trackConfigState) checkLatestConfigDetails() (
121121

122122
contractConfig, err := state.configTracker.LatestConfig(ctx, changedInBlock)
123123
if err != nil {
124-
state.logger.ErrorIfNotCanceled("TrackConfig: error during LatestConfigDetails()", ctx, commontypes.LogFields{
124+
state.logger.ErrorIfNotCanceled("TrackConfig: error during LatestConfig()", ctx, commontypes.LogFields{
125125
"error": err,
126126
})
127127
return nil, true

offchainreporting2plus/internal/ocr3_1/protocol/oracle.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ func (o *oracleState[RI]) run() {
167167
chNetToStatePersistence := make(chan MessageToStatePersistenceWithSender[RI])
168168
o.chNetToStatePersistence = chNetToStatePersistence
169169

170+
chOutcomeGenerationToStatePersistence := make(chan EventToStatePersistence[RI])
170171
chReportAttestationToStatePersistence := make(chan EventToStatePersistence[RI])
171172

172173
chNetToBlobExchange := make(chan MessageToBlobExchangeWithSender[RI])
@@ -243,6 +244,7 @@ func (o *oracleState[RI]) run() {
243244
chPacemakerToOutcomeGeneration,
244245
chOutcomeGenerationToPacemaker,
245246
chOutcomeGenerationToReportAttestation,
247+
chOutcomeGenerationToStatePersistence,
246248
&blobEndpoint,
247249
o.config,
248250
o.database,
@@ -282,6 +284,7 @@ func (o *oracleState[RI]) run() {
282284
o.childCtx,
283285

284286
chNetToStatePersistence,
287+
chOutcomeGenerationToStatePersistence,
285288
chReportAttestationToStatePersistence,
286289
o.config,
287290
o.database,

offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ func RunOutcomeGeneration[RI any](
3333
chPacemakerToOutcomeGeneration <-chan EventToOutcomeGeneration[RI],
3434
chOutcomeGenerationToPacemaker chan<- EventToPacemaker[RI],
3535
chOutcomeGenerationToReportAttestation chan<- EventToReportAttestation[RI],
36+
chOutcomeGenerationToStatePersistence chan<- EventToStatePersistence[RI],
3637
blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher,
3738
config ocr3config.SharedConfig,
3839
database Database,
@@ -58,6 +59,7 @@ func RunOutcomeGeneration[RI any](
5859
chPacemakerToOutcomeGeneration: chPacemakerToOutcomeGeneration,
5960
chOutcomeGenerationToPacemaker: chOutcomeGenerationToPacemaker,
6061
chOutcomeGenerationToReportAttestation: chOutcomeGenerationToReportAttestation,
62+
chOutcomeGenerationToStatePersistence: chOutcomeGenerationToStatePersistence,
6163
blobBroadcastFetcher: blobBroadcastFetcher,
6264
config: config,
6365
database: database,
@@ -83,6 +85,7 @@ type outcomeGenerationState[RI any] struct {
8385
chPacemakerToOutcomeGeneration <-chan EventToOutcomeGeneration[RI]
8486
chOutcomeGenerationToPacemaker chan<- EventToPacemaker[RI]
8587
chOutcomeGenerationToReportAttestation chan<- EventToReportAttestation[RI]
88+
chOutcomeGenerationToStatePersistence chan<- EventToStatePersistence[RI]
8689
blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher
8790
config ocr3config.SharedConfig
8891
database Database

offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation_follower.go

Lines changed: 46 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ func (outgen *outcomeGenerationState[RI]) messageEpochStart(msg MessageEpochStar
8383

8484
outgen.refreshCommittedSeqNrAndCert()
8585
if !outgen.ensureHighestCertifiedIsCompatible(msg.EpochStartProof.HighestCertified, "MessageEpochStart") {
86+
select {
87+
case outgen.chOutcomeGenerationToStatePersistence <- EventStateSyncRequest[RI]{
88+
msg.EpochStartProof.HighestCertified.SeqNr(),
89+
}:
90+
case <-outgen.ctx.Done():
91+
}
8692
return
8793
}
8894

@@ -154,6 +160,7 @@ func (outgen *outcomeGenerationState[RI]) messageEpochStart(msg MessageEpochStar
154160
prepareQc.ReportsPlusPrecursor,
155161
reportPlusPrecursorDigest,
156162
}
163+
157164
outgen.logger.Debug("broadcasting MessagePrepare (reproposal where prepareQcSeqNr == committedSeqNr)", commontypes.LogFields{
158165
"seqNr": outgen.sharedState.seqNr,
159166
})
@@ -914,38 +921,33 @@ func (outgen *outcomeGenerationState[RI]) tryProcessCommitPool() {
914921
return
915922
}
916923

917-
if outgen.followerState.openKVTxn == nil {
918-
outgen.logger.Critical("assumption violation, open kv transaction must exist in this phase", commontypes.LogFields{
919-
"seqNr": outgen.sharedState.seqNr,
920-
"phase": outgen.followerState.phase,
921-
})
922-
panic("")
923-
}
924-
err := outgen.followerState.openKVTxn.Commit()
925-
outgen.followerState.openKVTxn.Discard()
926-
outgen.followerState.openKVTxn = nil
927-
if err != nil {
928-
outgen.logger.Warn("failed to commit kv transaction", commontypes.LogFields{
929-
"seqNr": outgen.sharedState.seqNr,
930-
"error": err,
931-
})
932-
933-
{
934-
kvSeqNr, err := outgen.kvStore.HighestCommittedSeqNr()
935-
if err != nil {
936-
outgen.logger.Error("failed to validate kv commit post-condition, upon kv commit failure", commontypes.LogFields{
937-
"seqNr": outgen.sharedState.seqNr,
938-
"error": err,
939-
})
940-
return
941-
}
924+
if outgen.followerState.openKVTxn != nil {
925+
err := outgen.followerState.openKVTxn.Commit()
926+
outgen.followerState.openKVTxn.Discard()
927+
outgen.followerState.openKVTxn = nil
928+
if err != nil {
929+
outgen.logger.Warn("failed to commit kv transaction", commontypes.LogFields{
930+
"seqNr": outgen.sharedState.seqNr,
931+
"error": err,
932+
})
942933

943-
if kvSeqNr < outgen.sharedState.seqNr {
944-
outgen.logger.Error("kv commit failed and post-condition (seqNr <= kvSeqNr) is not satisfied", commontypes.LogFields{
945-
"seqNr": outgen.sharedState.seqNr,
946-
"kvSeqNr": kvSeqNr,
947-
})
948-
return
934+
{
935+
kvSeqNr, err := outgen.kvStore.HighestCommittedSeqNr()
936+
if err != nil {
937+
outgen.logger.Error("failed to validate kv commit post-condition, upon kv commit failure", commontypes.LogFields{
938+
"seqNr": outgen.sharedState.seqNr,
939+
"error": err,
940+
})
941+
return
942+
}
943+
944+
if kvSeqNr < outgen.sharedState.seqNr {
945+
outgen.logger.Error("kv commit failed and post-condition (seqNr <= kvSeqNr) is not satisfied", commontypes.LogFields{
946+
"seqNr": outgen.sharedState.seqNr,
947+
"kvSeqNr": kvSeqNr,
948+
})
949+
return
950+
}
949951
}
950952
}
951953
}
@@ -1252,6 +1254,7 @@ func (outgen *outcomeGenerationState[RI]) persistCommitAsBlock(commit *Certified
12521254
}
12531255

12541256
func (outgen *outcomeGenerationState[RI]) refreshCommittedSeqNrAndCert() {
1257+
12551258
preRefreshCommittedSeqNr := outgen.sharedState.committedSeqNr
12561259

12571260
postRefreshCommittedSeqNr, err := outgen.kvStore.HighestCommittedSeqNr()
@@ -1269,9 +1272,20 @@ func (outgen *outcomeGenerationState[RI]) refreshCommittedSeqNrAndCert() {
12691272
})
12701273

12711274
if postRefreshCommittedSeqNr == preRefreshCommittedSeqNr {
1275+
return
1276+
} else if postRefreshCommittedSeqNr+1 == preRefreshCommittedSeqNr {
1277+
1278+
logger.Warn("last kv transaction commit failed, requesting state sync", nil)
1279+
select {
1280+
case outgen.chOutcomeGenerationToStatePersistence <- EventStateSyncRequest[RI]{
1281+
preRefreshCommittedSeqNr,
1282+
}:
1283+
case <-outgen.ctx.Done():
1284+
}
1285+
12721286
return
12731287
} else if postRefreshCommittedSeqNr < preRefreshCommittedSeqNr {
1274-
logger.Critical("assumption violation, kv is behind what outgen knows as committed", nil)
1288+
logger.Critical("assumption violation, kv is way behind what outgen knows as committed", nil)
12751289
panic("")
12761290
}
12771291

0 commit comments

Comments
 (0)