@@ -116,15 +116,8 @@ where
116116 Ok ( rsp) => {
117117 // Tap the response headers and use the response
118118 // body taps to decorate the response body.
119- let taps = rsp_taps. drain ( ..) . map ( |t| t. tap ( & rsp) ) . collect ( ) ;
120- let rsp = rsp. map ( move |inner| {
121- use linkerd_proxy_http:: Body as _;
122- let mut body = Body { inner, taps } ;
123- if body. is_end_stream ( ) {
124- eos ( & mut body. taps , None ) ;
125- }
126- body
127- } ) ;
119+ let taps = rsp_taps. drain ( ..) . map ( |t| t. tap ( & rsp) ) . collect :: < Vec < _ > > ( ) ;
120+ let rsp = rsp. map ( |inner| Body :: new ( inner, taps) ) ;
128121 Ok ( rsp)
129122 }
130123 Err ( e) => {
@@ -140,6 +133,22 @@ where
140133
141134// === Body ===
142135
136+ impl < B , T > Body < B , T >
137+ where
138+ B : linkerd_proxy_http:: Body ,
139+ B :: Error : HasH2Reason ,
140+ T : TapPayload ,
141+ {
142+ fn new ( inner : B , mut taps : Vec < T > ) -> Self {
143+ // If the body is already finished, record the end of the stream.
144+ if inner. is_end_stream ( ) {
145+ taps. drain ( ..) . for_each ( |t| t. eos ( None ) ) ;
146+ }
147+
148+ Self { inner, taps }
149+ }
150+ }
151+
143152// `T` need not implement Default.
144153impl < B , T > Default for Body < B , T >
145154where
@@ -169,71 +178,44 @@ where
169178 self . inner . is_end_stream ( )
170179 }
171180
172- fn poll_data (
173- mut self : Pin < & mut Self > ,
181+ fn poll_frame (
182+ self : Pin < & mut Self > ,
174183 cx : & mut Context < ' _ > ,
175- ) -> Poll < Option < Result < Self :: Data , B :: Error > > > {
176- let frame = ready ! ( self . as_mut ( ) . project( ) . inner . poll_data ( cx ) ) ;
177- match frame {
178- Some ( Err ( e ) ) => {
179- let e = self . as_mut ( ) . project ( ) . err ( e ) ;
180- Poll :: Ready ( Some ( Err ( e ) ) )
181- }
182- Some ( Ok ( body ) ) => {
183- self . as_mut ( ) . project ( ) . data ( Some ( & body ) ) ;
184- Poll :: Ready ( Some ( Ok ( body ) ) )
184+ ) -> Poll < Option < Result < http_body :: Frame < Self :: Data > , Self :: Error > > > {
185+ let BodyProj { mut inner , taps } = self . project ( ) ;
186+
187+ // Poll the inner body for the next frame.
188+ let frame = match ready ! ( inner . as_mut( ) . poll_frame ( cx ) ) {
189+ Some ( Ok ( frame ) ) => frame ,
190+ Some ( Err ( error ) ) => {
191+ // If an error occurred, we have reached the end of the stream.
192+ taps . drain ( .. ) . for_each ( |t| t . fail ( & error ) ) ;
193+ return Poll :: Ready ( Some ( Err ( error ) ) ) ;
185194 }
186195 None => {
187- self . as_mut ( ) . project ( ) . data ( None ) ;
188- Poll :: Ready ( None )
196+ // If there is not another frame, we have reached the end of the stream.
197+ taps. drain ( ..) . for_each ( |t| t. eos ( None ) ) ;
198+ return Poll :: Ready ( None ) ;
189199 }
190- }
191- }
200+ } ;
192201
193- fn poll_trailers (
194- mut self : Pin < & mut Self > ,
195- cx : & mut Context < ' _ > ,
196- ) -> Poll < Result < Option < http:: HeaderMap > , B :: Error > > {
197- let trailers = ready ! ( self . as_mut( ) . project( ) . inner. poll_trailers( cx) )
198- . map_err ( |e| self . as_mut ( ) . project ( ) . err ( e) ) ?;
199- self . as_mut ( ) . project ( ) . eos ( trailers. as_ref ( ) ) ;
200- Poll :: Ready ( Ok ( trailers) )
201- }
202-
203- #[ inline]
204- fn size_hint ( & self ) -> hyper:: body:: SizeHint {
205- self . inner . size_hint ( )
206- }
207- }
208-
209- impl < B , T > BodyProj < ' _ , B , T >
210- where
211- B : linkerd_proxy_http:: Body ,
212- B :: Error : HasH2Reason ,
213- T : TapPayload ,
214- {
215- fn data ( & mut self , frame : Option < & B :: Data > ) {
216- if let Some ( f) = frame {
217- for ref mut tap in self . taps . iter_mut ( ) {
218- tap. data ( f) ;
219- }
202+ // If we received a trailers frame, we have reached the end of the stream.
203+ if let trailers @ Some ( _) = frame. trailers_ref ( ) {
204+ taps. drain ( ..) . for_each ( |t| t. eos ( trailers) ) ;
205+ return Poll :: Ready ( Some ( Ok ( frame) ) ) ;
220206 }
221207
222- if self . inner . is_end_stream ( ) {
223- self . eos ( None ) ;
208+ // Otherwise, we *may* reached the end of the stream. If so, there are no trailers.
209+ if inner. is_end_stream ( ) {
210+ taps. drain ( ..) . for_each ( |t| t. eos ( None ) ) ;
224211 }
225- }
226212
227- fn eos ( & mut self , trailers : Option < & http:: HeaderMap > ) {
228- eos ( self . taps , trailers)
213+ Poll :: Ready ( Some ( Ok ( frame) ) )
229214 }
230215
231- fn err ( & mut self , error : B :: Error ) -> B :: Error {
232- for tap in self . taps . drain ( ..) {
233- tap. fail ( & error) ;
234- }
235-
236- error
216+ #[ inline]
217+ fn size_hint ( & self ) -> hyper:: body:: SizeHint {
218+ self . inner . size_hint ( )
237219 }
238220}
239221
@@ -245,12 +227,7 @@ where
245227 T : TapPayload ,
246228{
247229 fn drop ( self : Pin < & mut Self > ) {
248- self . project ( ) . eos ( None ) ;
249- }
250- }
251-
252- fn eos < T : TapPayload > ( taps : & mut Vec < T > , trailers : Option < & http:: HeaderMap > ) {
253- for tap in taps. drain ( ..) {
254- tap. eos ( trailers) ;
230+ let BodyProj { inner : _, taps } = self . project ( ) ;
231+ taps. drain ( ..) . for_each ( |t| t. eos ( None ) ) ;
255232 }
256233}
0 commit comments