diff --git a/README.md b/README.md index b0fe860..a8a13b4 100644 --- a/README.md +++ b/README.md @@ -32,9 +32,11 @@ The `l4book` subscription first sends a snapshot of the entire book and then for ```bash cargo run --release --bin websocket_server -- --address 0.0.0.0 --port 8000 +# With custom inactivity timeout (e.g., 30s): +cargo run --release --bin websocket_server -- --address 0.0.0.0 --port 8000 --inactivity-exit-secs 30 ``` -If this local server does not detect the node writing down any new events, it will automatically exit after some amount of time (currently set to 5 seconds). +If this local server does not detect the node writing down any new events, it will automatically exit after some amount of time (default 5 seconds; configurable via `--inactivity-exit-secs `). In addition, the local server periodically fetches order book snapshots from the node, and compares to its own internal state. If a difference is detected, it will exit. If you want logging, prepend the command with `RUST_LOG=info`. diff --git a/binaries/src/bin/websocket_server.rs b/binaries/src/bin/websocket_server.rs index 09a605f..b08eafb 100644 --- a/binaries/src/bin/websocket_server.rs +++ b/binaries/src/bin/websocket_server.rs @@ -25,6 +25,12 @@ struct Args { /// documentation for for more info. #[arg(long)] websocket_compression_level: Option, + + /// Inactivity timeout in seconds before server exits. + /// If no node events are observed for this duration, the process exits. + /// Default is 5 seconds to match README behavior. + #[arg(long)] + inactivity_exit_secs: Option, } #[tokio::main] @@ -37,7 +43,8 @@ async fn main() -> Result<()> { println!("Running websocket server on {full_address}"); let compression_level = args.websocket_compression_level.unwrap_or(/* Some compression */ 1); - run_websocket_server(&full_address, true, compression_level).await?; + let inactivity_exit_secs = args.inactivity_exit_secs.unwrap_or(5); + run_websocket_server(&full_address, true, compression_level, inactivity_exit_secs).await?; Ok(()) } diff --git a/server/src/listeners/order_book/mod.rs b/server/src/listeners/order_book/mod.rs index fcd9425..a863704 100644 --- a/server/src/listeners/order_book/mod.rs +++ b/server/src/listeners/order_book/mod.rs @@ -39,7 +39,7 @@ mod utils; // WARNING - this code assumes no other file system operations are occurring in the watched directories // if there are scripts running, this may not work as intended -pub(crate) async fn hl_listen(listener: Arc>, dir: PathBuf) -> Result<()> { +pub(crate) async fn hl_listen(listener: Arc>, dir: PathBuf, inactivity_exit_secs: u64) -> Result<()> { let order_statuses_dir = EventSource::OrderStatuses.event_source_dir(&dir).canonicalize()?; let fills_dir = EventSource::Fills.event_source_dir(&dir).canonicalize()?; let order_diffs_dir = EventSource::OrderDiffs.event_source_dir(&dir).canonicalize()?; @@ -122,7 +122,7 @@ pub(crate) async fn hl_listen(listener: Arc>, dir: Path let snapshot_fetch_task_tx = snapshot_fetch_task_tx.clone(); fetch_snapshot(dir.clone(), listener, snapshot_fetch_task_tx, ignore_spot); } - () = sleep(Duration::from_secs(5)) => { + () = sleep(Duration::from_secs(inactivity_exit_secs)) => { let listener = listener.lock().await; if listener.is_ready() { return Err(format!("Stream has fallen behind ({HL_NODE} failed?)").into()); diff --git a/server/src/servers/websocket_server.rs b/server/src/servers/websocket_server.rs index 05d04fd..acf7d0d 100644 --- a/server/src/servers/websocket_server.rs +++ b/server/src/servers/websocket_server.rs @@ -29,7 +29,7 @@ use tokio::{ }; use yawc::{FrameView, OpCode, WebSocket}; -pub async fn run_websocket_server(address: &str, ignore_spot: bool, compression_level: u32) -> Result<()> { +pub async fn run_websocket_server(address: &str, ignore_spot: bool, compression_level: u32, inactivity_exit_secs: u64) -> Result<()> { let (internal_message_tx, _) = channel::>(100); // Central task: listen to messages and forward them for distribution @@ -42,7 +42,7 @@ pub async fn run_websocket_server(address: &str, ignore_spot: bool, compression_ { let listener = listener.clone(); tokio::spawn(async move { - if let Err(err) = hl_listen(listener, home_dir).await { + if let Err(err) = hl_listen(listener, home_dir, inactivity_exit_secs).await { error!("Listener fatal error: {err}"); std::process::exit(1); }