Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions blockchain/sync/poll/testutil/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package testutil

import (
"context"
"fmt"

"github.com/DmitriyVTitov/size"
)

// Data is a minimal block representation used by sync tests: a block number
// plus its own hash and its parent's hash.
type Data struct {
	Number     uint64
	Hash       string
	ParentHash string
}

// Size returns the in-memory size of the data in bytes, computed via the
// third-party size package.
func (data Data) Size() int {
	return size.Of(data)
}

// Adapter is a test double for a blockchain data source. It replays a fixed,
// strictly increasing sequence of finalized block numbers and an optional
// scripted sequence of latest (non-finalized) blocks, which may include reorgs.
type Adapter struct {
	finalized     []uint64 // strictly increasing finalized block numbers to replay
	nextFinalized int      // index of the next finalized number to return

	latestBlocks []Data // scripted latest blocks, served strictly in order
	nextLatest   int    // index of the next scripted latest block to serve
}

// MustNewAdapter creates an Adapter that replays the given finalized block
// numbers and scripted latest blocks. It panics if finalized is empty or not
// strictly increasing.
func MustNewAdapter(finalized []uint64, latestBlocks []Data) *Adapter {
	if len(finalized) == 0 {
		panic("Finalized value is empty")
	}

	// Each value must be strictly greater than its predecessor.
	for i, v := range finalized[1:] {
		if finalized[i] >= v {
			panic("Invalid finalized range")
		}
	}

	adapter := Adapter{
		finalized:    finalized,
		latestBlocks: latestBlocks,
	}

	return &adapter
}

// GetFinalizedBlockNumber returns the next scripted finalized block number.
// Once the script is exhausted, it keeps returning the last value.
func (adapter *Adapter) GetFinalizedBlockNumber(ctx context.Context) (uint64, error) {
	last := len(adapter.finalized) - 1
	if adapter.nextFinalized > last {
		// Exhausted: report the final value forever after.
		return adapter.finalized[last], nil
	}

	value := adapter.finalized[adapter.nextFinalized]
	adapter.nextFinalized++

	return value, nil
}

// GetLatestBlockNumber returns the number of the last scripted latest block,
// or the highest finalized number when no latest blocks were configured.
func (adapter *Adapter) GetLatestBlockNumber(ctx context.Context) (uint64, error) {
	if n := len(adapter.latestBlocks); n > 0 {
		return adapter.latestBlocks[n-1].Number, nil
	}

	// No latest blocks configured: fall back to the highest finalized number.
	return adapter.finalized[len(adapter.finalized)-1], nil
}

// GetBlockData returns deterministic block data for the given block number.
//
// Finalized blocks (number <= the highest configured finalized number) are
// synthesized on the fly with predictable "DataHash-<n>" hashes. Blocks above
// that are served from the scripted latestBlocks sequence in order; the
// requested number must match the next scripted entry, otherwise this panics.
func (adapter *Adapter) GetBlockData(ctx context.Context, blockNumber uint64) (Data, error) {
	if finalized := adapter.finalized[len(adapter.finalized)-1]; blockNumber <= finalized {
		data := Data{
			Number: blockNumber,
			Hash:   fmt.Sprintf("DataHash-%v", blockNumber),
		}

		// Genesis (block 0) has no parent hash.
		if blockNumber > 0 {
			data.ParentHash = fmt.Sprintf("DataHash-%v", blockNumber-1)
		}

		return data, nil
	}

	// Fix: guard against exhausting the scripted latest blocks so that a
	// misconfigured test fails with a descriptive panic instead of an opaque
	// index-out-of-range runtime error.
	if adapter.nextLatest >= len(adapter.latestBlocks) {
		panic("No more latest blocks configured")
	}

	data := adapter.latestBlocks[adapter.nextLatest]
	if data.Number != blockNumber {
		panic("Requested block number mismatch with expected latest blocks")
	}

	adapter.nextLatest++

	return data, nil
}

// GetBlockHash returns the block's own hash.
func (adapter *Adapter) GetBlockHash(data Data) string {
	return data.Hash
}

// GetParentBlockHash returns the hash of the block's parent.
func (adapter *Adapter) GetParentBlockHash(data Data) string {
	return data.ParentHash
}
1 change: 1 addition & 0 deletions blockchain/sync/process/db/processor_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func NewBatchAggregateProcessor[T any](option BatchOption, db *gorm.DB, processo
return &BatchAggregateProcessor[T]{
AggregateProcessor: NewAggregateProcessor(option.Processor, db, innerProcessors...),
option: option,
processors: processors,
lastBatchTime: time.Now(),
}
}
Expand Down
128 changes: 128 additions & 0 deletions blockchain/sync/sync_db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package sync

import (
"context"
"sync"
"testing"
"time"

"github.com/Conflux-Chain/go-conflux-util/blockchain/sync/poll"
"github.com/Conflux-Chain/go-conflux-util/blockchain/sync/poll/testutil"
"github.com/Conflux-Chain/go-conflux-util/blockchain/sync/process/db"
"github.com/Conflux-Chain/go-conflux-util/parallel"
"github.com/Conflux-Chain/go-conflux-util/store"
"github.com/stretchr/testify/assert"
)

