Initial kafka committer #271
Caution

Review failed. The pull request is closed.

Walkthrough

Introduces S3-based source/storage, Kafka/Redis/Badger backends, extensive config restructuring, retryable RPC fetching, a concurrent migrator with worker orchestration, orchestrator lifecycle refactors, poller/committer changes, new serialization for blocks, and a comprehensive ClickHouse schema overhaul (soft-deletes, projections, new tables/materialized views), plus mocks and test updates.

Changes
Sequence Diagram(s)

sequenceDiagram
autonumber
actor User
participant Migrator
participant Source as Source Storage
participant Dest as Destination Storage
participant W as Worker
participant RPC as RPC Client
User->>Migrator: Start(ctx, start,end,workers,batch)
Migrator->>Source: DetermineMigrationBoundaries(start,end)
alt no range to migrate
Migrator-->>User: exit
else range exists
Migrator->>Migrator: divideBlockRange(range, workers)
par For each worker i
Migrator->>W: processBlockRange(subrange)
W->>Source: GetValidBlocksForRange(...)
alt missing blocks
W->>RPC: FetchBlocks (retry)
RPC-->>W: Full blocks
W->>W: Validate
end
W->>Dest: Insert batches
and Graceful shutdown
Note over Migrator: OS signal/context cancel handled
end
end
Migrator->>Dest: Close()
Migrator->>Source: Close()
Migrator-->>User: Done
sequenceDiagram
autonumber
participant Orchestrator
participant Poller
participant Committer
participant FailureRec as FailureRecoverer
participant Reorg as ReorgHandler
participant Monitor as WorkModeMonitor
participant Tracker as ChainTracker
participant Storage
Orchestrator->>Storage: NewStorageConnector(...)
Orchestrator->>Poller: Start(WithS3Source?)
Orchestrator->>Committer: Start(commitUntil?)
Orchestrator->>FailureRec: Start()
Orchestrator->>Reorg: Start()
Orchestrator->>Monitor: Start()
Orchestrator->>Tracker: Start()
par lifecycle
Poller-->>Orchestrator: completed → cancel()
Committer-->>Orchestrator: completed → cancel()
FailureRec-->>Orchestrator: completed
Reorg-->>Orchestrator: completed
Monitor-->>Orchestrator: completed
Tracker-->>Orchestrator: completed
end
Orchestrator->>Storage: Close()
Orchestrator-->>Orchestrator: Shutdown complete
sequenceDiagram
autonumber
participant Poller
participant W as Worker
participant Src as Archive Source (S3)
participant RPC as RPC Client
participant Staging as StagingStorage
Poller->>W: Run(ctx, blockNumbers)
alt Archive available
W->>Src: GetFullBlocks(blockNumbers)
Src-->>W: Results (per block)
else
W->>RPC: Batched Fetch (retry)
RPC-->>W: Results
end
W-->>Poller: Blocks/results
alt failures
Poller->>Staging: StoreBlockFailures(...)
end
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~150 minutes
This reverts commit 6233232.
Actionable comments posted: 57
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (21)
internal/tools/clickhouse/0004_clickhouse_create_insert_null_table.sql (1)
27-59: Adjust field types to match Ethereum specs
- Change `to_address` to `Nullable(FixedString(42))` to correctly represent contract-creation transactions (alchemy.com)
- Use `UInt256` for `max_fee_per_gas` and `max_priority_fee_per_gas` to cover the full 2²⁵⁶ range (asserted `< 2**256`) (eips.ethereum.org)
- Store `blob_versioned_hashes` as `Array(FixedString(66))` (32-byte hex with “0x” prefix) per EIP-4844’s `Bytes32` versioned-hash format (eips.ethlib.cn)

internal/handlers/logs_handlers.go (1)
120-128: Fix potential nil dereference when ConstructEventABI fails.
`signatureHash` is computed even if `ConstructEventABI` returns error; `eventABI` can be nil.

Apply:

- eventABI, err = common.ConstructEventABI(signature)
- if err != nil {
-     log.Debug().Err(err).Msgf("Unable to construct event ABI for %s", signature)
- }
- signatureHash = eventABI.ID.Hex()
+ eventABI, err = common.ConstructEventABI(signature)
+ if err != nil {
+     log.Debug().Err(err).Msgf("Unable to construct event ABI for %s", signature)
+ } else if eventABI != nil {
+     signatureHash = eventABI.ID.Hex()
+ }

configs/config.go (1)
357-365: Bug: Orchestrator chainBasedConfig env overrides are written to Main instead of Orchestrator.
This misroutes STORAGE_ORCHESTRATOR_CLICKHOUSE_CHAINBASEDCONFIG.

Apply:

  if chainConfigJSON := os.Getenv("STORAGE_ORCHESTRATOR_CLICKHOUSE_CHAINBASEDCONFIG"); chainConfigJSON != "" {
      var orchestratorChainConfig map[string]TableOverrideConfig
      if err := json.Unmarshal([]byte(chainConfigJSON), &orchestratorChainConfig); err != nil {
          return fmt.Errorf("error parsing orchestrator chainBasedConfig JSON: %v", err)
      }
-     if Cfg.Storage.Main.Clickhouse != nil {
-         Cfg.Storage.Main.Clickhouse.ChainBasedConfig = orchestratorChainConfig
+     if Cfg.Storage.Orchestrator.Clickhouse != nil {
+         Cfg.Storage.Orchestrator.Clickhouse.ChainBasedConfig = orchestratorChainConfig
      }
  }

internal/publisher/publisher.go (1)
128-159: Don’t hold the read lock across network I/O; pre-filter nils and surface publish errors

Holding `p.mu.RLock` while producing can block Close/init; we should snapshot the client under lock, then release. Also, aggregate and return the first error instead of always nil.
func (p *Publisher) publishMessages(ctx context.Context, messages []*kgo.Record) error { if len(messages) == 0 { return nil } if !config.Cfg.Publisher.Enabled { log.Debug().Msg("Publisher is disabled, skipping publish") return nil } - p.mu.RLock() - defer p.mu.RUnlock() - - if p.client == nil { + p.mu.RLock() + client := p.client + p.mu.RUnlock() + + if client == nil { return nil // Skip if no client configured } - var wg sync.WaitGroup - wg.Add(len(messages)) + // filter out nil records defensively + filtered := make([]*kgo.Record, 0, len(messages)) + for _, m := range messages { + if m != nil { + filtered = append(filtered, m) + } + } + if len(filtered) == 0 { + return nil + } + + var wg sync.WaitGroup + wg.Add(len(filtered)) + var mu sync.Mutex + var firstErr error // Publish to all configured producers - for _, msg := range messages { - p.client.Produce(ctx, msg, func(_ *kgo.Record, err error) { + for _, msg := range filtered { + client.Produce(ctx, msg, func(_ *kgo.Record, err error) { defer wg.Done() if err != nil { log.Error().Err(err).Msg("Failed to publish message to Kafka") + mu.Lock() + if firstErr == nil { + firstErr = err + } + mu.Unlock() } }) } wg.Wait() - return nil + return firstErr }internal/orchestrator/failure_recoverer.go (1)
90-121: Bug: map key uses big.Int pointer identity; lookups will miss and deletions may target zero values

Using `*big.Int` as a map key compares pointer addresses, not numeric values. Results typically carry distinct pointers, so `ok` will be false and you’ll attempt to delete a zero-value failure. Key by the numeric value (`String()`) instead.
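For illustration, a minimal standalone sketch (made-up block number, not the project's types) of the pointer-identity pitfall; the suggested diff below keys the map by `String()` for exactly this reason:

```go
package main

import (
	"fmt"
	"math/big"
)

func main() {
	// Two distinct *big.Int pointers holding the same numeric value,
	// mimicking a stored failure record and a fresh RPC result.
	stored := big.NewInt(12345)
	fetched := big.NewInt(12345)

	byPointer := map[*big.Int]string{stored: "failure"}
	_, ok := byPointer[fetched]
	fmt.Println("pointer-keyed lookup hit:", ok) // false: keys compare pointer identity

	byValue := map[string]string{stored.String(): "failure"}
	_, ok = byValue[fetched.String()]
	fmt.Println("string-keyed lookup hit:", ok) // true: "12345" == "12345"
}
```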
- blockFailureMap := make(map[*big.Int]common.BlockFailure) + blockFailureMap := make(map[string]common.BlockFailure) for _, failure := range blockFailures { - blockFailureMap[failure.BlockNumber] = failure + blockFailureMap[failure.BlockNumber.String()] = failure } var newBlockFailures []common.BlockFailure var failuresToDelete []common.BlockFailure var successfulResults []common.BlockData for _, result := range results { - blockFailureForBlock, ok := blockFailureMap[result.BlockNumber] + blockFailureForBlock, ok := blockFailureMap[result.BlockNumber.String()] if result.Error != nil { failureCount := 1 if ok { failureCount = blockFailureForBlock.FailureCount + 1 } newBlockFailures = append(newBlockFailures, common.BlockFailure{ BlockNumber: result.BlockNumber, FailureReason: result.Error.Error(), FailureTime: time.Now(), ChainId: fr.rpc.GetChainID(), FailureCount: failureCount, }) } else { successfulResults = append(successfulResults, common.BlockData{ Block: result.Data.Block, Logs: result.Data.Logs, Transactions: result.Data.Transactions, Traces: result.Data.Traces, }) - failuresToDelete = append(failuresToDelete, blockFailureForBlock) + if ok { + failuresToDelete = append(failuresToDelete, blockFailureForBlock) + } } }internal/rpc/rpc.go (1)
279-282: Apply retry to GetBlocks/GetTransactions for consistency.

These still use `RPCFetchSingleBatch`; align with `WithRetry` to handle provider 413s and batch-size limits.

- blocks = RPCFetchSingleBatch[*big.Int, common.RawBlock](rpc, ctx, blockNumbers, "eth_getBlockByNumber", GetBlockWithoutTransactionsParams)
+ blocks = RPCFetchSingleBatchWithRetry[*big.Int, common.RawBlock](rpc, ctx, blockNumbers, "eth_getBlockByNumber", GetBlockWithoutTransactionsParams)

- transactions = RPCFetchSingleBatch[string, common.RawTransaction](rpc, ctx, txHashes, "eth_getTransactionByHash", GetTransactionParams)
+ transactions = RPCFetchSingleBatchWithRetry[string, common.RawTransaction](rpc, ctx, txHashes, "eth_getTransactionByHash", GetTransactionParams)

Also applies to: 294-297
cmd/root.go (1)
53-53: Bug: poller-interval declared as Bool but should be numeric.

This binds to `poller.interval` and will break consumers expecting duration/ints.

- rootCmd.PersistentFlags().Bool("poller-interval", true, "Poller interval")
+ rootCmd.PersistentFlags().Int("poller-interval", 1000, "Poller interval in milliseconds")

internal/storage/postgres.go (3)
63-91: Broken SQL: missing WHERE before AND clauses.

`query` is initialized without WHERE, then “AND …” is appended, producing invalid SQL.

- query := `SELECT chain_id, block_number, last_error_timestamp, failure_count, reason
-     FROM block_failures`
+ query := `SELECT chain_id, block_number, last_error_timestamp, failure_count, reason
+     FROM block_failures WHERE 1=1`

Also, the IN clause for BlockNumbers is string-concatenated. Prefer parameterized placeholders:

- blockNumberStrs := make([]string, len(qf.BlockNumbers))
- for i, bn := range qf.BlockNumbers { blockNumberStrs[i] = bn.String() }
- query += fmt.Sprintf(" AND block_number IN (%s)", strings.Join(blockNumberStrs, ","))
+ placeholders := make([]string, len(qf.BlockNumbers))
+ for i, bn := range qf.BlockNumbers {
+     argCount++
+     placeholders[i] = fmt.Sprintf("$%d", argCount)
+     args = append(args, bn.String())
+ }
+ query += fmt.Sprintf(" AND block_number IN (%s)", strings.Join(placeholders, ","))
350-356: Wrong table in DELETE subquery.

Deleting from `block_data` but selecting ctid from `block_failures`. Likely copy/paste bug.

- FROM block_failures
+ FROM block_data
238-269: Persist normalized JSON to avoid null arrays.

Use `BlockData.Serialize()` before `json.Marshal` to ensure empty slices instead of nulls.

- blockDataJSON, err := json.Marshal(blockData)
+ blockDataJSON, err := json.Marshal(blockData.Serialize())

internal/orchestrator/poller.go (3)
248-257: Data race on lastPolledBlock write

Writes to `p.lastPolledBlock` must be protected; multiple workers run concurrently.

  endBlock := blockNumbers[len(blockNumbers)-1]
  if endBlock != nil {
-     p.lastPolledBlock = endBlock
+     p.lastPolledBlockMutex.Lock()
+     p.lastPolledBlock = endBlock
+     p.lastPolledBlockMutex.Unlock()
  }

328-333: Data race on lastPolledBlock read

Guard reads with `RLock`.

- log.Debug().Msgf("Last polled block: %s", p.lastPolledBlock.String())
- startBlock := new(big.Int).Add(p.lastPolledBlock, big.NewInt(1))
+ p.lastPolledBlockMutex.RLock()
+ lp := new(big.Int).Set(p.lastPolledBlock)
+ p.lastPolledBlockMutex.RUnlock()
+ log.Debug().Msgf("Last polled block: %s", lp.String())
+ startBlock := new(big.Int).Add(lp, big.NewInt(1))

385-391: Close worker to release archive resources

`Worker.Close()` closes the archive and its background goroutines; call it on shutdown.

  func (p *Poller) shutdown(cancel context.CancelFunc, tasks chan struct{}, wg *sync.WaitGroup) {
      cancel()
      close(tasks)
      wg.Wait()
+     if p.worker != nil {
+         _ = p.worker.Close()
+     }
      log.Info().Msg("Poller shutting down")
  }

internal/storage/clickhouse.go (3)
565-703: UNION + GROUP BY/ORDER/LIMIT composition may produce invalid or unstable queries
- `addPostQueryClauses` wraps UNION with `SELECT * FROM (%s) GROUP BY ...` — selecting `*` with GROUP BY is invalid unless CH relaxes it; specify projected columns.
- Final ordering for UNION uses only inner ORDER BY; outer result has no ORDER BY before final LIMIT, which is nondeterministic.

Minimal fix: apply ORDER BY at the outer level and pass columns to the wrapper.

- if len(qf.GroupBy) > 0 {
-     groupByClause := fmt.Sprintf(" GROUP BY %s", strings.Join(qf.GroupBy, ", "))
-     if strings.Contains(query, "UNION ALL") {
-         query = fmt.Sprintf("SELECT %s FROM (%s)%s", columns /* pass through */, query, groupByClause)
-     } else {
-         query += groupByClause
-     }
- }
+ // For UNION, wrap with explicit projection and apply GROUP BY/ORDER/LIMIT outside
+ // NOTE: requires threading 'columns' into addPostQueryClauses or handling in buildUnionQuery.

Alternatively, move GROUP BY/ORDER/LIMIT handling for UNION into `buildUnionQuery` and keep `addPostQueryClauses` for non-UNION queries only. I can provide a concrete patch if you decide the approach.
1022-1029: Trailing comma in IN list
`getBlockNumbersStringArray` returns a trailing comma, yielding invalid SQL: `IN (1,2,3,)`.

  func getBlockNumbersStringArray(blockNumbers []*big.Int) string {
-     blockNumbersString := ""
-     for _, blockNumber := range blockNumbers {
-         blockNumbersString += fmt.Sprintf("%s,", blockNumber.String())
-     }
-     return blockNumbersString
+     if len(blockNumbers) == 0 {
+         return ""
+     }
+     parts := make([]string, 0, len(blockNumbers))
+     for _, bn := range blockNumbers {
+         parts = append(parts, bn.String())
+     }
+     return strings.Join(parts, ",")
  }
773-794: Enforce hex validation and switch to parameterized queries
The helpers `createContractAddressClause` and `createWalletAddressClause` currently interpolate unsanitized input via `fmt.Sprintf`, opening the door to SQL injection. Require `contractAddress` and `walletAddress` to be valid 0x-prefixed hex strings and pass them as bound parameters through the ClickHouse driver instead of string concatenation.
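A hedged sketch of what validation plus bound parameters could look like, assuming the clickhouse-go v2 native interface (which supports `?` placeholders); the table, columns, and helper name are illustrative, not the project's actual code:

```go
package storage

import (
	"context"
	"fmt"
	"regexp"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// addressRe accepts a 0x-prefixed, 40-hex-char EVM address.
var addressRe = regexp.MustCompile(`^0x[0-9a-fA-F]{40}$`)

// queryLogsByAddress validates the address and binds it as a query
// parameter instead of interpolating it with fmt.Sprintf.
func queryLogsByAddress(ctx context.Context, conn driver.Conn, chainID uint64, address string) (driver.Rows, error) {
	if !addressRe.MatchString(address) {
		return nil, fmt.Errorf("invalid address %q: must be 0x-prefixed 40-char hex", address)
	}
	// The driver substitutes the placeholders safely; no string concatenation.
	const q = `SELECT block_number, transaction_hash, data
	           FROM logs
	           WHERE chain_id = ? AND address = ?`
	return conn.Query(ctx, q, chainID, address)
}
```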
internal/orchestrator/committer.go (3)

323-333: Possible nil deref before error/zero checks

You log `latestCommittedBlockNumber.String()` before confirming err/nil. This can panic when storage returns (nil, nil) for empty tables.

Apply:

- latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
- log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
- if err != nil {
+ latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
+ if err != nil {
      return nil, err
  }
+ if latestCommittedBlockNumber != nil {
+     log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
+ } else {
+     log.Debug().Msg("Committer found no max block in main storage (nil)")
+ }
329-338: Handle nil from GetMaxBlockNumber

Code assumes zero BigInt, but implementations may return nil. Treat nil as zero.

Apply:

- if latestCommittedBlockNumber.Sign() == 0 {
+ if latestCommittedBlockNumber == nil || latestCommittedBlockNumber.Sign() == 0 {
      // If no blocks have been committed yet, start from the fromBlock specified in the config
      latestCommittedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
616-631: Error from poller ignored during gap handling

Failure to poll missing blocks is silently ignored.

Apply:

- c.poller.Poll(ctx, missingBlockNumbers)
+ if err := c.poller.Poll(ctx, missingBlockNumbers); err != nil {
+     log.Error().Err(err).Msg("Poller error while handling gap")
+ }

cmd/migrate_valid.go (1)
473-487: Make GetValidBlocksFromRPC non-terminating and propagate errors
- In cmd/migrate_valid.go (around L473), change

  func (m *Migrator) GetValidBlocksFromRPC(blockNumbers []*big.Int) []common.BlockData

  to

  func (m *Migrator) GetValidBlocksFromRPC(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error)

  and replace each `log.Fatal…` with `return nil, err` (or formatted error for invalidBlocks).

- At the call site (cmd/migrate_valid.go L239), update

  validMissingBlocks := m.GetValidBlocksFromRPC(missingBlocks)

  to

  validMissingBlocks, err := m.GetValidBlocksFromRPC(ctx, missingBlocks)
  if err != nil {
      log.Error().Err(err).Msgf("Worker %d: failed to fetch missing blocks", workerID)
      time.Sleep(3 * time.Second)
      continue
  }
305-379: Update both connector signatures and callers to use a value IOrchestratorStorage instead of a pointer
- Change NewMainConnector signature in internal/storage/connector.go (line 305) to
  func NewMainConnector(cfg *config.StorageMainConfig, orchestratorStorage IOrchestratorStorage) (IMainStorage, error)
- Change NewKafkaConnector signature in internal/storage/kafka.go (line 19) to
  func NewKafkaConnector(cfg *config.KafkaConfig, orchestratorStorage IOrchestratorStorage) (*KafkaConnector, error)
- Update all callers of NewMainConnector (internal/storage/connector.go:178, internal/handlers/logs_handlers.go:228, cmd/migrate_valid.go:320) to drop the `&` when passing an IOrchestratorStorage value
♻️ Duplicate comments (1)
internal/tools/clickhouse/0012_clickhouse_create_address_transfers.sql (1)
26-45: Add projection keyed by (token_type, chain_id, address, token_id)

Per prior preference, optimize for queries at (token_type, chain_id, address, token_id).
From learnings (#240).
PROJECTION address_state_projection ( @@ ), + PROJECTION address_token_state_projection ( + SELECT + token_type, + chain_id, + address, + token_id, + token_address, + count() AS transfer_count, + sum(toDecimal256(amount, 0)) AS total_amount, + min(block_number) AS min_block_number, + min(block_timestamp) AS min_block_timestamp, + max(block_number) AS max_block_number, + max(block_timestamp) AS max_block_timestamp + GROUP BY + token_type, + chain_id, + address, + token_id, + token_address + )
🧹 Nitpick comments (54)
internal/tools/clickhouse/0004_clickhouse_create_insert_null_table.sql (1)
97-99: Consider DateTime64 for higher precision on ingestion timestamps.If you dedupe/order on insert_timestamp, millisecond precision helps during bursts.
- insert_timestamp DateTime DEFAULT now(), + insert_timestamp DateTime64(3) DEFAULT now(),internal/tools/clickhouse/0000_clickhouse_create_blocks_table.sql (1)
31-44: Add ORDER BY inside the aggregate projection for better pruning.Explicit ORDER BY on group keys can improve projection read efficiency.
PROJECTION chain_state_projection ( SELECT chain_id, count() AS count, uniqExact(block_number) AS unique_block_count, min(block_number) AS min_block_number, min(block_timestamp) AS min_block_timestamp, max(block_number) AS max_block_number, max(block_timestamp) AS max_block_timestamp GROUP BY chain_id + ORDER BY + chain_id )internal/tools/clickhouse/0001_clickhouse_create_transactions_table.sql (1)
9-11: to_address may be null for contract creations.If upstream normalizes to zero-address it’s fine; otherwise consider Nullable(FixedString(42)) for fidelity with EVM semantics.
internal/tools/clickhouse/0011_clickhouse_create_address_transactions_mv.sql (1)
39-44: Propagating insert_timestamp/is_deleted is good; consider DateTime64 upstream.If you adopt DateTime64 in sources, mirror it here automatically.
internal/tools/clickhouse/0002_clickhouse_create_logs_table.sql (1)
28-39: Projections: include minimal identifiers to avoid extra lookups when possibleCurrent projections only select _part_offset. If frequent queries need address/topic_0 plus block pointers without fetching base columns, add them here to reduce reads.
Example:
- SELECT - _part_offset + SELECT + _part_offset, address, topic_0Validate against your workload before adopting.
internal/tools/clickhouse/0010_clickhouse_create_address_transactions.sql (1)
45-59: Add per-direction address projection (address + address_type)You have totals by address; many queries also slice by direction.
PROJECTION address_total_count_projection ( SELECT chain_id, address, count() AS tx_count, uniqExact(hash) AS unique_tx_count, min(block_number) AS min_block_number, min(block_timestamp) AS min_block_timestamp, max(block_number) AS max_block_number, max(block_timestamp) AS max_block_timestamp GROUP BY chain_id, address ) + , + PROJECTION address_direction_count_projection + ( + SELECT + chain_id, + address, + address_type, + count() AS tx_count, + uniqExact(hash) AS unique_tx_count, + min(block_number) AS min_block_number, + min(block_timestamp) AS min_block_timestamp, + max(block_number) AS max_block_number, + max(block_timestamp) AS max_block_timestamp + GROUP BY + chain_id, + address, + address_type + )internal/tools/clickhouse/0006_clickhouse_create_token_transfers.sql (1)
5-8: Normalize addresses to lower case at write-time for consistency.Downstream queries/indexes benefit from canonical casing.
Example:
-`token_address` FixedString(42), -`from_address` FixedString(42), -`to_address` FixedString(42), +`token_address` FixedString(42), +`from_address` FixedString(42), +`to_address` FixedString(42), -- in MVs (0007), wrap with lower()Also applies to: 11-13, 20-23
internal/tools/clickhouse/0008_clickhouse_create_token_balances.sql (2)
27-40: Consider adding final aggregate projections with MaterializedAggregateFunctions for fast reads.You’re storing State columns; ensure there are complementary projections with
...Merge()for ready-to-read aggregates if queries expect finalized values.Also applies to: 42-55
14-14: Use same timestamp codec as transfers for compression/scan efficiency.Keep codecs consistent across sinks/sources.
-`block_timestamp` DateTime, +`block_timestamp` DateTime CODEC(Delta(4), ZSTD(1)),internal/tools/clickhouse/0007_clickhouse_create_token_transfers_mv.sql (2)
43-46: Explicitly cast ERC721 amount to UInt256.Avoid implicit widening.
- toUInt8(1) AS amount, + toUInt256(1) AS amount,
158-158: Use '=' instead of '==' for SQL equality.For portability and consistency.
- AND length(data) == 2 + 128; + AND length(data) = 2 + 128;internal/tools/clickhouse/0009_clickhouse_create_token_balances_mv.sql (1)
53-61: Cast ERC721 deltas to Int256 to match sink type.Prevents implicit casts and keeps types uniform.
- -1 AS balance_delta, + -toInt256(1) AS balance_delta, ... - 1 AS balance_delta, + toInt256(1) AS balance_delta,Also applies to: 66-71
internal/tools/clickhouse_opts/0000_clickhouse_backfill_logs_transfer.sql (2)
145-168: Normalize equality operator and casing; keep rules consistent with realtime MVs.
- Use
=instead of==.- Consider lower-casing across all token types or none (6909 currently differs).
- AND length(data) == 2 + 128; + AND length(data) = 2 + 128;Also either add
lower()for ERC20/721/1155 addresses or remove it here for consistency.
171-201: Dangerous commented instructions: wrong table names.
token_balance/token_balance_*don’t exist in this PR; could mislead ops.--- DROP TABLE token_transfers, token_balance; +-- DROP TABLE token_transfers, token_balances; --- DROP TABLE bf__token_transfers_erc20_mv, bf__token_transfers_erc721_mv, bf__token_transfers_erc1155_mv, bf__token_transfers_erc6909_mv; +-- DROP TABLE bf__token_transfers_erc20_mv, bf__token_transfers_erc721_mv, bf__token_transfers_erc1155_single_mv, bf__token_transfers_erc1155_batch_mv, bf__token_transfers_erc6909_mv; --- DROP TABLE token_transfers_erc20_mv, token_transfers_erc721_mv, token_transfers_erc1155_mv, token_transfers_erc6909_mv; +-- DROP TABLE token_transfers_erc20_mv, token_transfers_erc721_mv, token_transfers_erc1155_single_mv, token_transfers_erc1155_batch_mv, token_transfers_erc6909_mv; --- DROP TABLE token_balance_erc20_mv, token_balance_erc721_mv, token_balance_erc1155_mv, token_balance_erc6909_mv; +-- DROP TABLE token_balances_erc20_mv, token_balances_erc721_mv, token_balances_erc1155_mv, token_balances_erc6909_mv;test/mocks/MockIRPCClient.go (1)
1-1: Pin mock generator to avoid churn across environments.
Recommend adding a pinned mockery invocation (go:generate or Makefile) to keep versions consistent in CI/dev.internal/handlers/logs_handlers.go (1)
206-213: Minor readability: avoid shadowing logger with variable named 'log'.
Renaming loop var prevents confusion with zerolog’s log.- for _, log := range logs { - decodedLog := log.Decode(eventABI) + for _, lg := range logs { + decodedLog := lg.Decode(eventABI) if decodedLog.Decoded.Name == "" || decodedLog.Decoded.Signature == "" { decodingCompletelySuccessful = false } decodedLogs = append(decodedLogs, decodedLog) }configs/config.go (1)
328-333: Reduce duplicate logging of chain config.
Interface()+Msgf duplicates the payload; prefer structured + Msg.- log.Debug(). - Interface("chainConfig", clickhouse.ChainBasedConfig). - Msgf("Loaded chain config %v", clickhouse.ChainBasedConfig) + log.Debug(). + Interface("chainConfig", clickhouse.ChainBasedConfig). + Msg("Loaded chain config")cmd/orchestrator.go (2)
36-43: Treat http.ErrServerClosed as benign for metrics server.
Avoid noisy error logs during graceful shutdown.+ // Start Prometheus metrics server log.Info().Msg("Starting Metrics Server on port 2112") go func() { http.Handle("/metrics", promhttp.Handler()) - if err := http.ListenAndServe(":2112", nil); err != nil { - log.Error().Err(err).Msg("Metrics server error") - } + if err := http.ListenAndServe(":2112", nil); err != nil && err != http.ErrServerClosed { + log.Error().Err(err).Msg("Metrics server error") + } }()
26-34: Avoid package-name shadowing for rpc and orchestrator vars.
Reduces confusion and accidental misuse.- rpc, err := rpc.Initialize() + rpcClient, err := rpc.Initialize() if err != nil { log.Fatal().Err(err).Msg("Failed to initialize RPC") } - orchestrator, err := orchestrator.NewOrchestrator(rpc) + orch, err := orchestrator.NewOrchestrator(rpcClient) if err != nil { log.Fatal().Err(err).Msg("Failed to create orchestrator") } ... - orchestrator.Start() + orch.Start()internal/publisher/publisher.go (1)
74-79: Warn when using SASL/PLAIN without TLSSASL PLAIN over plaintext exposes credentials. Emit a warning when TLS is disabled.
if config.Cfg.Publisher.Username != "" && config.Cfg.Publisher.Password != "" { opts = append(opts, kgo.SASL(plain.Auth{ User: config.Cfg.Publisher.Username, Pass: config.Cfg.Publisher.Password, }.AsMechanism())) + if !config.Cfg.Publisher.EnableTLS { + log.Warn().Msg("Kafka SASL/PLAIN configured without TLS; credentials may be exposed on the wire") + } }internal/orchestrator/failure_recoverer.go (1)
125-133: Skip no-op writes when there’s nothing to store/deleteAvoid unnecessary DB calls when slices are empty.
- if err := fr.storage.StagingStorage.StoreBlockFailures(newBlockFailures); err != nil { + if len(newBlockFailures) > 0 { + if err := fr.storage.StagingStorage.StoreBlockFailures(newBlockFailures); err != nil { log.Error().Err(err).Msg("Error storing block failures") return - } - if err := fr.storage.StagingStorage.DeleteBlockFailures(failuresToDelete); err != nil { + } + } + if len(failuresToDelete) > 0 { + if err := fr.storage.StagingStorage.DeleteBlockFailures(failuresToDelete); err != nil { log.Error().Err(err).Msg("Error deleting block failures") return - } + } + }internal/source/source.go (1)
10-14: Go naming: drop the “I” prefix; consider io.CloserIdiomatic Go prefers “Source” over “ISource”. Also consider embedding io.Closer or returning error from Close() if teardown can fail.
-type ISource interface { +type Source interface { GetFullBlocks(ctx context.Context, blockNumbers []*big.Int) []rpc.GetFullBlockResult GetSupportedBlockRange(ctx context.Context) (minBlockNumber *big.Int, maxBlockNumber *big.Int, err error) - Close() + Close() }internal/orchestrator/committer_test.go (2)
350-351: Prefer assert.Eventually over manual select+timeoutReduces flakiness and boilerplate.
- select { - case <-deleteDone: - case <-time.After(2 * time.Second): - t.Fatal("DeleteStagingDataOlderThan was not called within timeout period") - } + assert.Eventually(t, func() bool { + select { + case <-deleteDone: + return true + default: + return false + } + }, 2*time.Second, 10*time.Millisecond)
429-429: Nit: comment clarifies expectationsThe note about GetChainID being unused here helps future readers of the mock expectations.
internal/common/block.go (1)
103-120: Normalize nil slices: good; ensure callers use it where JSON is persisted.Consider calling BlockData.Serialize() before persisting to staging/S3 to avoid null arrays leaking into stored JSON.
cmd/root.go (3)
69-70: Doc/default mismatch for poller-s3-maxCacheSize.Help text says “default 5GB” but default is 0. Either update the text or set 5GB as default.
-rootCmd.PersistentFlags().Int64("poller-s3-maxCacheSize", 0, "Max cache size in bytes for poller archive source (default 5GB)") +rootCmd.PersistentFlags().Int64("poller-s3-maxCacheSize", int64(5)*1024*1024*1024, "Max cache size in bytes for poller archive source (default 5GB)")
152-152: Nit: typo in Redis password flag description.“orchestator” ➜ “orchestrator”; remove double space.
-rootCmd.PersistentFlags().String("storage-orchestrator-redis-password", "", "Redis password for orchestator storage metadata") +rootCmd.PersistentFlags().String("storage-orchestrator-redis-password", "", "Redis password for orchestrator storage metadata")
146-149: Standardize TLS flags to kebab-case-enable-tls.
Rename the two camelCase flags and their BindPFlag lookups in cmd/root.go:
- storage-orchestrator-redis-enableTLS → storage-orchestrator-redis-enable-tls
- migrator-destination-kafka-enableTLS → migrator-destination-kafka-enable-tls
Ensure you update the corresponding
rootCmd.PersistentFlags().Bool(...)calls and the.Lookup(...)arguments inviper.BindPFlag.internal/storage/postgres.go (1)
463-473: Confirm boundary semantics for DeleteStagingDataOlderThan.Name suggests strictly “older than”, but query uses “<=” which also deletes the boundary block. If unintended, switch to “<”.
- AND block_number <= $2 + AND block_number < $2internal/storage/block_buffer_badger_test.go (1)
53-55: Compare big.Int values with Cmp to avoid pointer/reflect pitfalls.Safer and idiomatic for numeric equality.
- assert.Equal(t, big.NewInt(101), maxBlock) + assert.Zero(t, maxBlock.Cmp(big.NewInt(101))) @@ - assert.Equal(t, big.NewInt(99), chainStats.MinBlock) - assert.Equal(t, big.NewInt(101), chainStats.MaxBlock) + assert.Zero(t, chainStats.MinBlock.Cmp(big.NewInt(99))) + assert.Zero(t, chainStats.MaxBlock.Cmp(big.NewInt(101))) @@ - assert.Equal(t, big.NewInt(200), maxBlock) + assert.Zero(t, maxBlock.Cmp(big.NewInt(200)))Also applies to: 66-68, 87-89
internal/orchestrator/orchestrator.go (1)
44-55: Always defer cancel to avoid context leakAdd a
defer cancel()right after creation to guarantee resources are released even if nothing triggerso.cancel().func (o *Orchestrator) Start() { ctx, cancel := context.WithCancel(context.Background()) o.cancel = cancel + defer cancel()internal/orchestrator/poller.go (1)
198-206: Nil workModeChan would deadlock this select

If `WithPollerWorkModeChan` isn’t provided, `case workMode := <-p.workModeChan` blocks forever. Either ensure it’s always set (document) or guard with `if p.workModeChan != nil`. In this PR, orchestrator sets it, but add a comment or guard.
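To make the nil-channel semantics concrete, a standalone sketch (a hypothetical loop, not the poller's actual code): a `case` receiving from a nil channel never fires, so the select only stays live because other cases exist; if the channel could be the only case, an up-front nil check is required.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type workMode string

func runLoop(ctx context.Context, workModeChan <-chan workMode) {
	if workModeChan == nil {
		fmt.Println("work mode channel not configured; skipping work-mode updates")
	}
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case wm, ok := <-workModeChan: // never fires when workModeChan is nil
			if !ok {
				return
			}
			fmt.Println("work mode changed:", wm)
		case <-ticker.C:
			fmt.Println("poll tick")
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	runLoop(ctx, nil) // nil channel: the loop still makes progress via the ticker
}
```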
internal/rpc/batcher.go (1)

54-86: Ordering across chunks is nondeterministic

Results are appended as chunks complete; if callers rely on order matching input keys, process chunks sequentially or re-order at the end. Given worker sorts results later, this is likely fine. Document this behavior.
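If deterministic ordering ever becomes a requirement, one option is to write each chunk's results into a preallocated slice at the chunk's offset rather than appending as chunks finish. A standalone sketch with a hypothetical fetch function, not the batcher's real API:

```go
package main

import (
	"fmt"
	"sync"
)

// fetchChunk stands in for a per-chunk RPC call; chunks may finish in any order.
func fetchChunk(keys []int) []string {
	out := make([]string, len(keys))
	for i, k := range keys {
		out[i] = fmt.Sprintf("result-for-%d", k)
	}
	return out
}

// fetchAllOrdered preserves input order by copying each chunk's results
// into a preallocated slice at the chunk's starting offset.
func fetchAllOrdered(keys []int, chunkSize int) []string {
	results := make([]string, len(keys))
	var wg sync.WaitGroup
	for start := 0; start < len(keys); start += chunkSize {
		end := start + chunkSize
		if end > len(keys) {
			end = len(keys)
		}
		wg.Add(1)
		go func(offset int, chunk []int) {
			defer wg.Done()
			copy(results[offset:], fetchChunk(chunk)) // disjoint ranges: no data race
		}(start, keys[start:end])
	}
	wg.Wait()
	return results
}

func main() {
	fmt.Println(fetchAllOrdered([]int{1, 2, 3, 4, 5}, 2))
}
```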
internal/storage/clickhouse.go (1)
115-123: Minor: Async insert settingsIf async insert is enabled, consider making
wait_for_async_insertconfigurable (some deployments prefer fire-and-forget).internal/worker/worker.go (2)
337-346: Also close the RPC clientFree network resources when closing the worker (useful in tests or alternate lifecycles).
func (w *Worker) Close() error { // Close archive if it exists if w.archive != nil { log.Debug().Msg("Closing archive connection") w.archive.Close() } + if w.rpc != nil { + w.rpc.Close() + } log.Debug().Msg("Worker closed successfully") return nil }
255-278: Archive decision is conservative (ALL blocks in range)

LGTM. Consider partial-archive + partial-RPC in future for large spans; not required now.
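If partial archive + partial RPC is picked up later, the split could be as simple as partitioning the requested blocks against the archive's supported range. A sketch with hypothetical names, not the worker's current behavior:

```go
package main

import (
	"fmt"
	"math/big"
)

// splitByArchiveRange partitions requested block numbers into those the
// archive can serve ([minBlock, maxBlock]) and those that must go to RPC.
func splitByArchiveRange(blocks []*big.Int, minBlock, maxBlock *big.Int) (fromArchive, fromRPC []*big.Int) {
	for _, b := range blocks {
		if b.Cmp(minBlock) >= 0 && b.Cmp(maxBlock) <= 0 {
			fromArchive = append(fromArchive, b)
		} else {
			fromRPC = append(fromRPC, b)
		}
	}
	return fromArchive, fromRPC
}

func main() {
	blocks := []*big.Int{big.NewInt(95), big.NewInt(100), big.NewInt(150), big.NewInt(201)}
	arch, rpc := splitByArchiveRange(blocks, big.NewInt(100), big.NewInt(200))
	fmt.Println("archive:", arch, "rpc:", rpc)
}
```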
internal/storage/redis.go (1)
57-75: Prefer bounded contexts for Redis ops

All calls use context.Background(); add short timeouts to prevent hangs on network partitions. Consider a per-connector op timeout (e.g., 2–5s) and use context.WithTimeout in Get*/Set*/Close.
Also applies to: 77-81, 83-100, 107-124, 125-129, 131-134
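A sketch of the bounded-context pattern, assuming go-redis v9; the key names and helper functions are illustrative rather than the connector's actual API:

```go
package storage

import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
)

// opTimeout bounds every Redis call so a network partition surfaces as an
// error instead of a hang; 2–5s matches the range suggested above.
const opTimeout = 3 * time.Second

func getLastPolledBlock(client *redis.Client, key string) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), opTimeout)
	defer cancel()
	return client.Get(ctx, key).Result()
}

func setLastPolledBlock(client *redis.Client, key, value string) error {
	ctx, cancel := context.WithTimeout(context.Background(), opTimeout)
	defer cancel()
	return client.Set(ctx, key, value, 0).Err()
}
```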
internal/orchestrator/committer.go (1)
529-537: Message typo: “commit” vs “publish”Wrong error string in getSequentialBlockDataToPublish.
Apply:
- return nil, fmt.Errorf("error determining blocks to commit: %v", err) + return nil, fmt.Errorf("error determining blocks to publish: %v", err)internal/storage/kafka_publisher.go (1)
292-299: Single-partition publishing limits throughputManual partitioner with Partition: 0 forces all traffic to one partition. If ordering requirements allow, prefer key-based partitioning to scale.
Option: remove ManualPartitioner and Partition=0; rely on key to preserve per-chain ordering.
- kgo.RecordPartitioner(kgo.ManualPartitioner()), + // kgo.DefaultProducePartitioner uses key hashing to preserve key ordering + kgo.RecordPartitioner(kgo.DefaultProducePartitioner()),- Partition: 0,internal/storage/block_buffer_badger.go (4)
289-313: Full-prefix scan in GetBlocksInRange; seek to start to reduce IOAfter padding keys, you can seek directly to start and break once past end.
Apply:
- for it.Rewind(); it.Valid(); it.Next() { + startKey := b.makeKey(chainId, startBlock) + for it.Seek(startKey); it.ValidForPrefix(prefix); it.Next() { item := it.Item() err := item.Value(func(val []byte) error { var blockData common.BlockData if err := gob.NewDecoder(bytes.NewReader(val)).Decode(&blockData); err != nil { return err } blockNum := blockData.Block.Number - if blockNum.Cmp(startBlock) >= 0 && blockNum.Cmp(endBlock) <= 0 { + if blockNum.Cmp(endBlock) > 0 { + return nil + } + if blockNum.Cmp(startBlock) >= 0 { result = append(result, blockData) } return nil })
232-235: Report total on-disk size (LSM + vlog)badger.DB.Size() returns (lsm, vlog); you currently ignore vlog.
Apply:
- lsm, _ := b.db.Size() - return lsm, b.blockCount + lsm, vlog := b.db.Size() + return lsm + vlog, b.blockCount
107-121: Potential overcount on duplicate keys

Add increments blockCount and per-chain counts without checking if the key already exists; repeated Add of same block skews stats/flush thresholds.
Consider WriteBatch with Insert-only semantics or a pre-check to avoid double-counting, if duplicates are expected in practice.
Also applies to: 128-157
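A sketch of the pre-check idea, assuming Badger v4 (adjust the import to the module's pinned major version); the helper name is hypothetical:

```go
package storage

import (
	"errors"

	badger "github.com/dgraph-io/badger/v4"
)

// putIfAbsent writes the value only when the key is new and reports whether
// a write happened, so counters are only incremented for genuinely new blocks.
func putIfAbsent(db *badger.DB, key, value []byte) (added bool, err error) {
	err = db.Update(func(txn *badger.Txn) error {
		_, getErr := txn.Get(key)
		if getErr == nil {
			return nil // already present: skip write and counter increment
		}
		if !errors.Is(getErr, badger.ErrKeyNotFound) {
			return getErr
		}
		if setErr := txn.Set(key, value); setErr != nil {
			return setErr
		}
		added = true
		return nil
	})
	return added, err
}
```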
51-70: Comment/value mismatch

ValueThreshold comment says "> 512 bytes" but code sets 1024. Align comment or value.
internal/source/s3.go (4)
99-107: Confusing comment vs. value for MaxCacheSizeComment says “Increased from 5GB to 10GB” but value is 5GB.
Apply this diff to make intent explicit:
- if cfg.MaxCacheSize == 0 { - cfg.MaxCacheSize = 5 * 1024 * 1024 * 1024 // Increased from 5GB to 10GB - } + if cfg.MaxCacheSize == 0 { + cfg.MaxCacheSize = 10 * 1024 * 1024 * 1024 // 10GB default + }
901-906: Reduce collision risk in cache filenamesOnly 16 hex chars (~64 bits) increases collision risk. Use full hash.
Apply this diff:
- hash := sha256.Sum256([]byte(fileKey)) - filename := hex.EncodeToString(hash[:])[:16] + ".parquet" + hash := sha256.Sum256([]byte(fileKey)) + filename := hex.EncodeToString(hash[:]) + ".parquet" return filepath.Join(s.cacheDir, filename)
575-660: Block index is built but never used
buildBlockIndexfillss.blockIndexbut read path ignores it. Delete it or wire it into selective reads; otherwise it’s wasted work.Do you want me to remove it or integrate it with
readBlocksFromLocalFile?
158-170: Use a context with timeout for AWS config load
LoadDefaultConfigusescontext.Background(). Prefer a bounded context.Apply this diff:
- awsCfg, err := awsconfig.LoadDefaultConfig(context.Background(), + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + awsCfg, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(cfg.Region), )internal/storage/kafka.go (1)
134-137: Fix doc commentThis closes Kafka publisher, not Redis.
Apply this diff:
-// Close closes the Redis connection +// Close closes the Kafka publishercmd/migrate_valid.go (1)
151-161: Remainder distribution can be simplerMinor: avoid
big.Intallocations in loop.Apply this diff:
- remainder := new(big.Int).Mod(totalBlocks, big.NewInt(int64(numWorkers))) + remainder := int(new(big.Int).Mod(totalBlocks, big.NewInt(int64(numWorkers))).Int64()) @@ - if big.NewInt(int64(i)).Cmp(remainder) < 0 { + if i < remainder { workerBlockCount.Add(workerBlockCount, big.NewInt(1)) }internal/storage/badger.go (2)
126-134: Iterator perf: disable value prefetch when scanning keysFor scans where most entries are filtered/skipped, set
PrefetchValues = falseto reduce I/O.Apply this diff:
- opts := badger.DefaultIteratorOptions + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false
494-526: Avoid building large deletion lists; delete inlineTo reduce memory spikes on large datasets, delete as you iterate (using
KeyCopy(nil)).Apply this diff:
- var keysToDelete [][]byte for it.Rewind(); it.Valid(); it.Next() { key := string(it.Item().Key()) parts := strings.Split(key, ":") @@ - if blockNum.Cmp(blockNumber) <= 0 { - keysToDelete = append(keysToDelete, it.Item().KeyCopy(nil)) - } + if blockNum.Cmp(blockNumber) <= 0 { + if err := txn.Delete(it.Item().KeyCopy(nil)); err != nil { + return err + } + } } - - for _, key := range keysToDelete { - if err := txn.Delete(key); err != nil { - return err - } - }internal/storage/connector.go (1)
63-67: TODO in exported type

Minor: remove or rephrase the TODO in `QueryResult` before merging.
63-90: Consider using a more efficient size estimation approachEncoding the entire batch with gob on every Add call could become a performance bottleneck for large batches. Consider estimating size based on known field sizes or using a running average.
Consider implementing a size estimation based on field analysis or maintaining a running average of bytes-per-block ratio from successful encodings. This would avoid the overhead of gob encoding on every Add call while still providing reasonable accuracy for buffer management.
internal/storage/s3.go (1)
71-78: Log a warning when explicit AWS credentials are used
Add a warning under the explicit-credentials branch in internal/storage/s3.go to discourage static credentials in favor of IAM roles:--- a/internal/storage/s3.go +++ b/internal/storage/s3.go @@ -71,6 +71,7 @@ if cfg.AccessKeyID != "" && cfg.SecretAccessKey != "" { + log.Warn().Msg("Using explicit AWS credentials from config; consider using IAM roles for enhanced security.") awsCfg.Credentials = aws.CredentialsProviderFunc(func(ctx context.Context) (aws.Credentials, error) { return aws.Credentials{ AccessKeyID: cfg.AccessKeyID, SecretAccessKey: cfg.SecretAccessKey, }, nil }) }No occurrences of logging these credentials were found in the codebase.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (54)
- cmd/migrate_valid.go (5 hunks)
- cmd/orchestrator.go (1 hunks)
- cmd/root.go (15 hunks)
- configs/config.go (8 hunks)
- go.mod (6 hunks)
- internal/common/block.go (2 hunks)
- internal/handlers/logs_handlers.go (1 hunks)
- internal/orchestrator/committer.go (14 hunks)
- internal/orchestrator/committer_test.go (5 hunks)
- internal/orchestrator/failure_recoverer.go (2 hunks)
- internal/orchestrator/orchestrator.go (2 hunks)
- internal/orchestrator/poller.go (7 hunks)
- internal/orchestrator/validator.go (2 hunks)
- internal/publisher/publisher.go (1 hunks)
- internal/rpc/batcher.go (2 hunks)
- internal/rpc/rpc.go (2 hunks)
- internal/source/s3.go (1 hunks)
- internal/source/source.go (1 hunks)
- internal/storage/badger.go (1 hunks)
- internal/storage/block_buffer.go (1 hunks)
- internal/storage/block_buffer_badger.go (1 hunks)
- internal/storage/block_buffer_badger_test.go (1 hunks)
- internal/storage/clickhouse.go (5 hunks)
- internal/storage/connector.go (3 hunks)
- internal/storage/kafka.go (1 hunks)
- internal/storage/kafka_publisher.go (1 hunks)
- internal/storage/postgres.go (2 hunks)
- internal/storage/redis.go (1 hunks)
- internal/storage/s3.go (1 hunks)
- internal/tools/clickhouse/0000_clickhouse_create_blocks_table.sql (1 hunks)
- internal/tools/clickhouse/0001_clickhouse_create_block_failures_table.sql (0 hunks)
- internal/tools/clickhouse/0001_clickhouse_create_transactions_table.sql (1 hunks)
- internal/tools/clickhouse/0002_clickhouse_create_cursors_table.sql (0 hunks)
- internal/tools/clickhouse/0002_clickhouse_create_logs_table.sql (1 hunks)
- internal/tools/clickhouse/0003_clickhouse_create_staging_table.sql (0 hunks)
- internal/tools/clickhouse/0003_clickhouse_create_traces_table.sql (1 hunks)
- internal/tools/clickhouse/0004_clickhouse_create_insert_null_table.sql (2 hunks)
- internal/tools/clickhouse/0005_clickhouse_create_insert_data_mv.sql (5 hunks)
- internal/tools/clickhouse/0006_clickhouse_create_logs_table.sql (0 hunks)
- internal/tools/clickhouse/0006_clickhouse_create_token_transfers.sql (1 hunks)
- internal/tools/clickhouse/0007_clickhouse_create_token_transfers_mv.sql (1 hunks)
- internal/tools/clickhouse/0008_clickhouse_create_token_balances.sql (1 hunks)
- internal/tools/clickhouse/0009_clickhouse_create_token_balances_mv.sql (1 hunks)
- internal/tools/clickhouse/0010_clickhouse_create_address_transactions.sql (2 hunks)
- internal/tools/clickhouse/0010_clickhouse_create_token_transfers_mv.sql (0 hunks)
- internal/tools/clickhouse/0011_clickhouse_create_address_transactions_mv.sql (1 hunks)
- internal/tools/clickhouse/0012_clickhouse_create_address_transfers.sql (1 hunks)
- internal/tools/clickhouse/0013_clickhouse_create_address_transfers_mv.sql (1 hunks)
- internal/tools/clickhouse_opts/0000_clickhouse_backfill_logs_transfer.sql (1 hunks)
- internal/worker/worker.go (2 hunks)
- test/mocks/MockIMainStorage.go (3 hunks)
- test/mocks/MockIOrchestratorStorage.go (4 hunks)
- test/mocks/MockIRPCClient.go (1 hunks)
- test/mocks/MockIStagingStorage.go (4 hunks)
💤 Files with no reviewable changes (5)
- internal/tools/clickhouse/0001_clickhouse_create_block_failures_table.sql
- internal/tools/clickhouse/0003_clickhouse_create_staging_table.sql
- internal/tools/clickhouse/0006_clickhouse_create_logs_table.sql
- internal/tools/clickhouse/0010_clickhouse_create_token_transfers_mv.sql
- internal/tools/clickhouse/0002_clickhouse_create_cursors_table.sql
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-22T18:30:59.800Z
Learnt from: iuwqyir
PR: thirdweb-dev/insight#240
File: internal/tools/clickhouse_create_token_balances_mv.sql:8-17
Timestamp: 2025-07-22T18:30:59.800Z
Learning: In the token_balances table projection design, the user prefers to have the address_projection optimized for queries by (token_type, chain_id, address, token_id) without including owner, as the main table is already optimized for owner-based query paths.
Applied to files:
internal/tools/clickhouse/0012_clickhouse_create_address_transfers.sqlinternal/tools/clickhouse/0008_clickhouse_create_token_balances.sqlinternal/tools/clickhouse/0006_clickhouse_create_token_transfers.sqlinternal/tools/clickhouse/0010_clickhouse_create_address_transactions.sqlinternal/tools/clickhouse/0009_clickhouse_create_token_balances_mv.sql
🧬 Code graph analysis (20)
internal/source/source.go (1)
internal/rpc/rpc.go (1)
GetFullBlockResult(18-22)
internal/rpc/batcher.go (2)
internal/rpc/rpc.go (1)
Client(57-66)internal/common/utils.go (1)
SliceToChunks(7-20)
internal/storage/block_buffer_badger_test.go (3)
internal/storage/block_buffer_badger.go (1)
NewBadgerBlockBuffer(41-96)internal/common/block.go (2)
BlockData(61-66)Block(8-33)internal/storage/block_buffer.go (1)
ChainStats(257-261)
internal/orchestrator/failure_recoverer.go (1)
internal/storage/connector.go (1)
QueryFilter(11-29)
internal/common/block.go (3)
internal/common/transaction.go (1)
Transaction(16-51)internal/common/log.go (1)
Log(15-31)internal/common/trace.go (1)
Trace(8-32)
internal/storage/block_buffer.go (2)
internal/common/block.go (2)
BlockData(61-66)Block(8-33)internal/storage/block_buffer_badger.go (1)
NewBadgerBlockBuffer(41-96)
internal/storage/block_buffer_badger.go (2)
internal/common/block.go (2)
BlockData(61-66)Block(8-33)internal/storage/block_buffer.go (3)
BufferStats(249-254)ChainStats(257-261)IBlockBuffer(24-37)
internal/orchestrator/orchestrator.go (1)
internal/orchestrator/poller.go (4)
NewPoller(100-132)WithPollerWorkModeChan(46-50)WithPollerS3Source(52-66)Poller(23-37)
internal/handlers/logs_handlers.go (1)
internal/storage/connector.go (1)
NewMainConnector(305-379)
internal/rpc/rpc.go (1)
internal/rpc/batcher.go (2)
RPCFetchSingleBatchWithRetry(88-153)RPCFetchInBatchesWithRetry(54-86)
internal/worker/worker.go (4)
internal/rpc/rpc.go (2)
IRPCClient(42-55)GetFullBlockResult(18-22)internal/source/source.go (1)
ISource(10-14)internal/common/utils.go (1)
SliceToChunks(7-20)internal/metrics/metrics.go (1)
LastFetchedBlock(37-40)
internal/storage/s3.go (7)
internal/storage/block_buffer.go (2)
IBlockBuffer(24-37)NewBlockBuffer(40-46)internal/common/block.go (3)
BlockData(61-66)Block(8-33)BlockHeader(68-72)internal/storage/block_buffer_badger.go (1)
NewBadgerBlockBuffer(41-96)internal/common/transaction.go (1)
Transaction(16-51)internal/common/log.go (1)
Log(15-31)internal/common/trace.go (1)
Trace(8-32)internal/storage/connector.go (2)
QueryFilter(11-29)QueryResult(63-67)
internal/orchestrator/poller.go (4)
internal/worker/worker.go (3)
Worker(35-39)NewWorkerWithArchive(49-55)NewWorker(41-46)internal/source/s3.go (1)
NewS3Source(91-173)internal/rpc/rpc.go (1)
IRPCClient(42-55)internal/storage/connector.go (1)
IStorage(69-73)
internal/storage/kafka_publisher.go (1)
internal/common/block.go (2)
BlockData(61-66)Block(8-33)
internal/source/s3.go (6)
internal/rpc/rpc.go (2)
Client(57-66)GetFullBlockResult(18-22)internal/common/block.go (2)
Block(8-33)BlockData(61-66)internal/common/trace.go (1)
Trace(8-32)internal/storage/s3.go (1)
ParquetBlockData(51-60)internal/common/transaction.go (1)
Transaction(16-51)internal/common/log.go (1)
Log(15-31)
internal/orchestrator/committer.go (2)
internal/rpc/rpc.go (1)
IRPCClient(42-55)internal/orchestrator/poller.go (2)
Poller(23-37)NewBoundlessPoller(68-96)
internal/storage/badger.go (3)
internal/storage/connector.go (1)
QueryFilter(11-29)internal/common/block_failures.go (1)
BlockFailure(8-14)internal/common/block.go (2)
BlockData(61-66)Block(8-33)
internal/storage/connector.go (6)
internal/storage/redis.go (1)
NewRedisConnector(28-55)internal/storage/postgres.go (1)
NewPostgresConnector(22-58)internal/storage/clickhouse.go (1)
NewClickHouseConnector(66-79)internal/storage/badger.go (1)
NewBadgerConnector(27-65)internal/storage/kafka.go (1)
NewKafkaConnector(19-35)internal/storage/s3.go (1)
NewS3Connector(62-141)
test/mocks/MockIStagingStorage.go (2)
internal/common/block_failures.go (1)
BlockFailure(8-14)internal/storage/connector.go (1)
QueryFilter(11-29)
internal/storage/kafka.go (3)
internal/storage/kafka_publisher.go (2)
KafkaPublisher(20-23)NewKafkaPublisher(60-111)internal/storage/connector.go (5)
IOrchestratorStorage(100-109)BalancesQueryFilter(48-61)QueryResult(63-67)TransfersQueryFilter(31-46)QueryFilter(11-29)internal/common/block.go (2)
BlockData(61-66)Block(8-33)
rootCmd.PersistentFlags().String("poller-s3-format", "parquet", "S3 storage format for poller archive source")
rootCmd.PersistentFlags().String("poller-s3-cacheDir", "/tmp/insight-archive", "Local cache directory for poller archive source")
rootCmd.PersistentFlags().Int("poller-s3-metadataTTL", 0, "Metadata cache TTL in seconds for poller archive source")
rootCmd.PersistentFlags().Int("poller-s3-fileCacheTTL", 0, "File cache TTL in seconds for poller archive source")
rootCmd.PersistentFlags().Int64("poller-s3-maxCacheSize", 0, "Max cache size in bytes for poller archive source (default 5GB)")
rootCmd.PersistentFlags().Int("poller-s3-cleanupInterval", 0, "Cache cleanup interval in seconds for poller archive source")
rootCmd.PersistentFlags().Int("poller-s3-maxConcurrentDownloads", 3, "Max concurrent downloads for poller archive source")
Missing viper binding for poller.s3.format.
Flag is defined but never bound; value won’t flow into config.
  // existing s3 poller binds...
+ viper.BindPFlag("poller.s3.format", rootCmd.PersistentFlags().Lookup("poller-s3-format"))

Also applies to: 265-277
🤖 Prompt for AI Agents
In cmd/root.go around lines 65-71 (and similarly for flags at 265-277), the CLI
flags for poller S3 options are registered but not bound to viper, so config
values from flags won't flow into viper keys; call viper.BindPFlag for each flag
mapping the flag name to the appropriate viper key (e.g., bind
"poller-s3-format" to "poller.s3.format") after registering the flag, and repeat
for poller-s3-cacheDir -> poller.s3.cacheDir, poller-s3-metadataTTL ->
poller.s3.metadataTTL, poller-s3-fileCacheTTL -> poller.s3.fileCacheTTL,
poller-s3-maxCacheSize -> poller.s3.maxCacheSize, poller-s3-cleanupInterval ->
poller.s3.cleanupInterval, and poller-s3-maxConcurrentDownloads ->
poller.s3.maxConcurrentDownloads so viper picks up flag values.
  func getMainStorage() (storage.IMainStorage, error) {
      storageOnce.Do(func() {
          var err error
-         mainStorage, err = storage.NewConnector[storage.IMainStorage](&config.Cfg.Storage.Main)
+         // TODO: move this to a QueryConnector later to decouple read/write connector
+         mainStorage, err = storage.NewMainConnector(&config.Cfg.Storage.Main, nil)
          if err != nil {
🛠️ Refactor suggestion
Avoid selecting Kafka for read paths; prefer queryable backends.
NewMainConnector requires orchestrator storage for Kafka and Kafka isn’t queryable here. In auto mode this can 500 if Kafka is configured.
Apply:
func getMainStorage() (storage.IMainStorage, error) {
storageOnce.Do(func() {
var err error
- // TODO: move this to a QueryConnector later to decouple read/write connector
- mainStorage, err = storage.NewMainConnector(&config.Cfg.Storage.Main, nil)
+ // TODO: move this to a QueryConnector later to decouple read/write connector
+ // Prefer queryable backends for read APIs; ignore Kafka in auto mode.
+ cfg := config.Cfg.Storage.Main
+ if cfg.Type == "" || cfg.Type == "auto" {
+ cfg.Kafka = nil
+ }
+ mainStorage, err = storage.NewMainConnector(&cfg, nil)
if err != nil {
storageErr = err
log.Error().Err(err).Msg("Error creating storage connector")
}
})
return mainStorage, storageErr
}
🤖 Prompt for AI Agents
In internal/handlers/logs_handlers.go around lines 224 to 229, the code uses
storage.NewMainConnector which can select Kafka (non-queryable) for read paths
and cause 500s; replace this with the queryable connector (e.g.,
storage.NewQueryConnector or the storage API that returns a query-capable
backend) so read paths never pick Kafka, or explicitly force the connector
selection to a queryable backend in auto mode (fall back to a DB/readonly store
if Kafka is configured). Keep the same error handling and ensure the chosen
connector is suitable for queries.
  blocksToPoll := blocksToCommit
- if len(blocksToCommit) > int(poller.blocksPerPoll) {
-     blocksToPoll = blocksToCommit[:int(poller.blocksPerPoll)]
+ if len(blocksToCommit) > int(c.poller.blocksPerPoll) {
+     blocksToPoll = blocksToCommit[:int(c.poller.blocksPerPoll)]
  }
- poller.Poll(ctx, blocksToPoll)
+ c.poller.Poll(ctx, blocksToPoll)
  log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String())
  }
🛠️ Refactor suggestion
Error from poller ignored when staging data is missing
Same as above.
Apply:
- c.poller.Poll(ctx, blocksToPoll)
+ if err := c.poller.Poll(ctx, blocksToPoll); err != nil {
+ log.Error().Err(err).Msg("Poller error while backfilling missing staging data")
+ }
// Waiting for all goroutines to complete
o.wg.Wait()

func (o *Orchestrator) Shutdown() {
if o.cancel != nil {
o.cancel()
if err := o.storage.Close(); err != nil {
log.Error().Err(err).Msg("Error closing storage connections")
}

log.Info().Msg("Orchestrator shutdown complete")
}
🛠️ Refactor suggestion
Stop signal delivery and close RPC/storage on shutdown
- Call signal.Stop(sigChan) to avoid leaking the signal notifier after return.
- Close the RPC client, as IRPCClient exposes Close().
// Waiting for all goroutines to complete
o.wg.Wait()
if err := o.storage.Close(); err != nil {
log.Error().Err(err).Msg("Error closing storage connections")
}
+ signal.Stop(sigChan)
+ if o.rpc != nil {
+ o.rpc.Close()
+ }
log.Info().Msg("Orchestrator shutdown complete")📝 Committable suggestion
	// Waiting for all goroutines to complete
	o.wg.Wait()
	if err := o.storage.Close(); err != nil {
		log.Error().Err(err).Msg("Error closing storage connections")
	}
	signal.Stop(sigChan)
	if o.rpc != nil {
		o.rpc.Close()
	}
	log.Info().Msg("Orchestrator shutdown complete")
}
🤖 Prompt for AI Agents
internal/orchestrator/orchestrator.go around lines 136 to 144: after wg.Wait()
ensure you stop the OS signal notifier and close RPC before closing storage to
avoid leaks and dangling connections; call signal.Stop(sigChan) (where sigChan
is the channel used to receive shutdown signals) to unregister the notifier,
then if o.rpc != nil call o.rpc.Close() and handle/log any error, then proceed
to close storage as currently implemented and log shutdown completion.
func WithPollerS3Source(cfg *config.S3SourceConfig) PollerOption {
	return func(p *Poller) {
		if cfg == nil || cfg.Region == "" || cfg.Bucket == "" {
			return
		}

		source, err := source.NewS3Source(cfg, p.rpc.GetChainID())
		if err != nil {
			log.Fatal().Err(err).Msg("Failed to create S3 source")
		}

		log.Info().Msg("Poller S3 source configuration detected, setting up S3 source for poller")
		p.worker = worker.NewWorkerWithArchive(p.rpc, source)
	}
}
🛠️ Refactor suggestion
Do not terminate the process from a library; avoid name shadowing
- Replace log.Fatal() with log.Error() and gracefully fall back to the RPC worker.
- Avoid shadowing the source import with a local source variable name.
-func WithPollerS3Source(cfg *config.S3SourceConfig) PollerOption {
+func WithPollerS3Source(cfg *config.S3SourceConfig) PollerOption {
return func(p *Poller) {
if cfg == nil || cfg.Region == "" || cfg.Bucket == "" {
return
}
- source, err := source.NewS3Source(cfg, p.rpc.GetChainID())
+ s3src, err := source.NewS3Source(cfg, p.rpc.GetChainID())
if err != nil {
- log.Fatal().Err(err).Msg("Failed to create S3 source")
+ log.Error().Err(err).Msg("Failed to create S3 source; falling back to RPC")
+ return
}
log.Info().Msg("Poller S3 source configuration detected, setting up S3 source for poller")
- p.worker = worker.NewWorkerWithArchive(p.rpc, source)
+ p.worker = worker.NewWorkerWithArchive(p.rpc, s3src)
}
}

📝 Committable suggestion
func WithPollerS3Source(cfg *config.S3SourceConfig) PollerOption {
	return func(p *Poller) {
		if cfg == nil || cfg.Region == "" || cfg.Bucket == "" {
			return
		}
		s3src, err := source.NewS3Source(cfg, p.rpc.GetChainID())
		if err != nil {
			log.Error().Err(err).Msg("Failed to create S3 source; falling back to RPC")
			return
		}
		log.Info().Msg("Poller S3 source configuration detected, setting up S3 source for poller")
		p.worker = worker.NewWorkerWithArchive(p.rpc, s3src)
	}
}
🤖 Prompt for AI Agents
In internal/orchestrator/poller.go around lines 52 to 66, the function currently
calls log.Fatal on S3 source creation and shadows the imported source package
with a local variable; change it to use log.Error (or log.Err) so the library
does not terminate the process, rename the local variable to avoid shadowing
(e.g., s3Src or s3Source), and on error simply log and return so the caller
keeps the default RPC worker; only set p.worker when the S3 source was created
successfully.
    INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
    INDEX idx_address_type address_type TYPE bloom_filter GRANULARITY 3,
    INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 4,
    INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 4,
🛠️ Refactor suggestion
Index token_address (and optionally token_type) for common filter paths
Token-address filters are frequent; add bloom indexes to avoid full scans under (chain_id, address, …) sort.
INDEX idx_address_type address_type TYPE bloom_filter GRANULARITY 3,
INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 4,
INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 4,
+ INDEX idx_token_address token_address TYPE bloom_filter GRANULARITY 3,
+    INDEX idx_token_type token_type TYPE bloom_filter GRANULARITY 4,

📝 Committable suggestion
    INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
    INDEX idx_address_type address_type TYPE bloom_filter GRANULARITY 3,
    INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 4,
    INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 4,
    INDEX idx_token_address token_address TYPE bloom_filter GRANULARITY 3,
    INDEX idx_token_type token_type TYPE bloom_filter GRANULARITY 4,
🤖 Prompt for AI Agents
In internal/tools/clickhouse/0012_clickhouse_create_address_transfers.sql around
lines 21 to 24, add a bloom index on token_address (and optionally token_type)
to speed up common (chain_id, address, ...) filter paths; insert lines such as
an INDEX idx_token_address token_address TYPE bloom_filter GRANULARITY 4 (and if
desired INDEX idx_token_type token_type TYPE bloom_filter GRANULARITY 3)
alongside the existing indexes so token-based queries avoid full table scans.
    count() AS transfer_count,
    sum(toInt256(amount)) AS total_amount,
    min(block_number) AS min_block_number,
    min(block_timestamp) AS min_block_timestamp,
    max(block_number) AS max_block_number,
    max(block_timestamp) AS max_block_timestamp
🛠️ Refactor suggestion
Avoid toInt256(amount); use Decimal256 for safe aggregation
Casting UInt256→Int256 risks overflow. Aggregate as Decimal256(precision=76, scale=0).
- sum(toInt256(amount)) AS total_amount,
+    sum(toDecimal256(amount, 0)) AS total_amount,

Apply the same change in the second projection below.
📝 Committable suggestion
    count() AS transfer_count,
    sum(toDecimal256(amount, 0)) AS total_amount,
    min(block_number) AS min_block_number,
    min(block_timestamp) AS min_block_timestamp,
    max(block_number) AS max_block_number,
    max(block_timestamp) AS max_block_timestamp
🤖 Prompt for AI Agents
In internal/tools/clickhouse/0012_clickhouse_create_address_transfers.sql around
lines 33 to 38, the code casts amount with toInt256(amount) which risks
overflow; replace those casts with toDecimal256(amount, 0) (Decimal256 with
precision 76 and scale 0) for the total_amount aggregation, and apply the same
replacement in the second projection later in the file so all aggregations use
Decimal256 instead of Int256.
    count() AS transfer_count,
    sum(toInt256(amount)) AS total_amount,
    min(block_number) AS min_block_number,
    min(block_timestamp) AS min_block_timestamp,
    max(block_number) AS max_block_number,
    max(block_timestamp) AS max_block_timestamp
🛠️ Refactor suggestion
Apply Decimal256 aggregation here as well
Mirror the safe sum change in the total-state projection.
- sum(toInt256(amount)) AS total_amount,
+    sum(toDecimal256(amount, 0)) AS total_amount,

📝 Committable suggestion
    count() AS transfer_count,
    sum(toDecimal256(amount, 0)) AS total_amount,
    min(block_number) AS min_block_number,
    min(block_timestamp) AS min_block_timestamp,
    max(block_number) AS max_block_number,
    max(block_timestamp) AS max_block_timestamp
🤖 Prompt for AI Agents
In internal/tools/clickhouse/0012_clickhouse_create_address_transfers.sql around
lines 52 to 57, the total_amount aggregation still uses sum(toInt256(amount))
instead of the Decimal256-safe aggregation used in the total-state projection;
replace the current sum aggregation with the Decimal256 variant used in the
other projection (e.g., sum(toDecimal256(amount, <scale>)) or the exact helper
used there) so the scale and casting match the total-state implementation and
avoid overflow/precision differences.
    address_tuple.1 AS address,
    address_tuple.2 AS address_type,
    from_address,
🛠️ Refactor suggestion
Cast address_type explicitly to Enum8 to avoid implicit-cast surprises
Relying on String→Enum auto-cast can break if enum labels change. Cast explicitly.
- address_tuple.2 AS address_type,
+    CAST(address_tuple.2, 'Enum8(\'from\' = 1, \'to\' = 2)') AS address_type,

📝 Committable suggestion
    address_tuple.1 AS address,
    CAST(address_tuple.2, 'Enum8(\'from\' = 1, \'to\' = 2)') AS address_type,
    from_address,
🤖 Prompt for AI Agents
internal/tools/clickhouse/0013_clickhouse_create_address_transfers_mv.sql around
lines 9-11: the SELECT currently uses address_tuple.2 AS address_type which
relies on implicit String→Enum casting; replace it with an explicit cast to the
Enum8 type used for address_type (e.g. CAST(address_tuple.2 AS Enum8(...)) or
CAST(address_tuple.2 AS your_existing_Enum8_type)) so the query uses a stable,
explicit enum conversion and avoids implicit-cast breakage if labels change.
// Run processes blocks using either archive OR rpc
func (w *Worker) Run(ctx context.Context, blockNumbers []*big.Int) []rpc.GetFullBlockResult {
	if len(blockNumbers) == 0 {
		return nil
	}

	var results []rpc.GetFullBlockResult

	// Determine which source to use
	sourceType := SourceTypeRPC
	fetchFunc := w.fetchFromRPC

	if w.shouldUseArchive(ctx, blockNumbers) {
		sourceType = SourceTypeArchive
		fetchFunc = w.fetchFromArchive
		log.Debug().
			Int("count", len(blockNumbers)).
			Str("source", sourceType.String()).
			Msg("Using archive for all blocks")
	} else {
		log.Debug().
			Int("count", len(blockNumbers)).
			Str("source", sourceType.String()).
			Msg("Using RPC for all blocks")
	}

	// Process all blocks with the selected source
	results = w.processBatch(ctx, blockNumbers, sourceType, fetchFunc)

	// Update metrics and log summary
	if len(results) > 0 {
		lastBlockNumberFloat, _ := results[len(results)-1].BlockNumber.Float64()
		metrics.LastFetchedBlock.Set(lastBlockNumberFloat)

		// Count successes and failures
		successful := 0
		failed := 0
		for _, r := range results {
			if r.Error == nil {
				successful++
			} else {
				failed++
			}
		}

		log.Debug().
			Int("total", len(results)).
			Int("successful", successful).
			Int("failed", failed).
			Str("source", sourceType.String()).
			Msg("Block fetching complete")
	}

	return results
}
🛠️ Refactor suggestion
Guard metric update against nil BlockNumber
If any result lacks BlockNumber, dereferencing will panic. Compute max valid block first.
- if len(results) > 0 {
- lastBlockNumberFloat, _ := results[len(results)-1].BlockNumber.Float64()
- metrics.LastFetchedBlock.Set(lastBlockNumberFloat)
+ if len(results) > 0 {
+ var last *big.Int
+ for i := len(results) - 1; i >= 0; i-- {
+ if results[i].BlockNumber != nil {
+ last = results[i].BlockNumber
+ break
+ }
+ }
+		if last != nil {
+			f, _ := new(big.Float).SetInt(last).Float64()
+			metrics.LastFetchedBlock.Set(f)
+		}

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In internal/worker/worker.go around lines 280 to 334, the metrics update
currently dereferences results[len(results)-1].BlockNumber which can be nil and
cause a panic; instead iterate the results to find the maximum non-nil
BlockNumber (skip nil entries), convert that big.Int to float64 (handling
conversion error), and only call metrics.LastFetchedBlock.Set(...) when a valid
block number was found; leave logging/count logic unchanged but guard the metric
update behind this validity check.
Review continued from previous batch...
		prepStartTime := time.Now()
		blocksToInsert := make([]common.BlockData, 0, len(blocksToInsertMap))
		for _, blockData := range blocksToInsertMap {
			blocksToInsert = append(blocksToInsert, blockData)
		}
		prepDuration := time.Since(prepStartTime)
		log.Debug().Dur("duration", prepDuration).Int("blocks_to_insert", len(blocksToInsert)).Msgf("Worker %d: Prepared blocks for insertion", workerID)
Sort blocks before insertion to preserve ordering and downstream assumptions
Maps randomize order; some connectors (e.g., Kafka) may assume ascending order when updating cursors.
Apply this diff:
- blocksToInsert := make([]common.BlockData, 0, len(blocksToInsertMap))
- for _, blockData := range blocksToInsertMap {
- blocksToInsert = append(blocksToInsert, blockData)
- }
+ blocksToInsert := make([]common.BlockData, 0, len(blocksToInsertMap))
+ for _, bd := range blocksToInsertMap {
+ blocksToInsert = append(blocksToInsert, bd)
+ }
+ sort.Slice(blocksToInsert, func(i, j int) bool {
+ return blocksToInsert[i].Block.Number.Cmp(blocksToInsert[j].Block.Number) < 0
+	})

📝 Committable suggestion
	prepStartTime := time.Now()
	blocksToInsert := make([]common.BlockData, 0, len(blocksToInsertMap))
	for _, bd := range blocksToInsertMap {
		blocksToInsert = append(blocksToInsert, bd)
	}
	sort.Slice(blocksToInsert, func(i, j int) bool {
		return blocksToInsert[i].Block.Number.Cmp(blocksToInsert[j].Block.Number) < 0
	})
	prepDuration := time.Since(prepStartTime)
	log.Debug().Dur("duration", prepDuration).Int("blocks_to_insert", len(blocksToInsert)).Msgf("Worker %d: Prepared blocks for insertion", workerID)
🤖 Prompt for AI Agents
In cmd/migrate_valid.go around lines 252 to 259, after converting
blocksToInsertMap into the blocksToInsert slice, sort blocksToInsert into
deterministic ascending order by the block sequence key (e.g.,
BlockNumber/Height or Cursor) using sort.Slice (or equivalent) before computing
prepDuration, logging, and performing the insertion; ensure the comparator uses
the correct numeric or lexicographic field for your BlockData so downstream
connectors (like Kafka) receive blocks in ascending order.
	blockData := m.worker.Run(context.Background(), blockNumbers)
	for _, block := range blockData {
		if block.Error != nil {
			log.Warn().Err(block.Error).Msgf("Failed to fetch block %s from RPC", block.BlockNumber.String())
			continue
		}
		allBlockData = append(allBlockData, block.Data)
	}
	return allBlockData, nil
💡 Verification agent
🧩 Analysis chain
Propagate cancellation to RPC worker
You’re ignoring ctx and using context.Background(), which prevents graceful shutdown mid-batch.
Apply this diff:
-func (m *Migrator) FetchBlocksFromRPC(blockNumbers []*big.Int) ([]common.BlockData, error) {
+func (m *Migrator) FetchBlocksFromRPC(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
allBlockData := make([]common.BlockData, 0, len(blockNumbers))
- blockData := m.worker.Run(context.Background(), blockNumbers)
+	blockData := m.worker.Run(ctx, blockNumbers)

And update callers:
- In processBlockRange: validMissingBlocks := migrator.GetValidBlocksFromRPC(ctx, missingBlocks)
- In GetValidBlocksFromRPC: accept ctx context.Context and pass it through to FetchBlocksFromRPC.
🏁 Script executed:
#!/bin/bash
set -e
echo "=== FetchBlocksFromRPC definition ==="
rg -nP --type=go '\bfunc \(m \*Migrator\) FetchBlocksFromRPC\b' -C3 .
echo "=== GetValidBlocksFromRPC usage ==="
rg -nP --type=go '\bGetValidBlocksFromRPC\b' -C3 .
echo "=== Migrator struct definition ==="
rg -nP --type=go '\btype\s+Migrator\b' -C3 .
echo "=== worker.Run signature ==="
rg -nP --type=go '\bRun\(' -C3 .

Length of output: 39401
🏁 Script executed:
#!/bin/bash
# Show context where GetValidBlocksFromRPC is called
sed -n '230,260p' cmd/migrate_valid.go
# Show full FetchBlocksFromRPC implementation
sed -n '430,480p' cmd/migrate_valid.go

Length of output: 3328
Propagate cancellation context through FetchBlocksFromRPC
Replace context.Background() with the incoming ctx:
--- cmd/migrate_valid.go
-func (m *Migrator) FetchBlocksFromRPC(blockNumbers []*big.Int) ([]common.BlockData, error) {
+func (m *Migrator) FetchBlocksFromRPC(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
allBlockData := make([]common.BlockData, 0, len(blockNumbers))
- blockData := m.worker.Run(context.Background(), blockNumbers)
+	blockData := m.worker.Run(ctx, blockNumbers)

Update callers:
- Change GetValidBlocksFromRPC to func (m *Migrator) GetValidBlocksFromRPC(ctx context.Context, blockNumbers []*big.Int) … and call m.FetchBlocksFromRPC(ctx, …).
- In processBlockRange, use validMissingBlocks := migrator.GetValidBlocksFromRPC(ctx, missingBlocks).
📝 Committable suggestion
func (m *Migrator) FetchBlocksFromRPC(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
	allBlockData := make([]common.BlockData, 0, len(blockNumbers))
	blockData := m.worker.Run(ctx, blockNumbers)
	for _, block := range blockData {
		if block.Error != nil {
			log.Warn().Err(block.Error).Msgf("Failed to fetch block %s from RPC", block.BlockNumber.String())
			continue
		}
		allBlockData = append(allBlockData, block.Data)
	}
	return allBlockData, nil
}
🤖 Prompt for AI Agents
In cmd/migrate_valid.go around lines 441 to 449, the call to m.worker.Run uses
context.Background(), which ignores caller cancellation; change it to pass the
incoming ctx. Update the GetValidBlocksFromRPC signature to func (m *Migrator)
GetValidBlocksFromRPC(ctx context.Context, blockNumbers []*big.Int) … and inside
call m.FetchBlocksFromRPC(ctx, …) (or m.worker.Run with ctx) so cancellation
propagates. Also update callers: in processBlockRange call validMissingBlocks :=
migrator.GetValidBlocksFromRPC(ctx, missingBlocks) and any other sites to
forward the ctx. Ensure imports include context where needed.
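A sketch of the updated caller under those changes; the validation step is elided since only the context plumbing matters here:

// GetValidBlocksFromRPC forwards the caller's context so cancellation
// propagates into the RPC worker mid-batch. Validation of the fetched
// blocks is omitted in this sketch.
func (m *Migrator) GetValidBlocksFromRPC(ctx context.Context, blockNumbers []*big.Int) []common.BlockData {
	blockData, err := m.FetchBlocksFromRPC(ctx, blockNumbers)
	if err != nil {
		log.Error().Err(err).Msg("Failed to fetch blocks from RPC")
		return nil
	}
	return blockData
}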
	// Active use tracking
	activeUseMu sync.RWMutex
	activeUse   map[string]int // Files currently being read (reference count)
}
Cleanup goroutine leak: add a stop channel
cleanupCache() runs forever; Close() only removes the dir. Add stopCh for a clean shutdown.
Apply this diff:
type S3Source struct {
client *s3.Client
@@
activeUseMu sync.RWMutex
activeUse map[string]int // Files currently being read (reference count)
+ stopCh chan struct{}
}

📝 Committable suggestion
	// Active use tracking
	activeUseMu sync.RWMutex
	activeUse   map[string]int // Files currently being read (reference count)
	stopCh      chan struct{}
}
🤖 Prompt for AI Agents
In internal/source/s3.go around lines 74 to 77, the cleanupCache goroutine runs
forever and Close() only removes the dir; add a stop channel and shutdown
coordination so the goroutine can exit cleanly. Modify the struct to include
stopCh chan struct{} and a wg sync.WaitGroup (or done channel) to track the
cleanup goroutine; when starting the cleanup goroutine increment wg and have the
goroutine select on either its tick/timer or <-stopCh and return when stopped.
In Close(), signal shutdown by closing or sending to stopCh (using sync.Once to
avoid double-close), wait for the goroutine via wg.Wait(), then perform
directory removal and other cleanup. Ensure any blocking operations in
cleanupCache honor context of stopCh to avoid leaks.
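A minimal sketch of that coordination, assuming S3Source also gains wg sync.WaitGroup and stopOnce sync.Once fields next to stopCh (names are illustrative, not the current implementation):

func (s *S3Source) startCleanup() {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		ticker := time.NewTicker(s.cleanupInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				s.cleanupCache() // evict expired cache entries
			case <-s.stopCh:
				return // Close() signalled shutdown
			}
		}
	}()
}

func (s *S3Source) Close() {
	s.stopOnce.Do(func() { close(s.stopCh) }) // avoid double-close
	s.wg.Wait()                               // wait for the cleanup goroutine to exit
	if s.cacheDir != "" {
		os.RemoveAll(s.cacheDir)
	}
}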
	archive := &S3Source{
		client:                 s3Client,
		config:                 cfg,
		chainId:                chainId,
		cacheDir:               cacheDir,
		metadataTTL:            cfg.MetadataTTL,
		fileCacheTTL:           cfg.FileCacheTTL,
		maxCacheSize:           cfg.MaxCacheSize,
		cleanupInterval:        cfg.CleanupInterval,
		maxConcurrentDownloads: cfg.MaxConcurrentDownloads,
		fileMetadata:           make(map[string]*FileMetadata),
		cacheMap:               make(map[string]time.Time),
		blockIndex:             make(map[string][]BlockIndex),
		downloading:            make(map[string]*sync.WaitGroup),
		activeUse:              make(map[string]int),
	}
Initialize stop channel
Initialize stopCh so it can be closed on shutdown.
Apply this diff:
archive := &S3Source{
client: s3Client,
config: cfg,
chainId: chainId,
cacheDir: cacheDir,
@@
downloading: make(map[string]*sync.WaitGroup),
activeUse: make(map[string]int),
+ stopCh: make(chan struct{}),
}

📝 Committable suggestion
	archive := &S3Source{
		client:                 s3Client,
		config:                 cfg,
		chainId:                chainId,
		cacheDir:               cacheDir,
		metadataTTL:            cfg.MetadataTTL,
		fileCacheTTL:           cfg.FileCacheTTL,
		maxCacheSize:           cfg.MaxCacheSize,
		cleanupInterval:        cfg.CleanupInterval,
		maxConcurrentDownloads: cfg.MaxConcurrentDownloads,
		fileMetadata:           make(map[string]*FileMetadata),
		cacheMap:               make(map[string]time.Time),
		blockIndex:             make(map[string][]BlockIndex),
		downloading:            make(map[string]*sync.WaitGroup),
		activeUse:              make(map[string]int),
		stopCh:                 make(chan struct{}),
	}
🤖 Prompt for AI Agents
In internal/source/s3.go around lines 141 to 156, the S3Source struct literal
omits initialization of stopCh which prevents it from being closed on shutdown;
add stopCh: make(chan struct{}) to the struct initialization so the stop channel
is created and can be closed during shutdown handling.
func (s *S3Source) Close() {
	// Clean up cache directory
	if s.cacheDir != "" {
		os.RemoveAll(s.cacheDir)
	}
}
Close should stop background worker before removing cache
Stop the ticker goroutine; otherwise it leaks and may operate on a removed dir.
Apply this diff:
func (s *S3Source) Close() {
- // Clean up cache directory
+ // Stop background cleanup
+ if s.stopCh != nil {
+ close(s.stopCh)
+ }
+ // Clean up cache directory
if s.cacheDir != "" {
os.RemoveAll(s.cacheDir)
}
}

📝 Committable suggestion
func (s *S3Source) Close() {
	// Stop background cleanup
	if s.stopCh != nil {
		close(s.stopCh)
	}
	// Clean up cache directory
	if s.cacheDir != "" {
		os.RemoveAll(s.cacheDir)
	}
}
🤖 Prompt for AI Agents
In internal/source/s3.go around lines 309 to 314, Close currently removes the
cache directory without first stopping the background ticker goroutine, causing
a leak and race on the removed dir; modify Close to signal the worker to stop
(e.g., close a done channel or send on a stop channel), stop the ticker
(ticker.Stop()) and wait for the worker to exit (e.g., wg.Wait() or wait on the
done ack) before calling os.RemoveAll(s.cacheDir); if the S3Source struct lacks
a stop channel or waitgroup, add them and ensure the goroutine checks the stop
signal and returns promptly.
  func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) {
  	var storage IStorage
  	var err error

- 	storage.OrchestratorStorage, err = NewConnector[IOrchestratorStorage](&cfg.Orchestrator)
+ 	storage.OrchestratorStorage, err = NewOrchestratorConnector(&cfg.Orchestrator)
  	if err != nil {
  		return IStorage{}, fmt.Errorf("failed to create orchestrator storage: %w", err)
  	}

- 	storage.MainStorage, err = NewConnector[IMainStorage](&cfg.Main)
+ 	storage.StagingStorage, err = NewStagingConnector(&cfg.Staging)
  	if err != nil {
- 		return IStorage{}, fmt.Errorf("failed to create main storage: %w", err)
+ 		return IStorage{}, fmt.Errorf("failed to create staging storage: %w", err)
  	}

- 	storage.StagingStorage, err = NewConnector[IStagingStorage](&cfg.Staging)
+ 	storage.MainStorage, err = NewMainConnector(&cfg.Main, &storage.OrchestratorStorage)
  	if err != nil {
- 		return IStorage{}, fmt.Errorf("failed to create staging storage: %w", err)
+ 		return IStorage{}, fmt.Errorf("failed to create main storage: %w", err)
  	}

  	return storage, nil
  }
💡 Verification agent
🧩 Analysis chain
Pass value interface to NewMainConnector (align with Kafka change)
After changing Kafka to accept IOrchestratorStorage by value, pass the value here.
Apply this diff:
- storage.MainStorage, err = NewMainConnector(&cfg.Main, &storage.OrchestratorStorage)
+ storage.MainStorage, err = NewMainConnector(&cfg.Main, storage.OrchestratorStorage)

🏁 Script executed:
#!/bin/bash
# Locate the definition of NewMainConnector to verify its signature
rg -nP "func NewMainConnector" -C3 --type go .Length of output: 484
🏁 Script executed:
#!/bin/bash
# Locate the definition of IOrchestratorStorage
rg -nP "type IOrchestratorStorage" -C3 --type go .
# Inspect NewOrchestratorConnector to see what it returns
rg -nP "func NewOrchestratorConnector" -C3 --type go .Length of output: 1100
Convert NewMainConnector to accept IOrchestratorStorage by value
- In internal/storage/connector.go at the NewMainConnector declaration (line 305), change the second parameter from orchestratorStorage *IOrchestratorStorage to orchestratorStorage IOrchestratorStorage.
- In NewStorageConnector (lines 164–184), update the call to storage.MainStorage, err = NewMainConnector(&cfg.Main, storage.OrchestratorStorage).

Without adjusting the signature, removing the & will break compilation.
🤖 Prompt for AI Agents
In internal/storage/connector.go around lines 164–184 and at the
NewMainConnector declaration (around line 305), change the NewMainConnector
second parameter from a pointer type to a value: replace orchestratorStorage
*IOrchestratorStorage with orchestratorStorage IOrchestratorStorage; then update
the call in NewStorageConnector to pass the value (storage.MainStorage, err =
NewMainConnector(&cfg.Main, storage.OrchestratorStorage)) instead of taking its
address. Also search for and update any other call sites to remove the & when
passing an IOrchestratorStorage pointer to NewMainConnector so signatures match.
	chainId := data[0].Block.ChainId
	maxBlockNumber := data[len(data)-1].Block.Number
	if err := kr.orchestratorStorage.SetLastCommittedBlockNumber(chainId, maxBlockNumber); err != nil {
		return fmt.Errorf("failed to update last committed block number in orchestrator storage: %w", err)
	}
Compute max block explicitly; don’t assume input sorted
data[len(data)-1] may not be the max.
Apply this diff:
- chainId := data[0].Block.ChainId
- maxBlockNumber := data[len(data)-1].Block.Number
+ chainId := data[0].Block.ChainId
+ maxBlockNumber := new(big.Int).SetUint64(0)
+ for i := range data {
+ if data[i].Block.Number.Cmp(maxBlockNumber) > 0 {
+ maxBlockNumber = data[i].Block.Number
+ }
+	}

📝 Committable suggestion
	chainId := data[0].Block.ChainId
	maxBlockNumber := new(big.Int).SetUint64(0)
	for i := range data {
		if data[i].Block.Number.Cmp(maxBlockNumber) > 0 {
			maxBlockNumber = data[i].Block.Number
		}
	}
	if err := kr.orchestratorStorage.SetLastCommittedBlockNumber(chainId, maxBlockNumber); err != nil {
		return fmt.Errorf("failed to update last committed block number in orchestrator storage: %w", err)
	}
🤖 Prompt for AI Agents
internal/storage/kafka.go around lines 51 to 56: the code assumes the last
element is the highest block number; instead iterate over the data slice to
compute the maximum Block.Number (and optionally validate that all Block.ChainId
values match or pick a canonical chainId like data[0].Block.ChainId) and use
that max value when calling SetLastCommittedBlockNumber; replace the direct use
of data[len(data)-1].Block.Number with a loop that compares each entry and
tracks the max.
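A sketch of that loop with the optional chain-ID consistency check, pulled out into a hypothetical helper (the method name is illustrative; ChainId and Number are assumed to be *big.Int fields, matching the surrounding code):

func (kr *KafkaConnector) saveCursor(data []common.BlockData) error {
	chainId := data[0].Block.ChainId
	maxBlockNumber := new(big.Int)
	for i := range data {
		// Guard against batches that accidentally mix chains.
		if data[i].Block.ChainId.Cmp(chainId) != 0 {
			return fmt.Errorf("mixed chain IDs in batch: %s vs %s", chainId, data[i].Block.ChainId)
		}
		if data[i].Block.Number.Cmp(maxBlockNumber) > 0 {
			maxBlockNumber = data[i].Block.Number
		}
	}
	return kr.orchestratorStorage.SetLastCommittedBlockNumber(chainId, maxBlockNumber)
}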
// ReplaceBlockData handles reorg by publishing both old and new data to Kafka
func (kr *KafkaConnector) ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error) {
	if len(data) == 0 {
		return nil, nil
	}

	oldBlocks := []common.BlockData{}

	// TODO: We need to fetch the old blocks from the primary data store
	if err := kr.kafkaPublisher.PublishReorg(data, data); err != nil {
		return nil, fmt.Errorf("failed to publish reorg blocks to kafka: %w", err)
	}

	// save cursor
	return oldBlocks, nil
}
ReplaceBlockData publishes wrong payload; return “not supported”
Publishing (old=new=data) is incorrect for reorgs and can corrupt consumers.
Apply this diff:
-// ReplaceBlockData handles reorg by publishing both old and new data to Kafka
+// ReplaceBlockData is not supported for write-only Kafka connector
func (kr *KafkaConnector) ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error) {
if len(data) == 0 {
return nil, nil
}
-
- oldBlocks := []common.BlockData{}
-
- // TODO: We need to fetch the old blocks from the primary data store
- if err := kr.kafkaPublisher.PublishReorg(data, data); err != nil {
- return nil, fmt.Errorf("failed to publish reorg blocks to kafka: %w", err)
- }
-
- // save cursor
- return oldBlocks, nil
+ return nil, fmt.Errorf("ReplaceBlockData is not supported by the Kafka streaming connector")
}

📝 Committable suggestion
// ReplaceBlockData is not supported for write-only Kafka connector
func (kr *KafkaConnector) ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error) {
	if len(data) == 0 {
		return nil, nil
	}
	return nil, fmt.Errorf("ReplaceBlockData is not supported by the Kafka streaming connector")
}
🤖 Prompt for AI Agents
internal/storage/kafka.go lines 60-75: ReplaceBlockData currently publishes the
same slice for old and new (data, data) which is incorrect; instead remove the
PublishReorg call and immediately return a clear "not supported" error (e.g.
fmt.Errorf("replace block data not supported") or a package-level
ErrNotSupported) and nil for the oldBlocks slice so callers know reorgs aren't
handled by the Kafka connector. Ensure no publish is attempted and keep the
function behavior consistent with the error return.
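If the package-level sentinel route is taken, a minimal sketch could look like this (ErrReplaceNotSupported is an illustrative name, not an existing identifier in the repo):

// ErrReplaceNotSupported signals that the Kafka connector cannot service
// reorg replacements; callers can detect it with errors.Is.
var ErrReplaceNotSupported = errors.New("replace block data not supported by the kafka connector")

func (kr *KafkaConnector) ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error) {
	if len(data) == 0 {
		return nil, nil
	}
	// No publish is attempted; reorg handling must happen in a queryable backend.
	return nil, ErrReplaceNotSupported
}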
	for i := new(big.Int).Set(fileStart); i.Cmp(fileEnd) <= 0; i.Add(i, big.NewInt(1)) {
		if i.Cmp(startBlock) >= 0 && i.Cmp(endBlock) <= 0 {
			blockSet[i.String()] = true
		}
	}
🛠️ Refactor suggestion
Inefficient algorithm for adding contiguous block ranges
The loop creates a new big.Int for every block number in potentially large ranges, which is memory-intensive and slow.
For contiguous block ranges, directly add all blocks without iterating:
- // Add all blocks in this file's range to our set
- for i := new(big.Int).Set(fileStart); i.Cmp(fileEnd) <= 0; i.Add(i, big.NewInt(1)) {
- if i.Cmp(startBlock) >= 0 && i.Cmp(endBlock) <= 0 {
- blockSet[i.String()] = true
- }
- }
+ // Since S3 files contain contiguous blocks, mark the entire range as present
+ // Only iterate if we need to check individual blocks
+ effectiveStart := new(big.Int).Set(fileStart)
+ if effectiveStart.Cmp(startBlock) < 0 {
+ effectiveStart = startBlock
+ }
+ effectiveEnd := new(big.Int).Set(fileEnd)
+ if effectiveEnd.Cmp(endBlock) > 0 {
+ effectiveEnd = endBlock
+ }
+
+ // For finding missing blocks, we need to track individual blocks
+ for i := new(big.Int).Set(effectiveStart); i.Cmp(effectiveEnd) <= 0; i.Add(i, big.NewInt(1)) {
+ blockSet[i.String()] = true
+	}

📝 Committable suggestion
	// Since S3 files contain contiguous blocks, mark only the overlapping range as present
	effectiveStart := new(big.Int).Set(fileStart)
	if effectiveStart.Cmp(startBlock) < 0 {
		effectiveStart = startBlock
	}
	effectiveEnd := new(big.Int).Set(fileEnd)
	if effectiveEnd.Cmp(endBlock) > 0 {
		effectiveEnd = endBlock
	}
	// Iterate the overlap and populate the block set
	for i := new(big.Int).Set(effectiveStart); i.Cmp(effectiveEnd) <= 0; i.Add(i, big.NewInt(1)) {
		blockSet[i.String()] = true
	}
	if err != nil && err.Error() != "EOF" {
		return nil, fmt.Errorf("failed to read parquet: %w", err)
	}
Incorrect EOF handling for parquet reader
Comparing error messages with string literals is fragile and could break with library updates.
Use proper error checking:
for {
n, err := reader.Read(parquetRows)
- if err != nil && err.Error() != "EOF" {
+ if err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to read parquet: %w", err)
}
if n == 0 {
break
		}

📝 Committable suggestion
	for {
		n, err := reader.Read(parquetRows)
		if err != nil && err != io.EOF {
			return nil, fmt.Errorf("failed to read parquet: %w", err)
		}
		if n == 0 {
			break
		}
		// …
	}
🤖 Prompt for AI Agents
In internal/storage/s3.go around lines 1094-1096, the code compares err.Error()
to the string "EOF", which is fragile; change the check to use errors.Is(err,
io.EOF) (and consider also checking errors.Is(err, io.ErrUnexpectedEOF) if
relevant) so EOF is detected reliably; update imports to include "errors" and
"io" and keep returning a wrapped fmt.Errorf for non-EOF errors as before.
Summary by CodeRabbit