Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions blockchain/sync/process/db/processor_agg.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,3 @@ func (processor *AggregateProcessor[T]) blockingWrite(ctx context.Context, op Op
}
}
}

// Close implements the process.Processor[T] interface.
func (processor *AggregateProcessor[T]) Close(ctx context.Context) {
// do nothing
}
6 changes: 3 additions & 3 deletions blockchain/sync/process/db/processor_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewBatchAggregateProcessor[T any](option BatchOption, db *gorm.DB, processo
}
}

// Process implements the process.Processor[T] interface.
// Process implements the process.CatchUpProcessor[T] interface.
func (processor *BatchAggregateProcessor[T]) Process(ctx context.Context, data T) {
processor.size = 0

Expand Down Expand Up @@ -104,8 +104,8 @@ func (processor *BatchAggregateProcessor[T]) Exec(tx *gorm.DB) error {
return nil
}

// Close implements the process.Processor[T] interface.
func (processor *BatchAggregateProcessor[T]) Close(ctx context.Context) {
// Close implements the process.CatchUpProcessor[T] interface.
func (processor *BatchAggregateProcessor[T]) OnCatchedUp(ctx context.Context) {
if processor.size > 0 {
processor.write(ctx)
}
Expand Down
26 changes: 19 additions & 7 deletions blockchain/sync/process/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,46 @@ import (
type Processor[T any] interface {
// Process processes the given data.
Process(ctx context.Context, data T)
}

type CatchUpProcessor[T any] interface {
Processor[T]

// Close is executed when Process goroutine terminated.
Close(ctx context.Context)
// OnCatchedUp is executed after the latest finalized block processed.
OnCatchedUp(ctx context.Context)
}

// Process retrieves data from the given channel and processes data with given processor.
//
// Generally, it will be executed in a separate goroutine, and terminate if given context done or channel closed.
func Process[T any](ctx context.Context, wg *sync.WaitGroup, dataCh <-chan T, processor Processor[T]) {
//
// It returns true if the given channel closed. Otherwise false, if the given context done.
func Process[T any](ctx context.Context, wg *sync.WaitGroup, dataCh <-chan T, processor Processor[T]) bool {
defer wg.Done()

for {
select {
case <-ctx.Done():
return
return false
case data, ok := <-dataCh:
if !ok {
processor.Close(ctx)
return
return true
}

processor.Process(ctx, data)

// Check if context is done during processing, otherwise the for loop may continue to
// process the next data when data channel has more data to process.
if ctxutil.IsDone(ctx) {
return
return false
}
}
}
}

// ProcessCatchUp processes the polled blockchain data from given data channel till the latest finalized block processed.
func ProcessCatchUp[T any](ctx context.Context, wg *sync.WaitGroup, dataCh <-chan T, processor CatchUpProcessor[T]) {
if Process(ctx, wg, dataCh, processor) {
processor.OnCatchedUp(ctx)
}
}
2 changes: 1 addition & 1 deletion blockchain/sync/sync_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func CatchUpDB[T channel.Sizable](ctx context.Context, params CatchupParamsDB[T]

processor := db.NewBatchAggregateProcessor(params.Processor, params.DB, processors...)
wg.Add(1)
go process.Process(ctx, &wg, poller.DataCh(), processor)
go process.ProcessCatchUp(ctx, &wg, poller.DataCh(), processor)

wg.Wait()

Expand Down
Loading