// TestCatchUpDB verifies batch catch-up syncing from block 2 up to the
// highest finalized block (5), using a batch size of 3 blocks.
func TestCatchUpDB(t *testing.T) {
	adapter := testutil.MustNewAdapter([]uint64{3, 5}, nil)
	database := store.NewMemoryConfig().MustOpenOrCreate()
	processor := newTestDBProcessor()

	params := CatchupParamsDB[testutil.Data]{
		Adapter: adapter,
		Poller: poll.CatchUpOption{
			Parallel: poll.ParallelOption{
				SerialOption: parallel.SerialOption{
					Window: 5,
				},
			},
		},
		Processor: db.BatchOption{
			BatchSize: 3, // 1 batch contains 3 blocks
		},
		DB:              database,
		NextBlockNumber: 2, // start to sync from block 2
	}

	nextBlockNumber := CatchUpDB(context.Background(), params, processor)

	assert.Equal(t, uint64(6), nextBlockNumber)

	// Blocks 2-5 arrive as one full batch of 3 plus a trailing batch of 1.
	processor.assertData(t,
		[][]uint64{{2, 3, 4}, {5}},
		nil,
		nil,
	)
}

// TestSyncFinalizedDB verifies continuous syncing of finalized blocks,
// starting from block 2 and ending at the highest finalized block (5).
func TestSyncFinalizedDB(t *testing.T) {
	adapter := testutil.MustNewAdapter([]uint64{3, 5}, nil)
	database := store.NewMemoryConfig().MustOpenOrCreate()
	processor := newTestDBProcessor()

	var wg sync.WaitGroup

	params := ParamsDB[testutil.Data]{
		Adapter:         adapter,
		DB:              database,
		NextBlockNumber: 2, // start to sync from block 2
	}
	StartFinalizedDB(context.Background(), &wg, params, processor)

	// wait for poll-and-process
	time.Sleep(500 * time.Millisecond)

	// Blocks 2-5 are processed one-by-one; no batches, no reorgs.
	processor.assertData(t,
		nil,
		[]uint64{2, 3, 4, 5},
		nil,
	)
}

// TestSyncLatestDB verifies syncing through the non-finalized chain head,
// including two reorgs that revert previously processed blocks.
func TestSyncLatestDB(t *testing.T) {
	// finalized block number: 5
	// latest block number: 9, with 2 reorgs
	adapter := testutil.MustNewAdapter([]uint64{3, 5}, []testutil.Data{
		{Number: 6, Hash: "DataHash-6", ParentHash: "DataHash-5"},
		{Number: 7, Hash: "DataHash-7", ParentHash: "DataHash-6"},

		// revert block 7
		{Number: 8, Hash: "DataHash-88", ParentHash: "DataHash-77"},
		{Number: 7, Hash: "DataHash-77", ParentHash: "DataHash-6"},
		{Number: 8, Hash: "DataHash-88", ParentHash: "DataHash-77"},

		// revert blocks 7 & 8
		{Number: 9, Hash: "DataHash-999", ParentHash: "DataHash-888"},
		{Number: 8, Hash: "DataHash-888", ParentHash: "DataHash-777"},
		{Number: 7, Hash: "DataHash-777", ParentHash: "DataHash-6"},
		{Number: 8, Hash: "DataHash-888", ParentHash: "DataHash-777"},
		{Number: 9, Hash: "DataHash-999", ParentHash: "DataHash-888"},
	})

	database := store.NewMemoryConfig().MustOpenOrCreate()
	processor := newTestDBProcessor()

	var wg sync.WaitGroup

	StartLatestDB(context.Background(), &wg, ParamsDB[testutil.Data]{
		Adapter:         adapter,
		DB:              database,
		NextBlockNumber: 2, // start to sync from block 2
	}, processor)

	// wait for poll-and-process
	time.Sleep(500 * time.Millisecond)

	processor.assertData(t,
		nil,
		[]uint64{2, 3, 4, 5, 6, 7, 8, 9},
		// 2 reorgs
		[][]uint64{{7}, {7, 8}},
	)
}
153 changes: 153 additions & 0 deletions blockchain/sync/sync_db_testutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package sync

import (
"encoding/json"
"slices"
"testing"

"github.com/Conflux-Chain/go-conflux-util/blockchain/sync/poll/testutil"
"github.com/Conflux-Chain/go-conflux-util/blockchain/sync/process/db"
"github.com/stretchr/testify/assert"
"gorm.io/gorm"
)

// dummyDBOp is a no-op database operation returned by the test processor.
type dummyDBOp struct{}

// Exec implements the db operation interface and does nothing.
func (op dummyDBOp) Exec(tx *gorm.DB) error { return nil }

