Skip to content

Commit f68339e

Browse files
committed
chore(deps): address hyper deprecations in policy controller
NB: this branch is based upon #13492. see #8733 for more information about migrating to hyper 1.0. this enables the `backports` and `deprecated` feature flags in the hyper dependencies in this project, and addresses warnings. see <https://hyper.rs/guides/1/upgrading/> for more information about these feature flags. largely, the control plane is unaffected by this upgrade, besides the following changes: * one usage of a deprecated `hyper::body::aggregate` function is updated. * a `hyper::rt::Executor<E>` implementation, which spawns tasks onto the tokio runtime, is provided. once we upgrade to hyper 1.0, we can replace this with the executor provided in [`hyper-util`](https://docs.rs/hyper-util/latest/hyper_util/rt/tokio/struct.TokioExecutor.html#impl-Executor%3CFut%3E-for-TokioExecutor). * the `hyper::service::Service<hyper::Request<tonic::body::BoxBody>>` implementation for `GrpcHttp` now boxes its returned future, on account of `SendRequest` returning an anonymous `impl Future<Output = ...>`. * the `policy-test` additionally depends on the `runtime` feature of hyper. this is an artifact of an internal config structure shared by the legacy connection builder and the backported connection builder containing two keep-alive fields that were feature gated prior to 1.0. Signed-off-by: katelyn martin <[email protected]>
1 parent 9385c66 commit f68339e

File tree

8 files changed

+38
-10
lines changed

8 files changed

+38
-10
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1415,6 +1415,7 @@ dependencies = [
14151415
"clap",
14161416
"drain",
14171417
"futures",
1418+
"http-body",
14181419
"hyper",
14191420
"ipnet",
14201421
"k8s-openapi",

policy-controller/grpc/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ async-trait = "0.1"
1111
http = "0.2"
1212
drain = "0.1"
1313
futures = { version = "0.3", default-features = false }
14-
hyper = { version = "0.14", features = ["http2", "server", "tcp"] }
14+
hyper = { version = "0.14", features = ["backports", "deprecated", "http2", "server", "tcp"] }
1515
linkerd-policy-controller-core = { path = "../core" }
1616
maplit = "1"
1717
prost-types = "0.12.6"

policy-controller/runtime/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ async-trait = "0.1"
1919
drain = "0.1"
2020
futures = { version = "0.3", default-features = false }
2121
k8s-openapi = { workspace = true }
22-
hyper = { version = "0.14", features = ["http1", "http2", "runtime", "server"] }
22+
http-body = "0.4"
23+
hyper = { version = "0.14", features = ["backports", "deprecated", "http1", "http2", "runtime", "server"] }
2324
ipnet = { version = "2", default-features = false }
2425
openssl = { version = "0.10.68", optional = true }
2526
parking_lot = "0.12"

policy-controller/runtime/src/admission.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ impl hyper::service::Service<Request<Body>> for Admission {
7373

7474
let admission = self.clone();
7575
Box::pin(async move {
76-
let bytes = hyper::body::aggregate(req.into_body()).await?;
76+
use http_body::Body as _;
77+
let bytes = req.into_body().collect().await?.aggregate();
7778
let review: Review = match serde_json::from_reader(bytes.reader()) {
7879
Ok(review) => review,
7980
Err(error) => {

policy-test/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ publish = false
77

88
[dependencies]
99
anyhow = "1"
10-
hyper = { version = "0.14", features = ["client", "http2"] }
10+
hyper = { version = "0.14", features = ["backports", "deprecated", "client", "http2", "runtime"] }
1111
futures = { version = "0.3", default-features = false }
1212
ipnet = "2"
1313
k8s-gateway-api = "0.16"

policy-test/src/grpc.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,17 @@
44
//! forwarding to connect to a running instance.
55
66
use anyhow::Result;
7-
pub use linkerd2_proxy_api::*;
87
use linkerd2_proxy_api::{
98
inbound::inbound_server_policies_client::InboundServerPoliciesClient,
109
outbound::outbound_policies_client::OutboundPoliciesClient,
1110
};
1211
use linkerd_policy_controller_grpc::workload;
1312
use linkerd_policy_controller_k8s_api::{self as k8s, ResourceExt};
13+
use std::{future::Future, pin::Pin};
1414
use tokio::io;
1515

16+
pub use linkerd2_proxy_api::*;
17+
1618
#[macro_export]
1719
macro_rules! assert_is_default_all_unauthenticated {
1820
($config:expr) => {
@@ -105,7 +107,7 @@ pub struct OutboundPolicyClient {
105107

106108
#[derive(Debug)]
107109
struct GrpcHttp {
108-
tx: hyper::client::conn::SendRequest<tonic::body::BoxBody>,
110+
tx: hyper::client::conn::http2::SendRequest<tonic::body::BoxBody>,
109111
}
110112

111113
async fn get_policy_controller_pod(client: &kube::Client) -> Result<String> {
@@ -338,8 +340,7 @@ impl GrpcHttp {
338340
where
339341
I: io::AsyncRead + io::AsyncWrite + Unpin + Send + 'static,
340342
{
341-
let (tx, conn) = hyper::client::conn::Builder::new()
342-
.http2_only(true)
343+
let (tx, conn) = hyper::client::conn::http2::Builder::new(crate::rt::TokioExecutor)
343344
.handshake(io)
344345
.await?;
345346
tokio::spawn(conn);
@@ -350,7 +351,7 @@ impl GrpcHttp {
350351
impl hyper::service::Service<hyper::Request<tonic::body::BoxBody>> for GrpcHttp {
351352
type Response = hyper::Response<hyper::Body>;
352353
type Error = hyper::Error;
353-
type Future = hyper::client::conn::ResponseFuture;
354+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
354355

355356
fn poll_ready(
356357
&mut self,
@@ -360,6 +361,8 @@ impl hyper::service::Service<hyper::Request<tonic::body::BoxBody>> for GrpcHttp
360361
}
361362

362363
fn call(&mut self, req: hyper::Request<tonic::body::BoxBody>) -> Self::Future {
364+
use futures::FutureExt;
365+
363366
let (mut parts, body) = req.into_parts();
364367

365368
let mut uri = parts.uri.into_parts();
@@ -371,7 +374,9 @@ impl hyper::service::Service<hyper::Request<tonic::body::BoxBody>> for GrpcHttp
371374
);
372375
parts.uri = hyper::Uri::from_parts(uri).unwrap();
373376

374-
self.tx.call(hyper::Request::from_parts(parts, body))
377+
self.tx
378+
.send_request(hyper::Request::from_parts(parts, body))
379+
.boxed()
375380
}
376381
}
377382

policy-test/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ pub mod grpc;
88
pub mod outbound_api;
99
pub mod web;
1010

11+
mod rt;
12+
1113
use kube::runtime::wait::Condition;
1214
use linkerd_policy_controller_k8s_api::{
1315
self as k8s,

policy-test/src/rt.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
//! HTTP runtime components for Linkerd.
2+
3+
use hyper::rt::Executor;
4+
use std::future::Future;
5+
6+
#[derive(Clone, Debug, Default)]
7+
pub struct TokioExecutor;
8+
9+
impl<F> Executor<F> for TokioExecutor
10+
where
11+
F: Future + Send + 'static,
12+
F::Output: Send + 'static,
13+
{
14+
#[inline]
15+
fn execute(&self, f: F) {
16+
tokio::spawn(f);
17+
}
18+
}

0 commit comments

Comments
 (0)