 //! Native Iceberg table scan operator using iceberg-rust
 
 use std::any::Any;
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
 use std::fmt;
 use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
 
+use arrow::array::RecordBatch;
 use arrow::datatypes::SchemaRef;
 use datafusion::common::{DataFusionError, Result as DFResult};
-use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
 use datafusion::physical_expr::EquivalenceProperties;
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
 };
-use futures::{StreamExt, TryStreamExt};
+use futures::future::BoxFuture;
+use futures::{ready, FutureExt, Stream, StreamExt, TryStreamExt};
 use iceberg::io::FileIO;
 
 use crate::execution::operators::ExecutionError;
@@ -117,14 +120,14 @@ impl ExecutionPlan for IcebergScanExec {
     fn execute(
         &self,
         partition: usize,
-        _context: Arc<TaskContext>,
+        context: Arc<TaskContext>,
     ) -> DFResult<SendableRecordBatchStream> {
         // Execute pre-planned tasks from Scala (planning happens via Iceberg's Java API)
         if let Some(ref task_groups) = self.file_task_groups {
             if partition < task_groups.len() {
                 let tasks = &task_groups[partition];
 
-                return self.execute_with_tasks(tasks.clone());
+                return self.execute_with_tasks(tasks.clone(), context);
             } else {
                 return Err(DataFusionError::Execution(format!(
                     "IcebergScanExec: Partition index {} out of range (only {} task groups available)",
@@ -148,38 +151,26 @@ impl IcebergScanExec {
     fn execute_with_tasks(
         &self,
         tasks: Vec<iceberg::scan::FileScanTask>,
+        context: Arc<TaskContext>,
     ) -> DFResult<SendableRecordBatchStream> {
         let output_schema = Arc::clone(&self.output_schema);
-        let catalog_properties = self.catalog_properties.clone();
-        let metadata_location = self.metadata_location.clone();
 
-        let fut = async move {
-            let file_io = Self::load_file_io(&catalog_properties, &metadata_location)?;
+        // Create FileIO synchronously
+        let file_io = Self::load_file_io(&self.catalog_properties, &self.metadata_location)?;
 
-            let task_stream = futures::stream::iter(tasks.into_iter().map(Ok)).boxed();
+        // Get batch size from context
+        let batch_size = context.session_config().batch_size();
 
-            let reader = iceberg::arrow::ArrowReaderBuilder::new(file_io).build();
+        // Create parallel file stream that overlaps opening next file with reading current file
+        let file_stream =
+            IcebergFileStream::new(tasks, file_io, batch_size, Arc::clone(&output_schema))?;
 
-            // read() is synchronous and returns Result<ArrowRecordBatchStream>
-            let stream = reader.read(task_stream).map_err(|e| {
-                DataFusionError::Execution(format!("Failed to read Iceberg tasks: {}", e))
-            })?;
-
-            let mapped_stream = stream
-                .map_err(|e| DataFusionError::Execution(format!("Iceberg scan error: {}", e)));
-
-            Ok::<_, DataFusionError>(Box::pin(mapped_stream)
-                as Pin<
-                    Box<dyn futures::Stream<Item = DFResult<arrow::array::RecordBatch>> + Send>,
-                >)
-        };
+        // Note: BatchSplitStream adds overhead. Since we're already setting batch_size in
+        // iceberg-rust's ArrowReaderBuilder, it should produce correctly sized batches.
+        // Only use BatchSplitStream as a safety net if needed.
+        // For now, return the file_stream directly to reduce stream nesting overhead.
 
-        let stream = futures::stream::once(fut).try_flatten();
-
-        Ok(Box::pin(RecordBatchStreamAdapter::new(
-            output_schema,
-            stream,
-        )))
+        Ok(Box::pin(file_stream))
     }
 
     fn load_file_io(
@@ -199,6 +190,194 @@ impl IcebergScanExec {
     }
 }
 
+/// State machine for IcebergFileStream
+enum FileStreamState {
+    /// Idle state - need to start opening next file
+    Idle,
+    /// Opening a file
+    Opening {
+        future: BoxFuture<'static, DFResult<SendableRecordBatchStream>>,
+    },
+    /// Reading from current file while potentially opening next file
+    Reading {
+        current: SendableRecordBatchStream,
+        next: Option<BoxFuture<'static, DFResult<SendableRecordBatchStream>>>,
+    },
+    /// Error state
+    Error,
+}
+
+/// Stream that reads Iceberg files with parallel opening optimization.
+/// Opens the next file while reading the current file to overlap IO with compute.
+///
+/// Inspired by DataFusion's [`FileStream`] pattern for overlapping file opening with reading.
+///
+/// [`FileStream`]: https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_stream.rs
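+///
+/// State transitions: `Idle` starts the first open; `Opening` waits for it;
+/// `Reading` yields batches from the current file while the next open runs in
+/// the background; the stream ends once the task queue and the current file
+/// are both exhausted.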
+struct IcebergFileStream {
+    schema: SchemaRef,
+    file_io: FileIO,
+    batch_size: usize,
+    tasks: VecDeque<iceberg::scan::FileScanTask>,
+    state: FileStreamState,
+}
+
+impl IcebergFileStream {
+    fn new(
+        tasks: Vec<iceberg::scan::FileScanTask>,
+        file_io: FileIO,
+        batch_size: usize,
+        schema: SchemaRef,
+    ) -> DFResult<Self> {
+        Ok(Self {
+            schema,
+            file_io,
+            batch_size,
+            tasks: tasks.into_iter().collect(),
+            state: FileStreamState::Idle,
+        })
+    }
+
+    /// Start opening the next file
+    fn start_next_file(
+        &mut self,
+    ) -> Option<BoxFuture<'static, DFResult<SendableRecordBatchStream>>> {
+        let task = self.tasks.pop_front()?;
+        let file_io = self.file_io.clone();
+        let batch_size = self.batch_size;
+        let schema = Arc::clone(&self.schema);
+
+        Some(Box::pin(async move {
+            // Create a single-task stream
+            let task_stream = futures::stream::iter(vec![Ok(task)]).boxed();
+
+            // Create reader with optimizations
+            let reader = iceberg::arrow::ArrowReaderBuilder::new(file_io)
+                .with_batch_size(batch_size)
+                .with_row_selection_enabled(true)
+                .build();
+
+            // Read the task
+            let stream = reader.read(task_stream).map_err(|e| {
+                DataFusionError::Execution(format!("Failed to read Iceberg task: {}", e))
+            })?;
+
+            // Map errors and wrap minimally - RecordBatchStreamAdapter is needed to provide schema
+            let mapped_stream = stream
+                .map_err(|e| DataFusionError::Execution(format!("Iceberg scan error: {}", e)));
+
+            Ok(
+                Box::pin(RecordBatchStreamAdapter::new(schema, mapped_stream))
+                    as SendableRecordBatchStream,
+            )
+        }))
+    }
+
+    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<DFResult<RecordBatch>>> {
+        loop {
+            match &mut self.state {
+                FileStreamState::Idle => {
+                    // Start opening the first file
+                    match self.start_next_file() {
+                        Some(future) => {
+                            self.state = FileStreamState::Opening { future };
+                        }
+                        None => return Poll::Ready(None),
+                    }
+                }
+                FileStreamState::Opening { future } => {
+                    // Wait for file to open
+                    match ready!(future.poll_unpin(cx)) {
+                        Ok(stream) => {
+                            // File opened, start reading and open next file in parallel
+                            let next = self.start_next_file();
+                            self.state = FileStreamState::Reading {
+                                current: stream,
+                                next,
+                            };
+                        }
+                        Err(e) => {
+                            self.state = FileStreamState::Error;
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                    }
+                }
+                FileStreamState::Reading { current, next } => {
+                    // Poll next file opening future to drive it forward (background IO)
+                    if let Some(next_future) = next {
+                        if let Poll::Ready(result) = next_future.poll_unpin(cx) {
+                            // Next file is ready, store it
+                            match result {
+                                Ok(stream) => {
+                                    *next = Some(Box::pin(futures::future::ready(Ok(stream))));
+                                }
+                                Err(e) => {
+                                    self.state = FileStreamState::Error;
+                                    return Poll::Ready(Some(Err(e)));
+                                }
+                            }
+                        }
+                    }
+
+                    // Poll current stream for next batch
+                    match ready!(current.poll_next_unpin(cx)) {
+                        Some(result) => {
+                            return Poll::Ready(Some(result));
+                        }
+                        None => {
+                            // Current file is done, move to next file if available
+                            match next.take() {
+                                Some(mut next_future) => {
+                                    // Check if next file is already opened
+                                    match next_future.poll_unpin(cx) {
+                                        Poll::Ready(Ok(stream)) => {
+                                            let next_next = self.start_next_file();
+                                            self.state = FileStreamState::Reading {
+                                                current: stream,
+                                                next: next_next,
+                                            };
+                                        }
+                                        Poll::Ready(Err(e)) => {
+                                            self.state = FileStreamState::Error;
+                                            return Poll::Ready(Some(Err(e)));
+                                        }
+                                        Poll::Pending => {
+                                            // Still opening, wait for it
+                                            self.state = FileStreamState::Opening {
+                                                future: next_future,
+                                            };
+                                        }
+                                    }
+                                }
+                                None => {
+                                    // No more files
+                                    return Poll::Ready(None);
+                                }
+                            }
+                        }
+                    }
+                }
+                FileStreamState::Error => {
+                    return Poll::Ready(None);
+                }
+            }
+        }
+    }
+}
+
+impl Stream for IcebergFileStream {
+    type Item = DFResult<arrow::array::RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.poll_inner(cx)
+    }
+}
+
+impl RecordBatchStream for IcebergFileStream {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+
 impl DisplayAs for IcebergScanExec {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
         write!(
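For context, here is a minimal sketch of how a caller might drive this operator end to end. It is illustrative only: the IcebergScanExec constructor is outside this diff, so `scan` is assumed to be an already-built plan node, `collect_partition` is a hypothetical helper, and a Tokio runtime is assumed since iceberg-rust's FileIO reads asynchronously.

    use std::sync::Arc;

    use datafusion::common::Result as DFResult;
    use datafusion::execution::TaskContext;
    use datafusion::physical_plan::ExecutionPlan;
    use futures::TryStreamExt;

    // Hypothetical helper: drain one partition of an already-built scan node.
    async fn collect_partition(scan: Arc<dyn ExecutionPlan>, partition: usize) -> DFResult<usize> {
        // The TaskContext is no longer ignored: execute_with_tasks reads
        // session_config().batch_size() from it to size the Arrow batches
        // produced by iceberg-rust's ArrowReaderBuilder.
        let ctx = Arc::new(TaskContext::default());
        let stream = scan.execute(partition, ctx)?;

        // While the consumer awaits batches from the current file, the
        // IcebergFileStream Reading state keeps polling the future that
        // opens the next file, hiding open latency behind decode work.
        let batches: Vec<_> = stream.try_collect().await?;
        Ok(batches.iter().map(|b| b.num_rows()).sum())
    }

Note that the look-ahead is deliberately one file deep: `next` holds at most a single pending open, which bounds buffered state while still matching the open-next-while-reading-current behavior of DataFusion's FileStream.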