diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 9d42e05..d25294f 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -24,19 +24,17 @@ func NewWorker(rpc rpc.IRPCClient) *Worker { } } -func (w *Worker) processChunkWithRetry(ctx context.Context, chunk []*big.Int, resultsCh chan<- []rpc.GetFullBlockResult) { +func (w *Worker) processChunkWithRetry(ctx context.Context, chunk []*big.Int, resultsCh chan<- []rpc.GetFullBlockResult, sem chan struct{}) { select { case <-ctx.Done(): return default: } - defer func() { - time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond) - }() - - // Try with current chunk size + // Acquire semaphore only for the RPC request + sem <- struct{}{} results := w.rpc.GetFullBlocks(ctx, chunk) + <-sem // Release semaphore immediately after RPC request if len(chunk) == 1 { // chunk size 1 is the minimum, so we return whatever we get @@ -56,6 +54,7 @@ func (w *Worker) processChunkWithRetry(ctx context.Context, chunk []*big.Int, re } } + log.Debug().Msgf("Out of %d blocks, %d successful, %d failed", len(results), len(successfulResults), len(failedBlocks)) // If we have successful results, send them if len(successfulResults) > 0 { resultsCh <- successfulResults @@ -68,7 +67,7 @@ func (w *Worker) processChunkWithRetry(ctx context.Context, chunk []*big.Int, re // can't split any further, so try one last time if len(failedBlocks) == 1 { - w.processChunkWithRetry(ctx, failedBlocks, resultsCh) + w.processChunkWithRetry(ctx, failedBlocks, resultsCh, sem) return } @@ -84,12 +83,12 @@ func (w *Worker) processChunkWithRetry(ctx context.Context, chunk []*big.Int, re go func() { defer wg.Done() - w.processChunkWithRetry(ctx, leftChunk, resultsCh) + w.processChunkWithRetry(ctx, leftChunk, resultsCh, sem) }() go func() { defer wg.Done() - w.processChunkWithRetry(ctx, rightChunk, resultsCh) + w.processChunkWithRetry(ctx, rightChunk, resultsCh, sem) }() wg.Wait() @@ -102,9 +101,15 @@ func (w *Worker) Run(ctx context.Context, blockNumbers []*big.Int) []rpc.GetFull var wg sync.WaitGroup resultsCh := make(chan []rpc.GetFullBlockResult, blockCount) + // Create a semaphore channel to limit concurrent goroutines + sem := make(chan struct{}, 20) + log.Debug().Msgf("Worker Processing %d blocks in %d chunks of max %d blocks", blockCount, len(chunks), w.rpc.GetBlocksPerRequest().Blocks) - for _, chunk := range chunks { + for i, chunk := range chunks { + if i > 0 { + time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond) + } select { case <-ctx.Done(): log.Debug().Msg("Context canceled, stopping Worker") @@ -116,7 +121,7 @@ func (w *Worker) Run(ctx context.Context, blockNumbers []*big.Int) []rpc.GetFull wg.Add(1) go func(chunk []*big.Int) { defer wg.Done() - w.processChunkWithRetry(ctx, chunk, resultsCh) + w.processChunkWithRetry(ctx, chunk, resultsCh, sem) }(chunk) }