2222//! compliant with the `SendableRecordBatchStream` trait.
2323
2424use std:: collections:: VecDeque ;
25- use std:: mem;
2625use std:: pin:: Pin ;
2726use std:: sync:: Arc ;
2827use std:: task:: { Context , Poll } ;
@@ -98,10 +97,6 @@ impl FileStream {
9897 self
9998 }
10099
101- /// Begin opening the next file in parallel while decoding the current file in FileStream.
102- ///
103- /// Since file opening is mostly IO (and may involve a
104- /// bunch of sequential IO), it can be parallelized with decoding.
105100 fn start_next_file ( & mut self ) -> Option < Result < FileOpenFuture > > {
106101 let part_file = self . file_iter . pop_front ( ) ?;
107102 Some ( self . file_opener . open ( part_file) )
@@ -110,50 +105,24 @@ impl FileStream {
110105 fn poll_inner ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Option < Result < RecordBatch > > > {
111106 loop {
112107 match & mut self . state {
113- FileStreamState :: Idle => {
114- self . file_stream_metrics . time_opening . start ( ) ;
115-
116- match self . start_next_file ( ) . transpose ( ) {
117- Ok ( Some ( future) ) => self . state = FileStreamState :: Open { future } ,
118- Ok ( None ) => return Poll :: Ready ( None ) ,
119- Err ( e) => {
120- self . state = FileStreamState :: Error ;
121- return Poll :: Ready ( Some ( Err ( e) ) ) ;
122- }
108+ FileStreamState :: Idle => match self . start_next_file ( ) . transpose ( ) {
109+ Ok ( Some ( future) ) => {
110+ self . file_stream_metrics . time_opening . start ( ) ;
111+ self . state = FileStreamState :: Open { future } ;
123112 }
124- }
113+ Ok ( None ) => return Poll :: Ready ( None ) ,
114+ Err ( e) => {
115+ self . state = FileStreamState :: Error ;
116+ return Poll :: Ready ( Some ( Err ( e) ) ) ;
117+ }
118+ } ,
125119 FileStreamState :: Open { future } => match ready ! ( future. poll_unpin( cx) ) {
126120 Ok ( reader) => {
127121 self . file_stream_metrics . files_opened . add ( 1 ) ;
128- // include time needed to start opening in `start_next_file`
129122 self . file_stream_metrics . time_opening . stop ( ) ;
130- let next = {
131- let scanning_total_metric = self
132- . file_stream_metrics
133- . time_scanning_total
134- . metrics
135- . clone ( ) ;
136- let _timer = scanning_total_metric. timer ( ) ;
137- self . start_next_file ( ) . transpose ( )
138- } ;
139123 self . file_stream_metrics . time_scanning_until_data . start ( ) ;
140124 self . file_stream_metrics . time_scanning_total . start ( ) ;
141-
142- match next {
143- Ok ( Some ( next_future) ) => {
144- self . state = FileStreamState :: Scan {
145- reader,
146- next : Some ( NextOpen :: Pending ( next_future) ) ,
147- } ;
148- }
149- Ok ( None ) => {
150- self . state = FileStreamState :: Scan { reader, next : None } ;
151- }
152- Err ( e) => {
153- self . state = FileStreamState :: Error ;
154- return Poll :: Ready ( Some ( Err ( e) ) ) ;
155- }
156- }
125+ self . state = FileStreamState :: Scan { reader } ;
157126 }
158127 Err ( e) => {
159128 self . file_stream_metrics . file_open_errors . add ( 1 ) ;
@@ -170,14 +139,7 @@ impl FileStream {
170139 }
171140 }
172141 } ,
173- FileStreamState :: Scan { reader, next } => {
174- // We need to poll the next `FileOpenFuture` here to drive it forward
175- if let Some ( next_open_future) = next
176- && let NextOpen :: Pending ( f) = next_open_future
177- && let Poll :: Ready ( reader) = f. as_mut ( ) . poll ( cx)
178- {
179- * next_open_future = NextOpen :: Ready ( reader) ;
180- }
142+ FileStreamState :: Scan { reader } => {
181143 match ready ! ( reader. poll_next_unpin( cx) ) {
182144 Some ( Ok ( batch) ) => {
183145 self . file_stream_metrics . time_scanning_until_data . stop ( ) ;
@@ -189,12 +151,9 @@ impl FileStream {
189151 batch
190152 } else {
191153 let batch = batch. slice ( 0 , * remain) ;
192- // Count this file, the prefetched next file
193- // (if any), and all remaining files we will
194- // never open.
195- let done = 1
196- + self . file_iter . len ( )
197- + usize:: from ( next. is_some ( ) ) ;
154+ // Count this file and all remaining files
155+ // we will never open.
156+ let done = 1 + self . file_iter . len ( ) ;
198157 self . file_stream_metrics
199158 . files_processed
200159 . add ( done) ;
@@ -214,29 +173,9 @@ impl FileStream {
214173 self . file_stream_metrics . time_scanning_total . stop ( ) ;
215174
216175 match self . on_error {
217- // If `OnError::Skip` we skip the file as soon as we hit the first error
218176 OnError :: Skip => {
219177 self . file_stream_metrics . files_processed . add ( 1 ) ;
220- match mem:: take ( next) {
221- Some ( future) => {
222- self . file_stream_metrics . time_opening . start ( ) ;
223-
224- match future {
225- NextOpen :: Pending ( future) => {
226- self . state =
227- FileStreamState :: Open { future }
228- }
229- NextOpen :: Ready ( reader) => {
230- self . state = FileStreamState :: Open {
231- future : Box :: pin (
232- std:: future:: ready ( reader) ,
233- ) ,
234- }
235- }
236- }
237- }
238- None => return Poll :: Ready ( None ) ,
239- }
178+ self . state = FileStreamState :: Idle ;
240179 }
241180 OnError :: Fail => {
242181 self . state = FileStreamState :: Error ;
@@ -248,26 +187,7 @@ impl FileStream {
248187 self . file_stream_metrics . files_processed . add ( 1 ) ;
249188 self . file_stream_metrics . time_scanning_until_data . stop ( ) ;
250189 self . file_stream_metrics . time_scanning_total . stop ( ) ;
251-
252- match mem:: take ( next) {
253- Some ( future) => {
254- self . file_stream_metrics . time_opening . start ( ) ;
255-
256- match future {
257- NextOpen :: Pending ( future) => {
258- self . state = FileStreamState :: Open { future }
259- }
260- NextOpen :: Ready ( reader) => {
261- self . state = FileStreamState :: Open {
262- future : Box :: pin ( std:: future:: ready (
263- reader,
264- ) ) ,
265- }
266- }
267- }
268- }
269- None => return Poll :: Ready ( None ) ,
270- }
190+ self . state = FileStreamState :: Idle ;
271191 }
272192 }
273193 }
@@ -323,14 +243,6 @@ pub trait FileOpener: Unpin + Send + Sync {
323243 fn open ( & self , partitioned_file : PartitionedFile ) -> Result < FileOpenFuture > ;
324244}
325245
326- /// Represents the state of the next `FileOpenFuture`. Since we need to poll
327- /// this future while scanning the current file, we need to store the result if it
328- /// is ready
329- pub enum NextOpen {
330- Pending ( FileOpenFuture ) ,
331- Ready ( Result < BoxStream < ' static , Result < RecordBatch > > > ) ,
332- }
333-
334246pub enum FileStreamState {
335247 /// The idle state, no file is currently being read
336248 Idle ,
@@ -345,10 +257,6 @@ pub enum FileStreamState {
345257 Scan {
346258 /// The reader instance
347259 reader : BoxStream < ' static , Result < RecordBatch > > ,
348- /// A [`FileOpenFuture`] for the next file to be processed.
349- /// This allows the next file to be opened in parallel while the
350- /// current file is read.
351- next : Option < NextOpen > ,
352260 } ,
353261 /// Encountered an error
354262 Error ,
@@ -388,11 +296,6 @@ pub struct FileStreamMetrics {
388296 ///
389297 /// Time between when [`FileOpener::open`] is called and when the
390298 /// [`FileStream`] receives a stream for reading.
391- ///
392- /// If there are multiple files being scanned, the stream
393- /// will open the next file in the background while scanning the
394- /// current file. This metric will only capture time spent opening
395- /// while not also scanning.
396299 /// [`FileStream`]: <https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_stream.rs>
397300 pub time_opening : StartableTime ,
398301 /// Wall clock time elapsed for file scanning + first record batch of decompression + decoding
0 commit comments