@@ -165,9 +165,27 @@ func (p *Poller) Start(ctx context.Context) {
165165}
166166
167167func (p * Poller ) Poll (ctx context.Context , blockNumbers []* big.Int ) (lastPolledBlock * big.Int ) {
168+ blockData , failedResults := p .PollWithoutSaving (ctx , blockNumbers )
169+ if len (blockData ) > 0 || len (failedResults ) > 0 {
170+ p .StageResults (blockData , failedResults )
171+ }
172+
173+ var highestBlockNumber * big.Int
174+ if len (blockData ) > 0 {
175+ highestBlockNumber = blockData [0 ].Block .Number
176+ for _ , block := range blockData {
177+ if block .Block .Number .Cmp (highestBlockNumber ) > 0 {
178+ highestBlockNumber = new (big.Int ).Set (block .Block .Number )
179+ }
180+ }
181+ }
182+ return highestBlockNumber
183+ }
184+
185+ func (p * Poller ) PollWithoutSaving (ctx context.Context , blockNumbers []* big.Int ) ([]common.BlockData , []rpc.GetFullBlockResult ) {
168186 if len (blockNumbers ) < 1 {
169187 log .Debug ().Msg ("No blocks to poll, skipping" )
170- return
188+ return nil , nil
171189 }
172190 endBlock := blockNumbers [len (blockNumbers )- 1 ]
173191 if endBlock != nil {
@@ -180,8 +198,60 @@ func (p *Poller) Poll(ctx context.Context, blockNumbers []*big.Int) (lastPolledB
180198
181199 worker := worker .NewWorker (p .rpc )
182200 results := worker .Run (ctx , blockNumbers )
183- p .handleWorkerResults (results )
184- return endBlock
201+ blockData , failedResults := p .convertPollResultsToBlockData (results )
202+ return blockData , failedResults
203+ }
204+
205+ func (p * Poller ) convertPollResultsToBlockData (results []rpc.GetFullBlockResult ) ([]common.BlockData , []rpc.GetFullBlockResult ) {
206+ var successfulResults []rpc.GetFullBlockResult
207+ var failedResults []rpc.GetFullBlockResult
208+
209+ for _ , result := range results {
210+ if result .Error != nil {
211+ bn := "<unknown>"
212+ if result .BlockNumber != nil {
213+ bn = result .BlockNumber .String ()
214+ }
215+ log .Warn ().Err (result .Error ).Msgf ("Error fetching block data for block %s" , bn )
216+ failedResults = append (failedResults , result )
217+ } else {
218+ successfulResults = append (successfulResults , result )
219+ }
220+ }
221+
222+ blockData := make ([]common.BlockData , 0 , len (successfulResults ))
223+ for _ , result := range successfulResults {
224+ blockData = append (blockData , common.BlockData {
225+ Block : result .Data .Block ,
226+ Logs : result .Data .Logs ,
227+ Transactions : result .Data .Transactions ,
228+ Traces : result .Data .Traces ,
229+ })
230+ }
231+ return blockData , failedResults
232+ }
233+
234+ func (p * Poller ) StageResults (blockData []common.BlockData , failedResults []rpc.GetFullBlockResult ) {
235+ startTime := time .Now ()
236+ metrics .PolledBatchSize .Set (float64 (len (blockData )))
237+ if len (blockData ) > 0 {
238+ if err := p .storage .StagingStorage .InsertStagingData (blockData ); err != nil {
239+ e := fmt .Errorf ("error inserting block data: %v" , err )
240+ log .Error ().Err (e )
241+ for _ , result := range blockData {
242+ failedResults = append (failedResults , rpc.GetFullBlockResult {
243+ BlockNumber : result .Block .Number ,
244+ Error : e ,
245+ })
246+ }
247+ }
248+ }
249+ log .Debug ().Str ("metric" , "staging_insert_duration" ).Msgf ("StagingStorage.InsertStagingData duration: %f" , time .Since (startTime ).Seconds ())
250+ metrics .StagingInsertDuration .Observe (time .Since (startTime ).Seconds ())
251+
252+ if len (failedResults ) > 0 {
253+ p .handleBlockFailures (failedResults )
254+ }
185255}
186256
187257func (p * Poller ) reachedPollLimit (blockNumber * big.Int ) bool {
@@ -230,49 +300,6 @@ func (p *Poller) createBlockNumbersForRange(startBlock *big.Int, endBlock *big.I
230300 return blockNumbers
231301}
232302
233- func (p * Poller ) handleWorkerResults (results []rpc.GetFullBlockResult ) {
234- var successfulResults []rpc.GetFullBlockResult
235- var failedResults []rpc.GetFullBlockResult
236-
237- for _ , result := range results {
238- if result .Error != nil {
239- log .Warn ().Err (result .Error ).Msgf ("Error fetching block data for block %s" , result .BlockNumber .String ())
240- failedResults = append (failedResults , result )
241- } else {
242- successfulResults = append (successfulResults , result )
243- }
244- }
245-
246- blockData := make ([]common.BlockData , 0 , len (successfulResults ))
247- for _ , result := range successfulResults {
248- blockData = append (blockData , common.BlockData {
249- Block : result .Data .Block ,
250- Logs : result .Data .Logs ,
251- Transactions : result .Data .Transactions ,
252- Traces : result .Data .Traces ,
253- })
254- }
255-
256- startTime := time .Now ()
257- if err := p .storage .StagingStorage .InsertStagingData (blockData ); err != nil {
258- e := fmt .Errorf ("error inserting block data: %v" , err )
259- log .Error ().Err (e )
260- for _ , result := range successfulResults {
261- failedResults = append (failedResults , rpc.GetFullBlockResult {
262- BlockNumber : result .BlockNumber ,
263- Error : e ,
264- })
265- }
266- metrics .PolledBatchSize .Set (float64 (len (blockData )))
267- }
268- log .Debug ().Str ("metric" , "staging_insert_duration" ).Msgf ("StagingStorage.InsertStagingData duration: %f" , time .Since (startTime ).Seconds ())
269- metrics .StagingInsertDuration .Observe (time .Since (startTime ).Seconds ())
270-
271- if len (failedResults ) > 0 {
272- p .handleBlockFailures (failedResults )
273- }
274- }
275-
276303func (p * Poller ) handleBlockFailures (results []rpc.GetFullBlockResult ) {
277304 var blockFailures []common.BlockFailure
278305 for _ , result := range results {
0 commit comments