Skip to content

Commit cd7061c

Browse files
committed
Switch to Hyper 1
1 parent 38ca892 commit cd7061c

File tree

9 files changed

+296
-150
lines changed

9 files changed

+296
-150
lines changed

Cargo.toml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,15 @@ darling = "0.20.10"
3636
erased-serde = "0.3.28"
3737
futures-util = "0.3.28"
3838
governor = "0.6"
39-
hyper = { version = "0.14", default-features = false }
39+
http-body-util = "0.1"
40+
hyper = { version = "1", default-features = false }
41+
hyper-util = { version = "0.1", default-features = false }
4042
indexmap = "2.0.0"
4143
ipnetwork = "0.20"
4244
libc = "0.2"
4345
once_cell = "1.5"
44-
tonic = { version = "0.11.0", default-features = false }
45-
opentelemetry-proto = "0.5.0"
46+
tonic = { version = "0.12", default-features = false }
47+
opentelemetry-proto = "0.7"
4648
parking_lot = "0.12.1"
4749
proc-macro2 = { version = "1", default-features = false }
4850
prometheus = { version = "0.13.3", default-features = false }
@@ -69,6 +71,7 @@ tokio = "1.41.0"
6971
thread_local = "1.1"
7072
tikv-jemallocator = "0.5"
7173
tikv-jemalloc-ctl = "0.5"
74+
tower-service = "0.3"
7275
tracing-slog = "0.3.0"
7376
tracing-subscriber = "0.3"
7477
yaml-merge-keys = "0.5"

examples/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ publish = false
99
anyhow = { workspace = true }
1010
foundations = { workspace = true }
1111
futures-util = { workspace = true }
12+
http-body-util = { workspace = true }
1213
hyper = { workspace = true }
14+
hyper-util = { workspace = true, features = ["server", "tokio"] }
1315
tokio = { workspace = true, features = ["full"]}
1416

1517
[[example]]

examples/http_server/main.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ use foundations::settings::collections::Map;
1818
use foundations::telemetry::{self, log, tracing, TelemetryConfig, TelemetryContext};
1919
use foundations::BootstrapResult;
2020
use futures_util::stream::{FuturesUnordered, StreamExt};
21-
use hyper::server::conn::Http;
21+
use http_body_util::Full;
22+
use hyper::body::{Bytes, Incoming};
2223
use hyper::service::service_fn;
23-
use hyper::{Body, Request, Response};
24+
use hyper::{Request, Response};
25+
use hyper_util::rt::{TokioExecutor, TokioIo};
2426
use std::convert::Infallible;
2527
use std::net::{SocketAddr, TcpListener as StdTcpListener};
2628
use std::sync::Arc;
@@ -193,7 +195,10 @@ async fn serve_connection(
193195
}
194196
});
195197

