Skip to content

Commit f4b6d94

Browse files
committed
feat: implementation for ArrowReader::build_deletes_row_selection
1 parent 4c0c7f9 commit f4b6d94

File tree

1 file changed

+73
-4
lines changed

1 file changed

+73
-4
lines changed

crates/iceberg/src/arrow/reader.rs

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ use bytes::Bytes;
3333
use fnv::FnvHashSet;
3434
use futures::future::BoxFuture;
3535
use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
36-
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
36+
use parquet::arrow::arrow_reader::{
37+
ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
38+
};
3739
use parquet::arrow::async_reader::AsyncFileReader;
3840
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
3941
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
@@ -340,15 +342,82 @@ impl ArrowReader {
340342
/// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated
341343
/// as having been deleted by a positional delete, taking into account any row groups that have
342344
/// been skipped entirely by the filter predicate
343-
#[allow(unused)]
344345
fn build_deletes_row_selection(
345346
parquet_metadata: &Arc<ParquetMetaData>,
346347
selected_row_groups: &Option<Vec<usize>>,
347348
positional_deletes: &[usize],
348349
) -> Result<RowSelection> {
349-
// TODO
350+
let mut selected_row_groups_idx = 0;
351+
let mut curr_pos_del_idx = 0;
352+
let pos_del_len = positional_deletes.len();
353+
354+
let page_index = parquet_metadata.row_groups().iter().enumerate();
355+
356+
let mut results: Vec<RowSelector> = Vec::new();
357+
let mut current_page_base_idx: usize = 0;
358+
for (idx, row_group_metadata) in page_index {
359+
let page_num_rows = row_group_metadata.num_rows() as usize;
360+
let next_page_base_idx = current_page_base_idx + page_num_rows;
361+
362+
// skip any row groups that aren't in the row group selection
363+
if let Some(selected_row_groups) = selected_row_groups {
364+
// skip row groups that aren't present in selected_row_groups
365+
if selected_row_groups_idx == selected_row_groups.len() {
366+
break;
367+
}
368+
if idx == selected_row_groups[selected_row_groups_idx] {
369+
selected_row_groups_idx += 1;
370+
} else {
371+
current_page_base_idx += page_num_rows;
372+
continue;
373+
}
374+
}
375+
376+
let mut next_deleted_row_idx = positional_deletes[curr_pos_del_idx];
377+
378+
// if the index of the next deleted row is beyond this page, skip
379+
// to the next page
380+
if next_deleted_row_idx >= next_page_base_idx {
381+
continue;
382+
}
383+
384+
let mut current_idx = current_page_base_idx;
385+
while next_deleted_row_idx < next_page_base_idx {
386+
// select all rows that precede the next delete index
387+
if current_idx < next_deleted_row_idx {
388+
let run_length = next_deleted_row_idx - current_idx;
389+
results.push(RowSelector::select(run_length));
390+
current_idx += run_length;
391+
}
392+
393+
// skip all consecutive deleted rows
394+
let mut run_length = 1;
395+
while curr_pos_del_idx < pos_del_len - 1
396+
&& positional_deletes[curr_pos_del_idx + 1] == next_deleted_row_idx + 1
397+
{
398+
run_length += 1;
399+
curr_pos_del_idx += 1;
400+
}
401+
results.push(RowSelector::skip(run_length));
402+
current_idx += run_length;
403+
404+
curr_pos_del_idx += 1;
405+
if curr_pos_del_idx >= pos_del_len {
406+
break;
407+
}
408+
next_deleted_row_idx = positional_deletes[curr_pos_del_idx];
409+
}
410+
411+
if let Some(selected_row_groups) = selected_row_groups {
412+
if selected_row_groups_idx == selected_row_groups.len() {
413+
break;
414+
}
415+
}
416+
417+
current_page_base_idx += page_num_rows;
418+
}
350419

351-
Ok(RowSelection::default())
420+
Ok(results.into())
352421
}
353422

354423
fn build_field_id_set_and_map(

0 commit comments

Comments
 (0)