Skip to content

Commit 722919f

Browse files
committed
feat: abort SSE connections on hot reload
SSE streams now return an error on reload instead of ending cleanly, causing hyper to abort the connection. This triggers client retry behavior (e.g., Datastar reconnects automatically).
1 parent 5f99f83 commit 722919f

File tree

7 files changed

+148
-5
lines changed

7 files changed

+148
-5
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ hyper-util = { version = "0.1", features = ["full", "server-auto"] }
3232
tokio = { version = "1", features = ["full"] }
3333
tokio-stream = "0.1"
3434
tokio-util = { version = "0.7", features = ["io", "compat"] }
35+
futures-util = "0.3"
3536

3637
bytes = "1.6.0"
3738
url = "2.5.0"

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,9 @@ $ http-nu :3001 -w ./serve.nu
144144
```
145145

146146
This watches the script's directory for any changes (including included files)
147-
and hot-reloads the handler. Useful during development.
147+
and hot-reloads the handler. Useful during development. Active
148+
[SSE connections](#server-sent-events) are aborted on reload to trigger client
149+
reconnection.
148150

149151
### Reading from stdin
150152

src/engine.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::path::Path;
22
use std::sync::{atomic::AtomicBool, Arc};
33

4+
use tokio_util::sync::CancellationToken;
5+
46
use nu_cli::{add_cli_context, gather_parent_env_vars};
57
use nu_cmd_lang::create_default_context;
68
use nu_command::add_shell_command_context;
@@ -29,6 +31,8 @@ use crate::Error;
2931
pub struct Engine {
3032
pub state: EngineState,
3133
pub closure: Option<Closure>,
34+
/// Cancellation token triggered on engine reload
35+
pub reload_token: CancellationToken,
3236
}
3337

3438
impl Engine {
@@ -47,6 +51,7 @@ impl Engine {
4751
Ok(Self {
4852
state: engine_state,
4953
closure: None,
54+
reload_token: CancellationToken::new(),
5055
})
5156
}
5257

@@ -345,6 +350,8 @@ impl Engine {
345350
/// On error, prints to stderr and emits JSON to stdout, returning None.
346351
pub fn script_to_engine(base: &Engine, script: &str) -> Option<Engine> {
347352
let mut engine = base.clone();
353+
// Fresh cancellation token for this engine instance
354+
engine.reload_token = CancellationToken::new();
348355

349356
if let Err(e) = engine.parse_closure(script) {
350357
log_error(&nu_utils::strip_ansi_string_likely(e.to_string()));

src/handler.rs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ use std::sync::Arc;
33
use std::time::Instant;
44

55
use arc_swap::ArcSwap;
6+
use futures_util::StreamExt;
67
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full, StreamBody};
78
use hyper::body::{Bytes, Frame};
89
use tokio_stream::wrappers::ReceiverStream;
9-
use tokio_stream::StreamExt;
10+
use tokio_util::sync::CancellationToken;
1011
use tower::Service;
1112
use tower_http::services::{ServeDir, ServeFile};
1213

@@ -135,6 +136,7 @@ where
135136
// Phase 1: Log request
136137
log_request(request_id, &request);
137138

139+
let reload_token = engine.reload_token.clone();
138140
let (meta_rx, bridged_body) = spawn_eval_thread(engine, request, stream);
139141

140142
// Wait for both:
@@ -158,7 +160,15 @@ where
158160

159161
match &meta.body_type {
160162
ResponseBodyType::Normal => {
161-
build_normal_response(&meta, Ok(body_result?), use_brotli, guard, start_time).await
163+
build_normal_response(
164+
&meta,
165+
Ok(body_result?),
166+
use_brotli,
167+
guard,
168+
start_time,
169+
reload_token,
170+
)
171+
.await
162172
}
163173
ResponseBodyType::Static {
164174
root,
@@ -327,6 +337,7 @@ async fn build_normal_response(
327337
use_brotli: bool,
328338
guard: RequestGuard,
329339
start_time: Instant,
340+
reload_token: CancellationToken,
330341
) -> HTTPResult {
331342
let request_id = guard.request_id();
332343
let (inferred_content_type, body) = body_result?;
@@ -362,7 +373,8 @@ async fn build_normal_response(
362373
}
363374

364375
// Add SSE-required headers for event streams
365-
if content_type == "text/event-stream" {
376+
let is_sse = content_type == "text/event-stream";
377+
if is_sse {
366378
header_map.insert(
367379
hyper::header::CACHE_CONTROL,
368380
hyper::header::HeaderValue::from_static("no-cache"),
@@ -415,9 +427,29 @@ async fn build_normal_response(
415427
ResponseTransport::Stream(rx) => {
416428
if use_brotli {
417429
compression::compress_stream(rx)
430+
} else if is_sse {
431+
// SSE streams abort on reload (error triggers client retry)
432+
let stream = futures_util::stream::try_unfold(
433+
(ReceiverStream::new(rx), reload_token),
434+
|(mut data_rx, token)| async move {
435+
tokio::select! {
436+
biased;
437+
_ = token.cancelled() => {
438+
Err(std::io::Error::other("reload").into())
439+
}
440+
item = StreamExt::next(&mut data_rx) => {
441+
match item {
442+
Some(data) => Ok(Some((Frame::data(Bytes::from(data)), (data_rx, token)))),
443+
None => Ok(None),
444+
}
445+
}
446+
}
447+
},
448+
);
449+
BodyExt::boxed(StreamBody::new(stream))
418450
} else {
419451
let stream = ReceiverStream::new(rx).map(|data| Ok(Frame::data(Bytes::from(data))));
420-
StreamBody::new(stream).boxed()
452+
BodyExt::boxed(StreamBody::new(stream))
421453
}
422454
}
423455
};

src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,8 @@ async fn serve(
277277
tokio::spawn(async move {
278278
while let Some(script) = rx.recv().await {
279279
if let Some(new_engine) = script_to_engine(&base_for_updates, &script) {
280+
// Signal reload to cancel SSE streams on old engine
281+
engine_updater.load().reload_token.cancel();
280282
engine_updater.store(Arc::new(new_engine));
281283
log_reloaded();
282284
}

tests/server_test.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2008,3 +2008,101 @@ async fn test_binary_octet_stream_content_type() {
20082008
"Expected application/octet-stream content-type for binary, got: {response}"
20092009
);
20102010
}
2011+
2012+
#[tokio::test]
2013+
async fn test_sse_cancelled_on_hot_reload() {
2014+
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
2015+
2016+
// Spawn server with an SSE endpoint that streams indefinitely
2017+
let (mut child, mut stdin, addr_rx) = TestServerWithStdin::spawn("127.0.0.1:0", false);
2018+
2019+
// Send initial SSE script - stream many events slowly
2020+
let sse_script = r#"{|req|
2021+
.response {headers: {"Content-Type": "text/event-stream"}}
2022+
1..100 | each {|i|
2023+
sleep 100ms
2024+
$"data: event-($i)\n\n"
2025+
}
2026+
}"#;
2027+
stdin.write_all(sse_script.as_bytes()).await.unwrap();
2028+
stdin.write_all(b"\0").await.unwrap();
2029+
stdin.flush().await.unwrap();
2030+
2031+
// Wait for server to start
2032+
let address = tokio::time::timeout(std::time::Duration::from_secs(5), addr_rx)
2033+
.await
2034+
.expect("Server didn't start")
2035+
.expect("Channel closed");
2036+
2037+
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2038+
2039+
// Start curl to SSE endpoint
2040+
let mut sse_child = tokio::process::Command::new("curl")
2041+
.arg("-sN")
2042+
.arg(&address)
2043+
.stdout(std::process::Stdio::piped())
2044+
.spawn()
2045+
.expect("Failed to start curl");
2046+
2047+
let stdout = sse_child.stdout.take().expect("Failed to get stdout");
2048+
let mut reader = BufReader::new(stdout).lines();
2049+
2050+
// Read a few events to confirm SSE is working
2051+
let mut events_received = 0;
2052+
for _ in 0..3 {
2053+
if let Ok(Ok(Some(line))) =
2054+
tokio::time::timeout(std::time::Duration::from_secs(2), reader.next_line()).await
2055+
{
2056+
if line.starts_with("data:") {
2057+
events_received += 1;
2058+
}
2059+
}
2060+
}
2061+
assert!(
2062+
events_received >= 1,
2063+
"Should have received at least one SSE event before reload"
2064+
);
2065+
2066+
// Trigger hot reload with a different script
2067+
let new_script = r#"{|req| "reloaded"}"#;
2068+
stdin.write_all(new_script.as_bytes()).await.unwrap();
2069+
stdin.write_all(b"\0").await.unwrap();
2070+
stdin.flush().await.unwrap();
2071+
2072+
// Wait for reload to process
2073+
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2074+
2075+
// After reload, the SSE stream should be cancelled.
2076+
// With HTTP keep-alive, curl won't exit on its own, but no more events should arrive.
2077+
// Try to read another event - it should timeout (stream cancelled) or return None.
2078+
let more_events =
2079+
tokio::time::timeout(std::time::Duration::from_millis(500), reader.next_line()).await;
2080+
2081+
// Either timeout (stream stalled) or None (stream ended) is acceptable
2082+
let stream_stopped = match more_events {
2083+
Err(_) => true, // Timeout - no more data
2084+
Ok(Ok(None)) => true, // Stream ended
2085+
Ok(Ok(Some(line))) => !line.starts_with("data:"), // Got something but not an event
2086+
Ok(Err(_)) => true, // Read error
2087+
};
2088+
assert!(stream_stopped, "SSE stream should stop after reload");
2089+
2090+
// Kill the curl process since it won't exit with keep-alive
2091+
sse_child.kill().await.ok();
2092+
2093+
// Verify the new handler works
2094+
let output = std::process::Command::new("curl")
2095+
.arg("-s")
2096+
.arg(&address)
2097+
.output()
2098+
.expect("curl failed");
2099+
2100+
assert_eq!(
2101+
String::from_utf8_lossy(&output.stdout).trim(),
2102+
"reloaded",
2103+
"New handler should be active after reload"
2104+
);
2105+
2106+
// Cleanup - kill the server
2107+
child.kill().await.ok();
2108+
}

0 commit comments

Comments
 (0)