Skip to content

Commit dcaa135

Browse files
authored
feat[datafusion]: Support DataFusion PartitionedFile extensions for external index (#5556)
related impl in datafusion for parquet format apache/datafusion#10813, see discussion in #5481 Signed-off-by: Huaijin <[email protected]> --------- Signed-off-by: Huaijin <[email protected]> Signed-off-by: Connor Tsui <[email protected]>
1 parent 8781064 commit dcaa135

File tree

1 file changed

+209
-0
lines changed

1 file changed

+209
-0
lines changed

vortex-datafusion/src/persistent/opener.rs

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use vortex::layout::LayoutReader;
4343
use vortex::layout::layouts::USE_VORTEX_OPERATORS;
4444
use vortex::metrics::VortexMetrics;
4545
use vortex::scan::ScanBuilder;
46+
use vortex::scan::Selection;
4647
use vortex::session::VortexSession;
4748
use vortex_utils::aliases::dash_map::DashMap;
4849
use vortex_utils::aliases::dash_map::Entry;
@@ -177,6 +178,7 @@ impl FileOpener for VortexOpener {
177178
let metrics = self.metrics.clone();
178179
let layout_reader = self.layout_readers.clone();
179180
let has_output_ordering = self.has_output_ordering;
181+
let extensions = file.extensions.clone();
180182

181183
let projected_schema = match projection.as_ref() {
182184
None => logical_schema.clone(),
@@ -315,6 +317,9 @@ impl FileOpener for VortexOpener {
315317
};
316318

317319
let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader);
320+
if let Some(selection) = get_selection_from_extensions(extensions) {
321+
scan_builder = scan_builder.with_selection(selection);
322+
}
318323
if let Some(file_range) = file_meta.range {
319324
scan_builder = apply_byte_range(
320325
file_range,
@@ -424,6 +429,24 @@ fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u
424429
start_row..u64::min(row_count, end_row)
425430
}
426431

432+
/// Attempts to extract a `Selection` from the extensions object, if present.
433+
///
434+
/// This function is used to retrieve the row selection plan that may have been
435+
/// attached to a `PartitionedFile` via its `extensions` field.
436+
///
437+
/// # Arguments
438+
///
439+
/// * `extensions` - Optional type-erased extensions object that may contain a `Selection`
440+
///
441+
/// # Returns
442+
///
443+
/// Returns `Some(Selection)` if the extensions contain a valid `Selection`, otherwise `None`.
444+
fn get_selection_from_extensions(
445+
extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
446+
) -> Option<Selection> {
447+
extensions?.downcast_ref::<Selection>().cloned()
448+
}
449+
427450
#[cfg(test)]
428451
mod tests {
429452
use std::sync::LazyLock;
@@ -450,6 +473,7 @@ mod tests {
450473
use rstest::rstest;
451474
use vortex::VortexSessionDefault;
452475
use vortex::array::arrow::FromArrowArray;
476+
use vortex::buffer::Buffer;
453477
use vortex::file::WriteOptionsSessionExt;
454478
use vortex::io::ObjectStoreWriter;
455479
use vortex::io::VortexWrite;
@@ -459,6 +483,30 @@ mod tests {
459483

460484
static SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);
461485

486+
fn make_test_opener(
487+
object_store: Arc<dyn ObjectStore>,
488+
schema: SchemaRef,
489+
projection: Option<Arc<[usize]>>,
490+
) -> VortexOpener {
491+
VortexOpener {
492+
session: SESSION.clone(),
493+
object_store,
494+
projection,
495+
filter: None,
496+
file_pruning_predicate: None,
497+
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
498+
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
499+
partition_fields: vec![],
500+
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
501+
logical_schema: schema,
502+
batch_size: 100,
503+
limit: None,
504+
metrics: Default::default(),
505+
layout_readers: Default::default(),
506+
has_output_ordering: false,
507+
}
508+
}
509+
462510
#[rstest]
463511
#[case(0..100, 100, 100, 0..100)]
464512
#[case(0..105, 100, 105, 0..100)]
@@ -534,6 +582,18 @@ mod tests {
534582
}
535583
}
536584

585+
fn make_test_batch_with_10_rows() -> RecordBatch {
586+
record_batch!(
587+
("a", Int32, (0..=9).map(Some).collect::<Vec<_>>()),
588+
(
589+
"b",
590+
Utf8,
591+
(0..=9).map(|i| Some(format!("r{}", i))).collect::<Vec<_>>()
592+
)
593+
)
594+
.unwrap()
595+
}
596+
537597
#[rstest]
538598
#[case(Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), (1, 3), (0, 0))]
539599
// If we don't have a physical expr adapter, we just drop filters on partition values
@@ -930,4 +990,153 @@ mod tests {
930990

931991
Ok(())
932992
}
993+
994+
#[tokio::test]
995+
// Test that Selection::IncludeByIndex filters to specific row indices.
996+
async fn test_selection_include_by_index() -> anyhow::Result<()> {
997+
use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
998+
999+
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1000+
let file_path = "/path/file.vortex";
1001+
1002+
let batch = make_test_batch_with_10_rows();
1003+
let data_size =
1004+
write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
1005+
1006+
let table_schema = batch.schema();
1007+
let file_meta = make_meta(file_path, data_size);
1008+
let mut file = PartitionedFile::new(file_path.to_string(), data_size);
1009+
file.extensions = Some(Arc::new(Selection::IncludeByIndex(Buffer::from_iter(
1010+
vec![1, 3, 5, 7],
1011+
))));
1012+
1013+
let opener = make_test_opener(
1014+
object_store.clone(),
1015+
table_schema.clone(),
1016+
Some(vec![0, 1].into()),
1017+
);
1018+
1019+
let stream = opener.open(file_meta, file)?.await?;
1020+
let data = stream.try_collect::<Vec<_>>().await?;
1021+
let format_opts = FormatOptions::new().with_types_info(true);
1022+
1023+
assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
1024+
+-------+------+
1025+
| a | b |
1026+
| Int32 | Utf8 |
1027+
+-------+------+
1028+
| 1 | r1 |
1029+
| 3 | r3 |
1030+
| 5 | r5 |
1031+
| 7 | r7 |
1032+
+-------+------+
1033+
");
1034+
1035+
Ok(())
1036+
}
1037+
1038+
#[tokio::test]
1039+
// Test that Selection::ExcludeByIndex excludes specific row indices.
1040+
async fn test_selection_exclude_by_index() -> anyhow::Result<()> {
1041+
use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
1042+
1043+
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1044+
let file_path = "/path/file.vortex";
1045+
1046+
let batch = make_test_batch_with_10_rows();
1047+
let data_size =
1048+
write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
1049+
1050+
let table_schema = batch.schema();
1051+
let file_meta = make_meta(file_path, data_size);
1052+
let mut file = PartitionedFile::new(file_path.to_string(), data_size);
1053+
file.extensions = Some(Arc::new(Selection::ExcludeByIndex(Buffer::from_iter(
1054+
vec![0, 2, 4, 6, 8],
1055+
))));
1056+
1057+
let opener = make_test_opener(
1058+
object_store.clone(),
1059+
table_schema.clone(),
1060+
Some(vec![0, 1].into()),
1061+
);
1062+
1063+
let stream = opener.open(file_meta, file)?.await?;
1064+
let data = stream.try_collect::<Vec<_>>().await?;
1065+
let format_opts = FormatOptions::new().with_types_info(true);
1066+
1067+
assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
1068+
+-------+------+
1069+
| a | b |
1070+
| Int32 | Utf8 |
1071+
+-------+------+
1072+
| 1 | r1 |
1073+
| 3 | r3 |
1074+
| 5 | r5 |
1075+
| 7 | r7 |
1076+
| 9 | r9 |
1077+
+-------+------+
1078+
");
1079+
1080+
Ok(())
1081+
}
1082+
1083+
#[tokio::test]
1084+
// Test that Selection::All returns all rows.
1085+
async fn test_selection_all() -> anyhow::Result<()> {
1086+
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1087+
let file_path = "/path/file.vortex";
1088+
1089+
let batch = make_test_batch_with_10_rows();
1090+
let data_size =
1091+
write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
1092+
1093+
let table_schema = batch.schema();
1094+
let file_meta = make_meta(file_path, data_size);
1095+
let mut file = PartitionedFile::new(file_path.to_string(), data_size);
1096+
file.extensions = Some(Arc::new(Selection::All));
1097+
1098+
let opener = make_test_opener(
1099+
object_store.clone(),
1100+
table_schema.clone(),
1101+
Some(vec![0].into()),
1102+
);
1103+
1104+
let stream = opener.open(file_meta, file)?.await?;
1105+
let data = stream.try_collect::<Vec<_>>().await?;
1106+
1107+
let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
1108+
assert_eq!(total_rows, 10);
1109+
1110+
Ok(())
1111+
}
1112+
1113+
#[tokio::test]
1114+
// Test that when no extensions are provided, all rows are returned (backward compatibility).
1115+
async fn test_selection_no_extensions() -> anyhow::Result<()> {
1116+
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1117+
let file_path = "/path/file.vortex";
1118+
1119+
let batch = make_test_batch_with_10_rows();
1120+
let data_size =
1121+
write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
1122+
1123+
let table_schema = batch.schema();
1124+
let file_meta = make_meta(file_path, data_size);
1125+
let file = PartitionedFile::new(file_path.to_string(), data_size);
1126+
// file.extensions is None by default
1127+
1128+
let opener = make_test_opener(
1129+
object_store.clone(),
1130+
table_schema.clone(),
1131+
Some(vec![0].into()),
1132+
);
1133+
1134+
let stream = opener.open(file_meta, file)?.await?;
1135+
let data = stream.try_collect::<Vec<_>>().await?;
1136+
1137+
let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
1138+
assert_eq!(total_rows, 10);
1139+
1140+
Ok(())
1141+
}
9331142
}

0 commit comments

Comments
 (0)