diff --git a/internal/orchestrator/poller.go b/internal/orchestrator/poller.go index b660a8c..abb0171 100644 --- a/internal/orchestrator/poller.go +++ b/internal/orchestrator/poller.go @@ -165,9 +165,27 @@ func (p *Poller) Start(ctx context.Context) { } func (p *Poller) Poll(ctx context.Context, blockNumbers []*big.Int) (lastPolledBlock *big.Int) { + blockData, failedResults := p.PollWithoutSaving(ctx, blockNumbers) + if len(blockData) > 0 || len(failedResults) > 0 { + p.StageResults(blockData, failedResults) + } + + var highestBlockNumber *big.Int + if len(blockData) > 0 { + highestBlockNumber = blockData[0].Block.Number + for _, block := range blockData { + if block.Block.Number.Cmp(highestBlockNumber) > 0 { + highestBlockNumber = new(big.Int).Set(block.Block.Number) + } + } + } + return highestBlockNumber +} + +func (p *Poller) PollWithoutSaving(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, []rpc.GetFullBlockResult) { if len(blockNumbers) < 1 { log.Debug().Msg("No blocks to poll, skipping") - return + return nil, nil } endBlock := blockNumbers[len(blockNumbers)-1] if endBlock != nil { @@ -180,8 +198,60 @@ func (p *Poller) Poll(ctx context.Context, blockNumbers []*big.Int) (lastPolledB worker := worker.NewWorker(p.rpc) results := worker.Run(ctx, blockNumbers) - p.handleWorkerResults(results) - return endBlock + blockData, failedResults := p.convertPollResultsToBlockData(results) + return blockData, failedResults +} + +func (p *Poller) convertPollResultsToBlockData(results []rpc.GetFullBlockResult) ([]common.BlockData, []rpc.GetFullBlockResult) { + var successfulResults []rpc.GetFullBlockResult + var failedResults []rpc.GetFullBlockResult + + for _, result := range results { + if result.Error != nil { + bn := "" + if result.BlockNumber != nil { + bn = result.BlockNumber.String() + } + log.Warn().Err(result.Error).Msgf("Error fetching block data for block %s", bn) + failedResults = append(failedResults, result) + } else { + successfulResults = append(successfulResults, result) + } + } + + blockData := make([]common.BlockData, 0, len(successfulResults)) + for _, result := range successfulResults { + blockData = append(blockData, common.BlockData{ + Block: result.Data.Block, + Logs: result.Data.Logs, + Transactions: result.Data.Transactions, + Traces: result.Data.Traces, + }) + } + return blockData, failedResults +} + +func (p *Poller) StageResults(blockData []common.BlockData, failedResults []rpc.GetFullBlockResult) { + startTime := time.Now() + metrics.PolledBatchSize.Set(float64(len(blockData))) + if len(blockData) > 0 { + if err := p.storage.StagingStorage.InsertStagingData(blockData); err != nil { + e := fmt.Errorf("error inserting block data: %v", err) + log.Error().Err(e) + for _, result := range blockData { + failedResults = append(failedResults, rpc.GetFullBlockResult{ + BlockNumber: result.Block.Number, + Error: e, + }) + } + } + } + log.Debug().Str("metric", "staging_insert_duration").Msgf("StagingStorage.InsertStagingData duration: %f", time.Since(startTime).Seconds()) + metrics.StagingInsertDuration.Observe(time.Since(startTime).Seconds()) + + if len(failedResults) > 0 { + p.handleBlockFailures(failedResults) + } } func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool { @@ -230,49 +300,6 @@ func (p *Poller) createBlockNumbersForRange(startBlock *big.Int, endBlock *big.I return blockNumbers } -func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) { - var successfulResults []rpc.GetFullBlockResult - var failedResults []rpc.GetFullBlockResult - - for _, result := range results { - if result.Error != nil { - log.Warn().Err(result.Error).Msgf("Error fetching block data for block %s", result.BlockNumber.String()) - failedResults = append(failedResults, result) - } else { - successfulResults = append(successfulResults, result) - } - } - - blockData := make([]common.BlockData, 0, len(successfulResults)) - for _, result := range successfulResults { - blockData = append(blockData, common.BlockData{ - Block: result.Data.Block, - Logs: result.Data.Logs, - Transactions: result.Data.Transactions, - Traces: result.Data.Traces, - }) - } - - startTime := time.Now() - if err := p.storage.StagingStorage.InsertStagingData(blockData); err != nil { - e := fmt.Errorf("error inserting block data: %v", err) - log.Error().Err(e) - for _, result := range successfulResults { - failedResults = append(failedResults, rpc.GetFullBlockResult{ - BlockNumber: result.BlockNumber, - Error: e, - }) - } - metrics.PolledBatchSize.Set(float64(len(blockData))) - } - log.Debug().Str("metric", "staging_insert_duration").Msgf("StagingStorage.InsertStagingData duration: %f", time.Since(startTime).Seconds()) - metrics.StagingInsertDuration.Observe(time.Since(startTime).Seconds()) - - if len(failedResults) > 0 { - p.handleBlockFailures(failedResults) - } -} - func (p *Poller) handleBlockFailures(results []rpc.GetFullBlockResult) { var blockFailures []common.BlockFailure for _, result := range results {