Conversation
There was a problem hiding this comment.
Pull request overview
Adds a true sequential-processing mode to the firehose pipeline (single worker for decode/handlers) while using ripget for parallel range requests behind a sequential reader, plus configuration plumbing (API/runner/CLI/env) and defaults for the ripget window buffer sizing.
Changes:
- Extend
firehose(...)withsequential+buffer_window_bytesand add ripget-backed sequential epoch streaming. - Add buffer window sizing defaults (min(4GiB, 15% RAM), fallback 512MiB) and human-friendly parsing helpers.
- Propagate sequential/window config through
JetstreamerRunner→PluginRunnerand add CLI/env wiring plus a sequential integration test.
Reviewed changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
src/lib.rs |
Wires sequential + buffer window settings into runner config and CLI/env parsing. |
jetstreamer-plugin/src/lib.rs |
Extends PluginRunner to pass sequential/window options through to firehose. |
jetstreamer-firehose/src/system.rs |
Implements default buffer window sizing + byte-size parsing and tests. |
jetstreamer-firehose/src/lib.rs |
Updates crate docs/examples for new firehose parameters. |
jetstreamer-firehose/src/firehose.rs |
Implements sequential mode behavior (single worker + ripget options), timeout adjustments, and adds an integration test. |
jetstreamer-firehose/src/epochs.rs |
Adds ripget-based sequential epoch stream reader and optioned stream opener. |
jetstreamer-firehose/README.md |
Documents the buffer window environment variable. |
jetstreamer-firehose/Cargo.toml |
Adds ripget and libc dependencies. |
Cargo.toml |
Adds workspace dependency for ripget. |
Cargo.lock |
Locks new transitive deps introduced by ripget. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| /// Creates a new runner that writes to `clickhouse_dsn` using `num_threads`. | ||
| pub fn new(clickhouse_dsn: impl Display, num_threads: usize) -> Self { | ||
| /// | ||
| /// When `sequential` is `true`, firehose runs with one worker and `num_threads` is used as | ||
| /// ripget parallel download concurrency. | ||
| pub fn new( | ||
| clickhouse_dsn: impl Display, | ||
| num_threads: usize, | ||
| sequential: bool, | ||
| buffer_window_bytes: Option<u64>, | ||
| ) -> Self { |
There was a problem hiding this comment.
Changing PluginRunner::new to require sequential and buffer_window_bytes is a breaking public API change for jetstreamer-plugin consumers. Consider keeping the original new(clickhouse_dsn, num_threads) as a convenience that forwards to a new new_with_options(...)/builder (defaulting to sequential=false and buffer_window_bytes=None) to preserve backwards compatibility.
jetstreamer-firehose/src/lib.rs
Outdated
| //! per second used when sizing the firehose thread pool. Increase or decrease to match your | ||
| //! host's effective bandwidth. | ||
| //! - `JETSTREAMER_BUFFER_WINDOW` (default lower of 4 GiB and 15% of available RAM): total | ||
| //! ripget hot/cold window size used when firehose is run in sequential mode. |
There was a problem hiding this comment.
JETSTREAMER_BUFFER_WINDOW is documented here as a firehose configuration env var, but the jetstreamer-firehose crate doesn’t appear to read this env var anywhere (the value is only passed via the firehose(..., buffer_window_bytes) parameter from the outer CLI). This can mislead library consumers; either implement env var parsing in the firehose crate, or clarify in the docs that the env var is consumed by the jetstreamer binary/CLI wrapper and that direct library users must pass buffer_window_bytes explicitly.
| //! ripget hot/cold window size used when firehose is run in sequential mode. | |
| //! ripget hot/cold window size used when firehose is run in sequential mode. This environment | |
| //! variable is interpreted by the [`jetstreamer`](https://crates.io/crates/jetstreamer) CLI | |
| //! wrapper; when using this crate directly as a library, pass the desired buffer window size | |
| //! explicitly via the relevant `buffer_window_bytes` parameter instead of relying on the env | |
| //! variable. |
| // `_SC_AVPHYS_PAGES` is not available on all Unix targets (including macOS), so use | ||
| // physical pages as a portable fallback for default sizing. | ||
| let pages = unsafe { libc::sysconf(libc::_SC_PHYS_PAGES) }; | ||
| if pages <= 0 { |
There was a problem hiding this comment.
On non-Linux Unix targets, detect_available_memory_bytes() uses _SC_PHYS_PAGES (total physical memory) as a fallback, but the public docs describe this default as a percentage of "available RAM". Either adjust the wording to match the behavior on these platforms, or use _SC_AVPHYS_PAGES when it’s available so the implementation matches the documented semantics more closely.
|
@copilot open a new pull request to apply changes based on the comments in this thread |
|
@copilot code review[agent] summarize your feedback in a comment I can copy paste into codex to implement your feedback |
|
@copilot summarize your feedback in a comment I can copy paste into codex to implement your feedback |
sequential: boolto the primaryfirehose(...)API to support a true sequential-processing mode.threadsas ripget concurrency in sequential mode (instead of firehose partitioning).buffer_window_bytes: Option<u64>tofirehose(...);Nonenow resolves to a computed default.min(4 GiB, 15% of available RAM)with a512 MiBfallback if RAM detection is unavailable.4GiB,512MiB, raw bytes) and validation (>= 2bytes).PluginRunnerandJetstreamerRunner(Config, builder, and firehose call path).--sequential,JETSTREAMER_SEQUENTIAL,--buffer-window <size>, andJETSTREAMER_BUFFER_WINDOW.start-1000 .. start+1000) in sequential mode with 8 threads and a 4 GiB buffer.