Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to `indexer-rs`. Format: [Keep a Changelog](https://keepacha

## [Unreleased]

### Changed

- **Backfill writes now go through PG `COPY FROM STDIN`**, not per-row INSERT. Buffer N blocks (default 100, configurable via `INDEXER_WRITE_BATCH_SIZE`, max 1000) then flush blocks/transactions/logs in a single transaction with the cursor bump to `MAX(height)`. The tail loop's `ingest_one` keeps the per-row `ON CONFLICT DO NOTHING` path for at-tip retries. Spec §5 invariant 2 (cursor never lands ahead of data) holds: cursor advance is inside the same transaction as the COPY streams; partial-batch flush on cancel keeps the cursor in sync.

### Added

- **CoinBlast orphan adoption** — direct-deployed curves (e.g. CBLAST Genesis pre-factory) are now probed via `eth_call` (`token()` / `curveSupply()` / `graduationSrxThreshold()`) on first sighting and adopted into `cb_tokens` automatically. Topic-collision contracts get cached in `known_non_curves` so each address is probed at most once per worker lifetime.
Expand Down
2 changes: 2 additions & 0 deletions compose.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ RPC_URL=https://rpc.sentrixchain.com
INDEXER_NETWORK=mainnet
# INDEXER_PG_MAX_CONNECTIONS=10
# INDEXER_BACKFILL_LOOP_SECS=5
# INDEXER_BACKFILL_CONCURRENCY=50 # blocks fetched in parallel (1..=500)
# INDEXER_WRITE_BATCH_SIZE=100 # blocks per bulk-COPY transaction (1..=1000)

# Optional: gRPC tail loop (sub-second tip detection). Leave unset to
# rely on the backfill loop's polling cadence.
Expand Down
153 changes: 114 additions & 39 deletions crates/sync/src/backfill.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,31 @@
//! Backfill loop — pipelined concurrent fetch + serial write.
//! Backfill loop — pipelined concurrent fetch + bulk-COPY batched write.
//!
//! Strategy: read cursor → ask chain for tip → walk from cursor+1 to
//! min(tip - safe_lag, max_backfill). For each height range, fetch N
//! blocks concurrently (REST + eth_getLogs per block); commit them
//! sequentially in height order so cursor never lands ahead of data
//! (spec §5 invariant 2). Retries via [`indexer_chain::retry_with_backoff`];
//! permanent failures bubble to the orchestrator.
//! blocks concurrently (REST + eth_getLogs per block); buffer them in
//! height order then flush every `batch_size` blocks (or on cancel /
//! stream end) via PG `COPY FROM STDIN`. Cursor advances to MAX(height)
//! of each batch inside the same transaction so cursor never lands ahead
//! of data (spec §5 invariant 2). Retries via
//! [`indexer_chain::retry_with_backoff`]; permanent failures bubble to the
//! orchestrator.
//!
//! Concurrency: tunable via `INDEXER_BACKFILL_CONCURRENCY` env (default 50).
//! Wall-clock measured improvement vs sequential: 3 → 51 blocks/sec on
//! mainnet (2026-05-14, 1.5M-block backfill 138h → ~8h).
//! Batch size: tunable via `INDEXER_WRITE_BATCH_SIZE` env (default 100,
//! capped at 1000 to keep memory bounded and avoid blowing the cursor lag
//! window during a long-running batch).
//!
//! Wall-clock measured improvement vs sequential per-row INSERT:
//! - 3 → 51 blocks/sec (PR #33, fetch concurrency)
//! - 51 → ? blocks/sec (this PR, bulk COPY) — see PR body for live benchmark.
//!
//! Cancellation: caller passes a [`CancellationToken`]. The pipeline races
//! `cancel.cancelled()` against the next item via `tokio::select!`, so a
//! cancel during a slow fetch returns within the cancel timeout, not after
//! the in-flight chunk completes.
//! the in-flight chunk completes. Any partial buffer is flushed atomically
//! before return so the cursor still lands.

use crate::block_writer::{BlockBundle, write_block};
use crate::block_writer::{BlockBundle, write_block_batch};
use crate::convert::{to_domain_block_from_native, to_domain_log, to_domain_txs_from_native};
use crate::cursor::{read_cursor, write_cursor};
use crate::{SyncConfig, SyncError, SyncResult};
Expand All @@ -29,10 +38,21 @@ use tokio_util::sync::CancellationToken;

/// How many blocks to fetch concurrently in the backfill window.
/// Each fetch is one REST call + one eth_getLogs call. Writes still
/// land sequentially per block. 50 is a conservative default — see
/// land sequentially per batch. 50 is a conservative default — see
/// `INDEXER_BACKFILL_CONCURRENCY` env var to tune.
const DEFAULT_BACKFILL_CONCURRENCY: usize = 50;

/// How many blocks to buffer before issuing a bulk-COPY transaction. Each
/// batch is one PG transaction with three COPY streams (blocks → txs → logs).
/// 100 is the sweet spot: small enough that a fault rolls back ≤2s of work,
/// large enough that COPY round-trip overhead amortises across many rows.
const DEFAULT_WRITE_BATCH_SIZE: usize = 100;

/// Hard cap on `INDEXER_WRITE_BATCH_SIZE`. Memory is the limiter — at
/// 1000 blocks × ~10KB per block worst-case, one batch holds ~10MB of row
/// data plus the COPY text-format buffer.
const MAX_WRITE_BATCH_SIZE: usize = 1000;

fn backfill_concurrency() -> usize {
std::env::var("INDEXER_BACKFILL_CONCURRENCY")
.ok()
Expand All @@ -41,6 +61,14 @@ fn backfill_concurrency() -> usize {
.unwrap_or(DEFAULT_BACKFILL_CONCURRENCY)
}

fn write_batch_size() -> usize {
std::env::var("INDEXER_WRITE_BATCH_SIZE")
.ok()
.and_then(|v| v.parse().ok())
.filter(|&n: &usize| n > 0 && n <= MAX_WRITE_BATCH_SIZE)
.unwrap_or(DEFAULT_WRITE_BATCH_SIZE)
Comment on lines +64 to +69
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Clamp oversized INDEXER_WRITE_BATCH_SIZE values instead of resetting to 100.

Values above 1000 currently get rejected by the filter and silently fall back to the default batch size. That contradicts the documented “capped at 1000” behavior and can unexpectedly cut backfill throughput when an operator intentionally overprovisions.

Suggested fix
 fn write_batch_size() -> usize {
     std::env::var("INDEXER_WRITE_BATCH_SIZE")
         .ok()
-        .and_then(|v| v.parse().ok())
-        .filter(|&n: &usize| n > 0 && n <= MAX_WRITE_BATCH_SIZE)
+        .and_then(|v| v.parse::<usize>().ok())
+        .map(|n| match n {
+            0 => DEFAULT_WRITE_BATCH_SIZE,
+            _ => n.min(MAX_WRITE_BATCH_SIZE),
+        })
         .unwrap_or(DEFAULT_WRITE_BATCH_SIZE)
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn write_batch_size() -> usize {
std::env::var("INDEXER_WRITE_BATCH_SIZE")
.ok()
.and_then(|v| v.parse().ok())
.filter(|&n: &usize| n > 0 && n <= MAX_WRITE_BATCH_SIZE)
.unwrap_or(DEFAULT_WRITE_BATCH_SIZE)
fn write_batch_size() -> usize {
std::env::var("INDEXER_WRITE_BATCH_SIZE")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.map(|n| match n {
0 => DEFAULT_WRITE_BATCH_SIZE,
_ => n.min(MAX_WRITE_BATCH_SIZE),
})
.unwrap_or(DEFAULT_WRITE_BATCH_SIZE)
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/sync/src/backfill.rs` around lines 64 - 69, The write_batch_size()
currently filters out values > MAX_WRITE_BATCH_SIZE and falls back to
DEFAULT_WRITE_BATCH_SIZE; change it to clamp parsed values above the cap to
MAX_WRITE_BATCH_SIZE instead of rejecting them. In write_batch_size(), parse
INDEXER_WRITE_BATCH_SIZE as before, return DEFAULT_WRITE_BATCH_SIZE for
parse/failure or non-positive values, but if the parsed value >
MAX_WRITE_BATCH_SIZE return MAX_WRITE_BATCH_SIZE, otherwise return the parsed
value; reference the write_batch_size() function and constants
MAX_WRITE_BATCH_SIZE and DEFAULT_WRITE_BATCH_SIZE when making the change.

}

/// Run the backfill loop until cancellation OR until we reach
/// `tip - safe_lag` (then return; caller decides whether to sleep + recheck
/// or hand off to the tail loop).
Expand All @@ -65,22 +93,18 @@ pub async fn run_backfill(
return Ok(cursor);
}

let concurrency = backfill_concurrency();
let batch_size = write_batch_size();
tracing::info!(
from = cursor.0 + 1,
to = cap,
tip = tip.0,
safe_lag = cfg.safe_lag,
"backfill: starting walk",
concurrency,
batch_size,
"backfill: starting walk (concurrent fetch + bulk-COPY batched write)",
);

// Pipeline: fetch N blocks concurrently (each = REST + eth_getLogs),
// write them sequentially in height order so the cursor never lands
// ahead of the data. Per-block latency was the bottleneck (3 b/s);
// concurrent fetch + serial write keeps invariants while saturating
// the network. INDEXER_BACKFILL_CONCURRENCY env var tunes the pool.
let concurrency = backfill_concurrency();
tracing::info!(concurrency, "backfill: pipelined fetch enabled");

let start = cursor.0 + 1;
let total = (cap - cursor.0) as usize;

Expand All @@ -91,40 +115,69 @@ pub async fn run_backfill(
})
.buffered(concurrency);

// Reused across batches — `write_block_batch` clears them on success.
let mut buf: Vec<BlockBundle> = Vec::with_capacity(batch_size);
let mut gap_buf: Vec<BlockHeight> = Vec::new();

let mut done = 0usize;
loop {
// Race cancellation against the next item — a cancel issued while
// a slow fetch is in flight returns immediately rather than after
// the chunk completes.
let (h, result) = tokio::select! {
// a slow fetch is in flight returns immediately. Any in-progress
// batch buffer is flushed below before we exit so the cursor lands.
let next = tokio::select! {
biased;
_ = cancel.cancelled() => {
tracing::info!(cursor = cursor.0, "backfill: cancelled mid-pipeline");
tracing::info!(
cursor = cursor.0,
pending_blocks = buf.len(),
pending_gaps = gap_buf.len(),
"backfill: cancelled mid-pipeline, flushing partial batch",
);
if !buf.is_empty() || !gap_buf.is_empty() {
let max_h = peek_batch_max(&buf, &gap_buf);
write_block_batch(pool, &mut buf, &mut gap_buf, analytics).await?;
cursor = max_h;
}
return Ok(cursor);
}
next = fetched.next() => match next {
Some(item) => item,
None => break,
},
next = fetched.next() => next,
};

let Some((h, result)) = next else { break };

match result? {
Some(bundle) => write_block(pool, bundle, analytics).await?,
Some(bundle) => buf.push(bundle),
None => {
// 404 / damaged-block gap (see ingest_one rationale below)
// 404 / damaged-block gap — see ingest_one rationale below.
// Folded into the same batch as a cursor-only height so the
// bump still lands atomically with the surrounding blocks.
tracing::warn!(height = h.0, "backfill: skipping single 404 block");
write_cursor(pool, h, 0).await?;
gap_buf.push(h);
}
}
cursor = h;

done += 1;
if done.is_multiple_of(1000) {
tracing::info!(
cursor = cursor.0,
progress = format!("{done}/{total}"),
"backfill: pipeline progress"
);
if buf.len() + gap_buf.len() >= batch_size {
let max_h = peek_batch_max(&buf, &gap_buf);
write_block_batch(pool, &mut buf, &mut gap_buf, analytics).await?;
cursor = max_h;
if done.is_multiple_of(1000) {
tracing::info!(
cursor = cursor.0,
progress = format!("{done}/{total}"),
"backfill: pipeline progress"
);
}
}
}

// Drain any tail < batch_size left in the buffer when the stream ends.
if !buf.is_empty() || !gap_buf.is_empty() {
let max_h = peek_batch_max(&buf, &gap_buf);
write_block_batch(pool, &mut buf, &mut gap_buf, analytics).await?;
cursor = max_h;
}

tracing::info!(
cursor = cursor.0,
ingested = done,
Expand All @@ -133,6 +186,25 @@ pub async fn run_backfill(
Ok(cursor)
}

/// Largest height across both buffers — used to pre-compute the cursor we'll
/// land on so the post-flush update is a cheap clone, not a re-scan after the
/// buffers have been moved into the writer. Returns the previous cursor's
/// sentinel (-1) if both buffers are empty (caller checks first).
fn peek_batch_max(bundles: &[BlockBundle], gaps: &[BlockHeight]) -> BlockHeight {
let mut max_h = BlockHeight(i64::MIN);
for b in bundles {
if b.block.height.0 > max_h.0 {
max_h = b.block.height;
}
}
for h in gaps {
if h.0 > max_h.0 {
max_h = *h;
}
}
max_h
}

/// Fetch (no write) — used by the concurrent pipeline. Returns None on
/// 404 (damaged-block gap), Some(bundle) on success. Errors propagate.
async fn fetch_one(
Expand Down Expand Up @@ -165,7 +237,10 @@ async fn fetch_one(
}))
}

/// Fetch + write one block. Pulled out for the tail loop to reuse.
/// Fetch + write one block. Pulled out for the tail loop to reuse — the
/// per-row `write_block` path with `ON CONFLICT DO NOTHING` is the right
/// shape at the tip where the same height may be re-attempted across
/// gRPC reconnects (single block, idempotent retry).
///
/// Blocks + their tx envelopes come from the native REST endpoint
/// (`/chain/blocks/<n>`) — Sentrix's `eth_getBlockByNumber(full=true)`
Expand Down Expand Up @@ -219,7 +294,7 @@ pub async fn ingest_one(
.collect::<Result<Vec<_>, _>>()
.map_err(|e| SyncError::Invalid(e.to_string()))?;

write_block(
crate::block_writer::write_block(
pool,
BlockBundle {
block: dom_block,
Expand Down
Loading
Loading