Skip to content

Commit c539928

Browse files
bors[bot]durkmurderjordanschalm
authored
Merge #4149
4149: [Consensus] Service events processing order r=durkmurder a=durkmurder https://github.com/dapperlabs/flow-go/issues/6502 ### Context This PR changes how service events are processed. Previously, service events were processed when observing a QC for the block which seals the service event. This was needed since a follower couldn't determine payload validity even though a participant could. In our current implementation, a follower accepts only certified blocks, meaning it has the same safety guarantees as a participant with respect to payload validity. We are changing service events to be processed directly in the block which seals them, not in its next child. As part of this PR: - changed implementation - updated tests (badger, mutator, epochs) - updated epoch builder to include fewer blocks, updated tests P.S. In many tests we had incorrectly built payloads; for instance, a block contained a result and a seal for the same block, which is not supported. To be 100% honest, we still have many places where we don't enforce a gap of 1 block between the incorporated result and its seal, but that can be refactored in a separate PR (if needed). Please carefully check if I have correctly modified the tests. Co-authored-by: Yurii Oleksyshyn <[email protected]> Co-authored-by: Jordan Schalm <[email protected]>
2 parents ecefc8a + d5068d6 commit c539928

File tree

6 files changed

+237
-301
lines changed

6 files changed

+237
-301
lines changed

