Skip to content

Commit 4aced6f

Browse files
authored
Refactor the common used sync methods for db (#89)
1 parent fe2dd08 commit 4aced6f

File tree

3 files changed

+65
-87
lines changed

3 files changed

+65
-87
lines changed

blockchain/sync/evm/syncer_db.go

Lines changed: 0 additions & 86 deletions
This file was deleted.

blockchain/sync/process/db/processor_batch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type BatchAggregateProcessor[T any] struct {
4646
}
4747

4848
func NewBatchAggregateProcessor[T any](option BatchOption, db *gorm.DB, processors ...BatchProcessor[T]) *BatchAggregateProcessor[T] {
49-
defaults.SetDefaults(option)
49+
defaults.SetDefaults(&option)
5050

5151
innerProcessors := make([]Processor[T], 0, len(processors))
5252
for _, v := range processors {

blockchain/sync/sync_db.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package sync
2+
3+
import (
4+
"context"
5+
"sync"
6+
7+
"github.com/Conflux-Chain/go-conflux-util/blockchain/sync/poll"
8+
"github.com/Conflux-Chain/go-conflux-util/blockchain/sync/process"
9+
"github.com/Conflux-Chain/go-conflux-util/blockchain/sync/process/db"
10+
"github.com/Conflux-Chain/go-conflux-util/channel"
11+
"gorm.io/gorm"
12+
)
13+
14+
type CatchupParamsDB[T any] struct {
15+
Adapter poll.Adapter[T]
16+
Poller poll.CatchUpOption
17+
Processor db.BatchOption
18+
DB *gorm.DB
19+
NextBlockNumber uint64
20+
}
21+
22+
type ParamsDB[T any] struct {
23+
Adapter poll.Adapter[T]
24+
Poller poll.Option
25+
Processor db.Option
26+
DB *gorm.DB
27+
NextBlockNumber uint64
28+
}
29+
30+
func CatchUpDB[T channel.Sizable](ctx context.Context, params CatchupParamsDB[T], processors ...db.BatchProcessor[T]) uint64 {
31+
var wg sync.WaitGroup
32+
33+
poller := poll.NewCatchUpPoller(params.Adapter, params.NextBlockNumber, params.Poller)
34+
wg.Add(1)
35+
go poller.Poll(ctx, &wg)
36+
37+
processor := db.NewBatchAggregateProcessor(params.Processor, params.DB, processors...)
38+
wg.Add(1)
39+
go process.Process(ctx, &wg, poller.DataCh(), processor)
40+
41+
wg.Wait()
42+
43+
return poller.NextBlockNumber()
44+
}
45+
46+
func StartFinalizedDB[T any](ctx context.Context, wg *sync.WaitGroup, params ParamsDB[T], processors ...db.Processor[T]) {
47+
poller := poll.NewFinalizedPoller(params.Adapter, params.NextBlockNumber, params.Poller)
48+
wg.Add(1)
49+
go poller.Poll(ctx, wg)
50+
51+
processor := db.NewAggregateProcessor(params.Processor, params.DB, processors...)
52+
wg.Add(1)
53+
go process.Process(ctx, wg, poller.DataCh(), processor)
54+
}
55+
56+
func StartLatestDB[T any](ctx context.Context, wg *sync.WaitGroup, params ParamsDB[T], processors ...db.RevertableProcessor[T]) {
57+
poller := poll.NewLatestPoller(params.Adapter, params.NextBlockNumber, params.Poller)
58+
wg.Add(1)
59+
go poller.Poll(ctx, wg)
60+
61+
processor := db.NewRevertableAggregateProcessor(params.Processor, params.DB, processors...)
62+
wg.Add(1)
63+
go process.Process(ctx, wg, poller.DataCh(), processor)
64+
}

0 commit comments

Comments
 (0)