@@ -2,6 +2,7 @@ package worker
 
 import (
 	"math/big"
+	"sort"
 	"sync"
 	"time"
 
@@ -22,24 +23,88 @@ func NewWorker(rpc rpc.IRPCClient) *Worker {
 	}
 }
 
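+// processChunkWithRetry fetches one chunk of blocks in a single batch call and, when
+// some blocks fail, recursively splits the failures in half and retries each half
+// concurrently, streaming every recovered result to resultsCh.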
+func (w *Worker) processChunkWithRetry(chunk []*big.Int, resultsCh chan<- []rpc.GetFullBlockResult) {
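+	// Space out successive batch requests by the configured delay (a zero delay is a no-op).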
+	defer func() {
+		time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond)
+	}()
+
+	// Try with current chunk size
+	results := w.rpc.GetFullBlocks(chunk)
+
+	if len(chunk) == 1 {
+		// chunk size 1 is the minimum, so we return whatever we get
+		resultsCh <- results
+		return
+	}
+
+	// Check for failed blocks
+	var failedBlocks []*big.Int
+	var successfulResults []rpc.GetFullBlockResult
+
+	for i, result := range results {
+		if result.Error != nil {
+			failedBlocks = append(failedBlocks, chunk[i])
+		} else {
+			successfulResults = append(successfulResults, result)
+		}
+	}
+
+	// If we have successful results, send them
+	if len(successfulResults) > 0 {
+		resultsCh <- successfulResults
+	}
+
+	// If no blocks failed, we're done
+	if len(failedBlocks) == 0 {
+		return
+	}
+
+	// can't split any further, so try one last time
+	if len(failedBlocks) == 1 {
+		w.processChunkWithRetry(failedBlocks, resultsCh)
+		return
+	}
+
+	// Split failed blocks in half and retry
+	mid := len(failedBlocks) / 2
+	leftChunk := failedBlocks[:mid]
+	rightChunk := failedBlocks[mid:]
+
+	log.Debug().Msgf("Splitting %d failed blocks into chunks of %d and %d", len(failedBlocks), len(leftChunk), len(rightChunk))
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	go func() {
+		defer wg.Done()
+		w.processChunkWithRetry(leftChunk, resultsCh)
+	}()
+
+	go func() {
+		defer wg.Done()
+		w.processChunkWithRetry(rightChunk, resultsCh)
+	}()
+
+	wg.Wait()
+}
+
 func (w *Worker) Run(blockNumbers []*big.Int) []rpc.GetFullBlockResult {
 	blockCount := len(blockNumbers)
 	chunks := common.SliceToChunks(blockNumbers, w.rpc.GetBlocksPerRequest().Blocks)
 
 	var wg sync.WaitGroup
-	resultsCh := make(chan []rpc.GetFullBlockResult, len(chunks))
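+	// Buffer one slot per block: with retry splitting, each block can arrive in its own result slice.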
+	resultsCh := make(chan []rpc.GetFullBlockResult, blockCount)
 
 	log.Debug().Msgf("Worker Processing %d blocks in %d chunks of max %d blocks", blockCount, len(chunks), w.rpc.GetBlocksPerRequest().Blocks)
+
 	for _, chunk := range chunks {
 		wg.Add(1)
 		go func(chunk []*big.Int) {
 			defer wg.Done()
-			resultsCh <- w.rpc.GetFullBlocks(chunk)
-			if config.Cfg.RPC.Blocks.BatchDelay > 0 {
-				time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond)
-			}
+			w.processChunkWithRetry(chunk, resultsCh)
 		}(chunk)
 	}
+
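+	// Close resultsCh only after every chunk worker, including its retries, has finished sending.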
 	go func() {
 		wg.Wait()
 		close(resultsCh)
@@ -50,6 +115,11 @@ func (w *Worker) Run(blockNumbers []*big.Int) []rpc.GetFullBlockResult {
 		results = append(results, batchResults...)
 	}
 
+	// Sort results by block number; concurrent chunk workers and retries deliver them out of order
+	sort.Slice(results, func(i, j int) bool {
+		return results[i].BlockNumber.Cmp(results[j].BlockNumber) < 0
+	})
+
 	// track the last fetched block number
 	if len(results) > 0 {
 		lastBlockNumberFloat, _ := results[len(results)-1].BlockNumber.Float64()