@@ -165,9 +165,26 @@ func (p *Poller) Start(ctx context.Context) {
165165}
166166
167167func (p * Poller ) Poll (ctx context.Context , blockNumbers []* big.Int ) (lastPolledBlock * big.Int ) {
168+ blockData , failedResults := p .PollWithoutSaving (ctx , blockNumbers )
169+ if blockData != nil || failedResults != nil {
170+ p .StageResults (blockData , failedResults )
171+ }
172+ highestBlockNumber := new (big.Int )
173+ if len (blockData ) > 0 {
174+ highestBlockNumber = blockData [0 ].Block .Number
175+ for _ , block := range blockData {
176+ if block .Block .Number .Cmp (highestBlockNumber ) > 0 {
177+ highestBlockNumber = new (big.Int ).Set (block .Block .Number )
178+ }
179+ }
180+ }
181+ return highestBlockNumber
182+ }
183+
184+ func (p * Poller ) PollWithoutSaving (ctx context.Context , blockNumbers []* big.Int ) ([]common.BlockData , []rpc.GetFullBlockResult ) {
168185 if len (blockNumbers ) < 1 {
169186 log .Debug ().Msg ("No blocks to poll, skipping" )
170- return
187+ return nil , nil
171188 }
172189 endBlock := blockNumbers [len (blockNumbers )- 1 ]
173190 if endBlock != nil {
@@ -180,8 +197,54 @@ func (p *Poller) Poll(ctx context.Context, blockNumbers []*big.Int) (lastPolledB
180197
181198 worker := worker .NewWorker (p .rpc )
182199 results := worker .Run (ctx , blockNumbers )
183- p .handleWorkerResults (results )
184- return endBlock
200+ blockData , failedResults := p .convertPollResultsToBlockData (results )
201+ return blockData , failedResults
202+ }
203+
204+ func (p * Poller ) convertPollResultsToBlockData (results []rpc.GetFullBlockResult ) ([]common.BlockData , []rpc.GetFullBlockResult ) {
205+ var successfulResults []rpc.GetFullBlockResult
206+ var failedResults []rpc.GetFullBlockResult
207+
208+ for _ , result := range results {
209+ if result .Error != nil {
210+ log .Warn ().Err (result .Error ).Msgf ("Error fetching block data for block %s" , result .BlockNumber .String ())
211+ failedResults = append (failedResults , result )
212+ } else {
213+ successfulResults = append (successfulResults , result )
214+ }
215+ }
216+
217+ blockData := make ([]common.BlockData , 0 , len (successfulResults ))
218+ for _ , result := range successfulResults {
219+ blockData = append (blockData , common.BlockData {
220+ Block : result .Data .Block ,
221+ Logs : result .Data .Logs ,
222+ Transactions : result .Data .Transactions ,
223+ Traces : result .Data .Traces ,
224+ })
225+ }
226+ return blockData , failedResults
227+ }
228+
229+ func (p * Poller ) StageResults (blockData []common.BlockData , failedResults []rpc.GetFullBlockResult ) {
230+ startTime := time .Now ()
231+ if err := p .storage .StagingStorage .InsertStagingData (blockData ); err != nil {
232+ e := fmt .Errorf ("error inserting block data: %v" , err )
233+ log .Error ().Err (e )
234+ for _ , result := range blockData {
235+ failedResults = append (failedResults , rpc.GetFullBlockResult {
236+ BlockNumber : result .Block .Number ,
237+ Error : e ,
238+ })
239+ }
240+ metrics .PolledBatchSize .Set (float64 (len (blockData )))
241+ }
242+ log .Debug ().Str ("metric" , "staging_insert_duration" ).Msgf ("StagingStorage.InsertStagingData duration: %f" , time .Since (startTime ).Seconds ())
243+ metrics .StagingInsertDuration .Observe (time .Since (startTime ).Seconds ())
244+
245+ if len (failedResults ) > 0 {
246+ p .handleBlockFailures (failedResults )
247+ }
185248}
186249
187250func (p * Poller ) reachedPollLimit (blockNumber * big.Int ) bool {
@@ -230,49 +293,6 @@ func (p *Poller) createBlockNumbersForRange(startBlock *big.Int, endBlock *big.I
230293 return blockNumbers
231294}
232295
233- func (p * Poller ) handleWorkerResults (results []rpc.GetFullBlockResult ) {
234- var successfulResults []rpc.GetFullBlockResult
235- var failedResults []rpc.GetFullBlockResult
236-
237- for _ , result := range results {
238- if result .Error != nil {
239- log .Warn ().Err (result .Error ).Msgf ("Error fetching block data for block %s" , result .BlockNumber .String ())
240- failedResults = append (failedResults , result )
241- } else {
242- successfulResults = append (successfulResults , result )
243- }
244- }
245-
246- blockData := make ([]common.BlockData , 0 , len (successfulResults ))
247- for _ , result := range successfulResults {
248- blockData = append (blockData , common.BlockData {
249- Block : result .Data .Block ,
250- Logs : result .Data .Logs ,
251- Transactions : result .Data .Transactions ,
252- Traces : result .Data .Traces ,
253- })
254- }
255-
256- startTime := time .Now ()
257- if err := p .storage .StagingStorage .InsertStagingData (blockData ); err != nil {
258- e := fmt .Errorf ("error inserting block data: %v" , err )
259- log .Error ().Err (e )
260- for _ , result := range successfulResults {
261- failedResults = append (failedResults , rpc.GetFullBlockResult {
262- BlockNumber : result .BlockNumber ,
263- Error : e ,
264- })
265- }
266- metrics .PolledBatchSize .Set (float64 (len (blockData )))
267- }
268- log .Debug ().Str ("metric" , "staging_insert_duration" ).Msgf ("StagingStorage.InsertStagingData duration: %f" , time .Since (startTime ).Seconds ())
269- metrics .StagingInsertDuration .Observe (time .Since (startTime ).Seconds ())
270-
271- if len (failedResults ) > 0 {
272- p .handleBlockFailures (failedResults )
273- }
274- }
275-
276296func (p * Poller ) handleBlockFailures (results []rpc.GetFullBlockResult ) {
277297 var blockFailures []common.BlockFailure
278298 for _ , result := range results {
0 commit comments