Forcefully closing an SSE connection #1914
Hi, first of all, thank you for making such a great crate; it's mostly a pleasure to use. I have trouble understanding exactly how SSE connections work with axum (or at all, maybe). Can I forcefully close the connection on the server side? I followed your example from #1060 and tried to limit the number of elements in the stream, but the client side keeps reporting that the connection state is Open. My use case is closing the SSE connection once the user's access expires. I can just end the stream at that moment, but I am a bit concerned about what will happen in memory in that case.
Replies: 1 comment 1 reply
    use axum::{
        extract::State,
        response::{sse::Event, Sse},
        routing::get,
        Router,
    };
    use futures::Stream;
    use tokio::sync::oneshot;
    use std::{
        convert::Infallible,
        net::SocketAddr,
        sync::{Arc, Mutex},
        time::Duration,
    };

    #[derive(Clone, Default)]
    struct AppState {
        stream_handles: Arc<Mutex<Vec<oneshot::Sender<()>>>>,
    }

    #[tokio::main]
    async fn main() {
        let state = AppState::default();

        let app = Router::new()
            .route("/stream", get(stream))
            .route("/end-all-streams", get(end_all_streams))
            .with_state(state);

        let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
        println!("listening on {}", addr);
        axum::Server::bind(&addr)
            .serve(app.into_make_service())
            .await
            .unwrap();
    }

    /// Create a new stream
    async fn stream(
        State(state): State<AppState>,
    ) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
        let (tx, mut rx) = oneshot::channel::<()>();
        state.stream_handles.lock().unwrap().push(tx);

        let stream = async_stream::stream! {
            let mut interval = tokio::time::interval(Duration::from_secs(1));
            loop {
                tokio::select! {
                    // either send an event every second
                    _ = interval.tick() => {
                        yield Ok(Event::default().data("hi"));
                    }
                    // or if the stream should close, then return
                    _ = &mut rx => {
                        return;
                    }
                }
            }
        };

        Sse::new(stream)
    }

    /// Close all streams
    async fn end_all_streams(State(state): State<AppState>) {
        let stream_handles = std::mem::take(&mut *state.stream_handles.lock().unwrap());
        for tx in stream_handles {
            _ = tx.send(());
        }
    }
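On the memory question: in the handler code above, each `oneshot::Sender` stays in `stream_handles` until `/end-all-streams` drains the `Vec`, even if the client behind it has already disconnected. The following is only a minimal sketch of the same drain-and-signal idea, written against `std::sync::mpsc` instead of tokio so it runs without extra crates. `CloseRegistry`, `register`, `close_all`, and `pending` are hypothetical names for illustration and are not part of axum or of the code above.

```rust
use std::mem;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical registry of close handles, mirroring `AppState::stream_handles`.
#[derive(Clone, Default)]
pub struct CloseRegistry {
    handles: Arc<Mutex<Vec<Sender<()>>>>,
}

impl CloseRegistry {
    /// Register a new stream and return the receiver it should watch.
    pub fn register(&self) -> Receiver<()> {
        let (tx, rx) = channel();
        self.handles.lock().unwrap().push(tx);
        rx
    }

    /// Signal every registered stream and empty the registry.
    /// Returns how many receivers were still alive to get the signal.
    pub fn close_all(&self) -> usize {
        // Take the handles out first so the lock is not held while sending.
        let handles = mem::take(&mut *self.handles.lock().unwrap());
        handles
            .into_iter()
            // `send` fails if the receiver was dropped (client already gone).
            .filter(|tx| tx.send(()).is_ok())
            .count()
    }

    /// Number of handles currently stored, including stale ones.
    pub fn pending(&self) -> usize {
        self.handles.lock().unwrap().len()
    }
}
```

In this sketch a dropped receiver still counts toward `pending()` until `close_all()` runs, which is the same growth pattern the `Vec` in the handler code would show for clients that disconnect on their own. If that matters, one option is to prune stale senders periodically or key the handles by user so a single one can be removed.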
Using `async_stream`, you simply `return` from inside the stream to end it, as in the `stream` handler above.
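For the use case in the question (ending the stream once the user's access expires), the close signal does not have to come from another handler; the stream can also stop on its own when a deadline passes. Below is a rough synchronous analogue using a plain `Iterator` and `std::time::Instant`, so it needs no async runtime. `UntilExpiry` and its methods are hypothetical names for illustration; in the async handler the equivalent would presumably be one more `tokio::select!` branch that waits on a timer and then returns.

```rust
use std::time::{Duration, Instant};

/// Hypothetical adapter: yields items from `inner` until `expires_at`,
/// then ends, much like `return`ing from the stream body.
pub struct UntilExpiry<I> {
    inner: I,
    expires_at: Instant,
}

impl<I> UntilExpiry<I> {
    /// Wrap `inner` so it stops producing items at `expires_at`.
    pub fn new(inner: I, expires_at: Instant) -> Self {
        Self { inner, expires_at }
    }

    /// Convenience constructor: expire `ttl` from now.
    pub fn with_ttl(inner: I, ttl: Duration) -> Self {
        Self::new(inner, Instant::now() + ttl)
    }
}

impl<I: Iterator> Iterator for UntilExpiry<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        // Once access has expired, end the sequence regardless of `inner`.
        if Instant::now() >= self.expires_at {
            return None;
        }
        self.inner.next()
    }
}
```

Once `next` returns `None` the consumer is done with the adapter and can drop it together with `inner`.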