@@ -33,7 +33,9 @@ use bytes::Bytes;
3333use fnv:: FnvHashSet ;
3434use futures:: future:: BoxFuture ;
3535use futures:: { try_join, FutureExt , StreamExt , TryFutureExt , TryStreamExt } ;
36- use parquet:: arrow:: arrow_reader:: { ArrowPredicateFn , ArrowReaderOptions , RowFilter , RowSelection } ;
36+ use parquet:: arrow:: arrow_reader:: {
37+ ArrowPredicateFn , ArrowReaderOptions , RowFilter , RowSelection , RowSelector ,
38+ } ;
3739use parquet:: arrow:: async_reader:: AsyncFileReader ;
3840use parquet:: arrow:: { ParquetRecordBatchStreamBuilder , ProjectionMask , PARQUET_FIELD_ID_META_KEY } ;
3941use parquet:: file:: metadata:: { ParquetMetaData , ParquetMetaDataReader , RowGroupMetaData } ;
@@ -341,15 +343,104 @@ impl ArrowReader {
341343 /// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated
342344 /// as having been deleted by a positional delete, taking into account any row groups that have
343345 /// been skipped entirely by the filter predicate
344- #[ allow( unused) ]
345346 fn build_deletes_row_selection (
346347 row_group_metadata : & [ RowGroupMetaData ] ,
347348 selected_row_groups : & Option < Vec < usize > > ,
348349 mut positional_deletes : RoaringTreemap ,
349350 ) -> Result < RowSelection > {
350- // TODO
351+ let mut results: Vec < RowSelector > = Vec :: new ( ) ;
352+ let mut selected_row_groups_idx = 0 ;
353+ let mut current_page_base_idx: u64 = 0 ;
354+
355+ for ( idx, row_group_metadata) in row_group_metadata. iter ( ) . enumerate ( ) {
356+ let page_num_rows = row_group_metadata. num_rows ( ) as u64 ;
357+ let next_page_base_idx = current_page_base_idx + page_num_rows;
351358
352- Ok ( RowSelection :: default ( ) )
359+ // if row group selection is enabled,
360+ if let Some ( selected_row_groups) = selected_row_groups {
361+ // if we've consumed all the selected row groups, we're done
362+ if selected_row_groups_idx == selected_row_groups. len ( ) {
363+ break ;
364+ }
365+
366+ if idx == selected_row_groups[ selected_row_groups_idx] {
367+ // we're in a selected row group. Increment selected_row_groups_idx
368+ // so that next time around the for loop we're looking for the next
369+ // selected row group
370+ selected_row_groups_idx += 1 ;
371+ } else {
372+ // remove any positional deletes from the skipped page so that
373+ // `positional.deletes.min()` can be used
374+ positional_deletes. remove_range ( current_page_base_idx..next_page_base_idx) ;
375+
376+ // still increment the current page base index but then skip to the next row group
377+ // in the file
378+ current_page_base_idx += page_num_rows;
379+ continue ;
380+ }
381+ }
382+
383+ let mut next_deleted_row_idx = match positional_deletes. min ( ) {
384+ Some ( next_deleted_row_idx) => {
385+ // if the index of the next deleted row is beyond this page, add a selection for
386+ // the remainder of this page and skip to the next page
387+ if next_deleted_row_idx >= next_page_base_idx {
388+ results. push ( RowSelector :: select ( page_num_rows as usize ) ) ;
389+ continue ;
390+ }
391+
392+ next_deleted_row_idx
393+ }
394+
395+ // If there are no more pos deletes, add a selector for the entirety of this page.
396+ _ => {
397+ results. push ( RowSelector :: select ( page_num_rows as usize ) ) ;
398+ continue ;
399+ }
400+ } ;
401+
402+ let mut current_idx = current_page_base_idx;
403+ ' chunks: while next_deleted_row_idx < next_page_base_idx {
404+ // `select` all rows that precede the next delete index
405+ if current_idx < next_deleted_row_idx {
406+ let run_length = next_deleted_row_idx - current_idx;
407+ results. push ( RowSelector :: select ( run_length as usize ) ) ;
408+ current_idx += run_length;
409+ }
410+
411+ // `skip` all consecutive deleted rows in the current row group
412+ let mut run_length = 0 ;
413+ while next_deleted_row_idx == current_idx
414+ && next_deleted_row_idx < next_page_base_idx
415+ {
416+ run_length += 1 ;
417+ current_idx += 1 ;
418+ positional_deletes. remove ( next_deleted_row_idx) ;
419+
420+ next_deleted_row_idx = match positional_deletes. min ( ) {
421+ Some ( next_deleted_row_idx) => next_deleted_row_idx,
422+ _ => {
423+ // We've processed the final positional delete.
424+ // Conclude the skip and then break so that we select the remaining
425+ // rows in the page and move on to the next row group
426+ results. push ( RowSelector :: skip ( run_length) ) ;
427+ break ' chunks;
428+ }
429+ } ;
430+ }
431+ results. push ( RowSelector :: skip ( run_length) ) ;
432+ }
433+
434+ if current_idx < next_page_base_idx {
435+ results. push ( RowSelector :: select (
436+ ( next_page_base_idx - current_idx) as usize ,
437+ ) ) ;
438+ }
439+
440+ current_page_base_idx += page_num_rows;
441+ }
442+
443+ Ok ( results. into ( ) )
353444 }
354445
355446 fn build_field_id_set_and_map (
@@ -1255,9 +1346,12 @@ mod tests {
12551346 use std:: sync:: Arc ;
12561347
12571348 use arrow_schema:: { DataType , Field , Schema as ArrowSchema , TimeUnit } ;
1349+ use parquet:: arrow:: arrow_reader:: { RowSelection , RowSelector } ;
12581350 use parquet:: arrow:: ProjectionMask ;
1351+ use parquet:: file:: metadata:: { ColumnChunkMetaData , RowGroupMetaData } ;
12591352 use parquet:: schema:: parser:: parse_message_type;
1260- use parquet:: schema:: types:: SchemaDescriptor ;
1353+ use parquet:: schema:: types:: { SchemaDescPtr , SchemaDescriptor } ;
1354+ use roaring:: RoaringTreemap ;
12611355
12621356 use crate :: arrow:: reader:: { CollectFieldIdVisitor , PARQUET_FIELD_ID_META_KEY } ;
12631357 use crate :: arrow:: ArrowReader ;
@@ -1423,4 +1517,146 @@ message schema {
14231517 . expect ( "Some ProjectionMask" ) ;
14241518 assert_eq ! ( mask, ProjectionMask :: leaves( & parquet_schema, vec![ 0 ] ) ) ;
14251519 }
1520+
1521+ #[ test]
1522+ fn test_build_deletes_row_selection ( ) {
1523+ let schema_descr = get_test_schema_descr ( ) ;
1524+
1525+ let mut columns = vec ! [ ] ;
1526+ for ptr in schema_descr. columns ( ) {
1527+ let column = ColumnChunkMetaData :: builder ( ptr. clone ( ) ) . build ( ) . unwrap ( ) ;
1528+ columns. push ( column) ;
1529+ }
1530+
1531+ let row_groups_metadata = vec ! [
1532+ build_test_row_group_meta( schema_descr. clone( ) , columns. clone( ) , 1000 , 0 ) ,
1533+ build_test_row_group_meta( schema_descr. clone( ) , columns. clone( ) , 500 , 1 ) ,
1534+ build_test_row_group_meta( schema_descr. clone( ) , columns. clone( ) , 500 , 2 ) ,
1535+ build_test_row_group_meta( schema_descr. clone( ) , columns. clone( ) , 1000 , 3 ) ,
1536+ build_test_row_group_meta( schema_descr. clone( ) , columns. clone( ) , 500 , 4 ) ,
1537+ ] ;
1538+
1539+ let selected_row_groups = Some ( vec ! [ 1 , 3 ] ) ;
1540+
1541+ /* cases to cover:
1542+ * {skip|select} {first|intermediate|last} {one row|multiple rows} in
1543+ {first|imtermediate|last} {skipped|selected} row group
1544+ * row group selection disabled
1545+ */
1546+
1547+ let positional_deletes = RoaringTreemap :: from_iter ( & [
1548+ 1 , // in skipped rg 0, should be ignored
1549+ 3 , // run of three consecutive items in skipped rg0
1550+ 4 , 5 , 998 , // two consecutive items at end of skipped rg0
1551+ 999 , 1000 , // solitary row at start of selected rg1 (1, 9)
1552+ 1010 , // run of 3 rows in selected rg1
1553+ 1011 , 1012 , // (3, 485)
1554+ 1498 , // run of two items at end of selected rg1
1555+ 1499 , 1500 , // run of two items at start of skipped rg2
1556+ 1501 , 1600 , // should ignore, in skipped rg2
1557+ 1999 , // single row at end of skipped rg2
1558+ 2000 , // run of two items at start of selected rg3
1559+ 2001 , // (4, 98)
1560+ 2100 , // single row in selected row group 3 (1, 99)
1561+ 2200 , // run of 3 consecutive rows in selected row group 3
1562+ 2201 , 2202 , // (3, 796)
1563+ 2999 , // single item at end of selected rg3 (1)
1564+ 3000 , // single item at start of skipped rg4
1565+ ] ) ;
1566+
1567+ // using selected row groups 1 and 3
1568+ let result = ArrowReader :: build_deletes_row_selection (
1569+ & row_groups_metadata,
1570+ & selected_row_groups,
1571+ positional_deletes. clone ( ) ,
1572+ )
1573+ . unwrap ( ) ;
1574+
1575+ let expected = RowSelection :: from ( vec ! [
1576+ RowSelector :: skip( 1 ) ,
1577+ RowSelector :: select( 9 ) ,
1578+ RowSelector :: skip( 3 ) ,
1579+ RowSelector :: select( 485 ) ,
1580+ RowSelector :: skip( 4 ) ,
1581+ RowSelector :: select( 98 ) ,
1582+ RowSelector :: skip( 1 ) ,
1583+ RowSelector :: select( 99 ) ,
1584+ RowSelector :: skip( 3 ) ,
1585+ RowSelector :: select( 796 ) ,
1586+ RowSelector :: skip( 1 ) ,
1587+ ] ) ;
1588+
1589+ assert_eq ! ( result, expected) ;
1590+
1591+ // selecting all row groups
1592+ let result = ArrowReader :: build_deletes_row_selection (
1593+ & row_groups_metadata,
1594+ & None ,
1595+ positional_deletes,
1596+ )
1597+ . unwrap ( ) ;
1598+
1599+ let expected = RowSelection :: from ( vec ! [
1600+ RowSelector :: select( 1 ) ,
1601+ RowSelector :: skip( 1 ) ,
1602+ RowSelector :: select( 1 ) ,
1603+ RowSelector :: skip( 3 ) ,
1604+ RowSelector :: select( 992 ) ,
1605+ RowSelector :: skip( 3 ) ,
1606+ RowSelector :: select( 9 ) ,
1607+ RowSelector :: skip( 3 ) ,
1608+ RowSelector :: select( 485 ) ,
1609+ RowSelector :: skip( 4 ) ,
1610+ RowSelector :: select( 98 ) ,
1611+ RowSelector :: skip( 1 ) ,
1612+ RowSelector :: select( 398 ) ,
1613+ RowSelector :: skip( 3 ) ,
1614+ RowSelector :: select( 98 ) ,
1615+ RowSelector :: skip( 1 ) ,
1616+ RowSelector :: select( 99 ) ,
1617+ RowSelector :: skip( 3 ) ,
1618+ RowSelector :: select( 796 ) ,
1619+ RowSelector :: skip( 2 ) ,
1620+ RowSelector :: select( 499 ) ,
1621+ ] ) ;
1622+
1623+ assert_eq ! ( result, expected) ;
1624+ }
1625+
1626+ fn build_test_row_group_meta (
1627+ schema_descr : SchemaDescPtr ,
1628+ columns : Vec < ColumnChunkMetaData > ,
1629+ num_rows : i64 ,
1630+ ordinal : i16 ,
1631+ ) -> RowGroupMetaData {
1632+ RowGroupMetaData :: builder ( schema_descr. clone ( ) )
1633+ . set_num_rows ( num_rows)
1634+ . set_total_byte_size ( 2000 )
1635+ . set_column_metadata ( columns)
1636+ . set_ordinal ( ordinal)
1637+ . build ( )
1638+ . unwrap ( )
1639+ }
1640+
1641+ fn get_test_schema_descr ( ) -> SchemaDescPtr {
1642+ use parquet:: schema:: types:: Type as SchemaType ;
1643+
1644+ let schema = SchemaType :: group_type_builder ( "schema" )
1645+ . with_fields ( vec ! [
1646+ Arc :: new(
1647+ SchemaType :: primitive_type_builder( "a" , parquet:: basic:: Type :: INT32 )
1648+ . build( )
1649+ . unwrap( ) ,
1650+ ) ,
1651+ Arc :: new(
1652+ SchemaType :: primitive_type_builder( "b" , parquet:: basic:: Type :: INT32 )
1653+ . build( )
1654+ . unwrap( ) ,
1655+ ) ,
1656+ ] )
1657+ . build ( )
1658+ . unwrap ( ) ;
1659+
1660+ Arc :: new ( SchemaDescriptor :: new ( Arc :: new ( schema) ) )
1661+ }
14261662}
0 commit comments