@@ -23,11 +23,13 @@ type CatchUpProcessor[T any] interface {
2323// Process retrieves data from the given channel and processes data with given processor.
2424//
2525// Generally, it will be executed in a separate goroutine, and terminate if given context done or channel closed.
26- //
27- // It returns true if the given channel closed. Otherwise false, if the given context done.
28- func Process [T any ](ctx context.Context , wg * sync.WaitGroup , dataCh <- chan T , processor Processor [T ]) bool {
26+ func Process [T any ](ctx context.Context , wg * sync.WaitGroup , dataCh <- chan T , processor Processor [T ]) {
2927 defer wg .Done ()
3028
29+ process (ctx , dataCh , processor )
30+ }
31+
32+ func process [T any ](ctx context.Context , dataCh <- chan T , processor Processor [T ]) bool {
3133 for {
3234 select {
3335 case <- ctx .Done ():
@@ -50,7 +52,9 @@ func Process[T any](ctx context.Context, wg *sync.WaitGroup, dataCh <-chan T, pr
5052
5153// ProcessCatchUp processes the polled blockchain data from given data channel till the latest finalized block processed.
5254func ProcessCatchUp [T any ](ctx context.Context , wg * sync.WaitGroup , dataCh <- chan T , processor CatchUpProcessor [T ]) {
53- if Process (ctx , wg , dataCh , processor ) {
55+ defer wg .Done ()
56+
57+ if process (ctx , dataCh , processor ) {
5458 processor .OnCatchedUp (ctx )
5559 }
5660}
0 commit comments