@@ -24,19 +24,17 @@ func NewWorker(rpc rpc.IRPCClient) *Worker {
2424 }
2525}
2626
27- func (w * Worker ) processChunkWithRetry (ctx context.Context , chunk []* big.Int , resultsCh chan <- []rpc.GetFullBlockResult ) {
27+ func (w * Worker ) processChunkWithRetry (ctx context.Context , chunk []* big.Int , resultsCh chan <- []rpc.GetFullBlockResult , sem chan struct {} ) {
2828 select {
2929 case <- ctx .Done ():
3030 return
3131 default :
3232 }
3333
34- defer func () {
35- time .Sleep (time .Duration (config .Cfg .RPC .Blocks .BatchDelay ) * time .Millisecond )
36- }()
37-
38- // Try with current chunk size
34+ // Acquire semaphore only for the RPC request
35+ sem <- struct {}{}
3936 results := w .rpc .GetFullBlocks (ctx , chunk )
37+ <- sem // Release semaphore immediately after RPC request
4038
4139 if len (chunk ) == 1 {
4240 // chunk size 1 is the minimum, so we return whatever we get
@@ -56,6 +54,7 @@ func (w *Worker) processChunkWithRetry(ctx context.Context, chunk []*big.Int, re
5654 }
5755 }
5856
57+ log .Debug ().Msgf ("Out of %d blocks, %d successful, %d failed" , len (results ), len (successfulResults ), len (failedBlocks ))
5958 // If we have successful results, send them
6059 if len (successfulResults ) > 0 {
6160 resultsCh <- successfulResults
@@ -68,7 +67,7 @@ func (w *Worker) processChunkWithRetry(ctx context.Context, chunk []*big.Int, re
6867
6968 // can't split any further, so try one last time
7069 if len (failedBlocks ) == 1 {
71- w .processChunkWithRetry (ctx , failedBlocks , resultsCh )
70+ w .processChunkWithRetry (ctx , failedBlocks , resultsCh , sem )
7271 return
7372 }
7473
@@ -84,12 +83,12 @@ func (w *Worker) processChunkWithRetry(ctx context.Context, chunk []*big.Int, re
8483
8584 go func () {
8685 defer wg .Done ()
87- w .processChunkWithRetry (ctx , leftChunk , resultsCh )
86+ w .processChunkWithRetry (ctx , leftChunk , resultsCh , sem )
8887 }()
8988
9089 go func () {
9190 defer wg .Done ()
92- w .processChunkWithRetry (ctx , rightChunk , resultsCh )
91+ w .processChunkWithRetry (ctx , rightChunk , resultsCh , sem )
9392 }()
9493
9594 wg .Wait ()
@@ -102,9 +101,15 @@ func (w *Worker) Run(ctx context.Context, blockNumbers []*big.Int) []rpc.GetFull
102101 var wg sync.WaitGroup
103102 resultsCh := make (chan []rpc.GetFullBlockResult , blockCount )
104103
104+ // Create a semaphore channel to limit concurrent goroutines
105+ sem := make (chan struct {}, 20 )
106+
105107 log .Debug ().Msgf ("Worker Processing %d blocks in %d chunks of max %d blocks" , blockCount , len (chunks ), w .rpc .GetBlocksPerRequest ().Blocks )
106108
107- for _ , chunk := range chunks {
109+ for i , chunk := range chunks {
110+ if i > 0 {
111+ time .Sleep (time .Duration (config .Cfg .RPC .Blocks .BatchDelay ) * time .Millisecond )
112+ }
108113 select {
109114 case <- ctx .Done ():
110115 log .Debug ().Msg ("Context canceled, stopping Worker" )
@@ -116,7 +121,7 @@ func (w *Worker) Run(ctx context.Context, blockNumbers []*big.Int) []rpc.GetFull
116121 wg .Add (1 )
117122 go func (chunk []* big.Int ) {
118123 defer wg .Done ()
119- w .processChunkWithRetry (ctx , chunk , resultsCh )
124+ w .processChunkWithRetry (ctx , chunk , resultsCh , sem )
120125 }(chunk )
121126 }
122127
0 commit comments