Skip to content
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e6b4e8c
make Proposal type generic
jordanschalm Nov 26, 2025
880557b
make Buffer definition generic to both proposal types
jordanschalm Nov 27, 2025
09eb86c
use leveled forest for block buffer
jordanschalm Nov 27, 2025
46000ac
add docs, todos
jordanschalm Nov 27, 2025
d54e974
ai: remove bool return from pending blocks
jordanschalm Nov 27, 2025
9530e1e
ai: remove DropByParent method
jordanschalm Nov 27, 2025
7235f7a
update mocks
jordanschalm Nov 27, 2025
e3acbf0
add lower view boundary
jordanschalm Nov 27, 2025
56be6f0
update comments
jordanschalm Nov 27, 2025
760434b
add mutex
jordanschalm Nov 27, 2025
99b181f
check retained level error
jordanschalm Nov 27, 2025
3ab8dd5
add missing return
jordanschalm Nov 27, 2025
a4529e8
update buffer
jordanschalm Nov 28, 2025
b281bc0
update mocks
jordanschalm Nov 28, 2025
5dbfe91
fix tests for signature change
jordanschalm Nov 28, 2025
39f010a
bug: forest can get child count for nonextnt node
jordanschalm Nov 28, 2025
49a196b
ai: add tests for pending blocks suite
jordanschalm Nov 28, 2025
b3b1460
adjust tests
jordanschalm Nov 28, 2025
beec64c
remove old buffer backend
jordanschalm Nov 28, 2025
e813f5c
ai: add threshold also to mempool
jordanschalm Nov 28, 2025
e1bed68
ai: address lint errors
jordanschalm Nov 28, 2025
718a820
Apply suggestion from @jordanschalm
jordanschalm Nov 28, 2025
846a5f8
Merge branch 'master' into jord/8170-ln-perm-block-buffer
jordanschalm Nov 28, 2025
292a175
Merge branch 'master' into jord/8170-ln-perm-block-buffer
jordanschalm Dec 8, 2025
acccf49
Use HashablePayload as base generic type parameter
tim-barry Dec 9, 2025
7f89681
Merge pull request #8248 from onflow/tim/8196-refactor-generic-types-…
jordanschalm Dec 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ func main() {
}).
Component("consensus compliance engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// initialize the pending blocks cache
proposals := buffer.NewPendingBlocks()
proposals := buffer.NewPendingBlocks(node.LastFinalizedHeader.View, node.ComplianceConfig.GetSkipNewProposalsThreshold())

logger := createLogger(node.Logger, node.RootChainID)
complianceCore, err := compliance.NewCore(
Expand Down
6 changes: 3 additions & 3 deletions consensus/integration/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,12 +514,12 @@ func createNode(
)
require.NoError(t, err)

// initialize the pending blocks cache
cache := buffer.NewPendingBlocks()

rootHeader, err := rootSnapshot.Head()
require.NoError(t, err)

// initialize the pending blocks cache
cache := buffer.NewPendingBlocks(rootHeader.View, modulecompliance.DefaultConfig().SkipNewProposalsThreshold)

rootQC, err := rootSnapshot.QuorumCertificate()
require.NoError(t, err)

Expand Down
37 changes: 29 additions & 8 deletions engine/collection/compliance/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ func NewCore(
if err != nil {
return nil, fmt.Errorf("could not initialized finalized boundary cache: %w", err)
}
c.ProcessFinalizedBlock(final)
err = c.ProcessFinalizedBlock(final)
if err != nil {
return nil, fmt.Errorf("could not process finalized block: %w", err)
}

// log the mempool size off the bat
c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())
Expand Down Expand Up @@ -193,7 +196,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*cluster.Proposal]) error
_, found := c.pending.ByID(block.ParentID)
if found {
// add the block to the cache
_ = c.pending.Add(proposal)
if err := c.pending.Add(proposal); err != nil {
if mempool.IsBeyondActiveRangeError(err) {
// In general we expect the block buffer to use SkipNewProposalsThreshold,
// however since it is instantiated outside this component, we allow the thresholds to differ
log.Debug().Err(err).Msg("dropping block beyond block buffer active range")
return nil
}
return fmt.Errorf("could not add proposal to pending buffer: %w", err)
}
c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())

