Skip to content

Commit 2907f45

Browse files
haohuaijinconnortsui20
authored andcommitted
feat: enhance VortexOpener with selection support in ScanBuilder
Signed-off-by: Huaijin <[email protected]> Signed-off-by: Connor Tsui <[email protected]>
1 parent 9a76489 commit 2907f45

File tree

1 file changed

+238
-0
lines changed

1 file changed

+238
-0
lines changed

vortex-datafusion/src/persistent/opener.rs

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use vortex::layout::LayoutReader;
4343
use vortex::layout::layouts::USE_VORTEX_OPERATORS;
4444
use vortex::metrics::VortexMetrics;
4545
use vortex::scan::ScanBuilder;
46+
use vortex::scan::Selection;
4647
use vortex::session::VortexSession;
4748
use vortex_utils::aliases::dash_map::DashMap;
4849
use vortex_utils::aliases::dash_map::Entry;
@@ -177,6 +178,7 @@ impl FileOpener for VortexOpener {
177178
let metrics = self.metrics.clone();
178179
let layout_reader = self.layout_readers.clone();
179180
let has_output_ordering = self.has_output_ordering;
181+
let extensions = file.extensions.clone();
180182

181183
let projected_schema = match projection.as_ref() {
182184
None => logical_schema.clone(),
@@ -315,6 +317,9 @@ impl FileOpener for VortexOpener {
315317
};
316318

317319
let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader);
320+
if let Some(selection) = get_selection_from_extensions(extensions) {
321+
scan_builder = scan_builder.with_selection(selection);
322+
}
318323
if let Some(file_range) = file_meta.range {
319324
scan_builder = apply_byte_range(
320325
file_range,
@@ -424,6 +429,29 @@ fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u
424429
start_row..u64::min(row_count, end_row)
425430
}
426431

432+
/// Attempts to extract a `Selection` from the extensions object, if present.
433+
///
434+
/// This function is used to retrieve the row selection plan that may have been
435+
/// attached to a `PartitionedFile` via its `extensions` field.
436+
///
437+
/// # Arguments
438+
///
439+
/// * `extensions` - Optional type-erased extensions object that may contain a `Selection`
440+
///
441+
/// # Returns
442+
///
443+
/// Returns `Some(Selection)` if the extensions contain a valid `Selection`, otherwise `None`.
444+
fn get_selection_from_extensions(
445+
extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
446+
) -> Option<Selection> {
447+
if let Some(extensions) = extensions
448+
&& let Some(selection) = extensions.downcast_ref::<Selection>()
449+
{
450+
return Some(selection.clone());
451+
}
452+
None
453+
}
454+
427455
#[cfg(test)]
428456
mod tests {
429457
use std::sync::LazyLock;
@@ -450,6 +478,7 @@ mod tests {
450478
use rstest::rstest;
451479
use vortex::VortexSessionDefault;
452480
use vortex::array::arrow::FromArrowArray;
481+
use vortex::buffer::Buffer;
453482
use vortex::file::WriteOptionsSessionExt;
454483
use vortex::io::ObjectStoreWriter;
455484
use vortex::io::VortexWrite;
@@ -534,6 +563,18 @@ mod tests {
534563
}
535564
}
536565

566+
fn make_test_batch_with_10_rows() -> RecordBatch {
567+
record_batch!(
568+
("a", Int32, (0..=9).map(Some).collect::<Vec<_>>()),
569+
(
570+
"b",
571+
Utf8,
572+
(0..=9).map(|i| Some(format!("r{}", i))).collect::<Vec<_>>()
573+
)
574+
)
575+
.unwrap()
576+
}
577+
537578
#[rstest]
538579
#[case(Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), (1, 3), (0, 0))]
539580
// If we don't have a physical expr adapter, we just drop filters on partition values
@@ -930,4 +971,201 @@ mod tests {
930971

931972
Ok(())
932973
}
974+
975+
#[tokio::test]
976+
// Test that Selection::IncludeByIndex filters to specific row indices.
977+
async fn test_selection_include_by_index() -> anyhow::Result<()> {
978+
use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
979+
980+
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
981+
let file_path = "/path/file.vortex";
982+
983+
let batch = make_test_batch_with_10_rows();
984+
let data_size =
985+
write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
986+
987+
let table_schema = batch.schema();
988+
let file_meta = make_meta(file_path, data_size);
989+
let mut file = PartitionedFile::new(file_path.to_string(), data_size);
990+
file.extensions = Some(Arc::new(Selection::IncludeByIndex(Buffer::from_iter(
991+
vec![1, 3, 5, 7],
992+
))));
993+
994+
let opener = VortexOpener {
995+
session: SESSION.clone(),
996+
object_store: object_store.clone(),
997+
projection: Some(vec![0, 1].into()),
998+
filter: None,
999+
file_pruning_predicate: None,
1000+
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
1001+
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
1002+
partition_fields: vec![],
1003+
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
1004+
logical_schema: table_schema.clone(),
1005+
batch_size: 100,
1006+
limit: None,
1007+
metrics: Default::default(),
1008+
layout_readers: Default::default(),
1009+
has_output_ordering: false,
1010+
};
1011+
1012+
let stream = opener.open(file_meta, file)?.await?;
1013+
let data = stream.try_collect::<Vec<_>>().await?;
1014+
let format_opts = FormatOptions::new().with_types_info(true);
1015+
1016+
assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
1017+
+-------+------+
1018+
| a | b |
1019+
| Int32 | Utf8 |
1020+
+-------+------+
1021+
| 1 | r1 |
1022+
| 3 | r3 |
1023+
| 5 | r5 |
1024+
| 7 | r7 |
1025+
+-------+------+
1026+
");
1027+
1028+
Ok(())
1029+
}
1030+
1031+
#[tokio::test]
1032+
// Test that Selection::ExcludeByIndex excludes specific row indices.
1033+
async fn test_selection_exclude_by_index() -> anyhow::Result<()> {
1034+
use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
1035+
1036+
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1037+
let file_path = "/path/file.vortex";
1038+
1039+
let batch = make_test_batch_with_10_rows();
1040+
let data_size =
1041+
write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
1042+
1043+
let table_schema = batch.schema();
1044+
let file_meta = make_meta(file_path, data_size);
1045+
let mut file = PartitionedFile::new(file_path.to_string(), data_size);
1046+
file.extensions = Some(Arc::new(Selection::ExcludeByIndex(Buffer::from_iter(
1047+
vec![0, 2, 4, 6, 8],
1048+
))));
1049+
1050+
let opener = VortexOpener {
1051+
session: SESSION.clone(),
1052+
object_store: object_store.clone(),
1053+
projection: Some(vec![0, 1].into()),
1054+
filter: None,
1055+
file_pruning_predicate: None,
1056+
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
1057+
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
1058+
partition_fields: vec![],
1059+
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
1060+
logical_schema: table_schema.clone(),
1061+
batch_size: 100,
1062+
limit: None,
1063+
metrics: Default::default(),
1064+
layout_readers: Default::default(),
1065+
has_output_ordering: false,
1066+
};
1067+
1068+
let stream = opener.open(file_meta, file)?.await?;
1069+
let data = stream.try_collect::<Vec<_>>().await?;
1070+
let format_opts = FormatOptions::new().with_types_info(true);
1071+
1072+
assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
1073+
+-------+------+
1074+
| a | b |
1075+
| Int32 | Utf8 |
1076+
+-------+------+
1077+
| 1 | r1 |
1078+
| 3 | r3 |
1079+
| 5 | r5 |
1080+
| 7 | r7 |
1081+
| 9 | r9 |
1082+
+-------+------+
1083+
");
1084+
1085+
Ok(())
1086+
}
1087+
1088+
#[tokio::test]
1089+
// Test that Selection::All returns all rows.
1090+
async fn test_selection_all() -> anyhow::Result<()> {
1091+
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1092+
let file_path = "/path/file.vortex";
1093+
1094+
let batch = make_test_batch_with_10_rows();
1095+
let data_size =
1096+
write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
1097+
1098+
let table_schema = batch.schema();
1099+
let file_meta = make_meta(file_path, data_size);
1100+
let mut file = PartitionedFile::new(file_path.to_string(), data_size);
1101+
file.extensions = Some(Arc::new(Selection::All));
1102+
1103+
let opener = VortexOpener {
1104+
session: SESSION.clone(),
1105+
object_store: object_store.clone(),
1106+
projection: Some(vec![0].into()),
1107+
filter: None,
1108+
file_pruning_predicate: None,
1109+
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
1110+
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
1111+
partition_fields: vec![],
1112+
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
1113+
logical_schema: table_schema.clone(),
1114+
batch_size: 100,
1115+
limit: None,
1116+
metrics: Default::default(),
1117+
layout_readers: Default::default(),
1118+
has_output_ordering: false,
1119+
};
1120+
1121+
let stream = opener.open(file_meta, file)?.await?;
1122+
let data = stream.try_collect::<Vec<_>>().await?;
1123+
1124+
let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
1125+
assert_eq!(total_rows, 10);
1126+
1127+
Ok(())
1128+
}
1129+
1130+
#[tokio::test]
1131+
// Test that when no extensions are provided, all rows are returned (backward compatibility).
1132+
async fn test_selection_no_extensions() -> anyhow::Result<()> {
1133+
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1134+
let file_path = "/path/file.vortex";
1135+
1136+
let batch = make_test_batch_with_10_rows();
1137+
let data_size =
1138+
write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
1139+
1140+
let table_schema = batch.schema();
1141+
let file_meta = make_meta(file_path, data_size);
1142+
let file = PartitionedFile::new(file_path.to_string(), data_size);
1143+
// file.extensions is None by default
1144+
1145+
let opener = VortexOpener {
1146+
session: SESSION.clone(),
1147+
object_store: object_store.clone(),
1148+
projection: Some(vec![0].into()),
1149+
filter: None,
1150+
file_pruning_predicate: None,
1151+
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
1152+
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
1153+
partition_fields: vec![],
1154+
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
1155+
logical_schema: table_schema.clone(),
1156+
batch_size: 100,
1157+
limit: None,
1158+
metrics: Default::default(),
1159+
layout_readers: Default::default(),
1160+
has_output_ordering: false,
1161+
};
1162+
1163+
let stream = opener.open(file_meta, file)?.await?;
1164+
let data = stream.try_collect::<Vec<_>>().await?;
1165+
1166+
let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
1167+
assert_eq!(total_rows, 10);
1168+
1169+
Ok(())
1170+
}
9331171
}

0 commit comments

Comments
 (0)