From e9d13583eecff06bae1c5786838c541587646628 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Thu, 8 Jan 2026 14:38:30 +0800 Subject: [PATCH] Supports to init reorg window from db when sync the latest data --- README.md | 2 +- blockchain/sync/poll/latest_poller.go | 4 +- blockchain/sync/poll/reorg_window.go | 35 ++++++++++- blockchain/sync/poll/reorg_window_test.go | 73 +++++++++++++++++++++++ blockchain/sync/sync_db.go | 5 +- 5 files changed, 114 insertions(+), 5 deletions(-) create mode 100644 blockchain/sync/poll/reorg_window_test.go diff --git a/README.md b/README.md index d211b1d..c1d69e0 100644 --- a/README.md +++ b/README.md @@ -19,4 +19,4 @@ Utilities for golang developments on Conflux blockchain, especially for backend |[Pprof](./pprof)|To enable pprof server based on configuration.| |[Rate Limit](./rate)|Utilities to limit request rate, along with HTTP handler middlewares| |[Store](./store/README.md)|Provides utilities to initialize database.| -|[Viper](./viper/README.md)| Initialize the original [viper](https://github.com/spf13/viper) in common and fix some issues.| +|[Viper](./viper/README.md)|Initialize the original [viper](https://github.com/spf13/viper) in common and fix some issues.| diff --git a/blockchain/sync/poll/latest_poller.go b/blockchain/sync/poll/latest_poller.go index afb01de..174d535 100644 --- a/blockchain/sync/poll/latest_poller.go +++ b/blockchain/sync/poll/latest_poller.go @@ -24,7 +24,7 @@ type LatestPoller[T any] struct { health *health.TimedCounter } -func NewLatestPoller[T any](adapter Adapter[T], nextBlockNumber uint64, option ...Option) *LatestPoller[T] { +func NewLatestPoller[T any](adapter Adapter[T], nextBlockNumber uint64, reorgParams ReorgWindowParams, option ...Option) *LatestPoller[T] { opt := normalizeOpt(option...) return &LatestPoller[T]{ @@ -32,7 +32,7 @@ func NewLatestPoller[T any](adapter Adapter[T], nextBlockNumber uint64, option . adapter: adapter, nextBlockNumber: nextBlockNumber, dataCh: make(chan Revertable[T], opt.BufferSize), - window: NewReorgWindow(), + window: NewReorgWindowWithLatestBlocks(reorgParams), health: health.NewTimedCounter(opt.Health), } } diff --git a/blockchain/sync/poll/reorg_window.go b/blockchain/sync/poll/reorg_window.go index f59c21b..712e909 100644 --- a/blockchain/sync/poll/reorg_window.go +++ b/blockchain/sync/poll/reorg_window.go @@ -1,8 +1,14 @@ package poll +type ReorgWindowParams struct { + FinalizedBlockNumber uint64 + FinalizedBlockHash string + LatestBlocks map[uint64]string +} + // ReorgWindow is used to detect chain reorg. type ReorgWindow struct { - // no capacity, since latest_finalized is small enough + // no capacity, since max(latest_finalized, latest_checkpoint) is small enough blockNumber2Hashes map[uint64]string earlist, latest uint64 } @@ -13,6 +19,33 @@ func NewReorgWindow() *ReorgWindow { } } +// NewReorgWindowWithLatestBlocks initializes with given latest blocks and the last finalized block number. +// +// When service restarted, user should load recent blocks and the last finalized block from database, +// and initialize the reorg window so as to correctly handle chain reorg during the service down time. +func NewReorgWindowWithLatestBlocks(params ReorgWindowParams) *ReorgWindow { + window := NewReorgWindow() + if len(params.FinalizedBlockHash) == 0 { + return window + } + + window.earlist = params.FinalizedBlockNumber + window.latest = params.FinalizedBlockNumber + window.blockNumber2Hashes[params.FinalizedBlockNumber] = params.FinalizedBlockHash + + for { + hash, ok := params.LatestBlocks[window.latest+1] + if !ok { + break + } + + window.latest++ + window.blockNumber2Hashes[window.latest] = hash + } + + return window +} + func (window *ReorgWindow) Push(blockNumber uint64, blockHash, parentBlockHash string) (appended, popped bool) { // window is empty if len(window.blockNumber2Hashes) == 0 { diff --git a/blockchain/sync/poll/reorg_window_test.go b/blockchain/sync/poll/reorg_window_test.go new file mode 100644 index 0000000..2642922 --- /dev/null +++ b/blockchain/sync/poll/reorg_window_test.go @@ -0,0 +1,73 @@ +package poll + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestReorgWindowPush(t *testing.T) { + window := NewReorgWindow() + + // push empty + pushed, popped := window.Push(5, "Hash - 5", "Hash - 4") + assert.Equal(t, true, pushed) + assert.Equal(t, false, popped) + + // push in sequence + pushed, popped = window.Push(6, "Hash - 6", "Hash - 5") + assert.Equal(t, true, pushed) + assert.Equal(t, false, popped) + + // push not in sequence + pushed, popped = window.Push(6, "Hash - 6", "Hash - 5") + assert.Equal(t, false, pushed) + assert.Equal(t, false, popped) + pushed, popped = window.Push(8, "Hash - 8", "Hash - 7") + assert.Equal(t, false, pushed) + assert.Equal(t, false, popped) + + // push in sequence, but parent hash mismatch + pushed, popped = window.Push(7, "Hash - 7", "Hash - 66") + assert.Equal(t, false, pushed) + assert.Equal(t, true, popped) + + // push block 6 again due to popped + pushed, popped = window.Push(6, "Hash - 66", "Hash - 5") + assert.Equal(t, true, pushed) + assert.Equal(t, false, popped) +} + +func TestReorgWindowEvict(t *testing.T) { + window := NewReorgWindow() + + for i := uint64(1); i < 10; i++ { + pushed, popped := window.Push(i, fmt.Sprintf("Hash - %v", i), fmt.Sprintf("Hash - %v", i-1)) + assert.Equal(t, true, pushed) + assert.Equal(t, false, popped) + } + + window.Evict(5) + + assert.Equal(t, uint64(6), window.earlist) +} + +func TestReorgWindowWithLatestBlocks(t *testing.T) { + window := NewReorgWindowWithLatestBlocks(ReorgWindowParams{ + FinalizedBlockNumber: 5, + FinalizedBlockHash: "Hash - 5", + LatestBlocks: map[uint64]string{ + 6: "Hash - 6", + 7: "Hash - 7", + 8: "Hash - 8", + }, + }) + + assert.Equal(t, uint64(5), window.earlist) + assert.Equal(t, uint64(8), window.latest) + + pushed, popped := window.Push(9, "Hash - 9", "Hash - 8") + assert.Equal(t, true, pushed) + assert.Equal(t, false, popped) +} diff --git a/blockchain/sync/sync_db.go b/blockchain/sync/sync_db.go index be099bd..eefeb7e 100644 --- a/blockchain/sync/sync_db.go +++ b/blockchain/sync/sync_db.go @@ -25,6 +25,9 @@ type ParamsDB[T any] struct { Processor db.Option DB *gorm.DB NextBlockNumber uint64 + + // only used to sync latest data, and usually loads from database + Reorg poll.ReorgWindowParams } func CatchUpDB[T channel.Sizable](ctx context.Context, params CatchupParamsDB[T], processors ...db.BatchProcessor[T]) uint64 { @@ -54,7 +57,7 @@ func StartFinalizedDB[T any](ctx context.Context, wg *sync.WaitGroup, params Par } func StartLatestDB[T any](ctx context.Context, wg *sync.WaitGroup, params ParamsDB[T], processors ...db.RevertableProcessor[T]) { - poller := poll.NewLatestPoller(params.Adapter, params.NextBlockNumber, params.Poller) + poller := poll.NewLatestPoller(params.Adapter, params.NextBlockNumber, params.Reorg, params.Poller) wg.Add(1) go poller.Poll(ctx, wg)