return nil
Expand All @@ -207,7 +218,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*cluster.Proposal]) error
return fmt.Errorf("could not check parent exists: %w", err)
}
if !exists {
_ = c.pending.Add(proposal)
if err := c.pending.Add(proposal); err != nil {
if mempool.IsBeyondActiveRangeError(err) {
// In general we expect the block buffer to use SkipNewProposalsThreshold,
// however since it is instantiated outside this component, we allow the thresholds to differ
log.Debug().Err(err).Msg("dropping block beyond block buffer active range")
return nil
}
return fmt.Errorf("could not add proposal to pending buffer: %w", err)
}
c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())

c.sync.RequestBlock(block.ParentID, block.Height-1)
Expand Down Expand Up @@ -288,9 +307,6 @@ func (c *Core) processBlockAndDescendants(proposal flow.Slashable[*cluster.Propo
}
}

// drop all the children that should have been processed now
c.pending.DropForParent(blockID)

return nil
}

Expand Down Expand Up @@ -363,14 +379,19 @@ func (c *Core) processBlockProposal(proposal *cluster.Proposal) error {

// ProcessFinalizedBlock performs pruning of stale data based on finalization event
// removes pending blocks below the finalized view
func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) {
// No errors are expected during normal operation.
func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) error {
// remove all pending blocks at or below the finalized view
c.pending.PruneByView(finalized.View)
err := c.pending.PruneByView(finalized.View)
if err != nil {
return err
}
c.finalizedHeight.Set(finalized.Height)
c.finalizedView.Set(finalized.View)

// always record the metric
c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())
return nil
}

