From 3932849c200192ef87c5af7da7ba0474acc1ef92 Mon Sep 17 00:00:00 2001 From: "Simon.ZG" Date: Fri, 27 Jun 2025 14:38:16 -0700 Subject: [PATCH] use heap to store multiple sorted finalize state, to support pipelined consensus algorithm such as HotStuff-pipeline --- baseapp/abci.go | 25 +++++++++++++++++++- baseapp/state/manager.go | 49 +++++++++++++++++++++++++++++++++------- 2 files changed, 65 insertions(+), 9 deletions(-) diff --git a/baseapp/abci.go b/baseapp/abci.go index ca4e4408d7f0..5600bf67cf21 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -1,6 +1,7 @@ package baseapp import ( + "bytes" "context" "fmt" "sort" @@ -489,6 +490,7 @@ func (app *BaseApp) ProcessProposal(req *abci.ProcessProposalRequest) (resp *abc ProposerAddress: req.ProposerAddress, NextValidatorsHash: req.NextValidatorsHash, AppHash: app.LastCommitID().Hash, + ConsensusHash: req.Hash, } app.stateManager.SetState(execModeProcessProposal, app.cms, header, app.logger, app.streamingManager) @@ -744,7 +746,28 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz // finalizeBlockState should be set on InitChain or ProcessProposal. If it is // nil, it means we are replaying this block and we need to set the state here // given that during block replay ProcessProposal is not executed by CometBFT. - finalizeState := app.stateManager.GetState(execModeFinalize) + var finalizeState *state.State + for { + firstState := app.stateManager.GetState(execModeFinalize) + if firstState == nil { + break + } + + // only match state up to the requested height + if req.Height < firstState.Context().BlockHeight() { + break + } + + // matche state by it's height and hash + // special case for initialHeight + if (req.Height == app.initialHeight && firstState.Context().HeaderInfo().Height == app.initialHeight) || (req.Height == firstState.Context().HeaderInfo().Height && bytes.Equal(req.Hash, firstState.Context().HeaderInfo().Hash)) { + finalizeState = firstState + break + } else { + // throw away the oldest unmatched state + app.stateManager.ClearState(execModeFinalize) + } + } if finalizeState == nil { app.stateManager.SetState(execModeFinalize, app.cms, header, app.logger, app.streamingManager) finalizeState = app.stateManager.GetState(execModeFinalize) diff --git a/baseapp/state/manager.go b/baseapp/state/manager.go index dddaf8e32f28..ad0254e00ac7 100644 --- a/baseapp/state/manager.go +++ b/baseapp/state/manager.go @@ -1,6 +1,7 @@ package state import ( + "container/heap" "fmt" "sync" @@ -37,15 +38,44 @@ type Manager struct { checkState *State prepareProposalState *State processProposalState *State - finalizeBlockState *State + finalizeBlockState *MinHeap stateMut sync.RWMutex gasConfig config.GasConfig } +type MinHeap []*State + +func (h MinHeap) Len() int { return len(h) } +func (h MinHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h MinHeap) Less(i, j int) bool { + if h[i].ctx.BlockHeight() < h[j].ctx.BlockHeight() { + return true + } else if h[i].ctx.BlockHeight() == h[j].ctx.BlockHeight() && h[i].ctx.BlockTime().Before(h[j].ctx.BlockTime()) { + return true + } else { + return false + } +} + +func (h *MinHeap) Push(x any) { + *h = append(*h, x.(*State)) +} + +func (h *MinHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[:n-1] + return x +} + func NewManager(gasConfig config.GasConfig) *Manager { + finalizeBlockState := &MinHeap{} + heap.Init(finalizeBlockState) return &Manager{ - gasConfig: gasConfig, + gasConfig: gasConfig, + finalizeBlockState: finalizeBlockState, } } @@ -55,8 +85,10 @@ func (mgr *Manager) GetState(mode sdk.ExecMode) *State { switch mode { case sdk.ExecModeFinalize: - return mgr.finalizeBlockState - + if mgr.finalizeBlockState.Len() > 0 { + return (*mgr.finalizeBlockState)[0] + } + return nil case sdk.ExecModePrepareProposal: return mgr.prepareProposalState @@ -84,6 +116,7 @@ func (mgr *Manager) SetState( Time: h.Time, ChainID: h.ChainID, AppHash: h.AppHash, + Hash: h.ConsensusHash, } baseState := NewState( sdk.NewContext(ms, h, false, logger). @@ -107,8 +140,7 @@ func (mgr *Manager) SetState( mgr.processProposalState = baseState case sdk.ExecModeFinalize: - mgr.finalizeBlockState = baseState - + heap.Push(mgr.finalizeBlockState, baseState) default: panic(fmt.Sprintf("invalid runTxMode for setState: %d", mode)) } @@ -129,8 +161,9 @@ func (mgr *Manager) ClearState(mode sdk.ExecMode) { mgr.processProposalState = nil case sdk.ExecModeFinalize: - mgr.finalizeBlockState = nil - + if mgr.finalizeBlockState.Len() > 0 { + heap.Pop(mgr.finalizeBlockState) + } default: panic(fmt.Sprintf("invalid runTxMode for clearState: %d", mode)) }