Skip to content

Commit c1162d3

Browse files
committed
Extract handle_connection out of do_serve
1 parent c251845 commit c1162d3

File tree

1 file changed

+66
-49
lines changed

1 file changed

+66
-49
lines changed

axum/src/serve/mod.rs

Lines changed: 66 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -304,55 +304,7 @@ where
304304
}
305305
};
306306

307-
let io = TokioIo::new(io);
308-
309-
trace!("connection {remote_addr:?} accepted");
310-
311-
make_service
312-
.ready()
313-
.await
314-
.unwrap_or_else(|err| match err {});
315-
316-
let tower_service = make_service
317-
.call(IncomingStream {
318-
io: &io,
319-
remote_addr,
320-
})
321-
.await
322-
.unwrap_or_else(|err| match err {})
323-
.map_request(|req: Request<Incoming>| req.map(Body::new));
324-
325-
let hyper_service = TowerToHyperService::new(tower_service);
326-
let signal_tx = signal_tx.clone();
327-
let close_rx = close_rx.clone();
328-
329-
tokio::spawn(async move {
330-
#[allow(unused_mut)]
331-
let mut builder = Builder::new(TokioExecutor::new());
332-
// CONNECT protocol needed for HTTP/2 websockets
333-
#[cfg(feature = "http2")]
334-
builder.http2().enable_connect_protocol();
335-
336-
let mut conn = pin!(builder.serve_connection_with_upgrades(io, hyper_service));
337-
let mut signal_closed = pin!(signal_tx.closed().fuse());
338-
339-
loop {
340-
tokio::select! {
341-
result = conn.as_mut() => {
342-
if let Err(_err) = result {
343-
trace!("failed to serve connection: {_err:#}");
344-
}
345-
break;
346-
}
347-
_ = &mut signal_closed => {
348-
trace!("signal received in task, starting graceful shutdown");
349-
conn.as_mut().graceful_shutdown();
350-
}
351-
}
352-
}
353-
354-
drop(close_rx);
355-
});
307+
handle_connection(&mut make_service, &signal_tx, &close_rx, io, remote_addr).await;
356308
}
357309

358310
drop(close_rx);
@@ -365,6 +317,71 @@ where
365317
close_tx.closed().await;
366318
}
367319

320+
async fn handle_connection<L, M, S>(
321+
make_service: &mut M,
322+
signal_tx: &watch::Sender<()>,
323+
close_rx: &watch::Receiver<()>,
324+
io: <L as Listener>::Io,
325+
remote_addr: <L as Listener>::Addr,
326+
) where
327+
L: Listener,
328+
L::Addr: Debug,
329+
M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S> + Send + 'static,
330+
for<'a> <M as Service<IncomingStream<'a, L>>>::Future: Send,
331+
S: Service<Request, Response = Response, Error = Infallible> + Clone + Send + 'static,
332+
S::Future: Send,
333+
{
334+
let io = TokioIo::new(io);
335+
336+
trace!("connection {remote_addr:?} accepted");
337+
338+
make_service
339+
.ready()
340+
.await
341+
.unwrap_or_else(|err| match err {});
342+
343+
let tower_service = make_service
344+
.call(IncomingStream {
345+
io: &io,
346+
remote_addr,
347+
})
348+
.await
349+
.unwrap_or_else(|err| match err {})
350+
.map_request(|req: Request<Incoming>| req.map(Body::new));
351+
352+
let hyper_service = TowerToHyperService::new(tower_service);
353+
let signal_tx = signal_tx.clone();
354+
let close_rx = close_rx.clone();
355+
356+
tokio::spawn(async move {
357+
#[allow(unused_mut)]
358+
let mut builder = Builder::new(TokioExecutor::new());
359+
// CONNECT protocol needed for HTTP/2 websockets
360+
#[cfg(feature = "http2")]
361+
builder.http2().enable_connect_protocol();
362+
363+
let mut conn = pin!(builder.serve_connection_with_upgrades(io, hyper_service));
364+
let mut signal_closed = pin!(signal_tx.closed().fuse());
365+
366+
loop {
367+
tokio::select! {
368+
result = conn.as_mut() => {
369+
if let Err(_err) = result {
370+
trace!("failed to serve connection: {_err:#}");
371+
}
372+
break;
373+
}
374+
_ = &mut signal_closed => {
375+
trace!("signal received in task, starting graceful shutdown");
376+
conn.as_mut().graceful_shutdown();
377+
}
378+
}
379+
}
380+
381+
drop(close_rx);
382+
});
383+
}
384+
368385
/// An incoming stream.
369386
///
370387
/// Used with [`serve`] and [`IntoMakeServiceWithConnectInfo`].

0 commit comments

Comments
 (0)