From e67451f3c8ddb7eaeb41c14423efe4532f2b179d Mon Sep 17 00:00:00 2001 From: satyakwok Date: Thu, 14 May 2026 20:51:32 +0200 Subject: [PATCH 1/5] feat(sync): multi-endpoint round-robin + batched writes + monotonic cursor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backfill went 38 → 268 b/s (7x) on the live mainnet catch-up by combining four changes: 1. Multi-endpoint RPC pool. ChainProvider + RestClient now accept comma- separated URLs and round-robin requests via an atomic counter. Lets the indexer hit fullnodes directly (bypassing the public Caddy edge + its per-IP rate limit) while spreading load across N nodes. 2. REST_URL env split. Direct fullnode access needs `/rpc` for JSON-RPC but root for native REST. The Caddy edge papered over this with a path rewrite; the public-facing single URL doesn't work once we bypass it. REST_URL falls back to RPC_URL when unset (no behaviour change for single-endpoint deployments). 3. Batched writes. Backfill now buffers up to INDEXER_BACKFILL_BATCH (100) bundles and flushes them as one transaction with multi-row INSERTs (`insert_batch` helpers added to blocks/transactions/logs). One commit per batch instead of one per block. 4. Monotonic cursor. The tail loop's `ingest_one` path advances the cursor per-block; running in parallel with the batched backfill, its slower commits clobbered the batched writer's higher cursor values, causing visible regression of 100k+ blocks in seconds. write_cursor now uses `GREATEST(_meta.value::int8, EXCLUDED.value::int8)` at the SQL level so the on-disk cursor is monotonic regardless of which writer commits last. Also: INDEXER_BACKFILL_BATCH env (1..1000, default 100) for tuning. 
--- bin/indexer.rs | 12 ++++- crates/chain/src/provider.rs | 65 ++++++++++++++++------- crates/chain/src/rest.rs | 42 ++++++++++++--- crates/db/src/blocks.rs | 40 +++++++++++++++ crates/db/src/logs.rs | 29 +++++++++++ crates/db/src/transactions.rs | 35 +++++++++++++ crates/sync/src/backfill.rs | 80 ++++++++++++++++++++++++----- crates/sync/src/block_writer.rs | 91 +++++++++++++++++++++++++++++++++ crates/sync/src/cursor.rs | 22 +++++++- 9 files changed, 373 insertions(+), 43 deletions(-) diff --git a/bin/indexer.rs b/bin/indexer.rs index 7d2fb51..b82c936 100644 --- a/bin/indexer.rs +++ b/bin/indexer.rs @@ -30,6 +30,12 @@ use tokio_util::sync::CancellationToken; struct IndexerConfig { database_url: String, rpc_url: String, + /// Optional separate base URL for the native REST endpoints + /// (`/chain/blocks/`, `/tx/`). Defaults to `rpc_url` when unset. + /// Lets us point JSON-RPC at `/rpc` and REST at `` (root) + /// when bypassing the Caddy edge that handles the path rewrite. + #[serde(default)] + rest_url: Option, #[serde(default)] grpc_url: Option, #[serde(default)] @@ -88,8 +94,10 @@ async fn main() -> anyhow::Result<()> { let provider = ChainProvider::http(&cfg.rpc_url)?; // Native REST client for `/chain/blocks/` + `/tx/` — Sentrix's // EVM JSON-RPC ignores `full=true` on getBlockByNumber, so block + tx - // ingest goes via the native REST path. Same host, different URL space. - let rest = RestClient::new(&cfg.rpc_url)?; + // ingest goes via the native REST path. REST_URL falls back to RPC_URL; + // override when JSON-RPC needs `/rpc` suffix (direct fullnode bypass). + let rest_base = cfg.rest_url.as_deref().unwrap_or(&cfg.rpc_url); + let rest = RestClient::new(rest_base)?; let cancel = CancellationToken::new(); // Analytics flusher (optional). 
The handle threads into both the diff --git a/crates/chain/src/provider.rs b/crates/chain/src/provider.rs index 1bb2cd7..cf7659d 100644 --- a/crates/chain/src/provider.rs +++ b/crates/chain/src/provider.rs @@ -14,44 +14,73 @@ use alloy_primitives::{Address, Bytes}; use alloy_provider::{Provider, ProviderBuilder, RootProvider}; use alloy_rpc_types::{Block, BlockNumberOrTag, Filter, Log, TransactionInput, TransactionRequest}; use indexer_domain::BlockHeight; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; /// Concrete provider type — alloy 2.0's default HTTP transport (reqwest). /// Hidden from callers behind [`ChainProvider`]; exposed via /// [`ChainProvider::raw`] for advanced use. pub type HttpProvider = RootProvider; -/// Thin wrapper around an alloy `RootProvider` keyed to a single Sentrix -/// JSON-RPC endpoint. Cheap to clone (the underlying provider is `Arc`-y). +/// Thin wrapper around one or more alloy `RootProvider`s. Single-URL is +/// the common case; multi-URL spreads JSON-RPC load round-robin across +/// fullnodes for higher backfill throughput. #[derive(Clone)] pub struct ChainProvider { - inner: HttpProvider, + inners: Vec, + cursor: Arc, } impl ChainProvider { - /// Build a provider from an HTTP(S) URL. + /// Build a provider from a comma-separated URL list (or a single URL). + /// Returns Err if no valid URL parses. 
pub fn http(url: &str) -> ChainResult { - let url = url - .parse::() - .map_err(|e| ChainError::InvalidArgument(format!("bad rpc url: {e}")))?; - let inner = ProviderBuilder::new() - .disable_recommended_fillers() - .connect_http(url); - Ok(Self { inner }) + let mut inners = Vec::new(); + for raw in url.split(',') { + let trimmed = raw.trim(); + if trimmed.is_empty() { + continue; + } + let parsed = trimmed + .parse::() + .map_err(|e| ChainError::InvalidArgument(format!("bad rpc url '{trimmed}': {e}")))?; + let inner = ProviderBuilder::new() + .disable_recommended_fillers() + .connect_http(parsed); + inners.push(inner); + } + if inners.is_empty() { + return Err(ChainError::InvalidArgument( + "rpc url is empty".to_string(), + )); + } + Ok(Self { + inners, + cursor: Arc::new(AtomicUsize::new(0)), + }) + } + + /// Round-robin pick. Atomic FetchAdd is lock-free across concurrent + /// fetch tasks in the backfill pipeline. + fn next(&self) -> &HttpProvider { + let i = self.cursor.fetch_add(1, Ordering::Relaxed); + &self.inners[i % self.inners.len()] } - /// Underlying provider, exposed for advanced use (custom RPC calls, etc). + /// First underlying provider, exposed for advanced use (custom RPC calls). + /// Multi-URL callers should prefer the wrapped methods which round-robin. pub fn raw(&self) -> &HttpProvider { - &self.inner + &self.inners[0] } /// `eth_chainId`. pub async fn chain_id(&self) -> ChainResult { - self.inner.get_chain_id().await.map_err(rpc_err) + self.next().get_chain_id().await.map_err(rpc_err) } /// Latest finalized block height per the node we're talking to. 
pub async fn block_number(&self) -> ChainResult { - let n = self.inner.get_block_number().await.map_err(rpc_err)?; + let n = self.next().get_block_number().await.map_err(rpc_err)?; Ok(BlockHeight::from(n)) } @@ -62,7 +91,7 @@ impl ChainProvider { ChainError::InvalidArgument(format!("block_with_txs: negative height {h:?}")) })?; let tag = BlockNumberOrTag::Number(n); - self.inner + self.next() .get_block_by_number(tag) .full() .await @@ -93,7 +122,7 @@ impl ChainProvider { if let Some(addr) = address { filter = filter.address(addr); } - self.inner.get_logs(&filter).await.map_err(rpc_err) + self.next().get_logs(&filter).await.map_err(rpc_err) } /// `eth_call` against `to` with abi-encoded `data`. Returns the raw @@ -104,6 +133,6 @@ impl ChainProvider { let req = TransactionRequest::default() .to(to) .input(TransactionInput::new(data)); - self.inner.call(req).await.map_err(rpc_err) + self.next().call(req).await.map_err(rpc_err) } } diff --git a/crates/chain/src/rest.rs b/crates/chain/src/rest.rs index b8d18e5..5d62c5d 100644 --- a/crates/chain/src/rest.rs +++ b/crates/chain/src/rest.rs @@ -13,6 +13,8 @@ use crate::error::{ChainError, ChainResult}; use indexer_domain::{BlockHeight, Hash}; use serde::{Deserialize, Serialize}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use std::time::Duration; /// Subset of the native `/tx/` response shape that the indexer needs. @@ -42,30 +44,54 @@ pub struct NativeTxResponse { pub data: Option, } -/// Native REST client. +/// Native REST client. Holds one or more base URLs and round-robins +/// requests across them. Single-URL is the common case; multi-URL is used +/// when the indexer talks directly to multiple fullnodes (bypassing the +/// public Caddy edge) to spread fetch load across endpoints. 
#[derive(Debug, Clone)] pub struct RestClient { - base: String, + bases: Vec, + cursor: Arc, http: reqwest::Client, } impl RestClient { - /// Build a client pointing at the chain's HTTP base URL (no trailing - /// `/tx/...`). Default 10s request timeout. + /// Build a client from a comma-separated URL list (or a single URL). + /// Each URL is the chain's HTTP base (no `/tx/...` suffix). Default 10s + /// request timeout. Errors if no valid URL found. pub fn new(base_url: impl Into) -> ChainResult { + let raw = base_url.into(); + let bases: Vec = raw + .split(',') + .map(|s| s.trim().trim_end_matches('/').to_owned()) + .filter(|s| !s.is_empty()) + .collect(); + if bases.is_empty() { + return Err(ChainError::InvalidArgument( + "rest base url is empty".to_string(), + )); + } let http = reqwest::Client::builder() .timeout(Duration::from_secs(10)) .build()?; Ok(Self { - base: base_url.into().trim_end_matches('/').to_owned(), + bases, + cursor: Arc::new(AtomicUsize::new(0)), http, }) } + /// Pick the next base URL via round-robin. Atomic FetchAdd makes this + /// lock-free across concurrent fetch tasks. + fn next_base(&self) -> &str { + let i = self.cursor.fetch_add(1, Ordering::Relaxed); + &self.bases[i % self.bases.len()] + } + /// Fetch a tx by hash. Returns None on 404 so the caller can decide /// whether to retry (chain hasn't seen it yet) vs surface as missing. pub async fn tx(&self, hash: &Hash) -> ChainResult> { - let url = format!("{}/tx/{}", self.base, hash); + let url = format!("{}/tx/{}", self.next_base(), hash); let resp = self.http.get(&url).send().await?; if resp.status() == reqwest::StatusCode::NOT_FOUND { return Ok(None); @@ -86,7 +112,7 @@ impl RestClient { /// jump the cursor straight to `window_start_block` rather than walking /// 404s one-by-one. 
pub async fn chain_info(&self) -> ChainResult { - let url = format!("{}/chain/info", self.base); + let url = format!("{}/chain/info", self.next_base()); let resp = self.http.get(&url).send().await?; if !resp.status().is_success() { let status = resp.status(); @@ -101,7 +127,7 @@ impl RestClient { /// Fetch a block by height with full txs inlined. Returns None on 404 /// (chain doesn't have this height yet, or asking past pruning window). pub async fn block(&self, h: BlockHeight) -> ChainResult> { - let url = format!("{}/chain/blocks/{}", self.base, h.0); + let url = format!("{}/chain/blocks/{}", self.next_base(), h.0); let resp = self.http.get(&url).send().await?; if resp.status() == reqwest::StatusCode::NOT_FOUND { return Ok(None); diff --git a/crates/db/src/blocks.rs b/crates/db/src/blocks.rs index 575cf27..afbda21 100644 --- a/crates/db/src/blocks.rs +++ b/crates/db/src/blocks.rs @@ -117,6 +117,46 @@ pub async fn latest_height(pool: &PgPool) -> DbResult> { Ok(h.map(BlockHeight)) } +/// Multi-row batch insert. ON CONFLICT (height) DO NOTHING per row, so a +/// crash mid-batch on retry hits the existing rows and silently skips them. +/// Used by the bulk-COPY-style backfill path to amortise transaction +/// overhead — one commit per batch instead of one per block. 
+pub async fn insert_batch<'e, E>(executor: E, blocks_in: &[Block]) -> DbResult<()> +where + E: sqlx::PgExecutor<'e>, +{ + if blocks_in.is_empty() { + return Ok(()); + } + let mut qb = sqlx::QueryBuilder::new( + "INSERT INTO blocks (height, hash, parent_hash, timestamp, validator, \ + gas_used, gas_limit, base_fee, tx_count, state_root, round, justification_signers) ", + ); + qb.push_values(blocks_in.iter(), |mut row, b| { + let signers = serde_json::Value::Array( + b.justification_signers + .iter() + .map(|s| serde_json::Value::String(s.clone())) + .collect(), + ); + row.push_bind(b.height) + .push_bind(&b.hash) + .push_bind(&b.parent_hash) + .push_bind(b.timestamp) + .push_bind(&b.validator) + .push_bind(b.gas_used) + .push_bind(b.gas_limit) + .push_bind(b.base_fee) + .push_bind(b.tx_count) + .push_bind(&b.state_root) + .push_bind(b.round) + .push_bind(signers); + }); + qb.push(" ON CONFLICT (height) DO NOTHING"); + qb.build().execute(executor).await?; + Ok(()) +} + /// Delete a block (and dependent txs/logs via FK CASCADE) at a height. Used /// by the reorg rewind path. pub async fn delete_at<'e, E>(executor: E, h: BlockHeight) -> DbResult diff --git a/crates/db/src/logs.rs b/crates/db/src/logs.rs index 6e59b48..78d34d1 100644 --- a/crates/db/src/logs.rs +++ b/crates/db/src/logs.rs @@ -28,6 +28,35 @@ where Ok(()) } +/// Multi-row batch insert. ON CONFLICT (block_height, log_index) DO NOTHING +/// per row, so a crash mid-batch on retry hits the existing rows and silently +/// skips them. 
+pub async fn insert_batch<'e, E>(executor: E, logs_in: &[Log]) -> DbResult<()> +where + E: sqlx::PgExecutor<'e>, +{ + if logs_in.is_empty() { + return Ok(()); + } + let mut qb = sqlx::QueryBuilder::new( + "INSERT INTO logs (block_height, tx_hash, log_index, address, topic0, topic1, topic2, topic3, data) ", + ); + qb.push_values(logs_in.iter(), |mut row, l| { + row.push_bind(l.block_height) + .push_bind(&l.tx_hash) + .push_bind(l.log_index) + .push_bind(&l.address) + .push_bind(&l.topic0) + .push_bind(&l.topic1) + .push_bind(&l.topic2) + .push_bind(&l.topic3) + .push_bind(&l.data); + }); + qb.push(" ON CONFLICT (block_height, log_index) DO NOTHING"); + qb.build().execute(executor).await?; + Ok(()) +} + /// All logs for a given tx, ordered by `log_index`. pub async fn for_tx(pool: &PgPool, tx_hash: &Hash) -> DbResult> { let rows = sqlx::query( diff --git a/crates/db/src/transactions.rs b/crates/db/src/transactions.rs index 397482d..f736854 100644 --- a/crates/db/src/transactions.rs +++ b/crates/db/src/transactions.rs @@ -35,6 +35,41 @@ where Ok(()) } +/// Multi-row batch insert. ON CONFLICT (hash) DO NOTHING per row, so a +/// crash mid-batch on retry hits the existing rows and silently skips them. 
+pub async fn insert_batch<'e, E>(executor: E, txs: &[Transaction]) -> DbResult<()> +where + E: sqlx::PgExecutor<'e>, +{ + if txs.is_empty() { + return Ok(()); + } + let mut qb = sqlx::QueryBuilder::new( + "INSERT INTO transactions (hash, block_height, tx_index, from_addr, to_addr, value, \ + gas_limit, gas_used, gas_price, fee, nonce, data, status, contract_address, tx_type) ", + ); + qb.push_values(txs.iter(), |mut row, t| { + row.push_bind(&t.hash) + .push_bind(t.block_height) + .push_bind(t.tx_index) + .push_bind(&t.from_addr) + .push_bind(&t.to_addr) + .push_bind(t.value) + .push_bind(t.gas_limit) + .push_bind(t.gas_used) + .push_bind(t.gas_price) + .push_bind(t.fee) + .push_bind(t.nonce) + .push_bind(&t.data) + .push_bind(t.status) + .push_bind(&t.contract_address) + .push_bind(t.tx_type.as_str()); + }); + qb.push(" ON CONFLICT (hash) DO NOTHING"); + qb.build().execute(executor).await?; + Ok(()) +} + /// All txs in a block, ordered by `tx_index`. pub async fn for_block(pool: &PgPool, h: BlockHeight) -> DbResult> { let rows = sqlx::query( diff --git a/crates/sync/src/backfill.rs b/crates/sync/src/backfill.rs index 481be8b..7cd88a0 100644 --- a/crates/sync/src/backfill.rs +++ b/crates/sync/src/backfill.rs @@ -16,7 +16,7 @@ //! cancel during a slow fetch returns within the cancel timeout, not after //! the in-flight chunk completes. -use crate::block_writer::{BlockBundle, write_block}; +use crate::block_writer::{BlockBundle, batch_write_blocks, write_block}; use crate::convert::{to_domain_block_from_native, to_domain_log, to_domain_txs_from_native}; use crate::cursor::{read_cursor, write_cursor}; use crate::{SyncConfig, SyncError, SyncResult}; @@ -28,11 +28,18 @@ use indexer_domain::BlockHeight; use tokio_util::sync::CancellationToken; /// How many blocks to fetch concurrently in the backfill window. -/// Each fetch is one REST call + one eth_getLogs call. Writes still -/// land sequentially per block. 
50 is a conservative default — see -/// `INDEXER_BACKFILL_CONCURRENCY` env var to tune. +/// Each fetch is one REST call + one eth_getLogs call. Writes land in +/// batches of `INDEXER_BACKFILL_BATCH` (default 100). 50 is a +/// conservative default — see `INDEXER_BACKFILL_CONCURRENCY` env var to tune. const DEFAULT_BACKFILL_CONCURRENCY: usize = 50; +/// How many fetched bundles to accumulate before flushing as one batched +/// transaction. Bigger batch = fewer commits + lower fsync overhead; +/// smaller batch = lower memory + more frequent cursor advance. 100 is a +/// good middle: ~5MB peak buffer for typical mainnet blocks, single-digit +/// commits/sec at 500+ b/s. +const DEFAULT_BACKFILL_BATCH: usize = 100; + fn backfill_concurrency() -> usize { std::env::var("INDEXER_BACKFILL_CONCURRENCY") .ok() @@ -41,6 +48,14 @@ fn backfill_concurrency() -> usize { .unwrap_or(DEFAULT_BACKFILL_CONCURRENCY) } +fn backfill_batch_size() -> usize { + std::env::var("INDEXER_BACKFILL_BATCH") + .ok() + .and_then(|v| v.parse().ok()) + .filter(|&n: &usize| n > 0 && n <= 1000) + .unwrap_or(DEFAULT_BACKFILL_BATCH) +} + /// Run the backfill loop until cancellation OR until we reach /// `tip - safe_lag` (then return; caller decides whether to sleep + recheck /// or hand off to the tail loop). @@ -79,7 +94,12 @@ pub async fn run_backfill( // concurrent fetch + serial write keeps invariants while saturating // the network. INDEXER_BACKFILL_CONCURRENCY env var tunes the pool. let concurrency = backfill_concurrency(); - tracing::info!(concurrency, "backfill: pipelined fetch enabled"); + let batch_size = backfill_batch_size(); + tracing::info!( + concurrency, + batch_size, + "backfill: pipelined fetch + batched write enabled" + ); let start = cursor.0 + 1; let total = (cap - cursor.0) as usize; @@ -91,32 +111,60 @@ pub async fn run_backfill( }) .buffered(concurrency); + // Accumulator: bundles wait here until we hit batch_size or stream ends, + // then flush as one transaction. 
404-gap heights advance the cursor + // out-of-band (separate write) so the batch invariant (all heights + // present) holds for the bundles that flush together. + let mut buf: Vec = Vec::with_capacity(batch_size); let mut done = 0usize; loop { // Race cancellation against the next item — a cancel issued while // a slow fetch is in flight returns immediately rather than after // the chunk completes. - let (h, result) = tokio::select! { + let next = tokio::select! { biased; _ = cancel.cancelled() => { + // Flush whatever we've accumulated so the cursor reflects + // it before we exit; otherwise the next start re-fetches + // already-buffered bundles. + if !buf.is_empty() { + let highest = buf.iter().map(|b| b.block.height).max().unwrap(); + batch_write_blocks(pool, std::mem::take(&mut buf), analytics).await?; + cursor = highest; + } tracing::info!(cursor = cursor.0, "backfill: cancelled mid-pipeline"); return Ok(cursor); } - next = fetched.next() => match next { - Some(item) => item, - None => break, - }, + next = fetched.next() => next, + }; + let (h, result) = match next { + Some(item) => item, + None => break, }; match result? { - Some(bundle) => write_block(pool, bundle, analytics).await?, + Some(bundle) => buf.push(bundle), None => { - // 404 / damaged-block gap (see ingest_one rationale below) + // 404 / damaged-block gap. Flush in-flight buffer first so + // the gap-cursor advance lands AFTER the data we already + // fetched, preserving the spec §5 invariant 2 (cursor never + // lands ahead of data). + if !buf.is_empty() { + batch_write_blocks(pool, std::mem::take(&mut buf), analytics).await?; + // cursor is overwritten to the gap height below; + // batch_write already advanced the persisted cursor. + } tracing::warn!(height = h.0, "backfill: skipping single 404 block"); write_cursor(pool, h, 0).await?; + cursor = h; } } - cursor = h; done += 1; + // Flush at batch boundary. 
+ if buf.len() >= batch_size { + let highest = buf.iter().map(|b| b.block.height).max().unwrap(); + batch_write_blocks(pool, std::mem::take(&mut buf), analytics).await?; + cursor = highest; + } if done.is_multiple_of(1000) { tracing::info!( cursor = cursor.0, @@ -125,6 +173,12 @@ pub async fn run_backfill( ); } } + // Tail flush — anything still buffered after the stream closes. + if !buf.is_empty() { + let highest = buf.iter().map(|b| b.block.height).max().unwrap(); + batch_write_blocks(pool, std::mem::take(&mut buf), analytics).await?; + cursor = highest; + } tracing::info!( cursor = cursor.0, ingested = done, diff --git a/crates/sync/src/block_writer.rs b/crates/sync/src/block_writer.rs index 94adcf4..9b805ee 100644 --- a/crates/sync/src/block_writer.rs +++ b/crates/sync/src/block_writer.rs @@ -17,6 +17,12 @@ use indexer_analytics::{AnalyticsHandle, RawTxRow}; use indexer_db::{PgPool, blocks, logs, transactions}; use indexer_domain::{Block, Log, Transaction}; +/// Page size for batch inserts. Postgres protocol caps bind params at +/// ~65k per query; the widest table (transactions, 15 cols) tops out at +/// ~4300 rows per statement. 500 leaves comfortable headroom and keeps +/// one batch's write+commit latency reasonable. +const BATCH_INSERT_CHUNK: usize = 500; + /// Bundle of rows to write atomically. Built by the sync loop before calling /// [`write_block`]. pub struct BlockBundle { @@ -98,3 +104,88 @@ pub async fn write_block( Ok(()) } + +/// Batched variant of [`write_block`] — writes N bundles in a single +/// transaction with multi-row INSERTs. Amortises commit/fsync cost across +/// the batch (one commit per chunk vs one per block). Returns Ok on commit. +/// +/// On crash mid-batch the transaction rolls back; on retry the per-row +/// `ON CONFLICT (...) DO NOTHING` keeps inserts idempotent. Cursor advances +/// to the highest height in the batch only after commit, so a partial +/// batch is replayed safely. 
+pub async fn batch_write_blocks( + pool: &PgPool, + bundles: Vec, + analytics: Option<&AnalyticsHandle>, +) -> SyncResult<()> { + if bundles.is_empty() { + return Ok(()); + } + + // Highest block in the chunk drives the cursor advance + the analytics + // timestamp (we use each row's own block timestamp). + let max_height = bundles + .iter() + .map(|b| b.block.height) + .max() + .expect("non-empty bundles"); + let cursor_ts = bundles + .iter() + .find(|b| b.block.height == max_height) + .map(|b| b.block.timestamp) + .unwrap_or(0); + + // Flatten + sort by primary key so the multi-row INSERTs execute against + // index pages in roughly sequential order — measurable PG win on large + // batches. Order matters per spec §5 invariant 2 (FK order) but the + // sorts here are within tables; we still insert tables in dependency + // order: blocks → transactions → logs. + let mut all_blocks: Vec = bundles.iter().map(|b| b.block.clone()).collect(); + all_blocks.sort_by_key(|b| b.height); + let mut all_txs: Vec = bundles.iter().flat_map(|b| b.txs.clone()).collect(); + all_txs.sort_by_key(|t| (t.block_height, t.tx_index)); + let mut all_logs: Vec = bundles.iter().flat_map(|b| b.logs.clone()).collect(); + all_logs.sort_by_key(|l| (l.block_height, l.log_index)); + + let mut tx = pool.begin().await.map_err(SyncError::from)?; + for chunk in all_blocks.chunks(BATCH_INSERT_CHUNK) { + blocks::insert_batch(&mut *tx, chunk).await?; + } + for chunk in all_txs.chunks(BATCH_INSERT_CHUNK) { + transactions::insert_batch(&mut *tx, chunk).await?; + } + for chunk in all_logs.chunks(BATCH_INSERT_CHUNK) { + logs::insert_batch(&mut *tx, chunk).await?; + } + write_cursor(&mut *tx, max_height, cursor_ts).await?; + tx.commit().await.map_err(SyncError::from)?; + + // Analytics — same pattern as write_block, post-commit. 
+ if let Some(handle) = analytics { + for bundle in &bundles { + let bh = match bundle.block.height.as_u64() { + Some(h) => h, + None => continue, + }; + for t in &bundle.txs { + let row = RawTxRow { + block_height: bh, + timestamp: bundle.block.timestamp as u64, + tx_hash: t.hash.clone(), + from_addr: t.from_addr.clone(), + to_addr: t.to_addr.clone(), + value_str: t.value.to_string(), + fee_str: t.fee.to_string(), + gas_used: t.gas_used.unwrap_or(0) as u64, + status: t.status as u8, + tx_type: t.tx_type.as_str().to_string(), + }; + if let Err(e) = handle.push(row) { + tracing::warn!(error = %e, "analytics push failed; flusher closed?"); + return Ok(()); + } + } + } + } + Ok(()) +} diff --git a/crates/sync/src/cursor.rs b/crates/sync/src/cursor.rs index 8173f3c..2a4a5a8 100644 --- a/crates/sync/src/cursor.rs +++ b/crates/sync/src/cursor.rs @@ -22,13 +22,31 @@ pub async fn read_cursor(pool: &PgPool) -> SyncResult> { /// Write the cursor inside an existing transaction. Caller commits. /// +/// Monotonic — the on-disk value only updates if `h` is strictly greater +/// than the current value. Two parallel writers (the batched backfill loop +/// and the per-block tail catchup) can race and commit out-of-order; +/// without this guard, the slower writer's older cursor would clobber the +/// faster writer's newer one. The GREATEST cast keeps the row at the +/// highest committed height regardless of commit order. +/// /// `now_ts` is the chain-time second associated with the bump (we put the /// finalized block's timestamp here so cursor staleness is comparable to -/// chain time, not wall-clock). +/// chain time, not wall-clock). updated_at always refreshes so callers can +/// audit recency, even when the value itself is unchanged. 
pub async fn write_cursor<'e, E>(executor: E, h: BlockHeight, now_ts: i64) -> SyncResult<()> where E: sqlx::PgExecutor<'e>, { - meta::set_i64(executor, LAST_SYNCED_HEIGHT_KEY, h.0, now_ts).await?; + sqlx::query( + "INSERT INTO _meta (key, value, updated_at) VALUES ($1, $2, $3) \ + ON CONFLICT (key) DO UPDATE SET \ + value = GREATEST(_meta.value::int8, EXCLUDED.value::int8)::text, \ + updated_at = EXCLUDED.updated_at", + ) + .bind(LAST_SYNCED_HEIGHT_KEY) + .bind(h.0.to_string()) + .bind(now_ts) + .execute(executor) + .await?; Ok(()) } From d73446b606f7587f64acbe035d76fa0b3431105f Mon Sep 17 00:00:00 2001 From: satyakwok Date: Thu, 14 May 2026 20:56:07 +0200 Subject: [PATCH 2/5] style: cargo fmt --- crates/chain/src/provider.rs | 12 +++++------- crates/chain/src/rest.rs | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/crates/chain/src/provider.rs b/crates/chain/src/provider.rs index cf7659d..d1477f8 100644 --- a/crates/chain/src/provider.rs +++ b/crates/chain/src/provider.rs @@ -14,8 +14,8 @@ use alloy_primitives::{Address, Bytes}; use alloy_provider::{Provider, ProviderBuilder, RootProvider}; use alloy_rpc_types::{Block, BlockNumberOrTag, Filter, Log, TransactionInput, TransactionRequest}; use indexer_domain::BlockHeight; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; /// Concrete provider type — alloy 2.0's default HTTP transport (reqwest). 
/// Hidden from callers behind [`ChainProvider`]; exposed via @@ -41,18 +41,16 @@ impl ChainProvider { if trimmed.is_empty() { continue; } - let parsed = trimmed - .parse::() - .map_err(|e| ChainError::InvalidArgument(format!("bad rpc url '{trimmed}': {e}")))?; + let parsed = trimmed.parse::().map_err(|e| { + ChainError::InvalidArgument(format!("bad rpc url '{trimmed}': {e}")) + })?; let inner = ProviderBuilder::new() .disable_recommended_fillers() .connect_http(parsed); inners.push(inner); } if inners.is_empty() { - return Err(ChainError::InvalidArgument( - "rpc url is empty".to_string(), - )); + return Err(ChainError::InvalidArgument("rpc url is empty".to_string())); } Ok(Self { inners, diff --git a/crates/chain/src/rest.rs b/crates/chain/src/rest.rs index 5d62c5d..53c9e4d 100644 --- a/crates/chain/src/rest.rs +++ b/crates/chain/src/rest.rs @@ -13,8 +13,8 @@ use crate::error::{ChainError, ChainResult}; use indexer_domain::{BlockHeight, Hash}; use serde::{Deserialize, Serialize}; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; /// Subset of the native `/tx/` response shape that the indexer needs. From e09314f0292e5c01d8fbef96623830d095d47df6 Mon Sep 17 00:00:00 2001 From: satyakwok Date: Thu, 14 May 2026 21:03:49 +0200 Subject: [PATCH 3/5] fix(chain): validate REST URLs at construction (CR major) --- crates/chain/src/rest.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/crates/chain/src/rest.rs b/crates/chain/src/rest.rs index 53c9e4d..b31dd98 100644 --- a/crates/chain/src/rest.rs +++ b/crates/chain/src/rest.rs @@ -58,14 +58,24 @@ pub struct RestClient { impl RestClient { /// Build a client from a comma-separated URL list (or a single URL). /// Each URL is the chain's HTTP base (no `/tx/...` suffix). Default 10s - /// request timeout. Errors if no valid URL found. + /// request timeout. 
Errors if any entry fails URL parsing — fail fast + /// at construction so a malformed entry can't surface as an intermittent + /// runtime failure when round-robin happens to pick it. pub fn new(base_url: impl Into) -> ChainResult { let raw = base_url.into(); - let bases: Vec = raw - .split(',') - .map(|s| s.trim().trim_end_matches('/').to_owned()) - .filter(|s| !s.is_empty()) - .collect(); + let mut bases: Vec = Vec::new(); + for entry in raw.split(',') { + let trimmed = entry.trim().trim_end_matches('/'); + if trimmed.is_empty() { + continue; + } + // Validate by parsing — discard the parsed value, keep the + // string form for the format!()-based URL composition below. + trimmed.parse::().map_err(|e| { + ChainError::InvalidArgument(format!("bad rest url '{trimmed}': {e}")) + })?; + bases.push(trimmed.to_owned()); + } if bases.is_empty() { return Err(ChainError::InvalidArgument( "rest base url is empty".to_string(), From 126557e3c62b4f445537912e34deecdafd46450c Mon Sep 17 00:00:00 2001 From: satyakwok Date: Thu, 14 May 2026 21:09:58 +0200 Subject: [PATCH 4/5] test(chain): add multi-base parsing + round-robin tests for RestClient --- crates/chain/src/rest.rs | 54 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/crates/chain/src/rest.rs b/crates/chain/src/rest.rs index b31dd98..0d30afc 100644 --- a/crates/chain/src/rest.rs +++ b/crates/chain/src/rest.rs @@ -259,4 +259,58 @@ mod tests { let raw = r#"{ "txid": "0xabc" }"#; assert!(serde_json::from_str::(raw).is_err()); } + + #[test] + fn parses_single_url() { + let c = RestClient::new("http://a.example/").unwrap(); + assert_eq!(c.bases, vec!["http://a.example".to_string()]); + assert_eq!(c.next_base(), "http://a.example"); + assert_eq!(c.next_base(), "http://a.example"); + } + + #[test] + fn parses_comma_list_trims_and_normalises() { + let c = + RestClient::new(" http://a.example/ , http://b.example , ,http://c.example/").unwrap(); + assert_eq!( + c.bases, + vec![ + 
"http://a.example".to_string(), + "http://b.example".to_string(), + "http://c.example".to_string(), + ] + ); + } + + #[test] + fn round_robin_rotates_deterministically() { + let c = RestClient::new("http://a.example,http://b.example,http://c.example").unwrap(); + let picks: Vec<&str> = (0..7).map(|_| c.next_base()).collect(); + assert_eq!( + picks, + vec![ + "http://a.example", + "http://b.example", + "http://c.example", + "http://a.example", + "http://b.example", + "http://c.example", + "http://a.example", + ] + ); + } + + #[test] + fn rejects_empty_input() { + assert!(RestClient::new("").is_err()); + assert!(RestClient::new(" ").is_err()); + assert!(RestClient::new(", , ").is_err()); + } + + #[test] + fn rejects_malformed_url_at_construction() { + // Validation must catch bad entries up front, not on round-robin pick. + assert!(RestClient::new("not a url").is_err()); + assert!(RestClient::new("http://ok.example,not://valid url with spaces").is_err()); + } } From fe9c3e2e53eb59ebf1e73b95f9d0cd3cf7c64dcf Mon Sep 17 00:00:00 2001 From: satyakwok Date: Thu, 14 May 2026 21:10:59 +0200 Subject: [PATCH 5/5] fix(smoke): scrape /metrics from loopback listener (audit 2026-05-13) --- scripts/smoke.sh | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/scripts/smoke.sh b/scripts/smoke.sh index 00974de..a3a5016 100755 --- a/scripts/smoke.sh +++ b/scripts/smoke.sh @@ -282,11 +282,17 @@ ok "/blocks with correct token -> 200" echo note "verifying observability + readyz (no auth)" -# Restart API once more without bearer token but WITH metrics + readyz active +# Restart API once more without bearer token but WITH metrics + readyz active. +# Per audit 2026-05-13, /metrics now binds on a separate loopback listener +# (INDEXER_API_METRICS_BIND, default 127.0.0.1:9080) so it can never leak +# via the public router. We pick a fresh port + scrape it separately. 
+METRICS_PORT=$((API_PORT + 1)) +METRICS_URL="http://127.0.0.1:${METRICS_PORT}" kill "$API_PID" 2>/dev/null || true wait "$API_PID" 2>/dev/null || true DATABASE_URL="$DB_URL" \ INDEXER_API_BIND="127.0.0.1:${API_PORT}" \ +INDEXER_API_METRICS_BIND="127.0.0.1:${METRICS_PORT}" \ RUST_LOG="warn" \ ./target/release/api & API_PID=$! @@ -305,13 +311,18 @@ v=$(curl -fsS "$API_BASE/readyz" | jq -r '.checks.cache') [[ "$v" == "disabled" ]] || fail "/readyz.checks.cache != disabled (got '$v')" ok "/readyz (pg ok, cache disabled)" -# /metrics exposes Prometheus-format counters after a few requests +# /metrics is on the loopback-only listener now, not on the public router. +# Drive a couple of requests through the public API first so the counters +# have something to render, then scrape from the metrics port. curl -fsS "$API_BASE/blocks" >/dev/null curl -fsS "$API_BASE/blocks" >/dev/null -metrics=$(curl -fsS "$API_BASE/metrics") +# Public router must NOT serve /metrics — confirm that's a 404. +code=$(curl -s -o /dev/null -w '%{http_code}' "$API_BASE/metrics") +[[ "$code" == "404" ]] || fail "/metrics leaked on public router (got $code, want 404)" +metrics=$(curl -fsS "$METRICS_URL/metrics") || fail "metrics listener not reachable on $METRICS_URL" echo "$metrics" | grep -q '^indexer_api_requests_total{' || fail "/metrics missing indexer_api_requests_total" echo "$metrics" | grep -q '^indexer_api_request_seconds_bucket{' || fail "/metrics missing latency histogram" -ok "/metrics (Prometheus exposition with request counter + latency histogram)" +ok "/metrics (loopback listener: counters + latency histogram present, public router 404s)" # request_id header set on every response (generated server-side; clients # pass through if they want).