// testDBProcessor records every block it is asked to process, batch-process
// or revert, so tests can later assert the exact sequences observed. It also
// panics if blocks arrive out of sequence or with mismatched parent hashes.
type testDBProcessor struct {
	batch   []testutil.Data   // blocks accumulated in the current (unfinished) batch
	batches [][]testutil.Data // completed batches, in completion order

	singles []testutil.Data // blocks processed one-by-one, in order

	drops [][]testutil.Data // blocks removed by each revert, in revert order

	prev testutil.Data // last accepted block, used for continuity checks
}

// newTestDBProcessor creates an empty testDBProcessor.
func newTestDBProcessor() *testDBProcessor {
	return &testDBProcessor{}
}

// requiresContinuous panics unless data directly extends the previously
// accepted block (sequential number and matching parent hash). The very
// first block, when no previous hash is recorded, is always accepted.
func (p *testDBProcessor) requiresContinuous(data testutil.Data) {
	if p.prev.Hash == "" {
		return
	}

	switch {
	case data.Number != p.prev.Number+1:
		panic("Block number not in sequence")
	case data.ParentHash != p.prev.Hash:
		panic("Block parent hash mismatch")
	}
}

// Process records a singly-processed block after verifying it extends the
// previously accepted block, and returns a no-op database operation.
func (p *testDBProcessor) Process(data testutil.Data) db.Operation {
	p.requiresContinuous(data)

	p.prev = data
	p.singles = append(p.singles, data)

	return dummyDBOp{}
}

// Revert rolls the recorded singles back so that the block with data.Number
// and everything after it are removed; the removed run is archived in drops.
// It panics when there is nothing to revert or when data.Number falls outside
// the recorded range.
func (p *testDBProcessor) Revert(data testutil.Data) db.Operation {
	if len(p.singles) == 0 {
		panic("No block to revert")
	}

	first, last := p.singles[0].Number, p.singles[len(p.singles)-1].Number
	if data.Number < first {
		panic("Reverted block number too small")
	}
	if data.Number > last {
		panic("Reverted block number too large")
	}

	// The singles are continuous, so the number is guaranteed to be present.
	ancestor := slices.IndexFunc(p.singles, func(v testutil.Data) bool {
		return v.Number == data.Number
	})

	p.drops = append(p.drops, slices.Clone(p.singles[ancestor:]))
	p.singles = p.singles[:ancestor]

	if ancestor > 0 {
		p.prev = p.singles[ancestor-1]
	} else {
		p.prev = testutil.Data{}
	}

	return dummyDBOp{}
}

// BatchProcess appends the block to the current batch after verifying it
// extends the previously accepted block, and returns the new batch length.
func (p *testDBProcessor) BatchProcess(data testutil.Data) int {
	p.requiresContinuous(data)

	p.prev = data
	p.batch = append(p.batch, data)

	return len(p.batch)
}

// BatchExec implements the batch-processor interface; the test keeps no
// database state, so there is nothing to execute.
func (p *testDBProcessor) BatchExec(tx *gorm.DB, createBatchSize int) error {
	return nil
}

// BatchReset finalizes the current batch: it is archived into batches and a
// fresh (nil) batch is started.
func (p *testDBProcessor) BatchReset() {
	p.batches = append(p.batches, p.batch)
	p.batch = nil
}

// assertData asserts the processor's full recorded history: completed batches,
// singly-processed blocks, and reverted block groups (all compared by block
// number). It also checks that no partially-filled batch is left behind.
func (p *testDBProcessor) assertData(t *testing.T, batches [][]uint64, singles []uint64, drops [][]uint64) {
	assert.Equal(t, batches, p.toNumberSlice2D(p.batches))
	assert.Nil(t, p.batch)
	assert.Equal(t, singles, p.toNumberSlice(p.singles))
	assert.Equal(t, drops, p.toNumberSlice2D(p.drops))
}

// toNumberSlice projects block data onto the block numbers, preserving order.
// A nil input maps to nil so that assert.Equal treats "never called" and
// "called with nothing" distinctly.
func (p *testDBProcessor) toNumberSlice(data []testutil.Data) []uint64 {
	if data == nil {
		return nil
	}

	numbers := make([]uint64, len(data))
	for i := range data {
		numbers[i] = data[i].Number
	}

	return numbers
}

// toNumberSlice2D projects each group of block data onto its block numbers,
// preserving order. A nil input maps to nil.
func (p *testDBProcessor) toNumberSlice2D(data [][]testutil.Data) [][]uint64 {
	if data == nil {
		return nil
	}

	numbers := make([][]uint64, len(data))
	for i, group := range data {
		numbers[i] = p.toNumberSlice(group)
	}

	return numbers
}

// String renders the processor's recorded state as indented JSON, which makes
// failing tests easier to diagnose. Encoding errors are deliberately ignored:
// the snapshot contains only JSON-safe types.
func (p *testDBProcessor) String() string {
	snapshot := map[string]any{
		"batches": p.toNumberSlice2D(p.batches),
		"batch":   p.toNumberSlice(p.batch),
		"singles": p.toNumberSlice(p.singles),
		"drops":   p.toNumberSlice2D(p.drops),
		"prev":    p.prev.Number,
	}

	encoded, _ := json.MarshalIndent(snapshot, "", " ")

	return string(encoded)
}
Loading