Commit b65bcb5

metrics fixes

1 parent 1dfc50a

File tree: 8 files changed, +15 −177 lines changed
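Net effect: the commit is almost entirely deletions. Static identity gauges (BackfillIndexerName, BackfillChainId, CommitterIndexerName, CommitterChainId), the per-operation timing metrics (Kafka publish, S3 download, RPC fetch, parse duration), and the rows-fetched counters are removed; the catch-all updateCommitterMetrics() helper is replaced by inline gauge updates at the point where each value changes; and several metrics switch from counter-style Add to gauge-style Set.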

internal/backfill/backfill.go
Lines changed: 2 additions & 3 deletions

@@ -33,8 +33,6 @@ func RunBackfill() {
 	chainIdStr := libs.ChainIdStr
 
 	// Set static metrics
-	metrics.BackfillIndexerName.WithLabelValues(indexerName, chainIdStr, indexerName).Set(1)
-	metrics.BackfillChainId.WithLabelValues(indexerName, chainIdStr).Set(float64(libs.ChainId.Uint64()))
 	metrics.BackfillStartBlock.WithLabelValues(indexerName, chainIdStr).Set(float64(startBlockNumber))
 	metrics.BackfillEndBlock.WithLabelValues(indexerName, chainIdStr).Set(float64(endBlockNumber))
 
@@ -100,7 +98,6 @@ func channelValidBlockData(startBlockNumber uint64, endBlockNumber uint64) {
 		metrics.BackfillComputedBatchSize.WithLabelValues(indexerName, chainIdStr).Set(float64(batchSize))
 		metrics.BackfillCurrentStartBlock.WithLabelValues(indexerName, chainIdStr).Set(float64(startBlock))
 		metrics.BackfillCurrentEndBlock.WithLabelValues(indexerName, chainIdStr).Set(float64(endBlock))
-		metrics.BackfillBlockdataChannelLength.WithLabelValues(indexerName, chainIdStr).Set(float64(len(blockdataChannel)))
 
 		log.Debug().
 			Any("start_block", startBlock).
@@ -118,6 +115,8 @@ func channelValidBlockData(startBlockNumber uint64, endBlockNumber uint64) {
 				Uint64("batch_size", batchSize).
 				Msg("Blockdata length does not match expected length")
 		}
+
+		metrics.BackfillBlockdataChannelLength.WithLabelValues(indexerName, chainIdStr).Set(float64(len(blockdataChannel)))
 		blockdataChannel <- blockdata
 		bn = endBlock + 1
 	}
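The BackfillBlockdataChannelLength gauge moves from the batch-range bookkeeping at the top of the loop to immediately before the send, so the reported depth reflects the channel's backlog at the moment the producer is about to enqueue (and possibly block on) the next batch.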

internal/backfill/parquetwriter.go
Lines changed: 1 addition & 5 deletions

@@ -75,7 +75,7 @@ func SaveToParquet(blockData []*common.BlockData, avgMemoryPerBlockChannel chan
 	// Track bytes written metric
 	chainIdStr := libs.ChainIdStr
 	indexerName := config.Cfg.ZeetProjectName
-	metrics.BackfillParquetBytesWritten.WithLabelValues(indexerName, chainIdStr).Add(float64(bytesWritten))
+	metrics.BackfillParquetBytesWritten.WithLabelValues(indexerName, chainIdStr).Set(float64(parquetTempBufferBytes))
 	// update last tracked block number after writing to parquet
 	lastTrackedBlockNumber = lastTrackedBn
 
@@ -207,12 +207,8 @@ func FlushParquet() error {
 	chainIdStr := libs.ChainIdStr
 	indexerName := config.Cfg.ZeetProjectName
 	// Convert string block numbers to float64 for metrics
-	startBlockFloat, _ := strconv.ParseFloat(parquetStartBlockNumber, 64)
 	endBlockFloat, _ := strconv.ParseFloat(parquetEndBlockNumber, 64)
-	metrics.BackfillFlushStartBlock.WithLabelValues(indexerName, chainIdStr).Set(startBlockFloat)
 	metrics.BackfillFlushEndBlock.WithLabelValues(indexerName, chainIdStr).Set(endBlockFloat)
-	metrics.BackfillFlushBlockTimestamp.WithLabelValues(indexerName, chainIdStr).Set(float64(parquetBlockTimestamp.Unix()))
-	metrics.BackfillFlushCurrentTime.WithLabelValues(indexerName, chainIdStr).Set(float64(time.Now().Unix()))
 
 	// upload the parquet file to s3 (checksum is calculated inside UploadParquetToS3)
 	if err := libs.UploadParquetToS3(
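The SaveToParquet change swaps counter-style accumulation (Add of bytesWritten) for a gauge snapshot (Set of parquetTempBufferBytes), which changes what the metric means: a current buffer size rather than a running total. As a reference for the two write styles in prometheus/client_golang, here is a minimal, self-contained sketch; the metric names and labels are hypothetical stand-ins, not this repository's metrics package:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Hypothetical stand-ins for the metrics touched by this commit.
var (
	// Counter semantics: Add accumulates; the value only ever grows.
	bytesWrittenTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "parquet_bytes_written_total",
		Help: "Cumulative bytes written to parquet files.",
	}, []string{"indexer", "chain_id"})

	// Gauge semantics: Set overwrites; the value tracks a current state.
	tempBufferBytes = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "parquet_temp_buffer_bytes",
		Help: "Current size of the parquet temp buffer in bytes.",
	}, []string{"indexer", "chain_id"})
)

func main() {
	prometheus.MustRegister(bytesWrittenTotal, tempBufferBytes)

	bytesWrittenTotal.WithLabelValues("my-indexer", "1").Add(4096) // running total
	tempBufferBytes.WithLabelValues("my-indexer", "1").Set(1024)   // point-in-time value
	fmt.Println("counter accumulated, gauge snapshotted")
}

If a cumulative-bytes view were still wanted, a counter and a gauge could coexist under different names; the commit keeps only the gauge.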

internal/committer/blockparserroutine.go
Lines changed: 3 additions & 9 deletions

@@ -104,6 +104,9 @@ func channelParseBlocksFromFile() error {
 			log.Panic().Err(err).Msg("Failed to acquire memory permit")
 		}
 
+		metrics.CommitterBlockDataChannelLength.WithLabelValues(indexerName, chainIdStr).Set(float64(len(blockDataChannel)))
+		metrics.CommitterMemoryPermitBytes.WithLabelValues(indexerName, chainIdStr).Set(float64(memorySemaphore.held))
+
 		blockDataChannel <- &BlockDataWithSize{
 			BlockData: &blockData,
 			ByteSize:  byteSize,
@@ -125,12 +128,6 @@ func channelParseBlocksFromFile() error {
 			}
 		}
 
-		// Calculate and record average parsing time per row
-		if parsedRowCount > 0 {
-			avgParseTimePerRow := totalParseTime / time.Duration(parsedRowCount)
-			metrics.CommitterBlockDataParseDuration.WithLabelValues(indexerName, chainIdStr).Observe(avgParseTimePerRow.Seconds())
-		}
-
 		log.Debug().
 			Str("file", filePath).
 			Int("parsed_rows", parsedRowCount).
@@ -149,9 +146,6 @@ func channelParseBlocksFromFile() error {
 		} else {
 			log.Debug().Str("file", filePath).Msg("Cleaned up local file")
 		}
-
-		// Update committer metrics
-		updateCommitterMetrics()
 	}
 
 	return nil
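The parser routine keeps its gauges but updates them inline: channel depth and held memory-permit bytes are refreshed right after a permit is acquired and just before the parsed block is queued, replacing both the average-parse-time observation and the trailing updateCommitterMetrics() call.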

internal/committer/blockprocessorroutine.go
Lines changed: 1 addition & 14 deletions

@@ -1,8 +1,6 @@
 package committer
 
 import (
-	"time"
-
 	"github.com/rs/zerolog/log"
 	config "github.com/thirdweb-dev/indexer/configs"
 	"github.com/thirdweb-dev/indexer/internal/common"
@@ -40,8 +38,6 @@ func processBlocks() {
 			totalBytesInBatch += block.ByteSize
 		}
 		if len(blockBatch) == 500 {
-			// Track Kafka publish timing
-			start := time.Now()
 			if err := libs.KafkaPublisherV2.PublishBlockData(blockBatch); err != nil {
 				log.Panic().
 					Err(err).
@@ -50,10 +46,7 @@ func processBlocks() {
 					Uint64("end_block", blockBatch[len(blockBatch)-1].Block.Number.Uint64()).
 					Msg("Failed to publish batch to Kafka")
 			}
-			publishDuration := time.Since(start)
 
-			// Update metrics
-			metrics.CommitterKafkaPublishDuration.WithLabelValues(indexerName, chainIdStr).Observe(publishDuration.Seconds())
 			metrics.CommitterLastPublishedBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(blockBatch[len(blockBatch)-1].Block.Number.Uint64()))
 
 			log.Debug().
@@ -67,9 +60,7 @@ func processBlocks() {
 		}
 
 		nextBlockNumber++
-
-		// Update committer metrics
-		updateCommitterMetrics()
+		metrics.CommitterNextBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(nextBlockNumber))
 	}
 
 	// Publish any remaining blocks in the batch
@@ -81,8 +72,6 @@ func processBlocks() {
 		Uint64("memory_released_bytes", totalBytesInBatch).
 		Msg("Publishing final batch to Kafka")
 
-	// Track Kafka publish timing for final batch
-	start := time.Now()
 	if err := libs.KafkaPublisherV2.PublishBlockData(blockBatch); err != nil {
 		log.Panic().
 			Err(err).
@@ -91,10 +80,8 @@ func processBlocks() {
 			Uint64("end_block", blockBatch[len(blockBatch)-1].Block.Number.Uint64()).
 			Msg("Failed to publish final batch to Kafka")
 	}
-	publishDuration := time.Since(start)
 
 	// Update metrics
-	metrics.CommitterKafkaPublishDuration.WithLabelValues(indexerName, chainIdStr).Observe(publishDuration.Seconds())
 	metrics.CommitterLastPublishedBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(blockBatch[len(blockBatch)-1].Block.Number.Uint64()))
 
 	log.Debug().
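With the publish-duration histogram gone, the time import becomes unused and is removed, and CommitterNextBlockNumber is now set directly after nextBlockNumber is incremented rather than through updateCommitterMetrics().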

internal/committer/committer.go
Lines changed: 0 additions & 38 deletions

@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
-	"time"
 
 	"github.com/rs/zerolog/log"
 	config "github.com/thirdweb-dev/indexer/configs"
@@ -47,14 +46,6 @@ func Init() {
 	// streaming channels
 	blockDataChannel = make(chan *BlockDataWithSize)
 	downloadedFilePathChannel = make(chan string, config.Cfg.StagingS3MaxParallelFileDownload)
-
-	// Initialize committer metrics
-	chainIdStr := libs.ChainIdStr
-	indexerName := config.Cfg.ZeetProjectName
-
-	// Set static metrics
-	metrics.CommitterIndexerName.WithLabelValues(indexerName, chainIdStr, indexerName).Set(1)
-	metrics.CommitterChainId.WithLabelValues(indexerName, chainIdStr).Set(float64(libs.ChainId.Uint64()))
 }
 
 func CommitStreaming() error {
@@ -124,10 +115,6 @@ func getLastTrackedBlockNumberAndBlockRangesFromS3() (int64, []types.BlockRange,
 }
 
 func downloadFilesForBlockRange(blockRanges []types.BlockRange) {
-	// Initialize metrics labels
-	chainIdStr := libs.ChainIdStr
-	indexerName := config.Cfg.ZeetProjectName
-
 	for i, blockRange := range blockRanges {
 		log.Info().
 			Int("processing", i+1).
@@ -137,22 +124,13 @@ func downloadFilesForBlockRange(blockRanges []types.BlockRange) {
 			Uint64("end_block", blockRange.EndBlock).
 			Msg("Starting download")
 
-		// Track S3 download timing
-		start := time.Now()
 		filePath, err := libs.DownloadFile(tempDir, &blockRange)
-		downloadDuration := time.Since(start)
-
-		// Update S3 download timing metric
-		metrics.CommitterS3DownloadDuration.WithLabelValues(indexerName, chainIdStr).Observe(downloadDuration.Seconds())
 
 		if err != nil {
 			log.Panic().Err(err).Str("file", blockRange.S3Key).Msg("Failed to download file")
 		}
 
 		downloadedFilePathChannel <- filePath
-
-		// Update committer metrics
-		updateCommitterMetrics()
 	}
 	log.Info().Msg("All downloads completed, closing download channel")
 }
@@ -172,19 +150,3 @@ func acquireMemoryPermit(size uint64) (bool, error) {
 func releaseMemoryPermit(size uint64) {
 	memorySemaphore.Release(int64(size))
 }
-
-// Helper function to update committer metrics
-func updateCommitterMetrics() {
-	chainIdStr := libs.ChainIdStr
-	indexerName := config.Cfg.ZeetProjectName
-
-	// Update channel lengths
-	metrics.CommitterDownloadedFilePathChannelLength.WithLabelValues(indexerName, chainIdStr).Set(float64(len(downloadedFilePathChannel)))
-	metrics.CommitterBlockDataChannelLength.WithLabelValues(indexerName, chainIdStr).Set(float64(len(blockDataChannel)))
-
-	// Update memory permit bytes (current held memory)
-	metrics.CommitterMemoryPermitBytes.WithLabelValues(indexerName, chainIdStr).Set(float64(memorySemaphore.held))
-
-	// Update next block number
-	metrics.CommitterNextBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(nextBlockNumber))
-}
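Deleting updateCommitterMetrics() retires the pattern of refreshing every committer gauge from one helper on each event; instead, each gauge is written at the site where its backing value changes, so a metric cannot lag the variable it mirrors. A minimal sketch of that point-of-mutation style, with hypothetical names rather than this repository's metrics package:

package main

import "github.com/prometheus/client_golang/prometheus"

// nextBlockGauge is a hypothetical stand-in for CommitterNextBlockNumber.
var nextBlockGauge = prometheus.NewGauge(prometheus.GaugeOpts{
	Name: "committer_next_block_number",
	Help: "Next block number the committer expects to publish.",
})

var nextBlockNumber uint64

// advanceNextBlock bumps the cursor and mirrors it into the gauge in the
// same step, instead of relying on a separate catch-all refresh helper.
func advanceNextBlock() {
	nextBlockNumber++
	nextBlockGauge.Set(float64(nextBlockNumber))
}

func main() {
	prometheus.MustRegister(nextBlockGauge)
	advanceNextBlock()
}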

internal/committer/poollatest.go
Lines changed: 1 addition & 17 deletions

@@ -20,14 +20,10 @@ func pollLatest() error {
 	indexerName := config.Cfg.ZeetProjectName
 
 	for {
-		// Track RPC download timing
-		start := time.Now()
 		latestBlock, err := libs.RpcClient.GetLatestBlockNumber(context.Background())
-		rpcDuration := time.Since(start)
 
 		// Update latest block number metric
 		metrics.CommitterLatestBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(latestBlock.Uint64()))
-		metrics.CommitterRPCDownloadDuration.WithLabelValues(indexerName, chainIdStr).Observe(rpcDuration.Seconds())
 
 		if err != nil {
 			log.Warn().Err(err).Msg("Failed to get latest block number, retrying...")
@@ -39,13 +35,8 @@ func pollLatest() error {
 			continue
 		}
 
-		// Track RPC block data fetch timing
-		start = time.Now()
 		// will panic if any block is invalid
 		blockDataArray := libblockdata.GetValidBlockDataInBatch(latestBlock.Uint64(), nextBlockNumber)
-		rpcDataDuration := time.Since(start)
-
-		metrics.CommitterRPCDownloadDuration.WithLabelValues(indexerName, chainIdStr).Observe(rpcDataDuration.Seconds())
 
 		// Validate that all blocks are sequential and nothing is missing
 		expectedBlockNumber := nextBlockNumber
@@ -74,18 +65,13 @@ func pollLatest() error {
 			blockDataPointers[i] = &block
 		}
 
-		// Track Kafka publish timing
-		start = time.Now()
 		if err := libs.KafkaPublisherV2.PublishBlockData(blockDataPointers); err != nil {
 			log.Panic().
 				Err(err).
 				Int("blocks_count", len(blockDataArray)).
 				Msg("Failed to publish blocks to Kafka")
 		}
-		publishDuration := time.Since(start)
 
-		// Update metrics
-		metrics.CommitterKafkaPublishDuration.WithLabelValues(indexerName, chainIdStr).Observe(publishDuration.Seconds())
 		metrics.CommitterLastPublishedBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(expectedBlockNumber - 1))
 
 		log.Debug().
@@ -95,8 +81,6 @@ func pollLatest() error {
 
 		// Update nextCommitBlockNumber for next iteration
 		nextBlockNumber = expectedBlockNumber
-
-		// Update committer metrics
-		updateCommitterMetrics()
+		metrics.CommitterNextBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(nextBlockNumber))
 	}
 }
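pollLatest() loses all three of its stopwatch blocks (latest-block fetch, block-data fetch, Kafka publish); what remains are the latest-block, last-published, and now-inline next-block gauges. One pre-existing quirk is untouched: CommitterLatestBlockNumber is still set from latestBlock before err is checked, so if GetLatestBlockNumber can return a nil block pointer on error, the gauge update would panic before the retry path runs.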

internal/libs/libblockdata/getblockdata.go
Lines changed: 2 additions & 12 deletions

@@ -157,11 +157,6 @@ func getValidBlockDataFromClickhouseV1(startBlockNumber uint64, endBlockNumber u
 		log.Panic().Err(err).Msg("Failed to get block data from ClickHouseV1")
 	}
 
-	// Track ClickHouse rows fetched
-	chainIdStr := libs.ChainIdStr
-	indexerName := config.Cfg.ZeetProjectName
-	metrics.BackfillClickHouseRowsFetched.WithLabelValues(indexerName, chainIdStr).Add(float64(len(blockData)))
-
 	for i, block := range blockData {
 		if isValid, _ := Validate(block); !isValid {
 			blockData[i] = nil
@@ -225,8 +220,7 @@ func getValidBlockDataFromRpcBatch(blockNumbers []uint64) []*common.BlockData {
 	// Initial fetch
 	rpcResults = libs.RpcClient.GetFullBlocks(context.Background(), blockNumbersToBigInt(blockNumbers))
 
-	// Track initial RPC rows fetched
-	metrics.BackfillRPCRowsFetched.WithLabelValues(indexerName, chainIdStr).Add(float64(len(rpcResults)))
+	metrics.CommitterRPCRowsToFetch.WithLabelValues(indexerName, chainIdStr).Set(float64(len(blockNumbers)))
 
 	// Create array of failed block numbers for retry
 	failedBlockNumbers := make([]uint64, 0)
@@ -243,8 +237,7 @@ func getValidBlockDataFromRpcBatch(blockNumbers []uint64) []*common.BlockData {
 		}
 
 		// Track retry metric
-		metrics.BackfillRPCRetries.WithLabelValues(indexerName, chainIdStr).Add(float64(len(failedBlockNumbers)))
-		metrics.CommitterRPCRetries.WithLabelValues(indexerName, chainIdStr).Add(float64(len(failedBlockNumbers)))
+		metrics.CommitterRPCRetries.WithLabelValues(indexerName, chainIdStr).Set(float64(len(failedBlockNumbers)))
 
 		log.Warn().
 			Int("retry", retry+1).
@@ -254,9 +247,6 @@ func getValidBlockDataFromRpcBatch(blockNumbers []uint64) []*common.BlockData {
 		// Retry only the failed blocks
 		retryResults := libs.RpcClient.GetFullBlocks(context.Background(), blockNumbersToBigInt(failedBlockNumbers))
 
-		// Track retry RPC rows fetched
-		metrics.BackfillRPCRowsFetched.WithLabelValues(indexerName, chainIdStr).Add(float64(len(retryResults)))
-
 		// Update rpcResults with successful ones and create new failed array
 		newFailedBlockNumbers := make([]uint64, 0)
 		retryIndex := 0
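Here the BackfillClickHouseRowsFetched counter and both Backfill RPC rows-fetched counters (initial fetch and retries) disappear, and the surviving RPC metrics move to the Committer family as gauges: CommitterRPCRowsToFetch records the size of the requested batch, and CommitterRPCRetries switches from Add to Set, so it now reports the failed-block count of the most recent retry round rather than a cumulative total.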
