diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index efc0a92..4a08854 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -105,3 +105,42 @@ var ( Help: "The last block number that was published", }) ) + +// Operation Duration Metrics +var ( + StagingInsertDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "staging_insert_duration_seconds", + Help: "Time taken to insert data into staging storage", + Buckets: prometheus.DefBuckets, + }) + + MainStorageInsertDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "main_storage_insert_duration_seconds", + Help: "Time taken to insert data into main storage", + Buckets: prometheus.DefBuckets, + }) + + PublishDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "publish_duration_seconds", + Help: "Time taken to publish block data to Kafka", + Buckets: prometheus.DefBuckets, + }) + + StagingDeleteDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "staging_delete_duration_seconds", + Help: "Time taken to delete data from staging storage", + Buckets: prometheus.DefBuckets, + }) + + GetBlockNumbersToCommitDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "get_block_numbers_to_commit_duration_seconds", + Help: "Time taken to get block numbers to commit from storage", + Buckets: prometheus.DefBuckets, + }) + + GetStagingDataDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "get_staging_data_duration_seconds", + Help: "Time taken to get data from staging storage", + Buckets: prometheus.DefBuckets, + }) +) diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 4ef4206..c24caed 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -80,6 +80,12 @@ func (c *Committer) Start(ctx context.Context) { } func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) { + startTime := time.Now() + defer func() { + log.Debug().Str("metric", "get_block_numbers_to_commit_duration").Msgf("getBlockNumbersToCommit duration: %f", time.Since(startTime).Seconds()) + metrics.GetBlockNumbersToCommitDuration.Observe(time.Since(startTime).Seconds()) + }() + latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID()) log.Info().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String()) if err != nil { @@ -117,7 +123,11 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo return nil, nil } + startTime := time.Now() blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.GetChainID()}) + log.Debug().Str("metric", "get_staging_data_duration").Msgf("StagingStorage.GetStagingData duration: %f", time.Since(startTime).Seconds()) + metrics.GetStagingDataDuration.Observe(time.Since(startTime).Seconds()) + if err != nil { return nil, fmt.Errorf("error fetching blocks to commit: %v", err) } @@ -168,20 +178,29 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er } log.Debug().Msgf("Committing %d blocks", len(blockNumbers)) + mainStorageStart := time.Now() if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil { log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers) return fmt.Errorf("error saving data to main storage: %v", err) } + log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds()) + metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds()) + publishStart := time.Now() go func() { if err := c.publisher.PublishBlockData(blockData); err != nil { log.Error().Err(err).Msg("Failed to publish block data to kafka") } + log.Debug().Str("metric", "publish_duration").Msgf("Publisher.PublishBlockData duration: %f", time.Since(publishStart).Seconds()) + metrics.PublishDuration.Observe(time.Since(publishStart).Seconds()) }() + stagingDeleteStart := time.Now() if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil { return fmt.Errorf("error deleting data from staging storage: %v", err) } + log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingData duration: %f", time.Since(stagingDeleteStart).Seconds()) + metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds()) // Find highest block number from committed blocks highestBlock := blockData[0].Block diff --git a/internal/orchestrator/poller.go b/internal/orchestrator/poller.go index a86f1c7..325de7c 100644 --- a/internal/orchestrator/poller.go +++ b/internal/orchestrator/poller.go @@ -234,6 +234,8 @@ func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) { Traces: result.Data.Traces, }) } + + startTime := time.Now() if err := p.storage.StagingStorage.InsertStagingData(blockData); err != nil { e := fmt.Errorf("error inserting block data: %v", err) log.Error().Err(e) @@ -245,6 +247,8 @@ func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) { } metrics.PolledBatchSize.Set(float64(len(blockData))) } + log.Debug().Str("metric", "staging_insert_duration").Msgf("StagingStorage.InsertStagingData duration: %f", time.Since(startTime).Seconds()) + metrics.StagingInsertDuration.Observe(time.Since(startTime).Seconds()) if len(failedResults) > 0 { p.handleBlockFailures(failedResults)