Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1513,6 +1513,7 @@ dependencies = [
"linkerd-app-core",
"linkerd-app-test",
"linkerd-distribute",
"linkerd-http-body-compat",
"linkerd-http-classify",
"linkerd-http-prom",
"linkerd-http-retry",
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ tokio-test = "0.4"
tower-test = "0.4"

linkerd-app-test = { path = "../test", features = ["client-policy"] }
linkerd-http-body-compat = { path = "../../http/body-compat" }
linkerd-http-prom = { path = "../../http/prom", features = ["test-util"] }
linkerd-io = { path = "../../io", features = ["tokio-test"] }
linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use http::Response;
use linkerd_app_core::{
metrics::prom::Counter,
proxy::http::Body,
svc::{self, http::BoxBody, Service, ServiceExt},
};

Expand All @@ -23,11 +23,14 @@ pub async fn send_assert_incremented(
};
assert_eq!(counter.get(), init);
send(tx);
if let Ok(mut rsp) = call.await {
if !rsp.body().is_end_stream() {
if let Ok(mut body) = call
.await
.map(Response::into_body)
.map(linkerd_http_body_compat::ForwardCompatibleBody::new)
{
if !body.is_end_stream() {
assert_eq!(counter.get(), 0);
while let Some(Ok(_)) = rsp.body_mut().data().await {}
let _ = rsp.body_mut().trailers().await;
while let Some(Ok(_)) = body.frame().await {}
}
}
assert_eq!(counter.get(), init + 1);
Expand Down
25 changes: 13 additions & 12 deletions linkerd/app/outbound/src/http/logical/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,25 +157,26 @@ async fn serve(
handle: &mut tower_test::mock::Handle<Request, Response>,
call: impl Future<Output = Result<Response>> + Send + 'static,
) {
let (mut req, tx) = handle
let (req, tx) = handle
.next_request()
.await
.expect("service must receive request");
tracing::debug!(?req, "Received request");

// Ensure the whole request is processed.
if !req.body().is_end_stream() {
while let Some(res) = req.body_mut().data().await {
res.expect("request body must not error");
}
if !req.body().is_end_stream() {
req.body_mut()
.trailers()
.await
.expect("request body must not error");
}
let (parts, mut body) = req
.map(linkerd_http_body_compat::ForwardCompatibleBody::new)
.into_parts();
if !body.is_end_stream() {
while body
.frame()
.await
.transpose()
.expect("request body must not error")
.is_some()
{}
}
drop(req);
drop((parts, body));

tokio::spawn(
async move {
Expand Down
46 changes: 24 additions & 22 deletions linkerd/app/outbound/src/http/logical/tests/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async fn grpc() {
);

info!("Verifying that we see the successful response");
let mut rsp = time::timeout(
let (parts, mut body) = time::timeout(
TIMEOUT * 4,
send_req(
svc.clone(),
Expand All @@ -155,17 +155,18 @@ async fn grpc() {
)
.await
.expect("response")
.expect("response");
assert_eq!(rsp.status(), StatusCode::OK);
while let Some(res) = rsp.body_mut().data().await {
res.expect("data");
}
let trailers = rsp
.body_mut()
.trailers()
.expect("response")
.map(linkerd_http_body_compat::ForwardCompatibleBody::new)
.into_parts();
assert_eq!(parts.status, StatusCode::OK);
let trailers = body
.frame()
.await
.expect("trailers")
.expect("trailers");
.expect("a result")
.expect("a frame")
.into_trailers()
.ok()
.expect("trailers frame");
assert_eq!(
trailers
.get("grpc-status")
Expand Down Expand Up @@ -214,7 +215,7 @@ async fn grpc_requires_allow() {
);

info!("Verifying that we see the successful response");
let mut rsp = time::timeout(
let (parts, mut body) = time::timeout(
TIMEOUT * 4,
send_req(
svc.clone(),
Expand All @@ -230,17 +231,18 @@ async fn grpc_requires_allow() {
)
.await
.expect("response")
.expect("response");
assert_eq!(rsp.status(), StatusCode::OK);
while let Some(res) = rsp.body_mut().data().await {
res.expect("data");
}
let trailers = rsp
.body_mut()
.trailers()
.expect("response")
.map(linkerd_http_body_compat::ForwardCompatibleBody::new)
.into_parts();
assert_eq!(parts.status, StatusCode::OK);
let trailers = body
.frame()
.await
.expect("trailers")
.expect("trailers");
.expect("a result")
.expect("a frame")
.into_trailers()
.ok()
.expect("trailers frame");
assert_eq!(
trailers
.get("grpc-status")
Expand Down
50 changes: 35 additions & 15 deletions linkerd/app/outbound/src/http/logical/tests/retries.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::*;
use linkerd_app_core::{
errors,
proxy::http::{self, Body, StatusCode},
proxy::http::{self, StatusCode},
svc::http::stream_timeouts::StreamDeadlineError,
trace,
};
Expand Down Expand Up @@ -340,7 +340,7 @@ async fn grpc_internal() {
);

info!("Verifying that we see the successful response");
let mut rsp = time::timeout(
let (parts, mut body) = time::timeout(
TIMEOUT * 10,
send_req(
svc.clone(),
Expand All @@ -351,14 +351,24 @@ async fn grpc_internal() {
)
.await
.expect("response")
.expect("response ok");
assert_eq!(rsp.status(), StatusCode::OK);
while rsp.body_mut().data().await.is_some() {}
let trailers = rsp.body_mut().trailers().await;
.expect("response ok")
.map(linkerd_http_body_compat::ForwardCompatibleBody::new)
.into_parts();
assert_eq!(parts.status, StatusCode::OK);
let trailers = loop {
match body.frame().await {
Some(Ok(frame)) => {
if let Ok(trailers) = frame.into_trailers() {
break trailers;
} else {
continue;
}
}
None | Some(Err(_)) => panic!("body did not yield trailers"),
}
};
assert_eq!(
trailers
.expect("trailers")
.expect("trailers")
.get("grpc-status")
.expect("grpc-status")
.to_str()
Expand Down Expand Up @@ -405,7 +415,7 @@ async fn grpc_timeout() {
);

info!("Verifying that we see the successful response");
let mut rsp = time::timeout(
let (parts, mut body) = time::timeout(
TIMEOUT * 10,
send_req(
svc.clone(),
Expand All @@ -416,14 +426,24 @@ async fn grpc_timeout() {
)
.await
.expect("response")
.expect("response ok");
assert_eq!(rsp.status(), StatusCode::OK);
while rsp.body_mut().data().await.is_some() {}
let trailers = rsp.body_mut().trailers().await;
.expect("response ok")
.map(linkerd_http_body_compat::ForwardCompatibleBody::new)
.into_parts();
assert_eq!(parts.status, StatusCode::OK);
let trailers = loop {
match body.frame().await {
Some(Ok(frame)) => {
if let Ok(trailers) = frame.into_trailers() {
break trailers;
} else {
continue;
}
}
None | Some(Err(_)) => panic!("body did not yield trailers"),
}
};
assert_eq!(
trailers
.expect("trailers")
.expect("trailers")
.get("grpc-status")
.expect("grpc-status")
.to_str()
Expand Down
Loading