Skip to content

Commit 60ab0cf

Browse files
committed
proxy http work in progress
1 parent b03bf57 commit 60ab0cf

File tree

10 files changed

+101
-55
lines changed

10 files changed

+101
-55
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1995,6 +1995,7 @@ dependencies = [
19951995
"async-trait",
19961996
"bytes",
19971997
"futures",
1998+
"hyper-util",
19981999
"linkerd-errno",
19992000
"pin-project",
20002001
"tokio",
@@ -2327,9 +2328,11 @@ dependencies = [
23272328
"h2",
23282329
"http 1.2.0",
23292330
"http-body",
2331+
"http-body-util",
23302332
"httparse",
23312333
"hyper",
23322334
"hyper-balance",
2335+
"hyper-util",
23332336
"linkerd-detect",
23342337
"linkerd-duplex",
23352338
"linkerd-error",

linkerd/io/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ default = []
1616
async-trait = "0.1"
1717
futures = { version = "0.3", default-features = false }
1818
bytes = { workspace = true }
19+
hyper-util = { workspace = true, features = ["tokio"] }
1920
linkerd-errno = { path = "../errno" }
2021
tokio = { version = "1", features = ["io-util", "net"] }
2122
tokio-test = { version = "0.4", optional = true }

linkerd/io/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,10 @@ impl PeerAddr for tokio::io::DuplexStream {
7676
Ok(([0, 0, 0, 0], 0).into())
7777
}
7878
}
79+
80+
impl<T: PeerAddr> PeerAddr for hyper_util::rt::tokio::TokioIo<T> {
81+
#[inline]
82+
fn peer_addr(&self) -> Result<SocketAddr> {
83+
self.inner().peer_addr()
84+
}
85+
}

linkerd/proxy/http/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ hyper = { workspace = true, features = [
2727
"server",
2828
] }
2929
hyper-balance = { path = "../../../hyper-balance" }
30+
hyper-util = { workspace = true, default-features = false, features = [
31+
"client",
32+
"client-legacy",
33+
"http1",
34+
"service",
35+
] }
3036
parking_lot = "0.12"
3137
pin-project = "1"
3238
rand = "0.9"
@@ -54,6 +60,7 @@ linkerd-proxy-balance = { path = "../balance" }
5460
linkerd-stack = { path = "../../stack" }
5561

5662
[dev-dependencies]
63+
http-body-util = { workspace = true, features = ["channel"] }
5764
tokio-test = "0.4"
5865
tower-test = "0.4"
5966
linkerd-tracing = { path = "../../tracing", features = ["ansi"] }

linkerd/proxy/http/src/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ where
6363
C::Connection: Unpin + Send,
6464
C::Metadata: Send,
6565
C::Future: Unpin + Send + 'static,
66-
B: crate::Body + Send + 'static,
66+
B: crate::Body + Send + Unpin + 'static,
6767
B::Data: Send,
6868
B::Error: Into<Error> + Send + Sync,
6969
{
@@ -123,7 +123,7 @@ where
123123
C::Connection: Unpin + Send,
124124
C::Future: Unpin + Send + 'static,
125125
C::Error: Into<Error>,
126-
B: crate::Body + Send + 'static,
126+
B: crate::Body + Send + Unpin + 'static,
127127
B::Data: Send,
128128
B::Error: Into<Error> + Send + Sync,
129129
{

linkerd/proxy/http/src/h1.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ pub struct PoolSettings {
3333
pub struct Client<C, T, B> {
3434
connect: C,
3535
target: T,
36-
absolute_form: Option<hyper::Client<HyperConnect<C, T>, B>>,
37-
origin_form: Option<hyper::Client<HyperConnect<C, T>, B>>,
36+
absolute_form: Option<hyper_util::client::legacy::Client<HyperConnect<C, T>, B>>,
37+
origin_form: Option<hyper_util::client::legacy::Client<HyperConnect<C, T>, B>>,
3838
pool: PoolSettings,
3939
}
4040

@@ -68,9 +68,9 @@ impl<C, T, B> Client<C, T, B>
6868
where
6969
T: Clone + Send + Sync + 'static,
7070
C: MakeConnection<(crate::Variant, T)> + Clone + Send + Sync + 'static,
71-
C::Connection: Unpin + Send,
71+
C::Connection: Unpin + Send + hyper::rt::Read + hyper::rt::Write,
7272
C::Future: Unpin + Send + 'static,
73-
B: crate::Body + Send + 'static,
73+
B: crate::Body + Send + Unpin + 'static,
7474
B::Data: Send,
7575
B::Error: Into<Error> + Send + Sync,
7676
{
@@ -94,10 +94,9 @@ where
9494
// ish, so we just build a one-off client for the connection.
9595
// There's no real reason to hold the client for re-use.
9696
debug!(use_absolute_form, is_missing_host, "Using one-off client");
97-
hyper::Client::builder()
97+
hyper_util::client::legacy::Client::builder(TracingExecutor)
9898
.pool_max_idle_per_host(0)
9999
.set_host(use_absolute_form)
100-
.executor(TracingExecutor)
101100
.build(HyperConnect::new(
102101
self.connect.clone(),
103102
self.target.clone(),
@@ -120,11 +119,10 @@ where
120119
if client.is_none() {
121120
debug!(use_absolute_form, "Caching new client");
122121
*client = Some(
123-
hyper::Client::builder()
122+
hyper_util::client::legacy::Client::builder(TracingExecutor)
124123
.pool_max_idle_per_host(self.pool.max_idle)
125124
.pool_idle_timeout(self.pool.idle_timeout)
126125
.set_host(use_absolute_form)
127-
.executor(TracingExecutor)
128126
.build(HyperConnect::new(
129127
self.connect.clone(),
130128
self.target.clone(),

linkerd/proxy/http/src/h2.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ where
5555
C::Connection: Send + Unpin + 'static,
5656
C::Metadata: Send,
5757
C::Future: Send + 'static,
58-
B: Body + Send + 'static,
58+
B: Body + Send + Unpin + 'static,
5959
B::Data: Send,
6060
B::Error: Into<Error> + Send + Sync,
6161
{
@@ -147,7 +147,7 @@ where
147147
B::Data: Send,
148148
B::Error: Into<Error> + Send + Sync,
149149
{
150-
type Response = http::Response<hyper::Body>;
150+
type Response = http::Response<hyper::body::Incoming>;
151151
type Error = hyper::Error;
152152
type Future = Pin<Box<dyn Send + Future<Output = Result<Self::Response, Self::Error>>>>;
153153

linkerd/proxy/http/src/orig_proto.rs

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::{h1, h2, Body};
22
use futures::prelude::*;
33
use http::header::{HeaderValue, TRANSFER_ENCODING};
4+
use http_body::Frame;
45
use linkerd_error::{Error, Result};
56
use linkerd_http_box::BoxBody;
67
use linkerd_stack::{layer, MakeConnection, Service};
@@ -56,7 +57,7 @@ where
5657
C: MakeConnection<(crate::Variant, T)> + Clone + Send + Sync + 'static,
5758
C::Connection: Unpin + Send,
5859
C::Future: Unpin + Send + 'static,
59-
B: crate::Body + Send + 'static,
60+
B: crate::Body + Send + Unpin + 'static,
6061
B::Data: Send,
6162
B::Error: Into<Error> + Send + Sync,
6263
{
@@ -211,23 +212,13 @@ where
211212
self.inner.is_end_stream()
212213
}
213214

214-
fn poll_data(
215+
fn poll_frame(
215216
self: Pin<&mut Self>,
216217
cx: &mut Context<'_>,
217-
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
218+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
218219
self.project()
219220
.inner
220-
.poll_data(cx)
221-
.map_err(downgrade_h2_error)
222-
}
223-
224-
fn poll_trailers(
225-
self: Pin<&mut Self>,
226-
cx: &mut Context<'_>,
227-
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
228-
self.project()
229-
.inner
230-
.poll_trailers(cx)
221+
.poll_frame(cx)
231222
.map_err(downgrade_h2_error)
232223
}
233224

linkerd/proxy/http/src/server.rs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
use crate::{
2-
client_handle::SetClientHandle, h2, BoxBody, BoxRequest, ClientHandle, TracingExecutor, Variant,
3-
};
1+
use crate::{client_handle::SetClientHandle, h2, BoxBody, ClientHandle, TracingExecutor, Variant};
42
use linkerd_error::Error;
5-
use linkerd_io::{self as io, PeerAddr};
3+
use linkerd_http_upgrade::glue::UpgradeBody;
4+
use linkerd_io::PeerAddr;
65
use linkerd_stack::{layer, ExtractParam, NewService};
76
use std::{
87
future::Future,
@@ -126,13 +125,22 @@ where
126125

127126
impl<I, N, S> Service<I> for ServeHttp<N>
128127
where
129-
I: io::AsyncRead + io::AsyncWrite + PeerAddr + Send + Unpin + 'static,
128+
I: hyper::rt::Read + hyper::rt::Write + PeerAddr + Send + Unpin + 'static,
130129
N: NewService<ClientHandle, Service = S> + Send + 'static,
131-
S: Service<http::Request<BoxBody>, Response = http::Response<BoxBody>, Error = Error>
132-
+ Unpin
130+
S: Service<
131+
http::Request<hyper::body::Incoming>,
132+
Response = http::Response<BoxBody>,
133+
Error = Error,
134+
> + Service<
135+
http::Request<UpgradeBody<hyper::body::Incoming>>,
136+
Response = http::Response<BoxBody>,
137+
Error = Error,
138+
> + Clone
133139
+ Send
140+
+ Unpin
134141
+ 'static,
135-
S::Future: Send + 'static,
142+
<S as Service<http::Request<hyper::body::Incoming>>>::Future: Send + 'static,
143+
<S as Service<http::Request<UpgradeBody<hyper::body::Incoming>>>>::Future: Send + 'static,
136144
{
137145
type Response = ();
138146
type Error = Error;
@@ -162,10 +170,8 @@ where
162170
match version {
163171
Variant::Http1 => {
164172
// Enable support for HTTP upgrades (CONNECT and websockets).
165-
let svc = linkerd_http_upgrade::upgrade::Service::new(
166-
BoxRequest::new(svc),
167-
drain.clone(),
168-
);
173+
let svc = linkerd_http_upgrade::upgrade::Service::new(svc, drain.clone());
174+
let svc = hyper_util::service::TowerToHyperService::new(svc);
169175
let mut conn = http1.serve_connection(io, svc).with_upgrades();
170176

171177
tokio::select! {
@@ -187,7 +193,8 @@ where
187193
}
188194

189195
Variant::H2 => {
190-
let mut conn = http2.serve_connection(io, BoxRequest::new(svc));
196+
let svc = hyper_util::service::TowerToHyperService::new(svc);
197+
let mut conn = http2.serve_connection(io, svc);
191198

192199
tokio::select! {
193200
res = &mut conn => {

0 commit comments

Comments
 (0)