Skip to content
This repository was archived by the owner on Sep 10, 2024. It is now read-only.

Commit 5f1ccd3

Browse files
committed
listener: fix the connection not being polled on graceful shutdown
1 parent 99a768f commit 5f1ccd3

File tree

4 files changed

+72
-12
lines changed

4 files changed

+72
-12
lines changed

Cargo.lock

Lines changed: 13 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/listener/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ repository.workspace = true
99

1010
[dependencies]
1111
bytes = "1.5.0"
12+
event-listener = "4.0.0"
1213
futures-util = "0.3.29"
1314
http-body = "0.4.5"
1415
hyper = { version = "0.14.27", features = ["server", "http1", "http2", "tcp"] }
16+
libc = "0.2.150"
1517
pin-project-lite = "0.2.13"
1618
socket2 = "0.5.5"
1719
thiserror.workspace = true
@@ -20,7 +22,6 @@ tokio-rustls = "0.24.1"
2022
tower-http = { version = "0.4.4", features = ["add-extension"] }
2123
tower-service = "0.3.2"
2224
tracing.workspace = true
23-
libc = "0.2.150"
2425

2526
[dev-dependencies]
2627
anyhow.workspace = true

crates/listener/src/server.rs

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::{
2020
time::Duration,
2121
};
2222

23+
use event_listener::{Event, EventListener};
2324
use futures_util::{stream::SelectAll, Stream, StreamExt};
2425
use http_body::Body;
2526
use hyper::{body::HttpBody, server::conn::Connection, Request, Response};
@@ -29,6 +30,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
2930
use tokio_rustls::rustls::ServerConfig;
3031
use tower_http::add_extension::AddExtension;
3132
use tower_service::Service;
33+
use tracing::Instrument;
3234