196-
if let Err(e) = Http::new().serve_connection(conn, on_request).await {
198+
if let Err(e) = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new())
199+
.serve_connection(TokioIo::new(conn), on_request)
200+
.await
201+
{
197202
log::error!("failed to serve HTTP"; "error" => ?e);
198203
metrics::http_server::failed_connections_total(&endpoint_name).inc();
199204
}
@@ -204,9 +209,9 @@ async fn serve_connection(
204209
#[tracing::span_fn("respond to request")]
205210
async fn respond(
206211
endpoint_name: Arc<String>,
207-
req: Request<Body>,
212+
req: Request<Incoming>,
208213
routes: Arc<Map<String, ResponseSettings>>,
209-
) -> Result<Response<Body>, Infallible> {
214+
) -> Result<Response<Full<Bytes>>, Infallible> {
210215
log::add_fields! {
211216
"request_uri" => req.uri().to_string(),
212217
"method" => req.method().to_string()

foundations/Cargo.toml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,15 @@ client-telemetry = ["logging", "metrics", "tracing", "dep:futures-util"]
7777

7878
# Enables the telemetry server.
7979
telemetry-server = [
80+
"dep:http-body-util",
8081
"dep:hyper",
82+
"dep:hyper-util",
8183
"dep:socket2",
82-
"dep:percent-encoding"
84+
"dep:percent-encoding",
8385
]
8486

8587
# Enables telemetry reporting over gRPC
86-
telemetry-otlp-grpc = ["dep:tonic", "dep:tokio", "dep:hyper"]
88+
telemetry-otlp-grpc = ["dep:tonic", "tonic/prost", "dep:tokio", "dep:hyper"]
8789

8890
# Enables experimental tokio runtime metrics
8991
tokio-runtime-metrics = [
@@ -184,11 +186,9 @@ clap = { workspace = true, optional = true }
184186
erased-serde = { workspace = true, optional = true }
185187
futures-util = { workspace = true, optional = true }
186188
governor = { workspace = true, optional = true }
187-
hyper = { workspace = true, optional = true, features = [
188-
"http1",
189-
"runtime",
190-
"server",
191-
] }
189+
http-body-util = { workspace = true, optional = true }
190+
hyper = { workspace = true, optional = true, features = ["http1", "server"] }
191+
hyper-util = { workspace = true, optional = true, features = ["tokio"] }
192192
indexmap = { workspace = true, optional = true, features = ["serde"] }
193193
libc = { workspace = true, optional = true }
194194
once_cell = { workspace = true, optional = true }

foundations/src/telemetry/driver.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
use crate::utils::feature_use;
22
use crate::BootstrapResult;
33
use futures_util::future::BoxFuture;
4-
use futures_util::stream::{FuturesUnordered, Stream};
5-
use futures_util::FutureExt;
4+
use futures_util::stream::FuturesUnordered;
5+
use futures_util::{FutureExt, Stream};
66
use std::future::Future;
77
use std::pin::Pin;
88
use std::task::{Context, Poll};
99

1010
feature_use!(cfg(feature = "telemetry-server"), {
1111
use super::server::TelemetryServerFuture;
12-
use anyhow::anyhow;
13-
use hyper::Server;
1412
use std::net::SocketAddr;
1513
});
1614

@@ -38,7 +36,7 @@ impl TelemetryDriver {
3836
) -> Self {
3937
Self {
4038
#[cfg(feature = "telemetry-server")]
41-
server_addr: server_fut.as_ref().map(Server::local_addr),
39+
server_addr: server_fut.as_ref().map(|fut| fut.local_addr()),
4240

4341
#[cfg(feature = "telemetry-server")]
4442
server_fut,
@@ -66,9 +64,11 @@ impl TelemetryDriver {
6664
#[cfg(feature = "telemetry-server")]
6765
{
6866
if let Some(server_fut) = self.server_fut.take() {
69-
self.tele_futures.push(
70-
async move { Ok(server_fut.with_graceful_shutdown(signal).await?) }.boxed(),
71-
);
67+
self.tele_futures.push(Box::pin(async move {
68+
server_fut.with_graceful_shutdown(signal).await;
69+
70+
Ok(())
71+
}));
7272

7373
return;
7474
}
@@ -93,7 +93,7 @@ impl Future for TelemetryDriver {
9393
#[cfg(feature = "telemetry-server")]
9494
if let Some(server_fut) = &mut self.server_fut {
9595
if let Poll::Ready(res) = Pin::new(server_fut).poll(cx) {
96-
ready_res.push(res.map_err(|err| anyhow!(err)));
96+
match res {}
9797
}
9898
}
9999

foundations/src/telemetry/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,9 @@ pub use self::testing::TestTelemetryContext;
131131
pub use self::memory_profiler::MemoryProfiler;
132132

133133
#[cfg(feature = "telemetry-server")]
134-
pub use self::server::{TelemetryRouteHandler, TelemetryRouteHandlerFuture, TelemetryServerRoute};
134+
pub use self::server::{
135+
BoxError, TelemetryRouteHandler, TelemetryRouteHandlerFuture, TelemetryServerRoute,
136+
};
135137

136138
pub use self::telemetry_context::{
137139
TelemetryContext, WithTelemetryContext, WithTelemetryContextLocal,
@@ -309,7 +311,10 @@ pub fn init(config: TelemetryConfig) -> BootstrapResult<TelemetryDriver> {
309311

310312
#[cfg(feature = "telemetry-server")]
311313
{
312-
let server_fut = self::server::init(config.settings.clone(), config.custom_server_routes)?;
314+
let server_fut = server::TelemetryServerFuture::new(
315+
config.settings.clone(),
316+
config.custom_server_routes,
317+
)?;
313318

314319
Ok(TelemetryDriver::new(server_fut, tele_futures))
315320
}
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
#[cfg(feature = "metrics")]
2+
use super::metrics;
3+
use super::settings::TelemetrySettings;
4+
use crate::telemetry::log;
5+
use crate::BootstrapResult;
6+
use anyhow::Context as _;
7+
use futures_util::future::FutureExt;
8+
use futures_util::{pin_mut, ready};
9+
use hyper_util::rt::TokioIo;
10+
use socket2::{Domain, SockAddr, Socket, Type};
11+
use std::convert::Infallible;
12+
use std::future::Future;
13+
use std::net::SocketAddr;
14+
use std::pin::Pin;
15+
use std::sync::Arc;
16+
use std::task::{Context, Poll};
17+
use tokio::net::TcpListener;
18+
use tokio::sync::watch;
19+
20+
mod router;
21+
22+
use router::Router;
23+
pub use router::{
24+
BoxError, TelemetryRouteHandler, TelemetryRouteHandlerFuture, TelemetryServerRoute,
25+
};
26+
27+
pub(super) struct TelemetryServerFuture {
28+
listener: TcpListener,
29+
router: Router,
30+
}
31+
32+
impl TelemetryServerFuture {
33+
pub(super) fn new(
34+
settings: TelemetrySettings,
35+
custom_routes: Vec<TelemetryServerRoute>,
36+
) -> BootstrapResult<Option<TelemetryServerFuture>> {
37+
if !settings.server.enabled {
38+
return Ok(None);
39+
}
40+
41+
let settings = Arc::new(settings);
42+
43+
// Eagerly init the memory profiler so it gets set up before syscalls are sandboxed with seccomp.
44+
#[cfg(all(target_os = "linux", feature = "memory-profiling"))]
45+
if settings.memory_profiler.enabled {
46+
memory_profiling::profiler(Arc::clone(&settings))
47+
.map_err(|err| anyhow::anyhow!(err))?;
48+
}
49+
50+
let addr = settings.server.addr;
51+
52+
#[cfg(feature = "settings")]
53+
let addr = SocketAddr::from(addr);
54+
55+
let router = Router::new(custom_routes, settings);
56+
57+
let listener = {
58+
let std_listener = std::net::TcpListener::from(
59+
bind_socket(addr).with_context(|| format!("binding to socket {addr:?}"))?,
60+
);
61+
62+
std_listener.set_nonblocking(true)?;
63+
64+
tokio::net::TcpListener::from_std(std_listener)?
65+
};
66+
67+
Ok(Some(TelemetryServerFuture { listener, router }))
68+
}
69+
pub(super) fn local_addr(&self) -> SocketAddr {
70+
self.listener.local_addr().unwrap()
71+
}
72+
73+
// Adapted from Hyper 0.14 Server stuff and axum::serve::serve.
74+
pub(super) async fn with_graceful_shutdown(
75+
self,
76+
shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
77+
) {
78+
let (signal_tx, signal_rx) = watch::channel(());
79+
let signal_tx = Arc::new(signal_tx);
80+
81+
tokio::spawn(async move {
82+
shutdown_signal.await;
83+
84+
drop(signal_rx);
85+
});
86+
87+
let (close_tx, close_rx) = watch::channel(());
88+
let listener = self.listener;
89+
90+
pin_mut!(listener);
91+
92+
loop {
93+
let socket = tokio::select! {
94+
conn = listener.accept() => match conn {
95+
Ok((conn, _)) => TokioIo::new(conn),
96+
Err(e) => {
97+
log::warn!("failed to accept connection"; "error" => e);
98+
99+
continue;
100+
}
101+
},
102+
_ = signal_tx.closed() => { break },
103+
};
104+
105+
let router = self.router.clone();
106+
let signal_tx = Arc::clone(&signal_tx);
107+
let close_rx = close_rx.clone();
108+
109+
tokio::spawn(async move {
110+
let conn = hyper::server::conn::http1::Builder::new()
111+
.serve_connection(socket, router)
112+
.with_upgrades();
113+
114+
let signal_closed = signal_tx.closed().fuse();
115+
116+
pin_mut!(conn);
117+
pin_mut!(signal_closed);
118+
119+
loop {
120+
tokio::select! {
121+
_ = conn.as_mut() => break,
122+
_ = &mut signal_closed => conn.as_mut().graceful_shutdown(),
123+
}
124+
}
125+
126+
drop(close_rx);
127+
});
128+
}
129+
130+
drop(close_rx);
131+
132+
close_tx.closed().await;
133+
}
134+
}
135+
136+
impl Future for TelemetryServerFuture {
137+
type Output = Infallible;
138+
139+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
140+
let this = &mut *self;
141+
142+
loop {
143+
let socket = match ready!(Pin::new(&mut this.listener).poll_accept(cx)) {
144+
Ok((conn, _)) => TokioIo::new(conn),
145+
Err(e) => {
146+
log::warn!("failed to accept connection"; "error" => e);
147+
148+
continue;
149+
}
150+
};
151+
152+
let router = this.router.clone();
153+
154+
tokio::spawn(
155+
hyper::server::conn::http1::Builder::new()
156+
// upgrades needed for websockets
157+
.serve_connection(socket, router)
158+
.with_upgrades(),
159+
);
160+
}
161+
}
162+
}
163+
164+
fn bind_socket(addr: SocketAddr) -> BootstrapResult<Socket> {
165+
let socket = Socket::new(
166+
if addr.is_ipv4() {
167+
Domain::IPV4
168+
} else {
169+
Domain::IPV6
170+
},
171+
Type::STREAM,
172+
None,
173+
)?;
174+
175+
socket.set_reuse_address(true)?;
176+
#[cfg(unix)]
177+
socket.set_reuse_port(true)?;
178+
socket.bind(&SockAddr::from(addr))?;
179+
socket.listen(1024)?;
180+
181+
Ok(socket)
182+
}
183+
184+
#[cfg(all(target_os = "linux", feature = "memory-profiling"))]
185+
mod memory_profiling {
186+
use super::*;
187+
use crate::telemetry::MemoryProfiler;
188+
use crate::Result;
189+
190+
pub(super) fn profiler(settings: Arc<TelemetrySettings>) -> Result<MemoryProfiler> {
191+
MemoryProfiler::get_or_init_with(&settings.memory_profiler)?.ok_or_else(|| {
192+
"profiling should be enabled via `_RJEM_MALLOC_CONF=prof:true` env var".into()
193+
})
194+
}
195+
196+
pub(super) async fn heap_profile(settings: Arc<TelemetrySettings>) -> Result<String> {
197+
profiler(settings)?.heap_profile().await
198+
}
199+
200+
pub(super) async fn heap_stats(settings: Arc<TelemetrySettings>) -> Result<String> {
201+
profiler(settings)?.heap_stats()
202+
}
203+
}

0 commit comments

Comments
 (0)