Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions bin/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ use tokio_util::sync::CancellationToken;
struct IndexerConfig {
database_url: String,
rpc_url: String,
/// Optional separate base URL for the native REST endpoints
/// (`/chain/blocks/<n>`, `/tx/<hash>`). Defaults to `rpc_url` when unset.
/// Lets us point JSON-RPC at `<host>/rpc` and REST at `<host>` (root)
/// when bypassing the Caddy edge that handles the path rewrite.
#[serde(default)]
rest_url: Option<String>,
#[serde(default)]
grpc_url: Option<String>,
#[serde(default)]
Expand Down Expand Up @@ -88,8 +94,10 @@ async fn main() -> anyhow::Result<()> {
let provider = ChainProvider::http(&cfg.rpc_url)?;
// Native REST client for `/chain/blocks/<n>` + `/tx/<hash>` — Sentrix's
// EVM JSON-RPC ignores `full=true` on getBlockByNumber, so block + tx
// ingest goes via the native REST path. Same host, different URL space.
let rest = RestClient::new(&cfg.rpc_url)?;
// ingest goes via the native REST path. REST_URL falls back to RPC_URL;
// override when JSON-RPC needs `/rpc` suffix (direct fullnode bypass).
let rest_base = cfg.rest_url.as_deref().unwrap_or(&cfg.rpc_url);
let rest = RestClient::new(rest_base)?;
let cancel = CancellationToken::new();

// Analytics flusher (optional). The handle threads into both the
Expand Down
63 changes: 45 additions & 18 deletions crates/chain/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,71 @@ use alloy_primitives::{Address, Bytes};
use alloy_provider::{Provider, ProviderBuilder, RootProvider};
use alloy_rpc_types::{Block, BlockNumberOrTag, Filter, Log, TransactionInput, TransactionRequest};
use indexer_domain::BlockHeight;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Concrete provider type — alloy 2.0's default HTTP transport (reqwest).
/// Hidden from callers behind [`ChainProvider`]; exposed via
/// [`ChainProvider::raw`] for advanced use.
pub type HttpProvider = RootProvider;

/// Thin wrapper around an alloy `RootProvider` keyed to a single Sentrix
/// JSON-RPC endpoint. Cheap to clone (the underlying provider is `Arc`-y).
/// Thin wrapper around one or more alloy `RootProvider`s. Single-URL is
/// the common case; multi-URL spreads JSON-RPC load round-robin across
/// fullnodes for higher backfill throughput.
#[derive(Clone)]
pub struct ChainProvider {
inner: HttpProvider,
inners: Vec<HttpProvider>,
cursor: Arc<AtomicUsize>,
}

impl ChainProvider {
/// Build a provider from an HTTP(S) URL.
/// Build a provider from a comma-separated URL list (or a single URL).
/// Returns Err if no valid URL parses.
pub fn http(url: &str) -> ChainResult<Self> {
let url = url
.parse::<reqwest::Url>()
.map_err(|e| ChainError::InvalidArgument(format!("bad rpc url: {e}")))?;
let inner = ProviderBuilder::new()
.disable_recommended_fillers()
.connect_http(url);
Ok(Self { inner })
let mut inners = Vec::new();
for raw in url.split(',') {
let trimmed = raw.trim();
if trimmed.is_empty() {
continue;
}
let parsed = trimmed.parse::<reqwest::Url>().map_err(|e| {
ChainError::InvalidArgument(format!("bad rpc url '{trimmed}': {e}"))
})?;
let inner = ProviderBuilder::new()
.disable_recommended_fillers()
.connect_http(parsed);
inners.push(inner);
}
if inners.is_empty() {
return Err(ChainError::InvalidArgument("rpc url is empty".to_string()));
}
Ok(Self {
inners,
cursor: Arc::new(AtomicUsize::new(0)),
})
}

/// Selects the provider that should serve the next request.
///
/// Every call takes a ticket from the shared atomic counter and maps it
/// onto the provider list with a modulo, so successive calls walk the
/// endpoints in order and start over after the last one. The counter is a
/// single atomic add, so concurrent callers never block each other.
///
/// Assumes `inners` is non-empty (`http` refuses to build a provider
/// without at least one URL); an empty list would panic on the modulo.
fn next(&self) -> &HttpProvider {
    // Relaxed is used because the ticket only chooses an index.
    let ticket = self.cursor.fetch_add(1, Ordering::Relaxed);
    let slot = ticket % self.inners.len();
    &self.inners[slot]
}

/// Underlying provider, exposed for advanced use (custom RPC calls, etc).
/// First underlying provider, exposed for advanced use (custom RPC calls).
/// Multi-URL callers should prefer the wrapped methods which round-robin.
pub fn raw(&self) -> &HttpProvider {
&self.inner
&self.inners[0]
}

/// `eth_chainId`.
pub async fn chain_id(&self) -> ChainResult<u64> {
self.inner.get_chain_id().await.map_err(rpc_err)
self.next().get_chain_id().await.map_err(rpc_err)
}

/// Latest finalized block height per the node we're talking to.
pub async fn block_number(&self) -> ChainResult<BlockHeight> {
let n = self.inner.get_block_number().await.map_err(rpc_err)?;
let n = self.next().get_block_number().await.map_err(rpc_err)?;
Ok(BlockHeight::from(n))
}

Expand All @@ -62,7 +89,7 @@ impl ChainProvider {
ChainError::InvalidArgument(format!("block_with_txs: negative height {h:?}"))
})?;
let tag = BlockNumberOrTag::Number(n);
self.inner
self.next()
.get_block_by_number(tag)
.full()
.await
Expand Down Expand Up @@ -93,7 +120,7 @@ impl ChainProvider {
if let Some(addr) = address {
filter = filter.address(addr);
}
self.inner.get_logs(&filter).await.map_err(rpc_err)
self.next().get_logs(&filter).await.map_err(rpc_err)
}

/// `eth_call` against `to` with abi-encoded `data`. Returns the raw
Expand All @@ -104,6 +131,6 @@ impl ChainProvider {
let req = TransactionRequest::default()
.to(to)
.input(TransactionInput::new(data));
self.inner.call(req).await.map_err(rpc_err)
self.next().call(req).await.map_err(rpc_err)
}
}
106 changes: 98 additions & 8 deletions crates/chain/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
use crate::error::{ChainError, ChainResult};
use indexer_domain::{BlockHeight, Hash};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

/// Subset of the native `/tx/<hash>` response shape that the indexer needs.
Expand Down Expand Up @@ -42,30 +44,64 @@ pub struct NativeTxResponse {
pub data: Option<String>,
}

/// Native REST client.
/// Native REST client. Holds one or more base URLs and round-robins
/// requests across them. Single-URL is the common case; multi-URL is used
/// when the indexer talks directly to multiple fullnodes (bypassing the
/// public Caddy edge) to spread fetch load across endpoints.
#[derive(Debug, Clone)]
pub struct RestClient {
base: String,
bases: Vec<String>,
cursor: Arc<AtomicUsize>,
http: reqwest::Client,
}

impl RestClient {
/// Build a client pointing at the chain's HTTP base URL (no trailing
/// `/tx/...`). Default 10s request timeout.
/// Build a client from a comma-separated URL list (or a single URL).
/// Each URL is the chain's HTTP base (no `/tx/...` suffix). Default 10s
/// request timeout. Errors if any entry fails URL parsing — fail fast
/// at construction so a malformed entry can't surface as an intermittent
/// runtime failure when round-robin happens to pick it.
pub fn new(base_url: impl Into<String>) -> ChainResult<Self> {
Comment thread
coderabbitai[bot] marked this conversation as resolved.
let raw = base_url.into();
let mut bases: Vec<String> = Vec::new();
for entry in raw.split(',') {
let trimmed = entry.trim().trim_end_matches('/');
if trimmed.is_empty() {
continue;
}
// Validate by parsing — discard the parsed value, keep the
// string form for the format!()-based URL composition below.
trimmed.parse::<reqwest::Url>().map_err(|e| {
ChainError::InvalidArgument(format!("bad rest url '{trimmed}': {e}"))
})?;
bases.push(trimmed.to_owned());
}
if bases.is_empty() {
return Err(ChainError::InvalidArgument(
"rest base url is empty".to_string(),
));
}
let http = reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?;
Ok(Self {
base: base_url.into().trim_end_matches('/').to_owned(),
bases,
cursor: Arc::new(AtomicUsize::new(0)),
http,
})
}

/// Returns the base URL that the next request should be sent to.
///
/// A shared atomic counter hands out one ticket per call; the ticket is
/// reduced modulo the number of configured bases, which rotates through
/// them in order. The counter update is one atomic add, so concurrent
/// fetch tasks do not contend on a lock.
///
/// Assumes `bases` is non-empty (`new` rejects input that yields no URL);
/// an empty list would panic on the modulo.
fn next_base(&self) -> &str {
    // Relaxed is used because the ticket only chooses an index.
    let ticket = self.cursor.fetch_add(1, Ordering::Relaxed);
    let slot = ticket % self.bases.len();
    &self.bases[slot]
}

/// Fetch a tx by hash. Returns None on 404 so the caller can decide
/// whether to retry (chain hasn't seen it yet) vs surface as missing.
pub async fn tx(&self, hash: &Hash) -> ChainResult<Option<NativeTxResponse>> {
let url = format!("{}/tx/{}", self.base, hash);
let url = format!("{}/tx/{}", self.next_base(), hash);
let resp = self.http.get(&url).send().await?;
if resp.status() == reqwest::StatusCode::NOT_FOUND {
return Ok(None);
Expand All @@ -86,7 +122,7 @@ impl RestClient {
/// jump the cursor straight to `window_start_block` rather than walking
/// 404s one-by-one.
pub async fn chain_info(&self) -> ChainResult<ChainInfoResponse> {
let url = format!("{}/chain/info", self.base);
let url = format!("{}/chain/info", self.next_base());
let resp = self.http.get(&url).send().await?;
if !resp.status().is_success() {
let status = resp.status();
Expand All @@ -101,7 +137,7 @@ impl RestClient {
/// Fetch a block by height with full txs inlined. Returns None on 404
/// (chain doesn't have this height yet, or asking past pruning window).
pub async fn block(&self, h: BlockHeight) -> ChainResult<Option<NativeBlockResponse>> {
let url = format!("{}/chain/blocks/{}", self.base, h.0);
let url = format!("{}/chain/blocks/{}", self.next_base(), h.0);
let resp = self.http.get(&url).send().await?;
if resp.status() == reqwest::StatusCode::NOT_FOUND {
return Ok(None);
Expand Down Expand Up @@ -223,4 +259,58 @@ mod tests {
let raw = r#"{ "txid": "0xabc" }"#;
assert!(serde_json::from_str::<NativeTxResponse>(raw).is_err());
}

#[test]
fn parses_single_url() {
    // A lone URL produces exactly one base, with its trailing slash removed.
    let client = RestClient::new("http://a.example/").unwrap();
    assert_eq!(client.bases, ["http://a.example"]);

    // With a single base, every pick lands on the same endpoint.
    for _ in 0..2 {
        assert_eq!(client.next_base(), "http://a.example");
    }
}

#[test]
fn parses_comma_list_trims_and_normalises() {
    // Mixed whitespace, trailing slashes and an empty entry between commas.
    let input = " http://a.example/ , http://b.example , ,http://c.example/";
    let client = RestClient::new(input).unwrap();

    // Entries keep their order; blanks are dropped and slashes are stripped.
    let expected = ["http://a.example", "http://b.example", "http://c.example"];
    assert_eq!(client.bases, expected);
}

#[test]
fn round_robin_rotates_deterministically() {
    let client = RestClient::new("http://a.example,http://b.example,http://c.example").unwrap();
    let order = ["http://a.example", "http://b.example", "http://c.example"];

    // Seven picks cover two full laps plus one extra, proving the rotation
    // wraps back to the first base.
    for expected in order.iter().cycle().take(7) {
        assert_eq!(client.next_base(), *expected);
    }
}

#[test]
fn rejects_empty_input() {
    // Empty string, whitespace only, and separators only: none of these
    // contains a usable URL, so construction must fail.
    for input in ["", " ", ", , "] {
        assert!(RestClient::new(input).is_err());
    }
}

#[test]
fn rejects_malformed_url_at_construction() {
    // A bad entry has to be reported when the client is built, not later
    // when the round-robin happens to select it. The second input checks
    // that one valid entry does not mask an invalid one.
    let malformed = [
        "not a url",
        "http://ok.example,not://valid url with spaces",
    ];
    for input in malformed {
        assert!(RestClient::new(input).is_err());
    }
}
}
40 changes: 40 additions & 0 deletions crates/db/src/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,46 @@ pub async fn latest_height(pool: &PgPool) -> DbResult<Option<BlockHeight>> {
Ok(h.map(BlockHeight))
}

/// Inserts every block in `blocks_in` with one multi-row `INSERT`.
///
/// The statement ends in `ON CONFLICT (height) DO NOTHING`, so rows whose
/// height is already stored are skipped rather than reported as errors;
/// replaying a batch after an interrupted run is therefore harmless. The
/// point of batching is to pay the statement/commit overhead once per batch
/// instead of once per block.
///
/// An empty slice returns `Ok(())` without sending anything to the database.
///
/// NOTE(review): each row binds 12 parameters. PostgreSQL limits a single
/// statement to 65535 bind parameters, so callers presumably keep batches
/// below roughly 5400 rows — confirm at the call site.
///
/// # Errors
/// Returns the error produced by executing the statement.
pub async fn insert_batch<'e, E>(executor: E, blocks_in: &[Block]) -> DbResult<()>
where
    E: sqlx::PgExecutor<'e>,
{
    // Nothing to insert: skip the query instead of building an empty VALUES list.
    if blocks_in.is_empty() {
        return Ok(());
    }

    let mut builder = sqlx::QueryBuilder::new(
        "INSERT INTO blocks (height, hash, parent_hash, timestamp, validator, \
         gas_used, gas_limit, base_fee, tx_count, state_root, round, justification_signers) ",
    );

    builder.push_values(blocks_in.iter(), |mut row, block| {
        // The signer list is bound as a JSON array of strings.
        let signers: Vec<serde_json::Value> = block
            .justification_signers
            .iter()
            .map(|signer| serde_json::Value::String(signer.clone()))
            .collect();

        // Bind order must match the column list in the INSERT header.
        row.push_bind(block.height);
        row.push_bind(&block.hash);
        row.push_bind(&block.parent_hash);
        row.push_bind(block.timestamp);
        row.push_bind(&block.validator);
        row.push_bind(block.gas_used);
        row.push_bind(block.gas_limit);
        row.push_bind(block.base_fee);
        row.push_bind(block.tx_count);
        row.push_bind(&block.state_root);
        row.push_bind(block.round);
        row.push_bind(serde_json::Value::Array(signers));
    });
    builder.push(" ON CONFLICT (height) DO NOTHING");

    builder.build().execute(executor).await?;
    Ok(())
}

/// Delete a block (and dependent txs/logs via FK CASCADE) at a height. Used
/// by the reorg rewind path.
pub async fn delete_at<'e, E>(executor: E, h: BlockHeight) -> DbResult<u64>
Expand Down
29 changes: 29 additions & 0 deletions crates/db/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,35 @@ where
Ok(())
}

/// Inserts every log in `logs_in` with one multi-row `INSERT`.
///
/// The statement ends in `ON CONFLICT (block_height, log_index) DO NOTHING`,
/// so rows that already exist under that key are skipped rather than
/// reported as errors; replaying a batch after an interrupted run is
/// therefore harmless.
///
/// An empty slice returns `Ok(())` without sending anything to the database.
///
/// NOTE(review): each row binds 9 parameters. PostgreSQL limits a single
/// statement to 65535 bind parameters, so callers presumably keep batches
/// below roughly 7200 rows — confirm at the call site.
///
/// # Errors
/// Returns the error produced by executing the statement.
pub async fn insert_batch<'e, E>(executor: E, logs_in: &[Log]) -> DbResult<()>
where
    E: sqlx::PgExecutor<'e>,
{
    // Nothing to insert: skip the query instead of building an empty VALUES list.
    if logs_in.is_empty() {
        return Ok(());
    }

    let mut builder = sqlx::QueryBuilder::new(
        "INSERT INTO logs (block_height, tx_hash, log_index, address, topic0, topic1, topic2, topic3, data) ",
    );

    builder.push_values(logs_in.iter(), |mut row, log| {
        // Bind order must match the column list in the INSERT header.
        row.push_bind(log.block_height);
        row.push_bind(&log.tx_hash);
        row.push_bind(log.log_index);
        row.push_bind(&log.address);
        row.push_bind(&log.topic0);
        row.push_bind(&log.topic1);
        row.push_bind(&log.topic2);
        row.push_bind(&log.topic3);
        row.push_bind(&log.data);
    });
    builder.push(" ON CONFLICT (block_height, log_index) DO NOTHING");

    builder.build().execute(executor).await?;
    Ok(())
}

/// All logs for a given tx, ordered by `log_index`.
pub async fn for_tx(pool: &PgPool, tx_hash: &Hash) -> DbResult<Vec<Log>> {
let rows = sqlx::query(
Expand Down
Loading
Loading