@@ -83,6 +83,12 @@ func (outgen *outcomeGenerationState[RI]) messageEpochStart(msg MessageEpochStar
8383
8484 outgen .refreshCommittedSeqNrAndCert ()
8585 if ! outgen .ensureHighestCertifiedIsCompatible (msg .EpochStartProof .HighestCertified , "MessageEpochStart" ) {
86+ select {
87+ case outgen .chOutcomeGenerationToStatePersistence <- EventStateSyncRequest [RI ]{
88+ msg .EpochStartProof .HighestCertified .SeqNr (),
89+ }:
90+ case <- outgen .ctx .Done ():
91+ }
8692 return
8793 }
8894
@@ -154,6 +160,7 @@ func (outgen *outcomeGenerationState[RI]) messageEpochStart(msg MessageEpochStar
154160 prepareQc .ReportsPlusPrecursor ,
155161 reportPlusPrecursorDigest ,
156162 }
163+
157164 outgen .logger .Debug ("broadcasting MessagePrepare (reproposal where prepareQcSeqNr == committedSeqNr)" , commontypes.LogFields {
158165 "seqNr" : outgen .sharedState .seqNr ,
159166 })
@@ -914,38 +921,33 @@ func (outgen *outcomeGenerationState[RI]) tryProcessCommitPool() {
914921 return
915922 }
916923
917- if outgen .followerState .openKVTxn == nil {
918- outgen .logger .Critical ("assumption violation, open kv transaction must exist in this phase" , commontypes.LogFields {
919- "seqNr" : outgen .sharedState .seqNr ,
920- "phase" : outgen .followerState .phase ,
921- })
922- panic ("" )
923- }
924- err := outgen .followerState .openKVTxn .Commit ()
925- outgen .followerState .openKVTxn .Discard ()
926- outgen .followerState .openKVTxn = nil
927- if err != nil {
928- outgen .logger .Warn ("failed to commit kv transaction" , commontypes.LogFields {
929- "seqNr" : outgen .sharedState .seqNr ,
930- "error" : err ,
931- })
932-
933- {
934- kvSeqNr , err := outgen .kvStore .HighestCommittedSeqNr ()
935- if err != nil {
936- outgen .logger .Error ("failed to validate kv commit post-condition, upon kv commit failure" , commontypes.LogFields {
937- "seqNr" : outgen .sharedState .seqNr ,
938- "error" : err ,
939- })
940- return
941- }
924+ if outgen .followerState .openKVTxn != nil {
925+ err := outgen .followerState .openKVTxn .Commit ()
926+ outgen .followerState .openKVTxn .Discard ()
927+ outgen .followerState .openKVTxn = nil
928+ if err != nil {
929+ outgen .logger .Warn ("failed to commit kv transaction" , commontypes.LogFields {
930+ "seqNr" : outgen .sharedState .seqNr ,
931+ "error" : err ,
932+ })
942933
943- if kvSeqNr < outgen .sharedState .seqNr {
944- outgen .logger .Error ("kv commit failed and post-condition (seqNr <= kvSeqNr) is not satisfied" , commontypes.LogFields {
945- "seqNr" : outgen .sharedState .seqNr ,
946- "kvSeqNr" : kvSeqNr ,
947- })
948- return
934+ {
935+ kvSeqNr , err := outgen .kvStore .HighestCommittedSeqNr ()
936+ if err != nil {
937+ outgen .logger .Error ("failed to validate kv commit post-condition, upon kv commit failure" , commontypes.LogFields {
938+ "seqNr" : outgen .sharedState .seqNr ,
939+ "error" : err ,
940+ })
941+ return
942+ }
943+
944+ if kvSeqNr < outgen .sharedState .seqNr {
945+ outgen .logger .Error ("kv commit failed and post-condition (seqNr <= kvSeqNr) is not satisfied" , commontypes.LogFields {
946+ "seqNr" : outgen .sharedState .seqNr ,
947+ "kvSeqNr" : kvSeqNr ,
948+ })
949+ return
950+ }
949951 }
950952 }
951953 }
@@ -1252,6 +1254,7 @@ func (outgen *outcomeGenerationState[RI]) persistCommitAsBlock(commit *Certified
12521254}
12531255
12541256func (outgen * outcomeGenerationState [RI ]) refreshCommittedSeqNrAndCert () {
1257+
12551258 preRefreshCommittedSeqNr := outgen .sharedState .committedSeqNr
12561259
12571260 postRefreshCommittedSeqNr , err := outgen .kvStore .HighestCommittedSeqNr ()
@@ -1269,9 +1272,20 @@ func (outgen *outcomeGenerationState[RI]) refreshCommittedSeqNrAndCert() {
12691272 })
12701273
12711274 if postRefreshCommittedSeqNr == preRefreshCommittedSeqNr {
1275+ return
1276+ } else if postRefreshCommittedSeqNr + 1 == preRefreshCommittedSeqNr {
1277+
1278+ logger .Warn ("last kv transaction commit failed, requesting state sync" , nil )
1279+ select {
1280+ case outgen .chOutcomeGenerationToStatePersistence <- EventStateSyncRequest [RI ]{
1281+ preRefreshCommittedSeqNr ,
1282+ }:
1283+ case <- outgen .ctx .Done ():
1284+ }
1285+
12721286 return
12731287 } else if postRefreshCommittedSeqNr < preRefreshCommittedSeqNr {
1274- logger .Critical ("assumption violation, kv is behind what outgen knows as committed" , nil )
1288+ logger .Critical ("assumption violation, kv is way behind what outgen knows as committed" , nil )
12751289 panic ("" )
12761290 }
12771291
0 commit comments