feat(block-proxy): add block-proxy service with caching and upstream fallback (#1361)
andrewklau wants to merge 1 commit into main
Conversation
Pull request overview
Adds a new Rust apps/block-proxy service to proxy NEAR block data with caching + upstream fallback, and wires one indexer (apps/indexer-staking) to consume blocks via a new nb-blocks-proxy TypeScript package. Also includes Docker Compose manifests and a GitHub Actions workflow to build/publish the proxy image.
Changes:
- Introduce `apps/block-proxy` (Axum) with `/v0/block/{height}` + `/v0/last_block/final`, filesystem cache, singleflight dedup, and S3/fastnear/NEAR Lake upstreams.
- Add `packages/nb-blocks-proxy` and migrate `apps/indexer-staking` from `nb-blocks-minio` to `nb-blocks-proxy` with `PROXY_URL`.
- Add mainnet/testnet Compose files and a GH Actions workflow to build/push the block-proxy container image.
Reviewed changes
Copilot reviewed 32 out of 34 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| testnet-block-proxy.yml | Docker Compose service definition for testnet block-proxy. |
| mainnet-block-proxy.yml | Docker Compose service definition for mainnet block-proxy. |
| packages/nb-blocks-proxy/tsconfig.json | TS build config for the new nb-blocks-proxy package. |
| packages/nb-blocks-proxy/package.json | Package metadata/scripts for nb-blocks-proxy. |
| packages/nb-blocks-proxy/src/index.ts | Block streaming implementation that fetches from block-proxy and camelCases the payload. |
| packages/nb-blocks-proxy/src/type.ts | Streamer message/types consumed by indexers. |
| packages/nb-blocks-proxy/src/knex.d.ts | Knex table typings augmentation for the package. |
| apps/indexer-staking/src/types/types.ts | Replace S3 config fields with proxyUrl. |
| apps/indexer-staking/src/services/stream.ts | Switch streaming source from nb-blocks-minio to nb-blocks-proxy. |
| apps/indexer-staking/src/services/staking.ts | Update imports to nb-blocks-proxy. |
| apps/indexer-staking/src/libs/utils.ts | Update imports to nb-blocks-proxy. |
| apps/indexer-staking/src/config.ts | Replace S3 env vars with PROXY_URL. |
| apps/indexer-staking/package.json | Swap dependency nb-blocks-minio → nb-blocks-proxy. |
| apps/block-proxy/src/upstream/s3.rs | S3/MinIO upstream fetch implementation with timeout/error mapping. |
| apps/block-proxy/src/upstream/near_lake.rs | NEAR Lake upstream fetch + shard assembly into fastnear-compatible shape. |
| apps/block-proxy/src/upstream/fastnear.rs | fastnear HTTP upstream fetch for blocks and the last finalized block. |
| apps/block-proxy/src/upstream/mod.rs | Upstream fallback chain + singleflight dedup wrapper. |
| apps/block-proxy/src/cache/path.rs | Cache sharding path function with unit tests. |
| apps/block-proxy/src/cache/mod.rs | Filesystem cache store with zstd compression + background eviction loop. |
| apps/block-proxy/src/state.rs | Shared app state (config, cache, upstreams, dedup group). |
| apps/block-proxy/src/routes/mod.rs | Router wiring + request-id and tracing layers. |
| apps/block-proxy/src/routes/block.rs | /v0/block/{height} handler with verbose 502s. |
| apps/block-proxy/src/routes/last_block.rs | /v0/last_block/final handler (uncached) + tip height update. |
| apps/block-proxy/src/routes/health.rs | /healthz handler. |
| apps/block-proxy/src/routes/readyz.rs | /readyz handler. |
| apps/block-proxy/src/logging.rs | Pino-compatible JSON tracing formatter. |
| apps/block-proxy/src/error.rs | App error wrapper + UpstreamError shape for verbose responses. |
| apps/block-proxy/src/config.rs | Env-driven configuration + startup config summary logging. |
| apps/block-proxy/src/main.rs | Service bootstrap, readiness, and eviction loop spawn. |
| apps/block-proxy/README.md | Service docs: endpoints, env vars, running instructions. |
| apps/block-proxy/Dockerfile | Docker build for the Rust service binary. |
| apps/block-proxy/Cargo.toml | Rust crate dependencies for block-proxy. |
| apps/block-proxy/Cargo.lock | Cargo lockfile for reproducible Rust builds. |
| .github/workflows/block-proxy-build.yaml | CI workflow to build and push the block-proxy image to GHCR. |
```rust
// Extract chunks_included from header; default to 4 (NEAR mainnet shard count)
let shard_count = block_value
    .get("header")
    .and_then(|h| h.get("chunks_included"))
    .and_then(|v| v.as_u64())
    .unwrap_or(4) as usize;
```
Shard count for NEAR Lake assembly is derived from block.header.chunks_included, which can be less than the actual shard count (e.g., when some shards have empty chunks). This can produce an assembled response with missing shard_i.json entries. Consider deriving shard count from block.chunks.length (as done in packages/nb-lake) or using a network-specific fixed shard count fallback rather than chunks_included.
Suggested change:

```rust
// Determine shard count:
// - Prefer block.chunks.length (total shard entries in the block)
// - Fallback to header.chunks_included
// - Finally, default to 4 (NEAR mainnet shard count)
let shard_count = block_value
    .get("chunks")
    .and_then(|chunks| chunks.as_array())
    .map(|chunks| chunks.len())
    .or_else(|| {
        block_value
            .get("header")
            .and_then(|h| h.get("chunks_included"))
            .and_then(|v| v.as_u64())
            .map(|v| v as usize)
    })
    .unwrap_or(4);
```
apps/block-proxy/src/cache/mod.rs (outdated)

```rust
// Compress at level 3: fast compression with good ratio.
let compressed = zstd::encode_all(json_bytes, 3)?;
```
zstd::encode_all does CPU-heavy compression on the async runtime thread. Consider running compression in spawn_blocking so cache writes (even in background tasks) don't contend with request handling on tokio worker threads.
Suggested change:

```rust
// Run compression in a blocking task to avoid tying up Tokio worker threads.
let json_bytes_owned = json_bytes.to_vec();
let compressed = tokio::task::spawn_blocking(move || {
    zstd::encode_all(&json_bytes_owned[..], 3)
})
.await??;
```
```typescript
const blocks = await knex('blocks')
  .select('block_height')
  .where('block_height', '>', start)
  .orderBy('block_height')
  .limit(limit)
  .timeout(10_000, { cancel: true });

blocks.pop();

return blocks;
```
blocks.pop() makes progress dependent on having at least 2 rows returned; with limit: 1 this stream will never advance (and with low block throughput it will stall until a second block appears). If the intent is to stay 1 block behind tip, consider enforcing limit >= 2 (or query limit + 1 and pop) so callers can't accidentally create a non-progressing stream.
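The `limit + 1`-and-pop variant can be sketched as a pure function (Rust here for consistency with the rest of the service; `take_batch` and the slice standing in for the query result are illustrative, not the actual TypeScript stream code):

```rust
// Illustrative sketch: fetch `limit + 1` rows, then unconditionally drop the
// newest one so the stream stays a block behind tip. With `limit = 1` this
// still yields one usable row whenever at least two blocks exist past `start`,
// so the stream keeps advancing.
fn take_batch(heights: &[u64], limit: usize) -> Vec<u64> {
    let mut batch: Vec<u64> = heights.iter().copied().take(limit + 1).collect();
    batch.pop();
    batch
}

fn main() {
    // limit = 1 with two rows available: one row is returned, progress is made.
    assert_eq!(take_batch(&[10, 11], 1), vec![10]);
    // Only one row at the tip: stay behind tip and wait for the next block.
    assert_eq!(take_batch(&[10], 1), Vec::<u64>::new());
}
```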
```rust
/// Return value mirrors `fetch_block`: `Ok((bytes, source))` or
/// `Err(errors)`. When the leader fails, waiting callers receive an empty
/// error vec (the per-source details are only available to the leader caller);
/// the route handler treats any non-Ok response as a 502.
pub async fn fetch_block_deduped(
    state: &AppState,
    height: u64,
) -> Result<(Bytes, &'static str), Vec<UpstreamError>> {
    let key = format!("block:{}", height);

    // Clone state into an owned value so we can move it into the async closure.
    let state_clone = state.clone();

    let result = state
        .dedup
        .work(&key, async move { fetch_block(&state_clone, height).await })
        .await;

    match result {
        Ok(val) => Ok(val),
        // The leader returned an error — we have the actual Vec<UpstreamError>.
        Err(Some(errors)) => Err(errors),
        // A follower: the leader failed, error details not propagated.
        // Return an empty error list; the 502 response will still be correct.
        Err(None) => Err(vec![UpstreamError {
            source: "singleflight".into(),
            error: "leader fetch failed; details logged by the leader request".into(),
        }]),
```
The comment says followers receive an empty error vec when the leader fails, but the code returns a Vec with a synthetic singleflight error. Please align the comment with the actual behavior (or change the behavior to match the comment) to avoid confusion when debugging 502 responses.
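The mismatch is only in the doc comment; the mapping itself can be modeled as a simplified sketch (`UpstreamError` reduced to two `String` fields, the async machinery elided):

```rust
// Simplified model of the singleflight result mapping in the snippet above.
// Leader callers get the real per-source error list; followers get `None`,
// and a synthetic placeholder is substituted so the 502 body is never empty.
#[derive(Debug, Clone, PartialEq)]
pub struct UpstreamError {
    pub source: String,
    pub error: String,
}

pub fn map_dedup_result(
    result: Result<&'static str, Option<Vec<UpstreamError>>>,
) -> Result<&'static str, Vec<UpstreamError>> {
    match result {
        Ok(src) => Ok(src),
        // Leader: the actual Vec<UpstreamError> is available.
        Err(Some(errors)) => Err(errors),
        // Follower: details were not propagated, so synthesize one entry
        // (the doc comment should describe this, not an "empty error vec").
        Err(None) => Err(vec![UpstreamError {
            source: "singleflight".into(),
            error: "leader fetch failed; details logged by the leader request".into(),
        }]),
    }
}

fn main() {
    // Followers see exactly one synthetic error, never an empty vec.
    assert_eq!(map_dedup_result(Err(None)).unwrap_err().len(), 1);
    assert_eq!(map_dedup_result(Ok("s3")), Ok("s3"));
}
```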
```rust
);
state.cache.write_background(height, bytes.clone());
return Ok((bytes, "s3"));
```
CACHE_ENABLED=false currently only disables cache reads; upstream hits still call state.cache.write_background(...), so blocks will be written to disk even when caching is disabled. Gate cache writes behind state.config.cache_enabled as well (or make CacheStore::write_background a no-op when disabled).
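One way to centralize the check is to have the cache layer itself honor the flag, as in this minimal sketch (field and method shapes are illustrative, not the actual CacheStore):

```rust
// Illustrative sketch: make the cache layer honor `cache_enabled` itself,
// so every upstream hit path can call `write_background` unconditionally.
pub struct CacheStore {
    enabled: bool,
}

impl CacheStore {
    pub fn new(enabled: bool) -> Self {
        Self { enabled }
    }

    /// Returns `true` if a background write was scheduled (simulated here).
    pub fn write_background(&self, height: u64, _bytes: &[u8]) -> bool {
        if !self.enabled {
            // CACHE_ENABLED=false: skip disk writes entirely.
            return false;
        }
        // Real code would spawn an async task that compresses and writes.
        let _ = height;
        true
    }
}

fn main() {
    assert!(!CacheStore::new(false).write_background(1, b"{}"));
    assert!(CacheStore::new(true).write_background(1, b"{}"));
}
```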
```rust
);
state.cache.write_background(height, bytes.clone());
return Ok((bytes, "near_lake"));
```
Same as above: write_background is invoked even when cache_enabled is false, so the service will still populate the cache on upstream hits. Wrap this call in a cache_enabled check (or enforce it inside the cache layer).
apps/block-proxy/src/cache/mod.rs (outdated)

```rust
let path = block_height_to_path(&self.cache_dir, height);

if !path.exists() {
    tracing::debug!(height, "cache miss");
    return Ok(None);
}

let compressed = tokio::fs::read(&path).await?;
let decompressed = zstd::decode_all(compressed.as_slice())?;
```
Cache reads do a synchronous path.exists() check and then tokio::fs::read(), which is both a blocking check on the async path and a TOCTTOU race (eviction can delete the file between the check and the read). Prefer attempting the async read directly and treating NotFound as a cache miss (or use tokio::fs::try_exists).
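The race-free shape this suggests, sketched with std's synchronous `read` for brevity (the service would use `tokio::fs::read`, which surfaces the same `std::io::ErrorKind`):

```rust
use std::io::ErrorKind;
use std::path::Path;

// Attempt the read directly and treat NotFound as a cache miss. This removes
// the blocking `path.exists()` call from the async path and closes the TOCTTOU
// window where eviction deletes the file between the check and the read.
fn read_cached(path: &Path) -> std::io::Result<Option<Vec<u8>>> {
    match std::fs::read(path) {
        Ok(bytes) => Ok(Some(bytes)),
        Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), // cache miss
        Err(e) => Err(e),
    }
}

fn main() {
    // A path that does not exist is a miss, not an error.
    let miss = read_cached(Path::new("/nonexistent/block.json.zst")).unwrap();
    assert!(miss.is_none());
}
```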
apps/block-proxy/src/cache/mod.rs (outdated)

```rust
let compressed = tokio::fs::read(&path).await?;
let decompressed = zstd::decode_all(compressed.as_slice())?;

tracing::debug!(height, bytes = decompressed.len(), "cache hit");
Ok(Some(Bytes::from(decompressed)))
```
zstd::decode_all runs CPU-heavy decompression on the async runtime thread. To avoid starving tokio worker threads under load, move compression/decompression work into spawn_blocking (or use a streaming async-friendly approach).
apps/block-proxy/Dockerfile (outdated)

```dockerfile
FROM rust:1-bookworm AS builder
WORKDIR /app
COPY apps/block-proxy/ .
RUN cargo build --release --bin block-proxy
```
PR description mentions using cargo-chef for cached builds, but this Dockerfile does a plain cargo build and will rebuild dependencies whenever sources change. Either update the PR description to match the current Dockerfile or switch this Dockerfile to a cargo-chef (or similar) layered dependency build to get the promised caching behavior.
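A layered cargo-chef build could look like the following sketch (the planner/cook stage layout is cargo-chef's standard pattern; the pinned version number is illustrative and should be vetted before adoption):

```dockerfile
FROM rust:1-bookworm AS chef
# Pinning avoids pulling an unvetted latest release on every build.
RUN cargo install cargo-chef --locked --version 0.1.62
WORKDIR /app

FROM chef AS planner
COPY apps/block-proxy/ .
RUN cargo chef prepare --recipe-path recipe.json

FROM chef AS builder
COPY --from=planner /app/recipe.json recipe.json
# Dependency layer: cached until Cargo.toml/Cargo.lock change.
RUN cargo chef cook --release --recipe-path recipe.json
COPY apps/block-proxy/ .
RUN cargo build --release --bin block-proxy
```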
```rust
);
state.cache.write_background(height, bytes.clone());
return Ok((bytes, "fastnear"));
```
Same as above: this upstream path writes to the filesystem cache even when cache_enabled is false. Please conditionally call write_background only when caching is enabled (or centralize the check inside CacheStore).
Pull request overview
Copilot reviewed 40 out of 42 changed files in this pull request and generated 4 comments.
apps/block-proxy/src/cache/mod.rs (outdated)

```rust
// Only evict if within the recent block window of the tip.
// Historical blocks are permanent.
let is_recent = tip >= recent_block_window && height > tip - recent_block_window;
```
Cache eviction determines is_recent as tip >= recent_block_window && height > tip - recent_block_window, which means when tip < recent_block_window no blocks are considered recent and nothing will ever be evicted (even though all blocks are within the window). Consider treating tip < recent_block_window as “everything up to tip is recent” (or adjust the predicate) so TTL eviction works on small/test chains too.
Suggested change:

```rust
let is_recent = if tip >= recent_block_window {
    // For sufficiently long chains, consider blocks within `recent_block_window`
    // of the tip as "recent".
    height > tip - recent_block_window
} else {
    // For short/test chains where `tip < recent_block_window`, treat all
    // blocks up to the current tip as "recent" so TTL eviction still applies.
    height <= tip
};
```
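Extracted as a pure function (name illustrative), the predicate is easy to check in both regimes:

```rust
// Sketch of the eviction predicate as a pure function so the long-chain and
// short/test-chain regimes can be unit-tested directly.
fn is_recent(height: u64, tip: u64, window: u64) -> bool {
    if tip >= window {
        // Long chain: recent = within `window` blocks of the tip.
        height > tip - window
    } else {
        // Short chain (tip < window): everything up to the tip is recent,
        // so TTL eviction still applies.
        height <= tip
    }
}

fn main() {
    assert!(is_recent(990, 1000, 50));  // within the window of the tip
    assert!(!is_recent(100, 1000, 50)); // historical: permanent
    assert!(is_recent(3, 5, 50));       // short chain: still evictable
}
```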
apps/block-proxy/src/admin/toggle.rs (outdated)

```rust
match get_toggle(&state, &source) {
    Some(toggle) => {
        toggle.store(true, Ordering::SeqCst);
        tracing::info!(source = %source, "upstream enabled via admin API");
        (StatusCode::OK, Json(json!({ "source": source, "enabled": true }))).into_response()
    }
```
/admin/upstreams/{source}/enable will happily flip the AtomicBool even when the upstream client isn’t configured/constructed (e.g., state.s3/state.near_lake is None). This can lead to /stats and the dashboard showing a source as enabled even though it can never be used. Consider rejecting enable requests (400/409) when the upstream isn’t available, or auto-initializing the upstream on first enable.
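A guard along the suggested lines, as a minimal sketch (`enable_source` and its signature are hypothetical, not the actual handler):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Reject enabling an upstream whose client was never constructed, instead of
// flipping the AtomicBool blindly. A handler would map the Err arm to a
// 409 Conflict (or 400) response.
fn enable_source(client_configured: bool, enabled: &AtomicBool) -> Result<(), &'static str> {
    if !client_configured {
        return Err("upstream not configured");
    }
    enabled.store(true, Ordering::SeqCst);
    Ok(())
}

fn main() {
    let flag = AtomicBool::new(false);
    // Unconfigured upstream: the toggle is left untouched.
    assert!(enable_source(false, &flag).is_err());
    assert!(!flag.load(Ordering::SeqCst));
    // Configured upstream: enable succeeds.
    assert!(enable_source(true, &flag).is_ok());
    assert!(flag.load(Ordering::SeqCst));
}
```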
apps/block-proxy/src/upstream/s3.rs (outdated)

```rust
/// Returns `None` if `s3_endpoint` or `s3_bucket` is not configured (both required
/// for MinIO access). AWS credentials are read from config fields when provided.
```
The doc comment implies new() returns None only if s3_endpoint/s3_bucket are missing and that credentials are used “when provided”, but the implementation also requires s3_access_key + s3_secret_key (?). Please update the comment to match, or support the AWS default credential chain when keys aren’t set.
Suggested change:

```rust
/// Returns `None` if any of `s3_endpoint`, `s3_bucket`, `s3_access_key`, or
/// `s3_secret_key` is not configured. All four fields are required for MinIO
/// access and for AWS S3 when using static credentials from config.
```
apps/block-proxy/Dockerfile (outdated)

```dockerfile
FROM rust:1-bookworm AS chef
RUN cargo install cargo-chef
```
The cargo install cargo-chef step pulls and executes an unpinned third‑party build tool on every image build, which is a supply chain risk: a compromised or hijacked cargo-chef release could execute arbitrary code in the builder and silently backdoor the resulting block-proxy binary. Because this runs with full control over the build artifacts, an attacker could inject malicious logic into the production image. To mitigate this, pin cargo-chef to a specific, vetted version (and update only after review) or vendor/build it from a fixed commit rather than relying on the moving latest version.
- Rust-based block proxy with multi-upstream support (S3/MinIO, fastnear, NEAR Lake)
- Filesystem cache with zstd compression and TTL-based eviction
- Singleflight request deduplication
- Prometheus metrics and JSON stats endpoint on admin port
- Graceful shutdown with SIGTERM handling
- Docker build with pinned cargo-chef
- nb-blocks-proxy TypeScript package for streaming blocks
- Staking indexer migrated to use block-proxy

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Adds a new Rust-based block proxy service (apps/block-proxy) that sits between indexers and upstream block data sources, providing caching, deduplication, and transparent failover.