feat(sync): bulk PG COPY for backfill writes (lift the 50 b/s write ceiling) #35
…ched COPY)

Backfill switches from per-block sqlx::Transaction with per-row INSERT to a buffered bulk path: collect N blocks (default 100, INDEXER_WRITE_BATCH_SIZE 1..=1000), then flush blocks/transactions/logs in one transaction via three COPY ... FROM STDIN streams + a single cursor bump to MAX(height). 404 damaged-block gaps fold into the same batch as cursor-only heights so the bump still lands atomically with the surrounding blocks.

Tail loop's ingest_one keeps the per-row write_block path with ON CONFLICT DO NOTHING: at-tip retries across gRPC reconnects need that idempotency, and a single-block tail commit doesn't benefit from COPY anyway.

Spec §5 invariant 2 holds: cursor advance is inside the same transaction as the COPY streams, and partial-batch flush on cancel keeps the cursor in sync with the data.

Drops ON CONFLICT for the batch path: backfill cursor monotonicity within a single run rules out duplicates inside a batch, and the entire batch is one rollback boundary on crash.

Bumps CHANGELOG (Unreleased / Changed). Adds INDEXER_WRITE_BATCH_SIZE to compose.env.example. New unit tests cover write_text COPY-format escaping and batch_cursor max-height + 404-fold semantics (3 tests, sync 4 → 7).
📝 Walkthrough
This PR refactors the indexer backfill from per-block inserts to buffered bulk-COPY batched writes. It adds INDEXER_WRITE_BATCH_SIZE (default 100, cap 1000) and documents env examples. A new write_block_batch builds three COPY FROM STDIN streams (blocks, transactions, logs) inside one transaction, updates the cursor to the batch MAX(height), commits, then performs best-effort analytics pushes. run_backfill now buffers bundles and gap heights, flushes atomically when batch_size is reached or on cancellation/stream-end, and uses batch max for cursor advancement while retaining the per-row tail ingest path.
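The buffering rule in the walkthrough can be illustrated with a small in-memory sketch. This is not the PR's code: `Bundle` and `BatchBuffer` are hypothetical stand-ins, no database is involved, and counting 404 gap heights toward the batch size is an assumption (the description only says that both are buffered and flushed together).

```rust
/// Hypothetical stand-in for a fetched block; the real `BlockBundle`
/// also carries transactions and logs.
struct Bundle {
    height: i64,
}

/// In-memory model of the backfill buffer. Assumption: gap heights count
/// toward the batch size alongside real blocks.
struct BatchBuffer {
    batch_size: usize,
    bundles: Vec<Bundle>,
    gap_heights: Vec<i64>,
}

impl BatchBuffer {
    fn new(batch_size: usize) -> Self {
        Self { batch_size, bundles: Vec::new(), gap_heights: Vec::new() }
    }

    /// Number of heights waiting for the next flush.
    fn pending(&self) -> usize {
        self.bundles.len() + self.gap_heights.len()
    }

    /// Buffers a block; returns the committed cursor if this push filled the batch.
    fn push_block(&mut self, bundle: Bundle) -> Option<i64> {
        self.bundles.push(bundle);
        self.flush_if_full()
    }

    /// Buffers a 404 gap as a cursor-only height.
    fn push_gap(&mut self, height: i64) -> Option<i64> {
        self.gap_heights.push(height);
        self.flush_if_full()
    }

    fn flush_if_full(&mut self) -> Option<i64> {
        if self.pending() >= self.batch_size {
            self.flush()
        } else {
            None
        }
    }

    /// Drains the buffer and returns the cursor height the batch would
    /// commit: the max over block heights and gap heights. Returns `None`
    /// when nothing is pending. A real flush would run the COPY streams and
    /// the cursor update inside one transaction at this point.
    fn flush(&mut self) -> Option<i64> {
        let cursor = self
            .bundles
            .iter()
            .map(|b| b.height)
            .chain(self.gap_heights.iter().copied())
            .max()?;
        self.bundles.clear();
        self.gap_heights.clear();
        Some(cursor)
    }
}
```

Calling `flush()` directly models the partial-batch flush on cancellation or stream end: whatever is pending is committed, and the cursor lands on the highest buffered height.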
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 5 passed
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/sync/src/backfill.rs`:
- Around line 64-69: The write_batch_size() currently filters out values >
MAX_WRITE_BATCH_SIZE and falls back to DEFAULT_WRITE_BATCH_SIZE; change it to
clamp parsed values above the cap to MAX_WRITE_BATCH_SIZE instead of rejecting
them. In write_batch_size(), parse INDEXER_WRITE_BATCH_SIZE as before, return
DEFAULT_WRITE_BATCH_SIZE for parse/failure or non-positive values, but if the
parsed value > MAX_WRITE_BATCH_SIZE return MAX_WRITE_BATCH_SIZE, otherwise
return the parsed value; reference the write_batch_size() function and constants
MAX_WRITE_BATCH_SIZE and DEFAULT_WRITE_BATCH_SIZE when making the change.
In `@crates/sync/src/block_writer.rs`:
- Around line 133-136: The function signature for batch_cursor is formatted
across multiple lines and fails rustfmt; reflow the signature to a single line
(e.g., fn batch_cursor(bundles: &[BlockBundle], cursor_only: &[BlockHeight]) ->
(BlockHeight, i64) {) and then run rustfmt/cargo fmt to apply the standard
formatting for the crate; update any nearby signatures in the same file if they
were split similarly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: a9bfcc13-895e-4173-b08c-6630fd45ad3a
📒 Files selected for processing (4)
- CHANGELOG.md
- compose.env.example
- crates/sync/src/backfill.rs
- crates/sync/src/block_writer.rs
fn write_batch_size() -> usize {
    std::env::var("INDEXER_WRITE_BATCH_SIZE")
        .ok()
        .and_then(|v| v.parse().ok())
        .filter(|&n: &usize| n > 0 && n <= MAX_WRITE_BATCH_SIZE)
        .unwrap_or(DEFAULT_WRITE_BATCH_SIZE)
Clamp oversized INDEXER_WRITE_BATCH_SIZE values instead of resetting to 100.
Values above 1000 currently get rejected by the filter and silently fall back to the default batch size. That contradicts the documented “capped at 1000” behavior and can unexpectedly cut backfill throughput when an operator intentionally overprovisions.
Suggested fix
 fn write_batch_size() -> usize {
     std::env::var("INDEXER_WRITE_BATCH_SIZE")
         .ok()
-        .and_then(|v| v.parse().ok())
-        .filter(|&n: &usize| n > 0 && n <= MAX_WRITE_BATCH_SIZE)
+        .and_then(|v| v.parse::<usize>().ok())
+        .map(|n| match n {
+            0 => DEFAULT_WRITE_BATCH_SIZE,
+            _ => n.min(MAX_WRITE_BATCH_SIZE),
+        })
         .unwrap_or(DEFAULT_WRITE_BATCH_SIZE)
 }
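One way to pin the clamp behaviour down in tests, sketched here under assumptions: the parsing is split into a pure helper, hypothetically named `parse_write_batch_size`, that takes the raw value instead of reading the environment. The constant values 100 and 1000 come from the PR description; the helper and its shape are illustrative, not part of the PR.

```rust
const DEFAULT_WRITE_BATCH_SIZE: usize = 100;
const MAX_WRITE_BATCH_SIZE: usize = 1000;

/// Hypothetical pure core of the clamp rule. Missing, unparsable, or zero
/// values fall back to the default; values above the cap are clamped to it.
fn parse_write_batch_size(raw: Option<&str>) -> usize {
    match raw.and_then(|v| v.parse::<usize>().ok()) {
        None | Some(0) => DEFAULT_WRITE_BATCH_SIZE,
        Some(n) => n.min(MAX_WRITE_BATCH_SIZE),
    }
}

/// Thin wrapper that reads the environment variable named in the PR.
fn write_batch_size() -> usize {
    parse_write_batch_size(std::env::var("INDEXER_WRITE_BATCH_SIZE").ok().as_deref())
}
```

With this split, the fallback and clamp cases can be asserted without mutating process-wide environment state: `"5000"` yields 1000, while `"0"`, `"abc"`, and an unset variable all yield 100.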
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/sync/src/block_writer.rs`:
- Around line 91-117: Before computing batch cursor and writing it, validate
that the union of heights from bundles and cursor_only_heights is strictly
contiguous (no gaps) and has no duplicates; add a helper (e.g.
validate_contiguous_batch) and call it at the start of write_block_batch (before
batch_cursor/write_cursor) to collect heights from bundles via
BlockBundle.block.height.0 and from cursor_only_heights via BlockHeight.0, sort
them, and return a SyncError if any adjacent pair are equal or not exactly +1 so
the transaction fails instead of advancing the cursor past missing blocks.
- Around line 434-460: Add a regression assertion to the
write_text_escapes_pg_copy_specials test to verify COPY's null sentinel is
escaped: call write_text(&mut buf, r"\N") (or "\\N" literal) and assert the
output equals the escaped form (r"\\N"), ensuring the write_text function still
converts a bare "\N" into "\\N" so it won’t be interpreted as NULL by COPY;
modify the existing test function write_text_escapes_pg_copy_specials to include
this case.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: c861c2c1-1ed0-4a55-9533-bdf7510f102b
📒 Files selected for processing (1)
crates/sync/src/block_writer.rs
pub async fn write_block_batch(
    pool: &PgPool,
    bundles: &mut Vec<BlockBundle>,
    cursor_only_heights: &mut Vec<BlockHeight>,
    analytics: Option<&AnalyticsHandle>,
) -> SyncResult<()> {
    if bundles.is_empty() && cursor_only_heights.is_empty() {
        return Ok(());
    }

    // Pick the max-height block whose timestamp seeds the cursor's `updated_at`
    // (best chain-time we have for this batch). If the batch is cursor-only
    // (all 404 gaps), we have no timestamp, so fall back to 0 like the per-row
    // gap-skip path does.
    let (cursor_height, cursor_ts) = batch_cursor(bundles, cursor_only_heights);

    let mut tx = pool.begin().await.map_err(SyncError::from)?;

    // FK ordering: blocks first, then transactions (FK → blocks), then logs
    // (FK → transactions). Each COPY drains its slice in one round-trip.
    if !bundles.is_empty() {
        copy_blocks(&mut tx, bundles).await?;
        copy_transactions(&mut tx, bundles).await?;
        copy_logs(&mut tx, bundles).await?;
    }

    write_cursor(&mut *tx, cursor_height, cursor_ts).await?;
Reject sparse batches before advancing the cursor.
This path always bumps the cursor to MAX(height) even if bundles plus cursor_only_heights do not cover every intervening height. If the caller ever hands this function [100, 102] with no explicit gap for 101, the transaction still commits and write_cursor permanently skips 101. Please validate contiguity (and ideally duplicates) before calling batch_cursor/write_cursor, so the batch fails closed instead of silently losing data.
Possible guard
fn validate_contiguous_batch(
    bundles: &[BlockBundle],
    cursor_only_heights: &[BlockHeight],
) -> SyncResult<()> {
    let mut heights: Vec<i64> = bundles
        .iter()
        .map(|b| b.block.height.0)
        .chain(cursor_only_heights.iter().map(|h| h.0))
        .collect();
    heights.sort_unstable();
    for pair in heights.windows(2) {
        if pair[0] == pair[1] || pair[1] != pair[0] + 1 {
            return Err(SyncError::msg("non-contiguous backfill batch"));
        }
    }
    Ok(())
}
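For context on what the guard protects, the cursor selection described in the `write_block_batch` comments might look roughly like the sketch below. It uses simplified hypothetical types (`Bundle` with plain `i64` fields) rather than the crate's `BlockBundle` and `BlockHeight`, and returns an `Option` instead of relying on the caller's non-empty check. One assumption is labeled in the code: when the highest height in the batch is a gap, the timestamp of the highest real block is still used.

```rust
/// Hypothetical simplified block: height plus chain timestamp.
struct Bundle {
    height: i64,
    timestamp: i64,
}

/// Sketch of the cursor choice for a batch: the height is the max over
/// blocks and cursor-only gap heights; the timestamp comes from the highest
/// real block, or 0 when the batch contains only gaps.
fn batch_cursor_sketch(bundles: &[Bundle], cursor_only: &[i64]) -> Option<(i64, i64)> {
    let top_block = bundles.iter().max_by_key(|b| b.height);
    let top_gap = cursor_only.iter().copied().max();

    let height = match (top_block.map(|b| b.height), top_gap) {
        (Some(b), Some(g)) => b.max(g),
        (Some(b), None) => b,
        (None, Some(g)) => g,
        (None, None) => return None, // empty batch: nothing to commit
    };

    // Assumption: a gap at the top of the batch still borrows the timestamp
    // of the highest real block; a gap-only batch falls back to 0.
    let ts = top_block.map(|b| b.timestamp).unwrap_or(0);
    Some((height, ts))
}
```

Note that nothing here checks coverage: a batch of heights 100 and 102 yields a cursor of 102 whether or not 101 was recorded as a gap, which is the failure mode the review comment asks to reject.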
#[test]
fn write_text_escapes_pg_copy_specials() {
    let mut buf = String::new();
    write_text(&mut buf, "plain");
    assert_eq!(buf, "plain");

    buf.clear();
    write_text(&mut buf, "tab\there");
    assert_eq!(buf, "tab\\there");

    buf.clear();
    write_text(&mut buf, "back\\slash");
    assert_eq!(buf, "back\\\\slash");

    buf.clear();
    write_text(&mut buf, "line\nfeed");
    assert_eq!(buf, "line\\nfeed");

    buf.clear();
    write_text(&mut buf, "carriage\rreturn");
    assert_eq!(buf, "carriage\\rreturn");

    // jsonb-shaped text passes through untouched (no specials):
    buf.clear();
    write_text(&mut buf, r#"["0xabc","0xdef"]"#);
    assert_eq!(buf, r#"["0xabc","0xdef"]"#);
}
🧹 Nitpick | 🔵 Trivial | ⚡ Quick win
Add a regression case for the COPY null sentinel.
COPY FROM STDIN treats bare \N as NULL. write_text currently escapes that correctly, but this test suite does not pin the behavior down. A dedicated assertion here would protect against a future refactor turning a literal string into a null field.
Suggested test addition
     buf.clear();
     write_text(&mut buf, "back\\slash");
     assert_eq!(buf, "back\\\\slash");
+    buf.clear();
+    write_text(&mut buf, r#"\N"#);
+    assert_eq!(buf, r#"\\N"#);
+
     buf.clear();
     write_text(&mut buf, "line\nfeed");
     assert_eq!(buf, "line\\nfeed");
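The escaping behaviour that the test above pins down can be sketched as follows. The PR's actual `write_text` body is not shown in this conversation, so the function below (named `write_text_sketch` to make that explicit) is a reconstruction from the test expectations and from PostgreSQL's COPY text format, where backslash, tab, newline, and carriage return need escaping.

```rust
/// Appends `s` to `buf`, escaping the characters that are special in
/// PostgreSQL's COPY text format. A reconstruction from the test cases,
/// not the PR's implementation.
fn write_text_sketch(buf: &mut String, s: &str) {
    for c in s.chars() {
        match c {
            // Escaping the backslash first is what makes a literal `\N`
            // arrive as text (`\\N`) instead of the NULL sentinel.
            '\\' => buf.push_str("\\\\"),
            '\t' => buf.push_str("\\t"), // tab is the column delimiter
            '\n' => buf.push_str("\\n"), // newline is the row terminator
            '\r' => buf.push_str("\\r"),
            other => buf.push(other),
        }
    }
}
```

Because every backslash is doubled, the null-sentinel case needs no special branch: it falls out of the general rule, which is why a regression assertion is enough to protect it.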
Summary
Why
PR #33 lifted fetch concurrency to 50 and measured 51 blocks/sec, but writes still serialised per-block: one `BEGIN/INSERT × N rows/COMMIT` round-trip per block, plus per-row INSERT overhead inside each transaction. With ~1.5M backfill blocks remaining, the wall-clock at 51 b/s is still ~8h. PG `COPY FROM STDIN` collapses N blocks' worth of inserts into one transaction with three streamed COPY commands, so the chatter that was bounding throughput goes away.
Trade-off accepted
The batch path drops `ON CONFLICT DO NOTHING` (COPY doesn't support it). Inside a single backfill run the cursor monotonicity invariant rules out duplicate writes, and the whole batch is one rollback boundary on crash. Replay across runs from a manually-rewound cursor would conflict — operators in that case already truncate downstream tables. The tail loop's per-row path keeps the conflict guard for at-tip behaviour.
Implementation notes
Expected speedup
Per-row INSERT path: ~3 PG round-trips per block (BEGIN, batch of inserts, COMMIT) plus per-row index/check overhead. COPY path at batch=100: ~1 round-trip amortised over 100 blocks, plus COPY's bulk-insert fast path, which reduces per-row WAL overhead. PG community benchmarks routinely show 5-20x for COPY vs INSERT at row counts ≥10k. Most blocks are coinbase-only (~2 rows per block: the block and its coinbase tx), so a 100-block batch is only around 200 rows; the expectation is therefore a more modest 3-10x lift on top of the 51 b/s ceiling, i.e. landing in the 150-500 b/s band depending on PG/disk headroom on vps4.
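To put the throughput figures in wall-clock terms, a small worked calculation helps. The ~1.5M remaining blocks and the rates are the PR's own numbers; the helper name `eta_hours` is hypothetical.

```rust
/// Hours needed to backfill `remaining_blocks` at a steady `blocks_per_sec`.
fn eta_hours(remaining_blocks: f64, blocks_per_sec: f64) -> f64 {
    remaining_blocks / blocks_per_sec / 3600.0
}

/// Formats the ETA for the current rate and both ends of the expected band.
fn eta_report(remaining_blocks: f64) -> Vec<String> {
    [51.0, 150.0, 500.0]
        .iter()
        .map(|&rate| format!("{rate} b/s -> {:.1} h", eta_hours(remaining_blocks, rate)))
        .collect()
}
```

For 1.5M blocks this gives roughly 8.2 h at 51 b/s, 2.8 h at 150 b/s, and 0.8 h at 500 b/s, so even the low end of the expected band cuts the remaining backfill to under three hours.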
Live benchmark
Skipped — operator has the running indexer on vps4 and I don't have SSH there. Suggested check on deploy:
```bash
# Before / after restart, sample for 60s with a 5s gap:
sudo docker exec indexer-rs-mainnet-postgres-1 psql -U indexer -d indexer -t -A \
  -c "SELECT value FROM _meta WHERE key='last_synced_height';"
sleep 60
sudo docker exec indexer-rs-mainnet-postgres-1 psql -U indexer -d indexer -t -A \
  -c "SELECT value FROM _meta WHERE key='last_synced_height';"
# delta / 60 = blocks/sec
```
If 100 feels too aggressive on PG load, dial via env: `INDEXER_WRITE_BATCH_SIZE=50` or `200`. Hard cap is 1000.
Test plan
Summary by CodeRabbit
New Features
Performance
Documentation
Tests