Skip to content

Commit f5b4f6b

Browse files
authored
refactor(app/integration): remove artificial Sync bounds (#3700)
see linkerd/linkerd2#8733 for more information. we are in the process of upgrading to hyper 1.x. in the process of doing so, we will wish to use our friendly `BoxBody` type, which provides a convenient and reusable interface to abstract over different artitrary `B`-typed request and response bodies. unfortunately, by virtue of its definition, it is not a `Sync` type: ```rust pub struct BoxBody { inner: Pin<Box<dyn Body<Data = Data, Error = Error> + Send + 'static>>, } #[pin_project] pub struct Data { #[pin] inner: Box<dyn bytes::Buf + Send + 'static>, } ``` these are erased `Box<dyn ..>` objects that only ensure `Send`-ness. rather than changing that, because that is the proper definition of the type, we should update code in our test client and test server to stop requesting arbitrary `Sync` bounds. this commit removes `Sync` bounds from various places that in fact only need be `Send + 'static`. this will help facilitate making use of `BoxBody` in #3504. Signed-off-by: katelyn martin <[email protected]>
1 parent dacf858 commit f5b4f6b

File tree

4 files changed

+24
-16
lines changed

4 files changed

+24
-16
lines changed

linkerd/app/integration/src/client.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,7 @@ impl Client {
131131
pub fn request(
132132
&self,
133133
builder: http::request::Builder,
134-
) -> impl Future<Output = Result<Response<hyper::Body>, ClientError>> + Send + Sync + 'static
135-
{
134+
) -> impl Future<Output = Result<Response<hyper::Body>, ClientError>> + Send + 'static {
136135
self.send_req(builder.body(Bytes::new().into()).unwrap())
137136
}
138137

@@ -156,8 +155,7 @@ impl Client {
156155
pub(crate) fn send_req(
157156
&self,
158157
mut req: Request<hyper::Body>,
159-
) -> impl Future<Output = Result<Response<hyper::Body>, ClientError>> + Send + Sync + 'static
160-
{
158+
) -> impl Future<Output = Result<Response<hyper::Body>, ClientError>> + Send + 'static {
161159
if req.uri().scheme().is_none() {
162160
if self.tls.is_some() {
163161
*req.uri_mut() = format!("https://{}{}", self.authority, req.uri().path())
@@ -172,7 +170,7 @@ impl Client {
172170
tracing::debug!(headers = ?req.headers(), "request");
173171
let (tx, rx) = oneshot::channel();
174172
let _ = self.tx.send((req.map(Into::into), tx));
175-
async { rx.await.expect("request cancelled") }.in_current_span()
173+
async move { rx.await.expect("request cancelled") }.in_current_span()
176174
}
177175

178176
pub async fn wait_for_closed(self) {
@@ -221,7 +219,7 @@ enum Run {
221219
Http2,
222220
}
223221

224-
pub type Running = Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>;
222+
pub type Running = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
225223

226224
fn run(
227225
addr: SocketAddr,

linkerd/app/integration/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ impl fmt::Display for HumanDuration {
256256

257257
pub async fn cancelable<E: Send + 'static>(
258258
drain: drain::Watch,
259-
f: impl Future<Output = Result<(), E>> + Send + 'static,
259+
f: impl Future<Output = Result<(), E>>,
260260
) -> Result<(), E> {
261261
tokio::select! {
262262
res = f => res,

linkerd/app/integration/src/server.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub struct Listening {
5959
}
6060

6161
type RspFuture =
62-
Pin<Box<dyn Future<Output = Result<Response<hyper::Body>, Error>> + Send + Sync + 'static>>;
62+
Pin<Box<dyn Future<Output = Result<Response<hyper::Body>, Error>> + Send + 'static>>;
6363

6464
impl Listening {
6565
pub fn connections(&self) -> usize {
@@ -128,7 +128,7 @@ impl Server {
128128
pub fn route_async<F, U>(mut self, path: &str, cb: F) -> Self
129129
where
130130
F: Fn(Request<hyper::Body>) -> U + Send + Sync + 'static,
131-
U: TryFuture<Ok = Response<hyper::Body>> + Send + Sync + 'static,
131+
U: TryFuture<Ok = Response<hyper::Body>> + Send + 'static,
132132
U::Error: Into<Error> + Send + 'static,
133133
{
134134
let func = move |req| Box::pin(cb(req).map_err(Into::into)) as RspFuture;
@@ -219,9 +219,17 @@ impl Server {
219219
tracing::trace!(?result, "serve done");
220220
result
221221
};
222-
tokio::spawn(
223-
cancelable(drain.clone(), f).instrument(span.clone().or_current()),
224-
);
222+
// let fut = Box::pin(cancelable(drain.clone(), f).instrument(span.clone().or_current()))
223+
let drain = drain.clone();
224+
tokio::spawn(async move {
225+
tokio::select! {
226+
res = f => res,
227+
_ = drain.signaled() => {
228+
tracing::debug!("canceled!");
229+
Ok(())
230+
}
231+
}
232+
});
225233
}
226234
}
227235
.instrument(

linkerd/app/integration/src/tests/profiles.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -293,15 +293,16 @@ mod cross_version {
293293
.method("POST")
294294
.body(body)
295295
.unwrap();
296-
let res = tokio::spawn(async move { client.request_body(req).await });
296+
let fut = client.send_req(req);
297+
let res = tokio::spawn(fut);
297298
tx.send_data(Bytes::from_static(b"hello"))
298299
.await
299300
.expect("the whole body should be read");
300301
tx.send_data(Bytes::from_static(b"world"))
301302
.await
302303
.expect("the whole body should be read");
303304
drop(tx);
304-
let res = res.await.unwrap();
305+
let res = res.await.unwrap().unwrap();
305306
assert_eq!(res.status(), 200);
306307
}
307308

@@ -392,7 +393,8 @@ mod cross_version {
392393
.method("POST")
393394
.body(body)
394395
.unwrap();
395-
let res = tokio::spawn(async move { client.request_body(req).await });
396+
let fut = client.send_req(req);
397+
let res = tokio::spawn(fut);
396398
// send a 32k chunk
397399
tx.send_data(Bytes::from(&[1u8; 32 * 1024][..]))
398400
.await
@@ -406,7 +408,7 @@ mod cross_version {
406408
.await
407409
.expect("the whole body should be read");
408410
drop(tx);
409-
let res = res.await.unwrap();
411+
let res = res.await.unwrap().unwrap();
410412

411413
assert_eq!(res.status(), 533);
412414
}

0 commit comments

Comments
 (0)