Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions blockchain/sync/process/db/processor_agg.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,3 @@ func (processor *AggregateProcessor[T]) blockingWrite(ctx context.Context, op Op
}
}
}

// Close implements the process.Processor[T] interface.
func (processor *AggregateProcessor[T]) Close(ctx context.Context) {
// do nothing
}
6 changes: 3 additions & 3 deletions blockchain/sync/process/db/processor_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewBatchAggregateProcessor[T any](option BatchOption, db *gorm.DB, processo
}
}

// Process implements the process.Processor[T] interface.
// Process implements the process.CatchUpProcessor[T] interface.
func (processor *BatchAggregateProcessor[T]) Process(ctx context.Context, data T) {
processor.size = 0

Expand Down Expand Up @@ -104,8 +104,8 @@ func (processor *BatchAggregateProcessor[T]) Exec(tx *gorm.DB) error {
return nil
}

// Close implements the process.Processor[T] interface.
func (processor *BatchAggregateProcessor[T]) Close(ctx context.Context) {
// Close implements the process.CatchUpProcessor[T] interface.
func (processor *BatchAggregateProcessor[T]) OnCatchedUp(ctx context.Context) {
if processor.size > 0 {
processor.write(ctx)
}
Expand Down
26 changes: 19 additions & 7 deletions blockchain/sync/process/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,46 @@ import (
type Processor[T any] interface {
// Process processes the given data.
Process(ctx context.Context, data T)
}

type CatchUpProcessor[T any] interface {
Processor[T]

// Close is executed when Process goroutine terminated.
Close(ctx context.Context)
// OnCatchedUp is executed after the latest finalized block processed.
OnCatchedUp(ctx context.Context)
}

// Process retrieves data from the given channel and processes data with given processor.
//
// Generally, it will be executed in a separate goroutine, and terminate if given context done or channel closed.
func Process[T any](ctx context.Context, wg *sync.WaitGroup, dataCh <-chan T, processor Processor[T]) {
//
// It returns true if the given channel closed. Otherwise false, if the given context done.
func Process[T any](ctx context.Context, wg *sync.WaitGroup, dataCh <-chan T, processor Processor[T]) bool {
defer wg.Done()

for {
select {
case <-ctx.Done():
return
return false
case data, ok := <-dataCh:
if !ok {
processor.Close(ctx)
return
return true
}

processor.Process(ctx, data)

// Check if context is done during processing, otherwise the for loop may continue to
// process the next data when data channel has more data to process.
if ctxutil.IsDone(ctx) {
return
return false
}
}
}
}

// ProcessCatchUp processes the polled blockchain data from given data channel till the latest finalized block processed.
func ProcessCatchUp[T any](ctx context.Context, wg *sync.WaitGroup, dataCh <-chan T, processor CatchUpProcessor[T]) {
if Process(ctx, wg, dataCh, processor) {
processor.OnCatchedUp(ctx)
}
}
Loading