diff --git a/Cargo.lock b/Cargo.lock index 71ce2cf7..d417de39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1672,6 +1672,46 @@ dependencies = [ "vec_map", ] +[[package]] +name = "clap" +version = "4.5.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim 0.11.1", +] + +[[package]] +name = "clap_derive" +version = "4.5.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.111", +] + +[[package]] +name = "clap_lex" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" + [[package]] name = "clickhouse" version = "0.14.1" @@ -2145,7 +2185,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976" dependencies = [ "data-encoding", - "syn 2.0.111", + "syn 1.0.109", ] [[package]] @@ -3176,6 +3216,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -3484,7 +3530,7 @@ dependencies = [ "percent-encoding 2.3.2", "pin-project-lite", "socket2 0.6.1", - "system-configuration 0.6.1", + "system-configuration", "tokio", "tower-service", "tracing", @@ -3840,11 +3886,13 @@ dependencies = [ "dashmap", "fnv", "futures-util", + "libc", "log", "multihash", "once_cell", "prost", "reqwest 0.12.24", + "ripget", "rseek", "serde", "serde_cbor", @@ -5272,7 +5320,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" dependencies = [ "bytes", - "heck", + "heck 0.4.1", "itertools 0.10.5", "lazy_static", "log", @@ -5689,7 +5737,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper 0.1.2", - "system-configuration 0.5.1", + "system-configuration", "tokio", "tokio-native-tls", "tokio-rustls 0.24.1", @@ -5802,6 +5850,23 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "ripget" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5430a564aaf6e9413e9ad5f27b629211c0ef7c746bd034714de4e97cb312ef4" +dependencies = [ + "clap 4.5.60", + "env_logger", + "futures-util", + "indicatif 0.18.3", + "log", + "reqwest 0.12.24", + "thiserror 1.0.69", + "tokio", + "tokio-util 0.7.17", +] + [[package]] name = "rocksdb" version = "0.23.0" @@ -6904,7 +6969,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b45846c3741bc62c5695c495e4a27f830e65cbde7dd244809337768ae6ad0b6" dependencies = [ "chrono", - "clap", + "clap 2.34.0", "rpassword", "solana-bls-signatures", "solana-clock", @@ -6951,7 +7016,7 @@ dependencies = [ "agave-reserved-account-keys", "base64 0.22.1", "chrono", - "clap", + "clap 2.34.0", "console 0.16.1", "humantime", "indicatif 0.18.3", @@ -7369,7 +7434,7 @@ checksum = "5ef05bab4daca11502c90fc4febe6e9741b2a229008bfe1af8c85913d1f86515" dependencies = [ "agave-logger", "bincode", - "clap", + "clap 2.34.0", "crossbeam-channel", "log", "serde", @@ -7546,7 +7611,7 @@ dependencies = [ "assert_matches", "bincode", "bv", - "clap", + "clap 2.34.0", "crossbeam-channel", "flate2", "indexmap 2.12.1", @@ -10098,7 +10163,7 @@ version = "0.24.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2", "quote", "rustversion", @@ -10185,18 +10250,7 @@ checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" dependencies = [ "bitflags 1.3.2", "core-foundation 0.9.4", - "system-configuration-sys 0.5.0", -] - -[[package]] -name = "system-configuration" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" -dependencies = [ - "bitflags 2.10.0", - "core-foundation 0.9.4", - "system-configuration-sys 0.6.0", + "system-configuration-sys", ] [[package]] @@ -10209,16 +10263,6 @@ dependencies = [ "libc", ] -[[package]] -name = "system-configuration-sys" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "tap" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index c0f3da91..4661d7a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,6 +88,7 @@ thousands = "0" tokio-stream = { version = "0", default-features = false } rangemap = "1" rseek = ">= 0.2" +ripget = "0.2" rayon = "1" xxhash-rust = { version = "0.8", features = ["xxh64"] } dashmap = "5" diff --git a/README.md b/README.md index 00d1d7c3..1b893157 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,20 @@ If `JETSTREAMER_THREADS` is omitted, Jetstreamer auto-sizes the worker pool usin hardware-aware heuristic exposed by `jetstreamer_firehose::system::optimal_firehose_thread_count`. +For sequential replay mode, enable `--sequential` (or `JETSTREAMER_SEQUENTIAL=1`). In this mode +Jetstreamer uses a single firehose worker and reuses `JETSTREAMER_THREADS` as ripget parallel +download concurrency: + +```bash +# Sequential mode with CLI flag +JETSTREAMER_THREADS=4 cargo run --release -- 800 --sequential + +# Sequential mode with explicit ripget window override +JETSTREAMER_SEQUENTIAL=1 JETSTREAMER_BUFFER_WINDOW=4GiB cargo run --release -- 800 +``` + +`JETSTREAMER_BUFFER_WINDOW` defaults to `min(4 GiB, 15% of available RAM)` when unset. + The built-in program and instruction tracking plugins now record vote and non-vote activity separately. `program_invocations` includes an `is_vote` flag per row, while `slot_instructions` stores separate vote/non-vote instruction and transaction counts. diff --git a/jetstreamer-firehose/Cargo.toml b/jetstreamer-firehose/Cargo.toml index d736451a..f8a29072 100644 --- a/jetstreamer-firehose/Cargo.toml +++ b/jetstreamer-firehose/Cargo.toml @@ -41,7 +41,7 @@ solana-signature.workspace = true solana-sdk-ids.workspace = true prost_011.workspace = true solana-entry.workspace = true -reqwest.workspace = true +reqwest = { workspace = true, features = ["rustls-tls", "stream"] } tokio.workspace = true futures-util.workspace = true serde = { workspace = true, features = ["derive"] } @@ -53,6 +53,7 @@ wincode.workspace = true solana-logger.workspace = true log.workspace = true rseek.workspace = true +ripget.workspace = true crc.workspace = true serde_cbor.workspace = true fnv.workspace = true @@ -64,6 +65,7 @@ xxhash-rust.workspace = true dashmap.workspace = true once_cell.workspace = true url.workspace = true +libc.workspace = true aws-credential-types = { workspace = true, optional = true } aws-sdk-s3 = { workspace = true, optional = true } diff --git a/jetstreamer-firehose/README.md b/jetstreamer-firehose/README.md index 14feeaca..1f52d9a8 100644 --- a/jetstreamer-firehose/README.md +++ b/jetstreamer-firehose/README.md @@ -28,10 +28,12 @@ https://github.com/rpcpool/yellowstone-faithful/tree/main/geyser-plugin-runner - `JETSTREAMER_NETWORK_CAPACITY_MB` (default `1000`): assumed network throughput in megabytes per second when sizing the firehose thread pool. Increase or decrease to match your host's effective bandwidth. - Notes: - `JETSTREAMER_HTTP_BASE_URL` and `JETSTREAMER_COMPACT_INDEX_BASE_URL` accept both full HTTP(S) URLs and `s3://bucket/...` URIs; the latter automatically activates the S3 transport layer. - Changing `JETSTREAMER_NETWORK` also alters the in-memory cache namespace, so you can switch networks without cross-contaminating cached offsets. +- Sequential-mode ripget buffering is configured via the `buffer_window_bytes` parameter on + `firehose::firehose(...)`. If you run through the top-level `jetstreamer` binary crate, that + layer exposes `JETSTREAMER_BUFFER_WINDOW` and forwards it to firehose. diff --git a/jetstreamer-firehose/src/epochs.rs b/jetstreamer-firehose/src/epochs.rs index b8771e58..2ae92519 100644 --- a/jetstreamer-firehose/src/epochs.rs +++ b/jetstreamer-firehose/src/epochs.rs @@ -1,4 +1,5 @@ use reqwest::Client; +use ripget::{WindowedDownload, WindowedDownloadOptions, download_url_windowed}; use rseek::Seekable; use serde::Deserialize; use std::{fmt, io, pin::Pin}; @@ -99,6 +100,107 @@ impl Len for BufReader { } } +/// Controls how epoch CAR streams are opened for [`fetch_epoch_stream_with_options`]. +#[derive(Clone, Debug)] +pub struct FetchEpochStreamOptions { + /// When `true`, stream bytes sequentially through ripget's windowed downloader. + pub sequential: bool, + /// Parallel range request count used by ripget when `sequential` is enabled. + pub ripget_threads: usize, + /// Total hot/cold window size in bytes for ripget windowed streaming. + pub buffer_window_bytes: u64, + /// Pre-built ripget HTTP client for connection reuse across epoch downloads. + pub ripget_client: Option, +} + +impl FetchEpochStreamOptions { + /// Returns default options that preserve the legacy seekable behavior. + pub fn parallel_default() -> Self { + Self { + sequential: false, + ripget_threads: 1, + buffer_window_bytes: 2, + ripget_client: None, + } + } +} + +struct RipgetEpochReader { + inner: WindowedDownload, + len: u64, + position: u64, +} + +impl RipgetEpochReader { + async fn new( + url: impl AsRef, + threads: usize, + buffer_window_bytes: u64, + ripget_client: Option, + ) -> Result { + let mut options = WindowedDownloadOptions::new(buffer_window_bytes.max(2)) + .threads(std::cmp::max(1, threads)) + .user_agent(format!( + "jetstreamer-firehose/{}", + env!("CARGO_PKG_VERSION") + )); + if let Some(c) = ripget_client { + options = options.client(c); + } + let inner = download_url_windowed(url.as_ref(), options).await?; + let len = inner.expected_len(); + Ok(Self { + inner, + len, + position: 0, + }) + } +} + +impl AsyncRead for RipgetEpochReader { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + let before = buf.filled().len(); + let result = Pin::new(&mut this.inner).poll_read(cx, buf); + if let std::task::Poll::Ready(Ok(())) = &result { + let after = buf.filled().len(); + let delta = after.saturating_sub(before) as u64; + this.position = this.position.saturating_add(delta); + } + result + } +} + +impl AsyncSeek for RipgetEpochReader { + fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> { + if matches!(position, SeekFrom::Current(0)) { + return Ok(()); + } + Err(io::Error::new( + io::ErrorKind::Unsupported, + "seek is not supported for ripget windowed streams", + )) + } + + fn poll_complete( + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + std::task::Poll::Ready(Ok(this.position)) + } +} + +impl Len for RipgetEpochReader { + fn len(&self) -> u64 { + self.len + } +} + /// Checks the configured archive backend to determine whether an epoch CAR exists. pub async fn epoch_exists(epoch: u64, client: &Client) -> bool { let location = archive::car_location(); @@ -136,6 +238,19 @@ pub async fn epoch_exists(epoch: u64, client: &Client) -> bool { /// Fetches an epoch’s CAR file from the configured archive backend as a buffered, seekable stream. pub async fn fetch_epoch_stream(epoch: u64, client: &Client) -> EpochStream { + fetch_epoch_stream_with_options(epoch, client, None).await +} + +/// Fetches an epoch’s CAR file with explicit stream options. +/// +/// In sequential mode, arbitrary seeking is not supported and seek requests other than +/// `SeekFrom::Current(0)` return `io::ErrorKind::Unsupported`. +pub async fn fetch_epoch_stream_with_options( + epoch: u64, + client: &Client, + options: Option, +) -> EpochStream { + let options = options.unwrap_or_else(FetchEpochStreamOptions::parallel_default); let location = archive::car_location(); let path = format!("{epoch}/epoch-{epoch}.car"); @@ -145,6 +260,28 @@ pub async fn fetch_epoch_stream(epoch: u64, client: &Client) -> EpochStream { .join(&path) .unwrap_or_else(|err| panic!("invalid CAR URL for epoch {epoch}: {err}")); let request_url = url.to_string(); + if options.sequential { + match RipgetEpochReader::new( + request_url.clone(), + options.ripget_threads, + options.buffer_window_bytes, + options.ripget_client, + ) + .await + { + Ok(reader) => { + return EpochStream::new(BufReader::with_capacity(8 * 1024 * 1024, reader)); + } + Err(err) => { + log::warn!( + target: crate::LOG_MODULE, + "ripget windowed stream failed to initialize for epoch {} ({}), falling back to seekable stream", + epoch, + err + ); + } + } + } let http_client = client.clone(); let seekable = Seekable::new(move || http_client.get(request_url.clone())).await; let reader = BufReader::with_capacity(8 * 1024 * 1024, seekable); diff --git a/jetstreamer-firehose/src/firehose.rs b/jetstreamer-firehose/src/firehose.rs index 857150fd..de0fad3b 100644 --- a/jetstreamer-firehose/src/firehose.rs +++ b/jetstreamer-firehose/src/firehose.rs @@ -36,7 +36,10 @@ use tokio::{ use crate::{ LOG_MODULE, SharedError, - epochs::{epoch_to_slot_range, fetch_epoch_stream, slot_to_epoch}, + epochs::{ + FetchEpochStreamOptions, epoch_to_slot_range, fetch_epoch_stream, + fetch_epoch_stream_with_options, slot_to_epoch, + }, index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError, slot_to_offset}, node_reader::NodeReader, utils, @@ -46,6 +49,7 @@ use crate::{ // header, seeking, reading next block). Adjust here to tune stall detection/restart // aggressiveness. const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); +const OP_TIMEOUT_SEQUENTIAL: std::time::Duration = std::time::Duration::from_secs(180); // Epochs earlier than this were bincode-encoded in Old Faithful. const BINCODE_EPOCH_CUTOFF: u64 = 157; @@ -398,7 +402,7 @@ fn decode_transaction_status_meta_from_frame( return Ok(solana_transaction_status::TransactionStatusMeta::default()); } - match utils::decompress_zstd(reassembled_metadata.clone()) { + match utils::decompress_zstd(reassembled_metadata.as_slice()) { Ok(decompressed) => { decode_transaction_status_meta(slot, decompressed.as_slice()).map_err(|err| { Box::new(std::io::Error::other(format!( @@ -442,7 +446,7 @@ fn decode_rewards_from_frame( return Ok(DecodedRewards::empty()); } - match utils::decompress_zstd(reassembled_rewards.clone()) { + match utils::decompress_zstd(reassembled_rewards.as_slice()) { Ok(decompressed) => decode_rewards_from_bytes(slot, decompressed.as_slice()).map_err( |err| { Box::new(std::io::Error::other(format!( @@ -829,10 +833,19 @@ pub struct FirehoseErrorContext { /// /// The requested `slot_range` is half-open `[start, end)`; on recoverable errors the /// runner restarts from the last processed slot to maintain coverage. +/// +/// When `sequential` is `true`, the firehose uses one worker thread and opens epoch streams +/// with ripget's parallel windowed downloader. In this mode `threads` configures ripget range +/// concurrency rather than firehose worker partitioning. +/// +/// `buffer_window_bytes` controls the ripget hot/cold window when `sequential` is enabled. +/// Pass `None` to use the default (`min(4 GiB, 15% of available RAM)`). #[inline] #[allow(clippy::too_many_arguments)] pub async fn firehose( threads: u64, + sequential: bool, + buffer_window_bytes: Option, slot_range: Range, on_block: Option, on_tx: Option, @@ -859,12 +872,25 @@ where let client = crate::network::create_http_client(); log::info!(target: LOG_MODULE, "starting firehose..."); log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url()); + let firehose_threads = if sequential { 1 } else { threads }; + let sequential_download_threads = std::cmp::max(1, threads as usize); + let sequential_buffer_window_bytes = buffer_window_bytes + .filter(|value| *value >= 2) + .unwrap_or_else(crate::system::default_firehose_buffer_window_bytes); + if sequential { + log::info!( + target: LOG_MODULE, + "sequential mode enabled: firehose_threads=1, ripget_threads={}, ripget_window={}", + sequential_download_threads, + crate::system::format_byte_size(sequential_buffer_window_bytes) + ); + } let slot_range = Arc::new(slot_range); // divide slot_range into n subranges - let subranges = generate_subranges(&slot_range, threads); - if threads > 1 { + let subranges = generate_subranges(&slot_range, firehose_threads); + if firehose_threads > 1 { log::debug!(target: LOG_MODULE, "⚡ thread sub-ranges: {:?}", subranges); } @@ -880,6 +906,20 @@ where } }); } + + // Build a shared ripget HTTP client so TCP connections survive across epoch transitions. + let shared_ripget_client: Option = if sequential { + Some( + ripget::build_client(Some(&format!( + "jetstreamer-firehose/{}", + env!("CARGO_PKG_VERSION") + ))) + .expect("failed to build ripget HTTP client"), + ) + } else { + None + }; + let mut handles = Vec::new(); // Shared per-thread error counters let error_counts: Arc> = @@ -915,6 +955,10 @@ where let shutdown_flag = shutdown_flag.clone(); let pending_skipped_slots = pending_skipped_slots.clone(); let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe()); + let sequential_mode = sequential; + let ripget_threads = sequential_download_threads; + let ripget_buffer_window_bytes = sequential_buffer_window_bytes; + let ripget_client = shared_ripget_client.clone(); let handle = tokio::spawn(async move { let transactions_since_stats = transactions_since_stats_cloned; @@ -962,6 +1006,11 @@ where // let mut triggered = false; while let Err((err, slot)) = async { let mut last_emitted_slot = last_emitted_slot_global; + let op_timeout = if sequential_mode { + OP_TIMEOUT_SEQUENTIAL + } else { + OP_TIMEOUT + }; if poll_shutdown(&shutdown_flag, &mut shutdown_rx) { log::info!( target: &log_target, @@ -994,25 +1043,6 @@ where return Ok(()); } log::info!(target: &log_target, "entering epoch {}", epoch_num); - let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, &client)).await { - Ok(stream) => stream, - Err(_) => { - return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start))); - } - }; - let mut reader = NodeReader::new(stream); - - let header_fut = reader.read_raw_header(); - let header = match timeout(OP_TIMEOUT, header_fut).await { - Ok(res) => res - .map_err(FirehoseError::ReadHeader) - .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?, - Err(_) => { - return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start))); - } - }; - log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header); - let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num); let local_start = std::cmp::max(slot_range.start, epoch_start); let local_end_inclusive = @@ -1027,6 +1057,49 @@ where ); continue; } + let use_sequential_stream = sequential_mode && local_start == epoch_start; + let stream = match timeout(op_timeout, async { + if use_sequential_stream { + fetch_epoch_stream_with_options( + epoch_num, + &client, + Some(FetchEpochStreamOptions { + sequential: true, + ripget_threads, + buffer_window_bytes: ripget_buffer_window_bytes, + ripget_client: ripget_client.clone(), + }), + ) + .await + } else { + fetch_epoch_stream(epoch_num, &client).await + } + }) + .await + { + Ok(stream) => stream, + Err(_) => { + return Err(( + FirehoseError::OperationTimeout("fetch_epoch_stream"), + current_slot.unwrap_or(slot_range.start), + )); + } + }; + let mut reader = NodeReader::new(stream); + + let header_fut = reader.read_raw_header(); + let header = match timeout(op_timeout, header_fut).await { + Ok(res) => res + .map_err(FirehoseError::ReadHeader) + .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?, + Err(_) => { + return Err(( + FirehoseError::OperationTimeout("read_raw_header"), + current_slot.unwrap_or(slot_range.start), + )); + } + }; + log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header); let mut previous_blockhash = Hash::default(); let mut latest_entry_blockhash = Hash::default(); @@ -1061,7 +1134,7 @@ where }; if let Some(seek_slot) = seek_slot { let seek_fut = reader.seek_to_slot(seek_slot); - match timeout(OP_TIMEOUT, seek_fut).await { + match timeout(op_timeout, seek_fut).await { Ok(res) => { res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))? } @@ -1088,7 +1161,7 @@ where return Ok(()); } let read_fut = reader.read_until_block(); - let nodes = match timeout(OP_TIMEOUT, read_fut).await { + let nodes = match timeout(op_timeout, read_fut).await { Ok(result) => result .map_err(FirehoseError::ReadUntilBlockError) .map_err(|e| { @@ -1219,7 +1292,7 @@ where ) })?; let reassembled_metadata = nodes - .reassemble_dataframes(tx.metadata.clone()) + .reassemble_dataframes(&tx.metadata) .map_err(|err| { ( FirehoseError::NodeDecodingError(item_index, err), @@ -1281,8 +1354,8 @@ where signature: *signature, message_hash, is_vote, - transaction_status_meta: as_native_metadata.clone(), - transaction: versioned_tx.clone(), + transaction_status_meta: as_native_metadata, + transaction: versioned_tx, }, ) .await @@ -1544,7 +1617,7 @@ where Rewards(rewards) => { if reward_enabled || block_enabled { let reassembled = nodes - .reassemble_dataframes(rewards.data.clone()) + .reassemble_dataframes(&rewards.data) .map_err(|err| { ( FirehoseError::NodeDecodingError(item_index, err), @@ -2181,7 +2254,7 @@ async fn firehose_geyser_thread( match node { Transaction(tx) => { let versioned_tx = tx.as_parsed()?; - let reassembled_metadata = nodes.reassemble_dataframes(tx.metadata.clone())?; + let reassembled_metadata = nodes.reassemble_dataframes(&tx.metadata)?; let as_native_metadata = decode_transaction_status_meta_from_frame( block.slot, @@ -2288,7 +2361,7 @@ async fn firehose_geyser_thread( Subset(_subset) => (), Epoch(_epoch) => (), Rewards(rewards) => { - let reassembled = nodes.reassemble_dataframes(rewards.data.clone())?; + let reassembled = nodes.reassemble_dataframes(&rewards.data)?; if !reassembled.is_empty() { this_block_rewards = decode_rewards_from_frame( block.slot, @@ -2593,6 +2666,8 @@ async fn assert_slot_min_executed_transactions(slot: u64, min_executed: u64) { firehose( 1, + false, + None, target_slot_block..(target_slot_block + 1), Some(move |_thread_id: usize, block: BlockData| { let found_block = found_block.clone(); @@ -2802,6 +2877,8 @@ async fn test_firehose_epoch_800() { firehose( THREADS.try_into().unwrap(), + false, + None, (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2), Some(|thread_id: usize, block: BlockData| { async move { @@ -2905,6 +2982,8 @@ async fn test_firehose_target_slot_transactions() { firehose( 4, + false, + None, (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS), Some(|_thread_id: usize, block: BlockData| { async move { @@ -2964,6 +3043,78 @@ async fn test_firehose_target_slot_transactions() { ); } +#[cfg(test)] +#[serial] +#[tokio::test(flavor = "multi_thread")] +async fn test_firehose_epoch_900_boundary_window_sequential_monotonic_transactions() { + use std::sync::{ + Arc, Mutex, + atomic::{AtomicU64, Ordering}, + }; + + solana_logger::setup_with_default("info"); + const SLOT_COUNT: u64 = 100; + const THREADS: u64 = 4; + const TEST_BUFFER_WINDOW: &str = "4GiB"; + + let (epoch_900_start, _) = epoch_to_slot_range(900); + let slot_range = (epoch_900_start - SLOT_COUNT)..(epoch_900_start + SLOT_COUNT); + + let last_seen_tx_slot = Arc::new(Mutex::new(slot_range.start)); + let observed_txs = Arc::new(AtomicU64::new(0)); + let stats_tracking = StatsTracking { + on_stats: log_stats_handler, + tracking_interval_slots: 100, + }; + let test_buffer_window_bytes = crate::system::parse_buffer_window_bytes(TEST_BUFFER_WINDOW) + .expect("valid test buffer window"); + + firehose( + THREADS, + true, + Some(test_buffer_window_bytes), + slot_range.clone(), + None::, + Some({ + let last_seen_tx_slot = last_seen_tx_slot.clone(); + let observed_txs = observed_txs.clone(); + move |_thread_id: usize, transaction: TransactionData| { + let last_seen_tx_slot = last_seen_tx_slot.clone(); + let observed_txs = observed_txs.clone(); + async move { + let mut previous = last_seen_tx_slot.lock().unwrap(); + // Old Faithful does not include leader-skipped slots, so gaps are + // expected. We only enforce monotonic (non-decreasing) tx slot ordering. + assert!( + transaction.slot >= *previous, + "transaction slot regressed: prev={}, current={}", + *previous, + transaction.slot + ); + *previous = transaction.slot; + observed_txs.fetch_add(1, Ordering::Relaxed); + Ok(()) + } + .boxed() + } + }), + None::, + None::, + None::, + Some(stats_tracking), + None, + ) + .await + .unwrap(); + + assert!( + observed_txs.load(Ordering::Relaxed) > 0, + "expected to observe at least one transaction in slots [{}, {})", + slot_range.start, + slot_range.end + ); +} + #[cfg(test)] #[serial] #[tokio::test(flavor = "multi_thread")] @@ -3026,6 +3177,8 @@ async fn test_firehose_epoch_850_has_logs() { firehose( 4, + false, + None, START_SLOT..(START_SLOT + SLOT_COUNT), None::, Some(|_thread_id: usize, transaction: TransactionData| { @@ -3070,6 +3223,8 @@ async fn test_firehose_epoch_850_votes_present() { firehose( 2, + false, + None, (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS), Some(|_thread_id: usize, block: BlockData| { async move { @@ -3139,6 +3294,8 @@ async fn test_firehose_restart_loses_coverage_without_reset() { firehose( THREADS.try_into().unwrap(), + false, + None, START_SLOT..(START_SLOT + NUM_SLOTS), Some(|_thread_id: usize, block: BlockData| { async move { @@ -3200,6 +3357,8 @@ async fn test_firehose_gap_coverage_near_known_missing_range() { firehose( THREADS.try_into().unwrap(), + false, + None, START_SLOT..(END_SLOT + 1), Some(|_thread_id: usize, block: BlockData| { async move { diff --git a/jetstreamer-firehose/src/lib.rs b/jetstreamer-firehose/src/lib.rs index 8913810b..bc7b3fd5 100644 --- a/jetstreamer-firehose/src/lib.rs +++ b/jetstreamer-firehose/src/lib.rs @@ -42,6 +42,11 @@ //! S3 transports are compiled behind the `s3-backend` Cargo feature. Enable that feature when //! building if you plan to stream from `s3://` URIs instead of HTTP mirrors. //! +//! Sequential-mode ripget buffering is configured on the [`firehose::firehose`] call via +//! `buffer_window_bytes`. Pass `None` to use the built-in default (`min(4 GiB, 15% of available +//! RAM)`). If you're using [`jetstreamer`](https://crates.io/crates/jetstreamer)'s binary runner, +//! that layer exposes `JETSTREAMER_BUFFER_WINDOW` and forwards the parsed value here. +//! //! # Limitations //! Old Faithful currently publishes blocks, transactions, epochs, and reward metadata but does //! not ship account updates. The firehose mirrors that limitation; plan on a separate data @@ -131,6 +136,8 @@ //! //! firehose::firehose( //! 4, +//! false, +//! None, //! slot_range, //! Some(block_handler()), //! Some(tx_handler()), diff --git a/jetstreamer-firehose/src/network.rs b/jetstreamer-firehose/src/network.rs index d8dda2f7..6b14fb9e 100644 --- a/jetstreamer-firehose/src/network.rs +++ b/jetstreamer-firehose/src/network.rs @@ -16,6 +16,8 @@ pub fn create_http_client() -> Client { let user_agent = format!("jetstreamer/v{}", version); Client::builder() .user_agent(user_agent) + // Prefer rustls even when other workspace dependencies enable native-tls. + .use_rustls_tls() .build() .expect("failed to create HTTP client") } diff --git a/jetstreamer-firehose/src/node.rs b/jetstreamer-firehose/src/node.rs index 7d955594..7827064a 100644 --- a/jetstreamer-firehose/src/node.rs +++ b/jetstreamer-firehose/src/node.rs @@ -5,6 +5,7 @@ use { crc::{CRC_64_GO_ISO, Crc}, fnv::FnvHasher, std::{ + collections::HashMap, fmt, io::{self, Read}, vec::Vec, @@ -39,16 +40,19 @@ impl NodeWithCid { pub struct NodesWithCids( #[doc = "Ordered collection of nodes paired with their content identifiers."] pub Vec, + #[doc(hidden)] HashMap, ); impl NodesWithCids { /// Creates an empty [`NodesWithCids`]. - pub const fn new() -> NodesWithCids { - NodesWithCids(vec![]) + pub fn new() -> NodesWithCids { + NodesWithCids(vec![], HashMap::new()) } /// Appends a node to the collection. pub fn push(&mut self, node_with_cid: NodeWithCid) { + let index = self.0.len(); + self.1.insert(*node_with_cid.get_cid(), index); self.0.push(node_with_cid); } @@ -69,45 +73,43 @@ impl NodesWithCids { /// Looks up a node by CID. pub fn get_by_cid(&self, cid: &Cid) -> Option<&NodeWithCid> { - self.0 - .iter() - .find(|&node_with_cid| node_with_cid.get_cid() == cid) + self.1.get(cid).and_then(|index| self.0.get(*index)) } /// Reassembles a potentially multi-part dataframe using the nodes in the collection. pub fn reassemble_dataframes( &self, - first_dataframe: dataframe::DataFrame, + first_dataframe: &dataframe::DataFrame, ) -> Result, SharedError> { - let mut data = first_dataframe.data.to_vec(); - let mut next_arr = first_dataframe.next; - while next_arr.is_some() { - for next_cid in next_arr.clone().unwrap() { - let next_node = self.get_by_cid(&next_cid); - if next_node.is_none() { - return Err(Box::new(std::io::Error::other(std::format!( + let mut data = Vec::with_capacity(first_dataframe.data.len()); + data.extend_from_slice(first_dataframe.data.as_slice()); + + let mut next_arr = first_dataframe.next.as_deref(); + while let Some(next_cids) = next_arr { + let mut next_segment = None; + for next_cid in next_cids { + let next_node = self.get_by_cid(next_cid).ok_or_else(|| { + Box::new(std::io::Error::other(std::format!( "Missing CID: {:?}", next_cid - )))); - } - let next_node_un = next_node.unwrap(); + ))) as SharedError + })?; - if !next_node_un.get_node().is_dataframe() { - return Err(Box::new(std::io::Error::other(std::format!( + let next_dataframe = next_node.get_node().get_dataframe().ok_or_else(|| { + Box::new(std::io::Error::other(std::format!( "Expected DataFrame, got {:?}", - next_node_un.get_node() - )))); - } + next_node.get_node() + ))) as SharedError + })?; - let next_dataframe = next_node_un.get_node().get_dataframe().unwrap(); - data.extend(next_dataframe.data.to_vec()); - next_arr.clone_from(&next_dataframe.next); + data.extend_from_slice(next_dataframe.data.as_slice()); + next_segment = next_dataframe.next.as_deref(); } + next_arr = next_segment; } - if first_dataframe.hash.is_some() { - let wanted_hash = first_dataframe.hash.unwrap(); - verify_hash(data.clone(), wanted_hash)?; + if let Some(wanted_hash) = first_dataframe.hash { + verify_hash(&data, wanted_hash)?; } Ok(data) } @@ -152,11 +154,11 @@ impl NodesWithCids { } /// Validates the provided data against the expected CRC64 (or legacy FNV) hash. -pub fn verify_hash(data: Vec, hash: u64) -> Result<(), SharedError> { - let crc64 = checksum_crc64(&data); +pub fn verify_hash(data: &[u8], hash: u64) -> Result<(), SharedError> { + let crc64 = checksum_crc64(data); if crc64 != hash { // Maybe it's the legacy checksum function? - let fnv = checksum_fnv(&data); + let fnv = checksum_fnv(data); if fnv != hash { return Err(Box::new(std::io::Error::other(std::format!( "data hash mismatch: wanted {:?}, got crc64={:?}, fnv={:?}", diff --git a/jetstreamer-firehose/src/system.rs b/jetstreamer-firehose/src/system.rs index 2169d6d0..91c1037e 100644 --- a/jetstreamer-firehose/src/system.rs +++ b/jetstreamer-firehose/src/system.rs @@ -3,6 +3,10 @@ use std::cmp; /// Environment variable that overrides detected network throughput in megabytes. const NETWORK_CAPACITY_OVERRIDE_ENV: &str = "JETSTREAMER_NETWORK_CAPACITY_MB"; const DEFAULT_NETWORK_CAPACITY_MB: u64 = 1_000; +const DEFAULT_BUFFER_WINDOW_FALLBACK_BYTES: u64 = 512 * 1024 * 1024; +const DEFAULT_BUFFER_WINDOW_PERCENT_NUMERATOR: u64 = 15; +const DEFAULT_BUFFER_WINDOW_PERCENT_DENOMINATOR: u64 = 100; +const DEFAULT_BUFFER_WINDOW_MAX_BYTES: u64 = 4 * 1024 * 1024 * 1024; /// Calculates an optimal number of firehose threads for the current machine. /// @@ -20,6 +24,23 @@ pub fn optimal_firehose_thread_count() -> usize { compute_optimal_thread_count(detect_cpu_core_count(), detect_network_capacity_megabytes()) } +/// Returns the default ripget sequential download window size in bytes. +/// +/// The default is the lower of 15% of detected available RAM and 4 GiB, falling back to 512 MiB +/// when RAM cannot be detected. +#[inline] +pub fn default_firehose_buffer_window_bytes() -> u64 { + default_buffer_window_bytes() +} + +/// Parses a human-readable byte size (for example `4GiB`, `512mb`, or `1073741824`). +/// +/// Returns `None` when the input is invalid or smaller than two bytes. +#[inline] +pub fn parse_buffer_window_bytes(value: &str) -> Option { + parse_byte_size(value).filter(|parsed| *parsed >= 2) +} + #[inline(always)] fn detect_cpu_core_count() -> usize { std::thread::available_parallelism() @@ -39,6 +60,110 @@ fn network_capacity_override() -> Option { .filter(|value| *value > 0) } +fn default_buffer_window_bytes() -> u64 { + let computed = detect_available_memory_bytes() + .map(compute_default_buffer_window_bytes) + .unwrap_or(DEFAULT_BUFFER_WINDOW_FALLBACK_BYTES); + computed.max(2) +} + +#[inline(always)] +fn compute_default_buffer_window_bytes(available_memory_bytes: u64) -> u64 { + let window = (available_memory_bytes as u128) + .saturating_mul(DEFAULT_BUFFER_WINDOW_PERCENT_NUMERATOR as u128) + / (DEFAULT_BUFFER_WINDOW_PERCENT_DENOMINATOR as u128); + window.min(DEFAULT_BUFFER_WINDOW_MAX_BYTES as u128) as u64 +} + +/// Formats a byte count as a human-readable string (e.g. `1.5 GiB`). +pub fn format_byte_size(bytes: u64) -> String { + const GIB: u64 = 1_073_741_824; + const MIB: u64 = 1_048_576; + const KIB: u64 = 1_024; + if bytes >= GIB && bytes % GIB == 0 { + format!("{} GiB", bytes / GIB) + } else if bytes >= MIB && bytes % MIB == 0 { + format!("{} MiB", bytes / MIB) + } else if bytes >= KIB && bytes % KIB == 0 { + format!("{} KiB", bytes / KIB) + } else if bytes >= GIB { + format!("{:.1} GiB", bytes as f64 / GIB as f64) + } else if bytes >= MIB { + format!("{:.1} MiB", bytes as f64 / MIB as f64) + } else if bytes >= KIB { + format!("{:.1} KiB", bytes as f64 / KIB as f64) + } else { + format!("{} bytes", bytes) + } +} + +fn parse_byte_size(value: &str) -> Option { + let trimmed = value.trim(); + if trimmed.is_empty() { + return None; + } + + let split_idx = trimmed + .char_indices() + .find_map(|(idx, ch)| { + if ch.is_ascii_digit() || ch == '_' { + None + } else { + Some(idx) + } + }) + .unwrap_or(trimmed.len()); + let (number_part, suffix_part) = trimmed.split_at(split_idx); + let number: u64 = number_part.replace('_', "").parse().ok()?; + let suffix = suffix_part.trim().to_ascii_lowercase(); + + let multiplier = match suffix.as_str() { + "" | "b" => 1u64, + "k" | "kb" => 1_000u64, + "m" | "mb" => 1_000_000u64, + "g" | "gb" => 1_000_000_000u64, + "ki" | "kib" => 1_024u64, + "mi" | "mib" => 1_048_576u64, + "gi" | "gib" => 1_073_741_824u64, + _ => return None, + }; + number.checked_mul(multiplier) +} + +#[cfg(target_os = "linux")] +fn detect_available_memory_bytes() -> Option { + let mut info = std::mem::MaybeUninit::::uninit(); + let rc = unsafe { libc::sysinfo(info.as_mut_ptr()) }; + if rc != 0 { + return None; + } + let info = unsafe { info.assume_init() }; + let freeram = u64::try_from(info.freeram).ok()?; + let mem_unit = u64::from(info.mem_unit.max(1)); + freeram.checked_mul(mem_unit) +} + +#[cfg(all(unix, not(target_os = "linux")))] +fn detect_available_memory_bytes() -> Option { + let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) }; + if page_size <= 0 { + return None; + } + // `_SC_AVPHYS_PAGES` is not available on all Unix targets (including macOS), so use + // physical pages as a portable fallback for default sizing. + let pages = unsafe { libc::sysconf(libc::_SC_PHYS_PAGES) }; + if pages <= 0 { + return None; + } + let bytes = (pages as u128).saturating_mul(page_size as u128); + Some(bytes.min(u64::MAX as u128) as u64) +} + +#[cfg(not(unix))] +fn detect_available_memory_bytes() -> Option { + None +} + #[inline(always)] fn compute_optimal_thread_count( cpu_cores: usize, @@ -58,7 +183,11 @@ fn compute_optimal_thread_count( #[cfg(test)] mod tests { - use super::{NETWORK_CAPACITY_OVERRIDE_ENV, compute_optimal_thread_count}; + use super::{ + NETWORK_CAPACITY_OVERRIDE_ENV, compute_default_buffer_window_bytes, + compute_optimal_thread_count, parse_buffer_window_bytes, parse_byte_size, + }; + use serial_test::serial; use std::env; #[test] @@ -92,6 +221,7 @@ mod tests { } #[test] + #[serial] fn override_env_takes_precedence() { let high_guard = EnvGuard::set(NETWORK_CAPACITY_OVERRIDE_ENV, "1000"); let high_capacity = super::detect_network_capacity_megabytes(); @@ -108,6 +238,7 @@ mod tests { } #[test] + #[serial] fn override_env_invalid_values_are_ignored() { let guard = EnvGuard::set(NETWORK_CAPACITY_OVERRIDE_ENV, "not-a-number"); assert_eq!(super::network_capacity_override(), None); @@ -115,6 +246,7 @@ mod tests { } #[test] + #[serial] fn default_capacity_matches_expected() { let guard = EnvGuard::unset(NETWORK_CAPACITY_OVERRIDE_ENV); assert_eq!( @@ -124,6 +256,41 @@ mod tests { drop(guard); } + #[test] + fn computes_default_window_from_available_ram() { + // 16 GiB -> 2.4 GiB (15%) + let available = 16u64 * 1024 * 1024 * 1024; + let expected = 2_576_980_377u64; + assert_eq!(compute_default_buffer_window_bytes(available), expected); + } + + #[test] + fn caps_default_window_at_four_gib() { + // 64 GiB -> 9.6 GiB (15%), capped to 4 GiB. + let available = 64u64 * 1024 * 1024 * 1024; + let expected = 4u64 * 1024 * 1024 * 1024; + assert_eq!(compute_default_buffer_window_bytes(available), expected); + } + + #[test] + fn parses_human_readable_buffer_window_values() { + assert_eq!(parse_byte_size("123"), Some(123)); + assert_eq!(parse_byte_size("256mb"), Some(256_000_000)); + assert_eq!(parse_byte_size("256MiB"), Some(268_435_456)); + assert_eq!(parse_byte_size("1_024"), Some(1024)); + } + + #[test] + fn parse_buffer_window_rejects_invalid_values() { + assert_eq!(parse_buffer_window_bytes("nope"), None); + assert_eq!(parse_buffer_window_bytes("1"), None); + } + + #[test] + fn default_buffer_window_is_nonzero() { + assert!(super::default_firehose_buffer_window_bytes() >= 2); + } + struct EnvGuard { key: &'static str, original: Option, diff --git a/jetstreamer-firehose/src/utils.rs b/jetstreamer-firehose/src/utils.rs index 5a6b633c..1eefe70d 100644 --- a/jetstreamer-firehose/src/utils.rs +++ b/jetstreamer-firehose/src/utils.rs @@ -210,8 +210,8 @@ impl<'de> serde::Deserialize<'de> for Buffer { pub const MAX_ALLOWED_SECTION_SIZE: usize = 32 << 20; // 32MiB /// Decompresses a Zstandard byte stream. -pub fn decompress_zstd(data: Vec) -> Result, SharedError> { - let mut decoder = zstd::Decoder::new(&data[..])?; +pub fn decompress_zstd(data: &[u8]) -> Result, SharedError> { + let mut decoder = zstd::Decoder::new(data)?; let mut decompressed = Vec::new(); decoder.read_to_end(&mut decompressed)?; Ok(decompressed) diff --git a/jetstreamer-plugin/src/lib.rs b/jetstreamer-plugin/src/lib.rs index 98447854..c93a2fad 100644 --- a/jetstreamer-plugin/src/lib.rs +++ b/jetstreamer-plugin/src/lib.rs @@ -91,7 +91,7 @@ //! //! #[tokio::main] //! async fn main() -> Result<(), Box> { -//! let mut runner = PluginRunner::new("http://localhost:8123", 1); +//! let mut runner = PluginRunner::new("http://localhost:8123", 1, false, None); //! runner.register(Box::new(LoggingPlugin)); //! let runner = Arc::new(runner); //! @@ -251,16 +251,28 @@ pub struct PluginRunner { plugins: Arc>>, clickhouse_dsn: String, num_threads: usize, + sequential: bool, + buffer_window_bytes: Option, db_update_interval_slots: u64, } impl PluginRunner { /// Creates a new runner that writes to `clickhouse_dsn` using `num_threads`. - pub fn new(clickhouse_dsn: impl Display, num_threads: usize) -> Self { + /// + /// When `sequential` is `true`, firehose runs with one worker and `num_threads` is used as + /// ripget parallel download concurrency. + pub fn new( + clickhouse_dsn: impl Display, + num_threads: usize, + sequential: bool, + buffer_window_bytes: Option, + ) -> Self { Self { plugins: Arc::new(Vec::new()), clickhouse_dsn: clickhouse_dsn.to_string(), num_threads: std::cmp::max(1, num_threads), + sequential, + buffer_window_bytes, db_update_interval_slots: 100, } } @@ -411,23 +423,29 @@ impl PluginRunner { .. } => { let tally = take_slot_tx_tally(*slot); - if let Err(err) = record_slot_status( - db_client, - *slot, - thread_id, - *executed_transaction_count, - tally.votes, - tally.non_votes, - *block_time, - ) - .await - { - log::error!( - target: &log_target, - "failed to record slot status: {}", - err - ); - } + let slot = *slot; + let executed_transaction_count = *executed_transaction_count; + let block_time = *block_time; + let log_target_clone = log_target.clone(); + tokio::spawn(async move { + if let Err(err) = record_slot_status( + db_client, + slot, + thread_id, + executed_transaction_count, + tally.votes, + tally.non_votes, + block_time, + ) + .await + { + log::error!( + target: &log_target_clone, + "failed to record slot status: {}", + err + ); + } + }); } BlockData::PossibleLeaderSkipped { slot } => { // Drop any tallies that may exist for skipped slots. @@ -462,11 +480,10 @@ impl PluginRunner { ); return Ok(()); } - let transaction = Arc::new(transaction); for handle in plugin_handles.iter() { if let Err(err) = handle .plugin - .on_transaction(thread_id, clickhouse.clone(), transaction.as_ref()) + .on_transaction(thread_id, clickhouse.clone(), &transaction) .await { log::error!( @@ -747,6 +764,8 @@ impl PluginRunner { let mut firehose_future = Box::pin(firehose( self.num_threads as u64, + self.sequential, + self.buffer_window_bytes, slot_range, Some(on_block), Some(on_transaction), diff --git a/jetstreamer-plugin/src/plugins/instruction_tracking.rs b/jetstreamer-plugin/src/plugins/instruction_tracking.rs index 82fa5a55..d767fbfa 100644 --- a/jetstreamer-plugin/src/plugins/instruction_tracking.rs +++ b/jetstreamer-plugin/src/plugins/instruction_tracking.rs @@ -128,12 +128,14 @@ impl Plugin for InstructionTrackingPlugin { .into_iter() .collect::>(); - if let Some(db_client) = db.as_ref() + if let Some(db_client) = db && !rows.is_empty() { - write_instruction_events(Arc::clone(db_client), rows) - .await - .map_err(|err| -> Box { Box::new(err) })?; + tokio::spawn(async move { + if let Err(err) = write_instruction_events(db_client, rows).await { + log::error!("failed to write instruction events: {}", err); + } + }); } Ok(()) diff --git a/jetstreamer-plugin/src/plugins/program_tracking.rs b/jetstreamer-plugin/src/plugins/program_tracking.rs index 452e916c..6cda5613 100644 --- a/jetstreamer-plugin/src/plugins/program_tracking.rs +++ b/jetstreamer-plugin/src/plugins/program_tracking.rs @@ -167,12 +167,14 @@ impl Plugin for ProgramTrackingPlugin { let rows = Self::take_slot_events(slot, block_time); - if let Some(db_client) = db.as_ref() + if let Some(db_client) = db && !rows.is_empty() { - write_program_events(Arc::clone(db_client), rows) - .await - .map_err(|err| -> Box { Box::new(err) })?; + tokio::spawn(async move { + if let Err(err) = write_program_events(db_client, rows).await { + log::error!("failed to write program events: {}", err); + } + }); } Ok(()) diff --git a/src/lib.rs b/src/lib.rs index a398d03d..ffedc09c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,6 +52,10 @@ //! [`jetstreamer_firehose::system::optimal_firehose_thread_count`]): number of firehose //! ingestion threads. Increase this to multiplex Old Faithful HTTP requests across more //! cores, or leave it unset to size the pool automatically using CPU and network heuristics. +//! - `JETSTREAMER_SEQUENTIAL` (default `false`): when truthy, firehose uses a single worker +//! thread and uses ripget's parallel windowed downloader for sequential reads. +//! - `JETSTREAMER_BUFFER_WINDOW` (default lower of 4 GiB and 15% of available RAM): ripget +//! hot/cold window size used in sequential mode. Accepts raw bytes or suffixes like `512MiB`. //! - `JETSTREAMER_CLICKHOUSE_DSN` (default `http://localhost:8123`): DSN passed to plugin //! instances that emit ClickHouse writes. //! - `JETSTREAMER_CLICKHOUSE_MODE` (default `auto`): controls ClickHouse integration. Accepted @@ -85,6 +89,8 @@ //! | `JETSTREAMER_CLICKHOUSE_DSN` | `http://localhost:8123` | HTTP(S) DSN passed to the embedded plugin runner for ClickHouse writes. Override to target a remote ClickHouse deployment. | //! | `JETSTREAMER_CLICKHOUSE_MODE` | `auto` | Controls ClickHouse integration. `auto` enables output and spawns the helper only for local DSNs, `remote` enables output without spawning the helper, `local` always requests the helper, and `off` disables ClickHouse entirely. | //! | `JETSTREAMER_THREADS` | `auto` | Number of firehose ingestion threads. Leave unset to rely on hardware-based sizing or override with an explicit value when you know the ideal concurrency. | +//! | `JETSTREAMER_SEQUENTIAL` | `false` | Enables single-thread firehose processing with ripget-backed sequential streaming. | +//! | `JETSTREAMER_BUFFER_WINDOW` | `min(4 GiB, 15% available RAM)` | Total ripget hot/cold window size used only when `JETSTREAMER_SEQUENTIAL` is enabled. | //! //! Helper spawning only occurs when both the mode allows it (`auto`/`local`) **and** the DSN //! points to `localhost` or `127.0.0.1`. @@ -207,6 +213,8 @@ fn parse_clickhouse_mode(value: &str) -> Option { /// /// - `JETSTREAMER_THREADS`: Number of firehose ingestion threads. When unset the value is /// derived from [`jetstreamer_firehose::system::optimal_firehose_thread_count`]. +/// - `JETSTREAMER_SEQUENTIAL`: When true, runs a single firehose worker and uses ripget's +/// parallel windowed downloader for sequential reads. /// - `JETSTREAMER_CLICKHOUSE_DSN`: DSN for ClickHouse ingestion; defaults to /// `http://localhost:8123`. /// - `JETSTREAMER_CLICKHOUSE_MODE`: Controls ClickHouse integration. Accepted values are @@ -317,6 +325,8 @@ impl Default for JetstreamerRunner { clickhouse_dsn, config: Config { threads: jetstreamer_firehose::system::optimal_firehose_thread_count(), + sequential: false, + buffer_window_bytes: None, slot_range: 0..0, clickhouse_enabled: clickhouse_settings.enabled, spawn_clickhouse: clickhouse_settings.spawn_helper && clickhouse_settings.enabled, @@ -346,11 +356,31 @@ impl JetstreamerRunner { } /// Sets the number of firehose ingestion threads. + /// + /// When sequential mode is enabled, this value is used as ripget parallel range + /// concurrency while firehose itself runs one worker. pub fn with_threads(mut self, threads: usize) -> Self { self.config.threads = std::cmp::max(1, threads); self } + /// Toggles sequential firehose mode. + /// + /// When enabled, firehose uses one worker thread while retaining `JETSTREAMER_THREADS` as + /// the ripget parallel range count for sequential windowed downloads. + pub const fn with_sequential(mut self, sequential: bool) -> Self { + self.config.sequential = sequential; + self + } + + /// Sets the ripget sequential download window size in bytes. + /// + /// This value is only used when sequential mode is enabled. + pub const fn with_buffer_window_bytes(mut self, buffer_window_bytes: Option) -> Self { + self.config.buffer_window_bytes = buffer_window_bytes; + self + } + /// Restricts [`JetstreamerRunner::run`] to a specific slot range. pub const fn with_slot_range(mut self, slot_range: Range) -> Self { self.config.slot_range = slot_range; @@ -389,6 +419,8 @@ impl JetstreamerRunner { } let threads = std::cmp::max(1, self.config.threads); + let sequential = self.config.sequential; + let buffer_window_bytes = self.config.buffer_window_bytes; let clickhouse_enabled = self.config.clickhouse_enabled && !self.clickhouse_dsn.trim().is_empty(); let slot_range = self.config.slot_range.clone(); @@ -397,14 +429,21 @@ impl JetstreamerRunner { && should_spawn_for_dsn(&self.clickhouse_dsn); log::info!( - "processing slots [{}..{}) with {} firehose threads (clickhouse_enabled={})", + "processing slots [{}..{}) with {} configured threads (sequential={}, buffer_window_bytes={:?}, clickhouse_enabled={})", slot_range.start, slot_range.end, threads, + sequential, + buffer_window_bytes, clickhouse_enabled ); - let mut runner = PluginRunner::new(&self.clickhouse_dsn, threads); + let mut runner = PluginRunner::new( + &self.clickhouse_dsn, + threads, + sequential, + buffer_window_bytes, + ); for plugin in &self.config.builtin_plugins { runner.register(plugin.instantiate()); } @@ -509,6 +548,10 @@ impl JetstreamerRunner { pub struct Config { /// Number of simultaneous firehose streams to spawn. pub threads: usize, + /// Whether to process with a single firehose worker and ripget-backed sequential streaming. + pub sequential: bool, + /// Optional override for ripget sequential window size in bytes. + pub buffer_window_bytes: Option, /// The range of slots to process, inclusive of the start and exclusive of the end slot. pub slot_range: Range, /// Whether to connect to ClickHouse for plugin output. @@ -551,11 +594,15 @@ impl BuiltinPlugin { /// - `JETSTREAMER_CLICKHOUSE_MODE`: Controls ClickHouse integration. Accepts `auto`, `remote`, /// `local`, or `off`. /// - `JETSTREAMER_THREADS`: Number of firehose ingestion threads. +/// - `JETSTREAMER_SEQUENTIAL`: Enables single-thread sequential firehose mode when truthy. +/// - `JETSTREAMER_BUFFER_WINDOW`: Optional ripget sequential window size (for example `4GiB`). /// /// CLI flags: /// - `--with-plugin `: Adds one of the built-in plugins (`program-tracking` or /// `instruction-tracking`). When omitted, the CLI defaults to `program-tracking`. /// - `--no-plugins`: Disables all built-in plugins (overrides the default and any `--with-plugin`). +/// - `--sequential`: Enables single-thread sequential firehose mode. +/// - `--buffer-window `: Overrides ripget sequential window size (for example `4GiB`). /// /// # Examples /// @@ -575,6 +622,8 @@ pub fn parse_cli_args() -> Result> { let mut first_arg: Option = None; let mut builtin_plugins = Vec::new(); let mut no_plugins = false; + let mut sequential_cli = false; + let mut buffer_window_cli: Option = None; while let Some(arg) = args.next() { match arg.as_str() { "--with-plugin" => { @@ -591,6 +640,15 @@ pub fn parse_cli_args() -> Result> { "--no-plugins" => { no_plugins = true; } + "--sequential" => { + sequential_cli = true; + } + "--buffer-window" => { + let raw = args + .next() + .ok_or_else(|| "--buffer-window requires a value like 4GiB".to_string())?; + buffer_window_cli = Some(raw); + } _ if first_arg.is_none() => first_arg = Some(arg), other => return Err(format!("unrecognized argument '{other}'").into()), } @@ -621,6 +679,17 @@ pub fn parse_cli_args() -> Result> { .ok() .and_then(|s| s.parse::().ok()) .unwrap_or_else(jetstreamer_firehose::system::optimal_firehose_thread_count); + let sequential = if sequential_cli { + true + } else { + parse_env_bool("JETSTREAMER_SEQUENTIAL", false) + }; + let buffer_window_raw = if let Some(cli) = buffer_window_cli { + Some(cli) + } else { + std::env::var("JETSTREAMER_BUFFER_WINDOW").ok() + }; + let buffer_window_bytes = parse_optional_buffer_window_bytes(buffer_window_raw.as_deref())?; let spawn_clickhouse = clickhouse_settings.spawn_helper && clickhouse_enabled; @@ -634,6 +703,8 @@ pub fn parse_cli_args() -> Result> { Ok(Config { threads, + sequential, + buffer_window_bytes, slot_range, clickhouse_enabled, spawn_clickhouse, @@ -641,6 +712,40 @@ pub fn parse_cli_args() -> Result> { }) } +fn parse_optional_buffer_window_bytes( + raw: Option<&str>, +) -> Result, Box> { + let Some(raw) = raw else { + return Ok(None); + }; + let parsed = jetstreamer_firehose::system::parse_buffer_window_bytes(raw).ok_or_else(|| { + format!( + "invalid buffer window '{}'; expected integer bytes or suffix like 4GiB/512MiB", + raw + ) + })?; + Ok(Some(parsed)) +} + +fn parse_env_bool(key: &str, default: bool) -> bool { + match std::env::var(key) { + Ok(raw) => match raw.trim().to_ascii_lowercase().as_str() { + "1" | "true" | "yes" | "on" => true, + "0" | "false" | "no" | "off" => false, + other => { + log::warn!( + "unrecognized boolean value for {}='{}'; using default {}", + key, + other, + default + ); + default + } + }, + Err(_) => default, + } +} + fn should_spawn_for_dsn(dsn: &str) -> bool { let lower = dsn.to_ascii_lowercase(); lower.contains("localhost") || lower.contains("127.0.0.1")