Skip to content

Commit 1f4ba01

Browse files
authored
Add unit tests for sync db (#91)
1 parent 1c8f32a commit 1f4ba01

File tree

4 files changed

+377
-0
lines changed

4 files changed

+377
-0
lines changed
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package testutil
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/DmitriyVTitov/size"
8+
)
9+
10+
// Data is a minimal block representation used by the sync tests: a block
// number plus its own hash and its parent's hash.
type Data struct {
	Number     uint64
	Hash       string
	ParentHash string
}
15+
16+
// Size returns the in-memory size of data in bytes, as measured by the
// third-party size package.
func (data Data) Size() int {
	return size.Of(data)
}
19+
20+
// Adapter is a scripted blockchain data source for tests. It replays a
// fixed sequence of finalized block numbers and latest block data.
type Adapter struct {
	finalized     []uint64 // strictly increasing finalized block numbers to replay
	nextFinalized int      // index of the next finalized value to return

	latestBlocks []Data // latest block data to replay beyond the finalized range
	nextLatest   int    // index of the next latest block to return
}
27+
28+
func MustNewAdapter(finalized []uint64, latestBlocks []Data) *Adapter {
29+
if len(finalized) == 0 {
30+
panic("Finalized value is empty")
31+
}
32+
33+
for i := 1; i < len(finalized); i++ {
34+
if finalized[i-1] >= finalized[i] {
35+
panic("Invalid finalized range")
36+
}
37+
}
38+
39+
return &Adapter{
40+
finalized: finalized,
41+
latestBlocks: latestBlocks,
42+
}
43+
}
44+
45+
func (adapter *Adapter) GetFinalizedBlockNumber(ctx context.Context) (uint64, error) {
46+
if adapter.nextFinalized == len(adapter.finalized) {
47+
return adapter.finalized[adapter.nextFinalized-1], nil
48+
}
49+
50+
finalized := adapter.finalized[adapter.nextFinalized]
51+
52+
adapter.nextFinalized++
53+
54+
return finalized, nil
55+
}
56+
57+
func (adapter *Adapter) GetLatestBlockNumber(ctx context.Context) (uint64, error) {
58+
if len(adapter.latestBlocks) == 0 {
59+
return adapter.finalized[len(adapter.finalized)-1], nil
60+
}
61+
62+
return adapter.latestBlocks[len(adapter.latestBlocks)-1].Number, nil
63+
}
64+
65+
func (adapter *Adapter) GetBlockData(ctx context.Context, blockNumber uint64) (Data, error) {
66+
if finalized := adapter.finalized[len(adapter.finalized)-1]; blockNumber <= finalized {
67+
data := Data{
68+
Number: blockNumber,
69+
Hash: fmt.Sprintf("DataHash-%v", blockNumber),
70+
}
71+
72+
if blockNumber > 0 {
73+
data.ParentHash = fmt.Sprintf("DataHash-%v", blockNumber-1)
74+
}
75+
76+
return data, nil
77+
}
78+
79+
data := adapter.latestBlocks[adapter.nextLatest]
80+
if data.Number != blockNumber {
81+
panic("Requested block number mismatch with expected latest blocks")
82+
}
83+
84+
adapter.nextLatest++
85+
86+
return data, nil
87+
}
88+
89+
// GetBlockHash returns the hash of the given block data.
func (adapter *Adapter) GetBlockHash(data Data) string {
	return data.Hash
}
92+
93+
// GetParentBlockHash returns the parent hash of the given block data.
func (adapter *Adapter) GetParentBlockHash(data Data) string {
	return data.ParentHash
}

blockchain/sync/process/db/processor_batch.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func NewBatchAggregateProcessor[T any](option BatchOption, db *gorm.DB, processo
5656
return &BatchAggregateProcessor[T]{
5757
AggregateProcessor: NewAggregateProcessor(option.Processor, db, innerProcessors...),
5858
option: option,
59+
processors: processors,
5960
lastBatchTime: time.Now(),
6061
}
6162
}

