@@ -38,12 +38,43 @@ impl<T> Unpin for Streaming<T> {}
3838
3939#[ derive( Debug , Clone ) ]
4040enum State {
41- ReadHeader ,
41+ ReadHeader {
42+ span : Option < tracing:: Span > ,
43+ } ,
4244 ReadBody {
45+ span : tracing:: Span ,
4346 compression : Option < CompressionEncoding > ,
4447 len : usize ,
4548 } ,
46- Error ( Status ) ,
49+ Error ( Box < Status > ) ,
50+ }
51+
52+ impl State {
53+ fn read_header ( ) -> Self {
54+ Self :: ReadHeader { span : None }
55+ }
56+
57+ fn read_body ( compression : Option < CompressionEncoding > , len : usize ) -> Self {
58+ let span = tracing:: debug_span!(
59+ "read_body" ,
60+ compression = compression. map( |c| c. as_str( ) ) ,
61+ compressed. bytes = compression. is_some( ) . then_some( len) ,
62+ uncompressed. bytes = compression. is_none( ) . then_some( len) ,
63+ ) ;
64+ Self :: ReadBody {
65+ span,
66+ compression,
67+ len,
68+ }
69+ }
70+
71+ fn span ( & self ) -> Option < & tracing:: Span > {
72+ match self {
73+ Self :: ReadHeader { span } => span. as_ref ( ) ,
74+ Self :: ReadBody { span, .. } => Some ( span) ,
75+ Self :: Error ( _) => None ,
76+ }
77+ }
4778}
4879
4980#[ derive( Debug , PartialEq , Eq ) ]
@@ -125,7 +156,7 @@ impl<T> Streaming<T> {
125156 . map_frame ( |frame| frame. map_data ( |mut buf| buf. copy_to_bytes ( buf. remaining ( ) ) ) )
126157 . map_err ( |err| Status :: map_error ( err. into ( ) ) )
127158 . boxed_unsync ( ) ,
128- state : State :: ReadHeader ,
159+ state : State :: read_header ( ) ,
129160 direction,
130161 buf : BytesMut :: with_capacity ( buffer_size) ,
131162 trailers : None ,
@@ -142,7 +173,19 @@ impl StreamingInner {
142173 & mut self ,
143174 buffer_settings : BufferSettings ,
144175 ) -> Result < Option < DecodeBuf < ' _ > > , Status > {
145- if let State :: ReadHeader = self . state {
176+ if let State :: ReadHeader { span } = & mut self . state {
177+ if !self . buf . has_remaining ( ) {
178+ return Ok ( None ) ;
179+ }
180+
181+ let span = span. get_or_insert_with ( || {
182+ tracing:: debug_span!(
183+ "read_header" ,
184+ compression = tracing:: field:: Empty ,
185+ body. bytes = tracing:: field:: Empty ,
186+ )
187+ } ) ;
188+ let _guard = span. enter ( ) ;
146189 if self . buf . remaining ( ) < HEADER_SIZE {
147190 return Ok ( None ) ;
148191 }
@@ -151,7 +194,8 @@ impl StreamingInner {
151194 0 => None ,
152195 1 => {
153196 {
154- if self . encoding . is_some ( ) {
197+ if let Some ( ce) = self . encoding {
198+ span. record ( "compression" , ce. as_str ( ) ) ;
155199 self . encoding
156200 } else {
157201 // https://grpc.github.io/grpc/core/md_doc_compression.html
@@ -177,6 +221,7 @@ impl StreamingInner {
177221 } ;
178222
179223 let len = self . buf . get_u32 ( ) as usize ;
224+ span. record ( "body.bytes" , len) ;
180225 let limit = self
181226 . max_message_size
182227 . unwrap_or ( DEFAULT_MAX_RECV_MESSAGE_SIZE ) ;
@@ -191,14 +236,19 @@ impl StreamingInner {
191236 }
192237
193238 self . buf . reserve ( len) ;
239+ drop ( _guard) ;
194240
195- self . state = State :: ReadBody {
196- compression : compression_encoding,
197- len,
198- }
241+ self . state = State :: read_body ( compression_encoding, len)
199242 }
200243
201- if let State :: ReadBody { len, compression } = self . state {
244+ if let State :: ReadBody {
245+ len,
246+ span,
247+ compression,
248+ } = & self . state
249+ {
250+ let ( len, compression) = ( * len, * compression) ;
251+ let _guard = span. enter ( ) ;
202252 // if we haven't read enough of the message then return and keep
203253 // reading
204254 if self . buf . remaining ( ) < len || self . buf . len ( ) < len {
@@ -228,6 +278,7 @@ impl StreamingInner {
228278 return Err ( Status :: new ( Code :: Internal , message) ) ;
229279 }
230280 let decompressed_len = self . decompress_buf . len ( ) ;
281+ span. record ( "uncompressed.bytes" , decompressed_len) ;
231282 DecodeBuf :: new ( & mut self . decompress_buf , decompressed_len)
232283 } else {
233284 DecodeBuf :: new ( & mut self . buf , len)
@@ -241,14 +292,16 @@ impl StreamingInner {
241292
242293 // Returns Some(()) if data was found or None if the loop in `poll_next` should break
243294 fn poll_frame ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < Option < ( ) > , Status > > {
295+ let _guard = self . state . span ( ) . map ( |s| s. enter ( ) ) ;
244296 let chunk = match ready ! ( Pin :: new( & mut self . body) . poll_frame( cx) ) {
245297 Some ( Ok ( d) ) => Some ( d) ,
246298 Some ( Err ( status) ) => {
247299 if self . direction == Direction :: Request && status. code ( ) == Code :: Cancelled {
248300 return Poll :: Ready ( Ok ( None ) ) ;
249301 }
250302
251- let _ = std:: mem:: replace ( & mut self . state , State :: Error ( status. clone ( ) ) ) ;
303+ drop ( _guard) ;
304+ let _ = std:: mem:: replace ( & mut self . state , State :: Error ( Box :: new ( status. clone ( ) ) ) ) ;
252305 debug ! ( "decoder inner stream error: {:?}" , status) ;
253306 return Poll :: Ready ( Err ( status) ) ;
254307 }
@@ -378,7 +431,7 @@ impl<T> Streaming<T> {
378431 match self . inner . decode_chunk ( self . decoder . buffer_settings ( ) ) ? {
379432 Some ( mut decode_buf) => match self . decoder . decode ( & mut decode_buf) ? {
380433 Some ( msg) => {
381- self . inner . state = State :: ReadHeader ;
434+ self . inner . state = State :: read_header ( ) ;
382435 Ok ( Some ( msg) )
383436 }
384437 None => Ok ( None ) ,
@@ -394,7 +447,7 @@ impl<T> Stream for Streaming<T> {
394447 fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
395448 loop {
396449 if let State :: Error ( status) = & self . inner . state {
397- return Poll :: Ready ( Some ( Err ( status. clone ( ) ) ) ) ;
450+ return Poll :: Ready ( Some ( Err ( * status. clone ( ) ) ) ) ;
398451 }
399452
400453 if let Some ( item) = self . decode_chunk ( ) ? {
0 commit comments