11use crate :: codecs:: Decode ;
22use crate :: core:: util:: PartialBuffer ;
33
4- use core:: task:: { Context , Poll } ;
5- use std:: io:: Result ;
6-
7- use futures_core:: ready;
4+ use std:: { io:: Result , ops:: ControlFlow } ;
85
96#[ derive( Debug ) ]
107enum State {
@@ -14,11 +11,6 @@ enum State {
1411 Next ,
1512}
1613
17- pub ( crate ) trait AsyncBufRead {
18- fn poll_fill_buf ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < & [ u8 ] > > ;
19- fn consume ( & mut self , bytes : usize ) ;
20- }
21-
2214#[ derive( Debug ) ]
2315pub struct Decoder {
2416 state : State ,
@@ -41,86 +33,166 @@ impl Decoder {
4133
4234 pub fn do_poll_read < D : Decode > (
4335 & mut self ,
44- cx : & mut Context < ' _ > ,
4536 output : & mut PartialBuffer < & mut [ u8 ] > ,
46- reader : & mut dyn AsyncBufRead ,
4737 decoder : & mut D ,
48- ) -> Poll < Result < ( ) > > {
49- let mut first = true ;
50-
38+ input : & mut PartialBuffer < & [ u8 ] > ,
39+ mut first : bool ,
40+ ) -> ControlFlow < Result < ( ) > > {
5141 loop {
5242 self . state = match self . state {
5343 State :: Decoding => {
54- let input = if first {
55- & [ ] [ ..]
56- } else {
57- ready ! ( reader. poll_fill_buf( cx) ) ?
58- } ;
59-
60- if input. is_empty ( ) && !first {
44+ if input. unwritten ( ) . is_empty ( ) && !first {
6145 // Avoid attempting to reinitialise the decoder if the
6246 // reader has returned EOF.
6347 self . multiple_members = false ;
6448
6549 State :: Flushing
6650 } else {
67- let mut input = PartialBuffer :: new ( input) ;
68- let res = decoder . decode ( & mut input , output ) . or_else ( |err| {
51+ match decoder . decode ( input, output ) {
52+ Ok ( true ) => State :: Flushing ,
6953 // ignore the first error, occurs when input is empty
7054 // but we need to run decode to flush
71- if first {
72- Ok ( false )
73- } else {
74- Err ( err)
75- }
76- } ) ;
77-
78- if !first {
79- let len = input. written ( ) . len ( ) ;
80- reader. consume ( len) ;
81- }
82-
83- first = false ;
84-
85- if res? {
86- State :: Flushing
87- } else {
88- State :: Decoding
55+ Err ( err) if !first => return ControlFlow :: Break ( Err ( err) ) ,
56+ // poll for more data for the next decode
57+ _ => break ,
8958 }
9059 }
9160 }
9261
9362 State :: Flushing => {
94- if decoder. finish ( output) ? {
95- if self . multiple_members {
96- decoder. reinit ( ) ?;
97- State :: Next
98- } else {
99- State :: Done
63+ match decoder. finish ( output) {
64+ Ok ( true ) => {
65+ if self . multiple_members {
66+ if let Err ( err) = decoder. reinit ( ) {
67+ return ControlFlow :: Break ( Err ( err) ) ;
68+ }
69+
70+ // The decode stage might consume all the input,
71+ // the next stage might need to poll again if it's empty.
72+ first = true ;
73+ State :: Next
74+ } else {
75+ State :: Done
76+ }
10077 }
101- } else {
102- State :: Flushing
78+ Ok ( false ) => State :: Flushing ,
79+ Err ( err ) => return ControlFlow :: Break ( Err ( err ) ) ,
10380 }
10481 }
10582
106- State :: Done => State :: Done ,
83+ State :: Done => return ControlFlow :: Break ( Ok ( ( ) ) ) ,
10784
10885 State :: Next => {
109- let input = ready ! ( reader. poll_fill_buf( cx) ) ?;
110- if input. is_empty ( ) {
86+ if input. unwritten ( ) . is_empty ( ) {
87+ if first {
88+ // poll for more data to check if there's another stream
89+ break ;
90+ }
11191 State :: Done
11292 } else {
11393 State :: Decoding
11494 }
11595 }
11696 } ;
11797
118- if let State :: Done = self . state {
119- return Poll :: Ready ( Ok ( ( ) ) ) ;
120- }
12198 if output. unwritten ( ) . is_empty ( ) {
122- return Poll :: Ready ( Ok ( ( ) ) ) ;
99+ return ControlFlow :: Break ( Ok ( ( ) ) ) ;
123100 }
124101 }
102+
103+ if output. unwritten ( ) . is_empty ( ) {
104+ ControlFlow :: Break ( Ok ( ( ) ) )
105+ } else {
106+ ControlFlow :: Continue ( ( ) )
107+ }
125108 }
126109}
110+
111+ macro_rules! impl_do_poll_read {
112+ ( ) => {
113+ use crate :: generic:: bufread:: Decoder as GenericDecoder ;
114+
115+ use std:: ops:: ControlFlow ;
116+
117+ use futures_core:: ready;
118+ use pin_project_lite:: pin_project;
119+
120+ pin_project! {
121+ #[ derive( Debug ) ]
122+ pub struct Decoder <R , D > {
123+ #[ pin]
124+ reader: R ,
125+ decoder: D ,
126+ inner: GenericDecoder ,
127+ }
128+ }
129+
130+ impl <R : AsyncBufRead , D : Decode > Decoder <R , D > {
131+ pub fn new( reader: R , decoder: D ) -> Self {
132+ Self {
133+ reader,
134+ decoder,
135+ inner: GenericDecoder :: default ( ) ,
136+ }
137+ }
138+ }
139+
140+ impl <R , D > Decoder <R , D > {
141+ pub fn get_ref( & self ) -> & R {
142+ & self . reader
143+ }
144+
145+ pub fn get_mut( & mut self ) -> & mut R {
146+ & mut self . reader
147+ }
148+
149+ pub fn get_pin_mut( self : Pin <& mut Self >) -> Pin <& mut R > {
150+ self . project( ) . reader
151+ }
152+
153+ pub fn into_inner( self ) -> R {
154+ self . reader
155+ }
156+
157+ pub fn multiple_members( & mut self , enabled: bool ) {
158+ self . inner. multiple_members( enabled) ;
159+ }
160+ }
161+
162+ impl <R : AsyncBufRead , D : Decode > Decoder <R , D > {
163+ fn do_poll_read(
164+ self : Pin <& mut Self >,
165+ cx: & mut Context <' _>,
166+ output: & mut PartialBuffer <& mut [ u8 ] >,
167+ ) -> Poll <Result <( ) >> {
168+ let mut this = self . project( ) ;
169+
170+ if let ControlFlow :: Break ( res) = this. inner. do_poll_read(
171+ output,
172+ this. decoder,
173+ & mut PartialBuffer :: new( & [ ] [ ..] ) ,
174+ true ,
175+ ) {
176+ return Poll :: Ready ( res) ;
177+ }
178+
179+ loop {
180+ let mut input =
181+ PartialBuffer :: new( ready!( this. reader. as_mut( ) . poll_fill_buf( cx) ) ?) ;
182+
183+ let control_flow =
184+ this. inner
185+ . do_poll_read( output, this. decoder, & mut input, false ) ;
186+
187+ let bytes_read = input. written( ) . len( ) ;
188+ this. reader. as_mut( ) . consume( bytes_read) ;
189+
190+ if let ControlFlow :: Break ( res) = control_flow {
191+ break Poll :: Ready ( res) ;
192+ }
193+ }
194+ }
195+ }
196+ } ;
197+ }
198+ pub ( crate ) use impl_do_poll_read;
0 commit comments