Skip to content

Commit 3648a0b

Browse files
committed
Refactor the db processors
1 parent fbbf164 commit 3648a0b

File tree

4 files changed

+62
-50
lines changed

4 files changed

+62
-50
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package db
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/Conflux-Chain/go-conflux-util/ctxutil"
8+
"github.com/Conflux-Chain/go-conflux-util/health"
9+
"github.com/mcuadros/go-defaults"
10+
"gorm.io/gorm"
11+
)
12+
13+
type Option struct {
14+
RetryInterval time.Duration `default:"3s"`
15+
16+
Health health.TimedCounterConfig
17+
}
18+
19+
// RetriableProcessor operates on database till succeeded.
20+
type RetriableProcessor struct {
21+
option Option
22+
db *gorm.DB
23+
health *health.TimedCounter
24+
}
25+
26+
func NewRetriableProcessor(db *gorm.DB, option Option) *RetriableProcessor {
27+
defaults.SetDefaults(&option)
28+
29+
return &RetriableProcessor{
30+
option: option,
31+
db: db,
32+
health: health.NewTimedCounter(option.Health),
33+
}
34+
}
35+
36+
// Write executes the given op in a transaction. If failed, it will try again till succeeded.
37+
func (processor *RetriableProcessor) Write(ctx context.Context, op Operation) {
38+
for {
39+
err := processor.db.Transaction(func(tx *gorm.DB) error {
40+
return op.Exec(tx)
41+
})
42+
43+
processor.health.LogOnError(err, "Process blockchain data in Database")
44+
45+
if err == nil {
46+
return
47+
}
48+
49+
if err = ctxutil.Sleep(ctx, processor.option.RetryInterval); err != nil {
50+
return
51+
}
52+
}
53+
}

blockchain/sync/process/db/processor_agg.go

Lines changed: 5 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,7 @@ package db
22

33
import (
44
"context"
5-
"time"
65

7-
"github.com/Conflux-Chain/go-conflux-util/ctxutil"
8-
"github.com/Conflux-Chain/go-conflux-util/health"
9-
"github.com/mcuadros/go-defaults"
106
"gorm.io/gorm"
117
)
128

@@ -15,28 +11,17 @@ type Processor[T any] interface {
1511
Process(data T) Operation
1612
}
1713

18-
type Option struct {
19-
RetryInterval time.Duration `default:"3s"`
20-
21-
Health health.TimedCounterConfig
22-
}
23-
2414
// AggregateProcessor aggregates multiple processor to process blockchain data in batch.
2515
type AggregateProcessor[T any] struct {
26-
option Option
27-
db *gorm.DB
16+
*RetriableProcessor
17+
2818
processors []Processor[T]
29-
health *health.TimedCounter
3019
}
3120

3221
func NewAggregateProcessor[T any](option Option, db *gorm.DB, processors ...Processor[T]) *AggregateProcessor[T] {
33-
defaults.SetDefaults(&option)
34-
3522
return &AggregateProcessor[T]{
36-
option: option,
37-
db: db,
38-
processors: processors,
39-
health: health.NewTimedCounter(option.Health),
23+
RetriableProcessor: NewRetriableProcessor(db, option),
24+
processors: processors,
4025
}
4126
}
4227

@@ -49,24 +34,5 @@ func (processor *AggregateProcessor[T]) Process(ctx context.Context, data T) {
4934
ops = append(ops, op)
5035
}
5136

52-
processor.blockingWrite(ctx, ComposeOperation(ops...))
53-
}
54-
55-
// blockingWrite executes the given op in a transaction. If failed, it will try again till succeeded.
56-
func (processor *AggregateProcessor[T]) blockingWrite(ctx context.Context, op Operation) {
57-
for {
58-
err := processor.db.Transaction(func(tx *gorm.DB) error {
59-
return op.Exec(tx)
60-
})
61-
62-
processor.health.LogOnError(err, "Process blockchain data in Database")
63-
64-
if err == nil {
65-
return
66-
}
67-
68-
if err = ctxutil.Sleep(ctx, processor.option.RetryInterval); err != nil {
69-
return
70-
}
71-
}
37+
processor.Write(ctx, ComposeOperation(ops...))
7238
}

blockchain/sync/process/db/processor_batch.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ import (
1313
// Note, thread-safe is not required in the implementations, since batch
1414
// related methods are executed in a single thread.
1515
type BatchProcessor[T any] interface {
16-
Processor[T]
17-
1816
// BatchProcess processes the given data and returns the number of SQLs to be executed in batch.
1917
BatchProcess(data T) int
2018

@@ -37,7 +35,7 @@ type BatchOption struct {
3735
//
3836
// Generally, it is used during catch up phase.
3937
type BatchAggregateProcessor[T any] struct {
40-
*AggregateProcessor[T]
38+
*RetriableProcessor
4139

4240
option BatchOption
4341
processors []BatchProcessor[T]
@@ -48,13 +46,8 @@ type BatchAggregateProcessor[T any] struct {
4846
func NewBatchAggregateProcessor[T any](option BatchOption, db *gorm.DB, processors ...BatchProcessor[T]) *BatchAggregateProcessor[T] {
4947
defaults.SetDefaults(&option)
5048

51-
innerProcessors := make([]Processor[T], 0, len(processors))
52-
for _, v := range processors {
53-
innerProcessors = append(innerProcessors, v)
54-
}
55-
5649
return &BatchAggregateProcessor[T]{
57-
AggregateProcessor: NewAggregateProcessor(option.Option, db, innerProcessors...),
50+
RetriableProcessor: NewRetriableProcessor(db, option.Option),
5851
option: option,
5952
processors: processors,
6053
lastBatchTime: time.Now(),
@@ -82,7 +75,7 @@ func (processor *BatchAggregateProcessor[T]) Process(ctx context.Context, data T
8275
}
8376

8477
func (processor *BatchAggregateProcessor[T]) write(ctx context.Context) {
85-
processor.blockingWrite(ctx, processor)
78+
processor.Write(ctx, processor)
8679

8780
// reset
8881
processor.lastBatchTime = time.Now()

blockchain/sync/process/db/processor_revertable.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func (processor *RevertableAggregateProcessor[T]) Process(ctx context.Context, d
5353
ops = append(ops, op)
5454
}
5555

56-
processor.blockingWrite(ctx, ComposeOperation(ops...))
56+
processor.Write(ctx, ComposeOperation(ops...))
5757
}
5858

5959
processor.AggregateProcessor.Process(ctx, data.Data)

0 commit comments

Comments
 (0)