@@ -224,15 +224,16 @@ impl ArrowReader {
             }
         };
 
-        // There are two possible sources both for potential lists of selected RowGroup indices,
-        // and for `RowSelection`s.
-        // Selected RowGroup index lists can come from two sources:
+        // There are three possible sources for potential lists of selected RowGroup indices,
+        // and two for `RowSelection`s.
+        // Selected RowGroup index lists can come from three sources:
+        // * When task.start and task.length specify a byte range (file splitting);
         // * When there are equality delete files that are applicable;
         // * When there is a scan predicate and row_group_filtering_enabled = true.
         // `RowSelection`s can be created in either or both of the following cases:
         // * When there are positional delete files that are applicable;
         // * When there is a scan predicate and row_selection_enabled = true
-        // Note that, in the former case we only perform row group filtering when
+        // Note that row group filtering from predicates only happens when
         // there is a scan predicate AND row_group_filtering_enabled = true,
         // but we perform row selection filtering if there are applicable
         // equality delete files OR (there is a scan predicate AND row_selection_enabled),
@@ -241,6 +242,17 @@ impl ArrowReader {
         let mut selected_row_group_indices = None;
         let mut row_selection = None;
 
+        // Filter row groups based on byte range from task.start and task.length.
+        // If both start and length are 0, read the entire file (backwards compatibility).
+        if task.start != 0 || task.length != 0 {
+            let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
+                record_batch_stream_builder.metadata(),
+                task.start,
+                task.length,
+            )?;
+            selected_row_group_indices = Some(byte_range_filtered_row_groups);
+        }
+
         if let Some(predicate) = final_predicate {
             let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
                 record_batch_stream_builder.parquet_schema(),
@@ -256,14 +268,26 @@ impl ArrowReader {
             record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
 
             if row_group_filtering_enabled {
-                let result = Self::get_selected_row_group_indices(
+                let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
                     &predicate,
                     record_batch_stream_builder.metadata(),
                     &field_id_map,
                     &task.schema,
                 )?;
 
-                selected_row_group_indices = Some(result);
+                // Merge predicate-based filtering with byte range filtering (if present)
+                // by taking the intersection of both filters
+                selected_row_group_indices = match selected_row_group_indices {
+                    Some(byte_range_filtered) => {
+                        // Keep only row groups that are in both filters
+                        let intersection: Vec<usize> = byte_range_filtered
+                            .into_iter()
+                            .filter(|idx| predicate_filtered_row_groups.contains(idx))
+                            .collect();
+                        Some(intersection)
+                    }
+                    None => Some(predicate_filtered_row_groups),
+                };
             }
 
             if row_selection_enabled {
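For illustration, here is a minimal, self-contained sketch of the merge strategy used in the hunk above: when both the byte-range filter and the predicate filter produce row-group lists, only their intersection is read; if only the predicate filter ran, its list is used as-is. The helper name `merge_row_group_selections` and the `main` driver are hypothetical and not part of this change.

// Standalone sketch of the intersection merge (hypothetical helper, mirrors the diff above).
fn merge_row_group_selections(
    byte_range_filtered: Option<Vec<usize>>,
    predicate_filtered: Vec<usize>,
) -> Vec<usize> {
    match byte_range_filtered {
        // A row group must survive both filters to be read.
        Some(byte_range) => byte_range
            .into_iter()
            .filter(|idx| predicate_filtered.contains(idx))
            .collect(),
        // No byte-range filter was applied: the predicate filter stands alone.
        None => predicate_filtered,
    }
}

fn main() {
    // Byte range selects row groups 1 and 2; the predicate keeps 0 and 1.
    // Only row group 1 survives both filters.
    let merged = merge_row_group_selections(Some(vec![1, 2]), vec![0, 1]);
    assert_eq!(merged, vec![1]);
}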
@@ -717,6 +741,36 @@ impl ArrowReader {
 
         Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
     }
+
+    /// Filters row groups by byte range to support Iceberg's file splitting.
+    ///
+    /// Iceberg splits large files at row group boundaries, so we only read row groups
+    /// whose byte ranges overlap with [start, start+length).
+    fn filter_row_groups_by_byte_range(
+        parquet_metadata: &Arc<ParquetMetaData>,
+        start: u64,
+        length: u64,
+    ) -> Result<Vec<usize>> {
+        let row_groups = parquet_metadata.row_groups();
+        let mut selected = Vec::new();
+        let end = start + length;
+
+        // Row groups are stored sequentially after the 4-byte magic header.
+        let mut current_byte_offset = 4u64;
+
+        for (idx, row_group) in row_groups.iter().enumerate() {
+            let row_group_size = row_group.compressed_size() as u64;
+            let row_group_end = current_byte_offset + row_group_size;
+
+            if current_byte_offset < end && start < row_group_end {
+                selected.push(idx);
+            }
+
+            current_byte_offset = row_group_end;
+        }
+
+        Ok(selected)
+    }
 }
 
 /// Build the map of parquet field id to Parquet column index in the schema.
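As a worked illustration of the half-open overlap rule implemented by the new `filter_row_groups_by_byte_range`, the standalone sketch below walks made-up row-group sizes against a split's [start, start+length) range. The function name `overlapping_row_groups` and the byte sizes are assumptions for the example, not taken from the crate.

// Sketch of the overlap rule: row group [rg_start, rg_end) is selected
// when rg_start < end && start < rg_end. Sizes here are made up.
fn overlapping_row_groups(compressed_sizes: &[u64], start: u64, length: u64) -> Vec<usize> {
    let end = start + length;
    let mut offset = 4u64; // row groups begin after the 4-byte "PAR1" magic
    let mut selected = Vec::new();
    for (idx, size) in compressed_sizes.iter().enumerate() {
        let rg_end = offset + size;
        if offset < end && start < rg_end {
            selected.push(idx);
        }
        offset = rg_end;
    }
    selected
}

fn main() {
    // Three row groups of 500 bytes each occupy [4, 504), [504, 1004), [1004, 1504).
    let sizes = [500, 500, 500];
    // A split covering bytes [504, 1504) reads row groups 1 and 2 only.
    assert_eq!(overlapping_row_groups(&sizes, 504, 1000), vec![1, 2]);
    // A split covering bytes [4, 504) reads row group 0 only.
    assert_eq!(overlapping_row_groups(&sizes, 4, 500), vec![0]);
}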
@@ -1949,6 +2003,194 @@ message schema {
         Arc::new(SchemaDescriptor::new(Arc::new(schema)))
     }
 
+    /// Verifies that file splits respect byte ranges and only read specific row groups.
+    #[tokio::test]
+    async fn test_file_splits_respect_byte_ranges() {
+        use arrow_array::Int32Array;
+        use parquet::file::reader::{FileReader, SerializedFileReader};
+
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_path = format!("{}/multi_row_group.parquet", &table_location);
+
+        // Force each batch into its own row group for testing byte range filtering.
+        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
+            (0..100).collect::<Vec<i32>>(),
+        ))])
+        .unwrap();
+        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
+            (100..200).collect::<Vec<i32>>(),
+        ))])
+        .unwrap();
+        let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
+            (200..300).collect::<Vec<i32>>(),
+        ))])
+        .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .set_max_row_group_size(100)
+            .build();
+
+        let file = File::create(&file_path).unwrap();
+        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
+        writer.write(&batch1).expect("Writing batch 1");
+        writer.write(&batch2).expect("Writing batch 2");
+        writer.write(&batch3).expect("Writing batch 3");
+        writer.close().unwrap();
+
+        // Read the file metadata to get row group byte positions
+        let file = File::open(&file_path).unwrap();
+        let reader = SerializedFileReader::new(file).unwrap();
+        let metadata = reader.metadata();
+
+        println!("File has {} row groups", metadata.num_row_groups());
+        assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
+
+        // Get byte positions for each row group
+        let row_group_0 = metadata.row_group(0);
+        let row_group_1 = metadata.row_group(1);
+        let row_group_2 = metadata.row_group(2);
+
+        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
+        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
+        let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
+        let file_end = rg2_start + row_group_2.compressed_size() as u64;
+
+        println!(
+            "Row group 0: {} rows, starts at byte {}, {} bytes compressed",
+            row_group_0.num_rows(),
+            rg0_start,
+            row_group_0.compressed_size()
+        );
+        println!(
+            "Row group 1: {} rows, starts at byte {}, {} bytes compressed",
+            row_group_1.num_rows(),
+            rg1_start,
+            row_group_1.compressed_size()
+        );
+        println!(
+            "Row group 2: {} rows, starts at byte {}, {} bytes compressed",
+            row_group_2.num_rows(),
+            rg2_start,
+            row_group_2.compressed_size()
+        );
+
+        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
+        let reader = ArrowReaderBuilder::new(file_io).build();
+
+        // Task 1: read only the first row group
+        let task1 = FileScanTask {
+            start: rg0_start,
+            length: row_group_0.compressed_size() as u64,
+            record_count: Some(100),
+            data_file_path: file_path.clone(),
+            data_file_format: DataFileFormat::Parquet,
+            schema: schema.clone(),
+            project_field_ids: vec![1],
+            predicate: None,
+            deletes: vec![],
+        };
+
+        // Task 2: read the second and third row groups
+        let task2 = FileScanTask {
+            start: rg1_start,
+            length: file_end - rg1_start,
+            record_count: Some(200),
+            data_file_path: file_path.clone(),
+            data_file_format: DataFileFormat::Parquet,
+            schema: schema.clone(),
+            project_field_ids: vec![1],
+            predicate: None,
+            deletes: vec![],
+        };
+
+        let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
+        let result1 = reader
+            .clone()
+            .read(tasks1)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum();
+        println!(
+            "Task 1 (bytes {}-{}) returned {} rows",
+            rg0_start,
+            rg0_start + row_group_0.compressed_size() as u64,
+            total_rows_task1
+        );
+
+        let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
+        let result2 = reader
+            .read(tasks2)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
+        println!(
+            "Task 2 (bytes {}-{}) returned {} rows",
+            rg1_start, file_end, total_rows_task2
+        );
+
+        assert_eq!(
+            total_rows_task1, 100,
+            "Task 1 should read only the first row group (100 rows), but got {} rows",
+            total_rows_task1
+        );
+
+        assert_eq!(
+            total_rows_task2, 200,
+            "Task 2 should read only the second+third row groups (200 rows), but got {} rows",
+            total_rows_task2
+        );
+
+        // Verify the actual data values are correct (not just the row count)
+        if total_rows_task1 > 0 {
+            let first_batch = &result1[0];
+            let id_col = first_batch
+                .column(0)
+                .as_primitive::<arrow_array::types::Int32Type>();
+            let first_val = id_col.value(0);
+            let last_val = id_col.value(id_col.len() - 1);
+            println!("Task 1 data range: {} to {}", first_val, last_val);
+
+            assert_eq!(first_val, 0, "Task 1 should start with id=0");
+            assert_eq!(last_val, 99, "Task 1 should end with id=99");
+        }
+
+        if total_rows_task2 > 0 {
+            let first_batch = &result2[0];
+            let id_col = first_batch
+                .column(0)
+                .as_primitive::<arrow_array::types::Int32Type>();
+            let first_val = id_col.value(0);
+            println!("Task 2 first value: {}", first_val);
+
+            assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
+        }
+    }
+
     /// Test schema evolution: reading old Parquet file (with only column 'a')
     /// using a newer table schema (with columns 'a' and 'b').
     /// This tests that: