From e6b4e8c5cc13ec6cf82a55b0f6e5fa10b027bcc5 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Wed, 26 Nov 2025 14:01:28 -0800 Subject: [PATCH 01/23] make Proposal type generic this should allow removing some type overhead in the BlockBuffer types --- model/cluster/block.go | 11 +---------- model/flow/block.go | 16 +++++++++++----- module/buffer/backend.go | 2 +- 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/model/cluster/block.go b/model/cluster/block.go index 148eef6c603..e31b3cc50a1 100644 --- a/model/cluster/block.go +++ b/model/cluster/block.go @@ -82,10 +82,7 @@ func NewRootBlock(untrusted UntrustedBlock) (*Block, error) { // Proposal represents a signed proposed block in collection node cluster consensus. // //structwrite:immutable - mutations allowed only within the constructor. -type Proposal struct { - Block Block - ProposerSigData []byte -} +type Proposal = flow.GenericProposal[Payload] // UntrustedProposal is an untrusted input-only representation of a cluster.Proposal, // used for construction. @@ -137,12 +134,6 @@ func NewRootProposal(untrusted UntrustedProposal) (*Proposal, error) { } -// ProposalHeader converts the proposal into a compact [ProposalHeader] representation, -// where the payload is compressed to a hash reference. -func (p *Proposal) ProposalHeader() *flow.ProposalHeader { - return &flow.ProposalHeader{Header: p.Block.ToHeader(), ProposerSigData: p.ProposerSigData} -} - // BlockResponse is the same as flow.BlockResponse, but for cluster // consensus. It contains a list of structurally validated cluster block proposals // that should correspond to the request. diff --git a/model/flow/block.go b/model/flow/block.go index 392e59d25c2..5a6bd3d3793 100644 --- a/model/flow/block.go +++ b/model/flow/block.go @@ -163,13 +163,19 @@ func (s BlockStatus) String() string { return [...]string{"BLOCK_UNKNOWN", "BLOCK_FINALIZED", "BLOCK_SEALED"}[s] } +// GenericProposal represents a generic Flow proposal structure parameterized by a block payload type. +// It includes the block header metadata, block payload, and proposer signature. +type GenericProposal[T HashablePayload] struct { + // Block is the block being proposed. + Block GenericBlock[T] + // ProposerSigData is the signature (vote) from the proposer of this block. + ProposerSigData []byte +} + // Proposal is a signed proposal that includes the block payload, in addition to the required header and signature. // //structwrite:immutable - mutations allowed only within the constructor -type Proposal struct { - Block Block - ProposerSigData []byte -} +type Proposal = GenericProposal[Payload] // UntrustedProposal is an untrusted input-only representation of a Proposal, // used for construction. @@ -223,7 +229,7 @@ func NewRootProposal(untrusted UntrustedProposal) (*Proposal, error) { // ProposalHeader converts the proposal into a compact [ProposalHeader] representation, // where the payload is compressed to a hash reference. -func (b *Proposal) ProposalHeader() *ProposalHeader { +func (b *GenericProposal[T]) ProposalHeader() *ProposalHeader { return &ProposalHeader{Header: b.Block.ToHeader(), ProposerSigData: b.ProposerSigData} } diff --git a/module/buffer/backend.go b/module/buffer/backend.go index 52d09e33ed6..fe841728178 100644 --- a/module/buffer/backend.go +++ b/module/buffer/backend.go @@ -8,7 +8,7 @@ import ( // item represents an item in the cache: the main block and auxiliary data that is // needed to implement indexed lookups by parent ID and pruning by view. -type item[T any] struct { +type item[T extractProposalHeader] struct { view uint64 parentID flow.Identifier block flow.Slashable[T] From 880557b99ae0f750e4a951aac7537fad507d00fb Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Thu, 27 Nov 2025 11:22:27 -0800 Subject: [PATCH 02/23] make Buffer definition generic to both proposal types --- module/buffer.go | 31 +++++---------- module/buffer/pending_blocks.go | 36 +++++++++-------- module/buffer/pending_cluster_blocks.go | 53 ------------------------- 3 files changed, 30 insertions(+), 90 deletions(-) delete mode 100644 module/buffer/pending_cluster_blocks.go diff --git a/module/buffer.go b/module/buffer.go index 0937a49ad54..d820eeaae23 100644 --- a/module/buffer.go +++ b/module/buffer.go @@ -5,17 +5,22 @@ import ( "github.com/onflow/flow-go/model/flow" ) +// TODO: docs +type BufferedProposal interface { + ProposalHeader() *flow.ProposalHeader +} + // PendingBlockBuffer defines an interface for a cache of pending blocks that // cannot yet be processed because they do not connect to the rest of the chain // state. They are indexed by parent ID to enable processing all of a parent's // children once the parent is received. // Safe for concurrent use. -type PendingBlockBuffer interface { - Add(block flow.Slashable[*flow.Proposal]) bool +type GenericPendingBlockBuffer[T BufferedProposal] interface { + Add(block flow.Slashable[T]) bool - ByID(blockID flow.Identifier) (flow.Slashable[*flow.Proposal], bool) + ByID(blockID flow.Identifier) (flow.Slashable[T], bool) - ByParentID(parentID flow.Identifier) ([]flow.Slashable[*flow.Proposal], bool) + ByParentID(parentID flow.Identifier) ([]flow.Slashable[T], bool) DropForParent(parentID flow.Identifier) @@ -25,20 +30,6 @@ type PendingBlockBuffer interface { Size() uint } -// PendingClusterBlockBuffer is the same thing as PendingBlockBuffer, but for -// collection node cluster consensus. -// Safe for concurrent use. -type PendingClusterBlockBuffer interface { - Add(block flow.Slashable[*cluster.Proposal]) bool - - ByID(blockID flow.Identifier) (flow.Slashable[*cluster.Proposal], bool) - - ByParentID(parentID flow.Identifier) ([]flow.Slashable[*cluster.Proposal], bool) +type PendingBlockBuffer = GenericPendingBlockBuffer[*flow.Proposal] - DropForParent(parentID flow.Identifier) - - // PruneByView prunes any pending cluster blocks with views less or equal to the given view. - PruneByView(view uint64) - - Size() uint -} +type PendingClusterBlockBuffer = GenericPendingBlockBuffer[*cluster.Proposal] diff --git a/module/buffer/pending_blocks.go b/module/buffer/pending_blocks.go index ea9c43b1e0a..88314be37a6 100644 --- a/module/buffer/pending_blocks.go +++ b/module/buffer/pending_blocks.go @@ -1,58 +1,60 @@ package buffer import ( + "github.com/onflow/flow-go/model/cluster" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" ) -// PendingBlocks is a mempool for holding blocks. Furthermore, given a block ID, we can -// query all children that are currently stored in the mempool. The mempool's backend -// is intended to work generically for consensus blocks as well as cluster blocks. -type PendingBlocks struct { - backend *backend[*flow.Proposal] +type GenericPendingBlocks[T module.BufferedProposal] struct { + backend *backend[T] } -var _ module.PendingBlockBuffer = (*PendingBlocks)(nil) +type PendingClusterBlocks = GenericPendingBlocks[*cluster.Proposal] +type PendingBlocks = GenericPendingBlocks[*flow.Proposal] + +func NewPendingClusterBlocks() *PendingClusterBlocks { + return &PendingClusterBlocks{backend: newBackend[*cluster.Proposal]()} +} func NewPendingBlocks() *PendingBlocks { - b := &PendingBlocks{backend: newBackend[*flow.Proposal]()} - return b + return &PendingBlocks{backend: newBackend[*flow.Proposal]()} } -func (b *PendingBlocks) Add(block flow.Slashable[*flow.Proposal]) bool { +func (b *GenericPendingBlocks[T]) Add(block flow.Slashable[T]) bool { return b.backend.add(block) } -func (b *PendingBlocks) ByID(blockID flow.Identifier) (flow.Slashable[*flow.Proposal], bool) { +func (b *GenericPendingBlocks[T]) ByID(blockID flow.Identifier) (flow.Slashable[T], bool) { item, ok := b.backend.byID(blockID) if !ok { - return flow.Slashable[*flow.Proposal]{}, false + return flow.Slashable[T]{}, false } return item.block, true } -func (b *PendingBlocks) ByParentID(parentID flow.Identifier) ([]flow.Slashable[*flow.Proposal], bool) { +func (b *GenericPendingBlocks[T]) ByParentID(parentID flow.Identifier) ([]flow.Slashable[T], bool) { items, ok := b.backend.byParentID(parentID) if !ok { return nil, false } - proposals := make([]flow.Slashable[*flow.Proposal], 0, len(items)) + proposals := make([]flow.Slashable[T], 0, len(items)) for _, item := range items { proposals = append(proposals, item.block) } return proposals, true } -func (b *PendingBlocks) DropForParent(parentID flow.Identifier) { +func (b *GenericPendingBlocks[T]) DropForParent(parentID flow.Identifier) { b.backend.dropForParent(parentID) } -// PruneByView prunes any pending blocks with views less or equal to the given view. -func (b *PendingBlocks) PruneByView(view uint64) { +// PruneByView prunes any pending cluster blocks with views less or equal to the given view. +func (b *GenericPendingBlocks[T]) PruneByView(view uint64) { b.backend.pruneByView(view) } -func (b *PendingBlocks) Size() uint { +func (b *GenericPendingBlocks[T]) Size() uint { return b.backend.size() } diff --git a/module/buffer/pending_cluster_blocks.go b/module/buffer/pending_cluster_blocks.go deleted file mode 100644 index 1e2bf5e2b9a..00000000000 --- a/module/buffer/pending_cluster_blocks.go +++ /dev/null @@ -1,53 +0,0 @@ -package buffer - -import ( - "github.com/onflow/flow-go/model/cluster" - "github.com/onflow/flow-go/model/flow" -) - -type PendingClusterBlocks struct { - backend *backend[*cluster.Proposal] -} - -func NewPendingClusterBlocks() *PendingClusterBlocks { - b := &PendingClusterBlocks{backend: newBackend[*cluster.Proposal]()} - return b -} - -func (b *PendingClusterBlocks) Add(block flow.Slashable[*cluster.Proposal]) bool { - return b.backend.add(block) -} - -func (b *PendingClusterBlocks) ByID(blockID flow.Identifier) (flow.Slashable[*cluster.Proposal], bool) { - item, ok := b.backend.byID(blockID) - if !ok { - return flow.Slashable[*cluster.Proposal]{}, false - } - return item.block, true -} - -func (b *PendingClusterBlocks) ByParentID(parentID flow.Identifier) ([]flow.Slashable[*cluster.Proposal], bool) { - items, ok := b.backend.byParentID(parentID) - if !ok { - return nil, false - } - - proposals := make([]flow.Slashable[*cluster.Proposal], 0, len(items)) - for _, item := range items { - proposals = append(proposals, item.block) - } - return proposals, true -} - -func (b *PendingClusterBlocks) DropForParent(parentID flow.Identifier) { - b.backend.dropForParent(parentID) -} - -// PruneByView prunes any pending cluster blocks with views less or equal to the given view. -func (b *PendingClusterBlocks) PruneByView(view uint64) { - b.backend.pruneByView(view) -} - -func (b *PendingClusterBlocks) Size() uint { - return b.backend.size() -} From 09eb86c698034afdfd1493b5262e601fed513361 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Thu, 27 Nov 2025 11:50:46 -0800 Subject: [PATCH 03/23] use leveled forest for block buffer --- module/buffer.go | 7 +++- module/buffer/pending_blocks.go | 71 ++++++++++++++++++++++++++------- 2 files changed, 61 insertions(+), 17 deletions(-) diff --git a/module/buffer.go b/module/buffer.go index d820eeaae23..9b9b243aa6e 100644 --- a/module/buffer.go +++ b/module/buffer.go @@ -5,12 +5,13 @@ import ( "github.com/onflow/flow-go/model/flow" ) -// TODO: docs +// BufferedProposal generically represents either a [cluster.Proposal] or [flow.Proposal]. type BufferedProposal interface { + *cluster.Proposal | *flow.Proposal ProposalHeader() *flow.ProposalHeader } -// PendingBlockBuffer defines an interface for a cache of pending blocks that +// GenericPendingBlockBuffer defines an interface for a cache of pending blocks that // cannot yet be processed because they do not connect to the rest of the chain // state. They are indexed by parent ID to enable processing all of a parent's // children once the parent is received. @@ -30,6 +31,8 @@ type GenericPendingBlockBuffer[T BufferedProposal] interface { Size() uint } +// PendingBlockBuffer is the block buffer for consensus proposals. type PendingBlockBuffer = GenericPendingBlockBuffer[*flow.Proposal] +// PendingClusterBlockBuffer is the block buffer for cluster proposals. type PendingClusterBlockBuffer = GenericPendingBlockBuffer[*cluster.Proposal] diff --git a/module/buffer/pending_blocks.go b/module/buffer/pending_blocks.go index 88314be37a6..84feab6174c 100644 --- a/module/buffer/pending_blocks.go +++ b/module/buffer/pending_blocks.go @@ -4,57 +4,98 @@ import ( "github.com/onflow/flow-go/model/cluster" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/forest" ) +// proposalVertex +// TODO: docs +// +//structwrite:immutable +type proposalVertex[T module.BufferedProposal] struct { + proposal flow.Slashable[T] + id flow.Identifier +} + +func (v proposalVertex[T]) header() *flow.Header { + return v.proposal.Message.ProposalHeader().Header +} + +func newProposalVertex[T module.BufferedProposal](proposal flow.Slashable[T]) proposalVertex[T] { + return proposalVertex[T]{ + proposal: proposal, + id: proposal.Message.ProposalHeader().Header.ID(), + } +} + +func (v proposalVertex[T]) VertexID() flow.Identifier { + return v.id +} + +func (v proposalVertex[T]) Level() uint64 { + return v.header().View +} + +func (v proposalVertex[T]) Parent() (flow.Identifier, uint64) { + return v.header().ParentID, v.header().ParentView +} + type GenericPendingBlocks[T module.BufferedProposal] struct { - backend *backend[T] + forest *forest.LevelledForest } type PendingClusterBlocks = GenericPendingBlocks[*cluster.Proposal] type PendingBlocks = GenericPendingBlocks[*flow.Proposal] func NewPendingClusterBlocks() *PendingClusterBlocks { - return &PendingClusterBlocks{backend: newBackend[*cluster.Proposal]()} + return &PendingClusterBlocks{forest: forest.NewLevelledForest(1_000_000)} } func NewPendingBlocks() *PendingBlocks { - return &PendingBlocks{backend: newBackend[*flow.Proposal]()} + return &PendingBlocks{forest: forest.NewLevelledForest(1_000_000)} } +// TODO remove bool return here func (b *GenericPendingBlocks[T]) Add(block flow.Slashable[T]) bool { - return b.backend.add(block) + b.forest.AddVertex(newProposalVertex(block)) + return true } func (b *GenericPendingBlocks[T]) ByID(blockID flow.Identifier) (flow.Slashable[T], bool) { - item, ok := b.backend.byID(blockID) + vertex, ok := b.forest.GetVertex(blockID) if !ok { return flow.Slashable[T]{}, false } - return item.block, true + return vertex.(proposalVertex[T]).proposal, true } func (b *GenericPendingBlocks[T]) ByParentID(parentID flow.Identifier) ([]flow.Slashable[T], bool) { - items, ok := b.backend.byParentID(parentID) - if !ok { + n := b.forest.GetNumberOfChildren(parentID) + if n == 0 { return nil, false } - proposals := make([]flow.Slashable[T], 0, len(items)) - for _, item := range items { - proposals = append(proposals, item.block) + children := make([]flow.Slashable[T], 0, n) + iterator := b.forest.GetChildren(parentID) + for iterator.HasNext() { + vertex := iterator.NextVertex() + children = append(children, vertex.(proposalVertex[T]).proposal) } - return proposals, true + + return children, true } +// TODO: remove this func (b *GenericPendingBlocks[T]) DropForParent(parentID flow.Identifier) { - b.backend.dropForParent(parentID) + //b.forest. + //b.backend.dropForParent(parentID) } // PruneByView prunes any pending cluster blocks with views less or equal to the given view. func (b *GenericPendingBlocks[T]) PruneByView(view uint64) { - b.backend.pruneByView(view) + err := b.forest.PruneUpToLevel(view - 1) // TODO: OBO here + _ = err // TODO: deal with error here } func (b *GenericPendingBlocks[T]) Size() uint { - return b.backend.size() + return uint(b.forest.GetSize()) } From 46000acddce5fc042637ecf5185719f9859627ee Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Thu, 27 Nov 2025 13:18:19 -0800 Subject: [PATCH 04/23] add docs, todos --- module/buffer.go | 1 + module/buffer/pending_blocks.go | 29 +++++++++++++++++++++++------ 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/module/buffer.go b/module/buffer.go index 9b9b243aa6e..45c66412993 100644 --- a/module/buffer.go +++ b/module/buffer.go @@ -15,6 +15,7 @@ type BufferedProposal interface { // cannot yet be processed because they do not connect to the rest of the chain // state. They are indexed by parent ID to enable processing all of a parent's // children once the parent is received. +// TODO paste doc from impl // Safe for concurrent use. type GenericPendingBlockBuffer[T BufferedProposal] interface { Add(block flow.Slashable[T]) bool diff --git a/module/buffer/pending_blocks.go b/module/buffer/pending_blocks.go index 84feab6174c..00a8e1af849 100644 --- a/module/buffer/pending_blocks.go +++ b/module/buffer/pending_blocks.go @@ -7,8 +7,7 @@ import ( "github.com/onflow/flow-go/module/forest" ) -// proposalVertex -// TODO: docs +// proposalVertex implements [forest.Vertex] for generic block proposals. // //structwrite:immutable type proposalVertex[T module.BufferedProposal] struct { @@ -16,6 +15,7 @@ type proposalVertex[T module.BufferedProposal] struct { id flow.Identifier } +// header is a shortform way to access the proposal's header. func (v proposalVertex[T]) header() *flow.Header { return v.proposal.Message.ProposalHeader().Header } @@ -27,33 +27,50 @@ func newProposalVertex[T module.BufferedProposal](proposal flow.Slashable[T]) pr } } +// VertexID returns the block ID for the stored proposal. func (v proposalVertex[T]) VertexID() flow.Identifier { return v.id } +// Level returns the view for the stored proposal. func (v proposalVertex[T]) Level() uint64 { return v.header().View } +// Parent returns the parent ID and view for the stored proposal. func (v proposalVertex[T]) Parent() (flow.Identifier, uint64) { return v.header().ParentID, v.header().ParentView } +// GenericPendingBlocks implements a mempool of pending blocks that cannot yet be processed +// because they do not connect to the rest of the chain state. +// They are indexed by parent ID to enable processing all of a parent's children once the parent is received. +// They are also indexed by view to support pruning. +// The size of this mempool is partly limited by the enforcement of an allowed view range, however +// a strong size limit also requires that stored proposals are validated to ensure we store only +// one proposal per view. Higher-level logic is responsible for validating proposals prior to storing here. +// +// Safe for concurrent use. type GenericPendingBlocks[T module.BufferedProposal] struct { + // TODO concurrency forest *forest.LevelledForest } -type PendingClusterBlocks = GenericPendingBlocks[*cluster.Proposal] type PendingBlocks = GenericPendingBlocks[*flow.Proposal] +type PendingClusterBlocks = GenericPendingBlocks[*cluster.Proposal] -func NewPendingClusterBlocks() *PendingClusterBlocks { - return &PendingClusterBlocks{forest: forest.NewLevelledForest(1_000_000)} -} +var _ module.PendingBlockBuffer = (*PendingBlocks)(nil) +var _ module.PendingClusterBlockBuffer = (*PendingClusterBlocks)(nil) +// TODO: inject finalizedView func NewPendingBlocks() *PendingBlocks { return &PendingBlocks{forest: forest.NewLevelledForest(1_000_000)} } +func NewPendingClusterBlocks() *PendingClusterBlocks { + return &PendingClusterBlocks{forest: forest.NewLevelledForest(1_000_000)} +} + // TODO remove bool return here func (b *GenericPendingBlocks[T]) Add(block flow.Slashable[T]) bool { b.forest.AddVertex(newProposalVertex(block)) From d54e9744256add83bb294d5fb00838f5f1c2fd61 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Thu, 27 Nov 2025 13:22:24 -0800 Subject: [PATCH 05/23] ai: remove bool return from pending blocks --- engine/collection/compliance/core.go | 4 ++-- engine/collection/compliance/core_test.go | 2 +- engine/consensus/compliance/core.go | 4 ++-- engine/consensus/compliance/core_test.go | 2 +- module/buffer.go | 2 +- module/buffer/pending_blocks.go | 4 +--- 6 files changed, 8 insertions(+), 10 deletions(-) diff --git a/engine/collection/compliance/core.go b/engine/collection/compliance/core.go index a969c94b7f3..d8ff841a58b 100644 --- a/engine/collection/compliance/core.go +++ b/engine/collection/compliance/core.go @@ -193,7 +193,7 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*cluster.Proposal]) error _, found := c.pending.ByID(block.ParentID) if found { // add the block to the cache - _ = c.pending.Add(proposal) + c.pending.Add(proposal) c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size()) return nil @@ -207,7 +207,7 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*cluster.Proposal]) error return fmt.Errorf("could not check parent exists: %w", err) } if !exists { - _ = c.pending.Add(proposal) + c.pending.Add(proposal) c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size()) c.sync.RequestBlock(block.ParentID, block.Height-1) diff --git a/engine/collection/compliance/core_test.go b/engine/collection/compliance/core_test.go index aaf44149dc1..d4594134732 100644 --- a/engine/collection/compliance/core_test.go +++ b/engine/collection/compliance/core_test.go @@ -124,7 +124,7 @@ func (cs *CommonSuite) SetupTest() { // set up pending module mock cs.pending = &module.PendingClusterBlockBuffer{} - cs.pending.On("Add", mock.Anything, mock.Anything).Return(true) + cs.pending.On("Add", mock.Anything, mock.Anything) cs.pending.On("ByID", mock.Anything).Return( func(blockID flow.Identifier) flow.Slashable[*cluster.Proposal] { return cs.pendingDB[blockID] diff --git a/engine/consensus/compliance/core.go b/engine/consensus/compliance/core.go index f52ba2494ea..6bdf56633f6 100644 --- a/engine/consensus/compliance/core.go +++ b/engine/consensus/compliance/core.go @@ -200,7 +200,7 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*flow.Proposal]) error { _, found := c.pending.ByID(header.ParentID) if found { // add the block to the cache - _ = c.pending.Add(proposal) + c.pending.Add(proposal) c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size()) return nil @@ -214,7 +214,7 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*flow.Proposal]) error { return fmt.Errorf("could not check parent exists: %w", err) } if !exists { - _ = c.pending.Add(proposal) + c.pending.Add(proposal) c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size()) c.sync.RequestBlock(header.ParentID, header.Height-1) diff --git a/engine/consensus/compliance/core_test.go b/engine/consensus/compliance/core_test.go index 23b98835ae2..7dee268f138 100644 --- a/engine/consensus/compliance/core_test.go +++ b/engine/consensus/compliance/core_test.go @@ -200,7 +200,7 @@ func (cs *CommonSuite) SetupTest() { // set up pending module mock cs.pending = &module.PendingBlockBuffer{} - cs.pending.On("Add", mock.Anything, mock.Anything).Return(true) + cs.pending.On("Add", mock.Anything, mock.Anything) cs.pending.On("ByID", mock.Anything).Return( func(blockID flow.Identifier) flow.Slashable[*flow.Proposal] { return cs.pendingDB[blockID] diff --git a/module/buffer.go b/module/buffer.go index 45c66412993..73daa303d4b 100644 --- a/module/buffer.go +++ b/module/buffer.go @@ -18,7 +18,7 @@ type BufferedProposal interface { // TODO paste doc from impl // Safe for concurrent use. type GenericPendingBlockBuffer[T BufferedProposal] interface { - Add(block flow.Slashable[T]) bool + Add(block flow.Slashable[T]) ByID(blockID flow.Identifier) (flow.Slashable[T], bool) diff --git a/module/buffer/pending_blocks.go b/module/buffer/pending_blocks.go index 00a8e1af849..059248103ec 100644 --- a/module/buffer/pending_blocks.go +++ b/module/buffer/pending_blocks.go @@ -71,10 +71,8 @@ func NewPendingClusterBlocks() *PendingClusterBlocks { return &PendingClusterBlocks{forest: forest.NewLevelledForest(1_000_000)} } -// TODO remove bool return here -func (b *GenericPendingBlocks[T]) Add(block flow.Slashable[T]) bool { +func (b *GenericPendingBlocks[T]) Add(block flow.Slashable[T]) { b.forest.AddVertex(newProposalVertex(block)) - return true } func (b *GenericPendingBlocks[T]) ByID(blockID flow.Identifier) (flow.Slashable[T], bool) { From 9530e1e40c89374752213ea61e4e1e041317e1ad Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Thu, 27 Nov 2025 13:26:00 -0800 Subject: [PATCH 06/23] ai: remove DropByParent method --- engine/collection/compliance/core.go | 3 --- engine/collection/compliance/core_test.go | 4 ---- engine/consensus/compliance/core.go | 3 --- engine/consensus/compliance/core_test.go | 4 ---- module/buffer.go | 2 -- module/buffer/pending_blocks.go | 6 ------ module/mock/pending_block_buffer.go | 5 ----- module/mock/pending_cluster_block_buffer.go | 5 ----- 8 files changed, 32 deletions(-) diff --git a/engine/collection/compliance/core.go b/engine/collection/compliance/core.go index d8ff841a58b..73ebce133a8 100644 --- a/engine/collection/compliance/core.go +++ b/engine/collection/compliance/core.go @@ -288,9 +288,6 @@ func (c *Core) processBlockAndDescendants(proposal flow.Slashable[*cluster.Propo } } - // drop all the children that should have been processed now - c.pending.DropForParent(blockID) - return nil } diff --git a/engine/collection/compliance/core_test.go b/engine/collection/compliance/core_test.go index d4594134732..493af42b1c1 100644 --- a/engine/collection/compliance/core_test.go +++ b/engine/collection/compliance/core_test.go @@ -143,7 +143,6 @@ func (cs *CommonSuite) SetupTest() { return ok }, ) - cs.pending.On("DropForParent", mock.Anything).Return() cs.pending.On("Size").Return(uint(0)) cs.pending.On("PruneByView", mock.Anything).Return() @@ -518,9 +517,6 @@ func (cs *CoreSuite) TestProcessBlockAndDescendants() { // check that we submitted each child to hotstuff cs.hotstuff.AssertExpectations(cs.T()) - - // make sure we drop the cache after trying to process - cs.pending.AssertCalled(cs.T(), "DropForParent", parent.ID()) } func (cs *CoreSuite) TestProposalBufferingOrder() { diff --git a/engine/consensus/compliance/core.go b/engine/consensus/compliance/core.go index 6bdf56633f6..2d244ec1124 100644 --- a/engine/consensus/compliance/core.go +++ b/engine/consensus/compliance/core.go @@ -300,9 +300,6 @@ func (c *Core) processBlockAndDescendants(proposal flow.Slashable[*flow.Proposal } } - // drop all the children that should have been processed now - c.pending.DropForParent(blockID) - return nil } diff --git a/engine/consensus/compliance/core_test.go b/engine/consensus/compliance/core_test.go index 7dee268f138..06aed7eb813 100644 --- a/engine/consensus/compliance/core_test.go +++ b/engine/consensus/compliance/core_test.go @@ -219,7 +219,6 @@ func (cs *CommonSuite) SetupTest() { return ok }, ) - cs.pending.On("DropForParent", mock.Anything).Return() cs.pending.On("Size").Return(uint(0)) cs.pending.On("PruneByView", mock.Anything).Return() @@ -565,9 +564,6 @@ func (cs *CoreSuite) TestProcessBlockAndDescendants() { Message: proposal0, }) require.NoError(cs.T(), err, "should pass handling children") - - // make sure we drop the cache after trying to process - cs.pending.AssertCalled(cs.T(), "DropForParent", parent.ID()) } func (cs *CoreSuite) TestProposalBufferingOrder() { diff --git a/module/buffer.go b/module/buffer.go index 73daa303d4b..4aff71b0189 100644 --- a/module/buffer.go +++ b/module/buffer.go @@ -24,8 +24,6 @@ type GenericPendingBlockBuffer[T BufferedProposal] interface { ByParentID(parentID flow.Identifier) ([]flow.Slashable[T], bool) - DropForParent(parentID flow.Identifier) - // PruneByView prunes any pending blocks with views less or equal to the given view. PruneByView(view uint64) diff --git a/module/buffer/pending_blocks.go b/module/buffer/pending_blocks.go index 059248103ec..6373a7385cf 100644 --- a/module/buffer/pending_blocks.go +++ b/module/buffer/pending_blocks.go @@ -99,12 +99,6 @@ func (b *GenericPendingBlocks[T]) ByParentID(parentID flow.Identifier) ([]flow.S return children, true } -// TODO: remove this -func (b *GenericPendingBlocks[T]) DropForParent(parentID flow.Identifier) { - //b.forest. - //b.backend.dropForParent(parentID) -} - // PruneByView prunes any pending cluster blocks with views less or equal to the given view. func (b *GenericPendingBlocks[T]) PruneByView(view uint64) { err := b.forest.PruneUpToLevel(view - 1) // TODO: OBO here diff --git a/module/mock/pending_block_buffer.go b/module/mock/pending_block_buffer.go index f9ecd30f752..40859b477c0 100644 --- a/module/mock/pending_block_buffer.go +++ b/module/mock/pending_block_buffer.go @@ -88,11 +88,6 @@ func (_m *PendingBlockBuffer) ByParentID(parentID flow.Identifier) ([]flow.Slash return r0, r1 } -// DropForParent provides a mock function with given fields: parentID -func (_m *PendingBlockBuffer) DropForParent(parentID flow.Identifier) { - _m.Called(parentID) -} - // PruneByView provides a mock function with given fields: view func (_m *PendingBlockBuffer) PruneByView(view uint64) { _m.Called(view) diff --git a/module/mock/pending_cluster_block_buffer.go b/module/mock/pending_cluster_block_buffer.go index d23cf6228a4..0f2298fedfc 100644 --- a/module/mock/pending_cluster_block_buffer.go +++ b/module/mock/pending_cluster_block_buffer.go @@ -90,11 +90,6 @@ func (_m *PendingClusterBlockBuffer) ByParentID(parentID flow.Identifier) ([]flo return r0, r1 } -// DropForParent provides a mock function with given fields: parentID -func (_m *PendingClusterBlockBuffer) DropForParent(parentID flow.Identifier) { - _m.Called(parentID) -} - // PruneByView provides a mock function with given fields: view func (_m *PendingClusterBlockBuffer) PruneByView(view uint64) { _m.Called(view) From 7235f7a8e163258dc16f0b37c92164e7839abf9c Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Thu, 27 Nov 2025 13:56:14 -0800 Subject: [PATCH 07/23] update mocks --- module/mock/buffered_proposal.go | 47 ++++++++ module/mock/generic_pending_block_buffer.go | 115 ++++++++++++++++++++ module/mock/pending_block_buffer.go | 17 +-- module/mock/pending_cluster_block_buffer.go | 17 +-- 4 files changed, 166 insertions(+), 30 deletions(-) create mode 100644 module/mock/buffered_proposal.go create mode 100644 module/mock/generic_pending_block_buffer.go diff --git a/module/mock/buffered_proposal.go b/module/mock/buffered_proposal.go new file mode 100644 index 00000000000..f755285df37 --- /dev/null +++ b/module/mock/buffered_proposal.go @@ -0,0 +1,47 @@ +// Code generated by mockery. DO NOT EDIT. + +package mock + +import ( + flow "github.com/onflow/flow-go/model/flow" + mock "github.com/stretchr/testify/mock" +) + +// BufferedProposal is an autogenerated mock type for the BufferedProposal type +type BufferedProposal struct { + mock.Mock +} + +// ProposalHeader provides a mock function with no fields +func (_m *BufferedProposal) ProposalHeader() *flow.ProposalHeader { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for ProposalHeader") + } + + var r0 *flow.ProposalHeader + if rf, ok := ret.Get(0).(func() *flow.ProposalHeader); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*flow.ProposalHeader) + } + } + + return r0 +} + +// NewBufferedProposal creates a new instance of BufferedProposal. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewBufferedProposal(t interface { + mock.TestingT + Cleanup(func()) +}) *BufferedProposal { + mock := &BufferedProposal{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/module/mock/generic_pending_block_buffer.go b/module/mock/generic_pending_block_buffer.go new file mode 100644 index 00000000000..e99c76be469 --- /dev/null +++ b/module/mock/generic_pending_block_buffer.go @@ -0,0 +1,115 @@ +// Code generated by mockery. DO NOT EDIT. + +package mock + +import ( + flow "github.com/onflow/flow-go/model/flow" + mock "github.com/stretchr/testify/mock" + + module "github.com/onflow/flow-go/module" +) + +// GenericPendingBlockBuffer is an autogenerated mock type for the GenericPendingBlockBuffer type +type GenericPendingBlockBuffer[T module.BufferedProposal] struct { + mock.Mock +} + +// Add provides a mock function with given fields: block +func (_m *GenericPendingBlockBuffer[T]) Add(block flow.Slashable[T]) { + _m.Called(block) +} + +// ByID provides a mock function with given fields: blockID +func (_m *GenericPendingBlockBuffer[T]) ByID(blockID flow.Identifier) (flow.Slashable[T], bool) { + ret := _m.Called(blockID) + + if len(ret) == 0 { + panic("no return value specified for ByID") + } + + var r0 flow.Slashable[T] + var r1 bool + if rf, ok := ret.Get(0).(func(flow.Identifier) (flow.Slashable[T], bool)); ok { + return rf(blockID) + } + if rf, ok := ret.Get(0).(func(flow.Identifier) flow.Slashable[T]); ok { + r0 = rf(blockID) + } else { + r0 = ret.Get(0).(flow.Slashable[T]) + } + + if rf, ok := ret.Get(1).(func(flow.Identifier) bool); ok { + r1 = rf(blockID) + } else { + r1 = ret.Get(1).(bool) + } + + return r0, r1 +} + +// ByParentID provides a mock function with given fields: parentID +func (_m *GenericPendingBlockBuffer[T]) ByParentID(parentID flow.Identifier) ([]flow.Slashable[T], bool) { + ret := _m.Called(parentID) + + if len(ret) == 0 { + panic("no return value specified for ByParentID") + } + + var r0 []flow.Slashable[T] + var r1 bool + if rf, ok := ret.Get(0).(func(flow.Identifier) ([]flow.Slashable[T], bool)); ok { + return rf(parentID) + } + if rf, ok := ret.Get(0).(func(flow.Identifier) []flow.Slashable[T]); ok { + r0 = rf(parentID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]flow.Slashable[T]) + } + } + + if rf, ok := ret.Get(1).(func(flow.Identifier) bool); ok { + r1 = rf(parentID) + } else { + r1 = ret.Get(1).(bool) + } + + return r0, r1 +} + +// PruneByView provides a mock function with given fields: view +func (_m *GenericPendingBlockBuffer[T]) PruneByView(view uint64) { + _m.Called(view) +} + +// Size provides a mock function with no fields +func (_m *GenericPendingBlockBuffer[T]) Size() uint { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Size") + } + + var r0 uint + if rf, ok := ret.Get(0).(func() uint); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint) + } + + return r0 +} + +// NewGenericPendingBlockBuffer creates a new instance of GenericPendingBlockBuffer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewGenericPendingBlockBuffer[T module.BufferedProposal](t interface { + mock.TestingT + Cleanup(func()) +}) *GenericPendingBlockBuffer[T] { + mock := &GenericPendingBlockBuffer[T]{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/module/mock/pending_block_buffer.go b/module/mock/pending_block_buffer.go index 40859b477c0..014bdb438b7 100644 --- a/module/mock/pending_block_buffer.go +++ b/module/mock/pending_block_buffer.go @@ -13,21 +13,8 @@ type PendingBlockBuffer struct { } // Add provides a mock function with given fields: block -func (_m *PendingBlockBuffer) Add(block flow.Slashable[*flow.Proposal]) bool { - ret := _m.Called(block) - - if len(ret) == 0 { - panic("no return value specified for Add") - } - - var r0 bool - if rf, ok := ret.Get(0).(func(flow.Slashable[*flow.Proposal]) bool); ok { - r0 = rf(block) - } else { - r0 = ret.Get(0).(bool) - } - - return r0 +func (_m *PendingBlockBuffer) Add(block flow.Slashable[*flow.Proposal]) { + _m.Called(block) } // ByID provides a mock function with given fields: blockID diff --git a/module/mock/pending_cluster_block_buffer.go b/module/mock/pending_cluster_block_buffer.go index 0f2298fedfc..81d771ebbff 100644 --- a/module/mock/pending_cluster_block_buffer.go +++ b/module/mock/pending_cluster_block_buffer.go @@ -15,21 +15,8 @@ type PendingClusterBlockBuffer struct { } // Add provides a mock function with given fields: block -func (_m *PendingClusterBlockBuffer) Add(block flow.Slashable[*cluster.Proposal]) bool { - ret := _m.Called(block) - - if len(ret) == 0 { - panic("no return value specified for Add") - } - - var r0 bool - if rf, ok := ret.Get(0).(func(flow.Slashable[*cluster.Proposal]) bool); ok { - r0 = rf(block) - } else { - r0 = ret.Get(0).(bool) - } - - return r0 +func (_m *PendingClusterBlockBuffer) Add(block flow.Slashable[*cluster.Proposal]) { + _m.Called(block) } // ByID provides a mock function with given fields: blockID From e3acbf0b6f44c097a682e40f84a94e0aaa9e5462 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Thu, 27 Nov 2025 13:57:56 -0800 Subject: [PATCH 08/23] add lower view boundary --- cmd/consensus/main.go | 2 +- consensus/integration/nodes_test.go | 6 +++--- engine/collection/compliance/core_test.go | 2 +- engine/collection/epochmgr/factories/compliance.go | 6 +++++- engine/consensus/compliance/core_test.go | 2 +- module/buffer.go | 4 ++-- module/buffer/pending_blocks.go | 9 ++++----- 7 files changed, 17 insertions(+), 14 deletions(-) diff --git a/cmd/consensus/main.go b/cmd/consensus/main.go index cf22c928e2d..1abc9ead98b 100644 --- a/cmd/consensus/main.go +++ b/cmd/consensus/main.go @@ -805,7 +805,7 @@ func main() { }). Component("consensus compliance engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { // initialize the pending blocks cache - proposals := buffer.NewPendingBlocks() + proposals := buffer.NewPendingBlocks(node.LastFinalizedHeader.View) logger := createLogger(node.Logger, node.RootChainID) complianceCore, err := compliance.NewCore( diff --git a/consensus/integration/nodes_test.go b/consensus/integration/nodes_test.go index 165d2450933..7a4053f366c 100644 --- a/consensus/integration/nodes_test.go +++ b/consensus/integration/nodes_test.go @@ -514,12 +514,12 @@ func createNode( ) require.NoError(t, err) - // initialize the pending blocks cache - cache := buffer.NewPendingBlocks() - rootHeader, err := rootSnapshot.Head() require.NoError(t, err) + // initialize the pending blocks cache + cache := buffer.NewPendingBlocks(rootHeader.View) + rootQC, err := rootSnapshot.QuorumCertificate() require.NoError(t, err) diff --git a/engine/collection/compliance/core_test.go b/engine/collection/compliance/core_test.go index 493af42b1c1..6bbad0d1ba4 100644 --- a/engine/collection/compliance/core_test.go +++ b/engine/collection/compliance/core_test.go @@ -542,7 +542,7 @@ func (cs *CoreSuite) TestProposalBufferingOrder() { } // replace the engine buffer with the real one - cs.core.pending = realbuffer.NewPendingClusterBlocks() + cs.core.pending = realbuffer.NewPendingClusterBlocks(cs.head.Block.View) // process all of the descendants for _, proposal := range proposals { diff --git a/engine/collection/epochmgr/factories/compliance.go b/engine/collection/epochmgr/factories/compliance.go index 26a2d57e224..5d1596994c6 100644 --- a/engine/collection/epochmgr/factories/compliance.go +++ b/engine/collection/epochmgr/factories/compliance.go @@ -66,7 +66,11 @@ func (f *ComplianceEngineFactory) Create( validator hotstuff.Validator, ) (*compliance.Engine, error) { - cache := buffer.NewPendingClusterBlocks() + final, err := clusterState.Final().Head() + if err != nil { + return nil, fmt.Errorf("could not get finalized header: %w", err) + } + cache := buffer.NewPendingClusterBlocks(final.View) core, err := compliance.NewCore( f.log, f.engMetrics, diff --git a/engine/consensus/compliance/core_test.go b/engine/consensus/compliance/core_test.go index 06aed7eb813..86277a36d42 100644 --- a/engine/consensus/compliance/core_test.go +++ b/engine/consensus/compliance/core_test.go @@ -584,7 +584,7 @@ func (cs *CoreSuite) TestProposalBufferingOrder() { } // replace the engine buffer with the real one - cs.core.pending = real.NewPendingBlocks() + cs.core.pending = real.NewPendingBlocks(cs.head.View) // check that we request the ancestor block each time cs.sync.On("RequestBlock", missingBlock.ID(), missingBlock.Height).Times(len(proposals)) diff --git a/module/buffer.go b/module/buffer.go index 4aff71b0189..62970dae93f 100644 --- a/module/buffer.go +++ b/module/buffer.go @@ -31,7 +31,7 @@ type GenericPendingBlockBuffer[T BufferedProposal] interface { } // PendingBlockBuffer is the block buffer for consensus proposals. -type PendingBlockBuffer = GenericPendingBlockBuffer[*flow.Proposal] +type PendingBlockBuffer GenericPendingBlockBuffer[*flow.Proposal] // PendingClusterBlockBuffer is the block buffer for cluster proposals. -type PendingClusterBlockBuffer = GenericPendingBlockBuffer[*cluster.Proposal] +type PendingClusterBlockBuffer GenericPendingBlockBuffer[*cluster.Proposal] diff --git a/module/buffer/pending_blocks.go b/module/buffer/pending_blocks.go index 6373a7385cf..a1e08df8cc3 100644 --- a/module/buffer/pending_blocks.go +++ b/module/buffer/pending_blocks.go @@ -62,13 +62,12 @@ type PendingClusterBlocks = GenericPendingBlocks[*cluster.Proposal] var _ module.PendingBlockBuffer = (*PendingBlocks)(nil) var _ module.PendingClusterBlockBuffer = (*PendingClusterBlocks)(nil) -// TODO: inject finalizedView -func NewPendingBlocks() *PendingBlocks { - return &PendingBlocks{forest: forest.NewLevelledForest(1_000_000)} +func NewPendingBlocks(finalizedView uint64) *PendingBlocks { + return &PendingBlocks{forest: forest.NewLevelledForest(finalizedView)} } -func NewPendingClusterBlocks() *PendingClusterBlocks { - return &PendingClusterBlocks{forest: forest.NewLevelledForest(1_000_000)} +func NewPendingClusterBlocks(finalizedView uint64) *PendingClusterBlocks { + return &PendingClusterBlocks{forest: forest.NewLevelledForest(finalizedView)} } func (b *GenericPendingBlocks[T]) Add(block flow.Slashable[T]) { From 56be6f08646ab89d140739a28080e1cbb5b7dd44 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Thu, 27 Nov 2025 14:24:48 -0800 Subject: [PATCH 09/23] update comments --- module/buffer.go | 26 +++++++++++++++++++------- module/buffer/pending_blocks.go | 17 +++++++++++++---- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/module/buffer.go b/module/buffer.go index 62970dae93f..5e21f8b84b3 100644 --- a/module/buffer.go +++ b/module/buffer.go @@ -11,22 +11,34 @@ type BufferedProposal interface { ProposalHeader() *flow.ProposalHeader } -// GenericPendingBlockBuffer defines an interface for a cache of pending blocks that -// cannot yet be processed because they do not connect to the rest of the chain -// state. They are indexed by parent ID to enable processing all of a parent's -// children once the parent is received. -// TODO paste doc from impl +// GenericPendingBlockBuffer implements a mempool of pending blocks that cannot yet be processed +// because they do not connect to the rest of the chain state. +// They are indexed by parent ID to enable processing all of a parent's children once the parent is received. +// They are also indexed by view to support pruning. +// The size of this mempool is partly limited by the enforcement of an allowed view range, however +// a strong size limit also requires that stored proposals are validated to ensure we store only +// one proposal per view. Higher-level logic is responsible for validating proposals prior to storing here. +// // Safe for concurrent use. type GenericPendingBlockBuffer[T BufferedProposal] interface { + // Add adds the input block to the block buffer. + // If the block already exists, or is below the finalized view, this is a no-op. Add(block flow.Slashable[T]) + // ByID returns the block with the given ID, if it exists. + // Otherwise returns (nil, false) ByID(blockID flow.Identifier) (flow.Slashable[T], bool) + // ByParentID returns all direct children of the given block. + // If no children with the given parent exist, returns (nil, false) ByParentID(parentID flow.Identifier) ([]flow.Slashable[T], bool) - // PruneByView prunes any pending blocks with views less or equal to the given view. - PruneByView(view uint64) + // PruneByView prunes all pending blocks with views less or equal to the given view. + // Errors returns: + // - mempool.BelowPrunedThresholdError if input level is below the lowest retained view (finalized view) + PruneByView(view uint64) error + // Size returns the number of blocks in the buffer. Size() uint } diff --git a/module/buffer/pending_blocks.go b/module/buffer/pending_blocks.go index a1e08df8cc3..a833507c552 100644 --- a/module/buffer/pending_blocks.go +++ b/module/buffer/pending_blocks.go @@ -70,10 +70,14 @@ func NewPendingClusterBlocks(finalizedView uint64) *PendingClusterBlocks { return &PendingClusterBlocks{forest: forest.NewLevelledForest(finalizedView)} } +// Add adds the input block to the block buffer. +// If the block already exists, or is below the finalized view, this is a no-op. func (b *GenericPendingBlocks[T]) Add(block flow.Slashable[T]) { b.forest.AddVertex(newProposalVertex(block)) } +// ByID returns the block with the given ID, if it exists. +// Otherwise returns (nil, false) func (b *GenericPendingBlocks[T]) ByID(blockID flow.Identifier) (flow.Slashable[T], bool) { vertex, ok := b.forest.GetVertex(blockID) if !ok { @@ -82,6 +86,8 @@ func (b *GenericPendingBlocks[T]) ByID(blockID flow.Identifier) (flow.Slashable[ return vertex.(proposalVertex[T]).proposal, true } +// ByParentID returns all direct children of the given block. +// If no children with the given parent exist, returns (nil, false) func (b *GenericPendingBlocks[T]) ByParentID(parentID flow.Identifier) ([]flow.Slashable[T], bool) { n := b.forest.GetNumberOfChildren(parentID) if n == 0 { @@ -98,12 +104,15 @@ func (b *GenericPendingBlocks[T]) ByParentID(parentID flow.Identifier) ([]flow.S return children, true } -// PruneByView prunes any pending cluster blocks with views less or equal to the given view. -func (b *GenericPendingBlocks[T]) PruneByView(view uint64) { - err := b.forest.PruneUpToLevel(view - 1) // TODO: OBO here - _ = err // TODO: deal with error here +// PruneByView prunes all pending blocks with views less or equal to the given view. +// Errors returns: +// - mempool.BelowPrunedThresholdError if input level is below the lowest retained view (finalized view) +func (b *GenericPendingBlocks[T]) PruneByView(view uint64) error { + // PruneUpToLevel prunes up to be EXCLUDING the input view, so add 1 here + return b.forest.PruneUpToLevel(view + 1) } +// Size returns the number of blocks in the buffer. func (b *GenericPendingBlocks[T]) Size() uint { return uint(b.forest.GetSize()) } From 760434b7dceed9dd395c1a6c27e1bce68b4ae6c9 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Thu, 27 Nov 2025 14:33:04 -0800 Subject: [PATCH 10/23] add mutex --- module/buffer/pending_blocks.go | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/module/buffer/pending_blocks.go b/module/buffer/pending_blocks.go index a833507c552..f08a7b92b08 100644 --- a/module/buffer/pending_blocks.go +++ b/module/buffer/pending_blocks.go @@ -1,6 +1,8 @@ package buffer import ( + "sync" + "github.com/onflow/flow-go/model/cluster" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" @@ -52,7 +54,7 @@ func (v proposalVertex[T]) Parent() (flow.Identifier, uint64) { // // Safe for concurrent use. type GenericPendingBlocks[T module.BufferedProposal] struct { - // TODO concurrency + lock *sync.Mutex forest *forest.LevelledForest } @@ -63,22 +65,32 @@ var _ module.PendingBlockBuffer = (*PendingBlocks)(nil) var _ module.PendingClusterBlockBuffer = (*PendingClusterBlocks)(nil) func NewPendingBlocks(finalizedView uint64) *PendingBlocks { - return &PendingBlocks{forest: forest.NewLevelledForest(finalizedView)} + return &PendingBlocks{ + lock: new(sync.Mutex), + forest: forest.NewLevelledForest(finalizedView), + } } func NewPendingClusterBlocks(finalizedView uint64) *PendingClusterBlocks { - return &PendingClusterBlocks{forest: forest.NewLevelledForest(finalizedView)} + return &PendingClusterBlocks{ + lock: new(sync.Mutex), + forest: forest.NewLevelledForest(finalizedView), + } } // Add adds the input block to the block buffer. // If the block already exists, or is below the finalized view, this is a no-op. func (b *GenericPendingBlocks[T]) Add(block flow.Slashable[T]) { + b.lock.Lock() + defer b.lock.Unlock() b.forest.AddVertex(newProposalVertex(block)) } // ByID returns the block with the given ID, if it exists. // Otherwise returns (nil, false) func (b *GenericPendingBlocks[T]) ByID(blockID flow.Identifier) (flow.Slashable[T], bool) { + b.lock.Lock() + defer b.lock.Unlock() vertex, ok := b.forest.GetVertex(blockID) if !ok { return flow.Slashable[T]{}, false @@ -89,6 +101,8 @@ func (b *GenericPendingBlocks[T]) ByID(blockID flow.Identifier) (flow.Slashable[ // ByParentID returns all direct children of the given block. // If no children with the given parent exist, returns (nil, false) func (b *GenericPendingBlocks[T]) ByParentID(parentID flow.Identifier) ([]flow.Slashable[T], bool) { + b.lock.Lock() + defer b.lock.Unlock() n := b.forest.GetNumberOfChildren(parentID) if n == 0 { return nil, false @@ -108,11 +122,15 @@ func (b *GenericPendingBlocks[T]) ByParentID(parentID flow.Identifier) ([]flow.S // Errors returns: // - mempool.BelowPrunedThresholdError if input level is below the lowest retained view (finalized view) func (b *GenericPendingBlocks[T]) PruneByView(view uint64) error { + b.lock.Lock() + defer b.lock.Unlock() // PruneUpToLevel prunes up to be EXCLUDING the input view, so add 1 here return b.forest.PruneUpToLevel(view + 1) } // Size returns the number of blocks in the buffer. func (b *GenericPendingBlocks[T]) Size() uint { + b.lock.Lock() + defer b.lock.Unlock() return uint(b.forest.GetSize()) } From 99b181fd58692d0e346f97706f33c7c81dba62c6 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Thu, 27 Nov 2025 14:33:12 -0800 Subject: [PATCH 11/23] check retained level error --- engine/collection/compliance/core.go | 14 +++++++++++--- engine/consensus/compliance/core.go | 13 ++++++++++--- engine/consensus/compliance/engine.go | 5 ++++- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/engine/collection/compliance/core.go b/engine/collection/compliance/core.go index 73ebce133a8..24f9bc2e80a 100644 --- a/engine/collection/compliance/core.go +++ b/engine/collection/compliance/core.go @@ -93,7 +93,10 @@ func NewCore( if err != nil { return nil, fmt.Errorf("could not initialized finalized boundary cache: %w", err) } - c.ProcessFinalizedBlock(final) + err = c.ProcessFinalizedBlock(final) + if err != nil { + return nil, fmt.Errorf("could not process finalized block: %w", err) + } // log the mempool size off the bat c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size()) @@ -360,14 +363,19 @@ func (c *Core) processBlockProposal(proposal *cluster.Proposal) error { // ProcessFinalizedBlock performs pruning of stale data based on finalization event // removes pending blocks below the finalized view -func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) { +// No errors are expected during normal operation. +func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) error { // remove all pending blocks at or below the finalized view - c.pending.PruneByView(finalized.View) + err := c.pending.PruneByView(finalized.View) + if err != nil { + return err + } c.finalizedHeight.Set(finalized.Height) c.finalizedView.Set(finalized.View) // always record the metric c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size()) + return nil } // checkForAndLogOutdatedInputError checks whether error is an `engine.OutdatedInputError`. diff --git a/engine/consensus/compliance/core.go b/engine/consensus/compliance/core.go index 2d244ec1124..8db49315509 100644 --- a/engine/consensus/compliance/core.go +++ b/engine/consensus/compliance/core.go @@ -100,7 +100,10 @@ func NewCore( if err != nil { return nil, fmt.Errorf("could not initialized finalized boundary cache: %w", err) } - c.ProcessFinalizedBlock(final) + err = c.ProcessFinalizedBlock(final) + if err != nil { + return nil, fmt.Errorf("could not process finalized block: %w", err) + } c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size()) @@ -389,9 +392,13 @@ func (c *Core) processBlockProposal(proposal *flow.Proposal) error { // ProcessFinalizedBlock performs pruning of stale data based on finalization event // removes pending blocks below the finalized view -func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) { +// No errors are expected during normal operation. +func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) error { // remove all pending blocks at or below the finalized view - c.pending.PruneByView(finalized.View) + err := c.pending.PruneByView(finalized.View) + if err != nil { + return err + } c.finalizedHeight.Set(finalized.Height) c.finalizedView.Set(finalized.View) diff --git a/engine/consensus/compliance/engine.go b/engine/consensus/compliance/engine.go index 7a37a6dd9a8..d4a57dfcf27 100644 --- a/engine/consensus/compliance/engine.go +++ b/engine/consensus/compliance/engine.go @@ -180,6 +180,9 @@ func (e *Engine) processOnFinalizedBlock(block *model.Block) error { if err != nil { // no expected errors return fmt.Errorf("could not get finalized header: %w", err) } - e.core.ProcessFinalizedBlock(finalHeader) + err = e.core.ProcessFinalizedBlock(finalHeader) + if err != nil { + return fmt.Errorf("could not process finalized block: %w", err) + } return nil } From 3ab8dd53b226f8e5199b43081cea19e8a78cb670 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Thu, 27 Nov 2025 14:40:08 -0800 Subject: [PATCH 12/23] add missing return --- engine/consensus/compliance/core.go | 1 + 1 file changed, 1 insertion(+) diff --git a/engine/consensus/compliance/core.go b/engine/consensus/compliance/core.go index 8db49315509..c8dff879788 100644 --- a/engine/consensus/compliance/core.go +++ b/engine/consensus/compliance/core.go @@ -404,6 +404,7 @@ func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) error { // always record the metric c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size()) + return nil } // checkForAndLogOutdatedInputError checks whether error is an `engine.OutdatedInputError`. From a4529e8ee1e441ef93ac77b3aff7ea068fd398f3 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Fri, 28 Nov 2025 09:57:12 -0800 Subject: [PATCH 13/23] update buffer --- module/buffer.go | 3 --- module/buffer/pending_blocks.go | 3 --- 2 files changed, 6 deletions(-) diff --git a/module/buffer.go b/module/buffer.go index 5e21f8b84b3..f8ee29230d1 100644 --- a/module/buffer.go +++ b/module/buffer.go @@ -15,9 +15,6 @@ type BufferedProposal interface { // because they do not connect to the rest of the chain state. // They are indexed by parent ID to enable processing all of a parent's children once the parent is received. // They are also indexed by view to support pruning. -// The size of this mempool is partly limited by the enforcement of an allowed view range, however -// a strong size limit also requires that stored proposals are validated to ensure we store only -// one proposal per view. Higher-level logic is responsible for validating proposals prior to storing here. // // Safe for concurrent use. type GenericPendingBlockBuffer[T BufferedProposal] interface { diff --git a/module/buffer/pending_blocks.go b/module/buffer/pending_blocks.go index f08a7b92b08..da80c91835e 100644 --- a/module/buffer/pending_blocks.go +++ b/module/buffer/pending_blocks.go @@ -48,9 +48,6 @@ func (v proposalVertex[T]) Parent() (flow.Identifier, uint64) { // because they do not connect to the rest of the chain state. // They are indexed by parent ID to enable processing all of a parent's children once the parent is received. // They are also indexed by view to support pruning. -// The size of this mempool is partly limited by the enforcement of an allowed view range, however -// a strong size limit also requires that stored proposals are validated to ensure we store only -// one proposal per view. Higher-level logic is responsible for validating proposals prior to storing here. // // Safe for concurrent use. type GenericPendingBlocks[T module.BufferedProposal] struct { From b281bc0e93442bce4686aa217ef0eaf2b45a944f Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Fri, 28 Nov 2025 10:00:20 -0800 Subject: [PATCH 14/23] update mocks --- module/mock/generic_pending_block_buffer.go | 17 +++++++++++++++-- module/mock/pending_block_buffer.go | 17 +++++++++++++++-- module/mock/pending_cluster_block_buffer.go | 17 +++++++++++++++-- 3 files changed, 45 insertions(+), 6 deletions(-) diff --git a/module/mock/generic_pending_block_buffer.go b/module/mock/generic_pending_block_buffer.go index e99c76be469..4dde4b3a26c 100644 --- a/module/mock/generic_pending_block_buffer.go +++ b/module/mock/generic_pending_block_buffer.go @@ -78,8 +78,21 @@ func (_m *GenericPendingBlockBuffer[T]) ByParentID(parentID flow.Identifier) ([] } // PruneByView provides a mock function with given fields: view -func (_m *GenericPendingBlockBuffer[T]) PruneByView(view uint64) { - _m.Called(view) +func (_m *GenericPendingBlockBuffer[T]) PruneByView(view uint64) error { + ret := _m.Called(view) + + if len(ret) == 0 { + panic("no return value specified for PruneByView") + } + + var r0 error + if rf, ok := ret.Get(0).(func(uint64) error); ok { + r0 = rf(view) + } else { + r0 = ret.Error(0) + } + + return r0 } // Size provides a mock function with no fields diff --git a/module/mock/pending_block_buffer.go b/module/mock/pending_block_buffer.go index 014bdb438b7..38402437684 100644 --- a/module/mock/pending_block_buffer.go +++ b/module/mock/pending_block_buffer.go @@ -76,8 +76,21 @@ func (_m *PendingBlockBuffer) ByParentID(parentID flow.Identifier) ([]flow.Slash } // PruneByView provides a mock function with given fields: view -func (_m *PendingBlockBuffer) PruneByView(view uint64) { - _m.Called(view) +func (_m *PendingBlockBuffer) PruneByView(view uint64) error { + ret := _m.Called(view) + + if len(ret) == 0 { + panic("no return value specified for PruneByView") + } + + var r0 error + if rf, ok := ret.Get(0).(func(uint64) error); ok { + r0 = rf(view) + } else { + r0 = ret.Error(0) + } + + return r0 } // Size provides a mock function with no fields diff --git a/module/mock/pending_cluster_block_buffer.go b/module/mock/pending_cluster_block_buffer.go index 81d771ebbff..c300f7a5ec4 100644 --- a/module/mock/pending_cluster_block_buffer.go +++ b/module/mock/pending_cluster_block_buffer.go @@ -78,8 +78,21 @@ func (_m *PendingClusterBlockBuffer) ByParentID(parentID flow.Identifier) ([]flo } // PruneByView provides a mock function with given fields: view -func (_m *PendingClusterBlockBuffer) PruneByView(view uint64) { - _m.Called(view) +func (_m *PendingClusterBlockBuffer) PruneByView(view uint64) error { + ret := _m.Called(view) + + if len(ret) == 0 { + panic("no return value specified for PruneByView") + } + + var r0 error + if rf, ok := ret.Get(0).(func(uint64) error); ok { + r0 = rf(view) + } else { + r0 = ret.Error(0) + } + + return r0 } // Size provides a mock function with no fields From 5dbfe911e20ddc3eda5a24355df5bf57ad238a2c Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Fri, 28 Nov 2025 10:27:18 -0800 Subject: [PATCH 15/23] fix tests for signature change --- engine/collection/compliance/core_test.go | 2 +- engine/consensus/compliance/core_test.go | 2 +- module/metrics/example/verification/main.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/engine/collection/compliance/core_test.go b/engine/collection/compliance/core_test.go index 6bbad0d1ba4..4a495f38a63 100644 --- a/engine/collection/compliance/core_test.go +++ b/engine/collection/compliance/core_test.go @@ -144,7 +144,7 @@ func (cs *CommonSuite) SetupTest() { }, ) cs.pending.On("Size").Return(uint(0)) - cs.pending.On("PruneByView", mock.Anything).Return() + cs.pending.On("PruneByView", mock.Anything).Return(nil) closed := func() <-chan struct{} { channel := make(chan struct{}) diff --git a/engine/consensus/compliance/core_test.go b/engine/consensus/compliance/core_test.go index 86277a36d42..d33fc523bb7 100644 --- a/engine/consensus/compliance/core_test.go +++ b/engine/consensus/compliance/core_test.go @@ -220,7 +220,7 @@ func (cs *CommonSuite) SetupTest() { }, ) cs.pending.On("Size").Return(uint(0)) - cs.pending.On("PruneByView", mock.Anything).Return() + cs.pending.On("PruneByView", mock.Anything).Return(nil) // set up hotstuff module mock cs.hotstuff = module.NewHotStuff(cs.T()) diff --git a/module/metrics/example/verification/main.go b/module/metrics/example/verification/main.go index 103b751240b..88ed6b52044 100644 --- a/module/metrics/example/verification/main.go +++ b/module/metrics/example/verification/main.go @@ -108,7 +108,7 @@ func demo() { } // creates consensus cache for follower engine, and registers size method of backend for metrics - pendingBlocks := buffer.NewPendingBlocks() + pendingBlocks := buffer.NewPendingBlocks(0) err = mc.Register(metrics.ResourcePendingBlock, pendingBlocks.Size) if err != nil { panic(err) From 39f010a43d3a9b023416ce6434a978a5c7dea521 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Fri, 28 Nov 2025 10:37:00 -0800 Subject: [PATCH 16/23] bug: forest can get child count for nonextnt node --- module/forest/leveled_forest.go | 7 +++++-- module/forest/leveled_forest_test.go | 9 ++++++++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/module/forest/leveled_forest.go b/module/forest/leveled_forest.go index 9dd65d47543..e0967f052f5 100644 --- a/module/forest/leveled_forest.go +++ b/module/forest/leveled_forest.go @@ -141,9 +141,12 @@ func (f *LevelledForest) GetChildren(id flow.Identifier) VertexIterator { return newVertexIterator(nil) // VertexIterator gracefully handles nil slices } -// GetNumberOfChildren returns number of children of given vertex +// GetNumberOfChildren returns the number of children of the given vertex that exist in the forest. func (f *LevelledForest) GetNumberOfChildren(id flow.Identifier) int { - container := f.vertices[id] // if vertex does not exist, container is the default zero value for vertexContainer, which contains a nil-slice for its children + container, ok := f.vertices[id] + if !ok { + return 0 + } num := 0 for _, child := range container.children { if child.vertex != nil { diff --git a/module/forest/leveled_forest_test.go b/module/forest/leveled_forest_test.go index c0285918167..e63b2aba5ed 100644 --- a/module/forest/leveled_forest_test.go +++ b/module/forest/leveled_forest_test.go @@ -206,6 +206,10 @@ func TestLevelledForest_GetChildren(t *testing.T) { // testing children for Block that is contained in Tree but no children are known it = F.GetChildren(string2Identifier("D")) assert.False(t, it.HasNext()) + + // testing children for Block that does not exist (and is not a parent of any existent vertices) (should return 0) + it = F.GetChildren(string2Identifier("NotAddedOrReferenced")) + assert.False(t, it.HasNext()) } // TestLevelledForest_GetNumberOfChildren tests that children are returned properly @@ -215,11 +219,14 @@ func TestLevelledForest_GetNumberOfChildren(t *testing.T) { // testing children for Block that is contained in Tree assert.Equal(t, 2, F.GetNumberOfChildren(string2Identifier("X"))) - // testing children for referenced Block that is NOT contained in Tree + // testing children for referenced Block that is NOT contained in Tree (but is referenced as parent of existent vertices) assert.Equal(t, 2, F.GetNumberOfChildren(string2Identifier("Genesis"))) // testing children for Block that is contained in Tree but no children are known assert.Equal(t, 0, F.GetNumberOfChildren(string2Identifier("D"))) + + // testing children for Block that does not exist (and is not a parent of any existent vertices) (should return 0) + assert.Equal(t, 0, F.GetNumberOfChildren(string2Identifier("NotAddedOrReferenced"))) } // TestLevelledForest_GetVerticesAtLevel tests that Vertex blob is returned properly From 49a196b7e4f7971110bc9196bba1801a8051e744 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Fri, 28 Nov 2025 11:32:17 -0800 Subject: [PATCH 17/23] ai: add tests for pending blocks suite --- module/buffer/pending_blocks_test.go | 458 +++++++++++++++++++++++++++ 1 file changed, 458 insertions(+) create mode 100644 module/buffer/pending_blocks_test.go diff --git a/module/buffer/pending_blocks_test.go b/module/buffer/pending_blocks_test.go new file mode 100644 index 00000000000..e6d8af6f008 --- /dev/null +++ b/module/buffer/pending_blocks_test.go @@ -0,0 +1,458 @@ +package buffer + +import ( + "math/rand" + "sync" + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/mempool" + "github.com/onflow/flow-go/utils/unittest" +) + +type PendingBlocksSuite struct { + suite.Suite + buffer *GenericPendingBlocks[*flow.Proposal] +} + +func TestPendingBlocksSuite(t *testing.T) { + suite.Run(t, new(PendingBlocksSuite)) +} + +func (suite *PendingBlocksSuite) SetupTest() { + // Initialize with finalized view 0 + suite.buffer = NewPendingBlocks(0) +} + +// block creates a new block proposal wrapped as Slashable. +func (suite *PendingBlocksSuite) block() flow.Slashable[*flow.Proposal] { + block := unittest.BlockFixture() + return unittest.AsSlashable(unittest.ProposalFromBlock(block)) +} + +// blockWithParent creates a new block proposal with the given parent header. +func (suite *PendingBlocksSuite) blockWithParent(parent *flow.Header) flow.Slashable[*flow.Proposal] { + block := unittest.BlockWithParentFixture(parent) + return unittest.AsSlashable(unittest.ProposalFromBlock(block)) +} + +// TestAdd tests adding blocks to the buffer. +func (suite *PendingBlocksSuite) TestAdd() { + block := suite.block() + suite.buffer.Add(block) + + // Verify block can be retrieved by ID + retrieved, ok := suite.buffer.ByID(block.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Equal(block.Message.Block.ID(), retrieved.Message.Block.ID()) + suite.Assert().Equal(block.Message.Block.View, retrieved.Message.Block.View) + + // Verify block can be retrieved by parent ID + children, ok := suite.buffer.ByParentID(block.Message.Block.ParentID) + suite.Assert().True(ok) + suite.Assert().Len(children, 1) + suite.Assert().Equal(block.Message.Block.ID(), children[0].Message.Block.ID()) +} + +// TestAddDuplicate verifies that adding the same block twice is a no-op. +func (suite *PendingBlocksSuite) TestAddDuplicate() { + block := suite.block() + suite.buffer.Add(block) + suite.buffer.Add(block) // Add again + + // Should still only have one block + suite.Assert().Equal(uint(1), suite.buffer.Size()) + + // Should still be retrievable + retrieved, ok := suite.buffer.ByID(block.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Equal(block.Message.Block.ID(), retrieved.Message.Block.ID()) +} + +// TestAddBelowFinalizedView verifies that adding blocks below finalized view is a no-op. +func (suite *PendingBlocksSuite) TestAddBelowFinalizedView() { + finalizedView := uint64(100) + buffer := NewPendingBlocks(finalizedView) + + // Create a block with view below finalized + parent := unittest.BlockHeaderFixture() + parent.View = finalizedView - 10 + block := suite.blockWithParent(parent) + // Note: Block views are set when creating the block, we can't modify them directly + // This test verifies the behavior when a block below finalized view is added + // The forest should reject it during Add + + buffer.Add(block) + + // Block should not be added if its view is below finalized + // Note: The actual behavior depends on the forest implementation + // If the block was created with view >= finalizedView, it will be added + // We'll verify the behavior is correct + _, ok := buffer.ByID(block.Message.Block.ID()) + // If view is below finalized, it should not be added + if block.Message.Block.View < finalizedView { + suite.Assert().False(ok) + suite.Assert().Equal(uint(0), buffer.Size()) + } +} + +// TestByID tests retrieving blocks by ID. +func (suite *PendingBlocksSuite) TestByID() { + block := suite.block() + suite.buffer.Add(block) + + // Test retrieving existing block + retrieved, ok := suite.buffer.ByID(block.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Equal(block.Message.Block.ID(), retrieved.Message.Block.ID()) + + // Test retrieving non-existent block + nonExistentID := unittest.IdentifierFixture() + _, ok = suite.buffer.ByID(nonExistentID) + suite.Assert().False(ok) +} + +// TestByParentID tests retrieving blocks by parent ID. +func (suite *PendingBlocksSuite) TestByParentID() { + parent := suite.block() + suite.buffer.Add(parent) + + // Create multiple children of the parent + child1 := suite.blockWithParent(parent.Message.Block.ToHeader()) + child2 := suite.blockWithParent(parent.Message.Block.ToHeader()) + grandchild := suite.blockWithParent(child1.Message.Block.ToHeader()) + unrelated := suite.block() + + suite.buffer.Add(child1) + suite.buffer.Add(child2) + suite.buffer.Add(grandchild) + suite.buffer.Add(unrelated) + + // Test retrieving children of parent + children, ok := suite.buffer.ByParentID(parent.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Len(children, 2) + + // Verify correct children are returned + childIDs := make(map[flow.Identifier]bool) + for _, child := range children { + childIDs[child.Message.Block.ID()] = true + } + suite.Assert().True(childIDs[child1.Message.Block.ID()]) + suite.Assert().True(childIDs[child2.Message.Block.ID()]) + suite.Assert().False(childIDs[grandchild.Message.Block.ID()]) // grandchild is not direct child + + // Test retrieving children of child1 + grandchildren, ok := suite.buffer.ByParentID(child1.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Len(grandchildren, 1) + suite.Assert().Equal(grandchild.Message.Block.ID(), grandchildren[0].Message.Block.ID()) + + // Test retrieving children of non-existent parent + nonExistentParentID := unittest.IdentifierFixture() + _, ok = suite.buffer.ByParentID(nonExistentParentID) + suite.Assert().False(ok) +} + +// TestByParentIDOnlyDirectChildren verifies that ByParentID only returns direct children. +func (suite *PendingBlocksSuite) TestByParentIDOnlyDirectChildren() { + parent := suite.block() + suite.buffer.Add(parent) + + child := suite.blockWithParent(parent.Message.Block.ToHeader()) + grandchild := suite.blockWithParent(child.Message.Block.ToHeader()) + + suite.buffer.Add(child) + suite.buffer.Add(grandchild) + + // Parent should only have child, not grandchild + children, ok := suite.buffer.ByParentID(parent.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Len(children, 1) + suite.Assert().Equal(child.Message.Block.ID(), children[0].Message.Block.ID()) + + // Child should have grandchild + grandchildren, ok := suite.buffer.ByParentID(child.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Len(grandchildren, 1) + suite.Assert().Equal(grandchild.Message.Block.ID(), grandchildren[0].Message.Block.ID()) +} + +// TestPruneByView tests pruning blocks by view. +func (suite *PendingBlocksSuite) TestPruneByView() { + const N = 100 // number of blocks to test with + blocks := make([]flow.Slashable[*flow.Proposal], 0, N) + + // Build a buffer with various blocks + for i := 0; i < N; i++ { + // 10% of the time, add a new unrelated block + if i%10 == 0 { + block := suite.block() + suite.buffer.Add(block) + blocks = append(blocks, block) + continue + } + + // 90% of the time, build on an existing block + if i%2 == 1 && len(blocks) > 0 { + parent := blocks[rand.Intn(len(blocks))] + block := suite.blockWithParent(parent.Message.Block.ToHeader()) + suite.buffer.Add(block) + blocks = append(blocks, block) + } + } + + // Pick a view to prune that's guaranteed to prune at least one block + pruneAt := blocks[rand.Intn(len(blocks))].Message.Block.View + err := suite.buffer.PruneByView(pruneAt) + suite.Assert().NoError(err) + + // Verify blocks at or below prune view are removed + for _, block := range blocks { + view := block.Message.Block.View + id := block.Message.Block.ID() + + if view <= pruneAt { + _, exists := suite.buffer.ByID(id) + suite.Assert().False(exists, "block at view %d should be pruned", view) + } else { + _, exists := suite.buffer.ByID(id) + suite.Assert().True(exists, "block at view %d should not be pruned", view) + } + } +} + +// TestPruneByViewBelowFinalizedView verifies that pruning below finalized view returns an error. +func (suite *PendingBlocksSuite) TestPruneByViewBelowFinalizedView() { + finalizedView := uint64(100) + buffer := NewPendingBlocks(finalizedView) + + // Add some blocks above finalized view + parent := unittest.BlockHeaderFixture() + parent.View = finalizedView + 10 + block := suite.blockWithParent(parent) + buffer.Add(block) + + // Prune at finalized view should succeed + err := buffer.PruneByView(finalizedView) + suite.Assert().NoError(err) + + // Prune below finalized view should fail + err = buffer.PruneByView(finalizedView - 1) + suite.Assert().Error(err) + suite.Assert().True(mempool.IsBelowPrunedThresholdError(err)) +} + +// TestPruneByViewMultipleTimes tests that pruning multiple times works correctly. +func (suite *PendingBlocksSuite) TestPruneByViewMultipleTimes() { + // Create blocks at different views + parentHeader := unittest.BlockHeaderFixture() + parentHeader.View = 10 + parent := suite.blockWithParent(parentHeader) + suite.buffer.Add(parent) + + // Create children - views will be automatically set to be greater than parent + child1 := suite.blockWithParent(parent.Message.Block.ToHeader()) + suite.buffer.Add(child1) + + child2 := suite.blockWithParent(parent.Message.Block.ToHeader()) + suite.buffer.Add(child2) + + // Get actual views + parentView := parent.Message.Block.View + child1View := child1.Message.Block.View + child2View := child2.Message.Block.View + + // Find minimum child view to ensure we prune between parent and children + minChildView := child1View + if child2View < minChildView { + minChildView = child2View + } + + // Prune at a view between parent and children (should remove parent only) + // Use parentView + 1 to ensure we're above parent but below children + pruneView1 := parentView + 1 + if pruneView1 >= minChildView { + // If children are too close to parent, use a view just below the minimum child view + pruneView1 = minChildView - 1 + } + err := suite.buffer.PruneByView(pruneView1) + suite.Assert().NoError(err) + suite.Assert().False(suite.buffer.Size() == 0) // children should remain + + // Verify parent is pruned but children remain + _, ok := suite.buffer.ByID(parent.Message.Block.ID()) + suite.Assert().False(ok) + _, ok = suite.buffer.ByID(child1.Message.Block.ID()) + suite.Assert().True(ok) + _, ok = suite.buffer.ByID(child2.Message.Block.ID()) + suite.Assert().True(ok) + + // Prune at a view that removes all remaining blocks + maxView := child1View + if child2View > maxView { + maxView = child2View + } + err = suite.buffer.PruneByView(maxView) + suite.Assert().NoError(err) + suite.Assert().Equal(uint(0), suite.buffer.Size()) +} + +// TestSize tests the Size method. +func (suite *PendingBlocksSuite) TestSize() { + // Initially empty + suite.Assert().Equal(uint(0), suite.buffer.Size()) + + // Add blocks and verify size increases + block1 := suite.block() + suite.buffer.Add(block1) + suite.Assert().Equal(uint(1), suite.buffer.Size()) + + block2 := suite.block() + suite.buffer.Add(block2) + suite.Assert().Equal(uint(2), suite.buffer.Size()) + + // Adding duplicate should not increase size + suite.buffer.Add(block1) + suite.Assert().Equal(uint(2), suite.buffer.Size()) +} + +// TestConcurrentAccess tests that the buffer is safe for concurrent access. +func (suite *PendingBlocksSuite) TestConcurrentAccess() { + const numGoroutines = 10 + const blocksPerGoroutine = 10 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + // Concurrently add blocks + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < blocksPerGoroutine; j++ { + block := suite.block() + suite.buffer.Add(block) + } + }() + } + + wg.Wait() + + // Verify all blocks were added (accounting for potential duplicates) + suite.Assert().GreaterOrEqual(suite.buffer.Size(), uint(numGoroutines*blocksPerGoroutine/2)) +} + +// TestConcurrentReadWrite tests concurrent reads and writes. +func (suite *PendingBlocksSuite) TestConcurrentReadWrite() { + const numWriters = 5 + const numReaders = 5 + const blocksPerWriter = 10 + + var wg sync.WaitGroup + + // Start writers + for i := 0; i < numWriters; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < blocksPerWriter; j++ { + block := suite.block() + suite.buffer.Add(block) + } + }() + } + + // Start readers + for i := 0; i < numReaders; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < blocksPerWriter; j++ { + suite.buffer.Size() + suite.buffer.ByID(unittest.IdentifierFixture()) + } + }() + } + + wg.Wait() + // Test should complete without race conditions +} + +// TestChildIndexing verifies that child indexing works correctly with multiple children. +func (suite *PendingBlocksSuite) TestChildIndexing() { + parent := suite.block() + suite.buffer.Add(parent) + + child1 := suite.blockWithParent(parent.Message.Block.ToHeader()) + child2 := suite.blockWithParent(parent.Message.Block.ToHeader()) + grandchild := suite.blockWithParent(child1.Message.Block.ToHeader()) + unrelated := suite.block() + + suite.buffer.Add(child1) + suite.buffer.Add(child2) + suite.buffer.Add(grandchild) + suite.buffer.Add(unrelated) + + // Verify parent has correct children + children, ok := suite.buffer.ByParentID(parent.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Len(children, 2) + suite.Assert().Contains( + []flow.Identifier{children[0].Message.Block.ID(), children[1].Message.Block.ID()}, + child1.Message.Block.ID(), + ) + suite.Assert().Contains( + []flow.Identifier{children[0].Message.Block.ID(), children[1].Message.Block.ID()}, + child2.Message.Block.ID(), + ) + + // Verify unrelated block is not a child of parent + suite.Assert().NotEqual(unrelated.Message.Block.ParentID, parent.Message.Block.ID()) +} + +// TestEmptyBufferOperations tests operations on an empty buffer. +func (suite *PendingBlocksSuite) TestEmptyBufferOperations() { + suite.Assert().Equal(uint(0), suite.buffer.Size()) + + // ByID should return false + _, ok := suite.buffer.ByID(unittest.IdentifierFixture()) + suite.Assert().False(ok) + + // ByParentID should return false + _, ok = suite.buffer.ByParentID(unittest.IdentifierFixture()) + suite.Assert().False(ok) + + // PruneByView should succeed (no-op) + err := suite.buffer.PruneByView(100) + suite.Assert().NoError(err) +} + +// TestAddAfterPrune verifies that blocks can be added after pruning. +func (suite *PendingBlocksSuite) TestAddAfterPrune() { + // Add and prune a block + parentHeader := unittest.BlockHeaderFixture() + parentHeader.View = 10 + block1 := suite.blockWithParent(parentHeader) + suite.buffer.Add(block1) + + err := suite.buffer.PruneByView(block1.Message.Block.View) + suite.Assert().NoError(err) + + // Verify block is pruned + _, ok := suite.buffer.ByID(block1.Message.Block.ID()) + suite.Assert().False(ok) + + // Add a new block after pruning + parentHeader2 := unittest.BlockHeaderFixture() + parentHeader2.View = 20 + block2 := suite.blockWithParent(parentHeader2) + suite.buffer.Add(block2) + + // Verify new block is added + retrieved, ok := suite.buffer.ByID(block2.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Equal(block2.Message.Block.ID(), retrieved.Message.Block.ID()) + suite.Assert().Equal(uint(1), suite.buffer.Size()) +} From b3b14604d1c72671bb440a5ea2e20f5102c9108b Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Fri, 28 Nov 2025 11:52:18 -0800 Subject: [PATCH 18/23] adjust tests remove some ai-generated tests and adjust implementation and docs --- module/buffer/pending_blocks_test.go | 149 +++++---------------------- 1 file changed, 24 insertions(+), 125 deletions(-) diff --git a/module/buffer/pending_blocks_test.go b/module/buffer/pending_blocks_test.go index e6d8af6f008..ab2867520ad 100644 --- a/module/buffer/pending_blocks_test.go +++ b/module/buffer/pending_blocks_test.go @@ -73,29 +73,19 @@ func (suite *PendingBlocksSuite) TestAddDuplicate() { // TestAddBelowFinalizedView verifies that adding blocks below finalized view is a no-op. func (suite *PendingBlocksSuite) TestAddBelowFinalizedView() { - finalizedView := uint64(100) + finalizedView := uint64(1000) buffer := NewPendingBlocks(finalizedView) // Create a block with view below finalized - parent := unittest.BlockHeaderFixture() - parent.View = finalizedView - 10 - block := suite.blockWithParent(parent) - // Note: Block views are set when creating the block, we can't modify them directly - // This test verifies the behavior when a block below finalized view is added - // The forest should reject it during Add + block := suite.block() + block.Message.Block.ParentView = finalizedView - 10 + block.Message.Block.View = finalizedView - 5 buffer.Add(block) - // Block should not be added if its view is below finalized - // Note: The actual behavior depends on the forest implementation - // If the block was created with view >= finalizedView, it will be added - // We'll verify the behavior is correct _, ok := buffer.ByID(block.Message.Block.ID()) - // If view is below finalized, it should not be added - if block.Message.Block.View < finalizedView { - suite.Assert().False(ok) - suite.Assert().Equal(uint(0), buffer.Size()) - } + suite.Assert().False(ok) + suite.Assert().Equal(uint(0), buffer.Size()) } // TestByID tests retrieving blocks by ID. @@ -136,19 +126,12 @@ func (suite *PendingBlocksSuite) TestByParentID() { suite.Assert().Len(children, 2) // Verify correct children are returned - childIDs := make(map[flow.Identifier]bool) + retrievedChildIDs := make(map[flow.Identifier]bool) for _, child := range children { - childIDs[child.Message.Block.ID()] = true + retrievedChildIDs[child.Message.Block.ID()] = true } - suite.Assert().True(childIDs[child1.Message.Block.ID()]) - suite.Assert().True(childIDs[child2.Message.Block.ID()]) - suite.Assert().False(childIDs[grandchild.Message.Block.ID()]) // grandchild is not direct child - - // Test retrieving children of child1 - grandchildren, ok := suite.buffer.ByParentID(child1.Message.Block.ID()) - suite.Assert().True(ok) - suite.Assert().Len(grandchildren, 1) - suite.Assert().Equal(grandchild.Message.Block.ID(), grandchildren[0].Message.Block.ID()) + suite.Assert().True(retrievedChildIDs[child1.Message.Block.ID()]) + suite.Assert().True(retrievedChildIDs[child2.Message.Block.ID()]) // Test retrieving children of non-existent parent nonExistentParentID := unittest.IdentifierFixture() @@ -255,6 +238,7 @@ func (suite *PendingBlocksSuite) TestPruneByViewMultipleTimes() { // Create children - views will be automatically set to be greater than parent child1 := suite.blockWithParent(parent.Message.Block.ToHeader()) + child1.Message.Block.View++ suite.buffer.Add(child1) child2 := suite.blockWithParent(parent.Message.Block.ToHeader()) @@ -265,24 +249,13 @@ func (suite *PendingBlocksSuite) TestPruneByViewMultipleTimes() { child1View := child1.Message.Block.View child2View := child2.Message.Block.View - // Find minimum child view to ensure we prune between parent and children - minChildView := child1View - if child2View < minChildView { - minChildView = child2View - } - - // Prune at a view between parent and children (should remove parent only) - // Use parentView + 1 to ensure we're above parent but below children - pruneView1 := parentView + 1 - if pruneView1 >= minChildView { - // If children are too close to parent, use a view just below the minimum child view - pruneView1 = minChildView - 1 - } + // Prune at the parent's view (should remove parent only) + pruneView1 := parentView err := suite.buffer.PruneByView(pruneView1) suite.Assert().NoError(err) - suite.Assert().False(suite.buffer.Size() == 0) // children should remain // Verify parent is pruned but children remain + suite.Assert().Equal(uint(2), suite.buffer.Size()) _, ok := suite.buffer.ByID(parent.Message.Block.ID()) suite.Assert().False(ok) _, ok = suite.buffer.ByID(child1.Message.Block.ID()) @@ -291,11 +264,9 @@ func (suite *PendingBlocksSuite) TestPruneByViewMultipleTimes() { suite.Assert().True(ok) // Prune at a view that removes all remaining blocks - maxView := child1View - if child2View > maxView { - maxView = child2View - } - err = suite.buffer.PruneByView(maxView) + pruneView2 := max(child1View, child2View) + + err = suite.buffer.PruneByView(pruneView2) suite.Assert().NoError(err) suite.Assert().Equal(uint(0), suite.buffer.Size()) } @@ -320,11 +291,12 @@ func (suite *PendingBlocksSuite) TestSize() { } // TestConcurrentAccess tests that the buffer is safe for concurrent access. +// NOTE: correctness here depends on [PendingBlockSuite.block] not returning duplicates. func (suite *PendingBlocksSuite) TestConcurrentAccess() { const numGoroutines = 10 const blocksPerGoroutine = 10 - var wg sync.WaitGroup + wg := new(sync.WaitGroup) wg.Add(numGoroutines) // Concurrently add blocks @@ -337,79 +309,10 @@ func (suite *PendingBlocksSuite) TestConcurrentAccess() { } }() } - wg.Wait() - // Verify all blocks were added (accounting for potential duplicates) - suite.Assert().GreaterOrEqual(suite.buffer.Size(), uint(numGoroutines*blocksPerGoroutine/2)) -} - -// TestConcurrentReadWrite tests concurrent reads and writes. -func (suite *PendingBlocksSuite) TestConcurrentReadWrite() { - const numWriters = 5 - const numReaders = 5 - const blocksPerWriter = 10 - - var wg sync.WaitGroup - - // Start writers - for i := 0; i < numWriters; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for j := 0; j < blocksPerWriter; j++ { - block := suite.block() - suite.buffer.Add(block) - } - }() - } - - // Start readers - for i := 0; i < numReaders; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for j := 0; j < blocksPerWriter; j++ { - suite.buffer.Size() - suite.buffer.ByID(unittest.IdentifierFixture()) - } - }() - } - - wg.Wait() - // Test should complete without race conditions -} - -// TestChildIndexing verifies that child indexing works correctly with multiple children. -func (suite *PendingBlocksSuite) TestChildIndexing() { - parent := suite.block() - suite.buffer.Add(parent) - - child1 := suite.blockWithParent(parent.Message.Block.ToHeader()) - child2 := suite.blockWithParent(parent.Message.Block.ToHeader()) - grandchild := suite.blockWithParent(child1.Message.Block.ToHeader()) - unrelated := suite.block() - - suite.buffer.Add(child1) - suite.buffer.Add(child2) - suite.buffer.Add(grandchild) - suite.buffer.Add(unrelated) - - // Verify parent has correct children - children, ok := suite.buffer.ByParentID(parent.Message.Block.ID()) - suite.Assert().True(ok) - suite.Assert().Len(children, 2) - suite.Assert().Contains( - []flow.Identifier{children[0].Message.Block.ID(), children[1].Message.Block.ID()}, - child1.Message.Block.ID(), - ) - suite.Assert().Contains( - []flow.Identifier{children[0].Message.Block.ID(), children[1].Message.Block.ID()}, - child2.Message.Block.ID(), - ) - - // Verify unrelated block is not a child of parent - suite.Assert().NotEqual(unrelated.Message.Block.ParentID, parent.Message.Block.ID()) + // Verify all blocks were added + suite.Assert().Equal(uint(numGoroutines*blocksPerGoroutine), suite.buffer.Size()) } // TestEmptyBufferOperations tests operations on an empty buffer. @@ -432,9 +335,7 @@ func (suite *PendingBlocksSuite) TestEmptyBufferOperations() { // TestAddAfterPrune verifies that blocks can be added after pruning. func (suite *PendingBlocksSuite) TestAddAfterPrune() { // Add and prune a block - parentHeader := unittest.BlockHeaderFixture() - parentHeader.View = 10 - block1 := suite.blockWithParent(parentHeader) + block1 := suite.block() suite.buffer.Add(block1) err := suite.buffer.PruneByView(block1.Message.Block.View) @@ -444,10 +345,8 @@ func (suite *PendingBlocksSuite) TestAddAfterPrune() { _, ok := suite.buffer.ByID(block1.Message.Block.ID()) suite.Assert().False(ok) - // Add a new block after pruning - parentHeader2 := unittest.BlockHeaderFixture() - parentHeader2.View = 20 - block2 := suite.blockWithParent(parentHeader2) + // Add a new block after pruning with view above pruned view + block2 := suite.blockWithParent(block1.Message.Block.ToHeader()) suite.buffer.Add(block2) // Verify new block is added From beec64c8b54308ca82c51f64eda8d335235a4428 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Fri, 28 Nov 2025 11:53:37 -0800 Subject: [PATCH 19/23] remove old buffer backend --- module/buffer/backend.go | 133 ----------------------------- module/buffer/backend_test.go | 154 ---------------------------------- 2 files changed, 287 deletions(-) delete mode 100644 module/buffer/backend.go delete mode 100644 module/buffer/backend_test.go diff --git a/module/buffer/backend.go b/module/buffer/backend.go deleted file mode 100644 index fe841728178..00000000000 --- a/module/buffer/backend.go +++ /dev/null @@ -1,133 +0,0 @@ -package buffer - -import ( - "sync" - - "github.com/onflow/flow-go/model/flow" -) - -// item represents an item in the cache: the main block and auxiliary data that is -// needed to implement indexed lookups by parent ID and pruning by view. -type item[T extractProposalHeader] struct { - view uint64 - parentID flow.Identifier - block flow.Slashable[T] -} - -// extractProposalHeader is a type constraint for the generic type which allows to extract flow.ProposalHeader -// from the underlying type. -type extractProposalHeader interface { - ProposalHeader() *flow.ProposalHeader -} - -// backend implements a simple cache of pending blocks, indexed by parent ID and pruned by view. -type backend[T extractProposalHeader] struct { - mu sync.RWMutex - // map of pending header IDs, keyed by parent ID for ByParentID lookups - blocksByParent map[flow.Identifier][]flow.Identifier - // set of pending blocks, keyed by ID to avoid duplication - blocksByID map[flow.Identifier]*item[T] -} - -// newBackend returns a new pending header cache. -func newBackend[T extractProposalHeader]() *backend[T] { - cache := &backend[T]{ - blocksByParent: make(map[flow.Identifier][]flow.Identifier), - blocksByID: make(map[flow.Identifier]*item[T]), - } - return cache -} - -// add adds the item to the cache, returning false if it already exists and -// true otherwise. -func (b *backend[T]) add(block flow.Slashable[T]) bool { - header := block.Message.ProposalHeader().Header - blockID := header.ID() - - b.mu.Lock() - defer b.mu.Unlock() - - _, exists := b.blocksByID[blockID] - if exists { - return false - } - - item := &item[T]{ - view: header.View, - parentID: header.ParentID, - block: block, - } - - b.blocksByID[blockID] = item - b.blocksByParent[item.parentID] = append(b.blocksByParent[item.parentID], blockID) - - return true -} - -func (b *backend[T]) byID(id flow.Identifier) (*item[T], bool) { - b.mu.RLock() - defer b.mu.RUnlock() - - item, exists := b.blocksByID[id] - if !exists { - return nil, false - } - - return item, true -} - -// byParentID returns a list of cached blocks with the given parent. If no such -// blocks exist, returns false. -func (b *backend[T]) byParentID(parentID flow.Identifier) ([]*item[T], bool) { - b.mu.RLock() - defer b.mu.RUnlock() - - forParent, exists := b.blocksByParent[parentID] - if !exists { - return nil, false - } - - items := make([]*item[T], 0, len(forParent)) - for _, blockID := range forParent { - items = append(items, b.blocksByID[blockID]) - } - - return items, true -} - -// dropForParent removes all cached blocks with the given parent (non-recursively). -func (b *backend[T]) dropForParent(parentID flow.Identifier) { - b.mu.Lock() - defer b.mu.Unlock() - - children, exists := b.blocksByParent[parentID] - if !exists { - return - } - - for _, childID := range children { - delete(b.blocksByID, childID) - } - delete(b.blocksByParent, parentID) -} - -// pruneByView prunes any items in the cache that have view less than or -// equal to the given view. The pruning view should be the finalized view. -func (b *backend[T]) pruneByView(view uint64) { - b.mu.Lock() - defer b.mu.Unlock() - - for id, item := range b.blocksByID { - if item.view <= view { - delete(b.blocksByID, id) - delete(b.blocksByParent, item.parentID) - } - } -} - -// size returns the number of elements stored in teh backend -func (b *backend[T]) size() uint { - b.mu.RLock() - defer b.mu.RUnlock() - return uint(len(b.blocksByID)) -} diff --git a/module/buffer/backend_test.go b/module/buffer/backend_test.go deleted file mode 100644 index 588ed674297..00000000000 --- a/module/buffer/backend_test.go +++ /dev/null @@ -1,154 +0,0 @@ -package buffer - -import ( - "math/rand" - "testing" - - "github.com/stretchr/testify/suite" - - "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/utils/unittest" -) - -type BackendSuite struct { - suite.Suite - backend *backend[*flow.Proposal] -} - -func TestBackendSuite(t *testing.T) { - suite.Run(t, new(BackendSuite)) -} - -func (suite *BackendSuite) SetupTest() { - suite.backend = newBackend[*flow.Proposal]() -} - -func (suite *BackendSuite) item() *item[*flow.Proposal] { - parent := unittest.BlockHeaderFixture() - return suite.itemWithParent(parent) -} - -func (suite *BackendSuite) itemWithParent(parent *flow.Header) *item[*flow.Proposal] { - block := unittest.BlockWithParentFixture(parent) - return &item[*flow.Proposal]{ - view: block.View, - parentID: block.ParentID, - block: flow.Slashable[*flow.Proposal]{ - OriginID: unittest.IdentifierFixture(), - Message: unittest.ProposalFromBlock(block), - }, - } -} - -func (suite *BackendSuite) Add(item *item[*flow.Proposal]) { - suite.backend.add(item.block) -} - -func (suite *BackendSuite) TestAdd() { - expected := suite.item() - suite.backend.add(expected.block) - - actual, ok := suite.backend.byID(expected.block.Message.Block.ID()) - suite.Assert().True(ok) - suite.Assert().Equal(expected, actual) - - byParent, ok := suite.backend.byParentID(expected.parentID) - suite.Assert().True(ok) - suite.Assert().Len(byParent, 1) - suite.Assert().Equal(expected, byParent[0]) -} - -func (suite *BackendSuite) TestChildIndexing() { - - parent := suite.item() - child1 := suite.itemWithParent(parent.block.Message.Block.ToHeader()) - child2 := suite.itemWithParent(parent.block.Message.Block.ToHeader()) - grandchild := suite.itemWithParent(child1.block.Message.Block.ToHeader()) - unrelated := suite.item() - - suite.Add(child1) - suite.Add(child2) - suite.Add(grandchild) - suite.Add(unrelated) - - suite.Run("retrieve by parent ID", func() { - byParent, ok := suite.backend.byParentID(parent.block.Message.Block.ID()) - suite.Assert().True(ok) - // should only include direct children - suite.Assert().Len(byParent, 2) - suite.Assert().Contains(byParent, child1) - suite.Assert().Contains(byParent, child2) - }) - - suite.Run("drop for parent ID", func() { - suite.backend.dropForParent(parent.block.Message.Block.ID()) - - // should only drop direct children - _, exists := suite.backend.byID(child1.block.Message.Block.ID()) - suite.Assert().False(exists) - _, exists = suite.backend.byID(child2.block.Message.Block.ID()) - suite.Assert().False(exists) - - // grandchildren should be unaffected - _, exists = suite.backend.byParentID(child1.block.Message.Block.ID()) - suite.Assert().True(exists) - _, exists = suite.backend.byID(grandchild.block.Message.Block.ID()) - suite.Assert().True(exists) - - // nothing else should be affected - _, exists = suite.backend.byID(unrelated.block.Message.Block.ID()) - suite.Assert().True(exists) - }) -} - -func (suite *BackendSuite) TestPruneByView() { - - const N = 100 // number of items we're testing with - items := make([]*item[*flow.Proposal], 0, N) - - // build a pending buffer - for i := 0; i < N; i++ { - - // 10% of the time, add a new unrelated pending header - if i%10 == 0 { - item := suite.item() - suite.Add(item) - items = append(items, item) - continue - } - - // 90% of the time, build on an existing header - if i%2 == 1 { - parent := items[rand.Intn(len(items))] - item := suite.itemWithParent(parent.block.Message.Block.ToHeader()) - suite.Add(item) - items = append(items, item) - } - } - - // pick a height to prune that's guaranteed to prune at least one item - pruneAt := items[rand.Intn(len(items))].view - suite.backend.pruneByView(pruneAt) - - for _, item := range items { - view := item.view - id := item.block.Message.Block.ID() - parentID := item.parentID - - // check that items below the prune view were removed - if view <= pruneAt { - _, exists := suite.backend.byID(id) - suite.Assert().False(exists) - _, exists = suite.backend.byParentID(parentID) - suite.Assert().False(exists) - } - - // check that other items were not removed - if view > item.view { - _, exists := suite.backend.byID(id) - suite.Assert().True(exists) - _, exists = suite.backend.byParentID(parentID) - suite.Assert().True(exists) - } - } -} From e813f5cde2764c5b29f9eb60caf06d7b89b25d52 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Fri, 28 Nov 2025 12:35:13 -0800 Subject: [PATCH 20/23] ai: add threshold also to mempool it already exists in the compliance engine as well, but including in mempool provides stronger localized guarantee of behaviour --- cmd/consensus/main.go | 2 +- consensus/integration/nodes_test.go | 2 +- engine/collection/compliance/core.go | 20 +++- engine/collection/compliance/core_test.go | 2 +- .../epochmgr/factories/compliance.go | 2 +- engine/consensus/compliance/core.go | 20 +++- engine/consensus/compliance/core_test.go | 2 +- module/buffer.go | 4 +- module/buffer/pending_blocks.go | 46 ++++++++-- module/buffer/pending_blocks_test.go | 91 ++++++++++++++++--- module/mempool/errors.go | 27 ++++++ module/metrics/example/verification/main.go | 2 +- module/mock/generic_pending_block_buffer.go | 17 +++- module/mock/pending_block_buffer.go | 17 +++- module/mock/pending_cluster_block_buffer.go | 17 +++- 15 files changed, 232 insertions(+), 39 deletions(-) diff --git a/cmd/consensus/main.go b/cmd/consensus/main.go index 1abc9ead98b..a2f38189d44 100644 --- a/cmd/consensus/main.go +++ b/cmd/consensus/main.go @@ -805,7 +805,7 @@ func main() { }). Component("consensus compliance engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { // initialize the pending blocks cache - proposals := buffer.NewPendingBlocks(node.LastFinalizedHeader.View) + proposals := buffer.NewPendingBlocks(node.LastFinalizedHeader.View, node.ComplianceConfig.GetSkipNewProposalsThreshold()) logger := createLogger(node.Logger, node.RootChainID) complianceCore, err := compliance.NewCore( diff --git a/consensus/integration/nodes_test.go b/consensus/integration/nodes_test.go index 7a4053f366c..6c8670f8ffd 100644 --- a/consensus/integration/nodes_test.go +++ b/consensus/integration/nodes_test.go @@ -518,7 +518,7 @@ func createNode( require.NoError(t, err) // initialize the pending blocks cache - cache := buffer.NewPendingBlocks(rootHeader.View) + cache := buffer.NewPendingBlocks(rootHeader.View, modulecompliance.DefaultConfig().SkipNewProposalsThreshold) rootQC, err := rootSnapshot.QuorumCertificate() require.NoError(t, err) diff --git a/engine/collection/compliance/core.go b/engine/collection/compliance/core.go index 24f9bc2e80a..eb2e3971399 100644 --- a/engine/collection/compliance/core.go +++ b/engine/collection/compliance/core.go @@ -196,7 +196,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*cluster.Proposal]) error _, found := c.pending.ByID(block.ParentID) if found { // add the block to the cache - c.pending.Add(proposal) + if err := c.pending.Add(proposal); err != nil { + if mempool.IsBeyondActiveRangeError(err) { + // In general we expect the block buffer to use SkipNewProposalsThreshold, + // however since it is instantiated outside this component, we allow the thresholds to differ + log.Debug().Err(err).Msg("dropping block beyond block buffer active range") + return nil + } + return fmt.Errorf("could not add proposal to pending buffer: %w", err) + } c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size()) return nil @@ -210,7 +218,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*cluster.Proposal]) error return fmt.Errorf("could not check parent exists: %w", err) } if !exists { - c.pending.Add(proposal) + if err := c.pending.Add(proposal); err != nil { + if mempool.IsBeyondActiveRangeError(err) { + // In general we expect the block buffer to use SkipNewProposalsThreshold, + // however since it is instantiated outside this component, we allow the thresholds to differ + log.Debug().Err(err).Msg("dropping block beyond block buffer active range") + return nil + } + return fmt.Errorf("could not add proposal to pending buffer: %w", err) + } c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size()) c.sync.RequestBlock(block.ParentID, block.Height-1) diff --git a/engine/collection/compliance/core_test.go b/engine/collection/compliance/core_test.go index 4a495f38a63..bac5e1d39fc 100644 --- a/engine/collection/compliance/core_test.go +++ b/engine/collection/compliance/core_test.go @@ -542,7 +542,7 @@ func (cs *CoreSuite) TestProposalBufferingOrder() { } // replace the engine buffer with the real one - cs.core.pending = realbuffer.NewPendingClusterBlocks(cs.head.Block.View) + cs.core.pending = realbuffer.NewPendingClusterBlocks(cs.head.Block.View, 100_000) // process all of the descendants for _, proposal := range proposals { diff --git a/engine/collection/epochmgr/factories/compliance.go b/engine/collection/epochmgr/factories/compliance.go index 5d1596994c6..c23eadec680 100644 --- a/engine/collection/epochmgr/factories/compliance.go +++ b/engine/collection/epochmgr/factories/compliance.go @@ -70,7 +70,7 @@ func (f *ComplianceEngineFactory) Create( if err != nil { return nil, fmt.Errorf("could not get finalized header: %w", err) } - cache := buffer.NewPendingClusterBlocks(final.View) + cache := buffer.NewPendingClusterBlocks(final.View, f.config.GetSkipNewProposalsThreshold()) core, err := compliance.NewCore( f.log, f.engMetrics, diff --git a/engine/consensus/compliance/core.go b/engine/consensus/compliance/core.go index c8dff879788..3ac5d9c4d1f 100644 --- a/engine/consensus/compliance/core.go +++ b/engine/consensus/compliance/core.go @@ -203,7 +203,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*flow.Proposal]) error { _, found := c.pending.ByID(header.ParentID) if found { // add the block to the cache - c.pending.Add(proposal) + if err := c.pending.Add(proposal); err != nil { + if mempool.IsBeyondActiveRangeError(err) { + // In general we expect the block buffer to use SkipNewProposalsThreshold, + // however since it is instantiated outside this component, we allow the thresholds to differ + log.Debug().Err(err).Msg("dropping block beyond block buffer active range") + return nil + } + return fmt.Errorf("could not add proposal to pending buffer: %w", err) + } c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size()) return nil @@ -217,7 +225,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*flow.Proposal]) error { return fmt.Errorf("could not check parent exists: %w", err) } if !exists { - c.pending.Add(proposal) + if err := c.pending.Add(proposal); err != nil { + if mempool.IsBeyondActiveRangeError(err) { + // In general we expect the block buffer to use SkipNewProposalsThreshold, + // however since it is instantiated outside this component, we allow the thresholds to differ + log.Debug().Err(err).Msg("dropping block beyond block buffer active range") + return nil + } + return fmt.Errorf("could not add proposal to pending buffer: %w", err) + } c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size()) c.sync.RequestBlock(header.ParentID, header.Height-1) diff --git a/engine/consensus/compliance/core_test.go b/engine/consensus/compliance/core_test.go index d33fc523bb7..d59ab0ca4fb 100644 --- a/engine/consensus/compliance/core_test.go +++ b/engine/consensus/compliance/core_test.go @@ -584,7 +584,7 @@ func (cs *CoreSuite) TestProposalBufferingOrder() { } // replace the engine buffer with the real one - cs.core.pending = real.NewPendingBlocks(cs.head.View) + cs.core.pending = real.NewPendingBlocks(cs.head.View, 100_000) // check that we request the ancestor block each time cs.sync.On("RequestBlock", missingBlock.ID(), missingBlock.Height).Times(len(proposals)) diff --git a/module/buffer.go b/module/buffer.go index f8ee29230d1..f130a4caf52 100644 --- a/module/buffer.go +++ b/module/buffer.go @@ -20,7 +20,9 @@ type BufferedProposal interface { type GenericPendingBlockBuffer[T BufferedProposal] interface { // Add adds the input block to the block buffer. // If the block already exists, or is below the finalized view, this is a no-op. - Add(block flow.Slashable[T]) + // Errors returns: + // - mempool.BlockViewTooFarAheadError if block.View > finalizedView + activeViewRangeSize (when activeViewRangeSize > 0) + Add(block flow.Slashable[T]) error // ByID returns the block with the given ID, if it exists. // Otherwise returns (nil, false) diff --git a/module/buffer/pending_blocks.go b/module/buffer/pending_blocks.go index da80c91835e..8852166d455 100644 --- a/module/buffer/pending_blocks.go +++ b/module/buffer/pending_blocks.go @@ -7,6 +7,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/forest" + "github.com/onflow/flow-go/module/mempool" ) // proposalVertex implements [forest.Vertex] for generic block proposals. @@ -51,8 +52,9 @@ func (v proposalVertex[T]) Parent() (flow.Identifier, uint64) { // // Safe for concurrent use. type GenericPendingBlocks[T module.BufferedProposal] struct { - lock *sync.Mutex - forest *forest.LevelledForest + lock *sync.Mutex + forest *forest.LevelledForest + activeViewRangeSize uint64 } type PendingBlocks = GenericPendingBlocks[*flow.Proposal] @@ -61,26 +63,45 @@ type PendingClusterBlocks = GenericPendingBlocks[*cluster.Proposal] var _ module.PendingBlockBuffer = (*PendingBlocks)(nil) var _ module.PendingClusterBlockBuffer = (*PendingClusterBlocks)(nil) -func NewPendingBlocks(finalizedView uint64) *PendingBlocks { +func NewPendingBlocks(finalizedView uint64, activeViewRangeSize uint64) *PendingBlocks { return &PendingBlocks{ - lock: new(sync.Mutex), - forest: forest.NewLevelledForest(finalizedView), + lock: new(sync.Mutex), + // LevelledForest's lowestLevel is inclusive, so add 1 here + forest: forest.NewLevelledForest(finalizedView + 1), + activeViewRangeSize: activeViewRangeSize, } } -func NewPendingClusterBlocks(finalizedView uint64) *PendingClusterBlocks { +func NewPendingClusterBlocks(finalizedView uint64, activeViewRangeSize uint64) *PendingClusterBlocks { return &PendingClusterBlocks{ - lock: new(sync.Mutex), - forest: forest.NewLevelledForest(finalizedView), + lock: new(sync.Mutex), + forest: forest.NewLevelledForest(finalizedView), + activeViewRangeSize: activeViewRangeSize, } } // Add adds the input block to the block buffer. // If the block already exists, or is below the finalized view, this is a no-op. -func (b *GenericPendingBlocks[T]) Add(block flow.Slashable[T]) { +// Errors returns: +// - mempool.BeyondActiveRangeError if block.View > finalizedView + activeViewRangeSize (when activeViewRangeSize > 0) +func (b *GenericPendingBlocks[T]) Add(block flow.Slashable[T]) error { b.lock.Lock() defer b.lock.Unlock() + + blockView := block.Message.ProposalHeader().Header.View + finalizedView := b.highestPrunedView() + + // Check if block view exceeds the active view range size + // If activeViewRangeSize is 0, there's no limitation + if b.activeViewRangeSize > 0 && blockView > finalizedView+b.activeViewRangeSize { + return mempool.NewBeyondActiveRangeError( + "block view %d exceeds active view range size: finalized view %d + range size %d = %d", + blockView, finalizedView, b.activeViewRangeSize, finalizedView+b.activeViewRangeSize, + ) + } + b.forest.AddVertex(newProposalVertex(block)) + return nil } // ByID returns the block with the given ID, if it exists. @@ -131,3 +152,10 @@ func (b *GenericPendingBlocks[T]) Size() uint { defer b.lock.Unlock() return uint(b.forest.GetSize()) } + +// highestPrunedView returns the highest pruned view (finalized view). +// CAUTION: Caller must acquire the lock. +func (b *GenericPendingBlocks[T]) highestPrunedView() uint64 { + // LevelledForest.LowestLevel is the lowest UNPRUNED view, so subtract 1 here + return b.forest.LowestLevel - 1 +} diff --git a/module/buffer/pending_blocks_test.go b/module/buffer/pending_blocks_test.go index ab2867520ad..fe0cb8b9879 100644 --- a/module/buffer/pending_blocks_test.go +++ b/module/buffer/pending_blocks_test.go @@ -22,8 +22,9 @@ func TestPendingBlocksSuite(t *testing.T) { } func (suite *PendingBlocksSuite) SetupTest() { - // Initialize with finalized view 0 - suite.buffer = NewPendingBlocks(0) + // Initialize with finalized view 0 and no view range limitation (0 = no limit) + // Individual tests that need the limitation will create their own buffers + suite.buffer = NewPendingBlocks(0, 0) } // block creates a new block proposal wrapped as Slashable. @@ -41,7 +42,7 @@ func (suite *PendingBlocksSuite) blockWithParent(parent *flow.Header) flow.Slash // TestAdd tests adding blocks to the buffer. func (suite *PendingBlocksSuite) TestAdd() { block := suite.block() - suite.buffer.Add(block) + suite.Require().NoError(suite.buffer.Add(block)) // Verify block can be retrieved by ID retrieved, ok := suite.buffer.ByID(block.Message.Block.ID()) @@ -59,8 +60,8 @@ func (suite *PendingBlocksSuite) TestAdd() { // TestAddDuplicate verifies that adding the same block twice is a no-op. func (suite *PendingBlocksSuite) TestAddDuplicate() { block := suite.block() - suite.buffer.Add(block) - suite.buffer.Add(block) // Add again + suite.Require().NoError(suite.buffer.Add(block)) + suite.Require().NoError(suite.buffer.Add(block)) // Add again // Should still only have one block suite.Assert().Equal(uint(1), suite.buffer.Size()) @@ -74,24 +75,88 @@ func (suite *PendingBlocksSuite) TestAddDuplicate() { // TestAddBelowFinalizedView verifies that adding blocks below finalized view is a no-op. func (suite *PendingBlocksSuite) TestAddBelowFinalizedView() { finalizedView := uint64(1000) - buffer := NewPendingBlocks(finalizedView) + buffer := NewPendingBlocks(finalizedView, 100_000) // Create a block with view below finalized block := suite.block() block.Message.Block.ParentView = finalizedView - 10 block.Message.Block.View = finalizedView - 5 - buffer.Add(block) + suite.Require().NoError(buffer.Add(block)) _, ok := buffer.ByID(block.Message.Block.ID()) suite.Assert().False(ok) suite.Assert().Equal(uint(0), buffer.Size()) } +// TestAddExceedsActiveViewRangeSize verifies that adding blocks that exceed the active view range size returns an error. +func (suite *PendingBlocksSuite) TestAddExceedsActiveViewRangeSize() { + finalizedView := uint64(1000) + activeViewRangeSize := uint64(100) + buffer := NewPendingBlocks(finalizedView, activeViewRangeSize) + + // Create a parent header and then a block that exceeds the active view range size + parentHeader := unittest.BlockHeaderFixture() + parentHeader.View = finalizedView + 50 + block := suite.blockWithParent(parentHeader) + block.Message.Block.View = finalizedView + activeViewRangeSize + 1 + + err := buffer.Add(block) + suite.Assert().Error(err) + suite.Assert().True(mempool.IsBeyondActiveRangeError(err)) + + // Verify block was not added + _, ok := buffer.ByID(block.Message.Block.ID()) + suite.Assert().False(ok) + suite.Assert().Equal(uint(0), buffer.Size()) +} + +// TestAddWithinActiveViewRangeSize verifies that adding blocks within the active view range size succeeds. +func (suite *PendingBlocksSuite) TestAddWithinActiveViewRangeSize() { + finalizedView := uint64(1000) + activeViewRangeSize := uint64(100) + buffer := NewPendingBlocks(finalizedView, activeViewRangeSize) + + // Create a parent header and then a block that is exactly at the limit + parentHeader := unittest.BlockHeaderFixture() + parentHeader.View = finalizedView + 50 + block := suite.blockWithParent(parentHeader) + block.Message.Block.View = finalizedView + activeViewRangeSize + + err := buffer.Add(block) + suite.Assert().NoError(err) + + // Verify block was added + _, ok := buffer.ByID(block.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Equal(uint(1), buffer.Size()) +} + +// TestAddWithZeroActiveViewRangeSize verifies that when activeViewRangeSize is 0, there's no limitation. +func (suite *PendingBlocksSuite) TestAddWithZeroActiveViewRangeSize() { + finalizedView := uint64(1000) + activeViewRangeSize := uint64(0) // No limitation + buffer := NewPendingBlocks(finalizedView, activeViewRangeSize) + + // Create a parent header and then a block that is very far ahead + parentHeader := unittest.BlockHeaderFixture() + parentHeader.View = finalizedView + 500_000 + block := suite.blockWithParent(parentHeader) + block.Message.Block.View = finalizedView + 1_000_000 + + err := buffer.Add(block) + suite.Assert().NoError(err) + + // Verify block was added + _, ok := buffer.ByID(block.Message.Block.ID()) + suite.Assert().True(ok) + suite.Assert().Equal(uint(1), buffer.Size()) +} + // TestByID tests retrieving blocks by ID. func (suite *PendingBlocksSuite) TestByID() { block := suite.block() - suite.buffer.Add(block) + suite.Require().NoError(suite.buffer.Add(block)) // Test retrieving existing block retrieved, ok := suite.buffer.ByID(block.Message.Block.ID()) @@ -173,7 +238,7 @@ func (suite *PendingBlocksSuite) TestPruneByView() { // 10% of the time, add a new unrelated block if i%10 == 0 { block := suite.block() - suite.buffer.Add(block) + suite.Require().NoError(suite.buffer.Add(block)) blocks = append(blocks, block) continue } @@ -182,7 +247,7 @@ func (suite *PendingBlocksSuite) TestPruneByView() { if i%2 == 1 && len(blocks) > 0 { parent := blocks[rand.Intn(len(blocks))] block := suite.blockWithParent(parent.Message.Block.ToHeader()) - suite.buffer.Add(block) + suite.Require().NoError(suite.buffer.Add(block)) blocks = append(blocks, block) } } @@ -210,13 +275,13 @@ func (suite *PendingBlocksSuite) TestPruneByView() { // TestPruneByViewBelowFinalizedView verifies that pruning below finalized view returns an error. func (suite *PendingBlocksSuite) TestPruneByViewBelowFinalizedView() { finalizedView := uint64(100) - buffer := NewPendingBlocks(finalizedView) + buffer := NewPendingBlocks(finalizedView, 100_000) // Add some blocks above finalized view parent := unittest.BlockHeaderFixture() parent.View = finalizedView + 10 block := suite.blockWithParent(parent) - buffer.Add(block) + suite.Require().NoError(buffer.Add(block)) // Prune at finalized view should succeed err := buffer.PruneByView(finalizedView) @@ -305,7 +370,7 @@ func (suite *PendingBlocksSuite) TestConcurrentAccess() { defer wg.Done() for j := 0; j < blocksPerGoroutine; j++ { block := suite.block() - suite.buffer.Add(block) + suite.Require().NoError(suite.buffer.Add(block)) } }() } diff --git a/module/mempool/errors.go b/module/mempool/errors.go index 1dec04c84f8..8d1c4ecc561 100644 --- a/module/mempool/errors.go +++ b/module/mempool/errors.go @@ -56,3 +56,30 @@ func IsBelowPrunedThresholdError(err error) bool { var newIsBelowPrunedThresholdError BelowPrunedThresholdError return errors.As(err, &newIsBelowPrunedThresholdError) } + +// BeyondActiveRangeError indicates that an input is beyond the allowed active range. +// Mempools may impose an active range (eg. by view or by height) to bound the maximum +// possible size of the mempool and limit spam attacks. +type BeyondActiveRangeError struct { + err error +} + +func NewBeyondActiveRangeError(msg string, args ...interface{}) error { + return BeyondActiveRangeError{ + err: fmt.Errorf(msg, args...), + } +} + +func (e BeyondActiveRangeError) Unwrap() error { + return e.err +} + +func (e BeyondActiveRangeError) Error() string { + return e.err.Error() +} + +// IsBeyondActiveRangeError returns whether the given error is a BeyondActiveRangeError error +func IsBeyondActiveRangeError(err error) bool { + var beyondActiveRangeErr BeyondActiveRangeError + return errors.As(err, &beyondActiveRangeErr) +} diff --git a/module/metrics/example/verification/main.go b/module/metrics/example/verification/main.go index 88ed6b52044..1966d97e8b8 100644 --- a/module/metrics/example/verification/main.go +++ b/module/metrics/example/verification/main.go @@ -108,7 +108,7 @@ func demo() { } // creates consensus cache for follower engine, and registers size method of backend for metrics - pendingBlocks := buffer.NewPendingBlocks(0) + pendingBlocks := buffer.NewPendingBlocks(0, 100_000) err = mc.Register(metrics.ResourcePendingBlock, pendingBlocks.Size) if err != nil { panic(err) diff --git a/module/mock/generic_pending_block_buffer.go b/module/mock/generic_pending_block_buffer.go index 4dde4b3a26c..22665f5f847 100644 --- a/module/mock/generic_pending_block_buffer.go +++ b/module/mock/generic_pending_block_buffer.go @@ -15,8 +15,21 @@ type GenericPendingBlockBuffer[T module.BufferedProposal] struct { } // Add provides a mock function with given fields: block -func (_m *GenericPendingBlockBuffer[T]) Add(block flow.Slashable[T]) { - _m.Called(block) +func (_m *GenericPendingBlockBuffer[T]) Add(block flow.Slashable[T]) error { + ret := _m.Called(block) + + if len(ret) == 0 { + panic("no return value specified for Add") + } + + var r0 error + if rf, ok := ret.Get(0).(func(flow.Slashable[T]) error); ok { + r0 = rf(block) + } else { + r0 = ret.Error(0) + } + + return r0 } // ByID provides a mock function with given fields: blockID diff --git a/module/mock/pending_block_buffer.go b/module/mock/pending_block_buffer.go index 38402437684..d610d193cfd 100644 --- a/module/mock/pending_block_buffer.go +++ b/module/mock/pending_block_buffer.go @@ -13,8 +13,21 @@ type PendingBlockBuffer struct { } // Add provides a mock function with given fields: block -func (_m *PendingBlockBuffer) Add(block flow.Slashable[*flow.Proposal]) { - _m.Called(block) +func (_m *PendingBlockBuffer) Add(block flow.Slashable[*flow.Proposal]) error { + ret := _m.Called(block) + + if len(ret) == 0 { + panic("no return value specified for Add") + } + + var r0 error + if rf, ok := ret.Get(0).(func(flow.Slashable[*flow.Proposal]) error); ok { + r0 = rf(block) + } else { + r0 = ret.Error(0) + } + + return r0 } // ByID provides a mock function with given fields: blockID diff --git a/module/mock/pending_cluster_block_buffer.go b/module/mock/pending_cluster_block_buffer.go index c300f7a5ec4..e7d75352df1 100644 --- a/module/mock/pending_cluster_block_buffer.go +++ b/module/mock/pending_cluster_block_buffer.go @@ -15,8 +15,21 @@ type PendingClusterBlockBuffer struct { } // Add provides a mock function with given fields: block -func (_m *PendingClusterBlockBuffer) Add(block flow.Slashable[*cluster.Proposal]) { - _m.Called(block) +func (_m *PendingClusterBlockBuffer) Add(block flow.Slashable[*cluster.Proposal]) error { + ret := _m.Called(block) + + if len(ret) == 0 { + panic("no return value specified for Add") + } + + var r0 error + if rf, ok := ret.Get(0).(func(flow.Slashable[*cluster.Proposal]) error); ok { + r0 = rf(block) + } else { + r0 = ret.Error(0) + } + + return r0 } // ByID provides a mock function with given fields: blockID From e1bed68c78246ce0a3961c5958bdadc5aadca553 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Fri, 28 Nov 2025 13:28:46 -0800 Subject: [PATCH 21/23] ai: address lint errors --- engine/collection/compliance/engine.go | 4 ++- module/buffer/pending_blocks_test.go | 32 ++++++++++----------- module/metrics/example/verification/main.go | 2 +- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/engine/collection/compliance/engine.go b/engine/collection/compliance/engine.go index b2aea597977..b43cd3258c5 100644 --- a/engine/collection/compliance/engine.go +++ b/engine/collection/compliance/engine.go @@ -166,6 +166,8 @@ func (e *Engine) processOnFinalizedBlock(block *model.Block) error { if err != nil { // no expected errors return fmt.Errorf("could not get finalized header: %w", err) } - e.core.ProcessFinalizedBlock(finalHeader) + if err := e.core.ProcessFinalizedBlock(finalHeader); err != nil { + return fmt.Errorf("could not process finalized block: %w", err) + } return nil } diff --git a/module/buffer/pending_blocks_test.go b/module/buffer/pending_blocks_test.go index fe0cb8b9879..386f9a6b0c6 100644 --- a/module/buffer/pending_blocks_test.go +++ b/module/buffer/pending_blocks_test.go @@ -172,7 +172,7 @@ func (suite *PendingBlocksSuite) TestByID() { // TestByParentID tests retrieving blocks by parent ID. func (suite *PendingBlocksSuite) TestByParentID() { parent := suite.block() - suite.buffer.Add(parent) + suite.Require().NoError(suite.buffer.Add(parent)) // Create multiple children of the parent child1 := suite.blockWithParent(parent.Message.Block.ToHeader()) @@ -180,10 +180,10 @@ func (suite *PendingBlocksSuite) TestByParentID() { grandchild := suite.blockWithParent(child1.Message.Block.ToHeader()) unrelated := suite.block() - suite.buffer.Add(child1) - suite.buffer.Add(child2) - suite.buffer.Add(grandchild) - suite.buffer.Add(unrelated) + suite.Require().NoError(suite.buffer.Add(child1)) + suite.Require().NoError(suite.buffer.Add(child2)) + suite.Require().NoError(suite.buffer.Add(grandchild)) + suite.Require().NoError(suite.buffer.Add(unrelated)) // Test retrieving children of parent children, ok := suite.buffer.ByParentID(parent.Message.Block.ID()) @@ -207,13 +207,13 @@ func (suite *PendingBlocksSuite) TestByParentID() { // TestByParentIDOnlyDirectChildren verifies that ByParentID only returns direct children. func (suite *PendingBlocksSuite) TestByParentIDOnlyDirectChildren() { parent := suite.block() - suite.buffer.Add(parent) + suite.Require().NoError(suite.buffer.Add(parent)) child := suite.blockWithParent(parent.Message.Block.ToHeader()) grandchild := suite.blockWithParent(child.Message.Block.ToHeader()) - suite.buffer.Add(child) - suite.buffer.Add(grandchild) + suite.Require().NoError(suite.buffer.Add(child)) + suite.Require().NoError(suite.buffer.Add(grandchild)) // Parent should only have child, not grandchild children, ok := suite.buffer.ByParentID(parent.Message.Block.ID()) @@ -299,15 +299,15 @@ func (suite *PendingBlocksSuite) TestPruneByViewMultipleTimes() { parentHeader := unittest.BlockHeaderFixture() parentHeader.View = 10 parent := suite.blockWithParent(parentHeader) - suite.buffer.Add(parent) + suite.Require().NoError(suite.buffer.Add(parent)) // Create children - views will be automatically set to be greater than parent child1 := suite.blockWithParent(parent.Message.Block.ToHeader()) child1.Message.Block.View++ - suite.buffer.Add(child1) + suite.Require().NoError(suite.buffer.Add(child1)) child2 := suite.blockWithParent(parent.Message.Block.ToHeader()) - suite.buffer.Add(child2) + suite.Require().NoError(suite.buffer.Add(child2)) // Get actual views parentView := parent.Message.Block.View @@ -343,15 +343,15 @@ func (suite *PendingBlocksSuite) TestSize() { // Add blocks and verify size increases block1 := suite.block() - suite.buffer.Add(block1) + suite.Require().NoError(suite.buffer.Add(block1)) suite.Assert().Equal(uint(1), suite.buffer.Size()) block2 := suite.block() - suite.buffer.Add(block2) + suite.Require().NoError(suite.buffer.Add(block2)) suite.Assert().Equal(uint(2), suite.buffer.Size()) // Adding duplicate should not increase size - suite.buffer.Add(block1) + suite.Require().NoError(suite.buffer.Add(block1)) suite.Assert().Equal(uint(2), suite.buffer.Size()) } @@ -401,7 +401,7 @@ func (suite *PendingBlocksSuite) TestEmptyBufferOperations() { func (suite *PendingBlocksSuite) TestAddAfterPrune() { // Add and prune a block block1 := suite.block() - suite.buffer.Add(block1) + suite.Require().NoError(suite.buffer.Add(block1)) err := suite.buffer.PruneByView(block1.Message.Block.View) suite.Assert().NoError(err) @@ -412,7 +412,7 @@ func (suite *PendingBlocksSuite) TestAddAfterPrune() { // Add a new block after pruning with view above pruned view block2 := suite.blockWithParent(block1.Message.Block.ToHeader()) - suite.buffer.Add(block2) + suite.Require().NoError(suite.buffer.Add(block2)) // Verify new block is added retrieved, ok := suite.buffer.ByID(block2.Message.Block.ID()) diff --git a/module/metrics/example/verification/main.go b/module/metrics/example/verification/main.go index 1966d97e8b8..c67f4d033a4 100644 --- a/module/metrics/example/verification/main.go +++ b/module/metrics/example/verification/main.go @@ -173,7 +173,7 @@ func demo() { tryRandomCall(func() { proposal := unittest.ProposalFixture() - pendingBlocks.Add(flow.Slashable[*flow.Proposal]{ + _ = pendingBlocks.Add(flow.Slashable[*flow.Proposal]{ OriginID: unittest.IdentifierFixture(), Message: proposal, }) From 718a820380b6062830c99b64225b98579df80f53 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Fri, 28 Nov 2025 14:49:32 -0800 Subject: [PATCH 22/23] Apply suggestion from @jordanschalm --- module/buffer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/module/buffer.go b/module/buffer.go index f130a4caf52..d5525b59b31 100644 --- a/module/buffer.go +++ b/module/buffer.go @@ -21,7 +21,7 @@ type GenericPendingBlockBuffer[T BufferedProposal] interface { // Add adds the input block to the block buffer. // If the block already exists, or is below the finalized view, this is a no-op. // Errors returns: - // - mempool.BlockViewTooFarAheadError if block.View > finalizedView + activeViewRangeSize (when activeViewRangeSize > 0) + // - mempool.BeyondActiveRangeError if block.View > finalizedView + activeViewRangeSize (when activeViewRangeSize > 0) Add(block flow.Slashable[T]) error // ByID returns the block with the given ID, if it exists. From acccf4982ee954367c8d5ca5f01d9ef76622cf64 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Tue, 9 Dec 2025 14:45:48 -0800 Subject: [PATCH 23/23] Use HashablePayload as base generic type parameter --- module/buffer.go | 18 ++++------- module/buffer/pending_blocks.go | 35 +++++++++------------ module/buffer/pending_blocks_test.go | 2 +- module/mock/generic_pending_block_buffer.go | 30 +++++++++--------- module/mock/pending_block_buffer.go | 24 +++++++------- module/mock/pending_cluster_block_buffer.go | 24 +++++++------- 6 files changed, 60 insertions(+), 73 deletions(-) diff --git a/module/buffer.go b/module/buffer.go index d5525b59b31..1d20ce232b1 100644 --- a/module/buffer.go +++ b/module/buffer.go @@ -5,32 +5,26 @@ import ( "github.com/onflow/flow-go/model/flow" ) -// BufferedProposal generically represents either a [cluster.Proposal] or [flow.Proposal]. -type BufferedProposal interface { - *cluster.Proposal | *flow.Proposal - ProposalHeader() *flow.ProposalHeader -} - // GenericPendingBlockBuffer implements a mempool of pending blocks that cannot yet be processed // because they do not connect to the rest of the chain state. // They are indexed by parent ID to enable processing all of a parent's children once the parent is received. // They are also indexed by view to support pruning. // // Safe for concurrent use. -type GenericPendingBlockBuffer[T BufferedProposal] interface { +type GenericPendingBlockBuffer[T flow.HashablePayload] interface { // Add adds the input block to the block buffer. // If the block already exists, or is below the finalized view, this is a no-op. // Errors returns: // - mempool.BeyondActiveRangeError if block.View > finalizedView + activeViewRangeSize (when activeViewRangeSize > 0) - Add(block flow.Slashable[T]) error + Add(block flow.Slashable[*flow.GenericProposal[T]]) error // ByID returns the block with the given ID, if it exists. // Otherwise returns (nil, false) - ByID(blockID flow.Identifier) (flow.Slashable[T], bool) + ByID(blockID flow.Identifier) (flow.Slashable[*flow.GenericProposal[T]], bool) // ByParentID returns all direct children of the given block. // If no children with the given parent exist, returns (nil, false) - ByParentID(parentID flow.Identifier) ([]flow.Slashable[T], bool) + ByParentID(parentID flow.Identifier) ([]flow.Slashable[*flow.GenericProposal[T]], bool) // PruneByView prunes all pending blocks with views less or equal to the given view. // Errors returns: @@ -42,7 +36,7 @@ type GenericPendingBlockBuffer[T BufferedProposal] interface { } // PendingBlockBuffer is the block buffer for consensus proposals. -type PendingBlockBuffer GenericPendingBlockBuffer[*flow.Proposal] +type PendingBlockBuffer GenericPendingBlockBuffer[flow.Payload] // PendingClusterBlockBuffer is the block buffer for cluster proposals. -type PendingClusterBlockBuffer GenericPendingBlockBuffer[*cluster.Proposal] +type PendingClusterBlockBuffer GenericPendingBlockBuffer[cluster.Payload] diff --git a/module/buffer/pending_blocks.go b/module/buffer/pending_blocks.go index 8852166d455..a033e93b179 100644 --- a/module/buffer/pending_blocks.go +++ b/module/buffer/pending_blocks.go @@ -13,20 +13,15 @@ import ( // proposalVertex implements [forest.Vertex] for generic block proposals. // //structwrite:immutable -type proposalVertex[T module.BufferedProposal] struct { - proposal flow.Slashable[T] +type proposalVertex[T flow.HashablePayload] struct { + proposal flow.Slashable[*flow.GenericProposal[T]] id flow.Identifier } -// header is a shortform way to access the proposal's header. -func (v proposalVertex[T]) header() *flow.Header { - return v.proposal.Message.ProposalHeader().Header -} - -func newProposalVertex[T module.BufferedProposal](proposal flow.Slashable[T]) proposalVertex[T] { +func newProposalVertex[T flow.HashablePayload](proposal flow.Slashable[*flow.GenericProposal[T]]) proposalVertex[T] { return proposalVertex[T]{ proposal: proposal, - id: proposal.Message.ProposalHeader().Header.ID(), + id: proposal.Message.Block.ID(), } } @@ -37,12 +32,12 @@ func (v proposalVertex[T]) VertexID() flow.Identifier { // Level returns the view for the stored proposal. func (v proposalVertex[T]) Level() uint64 { - return v.header().View + return v.proposal.Message.Block.View } // Parent returns the parent ID and view for the stored proposal. func (v proposalVertex[T]) Parent() (flow.Identifier, uint64) { - return v.header().ParentID, v.header().ParentView + return v.proposal.Message.Block.ParentID, v.proposal.Message.Block.ParentView } // GenericPendingBlocks implements a mempool of pending blocks that cannot yet be processed @@ -51,14 +46,14 @@ func (v proposalVertex[T]) Parent() (flow.Identifier, uint64) { // They are also indexed by view to support pruning. // // Safe for concurrent use. -type GenericPendingBlocks[T module.BufferedProposal] struct { +type GenericPendingBlocks[T flow.HashablePayload] struct { lock *sync.Mutex forest *forest.LevelledForest activeViewRangeSize uint64 } -type PendingBlocks = GenericPendingBlocks[*flow.Proposal] -type PendingClusterBlocks = GenericPendingBlocks[*cluster.Proposal] +type PendingBlocks = GenericPendingBlocks[flow.Payload] +type PendingClusterBlocks = GenericPendingBlocks[cluster.Payload] var _ module.PendingBlockBuffer = (*PendingBlocks)(nil) var _ module.PendingClusterBlockBuffer = (*PendingClusterBlocks)(nil) @@ -84,11 +79,11 @@ func NewPendingClusterBlocks(finalizedView uint64, activeViewRangeSize uint64) * // If the block already exists, or is below the finalized view, this is a no-op. // Errors returns: // - mempool.BeyondActiveRangeError if block.View > finalizedView + activeViewRangeSize (when activeViewRangeSize > 0) -func (b *GenericPendingBlocks[T]) Add(block flow.Slashable[T]) error { +func (b *GenericPendingBlocks[T]) Add(block flow.Slashable[*flow.GenericProposal[T]]) error { b.lock.Lock() defer b.lock.Unlock() - blockView := block.Message.ProposalHeader().Header.View + blockView := block.Message.Block.View finalizedView := b.highestPrunedView() // Check if block view exceeds the active view range size @@ -106,19 +101,19 @@ func (b *GenericPendingBlocks[T]) Add(block flow.Slashable[T]) error { // ByID returns the block with the given ID, if it exists. // Otherwise returns (nil, false) -func (b *GenericPendingBlocks[T]) ByID(blockID flow.Identifier) (flow.Slashable[T], bool) { +func (b *GenericPendingBlocks[T]) ByID(blockID flow.Identifier) (flow.Slashable[*flow.GenericProposal[T]], bool) { b.lock.Lock() defer b.lock.Unlock() vertex, ok := b.forest.GetVertex(blockID) if !ok { - return flow.Slashable[T]{}, false + return flow.Slashable[*flow.GenericProposal[T]]{}, false } return vertex.(proposalVertex[T]).proposal, true } // ByParentID returns all direct children of the given block. // If no children with the given parent exist, returns (nil, false) -func (b *GenericPendingBlocks[T]) ByParentID(parentID flow.Identifier) ([]flow.Slashable[T], bool) { +func (b *GenericPendingBlocks[T]) ByParentID(parentID flow.Identifier) ([]flow.Slashable[*flow.GenericProposal[T]], bool) { b.lock.Lock() defer b.lock.Unlock() n := b.forest.GetNumberOfChildren(parentID) @@ -126,7 +121,7 @@ func (b *GenericPendingBlocks[T]) ByParentID(parentID flow.Identifier) ([]flow.S return nil, false } - children := make([]flow.Slashable[T], 0, n) + children := make([]flow.Slashable[*flow.GenericProposal[T]], 0, n) iterator := b.forest.GetChildren(parentID) for iterator.HasNext() { vertex := iterator.NextVertex() diff --git a/module/buffer/pending_blocks_test.go b/module/buffer/pending_blocks_test.go index 386f9a6b0c6..62198ca9e48 100644 --- a/module/buffer/pending_blocks_test.go +++ b/module/buffer/pending_blocks_test.go @@ -14,7 +14,7 @@ import ( type PendingBlocksSuite struct { suite.Suite - buffer *GenericPendingBlocks[*flow.Proposal] + buffer *GenericPendingBlocks[flow.Payload] } func TestPendingBlocksSuite(t *testing.T) { diff --git a/module/mock/generic_pending_block_buffer.go b/module/mock/generic_pending_block_buffer.go index 22665f5f847..57701d49b9c 100644 --- a/module/mock/generic_pending_block_buffer.go +++ b/module/mock/generic_pending_block_buffer.go @@ -5,17 +5,15 @@ package mock import ( flow "github.com/onflow/flow-go/model/flow" mock "github.com/stretchr/testify/mock" - - module "github.com/onflow/flow-go/module" ) // GenericPendingBlockBuffer is an autogenerated mock type for the GenericPendingBlockBuffer type -type GenericPendingBlockBuffer[T module.BufferedProposal] struct { +type GenericPendingBlockBuffer[T flow.HashablePayload] struct { mock.Mock } // Add provides a mock function with given fields: block -func (_m *GenericPendingBlockBuffer[T]) Add(block flow.Slashable[T]) error { +func (_m *GenericPendingBlockBuffer[T]) Add(block flow.Slashable[*flow.GenericProposal[T]]) error { ret := _m.Called(block) if len(ret) == 0 { @@ -23,7 +21,7 @@ func (_m *GenericPendingBlockBuffer[T]) Add(block flow.Slashable[T]) error { } var r0 error - if rf, ok := ret.Get(0).(func(flow.Slashable[T]) error); ok { + if rf, ok := ret.Get(0).(func(flow.Slashable[*flow.GenericProposal[T]]) error); ok { r0 = rf(block) } else { r0 = ret.Error(0) @@ -33,22 +31,22 @@ func (_m *GenericPendingBlockBuffer[T]) Add(block flow.Slashable[T]) error { } // ByID provides a mock function with given fields: blockID -func (_m *GenericPendingBlockBuffer[T]) ByID(blockID flow.Identifier) (flow.Slashable[T], bool) { +func (_m *GenericPendingBlockBuffer[T]) ByID(blockID flow.Identifier) (flow.Slashable[*flow.GenericProposal[T]], bool) { ret := _m.Called(blockID) if len(ret) == 0 { panic("no return value specified for ByID") } - var r0 flow.Slashable[T] + var r0 flow.Slashable[*flow.GenericProposal[T]] var r1 bool - if rf, ok := ret.Get(0).(func(flow.Identifier) (flow.Slashable[T], bool)); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) (flow.Slashable[*flow.GenericProposal[T]], bool)); ok { return rf(blockID) } - if rf, ok := ret.Get(0).(func(flow.Identifier) flow.Slashable[T]); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) flow.Slashable[*flow.GenericProposal[T]]); ok { r0 = rf(blockID) } else { - r0 = ret.Get(0).(flow.Slashable[T]) + r0 = ret.Get(0).(flow.Slashable[*flow.GenericProposal[T]]) } if rf, ok := ret.Get(1).(func(flow.Identifier) bool); ok { @@ -61,23 +59,23 @@ func (_m *GenericPendingBlockBuffer[T]) ByID(blockID flow.Identifier) (flow.Slas } // ByParentID provides a mock function with given fields: parentID -func (_m *GenericPendingBlockBuffer[T]) ByParentID(parentID flow.Identifier) ([]flow.Slashable[T], bool) { +func (_m *GenericPendingBlockBuffer[T]) ByParentID(parentID flow.Identifier) ([]flow.Slashable[*flow.GenericProposal[T]], bool) { ret := _m.Called(parentID) if len(ret) == 0 { panic("no return value specified for ByParentID") } - var r0 []flow.Slashable[T] + var r0 []flow.Slashable[*flow.GenericProposal[T]] var r1 bool - if rf, ok := ret.Get(0).(func(flow.Identifier) ([]flow.Slashable[T], bool)); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) ([]flow.Slashable[*flow.GenericProposal[T]], bool)); ok { return rf(parentID) } - if rf, ok := ret.Get(0).(func(flow.Identifier) []flow.Slashable[T]); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) []flow.Slashable[*flow.GenericProposal[T]]); ok { r0 = rf(parentID) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]flow.Slashable[T]) + r0 = ret.Get(0).([]flow.Slashable[*flow.GenericProposal[T]]) } } @@ -128,7 +126,7 @@ func (_m *GenericPendingBlockBuffer[T]) Size() uint { // NewGenericPendingBlockBuffer creates a new instance of GenericPendingBlockBuffer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. -func NewGenericPendingBlockBuffer[T module.BufferedProposal](t interface { +func NewGenericPendingBlockBuffer[T flow.HashablePayload](t interface { mock.TestingT Cleanup(func()) }) *GenericPendingBlockBuffer[T] { diff --git a/module/mock/pending_block_buffer.go b/module/mock/pending_block_buffer.go index d610d193cfd..2cfb30bc9df 100644 --- a/module/mock/pending_block_buffer.go +++ b/module/mock/pending_block_buffer.go @@ -13,7 +13,7 @@ type PendingBlockBuffer struct { } // Add provides a mock function with given fields: block -func (_m *PendingBlockBuffer) Add(block flow.Slashable[*flow.Proposal]) error { +func (_m *PendingBlockBuffer) Add(block flow.Slashable[*flow.GenericProposal[flow.Payload]]) error { ret := _m.Called(block) if len(ret) == 0 { @@ -21,7 +21,7 @@ func (_m *PendingBlockBuffer) Add(block flow.Slashable[*flow.Proposal]) error { } var r0 error - if rf, ok := ret.Get(0).(func(flow.Slashable[*flow.Proposal]) error); ok { + if rf, ok := ret.Get(0).(func(flow.Slashable[*flow.GenericProposal[flow.Payload]]) error); ok { r0 = rf(block) } else { r0 = ret.Error(0) @@ -31,22 +31,22 @@ func (_m *PendingBlockBuffer) Add(block flow.Slashable[*flow.Proposal]) error { } // ByID provides a mock function with given fields: blockID -func (_m *PendingBlockBuffer) ByID(blockID flow.Identifier) (flow.Slashable[*flow.Proposal], bool) { +func (_m *PendingBlockBuffer) ByID(blockID flow.Identifier) (flow.Slashable[*flow.GenericProposal[flow.Payload]], bool) { ret := _m.Called(blockID) if len(ret) == 0 { panic("no return value specified for ByID") } - var r0 flow.Slashable[*flow.Proposal] + var r0 flow.Slashable[*flow.GenericProposal[flow.Payload]] var r1 bool - if rf, ok := ret.Get(0).(func(flow.Identifier) (flow.Slashable[*flow.Proposal], bool)); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) (flow.Slashable[*flow.GenericProposal[flow.Payload]], bool)); ok { return rf(blockID) } - if rf, ok := ret.Get(0).(func(flow.Identifier) flow.Slashable[*flow.Proposal]); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) flow.Slashable[*flow.GenericProposal[flow.Payload]]); ok { r0 = rf(blockID) } else { - r0 = ret.Get(0).(flow.Slashable[*flow.Proposal]) + r0 = ret.Get(0).(flow.Slashable[*flow.GenericProposal[flow.Payload]]) } if rf, ok := ret.Get(1).(func(flow.Identifier) bool); ok { @@ -59,23 +59,23 @@ func (_m *PendingBlockBuffer) ByID(blockID flow.Identifier) (flow.Slashable[*flo } // ByParentID provides a mock function with given fields: parentID -func (_m *PendingBlockBuffer) ByParentID(parentID flow.Identifier) ([]flow.Slashable[*flow.Proposal], bool) { +func (_m *PendingBlockBuffer) ByParentID(parentID flow.Identifier) ([]flow.Slashable[*flow.GenericProposal[flow.Payload]], bool) { ret := _m.Called(parentID) if len(ret) == 0 { panic("no return value specified for ByParentID") } - var r0 []flow.Slashable[*flow.Proposal] + var r0 []flow.Slashable[*flow.GenericProposal[flow.Payload]] var r1 bool - if rf, ok := ret.Get(0).(func(flow.Identifier) ([]flow.Slashable[*flow.Proposal], bool)); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) ([]flow.Slashable[*flow.GenericProposal[flow.Payload]], bool)); ok { return rf(parentID) } - if rf, ok := ret.Get(0).(func(flow.Identifier) []flow.Slashable[*flow.Proposal]); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) []flow.Slashable[*flow.GenericProposal[flow.Payload]]); ok { r0 = rf(parentID) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]flow.Slashable[*flow.Proposal]) + r0 = ret.Get(0).([]flow.Slashable[*flow.GenericProposal[flow.Payload]]) } } diff --git a/module/mock/pending_cluster_block_buffer.go b/module/mock/pending_cluster_block_buffer.go index e7d75352df1..8233b6e384d 100644 --- a/module/mock/pending_cluster_block_buffer.go +++ b/module/mock/pending_cluster_block_buffer.go @@ -15,7 +15,7 @@ type PendingClusterBlockBuffer struct { } // Add provides a mock function with given fields: block -func (_m *PendingClusterBlockBuffer) Add(block flow.Slashable[*cluster.Proposal]) error { +func (_m *PendingClusterBlockBuffer) Add(block flow.Slashable[*flow.GenericProposal[cluster.Payload]]) error { ret := _m.Called(block) if len(ret) == 0 { @@ -23,7 +23,7 @@ func (_m *PendingClusterBlockBuffer) Add(block flow.Slashable[*cluster.Proposal] } var r0 error - if rf, ok := ret.Get(0).(func(flow.Slashable[*cluster.Proposal]) error); ok { + if rf, ok := ret.Get(0).(func(flow.Slashable[*flow.GenericProposal[cluster.Payload]]) error); ok { r0 = rf(block) } else { r0 = ret.Error(0) @@ -33,22 +33,22 @@ func (_m *PendingClusterBlockBuffer) Add(block flow.Slashable[*cluster.Proposal] } // ByID provides a mock function with given fields: blockID -func (_m *PendingClusterBlockBuffer) ByID(blockID flow.Identifier) (flow.Slashable[*cluster.Proposal], bool) { +func (_m *PendingClusterBlockBuffer) ByID(blockID flow.Identifier) (flow.Slashable[*flow.GenericProposal[cluster.Payload]], bool) { ret := _m.Called(blockID) if len(ret) == 0 { panic("no return value specified for ByID") } - var r0 flow.Slashable[*cluster.Proposal] + var r0 flow.Slashable[*flow.GenericProposal[cluster.Payload]] var r1 bool - if rf, ok := ret.Get(0).(func(flow.Identifier) (flow.Slashable[*cluster.Proposal], bool)); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) (flow.Slashable[*flow.GenericProposal[cluster.Payload]], bool)); ok { return rf(blockID) } - if rf, ok := ret.Get(0).(func(flow.Identifier) flow.Slashable[*cluster.Proposal]); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) flow.Slashable[*flow.GenericProposal[cluster.Payload]]); ok { r0 = rf(blockID) } else { - r0 = ret.Get(0).(flow.Slashable[*cluster.Proposal]) + r0 = ret.Get(0).(flow.Slashable[*flow.GenericProposal[cluster.Payload]]) } if rf, ok := ret.Get(1).(func(flow.Identifier) bool); ok { @@ -61,23 +61,23 @@ func (_m *PendingClusterBlockBuffer) ByID(blockID flow.Identifier) (flow.Slashab } // ByParentID provides a mock function with given fields: parentID -func (_m *PendingClusterBlockBuffer) ByParentID(parentID flow.Identifier) ([]flow.Slashable[*cluster.Proposal], bool) { +func (_m *PendingClusterBlockBuffer) ByParentID(parentID flow.Identifier) ([]flow.Slashable[*flow.GenericProposal[cluster.Payload]], bool) { ret := _m.Called(parentID) if len(ret) == 0 { panic("no return value specified for ByParentID") } - var r0 []flow.Slashable[*cluster.Proposal] + var r0 []flow.Slashable[*flow.GenericProposal[cluster.Payload]] var r1 bool - if rf, ok := ret.Get(0).(func(flow.Identifier) ([]flow.Slashable[*cluster.Proposal], bool)); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) ([]flow.Slashable[*flow.GenericProposal[cluster.Payload]], bool)); ok { return rf(parentID) } - if rf, ok := ret.Get(0).(func(flow.Identifier) []flow.Slashable[*cluster.Proposal]); ok { + if rf, ok := ret.Get(0).(func(flow.Identifier) []flow.Slashable[*flow.GenericProposal[cluster.Payload]]); ok { r0 = rf(parentID) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]flow.Slashable[*cluster.Proposal]) + r0 = ret.Get(0).([]flow.Slashable[*flow.GenericProposal[cluster.Payload]]) } }