engine/access/rpc/backend/backend_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot_NoTransitionSpan() {
167167
epochBuilder := unittest.NewEpochBuilder(suite.T(), state)
168168
// build epoch 1
169169
// blocks in current state
170-
// P <- A(S_P-1) <- B(S_P) <- C(S_A) <- D(S_B) |setup| <- E(S_C) <- F(S_D) |commit| <- G(S_E)
170+
// P <- A(S_P-1) <- B(S_P) <- C(S_A) <- D(S_B) |setup| <- E(S_C) <- F(S_D) |commit|
171171
epochBuilder.
172172
BuildEpoch().
173173
CompleteEpoch()
@@ -181,9 +181,9 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot_NoTransitionSpan() {
181181
suite.state.On("AtHeight", height).Return(state.AtHeight(height)).Once()
182182
}
183183

184-
// Take snapshot at height of block D (epoch1.heights[3]) for valid segment and valid snapshot
185-
// where it's sealing segment is B <- C <- D
186-
snap := state.AtHeight(epoch1.Range()[3])
184+
// Take snapshot at height of block C (epoch1.heights[2]) for valid segment and valid snapshot
185+
// where its sealing segment is A <- B <- C
186+
snap := state.AtHeight(epoch1.Range()[2])
187187
suite.state.On("Final").Return(snap).Once()
188188

189189
backend := New(
@@ -283,8 +283,8 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot_TransitionSpans() {
283283
suite.Require().NoError(err)
284284
fmt.Println()
285285

286-
// we expect the endpoint to return last valid snapshot which is the snapshot at block D (height 3)
287-
expectedSnapshotBytes, err := convert.SnapshotToBytes(state.AtHeight(epoch1.Range()[3]))
286+
// we expect the endpoint to return last valid snapshot which is the snapshot at block C (height 2)
287+
expectedSnapshotBytes, err := convert.SnapshotToBytes(state.AtHeight(epoch1.Range()[2]))
288288
suite.Require().NoError(err)
289289
suite.Require().Equal(expectedSnapshotBytes, bytes)
290290
})
@@ -300,7 +300,7 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot_PhaseTransitionSpan() {
300300
epochBuilder := unittest.NewEpochBuilder(suite.T(), state)
301301
// build epoch 1
302302
// blocks in current state
303-
// P <- A(S_P-1) <- B(S_P) <- C(S_A) <- D(S_B) |setup| <- E(S_C) <- F(S_D) |commit| <- G(S_E)
303+
// P <- A(S_P-1) <- B(S_P) <- C(S_A) <- D(S_B) |setup| <- E(S_C) <- F(S_D) |commit|
304304
epochBuilder.
305305
BuildEpoch().
306306
CompleteEpoch()
@@ -314,11 +314,11 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot_PhaseTransitionSpan() {
314314
suite.state.On("AtHeight", height).Return(state.AtHeight(height))
315315
}
316316

317-
// Take snapshot at height of block E (epoch1.heights[4]) the sealing segment for this snapshot
318-
// is C(S_A) <- D(S_B) |setup| <- E(S_C) which spans the epoch setup phase. This will force
317+
// Take snapshot at height of block D (epoch1.heights[3]). The sealing segment for this snapshot
318+
// is C(S_A) <- D(S_B) |setup|, which spans the epoch setup phase. This will force
319319
// our RPC endpoint to return a snapshot at block C, which is the last snapshot before the boundary where the phase
320320
// transition happens.
321-
snap := state.AtHeight(epoch1.Range()[4])
321+
snap := state.AtHeight(epoch1.Range()[3])
322322
suite.state.On("Final").Return(snap).Once()
323323

324324
backend := New(
@@ -346,8 +346,8 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot_PhaseTransitionSpan() {
346346
bytes, err := backend.GetLatestProtocolStateSnapshot(context.Background())
347347
suite.Require().NoError(err)
348348

349-
// we expect the endpoint to return last valid snapshot which is the snapshot at block D (height 3)
350-
expectedSnapshotBytes, err := convert.SnapshotToBytes(state.AtHeight(epoch1.Range()[3]))
349+
// we expect the endpoint to return last valid snapshot which is the snapshot at block C (height 2)
350+
expectedSnapshotBytes, err := convert.SnapshotToBytes(state.AtHeight(epoch1.Range()[2]))
351351
suite.Require().NoError(err)
352352
suite.Require().Equal(expectedSnapshotBytes, bytes)
353353
})
@@ -363,7 +363,7 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot_EpochTransitionSpan() {
363363
epochBuilder := unittest.NewEpochBuilder(suite.T(), state)
364364
// build epoch 1
365365
// blocks in current state
366-
// P <- A(S_P-1) <- B(S_P) <- C(S_A) <- D(S_B) |setup| <- E(S_C) <- F(S_D) |commit| <- G(S_E)
366+
// P <- A(S_P-1) <- B(S_P) <- C(S_A) <- D(S_B) |setup| <- E(S_C) <- F(S_D) |commit|
367367
epochBuilder.BuildEpoch()
368368

369369
// add more blocks to our state in the commit phase, this will allow

state/protocol/badger/mutator.go

Lines changed: 34 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -664,7 +664,7 @@ func (m *FollowerState) Finalize(ctx context.Context, blockID flow.Identifier) e
664664
// If epoch emergency fallback is triggered, the current epoch continues until
665665
// the next spork - so skip these updates.
666666
if !epochFallbackTriggered {
667-
epochPhaseMetrics, epochPhaseEvents, err := m.epochPhaseMetricsAndEventsOnBlockFinalized(header, epochStatus)
667+
epochPhaseMetrics, epochPhaseEvents, err := m.epochPhaseMetricsAndEventsOnBlockFinalized(block, epochStatus)
668668
if err != nil {
669669
return fmt.Errorf("could not determine epoch phase metrics/events for finalized block: %w", err)
670670
}
@@ -848,41 +848,38 @@ func (m *FollowerState) epochTransitionMetricsAndEventsOnBlockFinalized(block *f
848848
return
849849
}
850850

851-
// epochPhaseMetricsAndEventsOnBlockFinalized determines metrics to update
852-
// and protocol events to emit, if this block is the first of a new epoch phase.
853-
//
854-
// Protocol events and metric updates happen when we finalize the block at
855-
// which a service event causing an epoch phase change comes into effect.
856-
// See handleEpochServiceEvents for details.
851+
// epochPhaseMetricsAndEventsOnBlockFinalized determines metrics to update and protocol
852+
// events to emit. Service Events embedded into an execution result take effect, when the
853+
// execution result's _seal is finalized_ (i.e. when the block holding a seal for the
854+
// result is finalized). See also handleEpochServiceEvents for further details. Example:
857855
//
858856
// Convention:
859857
//
860-
// A <-- ... <-- P(Seal_A) <----- B
861-
// ↑ ↑
862-
// block sealing service event first block of new Epoch phase
863-
// for epoch-phase transition (e.g. EpochSetup phase)
864-
// (e.g. EpochSetup event)
858+
// A <-- ... <-- C(Seal_A)
865859
//
866-
// Per convention, protocol events for epoch phase changes are emitted when
867-
// the first block of the new phase (eg. EpochSetup phase) is _finalized_.
868-
// Meaning that the new phase has started.
860+
// Suppose an EpochSetup service event is emitted during execution of block A. C seals A, therefore
861+
// we apply the metrics/events when C is finalized. The first block of the EpochSetup
862+
// phase is block C.
869863
//
870864
// This function should only be called when epoch fallback *has not already been triggered*.
871865
// No errors are expected during normal operation.
872-
func (m *FollowerState) epochPhaseMetricsAndEventsOnBlockFinalized(block *flow.Header, epochStatus *flow.EpochStatus) (
866+
func (m *FollowerState) epochPhaseMetricsAndEventsOnBlockFinalized(block *flow.Block, epochStatus *flow.EpochStatus) (
873867
metrics []func(),
874868
events []func(),
875869
err error,
876870
) {
877871

878-
parent, err := m.blocks.ByID(block.ParentID)
872+
// block payload may not specify seals in order, so order them by block height before processing
873+
orderedSeals, err := protocol.OrderedSeals(block.Payload, m.headers)
879874
if err != nil {
880-
return nil, nil, fmt.Errorf("could not get parent (id=%x): %w", block.ParentID, err)
875+
if errors.Is(err, storage.ErrNotFound) {
876+
return nil, nil, fmt.Errorf("ordering seals: parent payload contains seals for unknown block: %s", err.Error())
877+
}
878+
return nil, nil, fmt.Errorf("unexpected error ordering seals: %w", err)
881879
}
882880

883881
// track service event driven metrics and protocol events that should be emitted
884-
for _, seal := range parent.Payload.Seals {
885-
882+
for _, seal := range orderedSeals {
886883
result, err := m.results.ByID(seal.ResultID)
887884
if err != nil {
888885
return nil, nil, fmt.Errorf("could not retrieve result (id=%x) for seal (id=%x): %w", seal.ResultID, seal.ID(), err)
@@ -893,12 +890,12 @@ func (m *FollowerState) epochPhaseMetricsAndEventsOnBlockFinalized(block *flow.H
893890
// update current epoch phase
894891
events = append(events, func() { m.metrics.CurrentEpochPhase(flow.EpochPhaseSetup) })
895892
// track epoch phase transition (staking->setup)
896-
events = append(events, func() { m.consumer.EpochSetupPhaseStarted(ev.Counter-1, block) })
893+
events = append(events, func() { m.consumer.EpochSetupPhaseStarted(ev.Counter-1, block.Header) })
897894
case *flow.EpochCommit:
898895
// update current epoch phase
899896
events = append(events, func() { m.metrics.CurrentEpochPhase(flow.EpochPhaseCommitted) })
900897
// track epoch phase transition (setup->committed)
901-
events = append(events, func() { m.consumer.EpochCommittedPhaseStarted(ev.Counter-1, block) })
898+
events = append(events, func() { m.consumer.EpochCommittedPhaseStarted(ev.Counter-1, block.Header) })
902899
// track final view of committed epoch
903900
nextEpochSetup, err := m.epoch.setups.ByID(epochStatus.NextEpoch.SetupID)
904901
if err != nil {
@@ -971,27 +968,26 @@ func (m *FollowerState) epochStatus(block *flow.Header, epochFallbackTriggered b
971968

972969
// handleEpochServiceEvents handles applying state changes which occur as a result
973970
// of service events being included in a block payload:
974-
// * inserting incorporated service events
975-
// * updating EpochStatus for the candidate block
971+
// - inserting incorporated service events
972+
// - updating EpochStatus for the candidate block
976973
//
977974
// Consider a chain where a service event is emitted during execution of block A.
978-
// Block B contains a receipt for A. Block C contains a seal for block A. Block
979-
// D contains a QC for C.
975+
// Block B contains a receipt for A. Block C contains a seal for block A.
980976
//
981-
// A <- B(RA) <- C(SA) <- D
977+
// A <- .. <- B(RA) <- .. <- C(SA)
982978
//
983979
// Service events are included within execution results, which are stored
984980
// opaquely as part of the block payload in block B. We only validate and insert
985-
// the typed service event to storage once we have received a valid QC for the
986-
// block containing the seal for A. This occurs once we mark block D as valid
987-
// with MarkValid. Because of this, any change to the protocol state introduced
988-
// by a service event emitted in A would only become visible when querying D or
989-
// later (D's children).
990-
// TODO(active-pacemaker) update docs here (remove reference to MarkValid) https://github.com/dapperlabs/flow-go/issues/6254
981+
// the typed service event to storage once we process C, the block containing the
982+
// seal for block A. This is because we rely on the sealing subsystem to validate
983+
// correctness of the service event before processing it.
984+
// Consequently, any change to the protocol state introduced by a service event
985+
// emitted during execution of block A would only become visible when querying
986+
// C or its descendants.
991987
//
992988
// This method will only apply service-event-induced state changes when the
993-
// input block has the form of block D (ie. has a parent, which contains a seal
994-
// for a block in which a service event was emitted).
989+
// input block has the form of block C (i.e. contains a seal for a block in
990+
// which a service event was emitted).
995991
//
996992
// Return values:
997993
// - dbUpdates - If the service events are valid, or there are no service events,
@@ -1025,20 +1021,16 @@ func (m *FollowerState) handleEpochServiceEvents(candidate *flow.Block) (dbUpdat
10251021
return dbUpdates, nil
10261022
}
10271023

1028-
// We apply service events from blocks which are sealed by this block's PARENT.
1029-
// The parent's payload might contain epoch preparation service events for the next
1024+
// We apply service events from blocks which are sealed by this candidate block.
1025+
// The block's payload might contain epoch preparation service events for the next
10301026
// epoch. In this case, we need to update the tentative protocol state.
10311027
// We need to validate whether all information is available in the protocol
10321028
// state to go to the next epoch when needed. In cases where there is a bug
10331029
// in the smart contract, it could be that this happens too late and the
10341030
// chain finalization should halt.
1035-
parent, err := m.blocks.ByID(candidate.Header.ParentID)
1036-
if err != nil {
1037-
return nil, fmt.Errorf("could not get parent (id=%x): %w", candidate.Header.ParentID, err)
1038-
}
10391031

10401032
// block payload may not specify seals in order, so order them by block height before processing
1041-
orderedSeals, err := protocol.OrderedSeals(parent.Payload, m.headers)
1033+
orderedSeals, err := protocol.OrderedSeals(candidate.Payload, m.headers)
10421034
if err != nil {
10431035
if errors.Is(err, storage.ErrNotFound) {
10441036
return nil, fmt.Errorf("ordering seals: parent payload contains seals for unknown block: %s", err.Error())

0 commit comments

Comments
 (0)