insight indexer v2 #297

Conversation
* fix large block transaction for 296
* minor change
* removed unused code
Caution: Review failed. The pull request is closed.

Walkthrough

Removes the orchestrator, HTTP API, multiple storage backends, and many CLI commands; introduces a new environment-driven config, a streaming committer (Kafka-based), and a backfill subsystem with Parquet/S3 support, plus related libs and metrics refactorings. Many tests and ClickHouse DDLs were deleted.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant CFG as Config loader
participant RPC as RPC client
participant CH as ClickHouse V2
participant S3 as S3
participant Commit as Committer
participant Parser as Parquet Parser
participant Proc as Block Processor
participant Kafka as Kafka V2
Note over CFG,Commit: Initialization
CFG->>Commit: Init() (env + viper)
Commit->>RPC: InitRPCClient()
Commit->>CH: InitNewClickHouseV2()
Commit->>Kafka: InitKafkaV2()
Commit->>S3: InitS3()
Note over Commit: Backfill / Historical flow
Commit->>S3: GetBackfillBoundaries()
alt historical ranges exist
Commit->>S3: Download parquet files
S3-->>Commit: file paths
Commit->>Parser: spawn blockParserRoutine()
Parser-->>Commit: stream BlockDataWithSize via channel
Commit->>Proc: spawn blockProcessorRoutine()
Proc->>Kafka: PublishBatch()
end
Note over Commit,RPC: Live streaming flow
loop pollLatest
Commit->>RPC: GetLatestBlockNumber()
RPC-->>Commit: latest
Commit->>RPC: GetValidBlockDataInBatch(next -> latest)
RPC-->>Commit: block data array
Commit->>Kafka: PublishBlockData(batch)
end
Note over Commit: Reorg detection
Commit->>Commit: RunReorgValidator() (periodic)
Commit->>RPC: fetch headers for range
alt mismatch detected
Commit->>RPC: fetch replacement blocks
Commit->>Kafka: PublishReorg(old,new)
end
```

Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120+ minutes
Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
📒 Files selected for processing (3)
Review the following changes in direct dependencies. Learn more about Socket for GitHub.
Actionable comments posted: 37
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
go.mod (2)
3-3: Use minor version in go directive

Prefer `go 1.23` instead of `go 1.23.0` to follow go.mod conventions and tooling expectations.

```diff
-go 1.23.0
+go 1.23
```
1-1: Fix module path in go.mod to match repository

The module path `github.com/thirdweb-dev/indexer` does not match the repository `thirdweb-dev/insight`. This will cause import path confusion and `go get` failures. Update line 1 of go.mod to `module github.com/thirdweb-dev/insight`.

internal/storage/kafka_publisher.go (1)
130-146: PublishReorg: check inputs and guard newHead underflow.

`newHead-1` underflows if `newHead == 0`; also check slices for length.

```diff
 func (p *KafkaPublisher) PublishReorg(oldData []*common.BlockData, newData []*common.BlockData) error {
-	chainId := newData[0].Block.ChainId.Uint64()
-	newHead := uint64(newData[0].Block.Number.Uint64())
+	if len(newData) == 0 {
+		return nil
+	}
+	chainId := newData[0].Block.ChainId.Uint64()
+	newHead := newData[0].Block.Number.Uint64()
+	if newHead == 0 {
+		return fmt.Errorf("invalid new head 0")
+	}
```
🧹 Nitpick comments (28)
.gitignore (1)
36-36: Ignore rule looks good; consider exceptions if you plan to commit samples

If you intend to keep sample/data schemas, add a negation (for example: docs/samples) to allow checked-in examples.
internal/committer/parquet.go (1)
39-43: Add integrity checks against Parquet header fields

Validate unmarshalled block fields match row metadata to catch corrupt rows early.

```diff
 var block common.Block
 if err := json.Unmarshal(pd.Block, &block); err != nil {
 	return 0, common.BlockData{}, fmt.Errorf("failed to unmarshal block: %w", err)
 }
+// Optional integrity checks
+if block.Number != nil && block.Number.Uint64() != pd.BlockNumber {
+	return 0, common.BlockData{}, fmt.Errorf("block number mismatch: row=%d json=%d", pd.BlockNumber, block.Number.Uint64())
+}
+if block.Hash != "" && pd.BlockHash != "" && block.Hash != pd.BlockHash {
+	return 0, common.BlockData{}, fmt.Errorf("block hash mismatch: row=%s json=%s", pd.BlockHash, block.Hash)
+}
```

internal/committer/blockparserroutine.go (2)
107-107: Channel length gauge is always 0 with an unbuffered channel

`blockDataChannel` is created unbuffered (see internal/committer/committer.go: Init), so `len(blockDataChannel)` is always 0. Either buffer the channel to reflect queue depth or drop this gauge and rely on CommitterMemoryPermitBytes.

65-66: Micro-alloc in tight loop

`row := make([]parquet.Row, 1)` allocates every iteration. Hoist it outside the loop and reuse it to reduce GC churn.
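For illustration, a minimal, self-contained sketch of the suggested hoist; the `reader` type and its `ReadRows` method are stand-ins for the actual parquet reader used by blockParserRoutine, and only the allocation pattern is the point:

```go
package main

import (
	"fmt"
	"io"
)

// reader is a stand-in for the parquet row reader; it hands out one row per call.
type reader struct{ rows []string }

func (r *reader) ReadRows(dst []string) (int, error) {
	if len(r.rows) == 0 {
		return 0, io.EOF
	}
	n := copy(dst, r.rows[:1])
	r.rows = r.rows[n:]
	return n, nil
}

func main() {
	r := &reader{rows: []string{"block 1", "block 2", "block 3"}}

	buf := make([]string, 1) // allocated once, outside the loop
	for {
		n, err := r.ReadRows(buf)
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		if n > 0 {
			fmt.Println(buf[0]) // process the reused row slot
		}
	}
}
```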
internal/libs/kafka.go (1)
9-22: Guard against double initialization

Make InitKafkaV2 idempotent; avoids races/repeated client creation.

Apply:

```diff
 package libs
@@
-import (
+import (
+	"sync"
 	"github.com/rs/zerolog/log"
 	config "github.com/thirdweb-dev/indexer/configs"
 	"github.com/thirdweb-dev/indexer/internal/storage"
 )

 var KafkaPublisherV2 *storage.KafkaPublisher
+var kafkaOnce sync.Once

 func InitKafkaV2() {
-	var err error
-	KafkaPublisherV2, err = storage.NewKafkaPublisher(&config.KafkaConfig{
-		Brokers:   config.Cfg.CommitterKafkaBrokers,
-		Username:  config.Cfg.CommitterKafkaUsername,
-		Password:  config.Cfg.CommitterKafkaPassword,
-		EnableTLS: config.Cfg.CommitterKafkaEnableTLS,
-	})
-	if err != nil {
-		log.Fatal().Err(err).Msg("Failed to initialize Kafka publisher")
-	}
+	kafkaOnce.Do(func() {
+		var err error
+		KafkaPublisherV2, err = storage.NewKafkaPublisher(&config.KafkaConfig{
+			Brokers:   config.Cfg.CommitterKafkaBrokers,
+			Username:  config.Cfg.CommitterKafkaUsername,
+			Password:  config.Cfg.CommitterKafkaPassword,
+			EnableTLS: config.Cfg.CommitterKafkaEnableTLS,
+		})
+		if err != nil {
+			log.Fatal().Err(err).Msg("Failed to initialize Kafka publisher")
+		}
+	})
 }
```

cmd/committer.go (2)
13-18: Propagate errors from the command (use RunE instead of Run)

Switch to RunE so upstream can handle failures (Kubernetes, systemd, etc.).

Apply:

```diff
 var committerCmd = &cobra.Command{
 	Use:   "committer",
 	Short: "run committer",
 	Long:  "published data from s3 to kafka. if block is not found in s3, it will panic",
-	Run:   RunCommitter,
+	RunE:  RunCommitter,
 }
```
20-37: Harden metrics server and return errors
- Use a dedicated mux with timeouts to avoid global state and slowloris.
- Return CommitStreaming error to caller.
Apply:
```diff
-func RunCommitter(cmd *cobra.Command, args []string) {
-	fmt.Println("running committer")
+func RunCommitter(cmd *cobra.Command, args []string) error {
+	log.Info().Msg("running committer")
 	// Start Prometheus metrics server
 	log.Info().Msg("Starting Metrics Server on port 2112")
 	go func() {
-		http.Handle("/metrics", promhttp.Handler())
-		if err := http.ListenAndServe(":2112", nil); err != nil {
+		mux := http.NewServeMux()
+		mux.Handle("/metrics", promhttp.Handler())
+		srv := &http.Server{
+			Addr:              ":2112",
+			Handler:           mux,
+			ReadHeaderTimeout: 5 * time.Second,
+			IdleTimeout:       60 * time.Second,
+		}
+		if err := srv.ListenAndServe(); err != nil {
 			log.Error().Err(err).Msg("Metrics server error")
 		}
 	}()
 	committer.Init()
 	committer.InitReorg()
 	go committer.RunReorgValidator()
-	committer.CommitStreaming()
-}
+	if err := committer.CommitStreaming(); err != nil {
+		log.Error().Err(err).Msg("CommitStreaming exited with error")
+		return err
+	}
+	return nil
+}
```

internal/libs/rpcclient.go (1)
10-23: Make RPC client initialization idempotent

Avoid duplicate initializations; safer in tests and multi-init paths.

Apply:

```diff
 package libs

 import (
 	"math/big"
+	"sync"

 	"github.com/rs/zerolog/log"
 	"github.com/thirdweb-dev/indexer/internal/rpc"
 )

 var RpcClient rpc.IRPCClient
 var ChainId *big.Int
 var ChainIdStr string
+var rpcOnce sync.Once

 func InitRPCClient() {
-	var err error
-	RpcClient, err = rpc.Initialize()
-	if err != nil {
-		log.Fatal().Err(err).Msg("Failed to initialize RPC")
-	}
-
-	ChainId = RpcClient.GetChainID()
-	ChainIdStr = ChainId.String()
+	rpcOnce.Do(func() {
+		var err error
+		RpcClient, err = rpc.Initialize()
+		if err != nil {
+			log.Fatal().Err(err).Msg("Failed to initialize RPC")
+		}
+		ChainId = RpcClient.GetChainID()
+		ChainIdStr = ChainId.String()
+	})
 }
```

internal/libs/constants.go (1)
3-3: Apply Go naming conventions and long-line linter suppression

ALL_CAPS with underscores violates idiomatic Go style for exported constants; use PascalCase. The long hex string requires linter suppression. One reference exists at
internal/libs/clickhouse.go:239.-const EMPTY_LOGS_BLOOM = "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" +// EmptyLogsBloom is the 2048-bit zero logs bloom filter (0x + 512 zeroes). +//nolint:lll // long hex string is intentional +const EmptyLogsBloom = "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"Update the reference in
internal/libs/clickhouse.go:239 from `EMPTY_LOGS_BLOOM` to `EmptyLogsBloom`.

cmd/backfill.go (1)
23-31: Harden the metrics server: dedicated mux, timeouts, and graceful shutdown.

Avoid default mux (duplicate handler panics across multiple runs), add timeouts, and tie lifecycle to Cobra context.

```diff
@@
-import (
-	"fmt"
-	"net/http"
-
-	"github.com/prometheus/client_golang/prometheus/promhttp"
-	"github.com/rs/zerolog/log"
-	"github.com/spf13/cobra"
-	"github.com/thirdweb-dev/indexer/internal/backfill"
-)
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"github.com/rs/zerolog/log"
+	"github.com/spf13/cobra"
+	"github.com/thirdweb-dev/indexer/internal/backfill"
+)
@@
-	// Start Prometheus metrics server
-	log.Info().Msg("Starting Metrics Server on port 2112")
-	go func() {
-		http.Handle("/metrics", promhttp.Handler())
-		if err := http.ListenAndServe(":2112", nil); err != nil {
-			log.Error().Err(err).Msg("Metrics server error")
-		}
-	}()
+	// Start Prometheus metrics server
+	log.Info().Msg("Starting Metrics Server on :2112")
+	mux := http.NewServeMux()
+	mux.Handle("/metrics", promhttp.Handler())
+	srv := &http.Server{
+		Addr:              ":2112",
+		Handler:           mux,
+		ReadHeaderTimeout: 5 * time.Second,
+	}
+	go func() {
+		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+			log.Error().Err(err).Msg("Metrics server error")
+		}
+	}()
+	go func() {
+		<-cmd.Context().Done()
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
+		if err := srv.Shutdown(ctx); err != nil {
+			log.Error().Err(err).Msg("Metrics server shutdown error")
+		}
+	}()
```

internal/backfill/getbackfillboundaries.go (1)
12-34: Nit: “Boundary” spelling and log field types.

Consider renaming functions/fields/messages from “Boundry” to “Boundary” and using Uint64 instead of Any for numeric fields in logs. Optional polish only.
Also applies to: 36-70, 72-90
internal/backfill/disableIndexerMaybeStartCommitter.go (1)
65-74: Improve failure observability by logging response body on non-2xx.

Helps diagnose why the disable/deploy request failed.

```diff
@@
-	if resp.StatusCode >= 200 && resp.StatusCode < 300 {
+	if resp.StatusCode >= 200 && resp.StatusCode < 300 {
 		log.Info().
 			Int("statusCode", resp.StatusCode).
 			Msg("Successfully sent deploy-s3-committer request. Indexer disabled")
 	} else {
-		log.Error().
-			Int("statusCode", resp.StatusCode).
-			Msg("Deploy-s3-committer request failed. Could not disable indexer")
+		body, _ := io.ReadAll(resp.Body)
+		log.Error().
+			Int("statusCode", resp.StatusCode).
+			Bytes("body", body).
+			Msg("Deploy-s3-committer request failed. Could not disable indexer")
 	}
```

Add import:

```diff
 import (
 	"bytes"
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"io"
 	"time"
 )
```

internal/committer/committer.go (1)
98-100: Clarify log message scope.

This function isn’t “starting streaming”; it discovers last committed state and S3 ranges.

```diff
-	log.Info().Str("chain_id", libs.ChainIdStr).Msg("Starting streaming commit process")
+	log.Info().Str("chain_id", libs.ChainIdStr).Msg("Discovering last committed block and S3 block ranges")
```

internal/libs/s3.go (3)
119-147: Validate parsed block range: ensure end >= start.

Protect against malformed filenames that pass regex but reverse order.

```diff
-	return uint64(startBlock), uint64(endBlock), nil
+	if endBlock < startBlock {
+		return 0, 0, fmt.Errorf("end block %d < start block %d for %s", endBlock, startBlock, filename)
+	}
+	return uint64(startBlock), uint64(endBlock), nil
```
251-274: Variable name shadows sortBlockRanges(); minor clarity fix.

The local variable name “sortBlockRanges” shadows the function of the same name. Rename to avoid confusion.

```diff
-	sortBlockRanges, err := GetS3ParquetBlockRangesSorted(ChainId)
+	ranges, err := GetS3ParquetBlockRangesSorted(ChainId)
 ...
-	for i, blockRange := range sortBlockRanges {
+	for i, blockRange := range ranges {
 ...
-	return sortBlockRanges[skipToIndex:], nil
+	return ranges[skipToIndex:], nil
```
276-321: Download: add timeout and verify checksum after write.

Add a deadline to GetObject, and validate against the uploaded checksum metadata to catch corruption.

```diff
-	result, err := S3Client.GetObject(context.Background(), &s3.GetObjectInput{
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
+	defer cancel()
+	result, err := S3Client.GetObject(ctx, &s3.GetObjectInput{
 		Bucket: aws.String(config.Cfg.StagingS3Bucket),
 		Key:    aws.String(blockRange.S3Key),
 	})
 ...
-	_, err = file.ReadFrom(result.Body)
+	_, err = file.ReadFrom(result.Body)
 	if err != nil {
 		os.Remove(localPath)
 		return "", fmt.Errorf("failed to write file: %w", err)
 	}
+	// Optional checksum verification
+	if want := result.Metadata["checksum"]; want != "" {
+		f, _ := os.Open(localPath)
+		got, _ := calculateFileChecksum(f)
+		f.Close()
+		if got != want {
+			os.Remove(localPath)
+			return "", fmt.Errorf("checksum mismatch: got=%s want=%s", got, want)
+		}
+	}
```

internal/libs/libblockdata/getblockdata.go (1)
149-153: Make batch concurrency configurable.

Hard-coding maxConcurrentBatches=4 may not fit all chains; source from config.

```diff
-	maxConcurrentBatches := 4
+	maxConcurrentBatches := max(1, int(config.Cfg.RPCNumParallelCalls))
```

Implement max helper or inline math. Based on learnings.
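For illustration, a hedged, self-contained sketch of config-driven, bounded batch concurrency using `errgroup.SetLimit`; `fetchBatch` and the concurrency value are placeholders, not the project's actual API:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// fetchBatch stands in for the real RPC batch fetch.
func fetchBatch(ctx context.Context, start, end int) error {
	fmt.Printf("fetching blocks %d-%d\n", start, end)
	return nil
}

func main() {
	totalBlocks, batchSize := 1000, 100
	maxConcurrentBatches := 4 // would come from config rather than a literal

	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(maxConcurrentBatches) // bound the number of in-flight batches

	numBatches := (totalBlocks + batchSize - 1) / batchSize
	for i := 0; i < numBatches; i++ {
		start := i * batchSize
		end := start + batchSize
		if end > totalBlocks {
			end = totalBlocks
		}
		// Go 1.22+ gives each iteration its own start/end, so the closure is safe.
		g.Go(func() error { return fetchBatch(ctx, start, end) })
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}
}
```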
internal/backfill/parquetwriter.go (3)
75-81: Metric reports buffer bytes, not bytes written; rename or change value.Gauge BackfillParquetBytesWritten should reflect file bytes on disk, not temp buffer size.
- metrics.BackfillParquetBytesWritten.WithLabelValues(indexerName, chainIdStr).Set(float64(parquetTempBufferBytes)) + metrics.BackfillParquetBytesWritten.WithLabelValues(indexerName, chainIdStr).Set(float64(getBytesWritten()))Alternatively, rename metric to “backfill_parquet_buffer_bytes”. Based on learnings.
227-238: Ensure file sync errors are handled; use fsync error.

parquetFile.Sync() error is ignored; capture and propagate.

```diff
-	parquetTempBufferBytes = 0
-	parquetFile.Sync()
+	parquetTempBufferBytes = 0
+	if err := parquetFile.Sync(); err != nil {
+		return fmt.Errorf("failed to sync parquet file: %w", err)
+	}
```
96-121: Use deterministic file naming to avoid collisions.

Using Unix seconds can collide; include chain id and block range.

```diff
-	filename := fmt.Sprintf("%d.parquet", time.Now().Unix())
+	filename := fmt.Sprintf("chain_%d_blocks_%s_%s_%d.parquet",
+		libs.ChainId.Uint64(), startBlockNumber, endBlockNumber, time.Now().UnixNano())
```

internal/storage/kafka_publisher.go (2)
270-295: Avoid per-call zstd encoder allocation; reuse a pooled encoder.

Creating a new encoder for every message is expensive. Initialize once on KafkaPublisher and reuse.

```diff
-	if len(msgJson) >= compressionThreshold {
-		encoder, err := zstd.NewWriter(nil)
-		if err != nil {
-			log.Fatal().Err(err).Msg("failed to create zstd encoder")
-		}
-		defer encoder.Close()
-		value = encoder.EncodeAll([]byte(msgJson), nil)
+	if len(msgJson) >= compressionThreshold {
+		if p.pEnc == nil { // add *zstd.Encoder field to KafkaPublisher
+			enc, err := zstd.NewWriter(nil)
+			if err != nil {
+				return nil, fmt.Errorf("zstd init: %w", err)
+			}
+			p.pEnc = enc
+		}
+		value = p.pEnc.EncodeAll(msgJson, nil)
```

Add a Close() on publisher to release encoder. Optional if process-long-lived. Based on learnings.
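A minimal sketch of the reuse pattern, assuming a `*zstd.Encoder` field is added to the publisher (the `Publisher` type and field name below are illustrative); `EncodeAll` on an encoder created with `zstd.NewWriter(nil)` is safe for concurrent use, so a single instance can serve all publishes:

```go
package kafka

import (
	"github.com/klauspost/compress/zstd"
)

// Publisher holds one reusable encoder instead of creating one per message.
type Publisher struct {
	enc *zstd.Encoder
}

func NewPublisher() (*Publisher, error) {
	// A nil writer means the encoder is used only via EncodeAll.
	enc, err := zstd.NewWriter(nil)
	if err != nil {
		return nil, err
	}
	return &Publisher{enc: enc}, nil
}

// compress returns the payload unchanged below the threshold, compressed above it.
func (p *Publisher) compress(payload []byte, threshold int) []byte {
	if len(payload) < threshold {
		return payload
	}
	return p.enc.EncodeAll(payload, nil)
}

// Close releases encoder resources when the publisher shuts down.
func (p *Publisher) Close() {
	p.enc.Close()
}
```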
353-360: Single hard-coded partition risks hotspotting.

All records go to partition 0. Consider keyed partitioning (chainId as key) and let Kafka assign partitions to balance throughput while preserving key order.

```diff
-	Partition: 0,
+	// Partition left unset when using a keyed partitioner; ensure RecordPartitioner matches.
```

Ensure kgo.RecordPartitioner aligns (e.g., StickyKeyPartitioner). Optional if strict single-partition ordering is required. Based on learnings.
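A hedged sketch of the keyed-partitioning alternative with franz-go; the broker address, topic, and key value are placeholders, and the sticky key partitioner is one of several possible choices:

```go
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Records sharing a key (here, the chain ID) stay ordered within their
	// partition while Kafka spreads different keys across partitions.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.RecordPartitioner(kgo.StickyKeyPartitioner(nil)), // hash by record key
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	rec := &kgo.Record{
		Topic: "block-data",
		Key:   []byte("8453"), // chainId as the partitioning key
		Value: []byte(`{"block_number": 123}`),
		// Partition is left unset; the partitioner assigns it from the key.
	}
	if err := client.ProduceSync(context.Background(), rec).FirstErr(); err != nil {
		panic(err)
	}
	fmt.Println("published")
}
```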
internal/libs/clickhouse.go (3)
66-85: TLS config missing MinVersion; set TLS 1.3 (or at least 1.2).

Raise baseline to TLS 1.3 for client connections.

```diff
-	TLS: func() *tls.Config {
+	TLS: func() *tls.Config {
 		if enableTLS {
-			return &tls.Config{}
+			return &tls.Config{MinVersion: tls.VersionTLS13}
 		}
 		return nil
 	}(),
```

As flagged by static analysis. Based on learnings.
118-142: Max block query: qualify database and remove unnecessary HAVING.

Use configured DB and simplify.

```diff
-	query := fmt.Sprintf("SELECT toString(max(block_number)) FROM blocks WHERE chain_id = %d HAVING count() > 0", chainId.Uint64())
+	query := fmt.Sprintf("SELECT toString(max(block_number)) FROM %s.blocks WHERE chain_id = %d",
+		config.Cfg.CommitterClickhouseDatabase, chainId.Uint64())
```
256-283: Guard index math and log once per anomaly.

Index bounds checks exist; consider tracking a counter to avoid noisy logs under large gaps. Not a blocker.
No code change required now; just a note to add rate-limiting or sampling on logs if needed.
Also applies to: 285-312, 314-341, 343-370
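If sampling is ever added, one lightweight option (assuming the existing zerolog logger is kept) is a `BasicSampler`; the 1-in-100 rate below is arbitrary:

```go
package main

import (
	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"
)

func main() {
	// Sampled logger: emit roughly one out of every 100 matching events, so a
	// large gap does not produce one warning per missing block.
	sampled := log.Sample(&zerolog.BasicSampler{N: 100})

	for missing := uint64(0); missing < 10_000; missing++ {
		sampled.Warn().
			Uint64("block_number", missing).
			Msg("block missing from ClickHouse result set")
	}
}
```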
internal/metrics/metrics.go (2)
40-43: Reconsider the "_total" suffix on a Gauge metric.

The metric name `backfill_parquet_bytes_written_total` uses the `_total` suffix, which is conventionally reserved for Counter metrics in Prometheus. Since this is a Gauge, the suffix may cause confusion about the metric type. Consider renaming to `backfill_parquet_bytes_written` or using a Counter if this value monotonically increases.

```diff
-	BackfillParquetBytesWritten = promauto.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "backfill_parquet_bytes_written_total",
+	BackfillParquetBytesWritten = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "backfill_parquet_bytes_written",
 		Help: "The total number of bytes written to parquet files during backfill",
 	}, []string{"project_name", "chain_id"})
```
93-96: Reconsider the "_total" suffix on a Gauge metric.

The metric name `committer_rpc_retries_total` uses the `_total` suffix, which is conventionally reserved for Counter metrics in Prometheus. Since this is a Gauge, the suffix may cause confusion about the metric type. Consider renaming to `committer_rpc_retries` or using a Counter if this tracks cumulative retries.

```diff
-	CommitterRPCRetries = promauto.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "committer_rpc_retries_total",
+	CommitterRPCRetries = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "committer_rpc_retries",
 		Help: "The total number of RPC retries",
 	}, []string{"project_name", "chain_id"})
```
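If the value is in fact cumulative, a Counter variant would look roughly like this (a sketch, not the final metric definition):

```go
package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// CommitterRPCRetries counts cumulative RPC retries; as a Counter, the
// "_total" suffix matches Prometheus naming conventions.
var CommitterRPCRetries = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "committer_rpc_retries_total",
	Help: "The total number of RPC retries",
}, []string{"project_name", "chain_id"})

// Callers increment instead of setting a gauge:
//
//	metrics.CommitterRPCRetries.WithLabelValues(projectName, chainIDStr).Inc()
```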
44-81: Consider documenting the configuration precedence order.The config loading flow (env.Parse at line 90, then viper.Unmarshal at line 121) means environment variables take precedence over config file values. Additionally, the
Configstruct mixesenvtags (lines 47-80) with nested structs usingmapstructuretags (lines 45-46).Add a comment documenting the precedence order to help maintainers understand the configuration hierarchy:
type Config struct { + // Configuration precedence (highest to lowest): + // 1. Environment variables (env tags) + // 2. Config file values (mapstructure tags) + // 3. Default values (envDefault tags) RPC RPCConfig `mapstructure:"rpc"` Log LogConfig `mapstructure:"log"`
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
`go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (95)
.gitignore (1 hunks), cmd/api.go (0 hunks), cmd/backfill.go (1 hunks), cmd/committer.go (1 hunks), cmd/migrate_valid.go (0 hunks), cmd/orchestrator.go (0 hunks), cmd/root.go (1 hunks), cmd/validate.go (0 hunks), cmd/validate_and_fix.go (0 hunks), configs/config.go (3 hunks), configs/test_config.yml (0 hunks), docker-compose.yml (0 hunks), go.mod (2 hunks), internal/backfill/backfill.go (1 hunks), internal/backfill/disableIndexerMaybeStartCommitter.go (1 hunks), internal/backfill/getbackfillboundaries.go (1 hunks), internal/backfill/parquetwriter.go (1 hunks), internal/committer/blockparserroutine.go (1 hunks), internal/committer/blockprocessorroutine.go (1 hunks), internal/committer/committer.go (1 hunks), internal/committer/parquet.go (1 hunks), internal/committer/poollatest.go (1 hunks), internal/committer/reorg.go (1 hunks), internal/committer/semaphore.go (1 hunks), internal/common/abi.go (0 hunks), internal/common/balances.go (0 hunks), internal/common/block.go (0 hunks), internal/common/log.go (0 hunks), internal/common/log_test.go (0 hunks), internal/common/set.go (0 hunks), internal/common/trace.go (0 hunks), internal/common/transaction.go (0 hunks), internal/common/transaction_test.go (0 hunks), internal/common/transfers.go (0 hunks), internal/handlers/blocks_handlers.go (0 hunks), internal/handlers/logs_handlers.go (0 hunks), internal/handlers/search_handlers.go (0 hunks), internal/handlers/search_handlers_test.go (0 hunks), internal/handlers/token_handlers.go (0 hunks), internal/handlers/transactions_handlers.go (0 hunks), internal/handlers/transfer_handlers.go (0 hunks), internal/libs/clickhouse.go (1 hunks), internal/libs/constants.go (1 hunks), internal/libs/kafka.go (1 hunks), internal/libs/libblockdata/getblockdata.go (1 hunks), internal/libs/libblockdata/validator.go (1 hunks), internal/libs/redis.go (1 hunks), internal/libs/rpcclient.go (1 hunks), internal/libs/s3.go (1 hunks), internal/metrics/metrics.go (1 hunks), internal/middleware/authorization.go (0 hunks), internal/middleware/cors.go (0 hunks), internal/middleware/logger.go (0 hunks), internal/orchestrator/chain_tracker.go (0 hunks), internal/orchestrator/committer.go (0 hunks), internal/orchestrator/committer_test.go (0 hunks), internal/orchestrator/orchestrator.go (0 hunks), internal/orchestrator/poller.go (0 hunks), internal/orchestrator/poller_test.go (0 hunks), internal/orchestrator/reorg_handler.go (0 hunks), internal/orchestrator/reorg_handler_test.go (0 hunks), internal/orchestrator/validator.go (0 hunks), internal/orchestrator/work_mode_monitor.go (0 hunks), internal/publisher/publisher.go (0 hunks), internal/rpc/rpc.go (3 hunks), internal/source/s3.go (0 hunks), internal/source/source.go (0 hunks), internal/source/staging.go (0 hunks), internal/storage/badger.go (0 hunks), internal/storage/block_buffer.go (0 hunks), internal/storage/block_buffer_badger.go (0 hunks), internal/storage/block_buffer_badger_test.go (0 hunks), internal/storage/block_buffer_pebble.go (0 hunks), internal/storage/clickhouse.go (0 hunks), internal/storage/clickhouse_connector_test.go (0 hunks), internal/storage/connector.go (0 hunks), internal/storage/kafka.go (0 hunks), internal/storage/kafka_publisher.go (8 hunks), internal/storage/pebble.go (0 hunks), internal/storage/postgres.go (0 hunks), internal/storage/postgres_connector_test.go (0 hunks), internal/storage/redis.go (0 hunks), internal/storage/s3.go (0 hunks), internal/tools/clickhouse/0000_clickhouse_create_blocks_table.sql (0 hunks), internal/tools/clickhouse/0001_clickhouse_create_transactions_table.sql (0 hunks), internal/tools/clickhouse/0002_clickhouse_create_logs_table.sql (0 hunks), internal/tools/clickhouse/0003_clickhouse_create_traces_table.sql (0 hunks), internal/tools/clickhouse/0004_clickhouse_create_insert_null_table.sql (0 hunks), internal/tools/clickhouse/0005_clickhouse_create_insert_data_mv.sql (0 hunks), internal/tools/clickhouse/0006_clickhouse_create_token_transfers.sql (0 hunks), internal/tools/clickhouse/0008_clickhouse_create_token_balances.sql (0 hunks), internal/tools/clickhouse/0009_clickhouse_create_token_balances_mv.sql (0 hunks), internal/tools/clickhouse/0010_clickhouse_create_address_transactions.sql (0 hunks), internal/tools/clickhouse/0011_clickhouse_create_address_transactions_mv.sql (0 hunks), internal/tools/clickhouse/0012_clickhouse_create_address_transfers.sql (0 hunks)
💤 Files with no reviewable changes (67)
- internal/storage/block_buffer_badger_test.go
- internal/tools/clickhouse/0004_clickhouse_create_insert_null_table.sql
- internal/common/set.go
- internal/common/block.go
- internal/tools/clickhouse/0011_clickhouse_create_address_transactions_mv.sql
- internal/orchestrator/validator.go
- internal/source/staging.go
- internal/orchestrator/reorg_handler.go
- docker-compose.yml
- internal/source/source.go
- internal/middleware/cors.go
- internal/tools/clickhouse/0010_clickhouse_create_address_transactions.sql
- internal/storage/kafka.go
- internal/handlers/blocks_handlers.go
- internal/storage/postgres_connector_test.go
- cmd/migrate_valid.go
- internal/orchestrator/work_mode_monitor.go
- internal/common/log_test.go
- internal/tools/clickhouse/0009_clickhouse_create_token_balances_mv.sql
- cmd/validate.go
- internal/common/transaction_test.go
- internal/middleware/authorization.go
- internal/tools/clickhouse/0002_clickhouse_create_logs_table.sql
- internal/tools/clickhouse/0000_clickhouse_create_blocks_table.sql
- internal/common/balances.go
- internal/orchestrator/poller_test.go
- internal/orchestrator/orchestrator.go
- internal/storage/clickhouse_connector_test.go
- internal/storage/badger.go
- internal/orchestrator/poller.go
- internal/orchestrator/chain_tracker.go
- internal/common/transaction.go
- internal/orchestrator/committer_test.go
- internal/orchestrator/committer.go
- internal/handlers/search_handlers.go
- internal/storage/block_buffer.go
- internal/common/abi.go
- internal/tools/clickhouse/0012_clickhouse_create_address_transfers.sql
- internal/tools/clickhouse/0001_clickhouse_create_transactions_table.sql
- internal/orchestrator/reorg_handler_test.go
- cmd/api.go
- internal/storage/pebble.go
- internal/tools/clickhouse/0006_clickhouse_create_token_transfers.sql
- internal/storage/clickhouse.go
- internal/middleware/logger.go
- internal/storage/redis.go
- internal/handlers/transactions_handlers.go
- internal/storage/block_buffer_pebble.go
- internal/tools/clickhouse/0008_clickhouse_create_token_balances.sql
- internal/handlers/token_handlers.go
- internal/source/s3.go
- internal/handlers/search_handlers_test.go
- internal/common/transfers.go
- internal/storage/postgres.go
- internal/handlers/transfer_handlers.go
- internal/handlers/logs_handlers.go
- internal/tools/clickhouse/0003_clickhouse_create_traces_table.sql
- internal/storage/connector.go
- configs/test_config.yml
- internal/common/log.go
- internal/tools/clickhouse/0005_clickhouse_create_insert_data_mv.sql
- cmd/orchestrator.go
- internal/common/trace.go
- internal/storage/s3.go
- cmd/validate_and_fix.go
- internal/publisher/publisher.go
- internal/storage/block_buffer_badger.go
🧰 Additional context used
🧬 Code graph analysis (22)
internal/libs/rpcclient.go (1)
- internal/rpc/rpc.go (2): IRPCClient (42-55), Initialize (68-98)

internal/committer/parquet.go (5)
- internal/common/block.go (2): BlockData (35-40), Block (8-33)
- internal/types/types.go (1): ParquetBlockData (11-20)
- internal/common/transaction.go (1): Transaction (10-45)
- internal/common/log.go (1): Log (8-24)
- internal/common/trace.go (1): Trace (8-32)
internal/committer/committer.go (8)
- internal/common/block.go (1): BlockData (35-40)
- internal/committer/semaphore.go (2): SafeSemaphore (12-17), NewSafeSemaphore (19-25)
- internal/libs/rpcclient.go (3): InitRPCClient (14-23), ChainId (11-11), ChainIdStr (12-12)
- internal/libs/clickhouse.go (1): GetMaxBlockNumberFromClickHouseV2 (118-142)
- internal/libs/s3.go (2): GetBlockRangesFromS3 (251-274), DownloadFile (277-322)
- internal/libs/kafka.go (1): InitKafkaV2 (11-22)
- internal/metrics/metrics.go (1): CommitterNextBlockNumber (58-61)
- internal/types/types.go (1): BlockRange (4-8)

cmd/committer.go (3)
- internal/backfill/backfill.go (1): Init (17-21)
- internal/committer/committer.go (2): Init (29-49), CommitStreaming (51-95)
- internal/committer/reorg.go (2): InitReorg (14-16), RunReorgValidator (18-43)
internal/libs/libblockdata/getblockdata.go (5)
- internal/common/block.go (2): BlockData (35-40), Block (8-33)
- internal/rpc/rpc.go (1): GetFullBlockResult (18-22)
- internal/libs/rpcclient.go (2): ChainIdStr (12-12), RpcClient (10-10)
- internal/metrics/metrics.go (2): CommitterRPCRowsToFetch (88-91), CommitterRPCRetries (93-96)
- internal/libs/libblockdata/validator.go (1): Validate (13-81)

internal/rpc/rpc.go (5)
- internal/rpc/batcher.go (2): RPCFetchSingleBatch (53-83), RPCFetchBatchResult (13-17)
- internal/common/block.go (1): RawBlock (42-42)
- internal/rpc/params.go (3): GetBlockWithoutTransactionsParams (17-19), GetBlockWithTransactionsParams (9-11), GetTransactionParams (13-15)
- internal/common/transaction.go (1): RawTransaction (8-8)
- internal/common/utils.go (1): SliceToChunks (7-20)

internal/libs/redis.go (1)
- configs/config.go (2): Cfg (83-83), Config (44-81)
internal/backfill/getbackfillboundaries.go (6)
- internal/backfill/disableIndexerMaybeStartCommitter.go (1): DisableIndexerMaybeStartCommitter (19-75)
- configs/config.go (1): Cfg (83-83)
- internal/metrics/metrics.go (2): BackfillStartBlock (10-13), BackfillEndBlock (15-18)
- internal/libs/s3.go (1): GetS3ParquetBlockRangesSorted (50-64)
- internal/libs/rpcclient.go (2): ChainId (11-11), RpcClient (10-10)
- internal/types/types.go (1): BlockRange (4-8)

internal/libs/s3.go (3)
- configs/config.go (1): Cfg (83-83)
- internal/types/types.go (1): BlockRange (4-8)
- internal/libs/rpcclient.go (1): ChainId (11-11)
internal/committer/blockparserroutine.go (6)
- internal/libs/rpcclient.go (1): ChainIdStr (12-12)
- configs/config.go (1): Cfg (83-83)
- internal/committer/parquet.go (1): ParseParquetRow (14-36)
- internal/common/block.go (2): Block (8-33), BlockData (35-40)
- internal/metrics/metrics.go (2): CommitterBlockDataChannelLength (68-71), CommitterMemoryPermitBytes (73-76)
- internal/committer/committer.go (1): BlockDataWithSize (17-21)

internal/libs/libblockdata/validator.go (3)
- internal/common/block.go (2): BlockData (35-40), Block (8-33)
- configs/config.go (1): Cfg (83-83)
- internal/validation/root_calculator.go (2): CalculateLogsBloom (255-305), CalculateTransactionsRoot (18-253)

internal/committer/poollatest.go (5)
- internal/libs/rpcclient.go (2): ChainIdStr (12-12), RpcClient (10-10)
- configs/config.go (1): Cfg (83-83)
- internal/metrics/metrics.go (3): CommitterLatestBlockNumber (63-66), CommitterLastPublishedBlockNumber (78-81), CommitterNextBlockNumber (58-61)
- internal/libs/libblockdata/getblockdata.go (1): GetValidBlockDataInBatch (18-92)
- internal/libs/kafka.go (1): KafkaPublisherV2 (9-9)

cmd/backfill.go (1)
- internal/backfill/backfill.go (2): RunBackfill (23-47), Init (17-21)
internal/backfill/backfill.go (7)
- internal/common/block.go (1): BlockData (35-40)
- internal/libs/s3.go (1): InitS3 (30-47)
- internal/libs/rpcclient.go (2): InitRPCClient (14-23), ChainIdStr (12-12)
- internal/backfill/parquetwriter.go (3): InitParquetWriter (37-40), FlushParquet (193-225), SaveToParquet (42-94)
- cmd/backfill.go (1): RunBackfill (20-34)
- internal/metrics/metrics.go (7): BackfillStartBlock (10-13), BackfillEndBlock (15-18), BackfillComputedBatchSize (20-23), BackfillCurrentStartBlock (25-28), BackfillCurrentEndBlock (30-33), BackfillBlockdataChannelLength (50-53), BackfillAvgMemoryPerBlock (35-38)
- internal/libs/libblockdata/getblockdata.go (1): GetValidBlockDataForRange (94-139)

internal/libs/kafka.go (2)
- internal/storage/kafka_publisher.go (2): KafkaPublisher (21-24), NewKafkaPublisher (61-112)
- configs/config.go (2): KafkaConfig (18-23), Cfg (83-83)

internal/committer/blockprocessorroutine.go (5)
- internal/libs/rpcclient.go (1): ChainIdStr (12-12)
- configs/config.go (1): Cfg (83-83)
- internal/common/block.go (2): BlockData (35-40), Block (8-33)
- internal/libs/kafka.go (1): KafkaPublisherV2 (9-9)
- internal/metrics/metrics.go (2): CommitterLastPublishedBlockNumber (78-81), CommitterNextBlockNumber (58-61)
internal/backfill/parquetwriter.go (6)
- internal/types/types.go (1): ParquetBlockData (11-20)
- configs/config.go (1): Cfg (83-83)
- internal/common/block.go (2): BlockData (35-40), Block (8-33)
- internal/libs/rpcclient.go (2): ChainIdStr (12-12), ChainId (11-11)
- internal/metrics/metrics.go (2): BackfillParquetBytesWritten (40-43), BackfillFlushEndBlock (45-48)
- internal/libs/s3.go (1): UploadParquetToS3 (150-210)

internal/storage/kafka_publisher.go (2)
- internal/common/block.go (2): BlockData (35-40), Block (8-33)
- configs/config.go (1): Cfg (83-83)

internal/backfill/disableIndexerMaybeStartCommitter.go (2)
- configs/config.go (1): Cfg (83-83)
- internal/libs/rpcclient.go (1): ChainIdStr (12-12)
internal/committer/reorg.go (7)
- internal/libs/redis.go (3): InitRedis (18-37), GetReorgLastValidBlock (39-48), SetReorgLastValidBlock (50-52)
- internal/libs/clickhouse.go (4): GetMaxBlockNumberFromClickHouseV2 (118-142), GetBlockNumberFromClickHouseV2DaysAgo (92-116), GetBlockHeadersForReorgCheck (165-191), GetBlockDataFromClickHouseV2 (193-254)
- internal/libs/rpcclient.go (2): ChainId (11-11), ChainIdStr (12-12)
- internal/libs/libblockdata/getblockdata.go (1): GetValidBlockDataInBatch (18-92)
- internal/common/block.go (1): Block (8-33)
- internal/libs/kafka.go (1): KafkaPublisherV2 (9-9)
- internal/metrics/metrics.go (1): CommitterLastPublishedReorgBlockNumber (83-86)

configs/config.go (1)
- internal/metrics/metrics.go (2): BackfillStartBlock (10-13), BackfillEndBlock (15-18)

internal/libs/clickhouse.go (6)
- configs/config.go (2): Cfg (83-83), Config (44-81)
- internal/common/block.go (2): Block (8-33), BlockData (35-40)
- internal/common/transaction.go (1): Transaction (10-45)
- internal/common/log.go (1): Log (8-24)
- internal/common/trace.go (1): Trace (8-32)
- internal/libs/constants.go (1): EMPTY_LOGS_BLOOM (3-3)
🪛 ast-grep (0.39.6)
internal/libs/clickhouse.go
[warning] 71-71: `MinVersion` is missing from this TLS configuration. By default, TLS 1.2 is currently used as the minimum when acting as a client, and TLS 1.0 when acting as a server. General purpose web applications should default to TLS 1.3 with all other protocols disabled. Only where it is known that a web server must support legacy clients with unsupported and insecure browsers (such as Internet Explorer 10) may it be necessary to enable TLS 1.0 to provide support. Add `MinVersion: tls.VersionTLS13` to the TLS configuration to bump the minimum version to TLS 1.3.
Context: tls.Config{}
Note: [CWE-327]: Use of a Broken or Risky Cryptographic Algorithm [OWASP A03:2017]: Sensitive Data Exposure [OWASP A02:2021]: Cryptographic Failures [REFERENCES]
https://owasp.org/Top10/A02_2021-Cryptographic_Failures
(missing-ssl-minversion-go)
🔇 Additional comments (8)
internal/committer/poollatest.go (1)
75-78: Verify single‑writer semantics for nextBlockNumber
`nextBlockNumber` appears package-level. If any other routine mutates it, race conditions are likely. Confirm only this loop writes, or guard with a mutex/atomic.
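If a second writer ever appears, one low-friction option is `sync/atomic`; the `advanceTo` helper below is hypothetical and only illustrates the pattern:

```go
package committer

import "sync/atomic"

// nextBlockNumber as an atomic value: safe to read or bump from another
// goroutine. A sketch only; the real variable can stay a plain uint64 if the
// single-writer invariant is confirmed.
var nextBlockNumber atomic.Uint64

// advanceTo moves the cursor forward to latest without ever going backwards.
func advanceTo(latest uint64) {
	for {
		cur := nextBlockNumber.Load()
		if latest <= cur {
			return
		}
		if nextBlockNumber.CompareAndSwap(cur, latest) {
			return
		}
	}
}
```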
internal/rpc/rpc.go (1)

239-252: Branching for chain-296 looks correct

Fetching blocks without txs first, then backfilling txs for the special range, is a sensible workaround.

Add a unit test for `needsChain296SpecialHandling` covering boundary blocks 3853944 and 25431538.
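A table-driven sketch of such a test; the function's signature (block number in, bool out) and the expected values around the boundaries are assumptions to be adjusted against the real implementation:

```go
package rpc

import "testing"

func TestNeedsChain296SpecialHandling(t *testing.T) {
	// Expected values are placeholders; confirm whether 3853944 and 25431538
	// are inclusive boundaries of the special range.
	cases := []struct {
		name  string
		block uint64
		want  bool
	}{
		{"below range", 3853943, false},
		{"range start", 3853944, true},
		{"range end", 25431538, true},
		{"above range", 25431539, false},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if got := needsChain296SpecialHandling(tc.block); got != tc.want {
				t.Errorf("needsChain296SpecialHandling(%d) = %v, want %v", tc.block, got, tc.want)
			}
		})
	}
}
```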
go.mod (1)

10-30: Consolidate env loading mechanisms to avoid precedence conflicts

Verification confirms all three env loaders are active in configs/config.go: `godotenv.Load()` (line 86), `env.Parse()` (line 90), and `viper.AutomaticEnv()` (line 119). Running them sequentially creates potential for configuration conflicts; evaluate which should be the single source of truth or document the intended precedence explicitly.

`go mod tidy` completed successfully; no additional action needed there.

internal/backfill/backfill.go (1)
94-95: Review comment is incorrect; Go 1.23.0 has native min/max built-ins.

Go 1.21+ includes built-in `min()` and `max()` functions that accept all ordered types, including `uint64`. The codebase uses Go 1.23.0, and the code at lines 94 and 144 in internal/backfill/backfill.go already compiles correctly. Adding custom `minU64` and `maxU64` helpers would be redundant and unnecessary.

Likely an incorrect or invalid review comment.
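For reference, a tiny example showing the built-ins working on `uint64` without helpers:

```go
package main

import "fmt"

func main() {
	// Go 1.21+ built-ins accept any ordered type, including uint64.
	var a, b uint64 = 94, 144
	fmt.Println(min(a, b), max(a, b)) // 94 144
}
```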
internal/libs/libblockdata/validator.go (1)
56-65: Confirm: strict mode short-circuits on any unsupported tx type.

Current logic returns success for the whole block if it finds one tx with type 0x7E or >4. If the intent is “skip tx-root validation entirely when any unsupported tx exists,” this is fine; otherwise, you may want to continue scanning and only run the tx-root check when every tx type is supported.
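One possible reading of that alternative, sketched with stand-in types and helpers: gate only the tx-root check on "all types supported" while the remaining checks still run. This is an interpretation, not the file's actual logic:

```go
package main

import "fmt"

type tx struct{ Type uint8 }

type block struct{ Txs []tx }

func txRootCheck(b block) error { return nil } // stand-in for the transactions-root validation
func otherChecks(b block) error { return nil } // stand-in for the remaining validations

func validate(b block) error {
	// 0x7E and types > 4 are the unsupported cases named in the review.
	allSupported := true
	for _, t := range b.Txs {
		if t.Type == 0x7E || t.Type > 4 {
			allSupported = false
			break
		}
	}
	// Skip only the root check when an unsupported type is present.
	if allSupported {
		if err := txRootCheck(b); err != nil {
			return err
		}
	}
	return otherChecks(b)
}

func main() {
	fmt.Println(validate(block{Txs: []tx{{Type: 2}, {Type: 0x7E}}}))
}
```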
internal/committer/reorg.go (1)
51-52: The code is correct as-is; no changes needed.Go 1.21 added built-in min and max functions, and the project requires Go 1.23.0. The calls to
min()andmax()on lines 51 and 57 use these built-in language features and will compile without issues. Adding local helper functions is unnecessary.Likely an incorrect or invalid review comment.
internal/libs/libblockdata/getblockdata.go (1)
141-185: Invalid ‘range’ over int; batch loop never compiles. Also avoid generic min.

Use a standard for loop and clamp end explicitly.

```diff
-	numBatches := (totalBlocks + int(rpcBatchSize) - 1) / int(rpcBatchSize)
+	numBatches := (totalBlocks + int(rpcBatchSize) - 1) / int(rpcBatchSize)
 ...
-	for batchIndex := range numBatches {
+	for batchIndex := 0; batchIndex < numBatches; batchIndex++ {
 ...
-	start := batchIdx * int(rpcBatchSize)
-	end := min(start+int(rpcBatchSize), totalBlocks)
+	start := batchIdx * int(rpcBatchSize)
+	end := start + int(rpcBatchSize)
+	if end > totalBlocks {
+		end = totalBlocks
+	}
```

Likely an incorrect or invalid review comment.
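For reference, ranging over an int has been valid since Go 1.22, so the original loop compiles on the module's Go 1.23 toolchain:

```go
package main

import "fmt"

func main() {
	numBatches := 3
	// Since Go 1.22, ranging over an int yields 0..n-1.
	for batchIndex := range numBatches {
		fmt.Println("batch", batchIndex)
	}
}
```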
configs/config.go (1)
18-23: KafkaConfig struct is actively used and should not be removed.

The verification shows that KafkaConfig is instantiated at internal/libs/kafka.go:13 and passed to NewKafkaPublisher() at internal/storage/kafka_publisher.go:61. The struct serves as a bridge between the flat Config representation (with fields like CommitterKafkaBrokers, CommitterKafkaUsername) and the Kafka publisher initialization. This is a legitimate pattern and the struct definition is required.

Likely an incorrect or invalid review comment.
Summary by CodeRabbit
New Features
Configuration Changes
Breaking Changes