blockchain/sync/sync_db_test.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package sync
2+
3+
import (
4+
"context"
5+
"sync"
6+
"testing"
7+
"time"
8+
9+
"github.com/Conflux-Chain/go-conflux-util/blockchain/sync/poll"
10+
"github.com/Conflux-Chain/go-conflux-util/blockchain/sync/poll/testutil"
11+
"github.com/Conflux-Chain/go-conflux-util/blockchain/sync/process/db"
12+
"github.com/Conflux-Chain/go-conflux-util/parallel"
13+
"github.com/Conflux-Chain/go-conflux-util/store"
14+
"github.com/stretchr/testify/assert"
15+
)
16+
17+
func TestCatchUpDB(t *testing.T) {
18+
adapter := testutil.MustNewAdapter([]uint64{3, 5}, nil)
19+
20+
storeConfig := store.NewMemoryConfig()
21+
DB := storeConfig.MustOpenOrCreate()
22+
23+
processor := newTestDBProcessor()
24+
25+
nextBlockNumber := CatchUpDB(
26+
context.Background(),
27+
CatchupParamsDB[testutil.Data]{
28+
Adapter: adapter,
29+
Poller: poll.CatchUpOption{
30+
Parallel: poll.ParallelOption{
31+
SerialOption: parallel.SerialOption{
32+
Window: 5,
33+
},
34+
},
35+
},
36+
Processor: db.BatchOption{
37+
BatchSize: 3, // 1 batch contains 3 blocks
38+
},
39+
DB: DB,
40+
NextBlockNumber: 2, // start to sync from block 2
41+
},
42+
processor,
43+
)
44+
45+
assert.Equal(t, uint64(6), nextBlockNumber)
46+
47+
processor.assertData(t,
48+
[][]uint64{
49+
{2, 3, 4},
50+
{5},
51+
},
52+
nil,
53+
nil,
54+
)
55+
}
56+
57+
func TestSyncFinalizedDB(t *testing.T) {
58+
adapter := testutil.MustNewAdapter([]uint64{3, 5}, nil)
59+
60+
storeConfig := store.NewMemoryConfig()
61+
DB := storeConfig.MustOpenOrCreate()
62+
63+
processor := newTestDBProcessor()
64+
65+
var wg sync.WaitGroup
66+
67+
StartFinalizedDB(context.Background(), &wg, ParamsDB[testutil.Data]{
68+
Adapter: adapter,
69+
DB: DB,
70+
NextBlockNumber: 2, // start to sync from block 2
71+
}, processor)
72+
73+
// wait for poll-and-process
74+
time.Sleep(500 * time.Millisecond)
75+
76+
processor.assertData(t,
77+
nil,
78+
[]uint64{2, 3, 4, 5},
79+
nil,
80+
)
81+
}
82+
83+
func TestSyncLatestDB(t *testing.T) {
84+
// finalized block number: 5
85+
// latest block number: 9, with 2 reorgs
86+
adapter := testutil.MustNewAdapter([]uint64{3, 5}, []testutil.Data{
87+
{Number: 6, Hash: "DataHash-6", ParentHash: "DataHash-5"},
88+
{Number: 7, Hash: "DataHash-7", ParentHash: "DataHash-6"},
89+
90+
// revert block 7
91+
{Number: 8, Hash: "DataHash-88", ParentHash: "DataHash-77"},
92+
{Number: 7, Hash: "DataHash-77", ParentHash: "DataHash-6"},
93+
{Number: 8, Hash: "DataHash-88", ParentHash: "DataHash-77"},
94+
95+
// revert blocks 7 & 8
96+
{Number: 9, Hash: "DataHash-999", ParentHash: "DataHash-888"},
97+
{Number: 8, Hash: "DataHash-888", ParentHash: "DataHash-777"},
98+
{Number: 7, Hash: "DataHash-777", ParentHash: "DataHash-6"},
99+
{Number: 8, Hash: "DataHash-888", ParentHash: "DataHash-777"},
100+
{Number: 9, Hash: "DataHash-999", ParentHash: "DataHash-888"},
101+
})
102+
103+
storeConfig := store.NewMemoryConfig()
104+
DB := storeConfig.MustOpenOrCreate()
105+
106+
processor := newTestDBProcessor()
107+
108+
var wg sync.WaitGroup
109+
110+
StartLatestDB(context.Background(), &wg, ParamsDB[testutil.Data]{
111+
Adapter: adapter,
112+
DB: DB,
113+
NextBlockNumber: 2, // start to sync from block 2
114+
}, processor)
115+
116+
// wait for poll-and-process
117+
time.Sleep(500 * time.Millisecond)
118+
119+
processor.assertData(t,
120+
nil,
121+
[]uint64{2, 3, 4, 5, 6, 7, 8, 9},
122+
// 2 reorgs
123+
[][]uint64{
124+
{7},
125+
{7, 8},
126+
},
127+
)
128+
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package sync
2+
3+
import (
4+
"encoding/json"
5+
"slices"
6+
"testing"
7+
8+
"github.com/Conflux-Chain/go-conflux-util/blockchain/sync/poll/testutil"
9+
"github.com/Conflux-Chain/go-conflux-util/blockchain/sync/process/db"
10+
"github.com/stretchr/testify/assert"
11+
"gorm.io/gorm"
12+
)
13+
14+
// dummyDBOp is a no-op database operation used as the return value of the
// test processor's Process/Revert methods.
type dummyDBOp struct{}

// Exec does nothing and always succeeds.
func (op dummyDBOp) Exec(tx *gorm.DB) error { return nil }
17+
18+
// testDBProcessor records every block it is asked to process so tests can
// assert the exact sequence of batches, single blocks, and reverted runs.
type testDBProcessor struct {
	batch   []testutil.Data   // blocks accumulated in the current (unfinished) batch
	batches [][]testutil.Data // completed batches, in completion order

	singles []testutil.Data // blocks processed one at a time, in order

	drops [][]testutil.Data // runs of blocks removed by each revert, in revert order

	prev testutil.Data // last accepted block, used to enforce chain continuity
}
28+
29+
func newTestDBProcessor() *testDBProcessor {
30+
return &testDBProcessor{}
31+
}
32+
33+
func (p *testDBProcessor) requiresContinuous(data testutil.Data) {
34+
if len(p.prev.Hash) == 0 {
35+
return
36+
}
37+
38+
if p.prev.Number+1 != data.Number {
39+
panic("Block number not in sequence")
40+
}
41+
42+
if p.prev.Hash != data.ParentHash {
43+
panic("Block parent hash mismatch")
44+
}
45+
}
46+
47+
// Process records data as a singly-processed block after checking chain
// continuity, and returns a no-op database operation.
func (p *testDBProcessor) Process(data testutil.Data) db.Operation {
	p.requiresContinuous(data)

	p.singles = append(p.singles, data)

	p.prev = data

	return dummyDBOp{}
}
56+
57+
func (p *testDBProcessor) Revert(data testutil.Data) db.Operation {
58+
if len(p.singles) == 0 {
59+
panic("No block to revert")
60+
}
61+
62+
if data.Number < p.singles[0].Number {
63+
panic("Reverted block number too small")
64+
}
65+
66+
if data.Number > p.singles[len(p.singles)-1].Number {
67+
panic("Reverted block number too large")
68+
}
69+
70+
var ancestor int
71+
for p.singles[ancestor].Number != data.Number {
72+
ancestor++
73+
}
74+
75+
p.drops = append(p.drops, slices.Clone(p.singles[ancestor:]))
76+
p.singles = p.singles[:ancestor]
77+
78+
if ancestor == 0 {
79+
p.prev = testutil.Data{}
80+
} else {
81+
p.prev = p.singles[ancestor-1]
82+
}
83+
84+
return dummyDBOp{}
85+
}
86+
87+
// BatchProcess appends data to the current batch after checking chain
// continuity, and returns the current batch length.
func (p *testDBProcessor) BatchProcess(data testutil.Data) int {
	p.requiresContinuous(data)

	p.batch = append(p.batch, data)

	p.prev = data

	return len(p.batch)
}
96+
97+
// BatchExec is a no-op; the test processor only records data in memory.
func (p *testDBProcessor) BatchExec(tx *gorm.DB, createBatchSize int) error {
	return nil
}
100+
101+
// BatchReset finalizes the current batch into batches and starts a new
// empty one.
func (p *testDBProcessor) BatchReset() {
	p.batches = append(p.batches, p.batch)
	p.batch = nil
}
105+
106+
// assertData asserts the processor's recorded state by block number:
// completed batches, singly-processed blocks, and reverted runs. It also
// asserts that no batch is left unfinished.
func (p *testDBProcessor) assertData(t *testing.T, batches [][]uint64, singles []uint64, drops [][]uint64) {
	assert.Equal(t, batches, p.toNumberSlice2D(p.batches))
	assert.Nil(t, p.batch)
	assert.Equal(t, singles, p.toNumberSlice(p.singles))
	assert.Equal(t, drops, p.toNumberSlice2D(p.drops))
}
112+
113+
func (p *testDBProcessor) toNumberSlice(data []testutil.Data) []uint64 {
114+
if data == nil {
115+
return nil
116+
}
117+
118+
result := make([]uint64, 0, len(data))
119+
120+
for _, v := range data {
121+
result = append(result, v.Number)
122+
}
123+
124+
return result
125+
}
126+
127+
func (p *testDBProcessor) toNumberSlice2D(data [][]testutil.Data) [][]uint64 {
128+
if data == nil {
129+
return nil
130+
}
131+
132+
result := make([][]uint64, 0, len(data))
133+
134+
for _, v := range data {
135+
result = append(result, p.toNumberSlice(v))
136+
}
137+
138+
return result
139+
}
140+
141+
func (p *testDBProcessor) String() string {
142+
data := map[string]any{
143+
"batches": p.toNumberSlice2D(p.batches),
144+
"batch": p.toNumberSlice(p.batch),
145+
"singles": p.toNumberSlice(p.singles),
146+
"drops": p.toNumberSlice2D(p.drops),
147+
"prev": p.prev.Number,
148+
}
149+
150+
encoded, _ := json.MarshalIndent(data, "", " ")
151+
152+
return string(encoded)
153+
}

0 commit comments

Comments
 (0)