3335
use crate::{
3436
maybe_tls::{MaybeTlsAcceptor, MaybeTlsStream, TlsStreamInfo},
@@ -153,6 +155,17 @@ impl AcceptError {
153155
/// Returns an error if the proxy protocol or TLS handshake failed.
154156
/// Returns the connection, which should be used to spawn a task to serve the
155157
/// connection.
158+
#[allow(clippy::type_complexity)]
159+
#[tracing::instrument(
160+
name = "accept",
161+
skip_all,
162+
fields(
163+
network.protocol.name = "http",
164+
network.peer.address,
165+
network.peer.port,
166+
),
167+
err,
168+
)]
156169
async fn accept<S, B>(
157170
maybe_proxy_acceptor: &MaybeProxyAcceptor,
158171
maybe_tls_acceptor: &MaybeTlsAcceptor,
@@ -171,6 +184,18 @@ where
171184
B::Data: Send + 'static,
172185
B::Error: std::error::Error + Send + Sync + 'static,
173186
{
187+
let span = tracing::Span::current();
188+
189+
match peer_addr {
190+
SocketAddr::Net(addr) => {
191+
span.record("network.peer.address", tracing::field::display(addr.ip()));
192+
span.record("network.peer.port", addr.port());
193+
}
194+
SocketAddr::Unix(ref addr) => {
195+
span.record("network.peer.address", tracing::field::debug(addr));
196+
}
197+
}
198+
174199
// Wrap the connection acceptance logic in a timeout
175200
tokio::time::timeout(HANDSHAKE_TIMEOUT, async move {
176201
let (proxy, stream) = maybe_proxy_acceptor
@@ -209,6 +234,7 @@ where
209234

210235
Ok(conn)
211236
})
237+
.instrument(span)
212238
.await
213239
.map_err(AcceptError::handshake_timeout)?
214240
}
@@ -220,19 +246,28 @@ pin_project! {
220246
/// signal is received, the boolean is set to true. The connection will then check the
221247
/// boolean before polling the underlying connection, and if it's true, it will start a
222248
/// graceful shutdown.
249+
///
250+
/// We also use an event listener to wake up the connection when the shutdown signal is
251+
/// received, because the connection needs to be polled again to start the graceful shutdown.
223252
struct AbortableConnection<C> {
224253
#[pin]
225254
connection: C,
255+
#[pin]
256+
shutdown_listener: EventListener,
257+
shutdown_event: Arc<Event>,
226258
shutdown_in_progress: Arc<AtomicBool>,
227259
did_start_shutdown: bool,
228260
}
229261
}
230262

231263
impl<C> AbortableConnection<C> {
232-
fn new(connection: C, shutdown_in_progress: &Arc<AtomicBool>) -> Self {
264+
fn new(connection: C, shutdown_in_progress: &Arc<AtomicBool>, event: &Arc<Event>) -> Self {
265+
let shutdown_listener = EventListener::new();
233266
Self {
234267
connection,
268+
shutdown_listener,
235269
shutdown_in_progress: Arc::clone(shutdown_in_progress),
270+
shutdown_event: Arc::clone(event),
236271
did_start_shutdown: false,
237272
}
238273
}
@@ -254,10 +289,20 @@ where
254289
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
255290
let mut this = self.project();
256291

257-
// XXX: This assumes the task will be polled again after the shutdown signal is
258-
// received. I *think* that internally `graceful_shutdown` is only
259-
// setting a bunch of flags anyway, so I expect it should be fine to not have a
260-
// waker here.
292+
// If we aren't listening for the shutdown signal, start listening
293+
if !this.shutdown_listener.is_listening() {
294+
// XXX: it feels like we should set up the listener when we create it, but it
295+
// needs a `Pin<&mut EventListener>` to do so, and I can't figure out
296+
// how to get one outside of the `poll` method.
297+
this.shutdown_listener.as_mut().listen(this.shutdown_event);
298+
}
299+
300+
// Poll the shutdown signal, so that wakers get registered.
301+
// XXX: I don't think we care about the result of this poll, since it's only
302+
// really to register wakers. But I'm not sure if it's safe to
303+
// ignore the result.
304+
let _ = this.shutdown_listener.poll(cx);
305+
261306
if !*this.did_start_shutdown
262307
&& this
263308
.shutdown_in_progress
@@ -312,6 +357,7 @@ where
312357

313358
// A shared atomic boolean to tell all connections to shutdown
314359
let shutdown_in_progress = Arc::new(AtomicBool::new(false));
360+
let shutdown_event = Arc::new(Event::new());
315361

316362
loop {
317363
tokio::select! {
@@ -330,10 +376,10 @@ where
330376
match res {
331377
Some(Ok(Ok(connection))) => {
332378
tracing::trace!("Accepted connection");
333-
let conn = AbortableConnection::new(connection, &shutdown_in_progress);
379+
let conn = AbortableConnection::new(connection, &shutdown_in_progress, &shutdown_event);
334380
connection_tasks.spawn(conn);
335381
},
336-
Some(Ok(Err(e))) => tracing::error!("Connection did not finish handshake: {e}"),
382+
Some(Ok(Err(_e))) => { /* Connection did not finish handshake, error should be logged in `accept` */ },
337383
Some(Err(e)) => tracing::error!("Join error: {e}"),
338384
None => tracing::error!("Join set was polled even though it was empty"),
339385
}
@@ -368,6 +414,7 @@ where
368414

369415
// Tell the active connections to shutdown
370416
shutdown_in_progress.store(true, std::sync::atomic::Ordering::Relaxed);
417+
shutdown_event.notify(usize::MAX);
371418

372419
// Wait for connections to cleanup
373420
if !accept_tasks.is_empty() || !connection_tasks.is_empty() {
@@ -386,10 +433,10 @@ where
386433
match res {
387434
Some(Ok(Ok(connection))) => {
388435
tracing::trace!("Accepted connection");
389-
let conn = AbortableConnection::new(connection, &shutdown_in_progress);
436+
let conn = AbortableConnection::new(connection, &shutdown_in_progress, &shutdown_event);
390437
connection_tasks.spawn(conn);
391438
}
392-
Some(Ok(Err(e))) => tracing::error!("Connection did not finish handshake: {e}"),
439+
Some(Ok(Err(_e))) => { /* Connection did not finish handshake, error should be logged in `accept` */ },
393440
Some(Err(e)) => tracing::error!("Join error: {e}"),
394441
None => tracing::error!("Join set was polled even though it was empty"),
395442
}

crates/tasks/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ apalis-cron = "0.4.7"
1414
async-stream = "0.3.5"
1515
async-trait = "0.1.74"
1616
chrono.workspace = true
17-
event-listener = "3.1.0"
17+
event-listener = "4.0.0"
1818
futures-lite = "2.0.1"
1919
rand.workspace = true
2020
rand_chacha = "0.3.1"

0 commit comments

Comments
 (0)