
Conversation

@jakeloo
Member

@jakeloo jakeloo commented Aug 13, 2025

Summary by CodeRabbit

  • New Features
    • Added S3 support: archive polling source, S3 main storage with Parquet, and extensive S3 config.
    • Introduced Kafka streaming backend and Redis metadata storage.
    • Added Badger-backed storage and block buffers.
    • Implemented concurrent migrator with worker pooling and range splitting.
    • New ClickHouse schemas: transactions, logs, traces, token_transfers, token_balances, address_* tables and materialized views.
    • Publisher TLS support.
  • Refactor
    • Reworked orchestrator lifecycle, poller worker model, and storage connectors; expanded configuration model.
  • Bug Fixes
    • Metrics server error reporting; minor publisher init fix.
  • Tests
    • Added Badger buffer tests/benchmarks.
  • Chores
    • Dependency upgrades (AWS SDK, parquet-go, Redis, CLI, OpenTelemetry).

@coderabbitai

coderabbitai bot commented Aug 13, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

Introduces S3-based source/storage, Kafka/Redis/Badger backends, extensive config restructuring, retryable RPC fetching, a concurrent migrator with worker orchestration, orchestrator lifecycle refactors, poller/committer changes, new serialization for blocks, and a comprehensive ClickHouse schema overhaul (soft-deletes, projections, new tables/materialized views), plus mocks and tests updates.

Changes

Cohort / File(s) Summary
Migrator concurrency and boundaries
cmd/migrate_valid.go
Refactors the migrator to multi-worker processing with context/signal handling; worker-driven RPC fetch, per-range boundary computation, batch size config; source/destination connectors; updated public signatures (a range-splitting sketch follows this list).
CLI flags and command wiring
cmd/root.go
Adds numerous flags for S3 archives, storage backends, publisher TLS, and migrator (destination types, start/end, workers, batchSize); binds via viper; wires new commands.
Config model restructure
configs/config.go
Replaces generic storage config with typed per-backend structs; adds S3/Parquet, Redis, Kafka, Migrator configs; adds compression/TLS options; new fields for poller/committer.
Storage connector API
internal/storage/connector.go
Introduces explicit constructors (Orchestrator/Staging/Main); adds Close aggregator; updates interfaces (publish/commit cursors, block counts, token queries); removes generic constructor and some failure APIs.
Orchestrator lifecycle and components
cmd/orchestrator.go, internal/orchestrator/orchestrator.go
Centralizes WaitGroup; logs component completion; cancels on component end; adds S3 source option to poller construction; revised shutdown to close storage.
Committer bounds and storage usage
internal/orchestrator/committer.go, internal/orchestrator/committer_test.go
Adds commitUntilBlock and shared poller; switches publish cursor to OrchestratorStorage; respects upper bound; renames DeleteOlderThan→DeleteStagingDataOlderThan in tests.
Poller S3 source and worker reuse
internal/orchestrator/poller.go
Adds WithPollerS3Source; embeds a reusable worker (archive-enabled); shifts failure storage to StagingStorage; adjusts poll-limit handling.
Validator tweak
internal/orchestrator/validator.go
Bypasses strict validation when encountering tx type 0x7E; minor formatting.
RPC retry layer
internal/rpc/batcher.go, internal/rpc/rpc.go
Adds retryable batch/single fetch functions; switches GetFullBlocks to WithRetry variants; handles 413 and batch errors with splitting.
Worker source abstraction
internal/worker/worker.go
Adds SourceType and archive-capable Worker; chooses archive vs RPC; chunked processing with retry; concurrency semaphore; Close method.
S3 archive source
internal/source/source.go, internal/source/s3.go
Introduces ISource and S3Source with metadata caching, local file cache, parquet parsing, concurrent downloads; exposes range, stats, refresh, and GetFullBlocks.
S3 main storage
internal/storage/s3.go
Adds S3Connector with buffered writes (memory/Badger), Parquet formatter, background flush, upload and read/query helpers; Close/Flush support.
Badger storage and buffers
internal/storage/badger.go, internal/storage/block_buffer.go, internal/storage/block_buffer_badger.go, internal/storage/block_buffer_badger_test.go
Adds BadgerConnector for orchestrator/staging; in-memory and Badger-backed block buffers with stats/range APIs; tests and benchmarks for Badger buffer.
Kafka writer and publisher
internal/storage/kafka.go, internal/storage/kafka_publisher.go
Introduces write-only KafkaConnector backed by KafkaPublisher; transactional publish of blocks/reorgs; updates commit cursor via orchestrator storage.
Redis metadata store
internal/storage/redis.go
Adds RedisConnector for last reorg/publish/commit cursors with optional TLS.
ClickHouse connectors
internal/storage/clickhouse.go, internal/storage/postgres.go
Adds Close and commit cursor APIs; block count helper; enable compression; rename DeleteOlderThan→DeleteStagingDataOlderThan (Postgres/ClickHouse).
Handlers
internal/handlers/logs_handlers.go
Switches to storage.NewMainConnector for main storage; TODO comment for query connector.
Common serialization
internal/common/block.go
Adds JSON tags to BlockData and a Serialize method normalizing nil slices to empty.
Dependencies
go.mod
Adds AWS SDK v2, parquet-go, redis v9, badger v4; upgrades cobra, OTEL, x/* modules; adds indirect libs.
Mocks
test/mocks/MockIMainStorage.go, test/mocks/MockIOrchestratorStorage.go, test/mocks/MockIStagingStorage.go, test/mocks/MockIRPCClient.go
Regenerated for new/renamed methods (Close, GetBlockCount, publish/commit cursors, DeleteStagingDataOlderThan, block failures APIs); bump mockery version.
ClickHouse schema: core shifts
internal/tools/clickhouse/0000_clickhouse_create_blocks_table.sql, .../0004_clickhouse_create_insert_null_table.sql, .../0005_clickhouse_create_insert_data_mv.sql
Switch to soft-deletes (is_deleted), ReplacingMergeTree, new partitions/projections, view renames to insert_* and source table rename.
ClickHouse: base tables
internal/tools/clickhouse/0001_clickhouse_create_transactions_table.sql, internal/tools/clickhouse/0002_clickhouse_create_logs_table.sql, internal/tools/clickhouse/0003_clickhouse_create_traces_table.sql
Adds/updates tables with projections, indexes, partitions; remove sign column; adjust ORDER BY/GRANULARITY.
ClickHouse: token transfers/balances
internal/tools/clickhouse/0006_clickhouse_create_token_transfers.sql, 0007_clickhouse_create_token_transfers_mv.sql, 0008_clickhouse_create_token_balances.sql, 0009_clickhouse_create_token_balances_mv.sql
Introduces token_transfers table and MVs per standard; adds token_balances table and per-token-type delta MVs.
ClickHouse: address-centric
internal/tools/clickhouse/0010_clickhouse_create_address_transactions.sql, 0011_clickhouse_create_address_transactions_mv.sql, 0012_clickhouse_create_address_transfers.sql, 0013_clickhouse_create_address_transfers_mv.sql
Adds address_* tables and MVs for transactions/transfers; projections and indexing oriented by address and direction.
ClickHouse: removals
internal/tools/clickhouse/0001_clickhouse_create_block_failures_table.sql, 0002_clickhouse_create_cursors_table.sql, 0003_clickhouse_create_staging_table.sql, 0006_clickhouse_create_logs_table.sql, 0010_clickhouse_create_token_transfers_mv.sql
Removes legacy block_failures, cursors, staging, old logs, and old token_transfers table/MV definitions.
ClickHouse opts
internal/tools/clickhouse_opts/0000_clickhouse_backfill_logs_transfer.sql
Adds backfill staging table and MVs to populate token_transfers from logs for ERC20/721/1155/6909.
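
For reference on the range splitting mentioned in the migrator cohort above, here is a minimal sketch of an even split with remainder distribution; the function name divideBlockRange and its exact shape are assumptions based on this summary, not the repository's code (assumes math/big).

// divideBlockRange splits [start, end] into one contiguous sub-range per worker,
// spreading any remainder over the first workers.
func divideBlockRange(start, end *big.Int, workers int) [][2]*big.Int {
	total := new(big.Int).Sub(end, start)
	total.Add(total, big.NewInt(1)) // inclusive range size

	base := new(big.Int).Div(total, big.NewInt(int64(workers)))
	rem := new(big.Int).Mod(total, big.NewInt(int64(workers))).Int64()

	ranges := make([][2]*big.Int, 0, workers)
	cursor := new(big.Int).Set(start)
	for i := 0; i < workers; i++ {
		size := new(big.Int).Set(base)
		if int64(i) < rem {
			size.Add(size, big.NewInt(1))
		}
		if size.Sign() == 0 {
			break // more workers than blocks
		}
		subEnd := new(big.Int).Add(cursor, size)
		subEnd.Sub(subEnd, big.NewInt(1))
		ranges = append(ranges, [2]*big.Int{new(big.Int).Set(cursor), subEnd})
		cursor = new(big.Int).Add(subEnd, big.NewInt(1))
	}
	return ranges
}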

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor User
  participant Migrator
  participant Source as Source Storage
  participant Dest as Destination Storage
  participant W as Worker
  participant RPC as RPC Client

  User->>Migrator: Start(ctx, start, end, workers, batch)
  Migrator->>Source: DetermineMigrationBoundaries(start,end)
  alt no range to migrate
    Migrator-->>User: exit
  else range exists
    Migrator->>Migrator: divideBlockRange(range, workers)
    par For each worker i
      Migrator->>W: processBlockRange(subrange)
      W->>Source: GetValidBlocksForRange(...)
      alt missing blocks
        W->>RPC: FetchBlocks (retry)
        RPC-->>W: Full blocks
        W->>W: Validate
      end
      W->>Dest: Insert batches
    and Graceful shutdown
      Note over Migrator: OS signal/context cancel handled
    end
  end
  Migrator->>Dest: Close()
  Migrator->>Source: Close()
  Migrator-->>User: Done
sequenceDiagram
  autonumber
  participant Orchestrator
  participant Poller
  participant Committer
  participant FailureRec as FailureRecoverer
  participant Reorg as ReorgHandler
  participant Monitor as WorkModeMonitor
  participant Tracker as ChainTracker
  participant Storage

  Orchestrator->>Storage: NewStorageConnector(...)
  Orchestrator->>Poller: Start(WithS3Source?)
  Orchestrator->>Committer: Start(commitUntil?)
  Orchestrator->>FailureRec: Start()
  Orchestrator->>Reorg: Start()
  Orchestrator->>Monitor: Start()
  Orchestrator->>Tracker: Start()
  par lifecycle
    Poller-->>Orchestrator: completed → cancel()
    Committer-->>Orchestrator: completed → cancel()
    FailureRec-->>Orchestrator: completed
    Reorg-->>Orchestrator: completed
    Monitor-->>Orchestrator: completed
    Tracker-->>Orchestrator: completed
  end
  Orchestrator->>Storage: Close()
  Orchestrator-->>Orchestrator: Shutdown complete
sequenceDiagram
  autonumber
  participant Poller
  participant W as Worker
  participant Src as Archive Source (S3)
  participant RPC as RPC Client
  participant Staging as StagingStorage

  Poller->>W: Run(ctx, blockNumbers)
  alt Archive available
    W->>Src: GetFullBlocks(blockNumbers)
    Src-->>W: Results (per block)
  else
    W->>RPC: Batched Fetch (retry)
    RPC-->>W: Results
  end
  W-->>Poller: Blocks/results
  alt failures
    Poller->>Staging: StoreBlockFailures(...)
  end

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~150 minutes



@zeet-co zeet-co bot requested a deployment to production August 13, 2025 16:15 In progress
@zeet-co zeet-co bot requested a deployment to production August 14, 2025 17:02 In progress
@zeet-co zeet-co bot requested a deployment to production August 19, 2025 05:42 In progress
@zeet-co zeet-co bot requested a deployment to production August 26, 2025 21:18 In progress
@jakeloo jakeloo marked this pull request as ready for review August 28, 2025 05:31
@jakeloo jakeloo merged commit acdb2eb into main Aug 28, 2025
4 of 5 checks passed
@jakeloo jakeloo deleted the jl/commit-kafka branch August 28, 2025 05:50

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 57

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (21)
internal/tools/clickhouse/0004_clickhouse_create_insert_null_table.sql (1)

27-59: Adjust field types to match Ethereum specs

  • Change to_address to Nullable(FixedString(42)) to correctly represent contract‐creation transactions (alchemy.com)
  • Use UInt256 for max_fee_per_gas and max_priority_fee_per_gas to cover the full 2⁵⁶ range (asserted < 2**256) (eips.ethereum.org)
  • Store blob_versioned_hashes as Array(FixedString(66)) (32 bytes hex with “0x” prefix) per EIP-4844’s Bytes32 versioned‐hash format (eips.ethlib.cn)
internal/handlers/logs_handlers.go (1)

120-128: Fix potential nil dereference when ConstructEventABI fails.
signatureHash is computed even if ConstructEventABI returns error; eventABI can be nil.

Apply:

-    eventABI, err = common.ConstructEventABI(signature)
-    if err != nil {
-        log.Debug().Err(err).Msgf("Unable to construct event ABI for %s", signature)
-    }
-    signatureHash = eventABI.ID.Hex()
+    eventABI, err = common.ConstructEventABI(signature)
+    if err != nil {
+        log.Debug().Err(err).Msgf("Unable to construct event ABI for %s", signature)
+    } else if eventABI != nil {
+        signatureHash = eventABI.ID.Hex()
+    }
configs/config.go (1)

357-365: Bug: Orchestrator chainBasedConfig env overrides are written to Main instead of Orchestrator.
This misroutes STORAGE_ORCHESTRATOR_CLICKHOUSE_CHAINBASEDCONFIG.

Apply:

-  if chainConfigJSON := os.Getenv("STORAGE_ORCHESTRATOR_CLICKHOUSE_CHAINBASEDCONFIG"); chainConfigJSON != "" {
+  if chainConfigJSON := os.Getenv("STORAGE_ORCHESTRATOR_CLICKHOUSE_CHAINBASEDCONFIG"); chainConfigJSON != "" {
     var orchestratorChainConfig map[string]TableOverrideConfig
     if err := json.Unmarshal([]byte(chainConfigJSON), &orchestratorChainConfig); err != nil {
       return fmt.Errorf("error parsing orchestrator chainBasedConfig JSON: %v", err)
     }
-    if Cfg.Storage.Main.Clickhouse != nil {
-      Cfg.Storage.Main.Clickhouse.ChainBasedConfig = orchestratorChainConfig
+    if Cfg.Storage.Orchestrator.Clickhouse != nil {
+      Cfg.Storage.Orchestrator.Clickhouse.ChainBasedConfig = orchestratorChainConfig
     }
   }
internal/publisher/publisher.go (1)

128-159: Don’t hold the read lock across network I/O; pre-filter nils and surface publish errors

Holding p.mu.RLock while producing can block Close/init; we should snapshot the client under lock, then release. Also, aggregate and return the first error instead of always nil.

 func (p *Publisher) publishMessages(ctx context.Context, messages []*kgo.Record) error {
   if len(messages) == 0 {
     return nil
   }
 
   if !config.Cfg.Publisher.Enabled {
     log.Debug().Msg("Publisher is disabled, skipping publish")
     return nil
   }
 
-  p.mu.RLock()
-  defer p.mu.RUnlock()
-
-  if p.client == nil {
+  p.mu.RLock()
+  client := p.client
+  p.mu.RUnlock()
+
+  if client == nil {
     return nil // Skip if no client configured
   }
 
-  var wg sync.WaitGroup
-  wg.Add(len(messages))
+  // filter out nil records defensively
+  filtered := make([]*kgo.Record, 0, len(messages))
+  for _, m := range messages {
+    if m != nil {
+      filtered = append(filtered, m)
+    }
+  }
+  if len(filtered) == 0 {
+    return nil
+  }
+
+  var wg sync.WaitGroup
+  wg.Add(len(filtered))
+  var mu sync.Mutex
+  var firstErr error
   // Publish to all configured producers
-  for _, msg := range messages {
-    p.client.Produce(ctx, msg, func(_ *kgo.Record, err error) {
+  for _, msg := range filtered {
+    client.Produce(ctx, msg, func(_ *kgo.Record, err error) {
       defer wg.Done()
       if err != nil {
         log.Error().Err(err).Msg("Failed to publish message to Kafka")
+        mu.Lock()
+        if firstErr == nil {
+          firstErr = err
+        }
+        mu.Unlock()
       }
     })
   }
   wg.Wait()
 
-  return nil
+  return firstErr
 }
internal/orchestrator/failure_recoverer.go (1)

90-121: Bug: map key uses *big.Int pointer identity; lookups will miss and deletions may target zero values

Using *big.Int as a map key compares pointer addresses, not numeric values. Results typically carry distinct pointers, so ok will be false and you’ll attempt to delete a zero-value failure. Key by the numeric value (String()) instead.

-  blockFailureMap := make(map[*big.Int]common.BlockFailure)
+  blockFailureMap := make(map[string]common.BlockFailure)
   for _, failure := range blockFailures {
-    blockFailureMap[failure.BlockNumber] = failure
+    blockFailureMap[failure.BlockNumber.String()] = failure
   }
   var newBlockFailures []common.BlockFailure
   var failuresToDelete []common.BlockFailure
   var successfulResults []common.BlockData
   for _, result := range results {
-    blockFailureForBlock, ok := blockFailureMap[result.BlockNumber]
+    blockFailureForBlock, ok := blockFailureMap[result.BlockNumber.String()]
     if result.Error != nil {
       failureCount := 1
       if ok {
         failureCount = blockFailureForBlock.FailureCount + 1
       }
       newBlockFailures = append(newBlockFailures, common.BlockFailure{
         BlockNumber:   result.BlockNumber,
         FailureReason: result.Error.Error(),
         FailureTime:   time.Now(),
         ChainId:       fr.rpc.GetChainID(),
         FailureCount:  failureCount,
       })
     } else {
       successfulResults = append(successfulResults, common.BlockData{
         Block:        result.Data.Block,
         Logs:         result.Data.Logs,
         Transactions: result.Data.Transactions,
         Traces:       result.Data.Traces,
       })
-      failuresToDelete = append(failuresToDelete, blockFailureForBlock)
+      if ok {
+        failuresToDelete = append(failuresToDelete, blockFailureForBlock)
+      }
     }
   }
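
For illustration, a tiny standalone sketch of the pointer-key behavior (not project code): Go maps compare pointer keys by identity, so two big.Int pointers with the same numeric value are distinct keys.

func pointerKeyDemo() {
	m := map[*big.Int]string{}
	a, b := big.NewInt(5), big.NewInt(5) // equal values, different pointers
	m[a] = "failure for block 5"
	_, ok := m[b] // false: the map compares pointer identity, not numeric value
	_, ok = map[string]string{a.String(): "x"}[b.String()] // true: string keys compare by value
	_ = ok
}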
internal/rpc/rpc.go (1)

279-282: Apply retry to GetBlocks/GetTransactions for consistency.

These still use RPCFetchSingleBatch; align with WithRetry to handle provider 413s and batch-size limits.

- blocks = RPCFetchSingleBatch[*big.Int, common.RawBlock](rpc, ctx, blockNumbers, "eth_getBlockByNumber", GetBlockWithoutTransactionsParams)
+ blocks = RPCFetchSingleBatchWithRetry[*big.Int, common.RawBlock](rpc, ctx, blockNumbers, "eth_getBlockByNumber", GetBlockWithoutTransactionsParams)
- transactions = RPCFetchSingleBatch[string, common.RawTransaction](rpc, ctx, txHashes, "eth_getTransactionByHash", GetTransactionParams)
+ transactions = RPCFetchSingleBatchWithRetry[string, common.RawTransaction](rpc, ctx, txHashes, "eth_getTransactionByHash", GetTransactionParams)

Also applies to: 294-297

cmd/root.go (1)

53-53: Bug: poller-interval declared as Bool but should be numeric.

This binds to poller.interval and will break consumers expecting duration/ints.

- rootCmd.PersistentFlags().Bool("poller-interval", true, "Poller interval")
+ rootCmd.PersistentFlags().Int("poller-interval", 1000, "Poller interval in milliseconds")
internal/storage/postgres.go (3)

63-91: Broken SQL: missing WHERE before AND clauses.

query is initialized without WHERE, then “AND …” is appended, producing invalid SQL.

-query := `SELECT chain_id, block_number, last_error_timestamp, failure_count, reason 
-          FROM block_failures`
+query := `SELECT chain_id, block_number, last_error_timestamp, failure_count, reason 
+          FROM block_failures WHERE 1=1`

Also, the IN clause for BlockNumbers is string-concatenated. Prefer parameterized placeholders:

- blockNumberStrs := make([]string, len(qf.BlockNumbers))
- for i, bn := range qf.BlockNumbers { blockNumberStrs[i] = bn.String() }
- query += fmt.Sprintf(" AND block_number IN (%s)", strings.Join(blockNumberStrs, ","))
+ placeholders := make([]string, len(qf.BlockNumbers))
+ for i, bn := range qf.BlockNumbers {
+   argCount++
+   placeholders[i] = fmt.Sprintf("$%d", argCount)
+   args = append(args, bn.String())
+ }
+ query += fmt.Sprintf(" AND block_number IN (%s)", strings.Join(placeholders, ","))

350-356: Wrong table in DELETE subquery.

Deleting from block_data but selecting ctid from block_failures. Likely copy/paste bug.

-	FROM block_failures
+	FROM block_data

238-269: Persist normalized JSON to avoid null arrays.

Use BlockData.Serialize() before json.Marshal to ensure empty slices instead of nulls.

- blockDataJSON, err := json.Marshal(blockData)
+ blockDataJSON, err := json.Marshal(blockData.Serialize())
internal/orchestrator/poller.go (3)

248-257: Data race on lastPolledBlock write

Writes to p.lastPolledBlock must be protected; multiple workers run concurrently.

   endBlock := blockNumbers[len(blockNumbers)-1]
   if endBlock != nil {
-    p.lastPolledBlock = endBlock
+    p.lastPolledBlockMutex.Lock()
+    p.lastPolledBlock = endBlock
+    p.lastPolledBlockMutex.Unlock()
   }

328-333: Data race on lastPolledBlock read

Guard reads with RLock.

-  log.Debug().Msgf("Last polled block: %s", p.lastPolledBlock.String())
-  startBlock := new(big.Int).Add(p.lastPolledBlock, big.NewInt(1))
+  p.lastPolledBlockMutex.RLock()
+  lp := new(big.Int).Set(p.lastPolledBlock)
+  p.lastPolledBlockMutex.RUnlock()
+  log.Debug().Msgf("Last polled block: %s", lp.String())
+  startBlock := new(big.Int).Add(lp, big.NewInt(1))

385-391: Close worker to release archive resources

Worker.Close() closes the archive and its background goroutines; call it on shutdown.

 func (p *Poller) shutdown(cancel context.CancelFunc, tasks chan struct{}, wg *sync.WaitGroup) {
   cancel()
   close(tasks)
   wg.Wait()
+  if p.worker != nil {
+    _ = p.worker.Close()
+  }
   log.Info().Msg("Poller shutting down")
 }
internal/storage/clickhouse.go (3)

565-703: UNION + GROUP BY/ORDER/LIMIT composition may produce invalid or unstable queries

  • addPostQueryClauses wraps UNION with SELECT * FROM (%s) GROUP BY ... — selecting * with GROUP BY is invalid unless CH relaxes it; specify projected columns.
  • Final ordering for UNION uses only inner ORDER BY; outer result has no ORDER BY before final LIMIT, which is nondeterministic.

Minimal fix: apply ORDER BY at the outer level and pass columns to the wrapper.

- if len(qf.GroupBy) > 0 {
-   groupByClause := fmt.Sprintf(" GROUP BY %s", strings.Join(qf.GroupBy, ", "))
-   if strings.Contains(query, "UNION ALL") {
-     query = fmt.Sprintf("SELECT %s FROM (%s)%s", columns /* pass through */, query, groupByClause)
-   } else {
-     query += groupByClause
-   }
- }
+ // For UNION, wrap with explicit projection and apply GROUP BY/ORDER/LIMIT outside
+ // NOTE: requires threading 'columns' into addPostQueryClauses or handling in buildUnionQuery.

Alternatively, move GROUP BY/ORDER/LIMIT handling for UNION into buildUnionQuery and keep addPostQueryClauses for non-UNION queries only. I can provide a concrete patch if you decide the approach.


1022-1029: Trailing comma in IN list

getBlockNumbersStringArray returns a trailing comma, yielding invalid SQL: IN (1,2,3,).

 func getBlockNumbersStringArray(blockNumbers []*big.Int) string {
-  blockNumbersString := ""
-  for _, blockNumber := range blockNumbers {
-    blockNumbersString += fmt.Sprintf("%s,", blockNumber.String())
-  }
-  return blockNumbersString
+  if len(blockNumbers) == 0 {
+    return ""
+  }
+  parts := make([]string, 0, len(blockNumbers))
+  for _, bn := range blockNumbers {
+    parts = append(parts, bn.String())
+  }
+  return strings.Join(parts, ",")
 }

773-794: Enforce hex validation and switch to parameterized queries
The helpers createContractAddressClause and createWalletAddressClause currently interpolate unsanitized input via fmt.Sprintf, opening the door to SQL injection. Require contractAddress and walletAddress to be valid 0x-prefixed hex strings and pass them as bound parameters through the ClickHouse driver instead of string concatenation.
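
A hedged sketch of the suggested approach using clickhouse-go v2 positional binding (assumes the context, fmt, regexp, and clickhouse-go driver imports); the helper name, table, and regex are illustrative, not the connector's actual code:

var hexAddressRe = regexp.MustCompile(`^0x[0-9a-fA-F]{40}$`)

func queryTransfersByWallet(ctx context.Context, conn driver.Conn, walletAddress string) (driver.Rows, error) {
	if !hexAddressRe.MatchString(walletAddress) {
		return nil, fmt.Errorf("invalid wallet address: %q", walletAddress)
	}
	// bind the address as a query argument instead of interpolating it into the SQL string
	return conn.Query(ctx,
		"SELECT * FROM address_transfers WHERE address = ? ORDER BY block_number DESC LIMIT 100",
		walletAddress,
	)
}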

internal/orchestrator/committer.go (3)

323-333: Possible nil deref before error/zero checks

You log latestCommittedBlockNumber.String() before confirming err/nil. This can panic when storage returns (nil, nil) for empty tables.

Apply:

-	latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
-	log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
-	if err != nil {
+	latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
+	if err != nil {
 		return nil, err
 	}
+	if latestCommittedBlockNumber != nil {
+		log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
+	} else {
+		log.Debug().Msg("Committer found no max block in main storage (nil)")
+	}

329-338: Handle nil from GetMaxBlockNumber

Code assumes zero BigInt, but implementations may return nil. Treat nil as zero.

Apply:

-	if latestCommittedBlockNumber.Sign() == 0 {
+	if latestCommittedBlockNumber == nil || latestCommittedBlockNumber.Sign() == 0 {
 		// If no blocks have been committed yet, start from the fromBlock specified in the config
 		latestCommittedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))

616-631: Ignore error from poller during gap handling

Failure to poll missing blocks is silently ignored.

Apply:

-	c.poller.Poll(ctx, missingBlockNumbers)
+	if err := c.poller.Poll(ctx, missingBlockNumbers); err != nil {
+		log.Error().Err(err).Msg("Poller error while handling gap")
+	}
cmd/migrate_valid.go (1)

473-487: Make GetValidBlocksFromRPC non-terminating and propagate errors

  • In cmd/migrate_valid.go (around L473), change
    func (m *Migrator) GetValidBlocksFromRPC(blockNumbers []*big.Int) []common.BlockData
    to
    func (m *Migrator) GetValidBlocksFromRPC(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error)
    and replace each log.Fatal… with return nil, err (or formatted error for invalidBlocks).
  • At the call site (cmd/migrate_valid.go L239), update
    validMissingBlocks := m.GetValidBlocksFromRPC(missingBlocks)
    to
    validMissingBlocks, err := m.GetValidBlocksFromRPC(ctx, missingBlocks)
    if err != nil {
        log.Error().Err(err).Msgf("Worker %d: failed to fetch missing blocks", workerID)
        time.Sleep(3 * time.Second)
        continue
    }
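
Roughly, the reworked function could look like the following; the rpc field name and result fields are inferred from nearby code, and block validation is omitted, so treat this as a sketch rather than the exact implementation:

func (m *Migrator) GetValidBlocksFromRPC(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
	results := m.rpcClient.GetFullBlocks(ctx, blockNumbers)
	valid := make([]common.BlockData, 0, len(results))
	for _, r := range results {
		if r.Error != nil {
			// propagate instead of log.Fatal so the worker can retry
			return nil, fmt.Errorf("fetching block %s: %w", r.BlockNumber, r.Error)
		}
		valid = append(valid, common.BlockData{
			Block:        r.Data.Block,
			Transactions: r.Data.Transactions,
			Logs:         r.Data.Logs,
			Traces:       r.Data.Traces,
		})
	}
	return valid, nil
}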
internal/storage/connector.go (1)

305-379: Update both connector signatures and callers to use a value IOrchestratorStorage instead of a pointer

  • Change NewMainConnector signature in internal/storage/connector.go (line 305) to
    func NewMainConnector(cfg *config.StorageMainConfig, orchestratorStorage IOrchestratorStorage) (IMainStorage, error)
  • Change NewKafkaConnector signature in internal/storage/kafka.go (line 19) to
    func NewKafkaConnector(cfg *config.KafkaConfig, orchestratorStorage IOrchestratorStorage) (*KafkaConnector, error)
  • Update all callers of NewMainConnector (internal/storage/connector.go:178, internal/handlers/logs_handlers.go:228, cmd/migrate_valid.go:320) to drop the & when passing an IOrchestratorStorage value
♻️ Duplicate comments (1)
internal/tools/clickhouse/0012_clickhouse_create_address_transfers.sql (1)

26-45: Add projection keyed by (token_type, chain_id, address, token_id)

Per prior preference, optimize for queries at (token_type, chain_id, address, token_id).

From learnings (#240).

     PROJECTION address_state_projection (
@@
     ),
+    PROJECTION address_token_state_projection (
+        SELECT
+            token_type,
+            chain_id,
+            address,
+            token_id,
+            token_address,
+            count() AS transfer_count,
+            sum(toDecimal256(amount, 0)) AS total_amount,
+            min(block_number) AS min_block_number,
+            min(block_timestamp) AS min_block_timestamp,
+            max(block_number) AS max_block_number,
+            max(block_timestamp) AS max_block_timestamp
+        GROUP BY
+            token_type,
+            chain_id,
+            address,
+            token_id,
+            token_address
+    )
🧹 Nitpick comments (54)
internal/tools/clickhouse/0004_clickhouse_create_insert_null_table.sql (1)

97-99: Consider DateTime64 for higher precision on ingestion timestamps.

If you dedupe/order on insert_timestamp, millisecond precision helps during bursts.

-    insert_timestamp DateTime DEFAULT now(),
+    insert_timestamp DateTime64(3) DEFAULT now(),
internal/tools/clickhouse/0000_clickhouse_create_blocks_table.sql (1)

31-44: Add ORDER BY inside the aggregate projection for better pruning.

Explicit ORDER BY on group keys can improve projection read efficiency.

     PROJECTION chain_state_projection
     (
         SELECT
           chain_id,
           count() AS count,
           uniqExact(block_number) AS unique_block_count,
           min(block_number) AS min_block_number,
           min(block_timestamp) AS min_block_timestamp,
           max(block_number) AS max_block_number,
           max(block_timestamp) AS max_block_timestamp
         GROUP BY
           chain_id
+        ORDER BY
+          chain_id
     )
internal/tools/clickhouse/0001_clickhouse_create_transactions_table.sql (1)

9-11: to_address may be null for contract creations.

If upstream normalizes to zero-address it’s fine; otherwise consider Nullable(FixedString(42)) for fidelity with EVM semantics.

internal/tools/clickhouse/0011_clickhouse_create_address_transactions_mv.sql (1)

39-44: Propagating insert_timestamp/is_deleted is good; consider DateTime64 upstream.

If you adopt DateTime64 in sources, mirror it here automatically.

internal/tools/clickhouse/0002_clickhouse_create_logs_table.sql (1)

28-39: Projections: include minimal identifiers to avoid extra lookups when possible

Current projections only select _part_offset. If frequent queries need address/topic_0 plus block pointers without fetching base columns, add them here to reduce reads.

Example:

-        SELECT
-            _part_offset
+        SELECT
+            _part_offset, address, topic_0

Validate against your workload before adopting.

internal/tools/clickhouse/0010_clickhouse_create_address_transactions.sql (1)

45-59: Add per-direction address projection (address + address_type)

You have totals by address; many queries also slice by direction.

     PROJECTION address_total_count_projection
     (
         SELECT
           chain_id,
           address,
           count() AS tx_count,
           uniqExact(hash) AS unique_tx_count,
           min(block_number) AS min_block_number,
           min(block_timestamp) AS min_block_timestamp,
           max(block_number) AS max_block_number,
           max(block_timestamp) AS max_block_timestamp
         GROUP BY
           chain_id,
           address
     )
+    ,
+    PROJECTION address_direction_count_projection
+    (
+        SELECT
+          chain_id,
+          address,
+          address_type,
+          count() AS tx_count,
+          uniqExact(hash) AS unique_tx_count,
+          min(block_number) AS min_block_number,
+          min(block_timestamp) AS min_block_timestamp,
+          max(block_number) AS max_block_number,
+          max(block_timestamp) AS max_block_timestamp
+        GROUP BY
+          chain_id,
+          address,
+          address_type
+    )
internal/tools/clickhouse/0006_clickhouse_create_token_transfers.sql (1)

5-8: Normalize addresses to lower case at write-time for consistency.

Downstream queries/indexes benefit from canonical casing.

The column definitions can stay as FixedString(42); in the 0007 MVs, wrap the selected address expressions (token_address, from_address, to_address) with lower() so stored values are canonical.

Also applies to: 11-13, 20-23

internal/tools/clickhouse/0008_clickhouse_create_token_balances.sql (2)

27-40: Consider adding final aggregate projections with MaterializedAggregateFunctions for fast reads.

You’re storing State columns; ensure there are complementary projections with ...Merge() for ready-to-read aggregates if queries expect finalized values.

Also applies to: 42-55


14-14: Use same timestamp codec as transfers for compression/scan efficiency.

Keep codecs consistent across sinks/sources.

-`block_timestamp` DateTime,
+`block_timestamp` DateTime CODEC(Delta(4), ZSTD(1)),
internal/tools/clickhouse/0007_clickhouse_create_token_transfers_mv.sql (2)

43-46: Explicitly cast ERC721 amount to UInt256.

Avoid implicit widening.

-  toUInt8(1) AS amount,
+  toUInt256(1) AS amount,

158-158: Use '=' instead of '==' for SQL equality.

For portability and consistency.

-  AND length(data) == 2 + 128;
+  AND length(data) = 2 + 128;
internal/tools/clickhouse/0009_clickhouse_create_token_balances_mv.sql (1)

53-61: Cast ERC721 deltas to Int256 to match sink type.

Prevents implicit casts and keeps types uniform.

-  -1 AS balance_delta,
+  -toInt256(1) AS balance_delta,
...
-  1 AS balance_delta,
+  toInt256(1) AS balance_delta,

Also applies to: 66-71

internal/tools/clickhouse_opts/0000_clickhouse_backfill_logs_transfer.sql (2)

145-168: Normalize equality operator and casing; keep rules consistent with realtime MVs.

  • Use = instead of ==.
  • Consider lower-casing across all token types or none (6909 currently differs).
-  AND length(data) == 2 + 128;
+  AND length(data) = 2 + 128;

Also either add lower() for ERC20/721/1155 addresses or remove it here for consistency.


171-201: Dangerous commented instructions: wrong table names.

token_balance/token_balance_* don’t exist in this PR; could mislead ops.

--- DROP TABLE token_transfers, token_balance;
+-- DROP TABLE token_transfers, token_balances;
--- DROP TABLE bf__token_transfers_erc20_mv, bf__token_transfers_erc721_mv, bf__token_transfers_erc1155_mv, bf__token_transfers_erc6909_mv;
+-- DROP TABLE bf__token_transfers_erc20_mv, bf__token_transfers_erc721_mv, bf__token_transfers_erc1155_single_mv, bf__token_transfers_erc1155_batch_mv, bf__token_transfers_erc6909_mv;
--- DROP TABLE token_transfers_erc20_mv, token_transfers_erc721_mv, token_transfers_erc1155_mv, token_transfers_erc6909_mv;
+-- DROP TABLE token_transfers_erc20_mv, token_transfers_erc721_mv, token_transfers_erc1155_single_mv, token_transfers_erc1155_batch_mv, token_transfers_erc6909_mv;
--- DROP TABLE token_balance_erc20_mv, token_balance_erc721_mv, token_balance_erc1155_mv, token_balance_erc6909_mv;
+-- DROP TABLE token_balances_erc20_mv, token_balances_erc721_mv, token_balances_erc1155_mv, token_balances_erc6909_mv;
test/mocks/MockIRPCClient.go (1)

1-1: Pin mock generator to avoid churn across environments.
Recommend adding a pinned mockery invocation (go:generate or Makefile) to keep versions consistent in CI/dev.
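
For example, a pinned go:generate directive keeps mock generation reproducible across machines; the version and flag set below are illustrative and should be adjusted to whatever the repo standardizes on:

//go:generate go run github.com/vektra/mockery/v2@v2.53.0 --name=IRPCClient --dir=internal/rpc --output=test/mocks --outpkg=mocks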

internal/handlers/logs_handlers.go (1)

206-213: Minor readability: avoid shadowing logger with variable named 'log'.
Renaming loop var prevents confusion with zerolog’s log.

-    for _, log := range logs {
-      decodedLog := log.Decode(eventABI)
+    for _, lg := range logs {
+      decodedLog := lg.Decode(eventABI)
       if decodedLog.Decoded.Name == "" || decodedLog.Decoded.Signature == "" {
         decodingCompletelySuccessful = false
       }
       decodedLogs = append(decodedLogs, decodedLog)
     }
configs/config.go (1)

328-333: Reduce duplicate logging of chain config.
Interface()+Msgf duplicates the payload; prefer structured + Msg.

-    log.Debug().
-      Interface("chainConfig", clickhouse.ChainBasedConfig).
-      Msgf("Loaded chain config %v", clickhouse.ChainBasedConfig)
+    log.Debug().
+      Interface("chainConfig", clickhouse.ChainBasedConfig).
+      Msg("Loaded chain config")
cmd/orchestrator.go (2)

36-43: Treat http.ErrServerClosed as benign for metrics server.
Avoid noisy error logs during graceful shutdown.

+   // Start Prometheus metrics server
    log.Info().Msg("Starting Metrics Server on port 2112")
    go func() {
        http.Handle("/metrics", promhttp.Handler())
-       if err := http.ListenAndServe(":2112", nil); err != nil {
-           log.Error().Err(err).Msg("Metrics server error")
-       }
+       if err := http.ListenAndServe(":2112", nil); err != nil && err != http.ErrServerClosed {
+           log.Error().Err(err).Msg("Metrics server error")
+       }
    }()

26-34: Avoid package-name shadowing for rpc and orchestrator vars.
Reduces confusion and accidental misuse.

-   rpc, err := rpc.Initialize()
+   rpcClient, err := rpc.Initialize()
    if err != nil {
        log.Fatal().Err(err).Msg("Failed to initialize RPC")
    }

-   orchestrator, err := orchestrator.NewOrchestrator(rpc)
+   orch, err := orchestrator.NewOrchestrator(rpcClient)
    if err != nil {
        log.Fatal().Err(err).Msg("Failed to create orchestrator")
    }
...
-   orchestrator.Start()
+   orch.Start()
internal/publisher/publisher.go (1)

74-79: Warn when using SASL/PLAIN without TLS

SASL PLAIN over plaintext exposes credentials. Emit a warning when TLS is disabled.

  if config.Cfg.Publisher.Username != "" && config.Cfg.Publisher.Password != "" {
     opts = append(opts, kgo.SASL(plain.Auth{
       User: config.Cfg.Publisher.Username,
       Pass: config.Cfg.Publisher.Password,
     }.AsMechanism()))
+    if !config.Cfg.Publisher.EnableTLS {
+      log.Warn().Msg("Kafka SASL/PLAIN configured without TLS; credentials may be exposed on the wire")
+    }
   }
internal/orchestrator/failure_recoverer.go (1)

125-133: Skip no-op writes when there’s nothing to store/delete

Avoid unnecessary DB calls when slices are empty.

-  if err := fr.storage.StagingStorage.StoreBlockFailures(newBlockFailures); err != nil {
+  if len(newBlockFailures) > 0 {
+    if err := fr.storage.StagingStorage.StoreBlockFailures(newBlockFailures); err != nil {
       log.Error().Err(err).Msg("Error storing block failures")
       return
-  }
-  if err := fr.storage.StagingStorage.DeleteBlockFailures(failuresToDelete); err != nil {
+    }
+  }
+  if len(failuresToDelete) > 0 {
+    if err := fr.storage.StagingStorage.DeleteBlockFailures(failuresToDelete); err != nil {
       log.Error().Err(err).Msg("Error deleting block failures")
       return
-  }
+    }
+  }
internal/source/source.go (1)

10-14: Go naming: drop the “I” prefix; consider io.Closer

Idiomatic Go prefers “Source” over “ISource”. Also consider embedding io.Closer or returning error from Close() if teardown can fail.

-type ISource interface {
+type Source interface {
   GetFullBlocks(ctx context.Context, blockNumbers []*big.Int) []rpc.GetFullBlockResult
   GetSupportedBlockRange(ctx context.Context) (minBlockNumber *big.Int, maxBlockNumber *big.Int, err error)
-  Close()
+  Close() error // or embed io.Closer
 }
internal/orchestrator/committer_test.go (2)

350-351: Prefer assert.Eventually over manual select+timeout

Reduces flakiness and boilerplate.

-  select {
-  case <-deleteDone:
-  case <-time.After(2 * time.Second):
-    t.Fatal("DeleteStagingDataOlderThan was not called within timeout period")
-  }
+  assert.Eventually(t, func() bool {
+    select {
+    case <-deleteDone:
+      return true
+    default:
+      return false
+    }
+  }, 2*time.Second, 10*time.Millisecond)

429-429: Nit: comment clarifies expectations

The note about GetChainID being unused here helps future readers of the mock expectations.

internal/common/block.go (1)

103-120: Normalize nil slices: good; ensure callers use it where JSON is persisted.

Consider calling BlockData.Serialize() before persisting to staging/S3 to avoid null arrays leaking into stored JSON.

cmd/root.go (3)

69-70: Doc/default mismatch for poller-s3-maxCacheSize.

Help text says “default 5GB” but default is 0. Either update the text or set 5GB as default.

-rootCmd.PersistentFlags().Int64("poller-s3-maxCacheSize", 0, "Max cache size in bytes for poller archive source (default 5GB)")
+rootCmd.PersistentFlags().Int64("poller-s3-maxCacheSize", int64(5)*1024*1024*1024, "Max cache size in bytes for poller archive source (default 5GB)")

152-152: Nit: typo in Redis password flag description.

“orchestator” ➜ “orchestrator”; remove double space.

-rootCmd.PersistentFlags().String("storage-orchestrator-redis-password", "", "Redis password for orchestator storage  metadata")
+rootCmd.PersistentFlags().String("storage-orchestrator-redis-password", "", "Redis password for orchestrator storage metadata")

146-149: Standardize TLS flags to kebab-case -enable-tls.
Rename the two camelCase flags and their BindPFlag lookups in cmd/root.go:

  • storage-orchestrator-redis-enableTLS → storage-orchestrator-redis-enable-tls
  • migrator-destination-kafka-enableTLS → migrator-destination-kafka-enable-tls

Ensure you update the corresponding rootCmd.PersistentFlags().Bool(...) calls and the .Lookup(...) arguments in viper.BindPFlag.

internal/storage/postgres.go (1)

463-473: Confirm boundary semantics for DeleteStagingDataOlderThan.

Name suggests strictly “older than”, but query uses “<=” which also deletes the boundary block. If unintended, switch to “<”.

- AND block_number <= $2
+ AND block_number < $2
internal/storage/block_buffer_badger_test.go (1)

53-55: Compare big.Int values with Cmp to avoid pointer/reflect pitfalls.

Safer and idiomatic for numeric equality.

- assert.Equal(t, big.NewInt(101), maxBlock)
+ assert.Zero(t, maxBlock.Cmp(big.NewInt(101)))
@@
- assert.Equal(t, big.NewInt(99), chainStats.MinBlock)
- assert.Equal(t, big.NewInt(101), chainStats.MaxBlock)
+ assert.Zero(t, chainStats.MinBlock.Cmp(big.NewInt(99)))
+ assert.Zero(t, chainStats.MaxBlock.Cmp(big.NewInt(101)))
@@
- assert.Equal(t, big.NewInt(200), maxBlock)
+ assert.Zero(t, maxBlock.Cmp(big.NewInt(200)))

Also applies to: 66-68, 87-89

internal/orchestrator/orchestrator.go (1)

44-55: Always defer cancel to avoid context leak

Add a defer cancel() right after creation to guarantee resources are released even if nothing triggers o.cancel().

 func (o *Orchestrator) Start() {
   ctx, cancel := context.WithCancel(context.Background())
   o.cancel = cancel
+  defer cancel()
internal/orchestrator/poller.go (1)

198-206: Nil workModeChan would deadlock this select

If WithPollerWorkModeChan isn’t provided, case workMode := <-p.workModeChan blocks forever. Either ensure it’s always set (document) or guard with if p.workModeChan != nil. In this PR, orchestrator sets it, but add a comment or guard.
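
A hedged sketch of the guard, relying on the fact that a receive from a nil channel never fires inside a select (the WorkMode type, ticker, and field names here are assumptions):

	var workModeCh <-chan WorkMode
	if p.workModeChan != nil {
		workModeCh = p.workModeChan // only receive when the option was provided
	}
	select {
	case <-ctx.Done():
		return
	case workMode, ok := <-workModeCh: // nil channel: this case is simply never selected
		if ok {
			p.currentWorkMode = workMode
		}
	case <-ticker.C:
		p.poll(ctx)
	}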

internal/rpc/batcher.go (1)

54-86: Ordering across chunks is nondeterministic

Results are appended as chunks complete; if callers rely on order matching input keys, process chunks sequentially or re-order at the end. Given worker sorts results later, this is likely fine. Document this behavior.
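
If input order ever matters to callers, a hedged alternative is to place each chunk's results by offset into a pre-sized slice instead of appending on completion (Result and fetchChunk are placeholders, not existing identifiers):

	results := make([]Result, len(keys))
	var wg sync.WaitGroup
	for offset := 0; offset < len(keys); offset += chunkSize {
		end := min(offset+chunkSize, len(keys)) // min is built in since Go 1.21
		wg.Add(1)
		go func(offset, end int) {
			defer wg.Done()
			chunkResults := fetchChunk(ctx, keys[offset:end]) // e.g. the WithRetry batch call
			copy(results[offset:end], chunkResults)           // position maps back to keys, not completion order
		}(offset, end)
	}
	wg.Wait()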

internal/storage/clickhouse.go (1)

115-123: Minor: Async insert settings

If async insert is enabled, consider making wait_for_async_insert configurable (some deployments prefer fire-and-forget).
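
A hedged sketch of wiring wait_for_async_insert through clickhouse-go v2 connection settings; the cfg fields are assumptions:

	settings := clickhouse.Settings{
		"async_insert": 1,
	}
	if cfg.AsyncInsertNoWait { // hypothetical config flag
		settings["wait_for_async_insert"] = 0 // fire-and-forget
	} else {
		settings["wait_for_async_insert"] = 1 // block until the async buffer is flushed
	}
	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr:     []string{cfg.Host},
		Settings: settings,
	})
	if err != nil {
		return nil, err
	}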

internal/worker/worker.go (2)

337-346: Also close the RPC client

Free network resources when closing the worker (useful in tests or alternate lifecycles).

 func (w *Worker) Close() error {
   // Close archive if it exists
   if w.archive != nil {
     log.Debug().Msg("Closing archive connection")
     w.archive.Close()
   }

+  if w.rpc != nil {
+    w.rpc.Close()
+  }
   log.Debug().Msg("Worker closed successfully")
   return nil
 }

255-278: Archive decision is conservative (ALL blocks in range)

LGTM. Consider partial-archive + partial-RPC in future for large spans; not required now.

internal/storage/redis.go (1)

57-75: Prefer bounded contexts for Redis ops

All calls use context.Background(); add short timeouts to prevent hangs on network partitions. Consider a per-connector op timeout (e.g., 2–5s) and use context.WithTimeout in Get*/Set*/Close.

Also applies to: 77-81, 83-100, 107-124, 125-129, 131-134
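
A minimal sketch of the bounded-context pattern with go-redis v9; the key format, timeout value, and method shape are illustrative:

func (r *RedisConnector) withTimeout() (context.Context, context.CancelFunc) {
	return context.WithTimeout(context.Background(), 3*time.Second) // per-operation budget
}

func (r *RedisConnector) GetLastPublishedBlockNumber(chainID *big.Int) (*big.Int, error) {
	ctx, cancel := r.withTimeout()
	defer cancel()
	val, err := r.client.Get(ctx, fmt.Sprintf("publish_cursor:%s", chainID)).Result()
	if err == redis.Nil {
		return nil, nil // no cursor stored yet
	}
	if err != nil {
		return nil, err
	}
	n, ok := new(big.Int).SetString(val, 10)
	if !ok {
		return nil, fmt.Errorf("invalid cursor value: %q", val)
	}
	return n, nil
}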

internal/orchestrator/committer.go (1)

529-537: Message typo: “commit” vs “publish”

Wrong error string in getSequentialBlockDataToPublish.

Apply:

-		return nil, fmt.Errorf("error determining blocks to commit: %v", err)
+		return nil, fmt.Errorf("error determining blocks to publish: %v", err)
internal/storage/kafka_publisher.go (1)

292-299: Single-partition publishing limits throughput

Manual partitioner with Partition: 0 forces all traffic to one partition. If ordering requirements allow, prefer key-based partitioning to scale.

Option: remove ManualPartitioner and Partition=0; rely on key to preserve per-chain ordering.

-		kgo.RecordPartitioner(kgo.ManualPartitioner()),
+		// omit RecordPartitioner: franz-go's default partitioner hashes keyed
+		// records consistently, preserving per-chain ordering while spreading load
-		Partition: 0,
internal/storage/block_buffer_badger.go (4)

289-313: Full-prefix scan in GetBlocksInRange; seek to start to reduce IO

After padding keys, you can seek directly to start and break once past end.

Apply:

-		for it.Rewind(); it.Valid(); it.Next() {
+		startKey := b.makeKey(chainId, startBlock)
+		for it.Seek(startKey); it.ValidForPrefix(prefix); it.Next() {
 			item := it.Item()
 			err := item.Value(func(val []byte) error {
 				var blockData common.BlockData
 				if err := gob.NewDecoder(bytes.NewReader(val)).Decode(&blockData); err != nil {
 					return err
 				}
 
 				blockNum := blockData.Block.Number
-				if blockNum.Cmp(startBlock) >= 0 && blockNum.Cmp(endBlock) <= 0 {
+				if blockNum.Cmp(endBlock) > 0 {
+					return nil
+				}
+				if blockNum.Cmp(startBlock) >= 0 {
 					result = append(result, blockData)
 				}
 				return nil
 			})

232-235: Report total on-disk size (LSM + vlog)

badger.DB.Size() returns (lsm, vlog); you currently ignore vlog.

Apply:

-	lsm, _ := b.db.Size()
-	return lsm, b.blockCount
+	lsm, vlog := b.db.Size()
+	return lsm + vlog, b.blockCount

107-121: Potential overcount on duplicate keys

Add increments blockCount and per-chain counts without checking if the key already exists; repeated Add of same block skews stats/flush thresholds.

Consider WriteBatch with Insert-only semantics or a pre-check to avoid double-counting, if duplicates are expected in practice.

Also applies to: 128-157


51-70: Comment/value mismatch

ValueThreshold comment says “> 512 bytes” but code sets 1024. Align comment or value.

internal/source/s3.go (4)

99-107: Confusing comment vs. value for MaxCacheSize

Comment says “Increased from 5GB to 10GB” but value is 5GB.

Apply this diff to make intent explicit:

-	if cfg.MaxCacheSize == 0 {
-		cfg.MaxCacheSize = 5 * 1024 * 1024 * 1024 // Increased from 5GB to 10GB
-	}
+	if cfg.MaxCacheSize == 0 {
+		cfg.MaxCacheSize = 10 * 1024 * 1024 * 1024 // 10GB default
+	}

901-906: Reduce collision risk in cache filenames

Only 16 hex chars (~64 bits) increases collision risk. Use full hash.

Apply this diff:

-	hash := sha256.Sum256([]byte(fileKey))
-	filename := hex.EncodeToString(hash[:])[:16] + ".parquet"
+	hash := sha256.Sum256([]byte(fileKey))
+	filename := hex.EncodeToString(hash[:]) + ".parquet"
 	return filepath.Join(s.cacheDir, filename)

575-660: Block index is built but never used

buildBlockIndex fills s.blockIndex but read path ignores it. Delete it or wire it into selective reads; otherwise it’s wasted work.

Do you want me to remove it or integrate it with readBlocksFromLocalFile?


158-170: Use a context with timeout for AWS config load

LoadDefaultConfig uses context.Background(). Prefer a bounded context.

Apply this diff:

-	awsCfg, err := awsconfig.LoadDefaultConfig(context.Background(),
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	awsCfg, err := awsconfig.LoadDefaultConfig(ctx,
 		awsconfig.WithRegion(cfg.Region),
 	)
internal/storage/kafka.go (1)

134-137: Fix doc comment

This closes Kafka publisher, not Redis.

Apply this diff:

-// Close closes the Redis connection
+// Close closes the Kafka publisher
cmd/migrate_valid.go (1)

151-161: Remainder distribution can be simpler

Minor: avoid big.Int allocations in loop.

Apply this diff:

-	remainder := new(big.Int).Mod(totalBlocks, big.NewInt(int64(numWorkers)))
+	remainder := int(new(big.Int).Mod(totalBlocks, big.NewInt(int64(numWorkers))).Int64())
@@
-		if big.NewInt(int64(i)).Cmp(remainder) < 0 {
+		if i < remainder {
 			workerBlockCount.Add(workerBlockCount, big.NewInt(1))
 		}
internal/storage/badger.go (2)

126-134: Iterator perf: disable value prefetch when scanning keys

For scans where most entries are filtered/skipped, set PrefetchValues = false to reduce I/O.

Apply this diff:

-		opts := badger.DefaultIteratorOptions
+		opts := badger.DefaultIteratorOptions
+		opts.PrefetchValues = false

494-526: Avoid building large deletion lists; delete inline

To reduce memory spikes on large datasets, delete as you iterate (using KeyCopy(nil)).

Apply this diff:

-		var keysToDelete [][]byte
 		for it.Rewind(); it.Valid(); it.Next() {
 			key := string(it.Item().Key())
 			parts := strings.Split(key, ":")
@@
-			if blockNum.Cmp(blockNumber) <= 0 {
-				keysToDelete = append(keysToDelete, it.Item().KeyCopy(nil))
-			}
+			if blockNum.Cmp(blockNumber) <= 0 {
+				if err := txn.Delete(it.Item().KeyCopy(nil)); err != nil {
+					return err
+				}
+			}
 		}
-
-		for _, key := range keysToDelete {
-			if err := txn.Delete(key); err != nil {
-				return err
-			}
-		}
internal/storage/connector.go (1)

63-67: TODO in exported type

Minor: remove or rephrase the TODO in QueryResult before merging.

internal/storage/block_buffer.go (1)

63-90: Consider using a more efficient size estimation approach

Encoding the entire batch with gob on every Add call could become a performance bottleneck for large batches. Consider estimating size based on known field sizes or using a running average.

Consider implementing a size estimation based on field analysis or maintaining a running average of bytes-per-block ratio from successful encodings. This would avoid the overhead of gob encoding on every Add call while still providing reasonable accuracy for buffer management.
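
A hedged sketch of the running-average idea; the estimator is standalone and the names are illustrative:

type sizeEstimator struct {
	totalBytes  int64
	totalBlocks int64
}

// observe records the measured encoded size of a batch that was actually serialized.
func (e *sizeEstimator) observe(encodedBytes int64, blocks int) {
	e.totalBytes += encodedBytes
	e.totalBlocks += int64(blocks)
}

// estimate approximates the size of n blocks without re-encoding the buffer.
func (e *sizeEstimator) estimate(n int) int64 {
	if e.totalBlocks == 0 {
		return int64(n) * 4096 // cold-start guess per block
	}
	return int64(n) * (e.totalBytes / e.totalBlocks)
}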

internal/storage/s3.go (1)

71-78: Log a warning when explicit AWS credentials are used
Add a warning under the explicit-credentials branch in internal/storage/s3.go to discourage static credentials in favor of IAM roles:

--- a/internal/storage/s3.go
+++ b/internal/storage/s3.go
@@ -71,6 +71,7 @@
    if cfg.AccessKeyID != "" && cfg.SecretAccessKey != "" {
+       log.Warn().Msg("Using explicit AWS credentials from config; consider using IAM roles for enhanced security.")
        awsCfg.Credentials = aws.CredentialsProviderFunc(func(ctx context.Context) (aws.Credentials, error) {
            return aws.Credentials{
                AccessKeyID:     cfg.AccessKeyID,
                SecretAccessKey: cfg.SecretAccessKey,
            }, nil
        })
    }

No occurrences of logging these credentials were found in the codebase.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between 3a03423 and e551c17.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (54)
  • cmd/migrate_valid.go (5 hunks)
  • cmd/orchestrator.go (1 hunks)
  • cmd/root.go (15 hunks)
  • configs/config.go (8 hunks)
  • go.mod (6 hunks)
  • internal/common/block.go (2 hunks)
  • internal/handlers/logs_handlers.go (1 hunks)
  • internal/orchestrator/committer.go (14 hunks)
  • internal/orchestrator/committer_test.go (5 hunks)
  • internal/orchestrator/failure_recoverer.go (2 hunks)
  • internal/orchestrator/orchestrator.go (2 hunks)
  • internal/orchestrator/poller.go (7 hunks)
  • internal/orchestrator/validator.go (2 hunks)
  • internal/publisher/publisher.go (1 hunks)
  • internal/rpc/batcher.go (2 hunks)
  • internal/rpc/rpc.go (2 hunks)
  • internal/source/s3.go (1 hunks)
  • internal/source/source.go (1 hunks)
  • internal/storage/badger.go (1 hunks)
  • internal/storage/block_buffer.go (1 hunks)
  • internal/storage/block_buffer_badger.go (1 hunks)
  • internal/storage/block_buffer_badger_test.go (1 hunks)
  • internal/storage/clickhouse.go (5 hunks)
  • internal/storage/connector.go (3 hunks)
  • internal/storage/kafka.go (1 hunks)
  • internal/storage/kafka_publisher.go (1 hunks)
  • internal/storage/postgres.go (2 hunks)
  • internal/storage/redis.go (1 hunks)
  • internal/storage/s3.go (1 hunks)
  • internal/tools/clickhouse/0000_clickhouse_create_blocks_table.sql (1 hunks)
  • internal/tools/clickhouse/0001_clickhouse_create_block_failures_table.sql (0 hunks)
  • internal/tools/clickhouse/0001_clickhouse_create_transactions_table.sql (1 hunks)
  • internal/tools/clickhouse/0002_clickhouse_create_cursors_table.sql (0 hunks)
  • internal/tools/clickhouse/0002_clickhouse_create_logs_table.sql (1 hunks)
  • internal/tools/clickhouse/0003_clickhouse_create_staging_table.sql (0 hunks)
  • internal/tools/clickhouse/0003_clickhouse_create_traces_table.sql (1 hunks)
  • internal/tools/clickhouse/0004_clickhouse_create_insert_null_table.sql (2 hunks)
  • internal/tools/clickhouse/0005_clickhouse_create_insert_data_mv.sql (5 hunks)
  • internal/tools/clickhouse/0006_clickhouse_create_logs_table.sql (0 hunks)
  • internal/tools/clickhouse/0006_clickhouse_create_token_transfers.sql (1 hunks)
  • internal/tools/clickhouse/0007_clickhouse_create_token_transfers_mv.sql (1 hunks)
  • internal/tools/clickhouse/0008_clickhouse_create_token_balances.sql (1 hunks)
  • internal/tools/clickhouse/0009_clickhouse_create_token_balances_mv.sql (1 hunks)
  • internal/tools/clickhouse/0010_clickhouse_create_address_transactions.sql (2 hunks)
  • internal/tools/clickhouse/0010_clickhouse_create_token_transfers_mv.sql (0 hunks)
  • internal/tools/clickhouse/0011_clickhouse_create_address_transactions_mv.sql (1 hunks)
  • internal/tools/clickhouse/0012_clickhouse_create_address_transfers.sql (1 hunks)
  • internal/tools/clickhouse/0013_clickhouse_create_address_transfers_mv.sql (1 hunks)
  • internal/tools/clickhouse_opts/0000_clickhouse_backfill_logs_transfer.sql (1 hunks)
  • internal/worker/worker.go (2 hunks)
  • test/mocks/MockIMainStorage.go (3 hunks)
  • test/mocks/MockIOrchestratorStorage.go (4 hunks)
  • test/mocks/MockIRPCClient.go (1 hunks)
  • test/mocks/MockIStagingStorage.go (4 hunks)
💤 Files with no reviewable changes (5)
  • internal/tools/clickhouse/0001_clickhouse_create_block_failures_table.sql
  • internal/tools/clickhouse/0003_clickhouse_create_staging_table.sql
  • internal/tools/clickhouse/0006_clickhouse_create_logs_table.sql
  • internal/tools/clickhouse/0010_clickhouse_create_token_transfers_mv.sql
  • internal/tools/clickhouse/0002_clickhouse_create_cursors_table.sql
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-22T18:30:59.800Z
Learnt from: iuwqyir
PR: thirdweb-dev/insight#240
File: internal/tools/clickhouse_create_token_balances_mv.sql:8-17
Timestamp: 2025-07-22T18:30:59.800Z
Learning: In the token_balances table projection design, the user prefers to have the address_projection optimized for queries by (token_type, chain_id, address, token_id) without including owner, as the main table is already optimized for owner-based query paths.

Applied to files:

  • internal/tools/clickhouse/0012_clickhouse_create_address_transfers.sql
  • internal/tools/clickhouse/0008_clickhouse_create_token_balances.sql
  • internal/tools/clickhouse/0006_clickhouse_create_token_transfers.sql
  • internal/tools/clickhouse/0010_clickhouse_create_address_transactions.sql
  • internal/tools/clickhouse/0009_clickhouse_create_token_balances_mv.sql
🧬 Code graph analysis (20)
internal/source/source.go (1)
internal/rpc/rpc.go (1)
  • GetFullBlockResult (18-22)
internal/rpc/batcher.go (2)
internal/rpc/rpc.go (1)
  • Client (57-66)
internal/common/utils.go (1)
  • SliceToChunks (7-20)
internal/storage/block_buffer_badger_test.go (3)
internal/storage/block_buffer_badger.go (1)
  • NewBadgerBlockBuffer (41-96)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)
internal/storage/block_buffer.go (1)
  • ChainStats (257-261)
internal/orchestrator/failure_recoverer.go (1)
internal/storage/connector.go (1)
  • QueryFilter (11-29)
internal/common/block.go (3)
internal/common/transaction.go (1)
  • Transaction (16-51)
internal/common/log.go (1)
  • Log (15-31)
internal/common/trace.go (1)
  • Trace (8-32)
internal/storage/block_buffer.go (2)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)
internal/storage/block_buffer_badger.go (1)
  • NewBadgerBlockBuffer (41-96)
internal/storage/block_buffer_badger.go (2)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)
internal/storage/block_buffer.go (3)
  • BufferStats (249-254)
  • ChainStats (257-261)
  • IBlockBuffer (24-37)
internal/orchestrator/orchestrator.go (1)
internal/orchestrator/poller.go (4)
  • NewPoller (100-132)
  • WithPollerWorkModeChan (46-50)
  • WithPollerS3Source (52-66)
  • Poller (23-37)
internal/handlers/logs_handlers.go (1)
internal/storage/connector.go (1)
  • NewMainConnector (305-379)
internal/rpc/rpc.go (1)
internal/rpc/batcher.go (2)
  • RPCFetchSingleBatchWithRetry (88-153)
  • RPCFetchInBatchesWithRetry (54-86)
internal/worker/worker.go (4)
internal/rpc/rpc.go (2)
  • IRPCClient (42-55)
  • GetFullBlockResult (18-22)
internal/source/source.go (1)
  • ISource (10-14)
internal/common/utils.go (1)
  • SliceToChunks (7-20)
internal/metrics/metrics.go (1)
  • LastFetchedBlock (37-40)
internal/storage/s3.go (7)
internal/storage/block_buffer.go (2)
  • IBlockBuffer (24-37)
  • NewBlockBuffer (40-46)
internal/common/block.go (3)
  • BlockData (61-66)
  • Block (8-33)
  • BlockHeader (68-72)
internal/storage/block_buffer_badger.go (1)
  • NewBadgerBlockBuffer (41-96)
internal/common/transaction.go (1)
  • Transaction (16-51)
internal/common/log.go (1)
  • Log (15-31)
internal/common/trace.go (1)
  • Trace (8-32)
internal/storage/connector.go (2)
  • QueryFilter (11-29)
  • QueryResult (63-67)
internal/orchestrator/poller.go (4)
internal/worker/worker.go (3)
  • Worker (35-39)
  • NewWorkerWithArchive (49-55)
  • NewWorker (41-46)
internal/source/s3.go (1)
  • NewS3Source (91-173)
internal/rpc/rpc.go (1)
  • IRPCClient (42-55)
internal/storage/connector.go (1)
  • IStorage (69-73)
internal/storage/kafka_publisher.go (1)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)
internal/source/s3.go (6)
internal/rpc/rpc.go (2)
  • Client (57-66)
  • GetFullBlockResult (18-22)
internal/common/block.go (2)
  • Block (8-33)
  • BlockData (61-66)
internal/common/trace.go (1)
  • Trace (8-32)
internal/storage/s3.go (1)
  • ParquetBlockData (51-60)
internal/common/transaction.go (1)
  • Transaction (16-51)
internal/common/log.go (1)
  • Log (15-31)
internal/orchestrator/committer.go (2)
internal/rpc/rpc.go (1)
  • IRPCClient (42-55)
internal/orchestrator/poller.go (2)
  • Poller (23-37)
  • NewBoundlessPoller (68-96)
internal/storage/badger.go (3)
internal/storage/connector.go (1)
  • QueryFilter (11-29)
internal/common/block_failures.go (1)
  • BlockFailure (8-14)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)
internal/storage/connector.go (6)
internal/storage/redis.go (1)
  • NewRedisConnector (28-55)
internal/storage/postgres.go (1)
  • NewPostgresConnector (22-58)
internal/storage/clickhouse.go (1)
  • NewClickHouseConnector (66-79)
internal/storage/badger.go (1)
  • NewBadgerConnector (27-65)
internal/storage/kafka.go (1)
  • NewKafkaConnector (19-35)
internal/storage/s3.go (1)
  • NewS3Connector (62-141)
test/mocks/MockIStagingStorage.go (2)
internal/common/block_failures.go (1)
  • BlockFailure (8-14)
internal/storage/connector.go (1)
  • QueryFilter (11-29)
internal/storage/kafka.go (3)
internal/storage/kafka_publisher.go (2)
  • KafkaPublisher (20-23)
  • NewKafkaPublisher (60-111)
internal/storage/connector.go (5)
  • IOrchestratorStorage (100-109)
  • BalancesQueryFilter (48-61)
  • QueryResult (63-67)
  • TransfersQueryFilter (31-46)
  • QueryFilter (11-29)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)

Comment on lines +65 to +71
rootCmd.PersistentFlags().String("poller-s3-format", "parquet", "S3 storage format for poller archive source")
rootCmd.PersistentFlags().String("poller-s3-cacheDir", "/tmp/insight-archive", "Local cache directory for poller archive source")
rootCmd.PersistentFlags().Int("poller-s3-metadataTTL", 0, "Metadata cache TTL in seconds for poller archive source")
rootCmd.PersistentFlags().Int("poller-s3-fileCacheTTL", 0, "File cache TTL in seconds for poller archive source")
rootCmd.PersistentFlags().Int64("poller-s3-maxCacheSize", 0, "Max cache size in bytes for poller archive source (default 5GB)")
rootCmd.PersistentFlags().Int("poller-s3-cleanupInterval", 0, "Cache cleanup interval in seconds for poller archive source")
rootCmd.PersistentFlags().Int("poller-s3-maxConcurrentDownloads", 3, "Max concurrent downloads for poller archive source")

⚠️ Potential issue

Missing viper binding for poller.s3.format.

Flag is defined but never bound; value won’t flow into config.

 // existing s3 poller binds...
+viper.BindPFlag("poller.s3.format", rootCmd.PersistentFlags().Lookup("poller-s3-format"))

Also applies to: 265-277

🤖 Prompt for AI Agents
In cmd/root.go around lines 65-71 (and similarly for flags at 265-277), the CLI
flags for poller S3 options are registered but not bound to viper, so config
values from flags won't flow into viper keys; call viper.BindPFlag for each flag
mapping the flag name to the appropriate viper key (e.g., bind
"poller-s3-format" to "poller.s3.format") after registering the flag, and repeat
for poller-s3-cacheDir -> poller.s3.cacheDir, poller-s3-metadataTTL ->
poller.s3.metadataTTL, poller-s3-fileCacheTTL -> poller.s3.fileCacheTTL,
poller-s3-maxCacheSize -> poller.s3.maxCacheSize, poller-s3-cleanupInterval ->
poller.s3.cleanupInterval, and poller-s3-maxConcurrentDownloads ->
poller.s3.maxConcurrentDownloads so viper picks up flag values.
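
A minimal, self-contained sketch of the binding pattern described above, assuming the standard cobra/viper APIs; only two of the seven flags are shown for brevity, and the real cmd/root.go registers many more flags than this:

package main

import (
	"fmt"

	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

func main() {
	rootCmd := &cobra.Command{Use: "insight"}
	rootCmd.PersistentFlags().String("poller-s3-format", "parquet", "S3 storage format for poller archive source")
	rootCmd.PersistentFlags().String("poller-s3-cacheDir", "/tmp/insight-archive", "Local cache directory for poller archive source")

	// Bind each flag to its dotted viper key so flag values flow into the config.
	bindings := map[string]string{
		"poller-s3-format":   "poller.s3.format",
		"poller-s3-cacheDir": "poller.s3.cacheDir",
	}
	for flag, key := range bindings {
		if err := viper.BindPFlag(key, rootCmd.PersistentFlags().Lookup(flag)); err != nil {
			panic(err)
		}
	}

	_ = rootCmd.Execute()
	fmt.Println("poller.s3.format =", viper.GetString("poller.s3.format"))
}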

Comment on lines 224 to 229
 func getMainStorage() (storage.IMainStorage, error) {
 	storageOnce.Do(func() {
 		var err error
-		mainStorage, err = storage.NewConnector[storage.IMainStorage](&config.Cfg.Storage.Main)
+		// TODO: move this to a QueryConnector later to decouple read/write connector
+		mainStorage, err = storage.NewMainConnector(&config.Cfg.Storage.Main, nil)
 		if err != nil {

🛠️ Refactor suggestion

Avoid selecting Kafka for read paths; prefer queryable backends.
NewMainConnector requires orchestrator storage for Kafka, and Kafka isn't queryable here. In auto mode this can return a 500 if Kafka is configured.

Apply:

 func getMainStorage() (storage.IMainStorage, error) {
   storageOnce.Do(func() {
     var err error
-    // TODO: move this to a QueryConnector later to decouple read/write connector
-    mainStorage, err = storage.NewMainConnector(&config.Cfg.Storage.Main, nil)
+    // TODO: move this to a QueryConnector later to decouple read/write connector
+    // Prefer queryable backends for read APIs; ignore Kafka in auto mode.
+    cfg := config.Cfg.Storage.Main
+    if cfg.Type == "" || cfg.Type == "auto" {
+      cfg.Kafka = nil
+    }
+    mainStorage, err = storage.NewMainConnector(&cfg, nil)
     if err != nil {
       storageErr = err
       log.Error().Err(err).Msg("Error creating storage connector")
     }
   })
   return mainStorage, storageErr
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func getMainStorage() (storage.IMainStorage, error) {
storageOnce.Do(func() {
var err error
mainStorage, err = storage.NewConnector[storage.IMainStorage](&config.Cfg.Storage.Main)
// TODO: move this to a QueryConnector later to decouple read/write connector
mainStorage, err = storage.NewMainConnector(&config.Cfg.Storage.Main, nil)
if err != nil {
func getMainStorage() (storage.IMainStorage, error) {
storageOnce.Do(func() {
var err error
// TODO: move this to a QueryConnector later to decouple read/write connector
// Prefer queryable backends for read APIs; ignore Kafka in auto mode.
cfg := config.Cfg.Storage.Main
if cfg.Type == "" || cfg.Type == "auto" {
cfg.Kafka = nil
}
mainStorage, err = storage.NewMainConnector(&cfg, nil)
if err != nil {
storageErr = err
log.Error().Err(err).Msg("Error creating storage connector")
}
})
return mainStorage, storageErr
}
🤖 Prompt for AI Agents
In internal/handlers/logs_handlers.go around lines 224 to 229, the code uses
storage.NewMainConnector which can select Kafka (non-queryable) for read paths
and cause 500s; replace this with the queryable connector (e.g.,
storage.NewQueryConnector or the storage API that returns a query-capable
backend) so read paths never pick Kafka, or explicitly force the connector
selection to a queryable backend in auto mode (fall back to a DB/readonly store
if Kafka is configured). Keep the same error handling and ensure the chosen
connector is suitable for queries.

Comment on lines 646 to 652
 	blocksToPoll := blocksToCommit
-	if len(blocksToCommit) > int(poller.blocksPerPoll) {
-		blocksToPoll = blocksToCommit[:int(poller.blocksPerPoll)]
+	if len(blocksToCommit) > int(c.poller.blocksPerPoll) {
+		blocksToPoll = blocksToCommit[:int(c.poller.blocksPerPoll)]
 	}
-	poller.Poll(ctx, blocksToPoll)
+	c.poller.Poll(ctx, blocksToPoll)
 	log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String())
 }

🛠️ Refactor suggestion

Don't ignore the error returned by the poller when staging data is missing

Same as above.

Apply:

-	c.poller.Poll(ctx, blocksToPoll)
+	if err := c.poller.Poll(ctx, blocksToPoll); err != nil {
+		log.Error().Err(err).Msg("Poller error while backfilling missing staging data")
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
blocksToPoll := blocksToCommit
if len(blocksToCommit) > int(poller.blocksPerPoll) {
blocksToPoll = blocksToCommit[:int(poller.blocksPerPoll)]
if len(blocksToCommit) > int(c.poller.blocksPerPoll) {
blocksToPoll = blocksToCommit[:int(c.poller.blocksPerPoll)]
}
poller.Poll(ctx, blocksToPoll)
c.poller.Poll(ctx, blocksToPoll)
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String())
}
blocksToPoll := blocksToCommit
if len(blocksToCommit) > int(c.poller.blocksPerPoll) {
blocksToPoll = blocksToCommit[:int(c.poller.blocksPerPoll)]
}
if err := c.poller.Poll(ctx, blocksToPoll); err != nil {
log.Error().Err(err).Msg("Poller error while backfilling missing staging data")
}
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s",
len(blocksToPoll),
blocksToPoll[0].String(),
blocksToPoll[len(blocksToPoll)-1].String(),
)
}

Comment on lines +136 to 144
// Waiting for all goroutines to complete
o.wg.Wait()

func (o *Orchestrator) Shutdown() {
if o.cancel != nil {
o.cancel()
if err := o.storage.Close(); err != nil {
log.Error().Err(err).Msg("Error closing storage connections")
}

log.Info().Msg("Orchestrator shutdown complete")
}

🛠️ Refactor suggestion

Stop signal delivery and close RPC/storage on shutdown

  • Call signal.Stop(sigChan) to avoid leaking the signal notifier after return.
  • Close the RPC client as IRPCClient exposes Close().
   // Waiting for all goroutines to complete
   o.wg.Wait()

   if err := o.storage.Close(); err != nil {
     log.Error().Err(err).Msg("Error closing storage connections")
   }
+  signal.Stop(sigChan)
+  if o.rpc != nil {
+    o.rpc.Close()
+  }

   log.Info().Msg("Orchestrator shutdown complete")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Waiting for all goroutines to complete
o.wg.Wait()
func (o *Orchestrator) Shutdown() {
if o.cancel != nil {
o.cancel()
if err := o.storage.Close(); err != nil {
log.Error().Err(err).Msg("Error closing storage connections")
}
log.Info().Msg("Orchestrator shutdown complete")
}
// Waiting for all goroutines to complete
o.wg.Wait()
if err := o.storage.Close(); err != nil {
log.Error().Err(err).Msg("Error closing storage connections")
}
signal.Stop(sigChan)
if o.rpc != nil {
o.rpc.Close()
}
log.Info().Msg("Orchestrator shutdown complete")
}
🤖 Prompt for AI Agents
internal/orchestrator/orchestrator.go around lines 136 to 144: after wg.Wait()
ensure you stop the OS signal notifier and close RPC before closing storage to
avoid leaks and dangling connections; call signal.Stop(sigChan) (where sigChan
is the channel used to receive shutdown signals) to unregister the notifier,
then if o.rpc != nil call o.rpc.Close() and handle/log any error, then proceed
to close storage as currently implemented and log shutdown completion.

Comment on lines +52 to +66
func WithPollerS3Source(cfg *config.S3SourceConfig) PollerOption {
return func(p *Poller) {
if cfg == nil || cfg.Region == "" || cfg.Bucket == "" {
return
}

source, err := source.NewS3Source(cfg, p.rpc.GetChainID())
if err != nil {
log.Fatal().Err(err).Msg("Failed to create S3 source")
}

log.Info().Msg("Poller S3 source configuration detected, setting up S3 source for poller")
p.worker = worker.NewWorkerWithArchive(p.rpc, source)
}
}

🛠️ Refactor suggestion

⚠️ Potential issue

Do not terminate the process from a library; avoid name shadowing

  • Replace log.Fatal() with log.Error() and gracefully fall back to RPC worker. Avoid shadowing the source import with a local source variable name.
-func WithPollerS3Source(cfg *config.S3SourceConfig) PollerOption {
+func WithPollerS3Source(cfg *config.S3SourceConfig) PollerOption {
   return func(p *Poller) {
     if cfg == nil || cfg.Region == "" || cfg.Bucket == "" {
       return
     }

-    source, err := source.NewS3Source(cfg, p.rpc.GetChainID())
+    s3src, err := source.NewS3Source(cfg, p.rpc.GetChainID())
     if err != nil {
-      log.Fatal().Err(err).Msg("Failed to create S3 source")
+      log.Error().Err(err).Msg("Failed to create S3 source; falling back to RPC")
+      return
     }

     log.Info().Msg("Poller S3 source configuration detected, setting up S3 source for poller")
-    p.worker = worker.NewWorkerWithArchive(p.rpc, source)
+    p.worker = worker.NewWorkerWithArchive(p.rpc, s3src)
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func WithPollerS3Source(cfg *config.S3SourceConfig) PollerOption {
return func(p *Poller) {
if cfg == nil || cfg.Region == "" || cfg.Bucket == "" {
return
}
source, err := source.NewS3Source(cfg, p.rpc.GetChainID())
if err != nil {
log.Fatal().Err(err).Msg("Failed to create S3 source")
}
log.Info().Msg("Poller S3 source configuration detected, setting up S3 source for poller")
p.worker = worker.NewWorkerWithArchive(p.rpc, source)
}
}
func WithPollerS3Source(cfg *config.S3SourceConfig) PollerOption {
return func(p *Poller) {
if cfg == nil || cfg.Region == "" || cfg.Bucket == "" {
return
}
s3src, err := source.NewS3Source(cfg, p.rpc.GetChainID())
if err != nil {
log.Error().Err(err).Msg("Failed to create S3 source; falling back to RPC")
return
}
log.Info().Msg("Poller S3 source configuration detected, setting up S3 source for poller")
p.worker = worker.NewWorkerWithArchive(p.rpc, s3src)
}
}
🤖 Prompt for AI Agents
In internal/orchestrator/poller.go around lines 52 to 66, the function currently
calls log.Fatal on S3 source creation and shadows the imported source package
with a local variable; change it to use log.Error (or log.Err) so the library
does not terminate the process, rename the local variable to avoid shadowing
(e.g., s3Src or s3Source), and on error simply log and return so the caller
keeps the default RPC worker; only set p.worker when the S3 source was created
successfully.

Comment on lines +21 to +24
INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
INDEX idx_address_type address_type TYPE bloom_filter GRANULARITY 3,
INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 4,
INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 4,

🛠️ Refactor suggestion

Index token_address (and optionally token_type) for common filter paths

Token-address filters are frequent; add bloom indexes to avoid full scans under (chain_id, address, …) sort.

     INDEX idx_address_type address_type TYPE bloom_filter GRANULARITY 3,
     INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 4,
     INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 4,
+    INDEX idx_token_address token_address TYPE bloom_filter GRANULARITY 3,
+    INDEX idx_token_type token_type TYPE bloom_filter GRANULARITY 4,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
INDEX idx_address_type address_type TYPE bloom_filter GRANULARITY 3,
INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 4,
INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 4,
INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
INDEX idx_address_type address_type TYPE bloom_filter GRANULARITY 3,
INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 4,
INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 4,
INDEX idx_token_address token_address TYPE bloom_filter GRANULARITY 3,
INDEX idx_token_type token_type TYPE bloom_filter GRANULARITY 4,
🤖 Prompt for AI Agents
In internal/tools/clickhouse/0012_clickhouse_create_address_transfers.sql around
lines 21 to 24, add a bloom index on token_address (and optionally token_type)
to speed up common (chain_id, address, ...) filter paths; insert lines such as
INDEX idx_token_address token_address TYPE bloom_filter GRANULARITY 3 (and if
desired INDEX idx_token_type token_type TYPE bloom_filter GRANULARITY 4)
alongside the existing indexes so token-based queries avoid full table scans.

Comment on lines +33 to +38
count() AS transfer_count,
sum(toInt256(amount)) AS total_amount,
min(block_number) AS min_block_number,
min(block_timestamp) AS min_block_timestamp,
max(block_number) AS max_block_number,
max(block_timestamp) AS max_block_timestamp

🛠️ Refactor suggestion

Avoid toInt256(amount); use Decimal256 for safe aggregation

Casting UInt256→Int256 risks overflow. Aggregate as Decimal256(precision=76, scale=0).

-            sum(toInt256(amount)) AS total_amount,
+            sum(toDecimal256(amount, 0)) AS total_amount,

Apply the same change in the second projection below.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
count() AS transfer_count,
sum(toInt256(amount)) AS total_amount,
min(block_number) AS min_block_number,
min(block_timestamp) AS min_block_timestamp,
max(block_number) AS max_block_number,
max(block_timestamp) AS max_block_timestamp
count() AS transfer_count,
sum(toDecimal256(amount, 0)) AS total_amount,
min(block_number) AS min_block_number,
min(block_timestamp) AS min_block_timestamp,
max(block_number) AS max_block_number,
max(block_timestamp) AS max_block_timestamp
🤖 Prompt for AI Agents
In internal/tools/clickhouse/0012_clickhouse_create_address_transfers.sql around
lines 33 to 38, the code casts amount with toInt256(amount), which risks
overflow; replace those casts with toDecimal256(amount, 0) (Decimal256 has a
fixed precision of 76, and the second argument is the scale) for the
total_amount aggregation, and apply the same replacement in the second
projection later in the file so all aggregations use Decimal256 instead of Int256.

Comment on lines +52 to +57
count() AS transfer_count,
sum(toInt256(amount)) AS total_amount,
min(block_number) AS min_block_number,
min(block_timestamp) AS min_block_timestamp,
max(block_number) AS max_block_number,
max(block_timestamp) AS max_block_timestamp

🛠️ Refactor suggestion

Apply Decimal256 aggregation here as well

Mirror the safe sum change in the total-state projection.

-            sum(toInt256(amount)) AS total_amount,
+            sum(toDecimal256(amount, 0)) AS total_amount,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
count() AS transfer_count,
sum(toInt256(amount)) AS total_amount,
min(block_number) AS min_block_number,
min(block_timestamp) AS min_block_timestamp,
max(block_number) AS max_block_number,
max(block_timestamp) AS max_block_timestamp
count() AS transfer_count,
sum(toDecimal256(amount, 0)) AS total_amount,
min(block_number) AS min_block_number,
min(block_timestamp) AS min_block_timestamp,
max(block_number) AS max_block_number,
max(block_timestamp) AS max_block_timestamp
🤖 Prompt for AI Agents
In internal/tools/clickhouse/0012_clickhouse_create_address_transfers.sql around
lines 52 to 57, the total_amount aggregation still uses sum(toInt256(amount))
instead of the Decimal256-safe aggregation used in the total-state projection;
replace the current sum aggregation with the Decimal256 variant used in the
other projection (e.g., sum(toDecimal256(amount, <scale>)) or the exact helper
used there) so the scale and casting match the total-state implementation and
avoid overflow/precision differences.

Comment on lines +9 to +11
address_tuple.1 AS address,
address_tuple.2 AS address_type,
from_address,

🛠️ Refactor suggestion

Cast address_type explicitly to Enum8 to avoid implicit-cast surprises

Relying on String→Enum auto-cast can break if enum labels change. Cast explicitly.

-    address_tuple.2 AS address_type,
+    CAST(address_tuple.2, 'Enum8(\'from\' = 1, \'to\' = 2)') AS address_type,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
address_tuple.1 AS address,
address_tuple.2 AS address_type,
from_address,
address_tuple.1 AS address,
CAST(address_tuple.2, 'Enum8(\'from\' = 1, \'to\' = 2)') AS address_type,
from_address,
🤖 Prompt for AI Agents
internal/tools/clickhouse/0013_clickhouse_create_address_transfers_mv.sql around
lines 9-11: the SELECT currently uses address_tuple.2 AS address_type which
relies on implicit String→Enum casting; replace it with an explicit cast to the
Enum8 type used for address_type (e.g. CAST(address_tuple.2 AS Enum8(...)) or
CAST(address_tuple.2 AS your_existing_Enum8_type)) so the query uses a stable,
explicit enum conversion and avoids implicit-cast breakage if labels change.

Comment on lines +280 to 334
// Run processes blocks using either archive OR rpc
func (w *Worker) Run(ctx context.Context, blockNumbers []*big.Int) []rpc.GetFullBlockResult {
if len(blockNumbers) == 0 {
return nil
}

var results []rpc.GetFullBlockResult

// Determine which source to use
sourceType := SourceTypeRPC
fetchFunc := w.fetchFromRPC

if w.shouldUseArchive(ctx, blockNumbers) {
sourceType = SourceTypeArchive
fetchFunc = w.fetchFromArchive
log.Debug().
Int("count", len(blockNumbers)).
Str("source", sourceType.String()).
Msg("Using archive for all blocks")
} else {
log.Debug().
Int("count", len(blockNumbers)).
Str("source", sourceType.String()).
Msg("Using RPC for all blocks")
}

// Process all blocks with the selected source
results = w.processBatch(ctx, blockNumbers, sourceType, fetchFunc)

// Update metrics and log summary
if len(results) > 0 {
lastBlockNumberFloat, _ := results[len(results)-1].BlockNumber.Float64()
metrics.LastFetchedBlock.Set(lastBlockNumberFloat)

// Count successes and failures
successful := 0
failed := 0
for _, r := range results {
if r.Error == nil {
successful++
} else {
failed++
}
}

log.Debug().
Int("total", len(results)).
Int("successful", successful).
Int("failed", failed).
Str("source", sourceType.String()).
Msg("Block fetching complete")
}

return results
}

🛠️ Refactor suggestion

⚠️ Potential issue

Guard metric update against nil BlockNumber

If any result lacks BlockNumber, dereferencing will panic. Compute max valid block first.

-  if len(results) > 0 {
-    lastBlockNumberFloat, _ := results[len(results)-1].BlockNumber.Float64()
-    metrics.LastFetchedBlock.Set(lastBlockNumberFloat)
+  if len(results) > 0 {
+    var last *big.Int
+    for i := len(results) - 1; i >= 0; i-- {
+      if results[i].BlockNumber != nil {
+        last = results[i].BlockNumber
+        break
+      }
+    }
+    if last != nil {
+      f, _ := new(big.Float).SetInt(last).Float64()
+      metrics.LastFetchedBlock.Set(f)
+    }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In internal/worker/worker.go around lines 280 to 334, the metrics update
currently dereferences results[len(results)-1].BlockNumber which can be nil and
cause a panic; instead iterate the results to find the maximum non-nil
BlockNumber (skip nil entries), convert that big.Int to float64 (handling
conversion error), and only call metrics.LastFetchedBlock.Set(...) when a valid
block number was found; leave logging/count logic unchanged but guard the metric
update behind this validity check.

@coderabbitai coderabbitai bot left a comment


Review continued from previous batch...

Comment on lines +252 to 259
prepStartTime := time.Now()
blocksToInsert := make([]common.BlockData, 0, len(blocksToInsertMap))
for _, blockData := range blocksToInsertMap {
blocksToInsert = append(blocksToInsert, blockData)
}
prepDuration := time.Since(prepStartTime)
log.Debug().Dur("duration", prepDuration).Int("blocks_to_insert", len(blocksToInsert)).Msgf("Worker %d: Prepared blocks for insertion", workerID)


⚠️ Potential issue

Sort blocks before insertion to preserve ordering and downstream assumptions

Maps randomize order; some connectors (e.g., Kafka) may assume ascending order when updating cursors.

Apply this diff:

-		blocksToInsert := make([]common.BlockData, 0, len(blocksToInsertMap))
-		for _, blockData := range blocksToInsertMap {
-			blocksToInsert = append(blocksToInsert, blockData)
-		}
+		blocksToInsert := make([]common.BlockData, 0, len(blocksToInsertMap))
+		for _, bd := range blocksToInsertMap {
+			blocksToInsert = append(blocksToInsert, bd)
+		}
+		sort.Slice(blocksToInsert, func(i, j int) bool {
+			return blocksToInsert[i].Block.Number.Cmp(blocksToInsert[j].Block.Number) < 0
+		})
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
prepStartTime := time.Now()
blocksToInsert := make([]common.BlockData, 0, len(blocksToInsertMap))
for _, blockData := range blocksToInsertMap {
blocksToInsert = append(blocksToInsert, blockData)
}
prepDuration := time.Since(prepStartTime)
log.Debug().Dur("duration", prepDuration).Int("blocks_to_insert", len(blocksToInsert)).Msgf("Worker %d: Prepared blocks for insertion", workerID)
prepStartTime := time.Now()
blocksToInsert := make([]common.BlockData, 0, len(blocksToInsertMap))
for _, bd := range blocksToInsertMap {
blocksToInsert = append(blocksToInsert, bd)
}
sort.Slice(blocksToInsert, func(i, j int) bool {
return blocksToInsert[i].Block.Number.Cmp(blocksToInsert[j].Block.Number) < 0
})
prepDuration := time.Since(prepStartTime)
log.Debug().Dur("duration", prepDuration).Int("blocks_to_insert", len(blocksToInsert)).Msgf("Worker %d: Prepared blocks for insertion", workerID)
🤖 Prompt for AI Agents
In cmd/migrate_valid.go around lines 252 to 259, after converting
blocksToInsertMap into the blocksToInsert slice, sort blocksToInsert into
deterministic ascending order by the block sequence key (e.g.,
BlockNumber/Height or Cursor) using sort.Slice (or equivalent) before computing
prepDuration, logging, and performing the insertion; ensure the comparator uses
the correct numeric or lexicographic field for your BlockData so downstream
connectors (like Kafka) receive blocks in ascending order.

Comment on lines +441 to 449
blockData := m.worker.Run(context.Background(), blockNumbers)
for _, block := range blockData {
if block.Error != nil {
log.Warn().Err(block.Error).Msgf("Failed to fetch block %s from RPC", block.BlockNumber.String())
continue
}
allBlockData = append(allBlockData, block.Data)
}
return allBlockData, nil

💡 Verification agent

🧩 Analysis chain

Propagate cancellation to RPC worker

You’re ignoring ctx and using context.Background(), which prevents graceful shutdown mid-batch.

Apply this diff:

-func (m *Migrator) FetchBlocksFromRPC(blockNumbers []*big.Int) ([]common.BlockData, error) {
+func (m *Migrator) FetchBlocksFromRPC(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
 	allBlockData := make([]common.BlockData, 0, len(blockNumbers))
 
-	blockData := m.worker.Run(context.Background(), blockNumbers)
+	blockData := m.worker.Run(ctx, blockNumbers)

And update callers:

  • In processBlockRange: validMissingBlocks := migrator.GetValidBlocksFromRPC(ctx, missingBlocks)
  • In GetValidBlocksFromRPC: accept ctx context.Context and pass it through to FetchBlocksFromRPC.

🏁 Script executed:

#!/bin/bash
set -e

echo "=== FetchBlocksFromRPC definition ==="
rg -nP --type=go '\bfunc \(m \*Migrator\) FetchBlocksFromRPC\b' -C3 .

echo "=== GetValidBlocksFromRPC usage ==="
rg -nP --type=go '\bGetValidBlocksFromRPC\b' -C3 .

echo "=== Migrator struct definition ==="
rg -nP --type=go '\btype\s+Migrator\b' -C3 .

echo "=== worker.Run signature ==="
rg -nP --type=go '\bRun\(' -C3 .

Length of output: 39401


🏁 Script executed:

#!/bin/bash
# Show context where GetValidBlocksFromRPC is called
sed -n '230,260p' cmd/migrate_valid.go

# Show full FetchBlocksFromRPC implementation
sed -n '430,480p' cmd/migrate_valid.go

Length of output: 3328


Propagate cancellation context through FetchBlocksFromRPC

Replace context.Background() with the incoming ctx:

--- cmd/migrate_valid.go
-func (m *Migrator) FetchBlocksFromRPC(blockNumbers []*big.Int) ([]common.BlockData, error) {
+func (m *Migrator) FetchBlocksFromRPC(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
     allBlockData := make([]common.BlockData, 0, len(blockNumbers))
-    blockData := m.worker.Run(context.Background(), blockNumbers)
+    blockData := m.worker.Run(ctx, blockNumbers)

Update callers:

  • Change GetValidBlocksFromRPC to func (m *Migrator) GetValidBlocksFromRPC(ctx context.Context, blockNumbers []*big.Int) … and call m.FetchBlocksFromRPC(ctx, …).
  • In processBlockRange, use validMissingBlocks := migrator.GetValidBlocksFromRPC(ctx, missingBlocks).
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
blockData := m.worker.Run(context.Background(), blockNumbers)
for _, block := range blockData {
if block.Error != nil {
log.Warn().Err(block.Error).Msgf("Failed to fetch block %s from RPC", block.BlockNumber.String())
continue
}
allBlockData = append(allBlockData, block.Data)
}
return allBlockData, nil
func (m *Migrator) FetchBlocksFromRPC(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
allBlockData := make([]common.BlockData, 0, len(blockNumbers))
blockData := m.worker.Run(ctx, blockNumbers)
for _, block := range blockData {
if block.Error != nil {
log.Warn().Err(block.Error).Msgf("Failed to fetch block %s from RPC", block.BlockNumber.String())
continue
}
allBlockData = append(allBlockData, block.Data)
}
return allBlockData, nil
}
🤖 Prompt for AI Agents
In cmd/migrate_valid.go around lines 441 to 449, the call to m.worker.Run uses
context.Background(), which ignores caller cancellation; change it to pass the
incoming ctx. Update the GetValidBlocksFromRPC signature to func (m *Migrator)
GetValidBlocksFromRPC(ctx context.Context, blockNumbers []*big.Int) … and inside
call m.FetchBlocksFromRPC(ctx, …) (or m.worker.Run with ctx) so cancellation
propagates. Also update callers: in processBlockRange call validMissingBlocks :=
migrator.GetValidBlocksFromRPC(ctx, missingBlocks) and any other sites to
forward the ctx. Ensure imports include context where needed.

Comment on lines +74 to +77
// Active use tracking
activeUseMu sync.RWMutex
activeUse map[string]int // Files currently being read (reference count)
}

⚠️ Potential issue

Cleanup goroutine leak: add a stop channel

cleanupCache() runs forever; Close() only removes the dir. Add stopCh for a clean shutdown.

Apply this diff:

 type S3Source struct {
   client   *s3.Client
@@
   activeUseMu sync.RWMutex
   activeUse   map[string]int // Files currently being read (reference count)
+  stopCh      chan struct{}
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Active use tracking
activeUseMu sync.RWMutex
activeUse map[string]int // Files currently being read (reference count)
}
// Active use tracking
activeUseMu sync.RWMutex
activeUse map[string]int // Files currently being read (reference count)
stopCh chan struct{}
}
🤖 Prompt for AI Agents
In internal/source/s3.go around lines 74 to 77, the cleanupCache goroutine runs
forever and Close() only removes the dir; add a stop channel and shutdown
coordination so the goroutine can exit cleanly. Modify the struct to include
stopCh chan struct{} and a wg sync.WaitGroup (or done channel) to track the
cleanup goroutine; when starting the cleanup goroutine increment wg and have the
goroutine select on either its tick/timer or <-stopCh and return when stopped.
In Close(), signal shutdown by closing or sending to stopCh (using sync.Once to
avoid double-close), wait for the goroutine via wg.Wait(), then perform
directory removal and other cleanup. Ensure any blocking operations in
cleanupCache honor context of stopCh to avoid leaks.
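
A generic, runnable sketch of that shutdown pattern (a stop channel plus a WaitGroup around a ticker-driven loop); the type and names here are illustrative stand-ins, not the actual S3Source code:

package main

import (
	"fmt"
	"sync"
	"time"
)

type cacheCleaner struct {
	stopCh   chan struct{}
	stopOnce sync.Once
	wg       sync.WaitGroup
}

func newCacheCleaner(interval time.Duration) *cacheCleaner {
	c := &cacheCleaner{stopCh: make(chan struct{})}
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				fmt.Println("cleanup pass") // stand-in for a cleanupCache() call
			case <-c.stopCh:
				return
			}
		}
	}()
	return c
}

// Close signals the goroutine exactly once, waits for it to exit, and only then
// is it safe to remove the cache directory.
func (c *cacheCleaner) Close() {
	c.stopOnce.Do(func() { close(c.stopCh) })
	c.wg.Wait()
}

func main() {
	c := newCacheCleaner(50 * time.Millisecond)
	time.Sleep(120 * time.Millisecond)
	c.Close()
}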

Comment on lines +141 to +156
archive := &S3Source{
client: s3Client,
config: cfg,
chainId: chainId,
cacheDir: cacheDir,
metadataTTL: cfg.MetadataTTL,
fileCacheTTL: cfg.FileCacheTTL,
maxCacheSize: cfg.MaxCacheSize,
cleanupInterval: cfg.CleanupInterval,
maxConcurrentDownloads: cfg.MaxConcurrentDownloads,
fileMetadata: make(map[string]*FileMetadata),
cacheMap: make(map[string]time.Time),
blockIndex: make(map[string][]BlockIndex),
downloading: make(map[string]*sync.WaitGroup),
activeUse: make(map[string]int),
}

⚠️ Potential issue

Initialize stop channel

Initialize stopCh so it can be closed on shutdown.

Apply this diff:

 	archive := &S3Source{
 		client:                 s3Client,
 		config:                 cfg,
 		chainId:                chainId,
 		cacheDir:               cacheDir,
@@
 		downloading:            make(map[string]*sync.WaitGroup),
 		activeUse:              make(map[string]int),
+		stopCh:                 make(chan struct{}),
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
archive := &S3Source{
client: s3Client,
config: cfg,
chainId: chainId,
cacheDir: cacheDir,
metadataTTL: cfg.MetadataTTL,
fileCacheTTL: cfg.FileCacheTTL,
maxCacheSize: cfg.MaxCacheSize,
cleanupInterval: cfg.CleanupInterval,
maxConcurrentDownloads: cfg.MaxConcurrentDownloads,
fileMetadata: make(map[string]*FileMetadata),
cacheMap: make(map[string]time.Time),
blockIndex: make(map[string][]BlockIndex),
downloading: make(map[string]*sync.WaitGroup),
activeUse: make(map[string]int),
}
archive := &S3Source{
client: s3Client,
config: cfg,
chainId: chainId,
cacheDir: cacheDir,
metadataTTL: cfg.MetadataTTL,
fileCacheTTL: cfg.FileCacheTTL,
maxCacheSize: cfg.MaxCacheSize,
cleanupInterval: cfg.CleanupInterval,
maxConcurrentDownloads: cfg.MaxConcurrentDownloads,
fileMetadata: make(map[string]*FileMetadata),
cacheMap: make(map[string]time.Time),
blockIndex: make(map[string][]BlockIndex),
downloading: make(map[string]*sync.WaitGroup),
activeUse: make(map[string]int),
stopCh: make(chan struct{}),
}
🤖 Prompt for AI Agents
In internal/source/s3.go around lines 141 to 156, the S3Source struct literal
omits initialization of stopCh which prevents it from being closed on shutdown;
add stopCh: make(chan struct{}) to the struct initialization so the stop channel
is created and can be closed during shutdown handling.

Comment on lines +309 to +314
func (s *S3Source) Close() {
// Clean up cache directory
if s.cacheDir != "" {
os.RemoveAll(s.cacheDir)
}
}

⚠️ Potential issue

Close should stop background worker before removing cache

Stop the ticker goroutine; otherwise it leaks and may operate on a removed dir.

Apply this diff:

 func (s *S3Source) Close() {
-	// Clean up cache directory
+	// Stop background cleanup
+	if s.stopCh != nil {
+		close(s.stopCh)
+	}
+	// Clean up cache directory
 	if s.cacheDir != "" {
 		os.RemoveAll(s.cacheDir)
 	}
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (s *S3Source) Close() {
// Clean up cache directory
if s.cacheDir != "" {
os.RemoveAll(s.cacheDir)
}
}
func (s *S3Source) Close() {
// Stop background cleanup
if s.stopCh != nil {
close(s.stopCh)
}
// Clean up cache directory
if s.cacheDir != "" {
os.RemoveAll(s.cacheDir)
}
}
🤖 Prompt for AI Agents
In internal/source/s3.go around lines 309 to 314, Close currently removes the
cache directory without first stopping the background ticker goroutine, causing
a leak and race on the removed dir; modify Close to signal the worker to stop
(e.g., close a done channel or send on a stop channel), stop the ticker
(ticker.Stop()) and wait for the worker to exit (e.g., wg.Wait() or wait on the
done ack) before calling os.RemoveAll(s.cacheDir); if the S3Source struct lacks
a stop channel or waitgroup, add them and ensure the goroutine checks the stop
signal and returns promptly.

Comment on lines 164 to 184
 func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) {
 	var storage IStorage
 	var err error
 
-	storage.OrchestratorStorage, err = NewConnector[IOrchestratorStorage](&cfg.Orchestrator)
+	storage.OrchestratorStorage, err = NewOrchestratorConnector(&cfg.Orchestrator)
 	if err != nil {
 		return IStorage{}, fmt.Errorf("failed to create orchestrator storage: %w", err)
 	}
 
-	storage.MainStorage, err = NewConnector[IMainStorage](&cfg.Main)
+	storage.StagingStorage, err = NewStagingConnector(&cfg.Staging)
 	if err != nil {
-		return IStorage{}, fmt.Errorf("failed to create main storage: %w", err)
+		return IStorage{}, fmt.Errorf("failed to create staging storage: %w", err)
 	}
 
-	storage.StagingStorage, err = NewConnector[IStagingStorage](&cfg.Staging)
+	storage.MainStorage, err = NewMainConnector(&cfg.Main, &storage.OrchestratorStorage)
 	if err != nil {
-		return IStorage{}, fmt.Errorf("failed to create staging storage: %w", err)
+		return IStorage{}, fmt.Errorf("failed to create main storage: %w", err)
 	}
 
 	return storage, nil
 }

💡 Verification agent

🧩 Analysis chain

Pass value interface to NewMainConnector (align with Kafka change)

After changing Kafka to accept IOrchestratorStorage by value, pass the value here.

Apply this diff:

-	storage.MainStorage, err = NewMainConnector(&cfg.Main, &storage.OrchestratorStorage)
+	storage.MainStorage, err = NewMainConnector(&cfg.Main, storage.OrchestratorStorage)

🏁 Script executed:

#!/bin/bash
# Locate the definition of NewMainConnector to verify its signature
rg -nP "func NewMainConnector" -C3 --type go .

Length of output: 484


🏁 Script executed:

#!/bin/bash
# Locate the definition of IOrchestratorStorage
rg -nP "type IOrchestratorStorage" -C3 --type go .

# Inspect NewOrchestratorConnector to see what it returns
rg -nP "func NewOrchestratorConnector" -C3 --type go .

Length of output: 1100


Convert NewMainConnector to accept IOrchestratorStorage by value

  • In internal/storage/connector.go at the NewMainConnector declaration (line 305), change the second parameter from orchestratorStorage *IOrchestratorStorage to orchestratorStorage IOrchestratorStorage.
  • In NewStorageConnector (lines 164–184), update the call to:
    storage.MainStorage, err = NewMainConnector(&cfg.Main, storage.OrchestratorStorage)
    Without adjusting the signature, removing the & will break compilation.
🤖 Prompt for AI Agents
In internal/storage/connector.go around lines 164–184 and at the
NewMainConnector declaration (around line 305), change the NewMainConnector
second parameter from a pointer type to a value: replace orchestratorStorage
*IOrchestratorStorage with orchestratorStorage IOrchestratorStorage; then update
the call in NewStorageConnector to pass the value (storage.MainStorage, err =
NewMainConnector(&cfg.Main, storage.OrchestratorStorage)) instead of taking its
address. Also search for and update any other call sites to remove the & when
passing an IOrchestratorStorage pointer to NewMainConnector so signatures match.
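
For illustration, a small self-contained example of why the constructor can take the interface by value: an interface value already references the concrete store, so no pointer-to-interface is needed and callers can drop the &. The types here are simplified stand-ins for IOrchestratorStorage, not the project's real definitions:

package main

import (
	"fmt"
	"math/big"
)

// Simplified stand-in for the orchestrator storage interface discussed above.
type orchestratorStorage interface {
	SetLastCommittedBlockNumber(chainID *big.Int, blockNumber *big.Int) error
}

type memoryStore struct{ last *big.Int }

func (m *memoryStore) SetLastCommittedBlockNumber(chainID, blockNumber *big.Int) error {
	m.last = blockNumber
	fmt.Printf("chain %s committed up to block %s\n", chainID, blockNumber)
	return nil
}

// Taking the interface by value: the caller passes the value directly.
func newMainConnector(store orchestratorStorage) error {
	return store.SetLastCommittedBlockNumber(big.NewInt(1), big.NewInt(100))
}

func main() {
	var store orchestratorStorage = &memoryStore{}
	if err := newMainConnector(store); err != nil { // no & at the call site
		fmt.Println("error:", err)
	}
}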

Comment on lines +51 to +56
chainId := data[0].Block.ChainId
maxBlockNumber := data[len(data)-1].Block.Number
if err := kr.orchestratorStorage.SetLastCommittedBlockNumber(chainId, maxBlockNumber); err != nil {
return fmt.Errorf("failed to update last committed block number in orchestrator storage: %w", err)
}


⚠️ Potential issue

Compute max block explicitly; don’t assume input sorted

data[len(data)-1] may not be the max.

Apply this diff:

-	chainId := data[0].Block.ChainId
-	maxBlockNumber := data[len(data)-1].Block.Number
+	chainId := data[0].Block.ChainId
+	maxBlockNumber := new(big.Int).SetUint64(0)
+	for i := range data {
+		if data[i].Block.Number.Cmp(maxBlockNumber) > 0 {
+			maxBlockNumber = data[i].Block.Number
+		}
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
chainId := data[0].Block.ChainId
maxBlockNumber := data[len(data)-1].Block.Number
if err := kr.orchestratorStorage.SetLastCommittedBlockNumber(chainId, maxBlockNumber); err != nil {
return fmt.Errorf("failed to update last committed block number in orchestrator storage: %w", err)
}
chainId := data[0].Block.ChainId
maxBlockNumber := new(big.Int).SetUint64(0)
for i := range data {
if data[i].Block.Number.Cmp(maxBlockNumber) > 0 {
maxBlockNumber = data[i].Block.Number
}
}
if err := kr.orchestratorStorage.SetLastCommittedBlockNumber(chainId, maxBlockNumber); err != nil {
return fmt.Errorf("failed to update last committed block number in orchestrator storage: %w", err)
}
🤖 Prompt for AI Agents
internal/storage/kafka.go around lines 51 to 56: the code assumes the last
element is the highest block number; instead iterate over the data slice to
compute the maximum Block.Number (and optionally validate that all Block.ChainId
values match or pick a canonical chainId like data[0].Block.ChainId) and use
that max value when calling SetLastCommittedBlockNumber; replace the direct use
of data[len(data)-1].Block.Number with a loop that compares each entry and
tracks the max.

Comment on lines +60 to +75
// ReplaceBlockData handles reorg by publishing both old and new data to Kafka
func (kr *KafkaConnector) ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error) {
if len(data) == 0 {
return nil, nil
}

oldBlocks := []common.BlockData{}

// TODO: We need to fetch the old blocks from the primary data store
if err := kr.kafkaPublisher.PublishReorg(data, data); err != nil {
return nil, fmt.Errorf("failed to publish reorg blocks to kafka: %w", err)
}

// save cursor
return oldBlocks, nil
}

⚠️ Potential issue

ReplaceBlockData publishes wrong payload; return “not supported”

Publishing (old=new=data) is incorrect for reorgs and can corrupt consumers.

Apply this diff:

-// ReplaceBlockData handles reorg by publishing both old and new data to Kafka
+// ReplaceBlockData is not supported for write-only Kafka connector
 func (kr *KafkaConnector) ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error) {
 	if len(data) == 0 {
 		return nil, nil
 	}
-
-	oldBlocks := []common.BlockData{}
-
-	// TODO: We need to fetch the old blocks from the primary data store
-	if err := kr.kafkaPublisher.PublishReorg(data, data); err != nil {
-		return nil, fmt.Errorf("failed to publish reorg blocks to kafka: %w", err)
-	}
-
-	// save cursor
-	return oldBlocks, nil
+	return nil, fmt.Errorf("ReplaceBlockData is not supported by the Kafka streaming connector")
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// ReplaceBlockData handles reorg by publishing both old and new data to Kafka
func (kr *KafkaConnector) ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error) {
if len(data) == 0 {
return nil, nil
}
oldBlocks := []common.BlockData{}
// TODO: We need to fetch the old blocks from the primary data store
if err := kr.kafkaPublisher.PublishReorg(data, data); err != nil {
return nil, fmt.Errorf("failed to publish reorg blocks to kafka: %w", err)
}
// save cursor
return oldBlocks, nil
}
// ReplaceBlockData is not supported for write-only Kafka connector
func (kr *KafkaConnector) ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error) {
if len(data) == 0 {
return nil, nil
}
return nil, fmt.Errorf("ReplaceBlockData is not supported by the Kafka streaming connector")
}
🤖 Prompt for AI Agents
internal/storage/kafka.go lines 60-75: ReplaceBlockData currently publishes the
same slice for old and new (data, data) which is incorrect; instead remove the
PublishReorg call and immediately return a clear "not supported" error (e.g.
fmt.Errorf("replace block data not supported") or a package-level
ErrNotSupported) and nil for the oldBlocks slice so callers know reorgs aren't
handled by the Kafka connector. Ensure no publish is attempted and keep the
function behavior consistent with the error return.

Comment on lines +905 to +909
for i := new(big.Int).Set(fileStart); i.Cmp(fileEnd) <= 0; i.Add(i, big.NewInt(1)) {
if i.Cmp(startBlock) >= 0 && i.Cmp(endBlock) <= 0 {
blockSet[i.String()] = true
}
}

🛠️ Refactor suggestion

⚠️ Potential issue

Inefficient algorithm for adding contiguous block ranges

The loop creates a new big.Int for every block number in potentially large ranges, which is memory-intensive and slow.

For contiguous block ranges, directly add all blocks without iterating:

-		// Add all blocks in this file's range to our set
-		for i := new(big.Int).Set(fileStart); i.Cmp(fileEnd) <= 0; i.Add(i, big.NewInt(1)) {
-			if i.Cmp(startBlock) >= 0 && i.Cmp(endBlock) <= 0 {
-				blockSet[i.String()] = true
-			}
-		}
+		// Since S3 files contain contiguous blocks, mark the entire range as present
+		// Only iterate if we need to check individual blocks
+		effectiveStart := new(big.Int).Set(fileStart)
+		if effectiveStart.Cmp(startBlock) < 0 {
+			effectiveStart = startBlock
+		}
+		effectiveEnd := new(big.Int).Set(fileEnd)
+		if effectiveEnd.Cmp(endBlock) > 0 {
+			effectiveEnd = endBlock
+		}
+		
+		// For finding missing blocks, we need to track individual blocks
+		for i := new(big.Int).Set(effectiveStart); i.Cmp(effectiveEnd) <= 0; i.Add(i, big.NewInt(1)) {
+			blockSet[i.String()] = true
+		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for i := new(big.Int).Set(fileStart); i.Cmp(fileEnd) <= 0; i.Add(i, big.NewInt(1)) {
if i.Cmp(startBlock) >= 0 && i.Cmp(endBlock) <= 0 {
blockSet[i.String()] = true
}
}
// Since S3 files contain contiguous blocks, mark only the overlapping range as present
effectiveStart := new(big.Int).Set(fileStart)
if effectiveStart.Cmp(startBlock) < 0 {
effectiveStart = startBlock
}
effectiveEnd := new(big.Int).Set(fileEnd)
if effectiveEnd.Cmp(endBlock) > 0 {
effectiveEnd = endBlock
}
// Iterate the overlap and populate the block set
for i := new(big.Int).Set(effectiveStart); i.Cmp(effectiveEnd) <= 0; i.Add(i, big.NewInt(1)) {
blockSet[i.String()] = true
}

Comment on lines +1094 to +1096
if err != nil && err.Error() != "EOF" {
return nil, fmt.Errorf("failed to read parquet: %w", err)
}

⚠️ Potential issue

Incorrect EOF handling for parquet reader

Comparing error messages with string literals is fragile and could break with library updates.

Use proper error checking:

 	for {
 		n, err := reader.Read(parquetRows)
-		if err != nil && err.Error() != "EOF" {
+		if err != nil && err != io.EOF {
 			return nil, fmt.Errorf("failed to read parquet: %w", err)
 		}
 		if n == 0 {
 			break
 		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if err != nil && err.Error() != "EOF" {
return nil, fmt.Errorf("failed to read parquet: %w", err)
}
for {
n, err := reader.Read(parquetRows)
if err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to read parquet: %w", err)
}
if n == 0 {
break
}
// …
}
🤖 Prompt for AI Agents
In internal/storage/s3.go around lines 1094-1096, the code compares err.Error()
to the string "EOF", which is fragile; change the check to use errors.Is(err,
io.EOF) (and consider also checking errors.Is(err, io.ErrUnexpectedEOF) if
relevant) so EOF is detected reliably; update imports to include "errors" and
"io" and keep returning a wrapped fmt.Errorf for non-EOF errors as before.
