Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ Utilities for golang developments on Conflux blockchain, especially for backend
|[Pprof](./pprof)|To enable pprof server based on configuration.|
|[Rate Limit](./rate)|Utilities to limit request rate, along with HTTP handler middlewares|
|[Store](./store/README.md)|Provides utilities to initialize database.|
|[Viper](./viper/README.md)| Initialize the original [viper](https://github.com/spf13/viper) in common and fix some issues.|
|[Viper](./viper/README.md)|Initialize the original [viper](https://github.com/spf13/viper) in common and fix some issues.|
4 changes: 2 additions & 2 deletions blockchain/sync/poll/latest_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ type LatestPoller[T any] struct {
health *health.TimedCounter
}

func NewLatestPoller[T any](adapter Adapter[T], nextBlockNumber uint64, option ...Option) *LatestPoller[T] {
func NewLatestPoller[T any](adapter Adapter[T], nextBlockNumber uint64, reorgParams ReorgWindowParams, option ...Option) *LatestPoller[T] {
opt := normalizeOpt(option...)

return &LatestPoller[T]{
option: opt,
adapter: adapter,
nextBlockNumber: nextBlockNumber,
dataCh: make(chan Revertable[T], opt.BufferSize),
window: NewReorgWindow(),
window: NewReorgWindowWithLatestBlocks(reorgParams),
health: health.NewTimedCounter(opt.Health),
}
}
Expand Down
35 changes: 34 additions & 1 deletion blockchain/sync/poll/reorg_window.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
package poll

type ReorgWindowParams struct {
FinalizedBlockNumber uint64
FinalizedBlockHash string
LatestBlocks map[uint64]string
}

// ReorgWindow is used to detect chain reorg.
type ReorgWindow struct {
// no capacity, since latest_finalized is small enough
// no capacity, since max(latest_finalized, latest_checkpoint) is small enough
blockNumber2Hashes map[uint64]string
earlist, latest uint64
}
Expand All @@ -13,6 +19,33 @@ func NewReorgWindow() *ReorgWindow {
}
}

// NewReorgWindowWithLatestBlocks initializes with given latest blocks and the last finalized block number.
//
// When service restarted, user should load recent blocks and the last finalized block from database,
// and initialize the reorg window so as to correctly handle chain reorg during the service down time.
func NewReorgWindowWithLatestBlocks(params ReorgWindowParams) *ReorgWindow {
window := NewReorgWindow()
if len(params.FinalizedBlockHash) == 0 {
return window
}

window.earlist = params.FinalizedBlockNumber
window.latest = params.FinalizedBlockNumber
window.blockNumber2Hashes[params.FinalizedBlockNumber] = params.FinalizedBlockHash

for {
hash, ok := params.LatestBlocks[window.latest+1]
if !ok {
break
}

window.latest++
window.blockNumber2Hashes[window.latest] = hash
}

return window
}

func (window *ReorgWindow) Push(blockNumber uint64, blockHash, parentBlockHash string) (appended, popped bool) {
// window is empty
if len(window.blockNumber2Hashes) == 0 {
Expand Down
73 changes: 73 additions & 0 deletions blockchain/sync/poll/reorg_window_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package poll

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

func TestReorgWindowPush(t *testing.T) {
window := NewReorgWindow()

// push empty
pushed, popped := window.Push(5, "Hash - 5", "Hash - 4")
assert.Equal(t, true, pushed)
assert.Equal(t, false, popped)

// push in sequence
pushed, popped = window.Push(6, "Hash - 6", "Hash - 5")
assert.Equal(t, true, pushed)
assert.Equal(t, false, popped)

// push not in sequence
pushed, popped = window.Push(6, "Hash - 6", "Hash - 5")
assert.Equal(t, false, pushed)
assert.Equal(t, false, popped)
pushed, popped = window.Push(8, "Hash - 8", "Hash - 7")
assert.Equal(t, false, pushed)
assert.Equal(t, false, popped)

// push in sequence, but parent hash mismatch
pushed, popped = window.Push(7, "Hash - 7", "Hash - 66")
assert.Equal(t, false, pushed)
assert.Equal(t, true, popped)

// push block 6 again due to popped
pushed, popped = window.Push(6, "Hash - 66", "Hash - 5")
assert.Equal(t, true, pushed)
assert.Equal(t, false, popped)
}

func TestReorgWindowEvict(t *testing.T) {
window := NewReorgWindow()

for i := uint64(1); i < 10; i++ {
pushed, popped := window.Push(i, fmt.Sprintf("Hash - %v", i), fmt.Sprintf("Hash - %v", i-1))
assert.Equal(t, true, pushed)
assert.Equal(t, false, popped)
}

window.Evict(5)

assert.Equal(t, uint64(6), window.earlist)
}

func TestReorgWindowWithLatestBlocks(t *testing.T) {
window := NewReorgWindowWithLatestBlocks(ReorgWindowParams{
FinalizedBlockNumber: 5,
FinalizedBlockHash: "Hash - 5",
LatestBlocks: map[uint64]string{
6: "Hash - 6",
7: "Hash - 7",
8: "Hash - 8",
},
})

assert.Equal(t, uint64(5), window.earlist)
assert.Equal(t, uint64(8), window.latest)

pushed, popped := window.Push(9, "Hash - 9", "Hash - 8")
assert.Equal(t, true, pushed)
assert.Equal(t, false, popped)
}
5 changes: 4 additions & 1 deletion blockchain/sync/sync_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type ParamsDB[T any] struct {
Processor db.Option
DB *gorm.DB
NextBlockNumber uint64

// only used to sync latest data, and usually loads from database
Reorg poll.ReorgWindowParams
}

func CatchUpDB[T channel.Sizable](ctx context.Context, params CatchupParamsDB[T], processors ...db.BatchProcessor[T]) uint64 {
Expand Down Expand Up @@ -54,7 +57,7 @@ func StartFinalizedDB[T any](ctx context.Context, wg *sync.WaitGroup, params Par
}

func StartLatestDB[T any](ctx context.Context, wg *sync.WaitGroup, params ParamsDB[T], processors ...db.RevertableProcessor[T]) {
poller := poll.NewLatestPoller(params.Adapter, params.NextBlockNumber, params.Poller)
poller := poll.NewLatestPoller(params.Adapter, params.NextBlockNumber, params.Reorg, params.Poller)
wg.Add(1)
go poller.Poll(ctx, wg)

Expand Down
Loading