Skip to content

Commit f681cd4

Browse files
committed
Made protocol state idempotent with respect to inserts
1 parent 0b1de87 commit f681cd4

File tree

3 files changed

+71
-4
lines changed

3 files changed

+71
-4
lines changed

engine/common/follower/core.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func (c *Core) processCertifiedBlocks(ctx context.Context, blocks CertifiedBlock
258258
// Step 1: add blocks to our PendingTree of certified blocks
259259
pendingTreeSpan, _ := c.tracer.StartSpanFromContext(ctx, trace.FollowerExtendPendingTree)
260260
connectedBlocks, err := c.pendingTree.AddBlocks(blocks)
261-
defer pendingTreeSpan.End()
261+
pendingTreeSpan.End()
262262
if err != nil {
263263
return fmt.Errorf("could not process batch of certified blocks: %w", err)
264264
}
@@ -282,7 +282,7 @@ func (c *Core) processCertifiedBlocks(ctx context.Context, blocks CertifiedBlock
282282
// Is NOT concurrency safe: should be executed by _single dedicated_ goroutine.
283283
// No errors expected during normal operations.
284284
func (c *Core) processFinalizedBlock(ctx context.Context, finalized *flow.Header) error {
285-
span, ctx := c.tracer.StartSpanFromContext(ctx, trace.FollowerProcessFinalizedBlock)
285+
span, _ := c.tracer.StartSpanFromContext(ctx, trace.FollowerProcessFinalizedBlock)
286286
defer span.End()
287287

288288
connectedBlocks, err := c.pendingTree.FinalizeFork(finalized)

state/protocol/badger/mutator.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,12 @@ func (m *FollowerState) ExtendCertified(ctx context.Context, candidate *flow.Blo
116116
defer span.End()
117117

118118
blockID := candidate.ID()
119+
// check if candidate block has been already processed
120+
processed, err := m.checkBlockAlreadyProcessed(blockID)
121+
if err != nil || processed {
122+
return err
123+
}
124+
119125
// sanity check if certifyingQC actually certifies candidate block
120126
if certifyingQC.View != candidate.Header.View {
121127
return fmt.Errorf("qc doesn't certify candidate block, expect %d view, got %d", candidate.Header.View, certifyingQC.View)
@@ -125,7 +131,7 @@ func (m *FollowerState) ExtendCertified(ctx context.Context, candidate *flow.Blo
125131
}
126132

127133
// check if the block header is a valid extension of parent block
128-
err := m.headerExtend(candidate)
134+
err = m.headerExtend(candidate)
129135
if err != nil {
130136
// since we have a QC for this block, it cannot be an invalid extension
131137
return fmt.Errorf("unexpected invalid block (id=%x) with certifying qc (id=%x): %s",
@@ -156,8 +162,14 @@ func (m *ParticipantState) Extend(ctx context.Context, candidate *flow.Block) er
156162
span, ctx := m.tracer.StartSpanFromContext(ctx, trace.ProtoStateMutatorExtend)
157163
defer span.End()
158164

165+
// check if candidate block has been already processed
166+
processed, err := m.checkBlockAlreadyProcessed(candidate.ID())
167+
if err != nil || processed {
168+
return err
169+
}
170+
159171
// check if the block header is a valid extension of parent block
160-
err := m.headerExtend(candidate)
172+
err = m.headerExtend(candidate)
161173
if err != nil {
162174
return fmt.Errorf("header not compliant with chain state: %w", err)
163175
}
@@ -244,6 +256,23 @@ func (m *FollowerState) headerExtend(candidate *flow.Block) error {
244256
return nil
245257
}
246258

259+
// checkBlockAlreadyProcessed checks if block with given blockID has been added to the protocol state.
260+
// Returns:
261+
// * (true, nil) - block has been already processed.
262+
// * (false, nil) - block has not been processed.
263+
// * (false, error) - unknown error when trying to query protocol state.
264+
// No errors are expected during normal operation.
265+
func (m *FollowerState) checkBlockAlreadyProcessed(blockID flow.Identifier) (bool, error) {
266+
_, err := m.headers.ByBlockID(blockID)
267+
if err != nil {
268+
if errors.Is(err, storage.ErrNotFound) {
269+
return false, nil
270+
}
271+
return false, fmt.Errorf("could not check if candidate block (%x) has been already processed: %w", blockID, err)
272+
}
273+
return true, nil
274+
}
275+
247276
// checkOutdatedExtension checks whether candidate block is
248277
// valid in the context of the entire state. For this, the block needs to
249278
// directly connect, through its ancestors, to the last finalized block.

state/protocol/badger/mutator_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2108,6 +2108,11 @@ func TestExtendInvalidGuarantee(t *testing.T) {
21082108

21092109
// now the guarantee has invalid signer indices: the checksum should have 4 bytes, but it only has 1
21102110
payload.Guarantees[0].SignerIndices = []byte{byte(1)}
2111+
2112+
// create new block that has invalid collection guarantee
2113+
block = unittest.BlockWithParentFixture(head)
2114+
block.SetPayload(payload)
2115+
21112116
err = state.Extend(context.Background(), block)
21122117
require.True(t, signature.IsInvalidSignerIndicesError(err), err)
21132118
require.ErrorIs(t, err, signature.ErrInvalidChecksum)
@@ -2291,6 +2296,39 @@ func TestHeaderInvalidTimestamp(t *testing.T) {
22912296
})
22922297
}
22932298

2299+
// TestProtocolStateIdempotent tests that both participant and follower states correctly process adding same block twice
2300+
// where second extend doesn't result in an error and effectively is no-op.
2301+
func TestProtocolStateIdempotent(t *testing.T) {
2302+
rootSnapshot := unittest.RootSnapshotFixture(participants)
2303+
head, err := rootSnapshot.Head()
2304+
require.NoError(t, err)
2305+
t.Run("follower", func(t *testing.T) {
2306+
util.RunWithFollowerProtocolState(t, rootSnapshot, func(db *badger.DB, state *protocol.FollowerState) {
2307+
block := unittest.BlockWithParentFixture(head)
2308+
err := state.ExtendCertified(context.Background(), block, unittest.CertifyBlock(block.Header))
2309+
require.NoError(t, err)
2310+
2311+
// same operation should be no-op
2312+
err = state.ExtendCertified(context.Background(), block, unittest.CertifyBlock(block.Header))
2313+
require.NoError(t, err)
2314+
})
2315+
})
2316+
t.Run("participant", func(t *testing.T) {
2317+
util.RunWithFullProtocolState(t, rootSnapshot, func(db *badger.DB, state *protocol.ParticipantState) {
2318+
block := unittest.BlockWithParentFixture(head)
2319+
err := state.Extend(context.Background(), block)
2320+
require.NoError(t, err)
2321+
2322+
// same operation should be no-op
2323+
err = state.Extend(context.Background(), block)
2324+
require.NoError(t, err)
2325+
2326+
err = state.ExtendCertified(context.Background(), block, unittest.CertifyBlock(block.Header))
2327+
require.NoError(t, err)
2328+
})
2329+
})
2330+
}
2331+
22942332
func assertEpochEmergencyFallbackTriggered(t *testing.T, state realprotocol.State, expected bool) {
22952333
triggered, err := state.Params().EpochFallbackTriggered()
22962334
require.NoError(t, err)

0 commit comments

Comments
 (0)