Skip to content

Commit 834942d

Browse files
authored
opencensus: drive rsp future to establish conn (#588)
In the `futures` 0.1 version of the OpenCensus module, the `tower-grpc` response future for the client-streaming RPC was never polled, because we did not need the server response for that RPC. At the time, this was fine, as `tower-grpc` still established the connection and ensured messages were sent to the server. However, after updating to `tonic`, this was no longer the case --- the response future is now responsible for establishing the connection. This means spans are no longer sent to the OpenCensus collector. Due to an issue where the cloud integration tests were silently failing to run, we missed this regression. This PR fixes the regression by ensuring that we drive the response future. I'd like to rewrite this code to use async-await rather than the manual state machine, but this change fixes the regression, so I thought I'd go ahead and open a PR, and refactor in a separate branch. Signed-off-by: Eliza Weisman <[email protected]>
1 parent 67088e5 commit 834942d

File tree

1 file changed

+41
-23
lines changed

1 file changed

+41
-23
lines changed

linkerd/opencensus/src/lib.rs

Lines changed: 41 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
#![deny(warnings, rust_2018_idioms)]
2-
use futures::{ready, Stream};
2+
use futures::{ready, FutureExt, Stream};
33
use http_body::Body as HttpBody;
44
use linkerd2_error::Error;
55
use linkerd2_stack::NewService;
@@ -45,16 +45,17 @@ enum State {
4545
// Node data should only be sent on the first message of a streaming
4646
// request.
4747
node: Option<Node>,
48-
// We hold the response future, but never poll it.
49-
_rsp: Pin<
50-
Box<
51-
dyn Future<
52-
Output = Result<
53-
grpc::Response<Streaming<ExportTraceServiceResponse>>,
54-
grpc::Status,
55-
>,
56-
> + Send
57-
+ 'static,
48+
rsp: Option<
49+
Pin<
50+
Box<
51+
dyn Future<
52+
Output = Result<
53+
grpc::Response<Streaming<ExportTraceServiceResponse>>,
54+
grpc::Status,
55+
>,
56+
> + Send
57+
+ 'static,
58+
>,
5859
>,
5960
>,
6061
metrics: Registry,
@@ -177,30 +178,47 @@ where
177178
let req = grpc::Request::new(request_rx);
178179
trace!("Establishing new TraceService::export request");
179180
this.metrics.start_stream();
180-
let _rsp = Box::pin(async move { svc.export(req).await });
181+
let rsp = Box::pin(async move { svc.export(req).await });
181182
State::Sending {
182183
sender: request_tx,
183184
node: Some(this.node.clone()),
184-
_rsp,
185+
rsp: Some(rsp),
185186
metrics: this.metrics.clone(),
186187
}
187188
}
188189
State::Sending {
189190
ref mut sender,
190191
ref mut node,
191192
ref mut metrics,
193+
ref mut rsp,
192194
..
193195
} => {
194-
match ready!(Self::poll_send_spans(
195-
this.spans,
196-
sender,
197-
node,
198-
*this.max_batch_size,
199-
metrics,
200-
cx,
201-
)) {
202-
Ok(()) => return Poll::Ready(()),
203-
Err(()) => State::Idle,
196+
let mut idle = false;
197+
if let Some(mut f) = rsp.take() {
198+
match f.poll_unpin(cx) {
199+
Poll::Ready(Ok(_)) => {}
200+
Poll::Pending => *rsp = Some(f),
201+
Poll::Ready(Err(error)) => {
202+
tracing::debug!(%error, "response future failed, sending a new request");
203+
idle = true;
204+
}
205+
}
206+
}
207+
// this is gross...let's just see if it works.
208+
if idle {
209+
State::Idle
210+
} else {
211+
match ready!(Self::poll_send_spans(
212+
this.spans,
213+
sender,
214+
node,
215+
*this.max_batch_size,
216+
metrics,
217+
cx,
218+
)) {
219+
Ok(()) => return Poll::Ready(()),
220+
Err(()) => State::Idle,
221+
}
204222
}
205223
}
206224
};

0 commit comments

Comments (0)