// checkForAndLogOutdatedInputError checks whether error is an `engine.OutdatedInputError`.
Expand Down
10 changes: 3 additions & 7 deletions engine/collection/compliance/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (cs *CommonSuite) SetupTest() {

// set up pending module mock
cs.pending = &module.PendingClusterBlockBuffer{}
cs.pending.On("Add", mock.Anything, mock.Anything).Return(true)
cs.pending.On("Add", mock.Anything, mock.Anything)
cs.pending.On("ByID", mock.Anything).Return(
func(blockID flow.Identifier) flow.Slashable[*cluster.Proposal] {
return cs.pendingDB[blockID]
Expand All @@ -143,9 +143,8 @@ func (cs *CommonSuite) SetupTest() {
return ok
},
)
cs.pending.On("DropForParent", mock.Anything).Return()
cs.pending.On("Size").Return(uint(0))
cs.pending.On("PruneByView", mock.Anything).Return()
cs.pending.On("PruneByView", mock.Anything).Return(nil)

closed := func() <-chan struct{} {
channel := make(chan struct{})
Expand Down Expand Up @@ -518,9 +517,6 @@ func (cs *CoreSuite) TestProcessBlockAndDescendants() {

// check that we submitted each child to hotstuff
cs.hotstuff.AssertExpectations(cs.T())

// make sure we drop the cache after trying to process
cs.pending.AssertCalled(cs.T(), "DropForParent", parent.ID())
}

func (cs *CoreSuite) TestProposalBufferingOrder() {
Expand All @@ -546,7 +542,7 @@ func (cs *CoreSuite) TestProposalBufferingOrder() {
}

// replace the engine buffer with the real one
cs.core.pending = realbuffer.NewPendingClusterBlocks()
cs.core.pending = realbuffer.NewPendingClusterBlocks(cs.head.Block.View, 100_000)

// process all of the descendants
for _, proposal := range proposals {
Expand Down
4 changes: 3 additions & 1 deletion engine/collection/compliance/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ func (e *Engine) processOnFinalizedBlock(block *model.Block) error {
if err != nil { // no expected errors
return fmt.Errorf("could not get finalized header: %w", err)
}
e.core.ProcessFinalizedBlock(finalHeader)
if err := e.core.ProcessFinalizedBlock(finalHeader); err != nil {
return fmt.Errorf("could not process finalized block: %w", err)
}
return nil
}
6 changes: 5 additions & 1 deletion engine/collection/epochmgr/factories/compliance.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ func (f *ComplianceEngineFactory) Create(
validator hotstuff.Validator,
) (*compliance.Engine, error) {

cache := buffer.NewPendingClusterBlocks()
final, err := clusterState.Final().Head()
if err != nil {
return nil, fmt.Errorf("could not get finalized header: %w", err)
}
cache := buffer.NewPendingClusterBlocks(final.View, f.config.GetSkipNewProposalsThreshold())
core, err := compliance.NewCore(
f.log,
f.engMetrics,
Expand Down
37 changes: 29 additions & 8 deletions engine/consensus/compliance/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ func NewCore(
if err != nil {
return nil, fmt.Errorf("could not initialized finalized boundary cache: %w", err)
}
c.ProcessFinalizedBlock(final)
err = c.ProcessFinalizedBlock(final)
if err != nil {
return nil, fmt.Errorf("could not process finalized block: %w", err)
}

c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size())

Expand Down Expand Up @@ -200,7 +203,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*flow.Proposal]) error {
_, found := c.pending.ByID(header.ParentID)
if found {
// add the block to the cache
_ = c.pending.Add(proposal)
if err := c.pending.Add(proposal); err != nil {
if mempool.IsBeyondActiveRangeError(err) {
// In general we expect the block buffer to use SkipNewProposalsThreshold,
// however since it is instantiated outside this component, we allow the thresholds to differ
log.Debug().Err(err).Msg("dropping block beyond block buffer active range")
return nil
}
return fmt.Errorf("could not add proposal to pending buffer: %w", err)
}
c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size())

return nil
Expand All @@ -214,7 +225,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*flow.Proposal]) error {
return fmt.Errorf("could not check parent exists: %w", err)
}
if !exists {
_ = c.pending.Add(proposal)
if err := c.pending.Add(proposal); err != nil {
if mempool.IsBeyondActiveRangeError(err) {
// In general we expect the block buffer to use SkipNewProposalsThreshold,
// however since it is instantiated outside this component, we allow the thresholds to differ
log.Debug().Err(err).Msg("dropping block beyond block buffer active range")
return nil
}
return fmt.Errorf("could not add proposal to pending buffer: %w", err)
}
c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size())

c.sync.RequestBlock(header.ParentID, header.Height-1)
Expand Down Expand Up @@ -300,9 +319,6 @@ func (c *Core) processBlockAndDescendants(proposal flow.Slashable[*flow.Proposal
}
}

// drop all the children that should have been processed now
c.pending.DropForParent(blockID)

return nil
}

Expand Down Expand Up @@ -392,14 +408,19 @@ func (c *Core) processBlockProposal(proposal *flow.Proposal) error {

// ProcessFinalizedBlock performs pruning of stale data based on finalization event
// removes pending blocks below the finalized view
func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) {
// No errors are expected during normal operation.
func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) error {
// remove all pending blocks at or below the finalized view
c.pending.PruneByView(finalized.View)
err := c.pending.PruneByView(finalized.View)
if err != nil {
return err
}
c.finalizedHeight.Set(finalized.Height)
c.finalizedView.Set(finalized.View)

// always record the metric
c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size())
return nil
}

// checkForAndLogOutdatedInputError checks whether error is an `engine.OutdatedInputError`.
Expand Down
10 changes: 3 additions & 7 deletions engine/consensus/compliance/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (cs *CommonSuite) SetupTest() {

// set up pending module mock
cs.pending = &module.PendingBlockBuffer{}
cs.pending.On("Add", mock.Anything, mock.Anything).Return(true)
cs.pending.On("Add", mock.Anything, mock.Anything)
cs.pending.On("ByID", mock.Anything).Return(
func(blockID flow.Identifier) flow.Slashable[*flow.Proposal] {
return cs.pendingDB[blockID]
Expand All @@ -219,9 +219,8 @@ func (cs *CommonSuite) SetupTest() {
return ok
},
)
cs.pending.On("DropForParent", mock.Anything).Return()
cs.pending.On("Size").Return(uint(0))
cs.pending.On("PruneByView", mock.Anything).Return()
cs.pending.On("PruneByView", mock.Anything).Return(nil)

// set up hotstuff module mock
cs.hotstuff = module.NewHotStuff(cs.T())
Expand Down Expand Up @@ -565,9 +564,6 @@ func (cs *CoreSuite) TestProcessBlockAndDescendants() {
Message: proposal0,
})
require.NoError(cs.T(), err, "should pass handling children")

// make sure we drop the cache after trying to process
cs.pending.AssertCalled(cs.T(), "DropForParent", parent.ID())
}

func (cs *CoreSuite) TestProposalBufferingOrder() {
Expand All @@ -588,7 +584,7 @@ func (cs *CoreSuite) TestProposalBufferingOrder() {
}

// replace the engine buffer with the real one
cs.core.pending = real.NewPendingBlocks()
cs.core.pending = real.NewPendingBlocks(cs.head.View, 100_000)

// check that we request the ancestor block each time
cs.sync.On("RequestBlock", missingBlock.ID(), missingBlock.Height).Times(len(proposals))
Expand Down
5 changes: 4 additions & 1 deletion engine/consensus/compliance/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ func (e *Engine) processOnFinalizedBlock(block *model.Block) error {
if err != nil { // no expected errors
return fmt.Errorf("could not get finalized header: %w", err)
}
e.core.ProcessFinalizedBlock(finalHeader)
err = e.core.ProcessFinalizedBlock(finalHeader)
if err != nil {
return fmt.Errorf("could not process finalized block: %w", err)
}
return nil
}
11 changes: 1 addition & 10 deletions model/cluster/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,7 @@ func NewRootBlock(untrusted UntrustedBlock) (*Block, error) {
// Proposal represents a signed proposed block in collection node cluster consensus.
//
//structwrite:immutable - mutations allowed only within the constructor.
type Proposal struct {
Block Block
ProposerSigData []byte
}
type Proposal = flow.GenericProposal[Payload]

// UntrustedProposal is an untrusted input-only representation of a cluster.Proposal,
// used for construction.
Expand Down Expand Up @@ -137,12 +134,6 @@ func NewRootProposal(untrusted UntrustedProposal) (*Proposal, error) {

}

// ProposalHeader converts the proposal into a compact [ProposalHeader] representation,
// where the payload is compressed to a hash reference.
func (p *Proposal) ProposalHeader() *flow.ProposalHeader {
return &flow.ProposalHeader{Header: p.Block.ToHeader(), ProposerSigData: p.ProposerSigData}
}

// BlockResponse is the same as flow.BlockResponse, but for cluster
// consensus. It contains a list of structurally validated cluster block proposals
// that should correspond to the request.
Expand Down
16 changes: 11 additions & 5 deletions model/flow/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,19 @@ func (s BlockStatus) String() string {
return [...]string{"BLOCK_UNKNOWN", "BLOCK_FINALIZED", "BLOCK_SEALED"}[s]
}

// GenericProposal represents a generic Flow proposal structure parameterized by a block payload type.
// It includes the block header metadata, block payload, and proposer signature.
type GenericProposal[T HashablePayload] struct {
// Block is the block being proposed.
Block GenericBlock[T]
// ProposerSigData is the signature (vote) from the proposer of this block.
ProposerSigData []byte
}

// Proposal is a signed proposal that includes the block payload, in addition to the required header and signature.
//
//structwrite:immutable - mutations allowed only within the constructor
type Proposal struct {
Block Block
ProposerSigData []byte
}
type Proposal = GenericProposal[Payload]

// UntrustedProposal is an untrusted input-only representation of a Proposal,
// used for construction.
Expand Down Expand Up @@ -223,7 +229,7 @@ func NewRootProposal(untrusted UntrustedProposal) (*Proposal, error) {

// ProposalHeader converts the proposal into a compact [ProposalHeader] representation,
// where the payload is compressed to a hash reference.
func (b *Proposal) ProposalHeader() *ProposalHeader {
func (b *GenericProposal[T]) ProposalHeader() *ProposalHeader {
return &ProposalHeader{Header: b.Block.ToHeader(), ProposerSigData: b.ProposerSigData}
}

Expand Down
Loading
Loading