From 056a4fb81b0ddf1eb96bf3a39d276b3fdf2bf979 Mon Sep 17 00:00:00 2001 From: satyakwok <119509589+satyakwok@users.noreply.github.com> Date: Thu, 14 May 2026 17:02:19 +0200 Subject: [PATCH 1/2] =?UTF-8?q?feat(sync):=20bulk=20PG=20COPY=20for=20back?= =?UTF-8?q?fill=20writes=20(single-row=20INSERT=20=E2=86=92=20batched=20CO?= =?UTF-8?q?PY)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backfill switches from per-block sqlx::Transaction with per-row INSERT to a buffered bulk path: collect N blocks (default 100, INDEXER_WRITE_BATCH_SIZE 1..=1000) then flush blocks/transactions/logs in one transaction via three COPY ... FROM STDIN streams + a single cursor bump to MAX(height). 404 damaged-block gaps fold into the same batch as cursor-only heights so the bump still lands atomically with the surrounding blocks. Tail loop's ingest_one keeps the per-row write_block path with ON CONFLICT DO NOTHING — at-tip retries across gRPC reconnects need that idempotency, and a single-block tail commit doesn't benefit from COPY anyway. Spec §5 invariant 2 holds: cursor advance is inside the same transaction as the COPY streams, partial-batch flush on cancel keeps the cursor in sync with the data. Drops ON CONFLICT for the batch path — backfill cursor monotonicity within a single run rules out duplicates inside a batch, and the entire batch is one rollback boundary on crash. Bumps CHANGELOG (Unreleased / Changed). Adds INDEXER_WRITE_BATCH_SIZE to compose.env.example. New unit tests cover write_text COPY-format escaping and batch_cursor max-height + 404-fold semantics (3 tests, sync 4 → 7). --- CHANGELOG.md | 4 + compose.env.example | 2 + crates/sync/src/backfill.rs | 153 +++++++--- crates/sync/src/block_writer.rs | 485 +++++++++++++++++++++++++++++--- 4 files changed, 566 insertions(+), 78 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 15adc8e..9e0ef3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to `indexer-rs`. Format: [Keep a Changelog](https://keepacha ## [Unreleased] +### Changed + +- **Backfill writes now go through PG `COPY FROM STDIN`**, not per-row INSERT. Buffer N blocks (default 100, configurable via `INDEXER_WRITE_BATCH_SIZE`, max 1000) then flush blocks/transactions/logs in a single transaction with the cursor bump to `MAX(height)`. The tail loop's `ingest_one` keeps the per-row `ON CONFLICT DO NOTHING` path for at-tip retries. Spec §5 invariant 2 (cursor never lands ahead of data) holds: cursor advance is inside the same transaction as the COPY streams; partial-batch flush on cancel keeps the cursor in sync. + ### Added - **CoinBlast orphan adoption** — direct-deployed curves (e.g. CBLAST Genesis pre-factory) are now probed via `eth_call` (`token()` / `curveSupply()` / `graduationSrxThreshold()`) on first sighting and adopted into `cb_tokens` automatically. Topic-collision contracts get cached in `known_non_curves` so each address is probed at most once per worker lifetime. diff --git a/compose.env.example b/compose.env.example index 2a2da17..5e6afa5 100644 --- a/compose.env.example +++ b/compose.env.example @@ -14,6 +14,8 @@ RPC_URL=https://rpc.sentrixchain.com INDEXER_NETWORK=mainnet # INDEXER_PG_MAX_CONNECTIONS=10 # INDEXER_BACKFILL_LOOP_SECS=5 +# INDEXER_BACKFILL_CONCURRENCY=50 # blocks fetched in parallel (1..=500) +# INDEXER_WRITE_BATCH_SIZE=100 # blocks per bulk-COPY transaction (1..=1000) # Optional: gRPC tail loop (sub-second tip detection). Leave unset to # rely on the backfill loop's polling cadence. diff --git a/crates/sync/src/backfill.rs b/crates/sync/src/backfill.rs index 481be8b..9f0ebdc 100644 --- a/crates/sync/src/backfill.rs +++ b/crates/sync/src/backfill.rs @@ -1,22 +1,31 @@ -//! Backfill loop — pipelined concurrent fetch + serial write. +//! Backfill loop — pipelined concurrent fetch + bulk-COPY batched write. //! //! Strategy: read cursor → ask chain for tip → walk from cursor+1 to //! min(tip - safe_lag, max_backfill). For each height range, fetch N -//! blocks concurrently (REST + eth_getLogs per block); commit them -//! sequentially in height order so cursor never lands ahead of data -//! (spec §5 invariant 2). Retries via [`indexer_chain::retry_with_backoff`]; -//! permanent failures bubble to the orchestrator. +//! blocks concurrently (REST + eth_getLogs per block); buffer them in +//! height order then flush every `batch_size` blocks (or on cancel / +//! stream end) via PG `COPY FROM STDIN`. Cursor advances to MAX(height) +//! of each batch inside the same transaction so cursor never lands ahead +//! of data (spec §5 invariant 2). Retries via +//! [`indexer_chain::retry_with_backoff`]; permanent failures bubble to the +//! orchestrator. //! //! Concurrency: tunable via `INDEXER_BACKFILL_CONCURRENCY` env (default 50). -//! Wall-clock measured improvement vs sequential: 3 → 51 blocks/sec on -//! mainnet (2026-05-14, 1.5M-block backfill 138h → ~8h). +//! Batch size: tunable via `INDEXER_WRITE_BATCH_SIZE` env (default 100, +//! capped at 1000 to keep memory bounded and avoid blowing the cursor lag +//! window during a long-running batch). +//! +//! Wall-clock measured improvement vs sequential per-row INSERT: +//! - 3 → 51 blocks/sec (PR #33, fetch concurrency) +//! - 51 → ? blocks/sec (this PR, bulk COPY) — see PR body for live benchmark. //! //! Cancellation: caller passes a [`CancellationToken`]. The pipeline races //! `cancel.cancelled()` against the next item via `tokio::select!`, so a //! cancel during a slow fetch returns within the cancel timeout, not after -//! the in-flight chunk completes. +//! the in-flight chunk completes. Any partial buffer is flushed atomically +//! before return so the cursor still lands. -use crate::block_writer::{BlockBundle, write_block}; +use crate::block_writer::{BlockBundle, write_block_batch}; use crate::convert::{to_domain_block_from_native, to_domain_log, to_domain_txs_from_native}; use crate::cursor::{read_cursor, write_cursor}; use crate::{SyncConfig, SyncError, SyncResult}; @@ -29,10 +38,21 @@ use tokio_util::sync::CancellationToken; /// How many blocks to fetch concurrently in the backfill window. /// Each fetch is one REST call + one eth_getLogs call. Writes still -/// land sequentially per block. 50 is a conservative default — see +/// land sequentially per batch. 50 is a conservative default — see /// `INDEXER_BACKFILL_CONCURRENCY` env var to tune. const DEFAULT_BACKFILL_CONCURRENCY: usize = 50; +/// How many blocks to buffer before issuing a bulk-COPY transaction. Each +/// batch is one PG transaction with three COPY streams (blocks → txs → logs). +/// 100 is the sweet spot: small enough that a fault rolls back ≤2s of work, +/// large enough that COPY round-trip overhead amortises across many rows. +const DEFAULT_WRITE_BATCH_SIZE: usize = 100; + +/// Hard cap on `INDEXER_WRITE_BATCH_SIZE`. Memory is the limiter — at +/// 1000 blocks × ~10KB per block worst-case, one batch holds ~10MB of row +/// data plus the COPY text-format buffer. +const MAX_WRITE_BATCH_SIZE: usize = 1000; + fn backfill_concurrency() -> usize { std::env::var("INDEXER_BACKFILL_CONCURRENCY") .ok() @@ -41,6 +61,14 @@ fn backfill_concurrency() -> usize { .unwrap_or(DEFAULT_BACKFILL_CONCURRENCY) } +fn write_batch_size() -> usize { + std::env::var("INDEXER_WRITE_BATCH_SIZE") + .ok() + .and_then(|v| v.parse().ok()) + .filter(|&n: &usize| n > 0 && n <= MAX_WRITE_BATCH_SIZE) + .unwrap_or(DEFAULT_WRITE_BATCH_SIZE) +} + /// Run the backfill loop until cancellation OR until we reach /// `tip - safe_lag` (then return; caller decides whether to sleep + recheck /// or hand off to the tail loop). @@ -65,22 +93,18 @@ pub async fn run_backfill( return Ok(cursor); } + let concurrency = backfill_concurrency(); + let batch_size = write_batch_size(); tracing::info!( from = cursor.0 + 1, to = cap, tip = tip.0, safe_lag = cfg.safe_lag, - "backfill: starting walk", + concurrency, + batch_size, + "backfill: starting walk (concurrent fetch + bulk-COPY batched write)", ); - // Pipeline: fetch N blocks concurrently (each = REST + eth_getLogs), - // write them sequentially in height order so the cursor never lands - // ahead of the data. Per-block latency was the bottleneck (3 b/s); - // concurrent fetch + serial write keeps invariants while saturating - // the network. INDEXER_BACKFILL_CONCURRENCY env var tunes the pool. - let concurrency = backfill_concurrency(); - tracing::info!(concurrency, "backfill: pipelined fetch enabled"); - let start = cursor.0 + 1; let total = (cap - cursor.0) as usize; @@ -91,40 +115,69 @@ pub async fn run_backfill( }) .buffered(concurrency); + // Reused across batches — `write_block_batch` clears them on success. + let mut buf: Vec = Vec::with_capacity(batch_size); + let mut gap_buf: Vec = Vec::new(); + let mut done = 0usize; loop { // Race cancellation against the next item — a cancel issued while - // a slow fetch is in flight returns immediately rather than after - // the chunk completes. - let (h, result) = tokio::select! { + // a slow fetch is in flight returns immediately. Any in-progress + // batch buffer is flushed below before we exit so the cursor lands. + let next = tokio::select! { biased; _ = cancel.cancelled() => { - tracing::info!(cursor = cursor.0, "backfill: cancelled mid-pipeline"); + tracing::info!( + cursor = cursor.0, + pending_blocks = buf.len(), + pending_gaps = gap_buf.len(), + "backfill: cancelled mid-pipeline, flushing partial batch", + ); + if !buf.is_empty() || !gap_buf.is_empty() { + let max_h = peek_batch_max(&buf, &gap_buf); + write_block_batch(pool, &mut buf, &mut gap_buf, analytics).await?; + cursor = max_h; + } return Ok(cursor); } - next = fetched.next() => match next { - Some(item) => item, - None => break, - }, + next = fetched.next() => next, }; + + let Some((h, result)) = next else { break }; + match result? { - Some(bundle) => write_block(pool, bundle, analytics).await?, + Some(bundle) => buf.push(bundle), None => { - // 404 / damaged-block gap (see ingest_one rationale below) + // 404 / damaged-block gap — see ingest_one rationale below. + // Folded into the same batch as a cursor-only height so the + // bump still lands atomically with the surrounding blocks. tracing::warn!(height = h.0, "backfill: skipping single 404 block"); - write_cursor(pool, h, 0).await?; + gap_buf.push(h); } } - cursor = h; + done += 1; - if done.is_multiple_of(1000) { - tracing::info!( - cursor = cursor.0, - progress = format!("{done}/{total}"), - "backfill: pipeline progress" - ); + if buf.len() + gap_buf.len() >= batch_size { + let max_h = peek_batch_max(&buf, &gap_buf); + write_block_batch(pool, &mut buf, &mut gap_buf, analytics).await?; + cursor = max_h; + if done.is_multiple_of(1000) { + tracing::info!( + cursor = cursor.0, + progress = format!("{done}/{total}"), + "backfill: pipeline progress" + ); + } } } + + // Drain any tail < batch_size left in the buffer when the stream ends. + if !buf.is_empty() || !gap_buf.is_empty() { + let max_h = peek_batch_max(&buf, &gap_buf); + write_block_batch(pool, &mut buf, &mut gap_buf, analytics).await?; + cursor = max_h; + } + tracing::info!( cursor = cursor.0, ingested = done, @@ -133,6 +186,25 @@ pub async fn run_backfill( Ok(cursor) } +/// Largest height across both buffers — used to pre-compute the cursor we'll +/// land on so the post-flush update is a cheap clone, not a re-scan after the +/// buffers have been moved into the writer. Returns the previous cursor's +/// sentinel (-1) if both buffers are empty (caller checks first). +fn peek_batch_max(bundles: &[BlockBundle], gaps: &[BlockHeight]) -> BlockHeight { + let mut max_h = BlockHeight(i64::MIN); + for b in bundles { + if b.block.height.0 > max_h.0 { + max_h = b.block.height; + } + } + for h in gaps { + if h.0 > max_h.0 { + max_h = *h; + } + } + max_h +} + /// Fetch (no write) — used by the concurrent pipeline. Returns None on /// 404 (damaged-block gap), Some(bundle) on success. Errors propagate. async fn fetch_one( @@ -165,7 +237,10 @@ async fn fetch_one( })) } -/// Fetch + write one block. Pulled out for the tail loop to reuse. +/// Fetch + write one block. Pulled out for the tail loop to reuse — the +/// per-row `write_block` path with `ON CONFLICT DO NOTHING` is the right +/// shape at the tip where the same height may be re-attempted across +/// gRPC reconnects (single block, idempotent retry). /// /// Blocks + their tx envelopes come from the native REST endpoint /// (`/chain/blocks/`) — Sentrix's `eth_getBlockByNumber(full=true)` @@ -219,7 +294,7 @@ pub async fn ingest_one( .collect::, _>>() .map_err(|e| SyncError::Invalid(e.to_string()))?; - write_block( + crate::block_writer::write_block( pool, BlockBundle { block: dom_block, diff --git a/crates/sync/src/block_writer.rs b/crates/sync/src/block_writer.rs index 94adcf4..d6a0dd0 100644 --- a/crates/sync/src/block_writer.rs +++ b/crates/sync/src/block_writer.rs @@ -1,10 +1,20 @@ -//! Atomic block-write transaction. +//! Atomic block-write transactions — single-block + bulk-COPY batch paths. //! -//! Wraps block + tx + log inserts + cursor advance in a single -//! `sqlx::Transaction`. Either all of it commits (cursor reflects the new -//! state) or none of it does (a crash mid-write leaves the cursor pointing -//! at the previous height + the partial rows are rolled back). Spec §5 -//! invariants 1, 2, 10. +//! Two write modes share one bundle type: +//! +//! - [`write_block`] — single-block path used by the tail loop. Uses per-row +//! INSERT with `ON CONFLICT DO NOTHING` for at-least-once delivery semantics +//! at the tip (where the same height may be re-attempted across reconnects). +//! - [`write_block_batch`] — bulk-COPY path used by the backfill loop. Streams +//! N blocks worth of rows into one PG transaction via `COPY ... FROM STDIN`, +//! advances the cursor to `MAX(height)` of the batch, and commits. Trades +//! the `ON CONFLICT` idempotency-on-replay for raw write throughput; the +//! backfill cursor invariant (monotonic, only advances after commit) means +//! we never re-walk a height inside a single backfill run anyway. +//! +//! Both paths keep the cursor advance inside the same `sqlx::Transaction` as +//! the data writes, so a crash mid-write leaves the cursor pointing at the +//! previous height with the partial rows rolled back (spec §5 invariants 1, 2). //! //! After commit, optionally pushes one [`indexer_analytics::RawTxRow`] per //! tx into the analytics buffer. The push is fire-and-forget — analytics is @@ -15,10 +25,11 @@ use crate::cursor::write_cursor; use crate::{SyncError, SyncResult}; use indexer_analytics::{AnalyticsHandle, RawTxRow}; use indexer_db::{PgPool, blocks, logs, transactions}; -use indexer_domain::{Block, Log, Transaction}; +use indexer_domain::{Block, BlockHeight, Log, Transaction}; +use sqlx::{Postgres, Transaction as SqlxTransaction}; /// Bundle of rows to write atomically. Built by the sync loop before calling -/// [`write_block`]. +/// [`write_block`] or buffered for [`write_block_batch`]. pub struct BlockBundle { /// The block header row. pub block: Block, @@ -59,42 +70,438 @@ pub async fn write_block( tx.commit().await.map_err(SyncError::from)?; - // Best-effort analytics push, after the SQL boundary so a failed flusher - // can't roll back our data. + push_analytics_for_block(analytics, &b.block, &b.txs); + + Ok(()) +} + +/// Write a batch of blocks via PG `COPY FROM STDIN` — one transaction, three +/// COPY streams (blocks → transactions → logs in FK order), one cursor bump +/// to `MAX(height)`. Drains the buffer on success or returns Err and leaves +/// the buffer untouched on failure (caller decides retry). +/// +/// Cursor advance lives inside the same transaction as the data, so a crash +/// mid-batch rolls everything back and the cursor stays at the previous +/// batch's MAX. Spec §5 invariant 2 (cursor never lands ahead of data) holds. +/// +/// `cursor_only_heights` lets the backfill loop fold 404 / damaged-block +/// gaps into the batch without extra round-trips: heights listed here have +/// no rows of their own but contribute to the cursor max so the gap doesn't +/// stall the batch boundary. +pub async fn write_block_batch( + pool: &PgPool, + bundles: &mut Vec, + cursor_only_heights: &mut Vec, + analytics: Option<&AnalyticsHandle>, +) -> SyncResult<()> { + if bundles.is_empty() && cursor_only_heights.is_empty() { + return Ok(()); + } + + // Pick the max-height block whose timestamp seeds the cursor's `updated_at` + // (best chain-time we have for this batch). If the batch is cursor-only + // (all 404 gaps), we have no timestamp — fall back to 0 like the per-row + // gap-skip path does. + let (cursor_height, cursor_ts) = batch_cursor(bundles, cursor_only_heights); + + let mut tx = pool.begin().await.map_err(SyncError::from)?; + + // FK ordering: blocks first, then transactions (FK → blocks), then logs + // (FK → transactions). Each COPY drains its slice in one round-trip. + if !bundles.is_empty() { + copy_blocks(&mut tx, bundles).await?; + copy_transactions(&mut tx, bundles).await?; + copy_logs(&mut tx, bundles).await?; + } + + write_cursor(&mut *tx, cursor_height, cursor_ts).await?; + + tx.commit().await.map_err(SyncError::from)?; + + // Best-effort analytics push, after the SQL boundary. if let Some(handle) = analytics { - // Should never hit (writer only runs on heights coming back from - // resolved blocks, never the -1 sentinel from the cursor) but keep - // analytics non-fatal: warn + skip the row, don't panic the loop. - let block_height = match b.block.height.as_u64() { - Some(h) => h, - None => { - tracing::warn!( - height = ?b.block.height, - "analytics: skipping row — block height not convertible to u64 \ - (cursor sentinel reached writer; this should not happen)" - ); - return Ok(()); - } - }; + for b in bundles.iter() { + push_analytics_for_block(Some(handle), &b.block, &b.txs); + } + } + + bundles.clear(); + cursor_only_heights.clear(); + Ok(()) +} + +fn batch_cursor( + bundles: &[BlockBundle], + cursor_only: &[BlockHeight], +) -> (BlockHeight, i64) { + // Track the highest block + its timestamp; cursor-only heights still + // contribute to the max but carry no timestamp. + let mut max_h = BlockHeight(i64::MIN); + let mut max_ts = 0i64; + for b in bundles { + if b.block.height.0 > max_h.0 { + max_h = b.block.height; + max_ts = b.block.timestamp; + } + } + for h in cursor_only { + if h.0 > max_h.0 { + max_h = *h; + // No timestamp for a 404-only height; mirror the per-row skip + // path which also writes 0 here. + max_ts = 0; + } + } + (max_h, max_ts) +} + +// ─── COPY helpers ──────────────────────────────────────────────────────────── +// +// All three use PG text COPY format (default) — tab-separated columns, `\N` +// for NULL, with backslash / tab / newline / CR escaping per +// . +// Text format handles `numeric(78,0)` (decimal string), `jsonb` (UTF-8), and +// every other column type in our schema without per-type wire encoders. +// +// We intentionally COPY directly into the target tables (not a staging temp +// table + INSERT … ON CONFLICT). The backfill cursor invariant guarantees we +// never re-walk a height inside one run, and the entire batch is one +// transaction — so duplicates inside a batch are impossible. Replay across +// runs from a manually-rewound cursor would conflict, but operators in that +// situation already truncate downstream tables. The tail loop's `write_block` +// keeps the per-row `ON CONFLICT DO NOTHING` path for at-tip re-attempts. + +async fn copy_blocks( + tx: &mut SqlxTransaction<'_, Postgres>, + bundles: &[BlockBundle], +) -> SyncResult<()> { + let mut buf = String::with_capacity(bundles.len() * 256); + for b in bundles { + let blk = &b.block; + // jsonb: serde_json gives compact UTF-8 with no tabs/newlines inside + // (arrays of hex strings); still pass through `escape_text` to be safe + // if a future justifier value ever picks up control bytes. + let signers = serde_json::Value::Array( + blk.justification_signers + .iter() + .map(|s| serde_json::Value::String(s.clone())) + .collect(), + ); + let signers_str = signers.to_string(); + + // Columns: height, hash, parent_hash, timestamp, validator, gas_used, + // gas_limit, base_fee, tx_count, state_root, round, justification_signers. + write_int(&mut buf, blk.height.0); + buf.push('\t'); + write_text(&mut buf, &blk.hash); + buf.push('\t'); + write_text(&mut buf, &blk.parent_hash); + buf.push('\t'); + write_int(&mut buf, blk.timestamp); + buf.push('\t'); + write_text(&mut buf, &blk.validator); + buf.push('\t'); + write_int(&mut buf, blk.gas_used); + buf.push('\t'); + write_int(&mut buf, blk.gas_limit); + buf.push('\t'); + match &blk.base_fee { + Some(w) => write_text(&mut buf, &w.to_string()), + None => buf.push_str("\\N"), + } + buf.push('\t'); + write_int(&mut buf, blk.tx_count as i64); + buf.push('\t'); + match &blk.state_root { + Some(s) => write_text(&mut buf, s), + None => buf.push_str("\\N"), + } + buf.push('\t'); + write_int(&mut buf, blk.round as i64); + buf.push('\t'); + write_text(&mut buf, &signers_str); + buf.push('\n'); + } + + let mut copy = tx + .copy_in_raw( + "COPY blocks (height, hash, parent_hash, timestamp, validator, gas_used, \ + gas_limit, base_fee, tx_count, state_root, round, justification_signers) \ + FROM STDIN", + ) + .await + .map_err(SyncError::from)?; + copy.send(buf.as_bytes()).await.map_err(SyncError::from)?; + copy.finish().await.map_err(SyncError::from)?; + Ok(()) +} + +async fn copy_transactions( + tx: &mut SqlxTransaction<'_, Postgres>, + bundles: &[BlockBundle], +) -> SyncResult<()> { + // Pre-size: most blocks have 1 tx (coinbase), occasional bursts higher. + let estimate: usize = bundles.iter().map(|b| b.txs.len()).sum(); + if estimate == 0 { + // Empty COPY is allowed but skip the round-trip. + return Ok(()); + } + let mut buf = String::with_capacity(estimate * 256); + for b in bundles { for t in &b.txs { - let row = RawTxRow { - block_height, - timestamp: b.block.timestamp as u64, - tx_hash: t.hash.clone(), - from_addr: t.from_addr.clone(), - to_addr: t.to_addr.clone(), - value_str: t.value.to_string(), - fee_str: t.fee.to_string(), - gas_used: t.gas_used.unwrap_or(0) as u64, - status: t.status as u8, - tx_type: t.tx_type.as_str().to_string(), - }; - if let Err(e) = handle.push(row) { - tracing::warn!(error = %e, "analytics push failed; flusher closed?"); - break; + write_text(&mut buf, &t.hash); + buf.push('\t'); + write_int(&mut buf, t.block_height.0); + buf.push('\t'); + write_int(&mut buf, t.tx_index.0 as i64); + buf.push('\t'); + write_text(&mut buf, &t.from_addr); + buf.push('\t'); + match &t.to_addr { + Some(s) => write_text(&mut buf, s), + None => buf.push_str("\\N"), + } + buf.push('\t'); + write_text(&mut buf, &t.value.to_string()); + buf.push('\t'); + write_int(&mut buf, t.gas_limit); + buf.push('\t'); + match t.gas_used { + Some(g) => write_int(&mut buf, g), + None => buf.push_str("\\N"), + } + buf.push('\t'); + match &t.gas_price { + Some(w) => write_text(&mut buf, &w.to_string()), + None => buf.push_str("\\N"), + } + buf.push('\t'); + write_text(&mut buf, &t.fee.to_string()); + buf.push('\t'); + write_int(&mut buf, t.nonce); + buf.push('\t'); + match &t.data { + Some(s) => write_text(&mut buf, s), + None => buf.push_str("\\N"), + } + buf.push('\t'); + write_int(&mut buf, t.status as i64); + buf.push('\t'); + match &t.contract_address { + Some(s) => write_text(&mut buf, s), + None => buf.push_str("\\N"), } + buf.push('\t'); + write_text(&mut buf, t.tx_type.as_str()); + buf.push('\n'); } } + let mut copy = tx + .copy_in_raw( + "COPY transactions (hash, block_height, tx_index, from_addr, to_addr, value, \ + gas_limit, gas_used, gas_price, fee, nonce, data, status, contract_address, \ + tx_type) FROM STDIN", + ) + .await + .map_err(SyncError::from)?; + copy.send(buf.as_bytes()).await.map_err(SyncError::from)?; + copy.finish().await.map_err(SyncError::from)?; + Ok(()) +} +async fn copy_logs( + tx: &mut SqlxTransaction<'_, Postgres>, + bundles: &[BlockBundle], +) -> SyncResult<()> { + let estimate: usize = bundles.iter().map(|b| b.logs.len()).sum(); + if estimate == 0 { + return Ok(()); + } + let mut buf = String::with_capacity(estimate * 256); + for b in bundles { + for l in &b.logs { + write_int(&mut buf, l.block_height.0); + buf.push('\t'); + write_text(&mut buf, &l.tx_hash); + buf.push('\t'); + write_int(&mut buf, l.log_index.0 as i64); + buf.push('\t'); + write_text(&mut buf, &l.address); + buf.push('\t'); + match &l.topic0 { + Some(s) => write_text(&mut buf, s), + None => buf.push_str("\\N"), + } + buf.push('\t'); + match &l.topic1 { + Some(s) => write_text(&mut buf, s), + None => buf.push_str("\\N"), + } + buf.push('\t'); + match &l.topic2 { + Some(s) => write_text(&mut buf, s), + None => buf.push_str("\\N"), + } + buf.push('\t'); + match &l.topic3 { + Some(s) => write_text(&mut buf, s), + None => buf.push_str("\\N"), + } + buf.push('\t'); + match &l.data { + Some(s) => write_text(&mut buf, s), + None => buf.push_str("\\N"), + } + buf.push('\n'); + } + } + let mut copy = tx + .copy_in_raw( + "COPY logs (block_height, tx_hash, log_index, address, topic0, topic1, topic2, \ + topic3, data) FROM STDIN", + ) + .await + .map_err(SyncError::from)?; + copy.send(buf.as_bytes()).await.map_err(SyncError::from)?; + copy.finish().await.map_err(SyncError::from)?; Ok(()) } + +/// Write a numeric column without allocation. +#[inline] +fn write_int(buf: &mut String, v: i64) { + use std::fmt::Write; + let _ = write!(buf, "{v}"); +} + +/// Write a text column with PG COPY text-format escaping. Only four bytes +/// need escaping in default text format: backslash, tab, newline, CR. +/// `\b` / `\f` / `\v` are legal raw inside a field. Empty string stays +/// empty (not NULL). +fn write_text(buf: &mut String, s: &str) { + for ch in s.chars() { + match ch { + '\\' => buf.push_str("\\\\"), + '\t' => buf.push_str("\\t"), + '\n' => buf.push_str("\\n"), + '\r' => buf.push_str("\\r"), + other => buf.push(other), + } + } +} + +fn push_analytics_for_block( + analytics: Option<&AnalyticsHandle>, + block: &Block, + txs: &[Transaction], +) { + let Some(handle) = analytics else { return }; + // Should never hit (writer only runs on heights coming back from + // resolved blocks, never the -1 sentinel from the cursor) but keep + // analytics non-fatal: warn + skip the row, don't panic the loop. + let block_height = match block.height.as_u64() { + Some(h) => h, + None => { + tracing::warn!( + height = ?block.height, + "analytics: skipping row — block height not convertible to u64 \ + (cursor sentinel reached writer; this should not happen)" + ); + return; + } + }; + for t in txs { + let row = RawTxRow { + block_height, + timestamp: block.timestamp as u64, + tx_hash: t.hash.clone(), + from_addr: t.from_addr.clone(), + to_addr: t.to_addr.clone(), + value_str: t.value.to_string(), + fee_str: t.fee.to_string(), + gas_used: t.gas_used.unwrap_or(0) as u64, + status: t.status as u8, + tx_type: t.tx_type.as_str().to_string(), + }; + if let Err(e) = handle.push(row) { + tracing::warn!(error = %e, "analytics push failed; flusher closed?"); + break; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn write_text_escapes_pg_copy_specials() { + let mut buf = String::new(); + write_text(&mut buf, "plain"); + assert_eq!(buf, "plain"); + + buf.clear(); + write_text(&mut buf, "tab\there"); + assert_eq!(buf, "tab\\there"); + + buf.clear(); + write_text(&mut buf, "back\\slash"); + assert_eq!(buf, "back\\\\slash"); + + buf.clear(); + write_text(&mut buf, "line\nfeed"); + assert_eq!(buf, "line\\nfeed"); + + buf.clear(); + write_text(&mut buf, "carriage\rreturn"); + assert_eq!(buf, "carriage\\rreturn"); + + // jsonb-shaped text passes through untouched (no specials): + buf.clear(); + write_text(&mut buf, r#"["0xabc","0xdef"]"#); + assert_eq!(buf, r#"["0xabc","0xdef"]"#); + } + + #[test] + fn batch_cursor_picks_max_height_with_timestamp() { + // Three real blocks at heights 10/12/11 + a 404-skipped height 13. + // Max is 13 (cursor-only) so timestamp falls back to 0. + let mk = |h: i64, ts: i64| BlockBundle { + block: Block { + height: BlockHeight(h), + hash: format!("0xh{h}"), + parent_hash: format!("0xp{h}"), + timestamp: ts, + validator: "0xv".into(), + gas_used: 0, + gas_limit: 0, + base_fee: None, + tx_count: 0, + state_root: None, + round: 0, + justification_signers: vec![], + }, + txs: vec![], + logs: vec![], + }; + let bundles = vec![mk(10, 100), mk(12, 120), mk(11, 110)]; + let gaps = vec![BlockHeight(13)]; + let (h, ts) = batch_cursor(&bundles, &gaps); + assert_eq!(h.0, 13); + assert_eq!(ts, 0); // 404 height has no timestamp. + + // Without the gap, max is 12 with timestamp 120. + let (h, ts) = batch_cursor(&bundles, &[]); + assert_eq!(h.0, 12); + assert_eq!(ts, 120); + } + + #[test] + fn batch_cursor_handles_empty_bundles_with_only_gaps() { + let bundles: Vec = vec![]; + let gaps = vec![BlockHeight(7), BlockHeight(9), BlockHeight(8)]; + let (h, ts) = batch_cursor(&bundles, &gaps); + assert_eq!(h.0, 9); + assert_eq!(ts, 0); + } +} From 743da95dcf5a4fd0eb66f512cbbf8895177fba1d Mon Sep 17 00:00:00 2001 From: satyakwok Date: Thu, 14 May 2026 17:40:29 +0200 Subject: [PATCH 2/2] style: cargo fmt --- crates/sync/src/block_writer.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/sync/src/block_writer.rs b/crates/sync/src/block_writer.rs index d6a0dd0..47ca8ea 100644 --- a/crates/sync/src/block_writer.rs +++ b/crates/sync/src/block_writer.rs @@ -130,10 +130,7 @@ pub async fn write_block_batch( Ok(()) } -fn batch_cursor( - bundles: &[BlockBundle], - cursor_only: &[BlockHeight], -) -> (BlockHeight, i64) { +fn batch_cursor(bundles: &[BlockBundle], cursor_only: &[BlockHeight]) -> (BlockHeight, i64) { // Track the highest block + its timestamp; cursor-only heights still // contribute to the max but carry no timestamp. let mut max_h = BlockHeight(i64::MIN);