|
1 | 1 | use super::{ClassifyEos, ClassifyResponse}; |
2 | 2 | use futures::{prelude::*, ready}; |
| 3 | +use http_body::Frame; |
3 | 4 | use linkerd_error::Error; |
4 | 5 | use linkerd_stack::{layer, ExtractParam, NewService, Service}; |
5 | 6 | use pin_project::{pin_project, pinned_drop}; |
@@ -215,40 +216,34 @@ where |
215 | 216 | type Data = B::Data; |
216 | 217 | type Error = B::Error; |
217 | 218 |
|
218 | | - fn poll_data( |
| 219 | + fn poll_frame( |
219 | 220 | self: Pin<&mut Self>, |
220 | 221 | cx: &mut Context<'_>, |
221 | | - ) -> Poll<Option<Result<Self::Data, Self::Error>>> { |
| 222 | + ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { |
222 | 223 | let this = self.project(); |
223 | | - match ready!(this.inner.poll_data(cx)) { |
224 | | - None => Poll::Ready(None), |
225 | | - Some(Ok(data)) => Poll::Ready(Some(Ok(data))), |
226 | | - Some(Err(e)) => { |
| 224 | + match ready!(this.inner.poll_frame(cx)) { |
| 225 | + None => { |
| 226 | + // Classify the stream if it has reached a `None`. |
227 | 227 | if let Some(State { classify, tx }) = this.state.take() { |
228 | | - let _ = tx.try_send(classify.error(&e)); |
| 228 | + let _ = tx.try_send(classify.eos(None)); |
229 | 229 | } |
230 | | - Poll::Ready(Some(Err(e))) |
| 230 | + Poll::Ready(None) |
231 | 231 | } |
232 | | - } |
233 | | - } |
234 | | - |
235 | | - fn poll_trailers( |
236 | | - self: Pin<&mut Self>, |
237 | | - cx: &mut Context<'_>, |
238 | | - ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> { |
239 | | - let this = self.project(); |
240 | | - match ready!(this.inner.poll_trailers(cx)) { |
241 | | - Ok(trls) => { |
242 | | - if let Some(State { classify, tx }) = this.state.take() { |
243 | | - let _ = tx.try_send(classify.eos(trls.as_ref())); |
| 232 | + Some(Ok(data)) => { |
| 233 | + // Classify the stream if this is a trailers frame. |
| 234 | + if let trls @ Some(_) = data.trailers_ref() { |
| 235 | + if let Some(State { classify, tx }) = this.state.take() { |
| 236 | + let _ = tx.try_send(classify.eos(trls)); |
| 237 | + } |
244 | 238 | } |
245 | | - Poll::Ready(Ok(trls)) |
| 239 | + Poll::Ready(Some(Ok(data))) |
246 | 240 | } |
247 | | - Err(e) => { |
| 241 | + Some(Err(e)) => { |
| 242 | + // Classify the stream if an error has been encountered. |
248 | 243 | if let Some(State { classify, tx }) = this.state.take() { |
249 | 244 | let _ = tx.try_send(classify.error(&e)); |
250 | 245 | } |
251 | | - Poll::Ready(Err(e)) |
| 246 | + Poll::Ready(Some(Err(e))) |
252 | 247 | } |
253 | 248 | } |
254 | 249 | } |
|
0 commit comments