diff --git a/cmd/consensus/main.go b/cmd/consensus/main.go index cf22c928e2d..a2f38189d44 100644 --- a/cmd/consensus/main.go +++ b/cmd/consensus/main.go @@ -805,7 +805,7 @@ func main() { }). Component("consensus compliance engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { // initialize the pending blocks cache - proposals := buffer.NewPendingBlocks() + proposals := buffer.NewPendingBlocks(node.LastFinalizedHeader.View, node.ComplianceConfig.GetSkipNewProposalsThreshold()) logger := createLogger(node.Logger, node.RootChainID) complianceCore, err := compliance.NewCore( diff --git a/consensus/integration/nodes_test.go b/consensus/integration/nodes_test.go index 165d2450933..6c8670f8ffd 100644 --- a/consensus/integration/nodes_test.go +++ b/consensus/integration/nodes_test.go @@ -514,12 +514,12 @@ func createNode( ) require.NoError(t, err) - // initialize the pending blocks cache - cache := buffer.NewPendingBlocks() - rootHeader, err := rootSnapshot.Head() require.NoError(t, err) + // initialize the pending blocks cache + cache := buffer.NewPendingBlocks(rootHeader.View, modulecompliance.DefaultConfig().SkipNewProposalsThreshold) + rootQC, err := rootSnapshot.QuorumCertificate() require.NoError(t, err) diff --git a/engine/collection/compliance/core.go b/engine/collection/compliance/core.go index a969c94b7f3..eb2e3971399 100644 --- a/engine/collection/compliance/core.go +++ b/engine/collection/compliance/core.go @@ -93,7 +93,10 @@ func NewCore( if err != nil { return nil, fmt.Errorf("could not initialized finalized boundary cache: %w", err) } - c.ProcessFinalizedBlock(final) + err = c.ProcessFinalizedBlock(final) + if err != nil { + return nil, fmt.Errorf("could not process finalized block: %w", err) + } // log the mempool size off the bat c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size()) @@ -193,7 +196,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*cluster.Proposal]) error _, found := c.pending.ByID(block.ParentID) if found { // add the block to the cache - _ = c.pending.Add(proposal) + if err := c.pending.Add(proposal); err != nil { + if mempool.IsBeyondActiveRangeError(err) { + // In general we expect the block buffer to use SkipNewProposalsThreshold, + // however since it is instantiated outside this component, we allow the thresholds to differ + log.Debug().Err(err).Msg("dropping block beyond block buffer active range") + return nil + } + return fmt.Errorf("could not add proposal to pending buffer: %w", err) + } c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size()) return nil @@ -207,7 +218,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*cluster.Proposal]) error return fmt.Errorf("could not check parent exists: %w", err) } if !exists { - _ = c.pending.Add(proposal) + if err := c.pending.Add(proposal); err != nil { + if mempool.IsBeyondActiveRangeError(err) { + // In general we expect the block buffer to use SkipNewProposalsThreshold, + // however since it is instantiated outside this component, we allow the thresholds to differ + log.Debug().Err(err).Msg("dropping block beyond block buffer active range") + return nil + } + return fmt.Errorf("could not add proposal to pending buffer: %w", err) + } c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size()) c.sync.RequestBlock(block.ParentID, block.Height-1) @@ -288,9 +307,6 @@ func (c *Core) processBlockAndDescendants(proposal flow.Slashable[*cluster.Propo } } - // drop all the children that should have been processed now - c.pending.DropForParent(blockID) - return nil } @@ -363,14 +379,19 @@ func (c *Core) processBlockProposal(proposal *cluster.Proposal) error { // ProcessFinalizedBlock performs pruning of stale data based on finalization event // removes pending blocks below the finalized view -func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) { +// No errors are expected during normal operation. +func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) error { // remove all pending blocks at or below the finalized view - c.pending.PruneByView(finalized.View) + err := c.pending.PruneByView(finalized.View) + if err != nil { + return err + } c.finalizedHeight.Set(finalized.Height) c.finalizedView.Set(finalized.View) // always record the metric c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size()) + return nil } // checkForAndLogOutdatedInputError checks whether error is an `engine.OutdatedInputError`. diff --git a/engine/collection/compliance/core_test.go b/engine/collection/compliance/core_test.go index aaf44149dc1..bac5e1d39fc 100644 --- a/engine/collection/compliance/core_test.go +++ b/engine/collection/compliance/core_test.go @@ -124,7 +124,7 @@ func (cs *CommonSuite) SetupTest() { // set up pending module mock cs.pending = &module.PendingClusterBlockBuffer{} - cs.pending.On("Add", mock.Anything, mock.Anything).Return(true) + cs.pending.On("Add", mock.Anything, mock.Anything) cs.pending.On("ByID", mock.Anything).Return( func(blockID flow.Identifier) flow.Slashable[*cluster.Proposal] { return cs.pendingDB[blockID] @@ -143,9 +143,8 @@ func (cs *CommonSuite) SetupTest() { return ok }, ) - cs.pending.On("DropForParent", mock.Anything).Return() cs.pending.On("Size").Return(uint(0)) - cs.pending.On("PruneByView", mock.Anything).Return() + cs.pending.On("PruneByView", mock.Anything).Return(nil) closed := func() <-chan struct{} { channel := make(chan struct{}) @@ -518,9 +517,6 @@ func (cs *CoreSuite) TestProcessBlockAndDescendants() { // check that we submitted each child to hotstuff cs.hotstuff.AssertExpectations(cs.T()) - - // make sure we drop the cache after trying to process - cs.pending.AssertCalled(cs.T(), "DropForParent", parent.ID()) } func (cs *CoreSuite) TestProposalBufferingOrder() { @@ -546,7 +542,7 @@ func (cs *CoreSuite) TestProposalBufferingOrder() { } // replace the engine buffer with the real one - cs.core.pending = realbuffer.NewPendingClusterBlocks() + cs.core.pending = realbuffer.NewPendingClusterBlocks(cs.head.Block.View, 100_000) // process all of the descendants for _, proposal := range proposals { diff --git a/engine/collection/compliance/engine.go b/engine/collection/compliance/engine.go index b2aea597977..b43cd3258c5 100644 --- a/engine/collection/compliance/engine.go +++ b/engine/collection/compliance/engine.go @@ -166,6 +166,8 @@ func (e *Engine) processOnFinalizedBlock(block *model.Block) error { if err != nil { // no expected errors return fmt.Errorf("could not get finalized header: %w", err) } - e.core.ProcessFinalizedBlock(finalHeader) + if err := e.core.ProcessFinalizedBlock(finalHeader); err != nil { + return fmt.Errorf("could not process finalized block: %w", err) + } return nil } diff --git a/engine/collection/epochmgr/factories/compliance.go b/engine/collection/epochmgr/factories/compliance.go index 26a2d57e224..c23eadec680 100644 --- a/engine/collection/epochmgr/factories/compliance.go +++ b/engine/collection/epochmgr/factories/compliance.go @@ -66,7 +66,11 @@ func (f *ComplianceEngineFactory) Create( validator hotstuff.Validator, ) (*compliance.Engine, error) { - cache := buffer.NewPendingClusterBlocks() + final, err := clusterState.Final().Head() + if err != nil { + return nil, fmt.Errorf("could not get finalized header: %w", err) + } + cache := buffer.NewPendingClusterBlocks(final.View, f.config.GetSkipNewProposalsThreshold()) core, err := compliance.NewCore( f.log, f.engMetrics, diff --git a/engine/consensus/compliance/core.go b/engine/consensus/compliance/core.go index f52ba2494ea..3ac5d9c4d1f 100644 --- a/engine/consensus/compliance/core.go +++ b/engine/consensus/compliance/core.go @@ -100,7 +100,10 @@ func NewCore( if err != nil { return nil, fmt.Errorf("could not initialized finalized boundary cache: %w", err) } - c.ProcessFinalizedBlock(final) + err = c.ProcessFinalizedBlock(final) + if err != nil { + return nil, fmt.Errorf("could not process finalized block: %w", err) + } c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size()) @@ -200,7 +203,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*flow.Proposal]) error { _, found := c.pending.ByID(header.ParentID) if found { // add the block to the cache - _ = c.pending.Add(proposal) + if err := c.pending.Add(proposal); err != nil { + if mempool.IsBeyondActiveRangeError(err) { + // In general we expect the block buffer to use SkipNewProposalsThreshold, + // however since it is instantiated outside this component, we allow the thresholds to differ + log.Debug().Err(err).Msg("dropping block beyond block buffer active range") + return nil + } + return fmt.Errorf("could not add proposal to pending buffer: %w", err) + } c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size()) return nil @@ -214,7 +225,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*flow.Proposal]) error { return fmt.Errorf("could not check parent exists: %w", err) } if !exists { - _ = c.pending.Add(proposal) + if err := c.pending.Add(proposal); err != nil { + if mempool.IsBeyondActiveRangeError(err) { + // In general we expect the block buffer to use SkipNewProposalsThreshold, + // however since it is instantiated outside this component, we allow the thresholds to differ + log.Debug().Err(err).Msg("dropping block beyond block buffer active range") + return nil + } + return fmt.Errorf("could not add proposal to pending buffer: %w", err) + } c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size()) c.sync.RequestBlock(header.ParentID, header.Height-1) @@ -300,9 +319,6 @@ func (c *Core) processBlockAndDescendants(proposal flow.Slashable[*flow.Proposal } } - // drop all the children that should have been processed now - c.pending.DropForParent(blockID) - return nil } @@ -392,14 +408,19 @@ func (c *Core) processBlockProposal(proposal *flow.Proposal) error { // ProcessFinalizedBlock performs pruning of stale data based on finalization event // removes pending blocks below the finalized view -func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) { +// No errors are expected during normal operation. +func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) error { // remove all pending blocks at or below the finalized view - c.pending.PruneByView(finalized.View) + err := c.pending.PruneByView(finalized.View) + if err != nil { + return err + } c.finalizedHeight.Set(finalized.Height) c.finalizedView.Set(finalized.View) // always record the metric c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size()) + return nil } // checkForAndLogOutdatedInputError checks whether error is an `engine.OutdatedInputError`. diff --git a/engine/consensus/compliance/core_test.go b/engine/consensus/compliance/core_test.go index 23b98835ae2..d59ab0ca4fb 100644 --- a/engine/consensus/compliance/core_test.go +++ b/engine/consensus/compliance/core_test.go @@ -200,7 +200,7 @@ func (cs *CommonSuite) SetupTest() { // set up pending module mock cs.pending = &module.PendingBlockBuffer{} - cs.pending.On("Add", mock.Anything, mock.Anything).Return(true) + cs.pending.On("Add", mock.Anything, mock.Anything) cs.pending.On("ByID", mock.Anything).Return( func(blockID flow.Identifier) flow.Slashable[*flow.Proposal] { return cs.pendingDB[blockID] @@ -219,9 +219,8 @@ func (cs *CommonSuite) SetupTest() { return ok }, ) - cs.pending.On("DropForParent", mock.Anything).Return() cs.pending.On("Size").Return(uint(0)) - cs.pending.On("PruneByView", mock.Anything).Return() + cs.pending.On("PruneByView", mock.Anything).Return(nil) // set up hotstuff module mock cs.hotstuff = module.NewHotStuff(cs.T()) @@ -565,9 +564,6 @@ func (cs *CoreSuite) TestProcessBlockAndDescendants() { Message: proposal0, }) require.NoError(cs.T(), err, "should pass handling children") - - // make sure we drop the cache after trying to process - cs.pending.AssertCalled(cs.T(), "DropForParent", parent.ID()) } func (cs *CoreSuite) TestProposalBufferingOrder() { @@ -588,7 +584,7 @@ func (cs *CoreSuite) TestProposalBufferingOrder() { } // replace the engine buffer with the real one - cs.core.pending = real.NewPendingBlocks() + cs.core.pending = real.NewPendingBlocks(cs.head.View, 100_000) // check that we request the ancestor block each time cs.sync.On("RequestBlock", missingBlock.ID(), missingBlock.Height).Times(len(proposals)) diff --git a/engine/consensus/compliance/engine.go b/engine/consensus/compliance/engine.go index 7a37a6dd9a8..d4a57dfcf27 100644 --- a/engine/consensus/compliance/engine.go +++ b/engine/consensus/compliance/engine.go @@ -180,6 +180,9 @@ func (e *Engine) processOnFinalizedBlock(block *model.Block) error { if err != nil { // no expected errors return fmt.Errorf("could not get finalized header: %w", err) } - e.core.ProcessFinalizedBlock(finalHeader) + err = e.core.ProcessFinalizedBlock(finalHeader) + if err != nil { + return fmt.Errorf("could not process finalized block: %w", err) + } return nil } diff --git a/model/cluster/block.go b/model/cluster/block.go index 148eef6c603..e31b3cc50a1 100644 --- a/model/cluster/block.go +++ b/model/cluster/block.go @@ -82,10 +82,7 @@ func NewRootBlock(untrusted UntrustedBlock) (*Block, error) { // Proposal represents a signed proposed block in collection node cluster consensus. // //structwrite:immutable - mutations allowed only within the constructor. -type Proposal struct { - Block Block - ProposerSigData []byte -} +type Proposal = flow.GenericProposal[Payload] // UntrustedProposal is an untrusted input-only representation of a cluster.Proposal, // used for construction. @@ -137,12 +134,6 @@ func NewRootProposal(untrusted UntrustedProposal) (*Proposal, error) { } -// ProposalHeader converts the proposal into a compact [ProposalHeader] representation, -// where the payload is compressed to a hash reference. -func (p *Proposal) ProposalHeader() *flow.ProposalHeader { - return &flow.ProposalHeader{Header: p.Block.ToHeader(), ProposerSigData: p.ProposerSigData} -} - // BlockResponse is the same as flow.BlockResponse, but for cluster // consensus. It contains a list of structurally validated cluster block proposals // that should correspond to the request. diff --git a/model/flow/block.go b/model/flow/block.go index 392e59d25c2..5a6bd3d3793 100644 --- a/model/flow/block.go +++ b/model/flow/block.go @@ -163,13 +163,19 @@ func (s BlockStatus) String() string { return [...]string{"BLOCK_UNKNOWN", "BLOCK_FINALIZED", "BLOCK_SEALED"}[s] } +// GenericProposal represents a generic Flow proposal structure parameterized by a block payload type. +// It includes the block header metadata, block payload, and proposer signature. +type GenericProposal[T HashablePayload] struct { + // Block is the block being proposed. + Block GenericBlock[T] + // ProposerSigData is the signature (vote) from the proposer of this block. + ProposerSigData []byte +} + // Proposal is a signed proposal that includes the block payload, in addition to the required header and signature. // //structwrite:immutable - mutations allowed only within the constructor -type Proposal struct { - Block Block - ProposerSigData []byte -} +type Proposal = GenericProposal[Payload] // UntrustedProposal is an untrusted input-only representation of a Proposal, // used for construction. @@ -223,7 +229,7 @@ func NewRootProposal(untrusted UntrustedProposal) (*Proposal, error) { // ProposalHeader converts the proposal into a compact [ProposalHeader] representation, // where the payload is compressed to a hash reference. -func (b *Proposal) ProposalHeader() *ProposalHeader { +func (b *GenericProposal[T]) ProposalHeader() *ProposalHeader { return &ProposalHeader{Header: b.Block.ToHeader(), ProposerSigData: b.ProposerSigData} } diff --git a/module/buffer.go b/module/buffer.go index 0937a49ad54..1d20ce232b1 100644 --- a/module/buffer.go +++ b/module/buffer.go @@ -5,40 +5,38 @@ import ( "github.com/onflow/flow-go/model/flow" ) -// PendingBlockBuffer defines an interface for a cache of pending blocks that -// cannot yet be processed because they do not connect to the rest of the chain -// state. They are indexed by parent ID to enable processing all of a parent's -// children once the parent is received. +// GenericPendingBlockBuffer implements a mempool of pending blocks that cannot yet be processed +// because they do not connect to the rest of the chain state. +// They are indexed by parent ID to enable processing all of a parent's children once the parent is received. +// They are also indexed by view to support pruning. +// // Safe for concurrent use. -type PendingBlockBuffer interface { - Add(block flow.Slashable[*flow.Proposal]) bool - - ByID(blockID flow.Identifier) (flow.Slashable[*flow.Proposal], bool) - - ByParentID(parentID flow.Identifier) ([]flow.Slashable[*flow.Proposal], bool) - - DropForParent(parentID flow.Identifier) - - // PruneByView prunes any pending blocks with views less or equal to the given view. - PruneByView(view uint64) - +type GenericPendingBlockBuffer[T flow.HashablePayload] interface { + // Add adds the input block to the block buffer. + // If the block already exists, or is below the finalized view, this is a no-op. + // Errors returns: + // - mempool.BeyondActiveRangeError if block.View > finalizedView + activeViewRangeSize (when activeViewRangeSize > 0) + Add(block flow.Slashable[*flow.GenericProposal[T]]) error + + // ByID returns the block with the given ID, if it exists. + // Otherwise returns (nil, false) + ByID(blockID flow.Identifier) (flow.Slashable[*flow.GenericProposal[T]], bool) + + // ByParentID returns all direct children of the given block. + // If no children with the given parent exist, returns (nil, false) + ByParentID(parentID flow.Identifier) ([]flow.Slashable[*flow.GenericProposal[T]], bool) + + // PruneByView prunes all pending blocks with views less or equal to the given view. + // Errors returns: + // - mempool.BelowPrunedThresholdError if input level is below the lowest retained view (finalized view) + PruneByView(view uint64) error + + // Size returns the number of blocks in the buffer. Size() uint } -// PendingClusterBlockBuffer is the same thing as PendingBlockBuffer, but for -// collection node cluster consensus. -// Safe for concurrent use. -type PendingClusterBlockBuffer interface { - Add(block flow.Slashable[*cluster.Proposal]) bool - - ByID(blockID flow.Identifier) (flow.Slashable[*cluster.Proposal], bool) - - ByParentID(parentID flow.Identifier) ([]flow.Slashable[*cluster.Proposal], bool) - - DropForParent(parentID flow.Identifier) +// PendingBlockBuffer is the block buffer for consensus proposals. +type PendingBlockBuffer GenericPendingBlockBuffer[flow.Payload] - // PruneByView prunes any pending cluster blocks with views less or equal to the given view. - PruneByView(view uint64) - - Size() uint -} +// PendingClusterBlockBuffer is the block buffer for cluster proposals. +type PendingClusterBlockBuffer GenericPendingBlockBuffer[cluster.Payload] diff --git a/module/buffer/backend.go b/module/buffer/backend.go deleted file mode 100644 index 52d09e33ed6..00000000000 --- a/module/buffer/backend.go +++ /dev/null @@ -1,133 +0,0 @@ -package buffer - -import ( - "sync" - - "github.com/onflow/flow-go/model/flow" -) - -// item represents an item in the cache: the main block and auxiliary data that is -// needed to implement indexed lookups by parent ID and pruning by view. -type item[T any] struct { - view uint64 - parentID flow.Identifier - block flow.Slashable[T] -} - -// extractProposalHeader is a type constraint for the generic type which allows to extract flow.ProposalHeader -// from the underlying type. -type extractProposalHeader interface { - ProposalHeader() *flow.ProposalHeader -} - -// backend implements a simple cache of pending blocks, indexed by parent ID and pruned by view. -type backend[T extractProposalHeader] struct { - mu sync.RWMutex - // map of pending header IDs, keyed by parent ID for ByParentID lookups - blocksByParent map[flow.Identifier][]flow.Identifier - // set of pending blocks, keyed by ID to avoid duplication - blocksByID map[flow.Identifier]*item[T] -} - -// newBackend returns a new pending header cache. -func newBackend[T extractProposalHeader]() *backend[T] { - cache := &backend[T]{ - blocksByParent: make(map[flow.Identifier][]flow.Identifier), - blocksByID: make(map[flow.Identifier]*item[T]), - } - return cache -} - -// add adds the item to the cache, returning false if it already exists and -// true otherwise. -func (b *backend[T]) add(block flow.Slashable[T]) bool { - header := block.Message.ProposalHeader().Header - blockID := header.ID() - - b.mu.Lock() - defer b.mu.Unlock() - - _, exists := b.blocksByID[blockID] - if exists { - return false - } - - item := &item[T]{ - view: header.View, - parentID: header.ParentID, - block: block, - } - - b.blocksByID[blockID] = item - b.blocksByParent[item.parentID] = append(b.blocksByParent[item.parentID], blockID) - - return true -} - -func (b *backend[T]) byID(id flow.Identifier) (*item[T], bool) { - b.mu.RLock() - defer b.mu.RUnlock() - - item, exists := b.blocksByID[id] - if !exists { - return nil, false - } - - return item, true -} - -// byParentID returns a list of cached blocks with the given parent. If no such -// blocks exist, returns false. -func (b *backend[T]) byParentID(parentID flow.Identifier) ([]*item[T], bool) { - b.mu.RLock() - defer b.mu.RUnlock() - - forParent, exists := b.blocksByParent[parentID] - if !exists { - return nil, false - } - - items := make([]*item[T], 0, len(forParent)) - for _, blockID := range forParent { - items = append(items, b.blocksByID[blockID]) - } - - return items, true -} - -// dropForParent removes all cached blocks with the given parent (non-recursively). -func (b *backend[T]) dropForParent(parentID flow.Identifier) { - b.mu.Lock() - defer b.mu.Unlock() - - children, exists := b.blocksByParent[parentID] - if !exists { - return - } - - for _, childID := range children { - delete(b.blocksByID, childID) - } - delete(b.blocksByParent, parentID) -} - -// pruneByView prunes any items in the cache that have view less than or -// equal to the given view. The pruning view should be the finalized view. -func (b *backend[T]) pruneByView(view uint64) { - b.mu.Lock() - defer b.mu.Unlock() - - for id, item := range b.blocksByID { - if item.view <= view { - delete(b.blocksByID, id) - delete(b.blocksByParent, item.parentID) - } - } -} - -// size returns the number of elements stored in teh backend -func (b *backend[T]) size() uint { - b.mu.RLock() - defer b.mu.RUnlock() - return uint(len(b.blocksByID)) -} diff --git a/module/buffer/backend_test.go b/module/buffer/backend_test.go deleted file mode 100644 index 588ed674297..00000000000 --- a/module/buffer/backend_test.go +++ /dev/null @@ -1,154 +0,0 @@ -package buffer - -import ( - "math/rand" - "testing" - - "github.com/stretchr/testify/suite" - - "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/utils/unittest" -) - -type BackendSuite struct { - suite.Suite - backend *backend[*flow.Proposal] -} - -func TestBackendSuite(t *testing.T) { - suite.Run(t, new(BackendSuite)) -} - -func (suite *BackendSuite) SetupTest() { - suite.backend = newBackend[*flow.Proposal]() -} - -func (suite *BackendSuite) item() *item[*flow.Proposal] { - parent := unittest.BlockHeaderFixture() - return suite.itemWithParent(parent) -} - -func (suite *BackendSuite) itemWithParent(parent *flow.Header) *item[*flow.Proposal] { - block := unittest.BlockWithParentFixture(parent) - return &item[*flow.Proposal]{ - view: block.View, - parentID: block.ParentID, - block: flow.Slashable[*flow.Proposal]{ - OriginID: unittest.IdentifierFixture(), - Message: unittest.ProposalFromBlock(block), - }, - } -} - -func (suite *BackendSuite) Add(item *item[*flow.Proposal]) { - suite.backend.add(item.block) -} - -func (suite *BackendSuite) TestAdd() { - expected := suite.item() - suite.backend.add(expected.block) - - actual, ok := suite.backend.byID(expected.block.Message.Block.ID()) - suite.Assert().True(ok) - suite.Assert().Equal(expected, actual) - - byParent, ok := suite.backend.byParentID(expected.parentID) - suite.Assert().True(ok) - suite.Assert().Len(byParent, 1) - suite.Assert().Equal(expected, byParent[0]) -} - -func (suite *BackendSuite) TestChildIndexing() { - - parent := suite.item() - child1 := suite.itemWithParent(parent.block.Message.Block.ToHeader()) - child2 := suite.itemWithParent(parent.block.Message.Block.ToHeader()) - grandchild := suite.itemWithParent(child1.block.Message.Block.ToHeader()) - unrelated := suite.item() - - suite.Add(child1) - suite.Add(child2) - suite.Add(grandchild) - suite.Add(unrelated) - - suite.Run("retrieve by parent ID", func() { - byParent, ok := suite.backend.byParentID(parent.block.Message.Block.ID()) - suite.Assert().True(ok) - // should only include direct children - suite.Assert().Len(byParent, 2) - suite.Assert().Contains(byParent, child1) - suite.Assert().Contains(byParent, child2) - }) - - suite.Run("drop for parent ID", func() { - suite.backend.dropForParent(parent.block.Message.Block.ID()) - - // should only drop direct children - _, exists := suite.backend.byID(child1.block.Message.Block.ID()) - suite.Assert().False(exists) - _, exists = suite.backend.byID(child2.block.Message.Block.ID()) - suite.Assert().False(exists) - - // grandchildren should be unaffected - _, exists = suite.backend.byParentID(child1.block.Message.Block.ID()) - suite.Assert().True(exists) - _, exists = suite.backend.byID(grandchild.block.Message.Block.ID()) - suite.Assert().True(exists) - - // nothing else should be affected - _, exists = suite.backend.byID(unrelated.block.Message.Block.ID()) - suite.Assert().True(exists) - }) -} - -func (suite *BackendSuite) TestPruneByView() { - - const N = 100 // number of items we're testing with - items := make([]*item[*flow.Proposal], 0, N) - - // build a pending buffer - for i := 0; i < N; i++ { - - // 10% of the time, add a new unrelated pending header - if i%10 == 0 { - item := suite.item() - suite.Add(item) - items = append(items, item) - continue - } - - // 90% of the time, build on an existing header - if i%2 == 1 { - parent := items[rand.Intn(len(items))] - item := suite.itemWithParent(parent.block.Message.Block.ToHeader()) - suite.Add(item) - items = append(items, item) - } - } - - // pick a height to prune that's guaranteed to prune at least one item - pruneAt := items[rand.Intn(len(items))].view - suite.backend.pruneByView(pruneAt) - - for _, item := range items { - view := item.view - id := item.block.Message.Block.ID() - parentID := item.parentID - - // check that items below the prune view were removed - if view <= pruneAt { - _, exists := suite.backend.byID(id) - suite.Assert().False(exists) - _, exists = suite.backend.byParentID(parentID) - suite.Assert().False(exists) - } - - // check that other items were not removed - if view > item.view { - _, exists := suite.backend.byID(id) - suite.Assert().True(exists) - _, exists = suite.backend.byParentID(parentID) - suite.Assert().True(exists) - } - } -} diff --git a/module/buffer/pending_blocks.go b/module/buffer/pending_blocks.go index ea9c43b1e0a..a033e93b179 100644 --- a/module/buffer/pending_blocks.go +++ b/module/buffer/pending_blocks.go @@ -1,58 +1,156 @@ package buffer import ( + "sync" + + "github.com/onflow/flow-go/model/cluster" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/forest" + "github.com/onflow/flow-go/module/mempool" ) -// PendingBlocks is a mempool for holding blocks. Furthermore, given a block ID, we can -// query all children that are currently stored in the mempool. The mempool's backend -// is intended to work generically for consensus blocks as well as cluster blocks. -type PendingBlocks struct { - backend *backend[*flow.Proposal] +// proposalVertex implements [forest.Vertex] for generic block proposals. +// +//structwrite:immutable +type proposalVertex[T flow.HashablePayload] struct { + proposal flow.Slashable[*flow.GenericProposal[T]] + id flow.Identifier +} + +func newProposalVertex[T flow.HashablePayload](proposal flow.Slashable[*flow.GenericProposal[T]]) proposalVertex[T] { + return proposalVertex[T]{ + proposal: proposal, + id: proposal.Message.Block.ID(), + } +} + +// VertexID returns the block ID for the stored proposal. +func (v proposalVertex[T]) VertexID() flow.Identifier { + return v.id +} + +// Level returns the view for the stored proposal. +func (v proposalVertex[T]) Level() uint64 { + return v.proposal.Message.Block.View +} + +// Parent returns the parent ID and view for the stored proposal. +func (v proposalVertex[T]) Parent() (flow.Identifier, uint64) { + return v.proposal.Message.Block.ParentID, v.proposal.Message.Block.ParentView +} + +// GenericPendingBlocks implements a mempool of pending blocks that cannot yet be processed +// because they do not connect to the rest of the chain state. +// They are indexed by parent ID to enable processing all of a parent's children once the parent is received. +// They are also indexed by view to support pruning. +// +// Safe for concurrent use. +type GenericPendingBlocks[T flow.HashablePayload] struct { + lock *sync.Mutex + forest *forest.LevelledForest + activeViewRangeSize uint64 } +type PendingBlocks = GenericPendingBlocks[flow.Payload] +type PendingClusterBlocks = GenericPendingBlocks[cluster.Payload] + var _ module.PendingBlockBuffer = (*PendingBlocks)(nil) +var _ module.PendingClusterBlockBuffer = (*PendingClusterBlocks)(nil) -func NewPendingBlocks() *PendingBlocks { - b := &PendingBlocks{backend: newBackend[*flow.Proposal]()} - return b +func NewPendingBlocks(finalizedView uint64, activeViewRangeSize uint64) *PendingBlocks { + return &PendingBlocks{ + lock: new(sync.Mutex), + // LevelledForest's lowestLevel is inclusive, so add 1 here + forest: forest.NewLevelledForest(finalizedView + 1), + activeViewRangeSize: activeViewRangeSize, + } } -func (b *PendingBlocks) Add(block flow.Slashable[*flow.Proposal]) bool { - return b.backend.add(block) +func NewPendingClusterBlocks(finalizedView uint64, activeViewRangeSize uint64) *PendingClusterBlocks { + return &PendingClusterBlocks{ + lock: new(sync.Mutex), + forest: forest.NewLevelledForest(finalizedView), + activeViewRangeSize: activeViewRangeSize, + } } -func (b *PendingBlocks) ByID(blockID flow.Identifier) (flow.Slashable[*flow.Proposal], bool) { - item, ok := b.backend.byID(blockID) - if !ok { - return flow.Slashable[*flow.Proposal]{}, false +// Add adds the input block to the block buffer. +// If the block already exists, or is below the finalized view, this is a no-op. +// Errors returns: +// - mempool.BeyondActiveRangeError if block.View > finalizedView + activeViewRangeSize (when activeViewRangeSize > 0) +func (b *GenericPendingBlocks[T]) Add(block flow.Slashable[*flow.GenericProposal[T]]) error { + b.lock.Lock() + defer b.lock.Unlock() + + blockView := block.Message.Block.View + finalizedView := b.highestPrunedView() + + // Check if block view exceeds the active view range size + // If activeViewRangeSize is 0, there's no limitation + if b.activeViewRangeSize > 0 && blockView > finalizedView+b.activeViewRangeSize { + return mempool.NewBeyondActiveRangeError( + "block view %d exceeds active view range size: finalized view %d + range size %d = %d", + blockView, finalizedView, b.activeViewRangeSize, finalizedView+b.activeViewRangeSize, + ) } - return item.block, true + + b.forest.AddVertex(newProposalVertex(block)) + return nil } -func (b *PendingBlocks) ByParentID(parentID flow.Identifier) ([]flow.Slashable[*flow.Proposal], bool) { - items, ok := b.backend.byParentID(parentID) +// ByID returns the block with the given ID, if it exists. +// Otherwise returns (nil, false) +func (b *GenericPendingBlocks[T]) ByID(blockID flow.Identifier) (flow.Slashable[*flow.GenericProposal[T]], bool) { + b.lock.Lock() + defer b.lock.Unlock() + vertex, ok := b.forest.GetVertex(blockID) if !ok { + return flow.Slashable[*flow.GenericProposal[T]]{}, false + } + return vertex.(proposalVertex[T]).proposal, true +} + +// ByParentID returns all direct children of the given block. +// If no children with the given parent exist, returns (nil, false) +func (b *GenericPendingBlocks[T]) ByParentID(parentID flow.Identifier) ([]flow.Slashable[*flow.GenericProposal[T]], bool) { + b.lock.Lock() + defer b.lock.Unlock() + n := b.forest.GetNumberOfChildren(parentID) + if n == 0 { return nil, false } - proposals := make([]flow.Slashable[*flow.Proposal], 0, len(items)) - for _, item := range items { - proposals = append(proposals, item.block) + children := make([]flow.Slashable[*flow.GenericProposal[T]], 0, n) + iterator := b.forest.GetChildren(parentID) + for iterator.HasNext() { + vertex := iterator.NextVertex() + children = append(children, vertex.(proposalVertex[T]).proposal) } - return proposals, true + + return children, true } -func (b *PendingBlocks) DropForParent(parentID flow.Identifier) { - b.backend.dropForParent(parentID) +// PruneByView prunes all pending blocks with views less or equal to the given view. +// Errors returns: +// - mempool.BelowPrunedThresholdError if input level is below the lowest retained view (finalized view) +func (b *GenericPendingBlocks[T]) PruneByView(view uint64) error { + b.lock.Lock() + defer b.lock.Unlock() + // PruneUpToLevel prunes up to be EXCLUDING the input view, so add 1 here + return b.forest.PruneUpToLevel(view + 1) } -// PruneByView prunes any pending blocks with views less or equal to the given view. -func (b *PendingBlocks) PruneByView(view uint64) { - b.backend.pruneByView(view) +// Size returns the number of blocks in the buffer. +func (b *GenericPendingBlocks[T]) Size() uint { + b.lock.Lock() + defer b.lock.Unlock() + return uint(b.forest.GetSize()) } -func (b *PendingBlocks) Size() uint { - return b.backend.size() +// highestPrunedView returns the highest pruned view (finalized view). +// CAUTION: Caller must acquire the lock. +func (b *GenericPendingBlocks[T]) highestPrunedView() uint64 { + // LevelledForest.LowestLevel is the lowest UNPRUNED view, so subtract 1 here + return b.forest.LowestLevel - 1 } diff --git a/module/buffer/pending_blocks_test.go b/module/buffer/pending_blocks_test.go new file mode 100644 index 00000000000..62198ca9e48 --- /dev/null +++ b/module/buffer/pending_blocks_test.go @@ -0,0 +1,422 @@ +package buffer + +import ( + "math/rand" + "sync" + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/mempool" + "github.com/onflow/flow-go/utils/unittest" +) + +type PendingBlocksSuite struct { + suite.Suite + buffer *GenericPendingBlocks[flow.Payload] +} + +func TestPendingBlocksSuite(t *testing.T) { + suite.Run(t, new(PendingBlocksSuite)) +} + +func (suite *PendingBlocksSuite) SetupTest() { + // Initialize with finalized view 0 and no view range limitation (0 = no limit) + // Individual tests that need the limitation will create their own buffers + suite.buffer = NewPendingBlocks(0, 0) +} + +// block creates a new block proposal wrapped as Slashable. +func (suite *PendingBlocksSuite) block() flow.Slashable[*flow.Proposal] { + block := unittest.BlockFixture() + return unittest.AsSlashable(unittest.ProposalFromBlock(block)) +} + +// blockWithParent creates a new block proposal with the given parent header. +func (suite *PendingBlocksSuite) blockWithParent(parent *flow.Header) flow.Slashable[*flow.Proposal] { + block := unittest.BlockWithParentFixture(parent) + return unittest.AsSlashable(unittest.ProposalFromBlock(block)) +} + +// TestAdd tests adding blocks to the buffer. +func (suite *PendingBlocksSuite) TestAdd() { + block := suite.block() + suite.Require().NoError(suite.buffer.Add(block)) + + // Verify block can be retrieved by ID + retrieved, ok := suite.buffer.ByID(block.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Equal(block.Message.Block.ID(), retrieved.Message.Block.ID()) + suite.Assert().Equal(block.Message.Block.View, retrieved.Message.Block.View) + + // Verify block can be retrieved by parent ID + children, ok := suite.buffer.ByParentID(block.Message.Block.ParentID) + suite.Assert().True(ok) + suite.Assert().Len(children, 1) + suite.Assert().Equal(block.Message.Block.ID(), children[0].Message.Block.ID()) +} + +// TestAddDuplicate verifies that adding the same block twice is a no-op. +func (suite *PendingBlocksSuite) TestAddDuplicate() { + block := suite.block() + suite.Require().NoError(suite.buffer.Add(block)) + suite.Require().NoError(suite.buffer.Add(block)) // Add again + + // Should still only have one block + suite.Assert().Equal(uint(1), suite.buffer.Size()) + + // Should still be retrievable + retrieved, ok := suite.buffer.ByID(block.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Equal(block.Message.Block.ID(), retrieved.Message.Block.ID()) +} + +// TestAddBelowFinalizedView verifies that adding blocks below finalized view is a no-op. +func (suite *PendingBlocksSuite) TestAddBelowFinalizedView() { + finalizedView := uint64(1000) + buffer := NewPendingBlocks(finalizedView, 100_000) + + // Create a block with view below finalized + block := suite.block() + block.Message.Block.ParentView = finalizedView - 10 + block.Message.Block.View = finalizedView - 5 + + suite.Require().NoError(buffer.Add(block)) + + _, ok := buffer.ByID(block.Message.Block.ID()) + suite.Assert().False(ok) + suite.Assert().Equal(uint(0), buffer.Size()) +} + +// TestAddExceedsActiveViewRangeSize verifies that adding blocks that exceed the active view range size returns an error. +func (suite *PendingBlocksSuite) TestAddExceedsActiveViewRangeSize() { + finalizedView := uint64(1000) + activeViewRangeSize := uint64(100) + buffer := NewPendingBlocks(finalizedView, activeViewRangeSize) + + // Create a parent header and then a block that exceeds the active view range size + parentHeader := unittest.BlockHeaderFixture() + parentHeader.View = finalizedView + 50 + block := suite.blockWithParent(parentHeader) + block.Message.Block.View = finalizedView + activeViewRangeSize + 1 + + err := buffer.Add(block) + suite.Assert().Error(err) + suite.Assert().True(mempool.IsBeyondActiveRangeError(err)) + + // Verify block was not added + _, ok := buffer.ByID(block.Message.Block.ID()) + suite.Assert().False(ok) + suite.Assert().Equal(uint(0), buffer.Size()) +} + +// TestAddWithinActiveViewRangeSize verifies that adding blocks within the active view range size succeeds. +func (suite *PendingBlocksSuite) TestAddWithinActiveViewRangeSize() { + finalizedView := uint64(1000) + activeViewRangeSize := uint64(100) + buffer := NewPendingBlocks(finalizedView, activeViewRangeSize) + + // Create a parent header and then a block that is exactly at the limit + parentHeader := unittest.BlockHeaderFixture() + parentHeader.View = finalizedView + 50 + block := suite.blockWithParent(parentHeader) + block.Message.Block.View = finalizedView + activeViewRangeSize + + err := buffer.Add(block) + suite.Assert().NoError(err) + + // Verify block was added + _, ok := buffer.ByID(block.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Equal(uint(1), buffer.Size()) +} + +// TestAddWithZeroActiveViewRangeSize verifies that when activeViewRangeSize is 0, there's no limitation. +func (suite *PendingBlocksSuite) TestAddWithZeroActiveViewRangeSize() { + finalizedView := uint64(1000) + activeViewRangeSize := uint64(0) // No limitation + buffer := NewPendingBlocks(finalizedView, activeViewRangeSize) + + // Create a parent header and then a block that is very far ahead + parentHeader := unittest.BlockHeaderFixture() + parentHeader.View = finalizedView + 500_000 + block := suite.blockWithParent(parentHeader) + block.Message.Block.View = finalizedView + 1_000_000 + + err := buffer.Add(block) + suite.Assert().NoError(err) + + // Verify block was added + _, ok := buffer.ByID(block.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Equal(uint(1), buffer.Size()) +} + +// TestByID tests retrieving blocks by ID. +func (suite *PendingBlocksSuite) TestByID() { + block := suite.block() + suite.Require().NoError(suite.buffer.Add(block)) + + // Test retrieving existing block + retrieved, ok := suite.buffer.ByID(block.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Equal(block.Message.Block.ID(), retrieved.Message.Block.ID()) + + // Test retrieving non-existent block + nonExistentID := unittest.IdentifierFixture() + _, ok = suite.buffer.ByID(nonExistentID) + suite.Assert().False(ok) +} + +// TestByParentID tests retrieving blocks by parent ID. +func (suite *PendingBlocksSuite) TestByParentID() { + parent := suite.block() + suite.Require().NoError(suite.buffer.Add(parent)) + + // Create multiple children of the parent + child1 := suite.blockWithParent(parent.Message.Block.ToHeader()) + child2 := suite.blockWithParent(parent.Message.Block.ToHeader()) + grandchild := suite.blockWithParent(child1.Message.Block.ToHeader()) + unrelated := suite.block() + + suite.Require().NoError(suite.buffer.Add(child1)) + suite.Require().NoError(suite.buffer.Add(child2)) + suite.Require().NoError(suite.buffer.Add(grandchild)) + suite.Require().NoError(suite.buffer.Add(unrelated)) + + // Test retrieving children of parent + children, ok := suite.buffer.ByParentID(parent.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Len(children, 2) + + // Verify correct children are returned + retrievedChildIDs := make(map[flow.Identifier]bool) + for _, child := range children { + retrievedChildIDs[child.Message.Block.ID()] = true + } + suite.Assert().True(retrievedChildIDs[child1.Message.Block.ID()]) + suite.Assert().True(retrievedChildIDs[child2.Message.Block.ID()]) + + // Test retrieving children of non-existent parent + nonExistentParentID := unittest.IdentifierFixture() + _, ok = suite.buffer.ByParentID(nonExistentParentID) + suite.Assert().False(ok) +} + +// TestByParentIDOnlyDirectChildren verifies that ByParentID only returns direct children. +func (suite *PendingBlocksSuite) TestByParentIDOnlyDirectChildren() { + parent := suite.block() + suite.Require().NoError(suite.buffer.Add(parent)) + + child := suite.blockWithParent(parent.Message.Block.ToHeader()) + grandchild := suite.blockWithParent(child.Message.Block.ToHeader()) + + suite.Require().NoError(suite.buffer.Add(child)) + suite.Require().NoError(suite.buffer.Add(grandchild)) + + // Parent should only have child, not grandchild + children, ok := suite.buffer.ByParentID(parent.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Len(children, 1) + suite.Assert().Equal(child.Message.Block.ID(), children[0].Message.Block.ID()) + + // Child should have grandchild + grandchildren, ok := suite.buffer.ByParentID(child.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Len(grandchildren, 1) + suite.Assert().Equal(grandchild.Message.Block.ID(), grandchildren[0].Message.Block.ID()) +} + +// TestPruneByView tests pruning blocks by view. +func (suite *PendingBlocksSuite) TestPruneByView() { + const N = 100 // number of blocks to test with + blocks := make([]flow.Slashable[*flow.Proposal], 0, N) + + // Build a buffer with various blocks + for i := 0; i < N; i++ { + // 10% of the time, add a new unrelated block + if i%10 == 0 { + block := suite.block() + suite.Require().NoError(suite.buffer.Add(block)) + blocks = append(blocks, block) + continue + } + + // 90% of the time, build on an existing block + if i%2 == 1 && len(blocks) > 0 { + parent := blocks[rand.Intn(len(blocks))] + block := suite.blockWithParent(parent.Message.Block.ToHeader()) + suite.Require().NoError(suite.buffer.Add(block)) + blocks = append(blocks, block) + } + } + + // Pick a view to prune that's guaranteed to prune at least one block + pruneAt := blocks[rand.Intn(len(blocks))].Message.Block.View + err := suite.buffer.PruneByView(pruneAt) + suite.Assert().NoError(err) + + // Verify blocks at or below prune view are removed + for _, block := range blocks { + view := block.Message.Block.View + id := block.Message.Block.ID() + + if view <= pruneAt { + _, exists := suite.buffer.ByID(id) + suite.Assert().False(exists, "block at view %d should be pruned", view) + } else { + _, exists := suite.buffer.ByID(id) + suite.Assert().True(exists, "block at view %d should not be pruned", view) + } + } +} + +// TestPruneByViewBelowFinalizedView verifies that pruning below finalized view returns an error. +func (suite *PendingBlocksSuite) TestPruneByViewBelowFinalizedView() { + finalizedView := uint64(100) + buffer := NewPendingBlocks(finalizedView, 100_000) + + // Add some blocks above finalized view + parent := unittest.BlockHeaderFixture() + parent.View = finalizedView + 10 + block := suite.blockWithParent(parent) + suite.Require().NoError(buffer.Add(block)) + + // Prune at finalized view should succeed + err := buffer.PruneByView(finalizedView) + suite.Assert().NoError(err) + + // Prune below finalized view should fail + err = buffer.PruneByView(finalizedView - 1) + suite.Assert().Error(err) + suite.Assert().True(mempool.IsBelowPrunedThresholdError(err)) +} + +// TestPruneByViewMultipleTimes tests that pruning multiple times works correctly. +func (suite *PendingBlocksSuite) TestPruneByViewMultipleTimes() { + // Create blocks at different views + parentHeader := unittest.BlockHeaderFixture() + parentHeader.View = 10 + parent := suite.blockWithParent(parentHeader) + suite.Require().NoError(suite.buffer.Add(parent)) + + // Create children - views will be automatically set to be greater than parent + child1 := suite.blockWithParent(parent.Message.Block.ToHeader()) + child1.Message.Block.View++ + suite.Require().NoError(suite.buffer.Add(child1)) + + child2 := suite.blockWithParent(parent.Message.Block.ToHeader()) + suite.Require().NoError(suite.buffer.Add(child2)) + + // Get actual views + parentView := parent.Message.Block.View + child1View := child1.Message.Block.View + child2View := child2.Message.Block.View + + // Prune at the parent's view (should remove parent only) + pruneView1 := parentView + err := suite.buffer.PruneByView(pruneView1) + suite.Assert().NoError(err) + + // Verify parent is pruned but children remain + suite.Assert().Equal(uint(2), suite.buffer.Size()) + _, ok := suite.buffer.ByID(parent.Message.Block.ID()) + suite.Assert().False(ok) + _, ok = suite.buffer.ByID(child1.Message.Block.ID()) + suite.Assert().True(ok) + _, ok = suite.buffer.ByID(child2.Message.Block.ID()) + suite.Assert().True(ok) + + // Prune at a view that removes all remaining blocks + pruneView2 := max(child1View, child2View) + + err = suite.buffer.PruneByView(pruneView2) + suite.Assert().NoError(err) + suite.Assert().Equal(uint(0), suite.buffer.Size()) +} + +// TestSize tests the Size method. +func (suite *PendingBlocksSuite) TestSize() { + // Initially empty + suite.Assert().Equal(uint(0), suite.buffer.Size()) + + // Add blocks and verify size increases + block1 := suite.block() + suite.Require().NoError(suite.buffer.Add(block1)) + suite.Assert().Equal(uint(1), suite.buffer.Size()) + + block2 := suite.block() + suite.Require().NoError(suite.buffer.Add(block2)) + suite.Assert().Equal(uint(2), suite.buffer.Size()) + + // Adding duplicate should not increase size + suite.Require().NoError(suite.buffer.Add(block1)) + suite.Assert().Equal(uint(2), suite.buffer.Size()) +} + +// TestConcurrentAccess tests that the buffer is safe for concurrent access. +// NOTE: correctness here depends on [PendingBlockSuite.block] not returning duplicates. +func (suite *PendingBlocksSuite) TestConcurrentAccess() { + const numGoroutines = 10 + const blocksPerGoroutine = 10 + + wg := new(sync.WaitGroup) + wg.Add(numGoroutines) + + // Concurrently add blocks + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < blocksPerGoroutine; j++ { + block := suite.block() + suite.Require().NoError(suite.buffer.Add(block)) + } + }() + } + wg.Wait() + + // Verify all blocks were added + suite.Assert().Equal(uint(numGoroutines*blocksPerGoroutine), suite.buffer.Size()) +} + +// TestEmptyBufferOperations tests operations on an empty buffer. +func (suite *PendingBlocksSuite) TestEmptyBufferOperations() { + suite.Assert().Equal(uint(0), suite.buffer.Size()) + + // ByID should return false + _, ok := suite.buffer.ByID(unittest.IdentifierFixture()) + suite.Assert().False(ok) + + // ByParentID should return false + _, ok = suite.buffer.ByParentID(unittest.IdentifierFixture()) + suite.Assert().False(ok) + + // PruneByView should succeed (no-op) + err := suite.buffer.PruneByView(100) + suite.Assert().NoError(err) +} + +// TestAddAfterPrune verifies that blocks can be added after pruning. +func (suite *PendingBlocksSuite) TestAddAfterPrune() { + // Add and prune a block + block1 := suite.block() + suite.Require().NoError(suite.buffer.Add(block1)) + + err := suite.buffer.PruneByView(block1.Message.Block.View) + suite.Assert().NoError(err) + + // Verify block is pruned + _, ok := suite.buffer.ByID(block1.Message.Block.ID()) + suite.Assert().False(ok) + + // Add a new block after pruning with view above pruned view + block2 := suite.blockWithParent(block1.Message.Block.ToHeader()) + suite.Require().NoError(suite.buffer.Add(block2)) + + // Verify new block is added + retrieved, ok := suite.buffer.ByID(block2.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Equal(block2.Message.Block.ID(), retrieved.Message.Block.ID()) + suite.Assert().Equal(uint(1), suite.buffer.Size()) +} diff --git a/module/buffer/pending_cluster_blocks.go b/module/buffer/pending_cluster_blocks.go deleted file mode 100644 index 1e2bf5e2b9a..00000000000 --- a/module/buffer/pending_cluster_blocks.go +++ /dev/null @@ -1,53 +0,0 @@ -package buffer - -import ( - "github.com/onflow/flow-go/model/cluster" - "github.com/onflow/flow-go/model/flow" -) - -type PendingClusterBlocks struct { - backend *backend[*cluster.Proposal] -} - -func NewPendingClusterBlocks() *PendingClusterBlocks { - b := &PendingClusterBlocks{backend: newBackend[*cluster.Proposal]()} - return b -} - -func (b *PendingClusterBlocks) Add(block flow.Slashable[*cluster.Proposal]) bool { - return b.backend.add(block) -} - -func (b *PendingClusterBlocks) ByID(blockID flow.Identifier) (flow.Slashable[*cluster.Proposal], bool) { - item, ok := b.backend.byID(blockID) - if !ok { - return flow.Slashable[*cluster.Proposal]{}, false - } - return item.block, true -} - -func (b *PendingClusterBlocks) ByParentID(parentID flow.Identifier) ([]flow.Slashable[*cluster.Proposal], bool) { - items, ok := b.backend.byParentID(parentID) - if !ok { - return nil, false - } - - proposals := make([]flow.Slashable[*cluster.Proposal], 0, len(items)) - for _, item := range items { - proposals = append(proposals, item.block) - } - return proposals, true -} - -func (b *PendingClusterBlocks) DropForParent(parentID flow.Identifier) { - b.backend.dropForParent(parentID) -} - -// PruneByView prunes any pending cluster blocks with views less or equal to the given view. -func (b *PendingClusterBlocks) PruneByView(view uint64) { - b.backend.pruneByView(view) -} - -func (b *PendingClusterBlocks) Size() uint { - return b.backend.size() -} diff --git a/module/mempool/errors.go b/module/mempool/errors.go index 1dec04c84f8..8d1c4ecc561 100644 --- a/module/mempool/errors.go +++ b/module/mempool/errors.go @@ -56,3 +56,30 @@ func IsBelowPrunedThresholdError(err error) bool { var newIsBelowPrunedThresholdError BelowPrunedThresholdError return errors.As(err, &newIsBelowPrunedThresholdError) } + +// BeyondActiveRangeError indicates that an input is beyond the allowed active range. +// Mempools may impose an active range (eg. by view or by height) to bound the maximum +// possible size of the mempool and limit spam attacks. +type BeyondActiveRangeError struct { + err error +} + +func NewBeyondActiveRangeError(msg string, args ...interface{}) error { + return BeyondActiveRangeError{ + err: fmt.Errorf(msg, args...), + } +} + +func (e BeyondActiveRangeError) Unwrap() error { + return e.err +} + +func (e BeyondActiveRangeError) Error() string { + return e.err.Error() +} + +// IsBeyondActiveRangeError returns whether the given error is a BeyondActiveRangeError error +func IsBeyondActiveRangeError(err error) bool { + var beyondActiveRangeErr BeyondActiveRangeError + return errors.As(err, &beyondActiveRangeErr) +} diff --git a/module/metrics/example/verification/main.go b/module/metrics/example/verification/main.go index 103b751240b..c67f4d033a4 100644 --- a/module/metrics/example/verification/main.go +++ b/module/metrics/example/verification/main.go @@ -108,7 +108,7 @@ func demo() { } // creates consensus cache for follower engine, and registers size method of backend for metrics - pendingBlocks := buffer.NewPendingBlocks() + pendingBlocks := buffer.NewPendingBlocks(0, 100_000) err = mc.Register(metrics.ResourcePendingBlock, pendingBlocks.Size) if err != nil { panic(err) @@ -173,7 +173,7 @@ func demo() { tryRandomCall(func() { proposal := unittest.ProposalFixture() - pendingBlocks.Add(flow.Slashable[*flow.Proposal]{ + _ = pendingBlocks.Add(flow.Slashable[*flow.Proposal]{ OriginID: unittest.IdentifierFixture(), Message: proposal, }) diff --git a/module/mock/buffered_proposal.go b/module/mock/buffered_proposal.go new file mode 100644 index 00000000000..f755285df37 --- /dev/null +++ b/module/mock/buffered_proposal.go @@ -0,0 +1,47 @@ +// Code generated by mockery. DO NOT EDIT. + +package mock + +import ( + flow "github.com/onflow/flow-go/model/flow" + mock "github.com/stretchr/testify/mock" +) + +// BufferedProposal is an autogenerated mock type for the BufferedProposal type +type BufferedProposal struct { + mock.Mock +} + +// ProposalHeader provides a mock function with no fields +func (_m *BufferedProposal) ProposalHeader() *flow.ProposalHeader { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for ProposalHeader") + } + + var r0 *flow.ProposalHeader + if rf, ok := ret.Get(0).(func() *flow.ProposalHeader); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*flow.ProposalHeader) + } + } + + return r0 +} + +// NewBufferedProposal creates a new instance of BufferedProposal. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewBufferedProposal(t interface { + mock.TestingT + Cleanup(func()) +}) *BufferedProposal { + mock := &BufferedProposal{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/module/mock/generic_pending_block_buffer.go b/module/mock/generic_pending_block_buffer.go new file mode 100644 index 00000000000..57701d49b9c --- /dev/null +++ b/module/mock/generic_pending_block_buffer.go @@ -0,0 +1,139 @@ +// Code generated by mockery. DO NOT EDIT. + +package mock + +import ( + flow "github.com/onflow/flow-go/model/flow" + mock "github.com/stretchr/testify/mock" +) + +// GenericPendingBlockBuffer is an autogenerated mock type for the GenericPendingBlockBuffer type +type GenericPendingBlockBuffer[T flow.HashablePayload] struct { + mock.Mock +} + +// Add provides a mock function with given fields: block +func (_m *GenericPendingBlockBuffer[T]) Add(block flow.Slashable[*flow.GenericProposal[T]]) error { + ret := _m.Called(block) + + if len(ret) == 0 { + panic("no return value specified for Add") + } + + var r0 error + if rf, ok := ret.Get(0).(func(flow.Slashable[*flow.GenericProposal[T]]) error); ok { + r0 = rf(block) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ByID provides a mock function with given fields: blockID +func (_m *GenericPendingBlockBuffer[T]) ByID(blockID flow.Identifier) (flow.Slashable[*flow.GenericProposal[T]], bool) { + ret := _m.Called(blockID) + + if len(ret) == 0 { + panic("no return value specified for ByID") + } + + var r0 flow.Slashable[*flow.GenericProposal[T]] + var r1 bool + if rf, ok := ret.Get(0).(func(flow.Identifier) (flow.Slashable[*flow.GenericProposal[T]], bool)); ok { + return rf(blockID) + } + if rf, ok := ret.Get(0).(func(flow.Identifier) flow.Slashable[*flow.GenericProposal[T]]); ok { + r0 = rf(blockID) + } else { + r0 = ret.Get(0).(flow.Slashable[*flow.GenericProposal[T]]) + } + + if rf, ok := ret.Get(1).(func(flow.Identifier) bool); ok { + r1 = rf(blockID) + } else { + r1 = ret.Get(1).(bool) + } + + return r0, r1 +} + +// ByParentID provides a mock function with given fields: parentID +func (_m *GenericPendingBlockBuffer[T]) ByParentID(parentID flow.Identifier) ([]flow.Slashable[*flow.GenericProposal[T]], bool) { + ret := _m.Called(parentID) + + if len(ret) == 0 { + panic("no return value specified for ByParentID") + } + + var r0 []flow.Slashable[*flow.GenericProposal[T]] + var r1 bool + if rf, ok := ret.Get(0).(func(flow.Identifier) ([]flow.Slashable[*flow.GenericProposal[T]], bool)); ok { + return rf(parentID) + } + if rf, ok := ret.Get(0).(func(flow.Identifier) []flow.Slashable[*flow.GenericProposal[T]]); ok { + r0 = rf(parentID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]flow.Slashable[*flow.GenericProposal[T]]) + } + } + + if rf, ok := ret.Get(1).(func(flow.Identifier) bool); ok { + r1 = rf(parentID) + } else { + r1 = ret.Get(1).(bool) + } + + return r0, r1 +} + +// PruneByView provides a mock function with given fields: view +func (_m *GenericPendingBlockBuffer[T]) PruneByView(view uint64) error { + ret := _m.Called(view) + + if len(ret) == 0 { + panic("no return value specified for PruneByView") + } + + var r0 error + if rf, ok := ret.Get(0).(func(uint64) error); ok { + r0 = rf(view) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Size provides a mock function with no fields +func (_m *GenericPendingBlockBuffer[T]) Size() uint { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Size") + } + + var r0 uint + if rf, ok := ret.Get(0).(func() uint); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint) + } + + return r0 +} + +// NewGenericPendingBlockBuffer creates a new instance of GenericPendingBlockBuffer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewGenericPendingBlockBuffer[T flow.HashablePayload](t interface { + mock.TestingT + Cleanup(func()) +}) *GenericPendingBlockBuffer[T] { + mock := &GenericPendingBlockBuffer[T]{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/module/mock/pending_block_buffer.go b/module/mock/pending_block_buffer.go index f9ecd30f752..2cfb30bc9df 100644 --- a/module/mock/pending_block_buffer.go +++ b/module/mock/pending_block_buffer.go @@ -13,40 +13,40 @@ type PendingBlockBuffer struct { } // Add provides a mock function with given fields: block -func (_m *PendingBlockBuffer) Add(block flow.Slashable[*flow.Proposal]) bool { +func (_m *PendingBlockBuffer) Add(block flow.Slashable[*flow.GenericProposal[flow.Payload]]) error { ret := _m.Called(block) if len(ret) == 0 { panic("no return value specified for Add") } - var r0 bool - if rf, ok := ret.Get(0).(func(flow.Slashable[*flow.Proposal]) bool); ok { + var r0 error + if rf, ok := ret.Get(0).(func(flow.Slashable[*flow.GenericProposal[flow.Payload]]) error); ok { r0 = rf(block) } else { - r0 = ret.Get(0).(bool) + r0 = ret.Error(0) } return r0 } // ByID provides a mock function with given fields: blockID -func (_m *PendingBlockBuffer) ByID(blockID flow.Identifier) (flow.Slashable[*flow.Proposal], bool) { +func (_m *PendingBlockBuffer) ByID(blockID flow.Identifier) (flow.Slashable[*flow.GenericProposal[flow.Payload]], bool) { ret := _m.Called(blockID) if len(ret) == 0 { panic("no return value specified for ByID") } - var r0 flow.Slashable[*flow.Proposal] + var r0 flow.Slashable[*flow.GenericProposal[flow.Payload]] var r1 bool - if rf, ok := ret.Get(0).(func(flow.Identifier) (flow.Slashable[*flow.Proposal], bool)); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) (flow.Slashable[*flow.GenericProposal[flow.Payload]], bool)); ok { return rf(blockID) } - if rf, ok := ret.Get(0).(func(flow.Identifier) flow.Slashable[*flow.Proposal]); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) flow.Slashable[*flow.GenericProposal[flow.Payload]]); ok { r0 = rf(blockID) } else { - r0 = ret.Get(0).(flow.Slashable[*flow.Proposal]) + r0 = ret.Get(0).(flow.Slashable[*flow.GenericProposal[flow.Payload]]) } if rf, ok := ret.Get(1).(func(flow.Identifier) bool); ok { @@ -59,23 +59,23 @@ func (_m *PendingBlockBuffer) ByID(blockID flow.Identifier) (flow.Slashable[*flo } // ByParentID provides a mock function with given fields: parentID -func (_m *PendingBlockBuffer) ByParentID(parentID flow.Identifier) ([]flow.Slashable[*flow.Proposal], bool) { +func (_m *PendingBlockBuffer) ByParentID(parentID flow.Identifier) ([]flow.Slashable[*flow.GenericProposal[flow.Payload]], bool) { ret := _m.Called(parentID) if len(ret) == 0 { panic("no return value specified for ByParentID") } - var r0 []flow.Slashable[*flow.Proposal] + var r0 []flow.Slashable[*flow.GenericProposal[flow.Payload]] var r1 bool - if rf, ok := ret.Get(0).(func(flow.Identifier) ([]flow.Slashable[*flow.Proposal], bool)); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) ([]flow.Slashable[*flow.GenericProposal[flow.Payload]], bool)); ok { return rf(parentID) } - if rf, ok := ret.Get(0).(func(flow.Identifier) []flow.Slashable[*flow.Proposal]); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) []flow.Slashable[*flow.GenericProposal[flow.Payload]]); ok { r0 = rf(parentID) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]flow.Slashable[*flow.Proposal]) + r0 = ret.Get(0).([]flow.Slashable[*flow.GenericProposal[flow.Payload]]) } } @@ -88,14 +88,22 @@ func (_m *PendingBlockBuffer) ByParentID(parentID flow.Identifier) ([]flow.Slash return r0, r1 } -// DropForParent provides a mock function with given fields: parentID -func (_m *PendingBlockBuffer) DropForParent(parentID flow.Identifier) { - _m.Called(parentID) -} - // PruneByView provides a mock function with given fields: view -func (_m *PendingBlockBuffer) PruneByView(view uint64) { - _m.Called(view) +func (_m *PendingBlockBuffer) PruneByView(view uint64) error { + ret := _m.Called(view) + + if len(ret) == 0 { + panic("no return value specified for PruneByView") + } + + var r0 error + if rf, ok := ret.Get(0).(func(uint64) error); ok { + r0 = rf(view) + } else { + r0 = ret.Error(0) + } + + return r0 } // Size provides a mock function with no fields diff --git a/module/mock/pending_cluster_block_buffer.go b/module/mock/pending_cluster_block_buffer.go index d23cf6228a4..8233b6e384d 100644 --- a/module/mock/pending_cluster_block_buffer.go +++ b/module/mock/pending_cluster_block_buffer.go @@ -15,40 +15,40 @@ type PendingClusterBlockBuffer struct { } // Add provides a mock function with given fields: block -func (_m *PendingClusterBlockBuffer) Add(block flow.Slashable[*cluster.Proposal]) bool { +func (_m *PendingClusterBlockBuffer) Add(block flow.Slashable[*flow.GenericProposal[cluster.Payload]]) error { ret := _m.Called(block) if len(ret) == 0 { panic("no return value specified for Add") } - var r0 bool - if rf, ok := ret.Get(0).(func(flow.Slashable[*cluster.Proposal]) bool); ok { + var r0 error + if rf, ok := ret.Get(0).(func(flow.Slashable[*flow.GenericProposal[cluster.Payload]]) error); ok { r0 = rf(block) } else { - r0 = ret.Get(0).(bool) + r0 = ret.Error(0) } return r0 } // ByID provides a mock function with given fields: blockID -func (_m *PendingClusterBlockBuffer) ByID(blockID flow.Identifier) (flow.Slashable[*cluster.Proposal], bool) { +func (_m *PendingClusterBlockBuffer) ByID(blockID flow.Identifier) (flow.Slashable[*flow.GenericProposal[cluster.Payload]], bool) { ret := _m.Called(blockID) if len(ret) == 0 { panic("no return value specified for ByID") } - var r0 flow.Slashable[*cluster.Proposal] + var r0 flow.Slashable[*flow.GenericProposal[cluster.Payload]] var r1 bool - if rf, ok := ret.Get(0).(func(flow.Identifier) (flow.Slashable[*cluster.Proposal], bool)); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) (flow.Slashable[*flow.GenericProposal[cluster.Payload]], bool)); ok { return rf(blockID) } - if rf, ok := ret.Get(0).(func(flow.Identifier) flow.Slashable[*cluster.Proposal]); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) flow.Slashable[*flow.GenericProposal[cluster.Payload]]); ok { r0 = rf(blockID) } else { - r0 = ret.Get(0).(flow.Slashable[*cluster.Proposal]) + r0 = ret.Get(0).(flow.Slashable[*flow.GenericProposal[cluster.Payload]]) } if rf, ok := ret.Get(1).(func(flow.Identifier) bool); ok { @@ -61,23 +61,23 @@ func (_m *PendingClusterBlockBuffer) ByID(blockID flow.Identifier) (flow.Slashab } // ByParentID provides a mock function with given fields: parentID -func (_m *PendingClusterBlockBuffer) ByParentID(parentID flow.Identifier) ([]flow.Slashable[*cluster.Proposal], bool) { +func (_m *PendingClusterBlockBuffer) ByParentID(parentID flow.Identifier) ([]flow.Slashable[*flow.GenericProposal[cluster.Payload]], bool) { ret := _m.Called(parentID) if len(ret) == 0 { panic("no return value specified for ByParentID") } - var r0 []flow.Slashable[*cluster.Proposal] + var r0 []flow.Slashable[*flow.GenericProposal[cluster.Payload]] var r1 bool - if rf, ok := ret.Get(0).(func(flow.Identifier) ([]flow.Slashable[*cluster.Proposal], bool)); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) ([]flow.Slashable[*flow.GenericProposal[cluster.Payload]], bool)); ok { return rf(parentID) } - if rf, ok := ret.Get(0).(func(flow.Identifier) []flow.Slashable[*cluster.Proposal]); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) []flow.Slashable[*flow.GenericProposal[cluster.Payload]]); ok { r0 = rf(parentID) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]flow.Slashable[*cluster.Proposal]) + r0 = ret.Get(0).([]flow.Slashable[*flow.GenericProposal[cluster.Payload]]) } } @@ -90,14 +90,22 @@ func (_m *PendingClusterBlockBuffer) ByParentID(parentID flow.Identifier) ([]flo return r0, r1 } -// DropForParent provides a mock function with given fields: parentID -func (_m *PendingClusterBlockBuffer) DropForParent(parentID flow.Identifier) { - _m.Called(parentID) -} - // PruneByView provides a mock function with given fields: view -func (_m *PendingClusterBlockBuffer) PruneByView(view uint64) { - _m.Called(view) +func (_m *PendingClusterBlockBuffer) PruneByView(view uint64) error { + ret := _m.Called(view) + + if len(ret) == 0 { + panic("no return value specified for PruneByView") + } + + var r0 error + if rf, ok := ret.Get(0).(func(uint64) error); ok { + r0 = rf(view) + } else { + r0 = ret.Error(0) + } + + return r0 } // Size provides a mock function with no fields