
Commit 320d6fa

more things
Signed-off-by: Adam Gutglick <[email protected]>
1 parent e93217a commit 320d6fa

File tree

2 files changed: 198 additions & 148 deletions

vortex-datafusion/src/persistent/access_plan.rs

Lines changed: 9 additions & 0 deletions
@@ -9,10 +9,19 @@ use vortex::scan::Selection;
 /// This is intended as a low-level interface for users building their own data systems, see the [advance index] example from the DataFusion repo for a similar usage with Parquet.
 ///
 /// [advance index]: https://github.com/apache/datafusion/blob/47df535d2cd5aac5ad5a92bdc837f38e05ea0f0f/datafusion-examples/examples/data_io/parquet_advanced_index.rs
+#[derive(Default)]
 pub struct VortexAccessPlan {
     selection: Option<Selection>,
 }
 
+impl VortexAccessPlan {
+    /// Sets a [`Selection`] for this plan.
+    pub fn with_selection(mut self, selection: Selection) -> Self {
+        self.selection = Some(selection);
+        self
+    }
+}
+
 impl VortexAccessPlan {
     /// Apply the plan to the scan's builder.
     pub fn apply_to_builder<A>(&self, mut scan_builder: ScanBuilder<A>) -> ScanBuilder<A>
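
To make the new builder-style API concrete, here is a minimal sketch of constructing plans with the three `Selection` variants exercised by the tests below. The `vortex_datafusion::VortexAccessPlan` import path is an assumption (the tests use `crate::VortexAccessPlan`), the `Buffer` and `Selection` paths are taken from the test imports, and `example_plans` is just an illustrative name.

```rust
use vortex::buffer::Buffer;
use vortex::scan::Selection;
// Assumed external path; within the crate, the tests import `crate::VortexAccessPlan`.
use vortex_datafusion::VortexAccessPlan;

fn example_plans() {
    // Keep only rows 1, 3, 5 and 7 of the file.
    let _include = VortexAccessPlan::default()
        .with_selection(Selection::IncludeByIndex(Buffer::from_iter(vec![1, 3, 5, 7])));

    // Scan everything except the even-numbered rows.
    let _exclude = VortexAccessPlan::default()
        .with_selection(Selection::ExcludeByIndex(Buffer::from_iter(vec![0, 2, 4, 6, 8])));

    // Place no restriction on the scan at all.
    let _all = VortexAccessPlan::default().with_selection(Selection::All);

    // Each plan is later threaded into a scan via `apply_to_builder`, shown above.
}
```

The `#[derive(Default)]` added in this hunk is what allows `VortexAccessPlan::default()` to serve as the starting point of the chain.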

vortex-datafusion/src/persistent/opener.rs

Lines changed: 189 additions & 148 deletions
@@ -370,6 +370,7 @@ mod tests {
 
     use arrow_schema::Field;
     use arrow_schema::Fields;
+    use arrow_schema::SchemaRef;
     use datafusion::arrow::array::RecordBatch;
     use datafusion::arrow::array::StringArray;
     use datafusion::arrow::array::StructArray;
@@ -396,6 +397,7 @@ mod tests {
     use vortex::session::VortexSession;
 
     use super::*;
+    use crate::VortexAccessPlan;
     use crate::vendor::schema_rewriter::DF52PhysicalExprAdapterFactory;
 
     static SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);
@@ -886,152 +888,191 @@ mod tests {
         Ok(())
     }
 
-    // #[tokio::test]
-    // // Test that Selection::IncludeByIndex filters to specific row indices.
-    // async fn test_selection_include_by_index() -> anyhow::Result<()> {
-    //     use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
-
-    //     let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
-    //     let file_path = "/path/file.vortex";
-
-    //     let batch = make_test_batch_with_10_rows();
-    //     let data_size =
-    //         write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
-
-    //     let table_schema = batch.schema();
-    //     let file_meta = make_meta(file_path, data_size);
-    //     let mut file = PartitionedFile::new(file_path.to_string(), data_size);
-    //     file.extensions = Some(Arc::new(Selection::IncludeByIndex(Buffer::from_iter(
-    //         vec![1, 3, 5, 7],
-    //     ))));
-
-    //     let opener = make_test_opener(
-    //         object_store.clone(),
-    //         table_schema.clone(),
-    //         Some(vec![0, 1].into()),
-    //     );
-
-    //     let stream = opener.open(file_meta, file)?.await?;
-    //     let data = stream.try_collect::<Vec<_>>().await?;
-    //     let format_opts = FormatOptions::new().with_types_info(true);
-
-    //     assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
-    //     +-------+------+
-    //     | a     | b    |
-    //     | Int32 | Utf8 |
-    //     +-------+------+
-    //     | 1     | r1   |
-    //     | 3     | r3   |
-    //     | 5     | r5   |
-    //     | 7     | r7   |
-    //     +-------+------+
-    //     ");
-
-    //     Ok(())
-    // }
-
-    // #[tokio::test]
-    // // Test that Selection::ExcludeByIndex excludes specific row indices.
-    // async fn test_selection_exclude_by_index() -> anyhow::Result<()> {
-    //     use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
-
-    //     let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
-    //     let file_path = "/path/file.vortex";
-
-    //     let batch = make_test_batch_with_10_rows();
-    //     let data_size =
-    //         write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
-
-    //     let table_schema = batch.schema();
-    //     let file_meta = make_meta(file_path, data_size);
-    //     let mut file = PartitionedFile::new(file_path.to_string(), data_size);
-    //     file.extensions = Some(Arc::new(Selection::ExcludeByIndex(Buffer::from_iter(
-    //         vec![0, 2, 4, 6, 8],
-    //     ))));
-
-    //     let opener = make_test_opener(
-    //         object_store.clone(),
-    //         table_schema.clone(),
-    //         Some(vec![0, 1].into()),
-    //     );
-
-    //     let stream = opener.open(file_meta, file)?.await?;
-    //     let data = stream.try_collect::<Vec<_>>().await?;
-    //     let format_opts = FormatOptions::new().with_types_info(true);
-
-    //     assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
-    //     +-------+------+
-    //     | a     | b    |
-    //     | Int32 | Utf8 |
-    //     +-------+------+
-    //     | 1     | r1   |
-    //     | 3     | r3   |
-    //     | 5     | r5   |
-    //     | 7     | r7   |
-    //     | 9     | r9   |
-    //     +-------+------+
-    //     ");
-
-    //     Ok(())
-    // }
-
-    // #[tokio::test]
-    // // Test that Selection::All returns all rows.
-    // async fn test_selection_all() -> anyhow::Result<()> {
-    //     let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
-    //     let file_path = "/path/file.vortex";
-
-    //     let batch = make_test_batch_with_10_rows();
-    //     let data_size =
-    //         write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
-
-    //     let table_schema = batch.schema();
-    //     let file_meta = make_meta(file_path, data_size);
-    //     let mut file = PartitionedFile::new(file_path.to_string(), data_size);
-    //     file.extensions = Some(Arc::new(Selection::All));
-
-    //     let opener = make_test_opener(
-    //         object_store.clone(),
-    //         table_schema.clone(),
-    //         Some(vec![0].into()),
-    //     );
-
-    //     let stream = opener.open(file_meta, file)?.await?;
-    //     let data = stream.try_collect::<Vec<_>>().await?;
-
-    //     let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
-    //     assert_eq!(total_rows, 10);
-
-    //     Ok(())
-    // }
-
-    // #[tokio::test]
-    // // Test that when no extensions are provided, all rows are returned (backward compatibility).
-    // async fn test_selection_no_extensions() -> anyhow::Result<()> {
-    //     let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
-    //     let file_path = "/path/file.vortex";
-
-    //     let batch = make_test_batch_with_10_rows();
-    //     let data_size =
-    //         write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
-
-    //     let table_schema = batch.schema();
-    //     let file_meta = make_meta(file_path, data_size);
-    //     let file = PartitionedFile::new(file_path.to_string(), data_size);
-    //     // file.extensions is None by default
-
-    //     let opener = make_test_opener(
-    //         object_store.clone(),
-    //         table_schema.clone(),
-    //         Some(vec![0].into()),
-    //     );
-
-    //     let stream = opener.open(file_meta, file)?.await?;
-    //     let data = stream.try_collect::<Vec<_>>().await?;
-
-    //     let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
-    //     assert_eq!(total_rows, 10);
-
-    //     Ok(())
-    // }
+    fn make_test_batch_with_10_rows() -> RecordBatch {
+        record_batch!(
+            ("a", Int32, (0..=9).map(Some).collect::<Vec<_>>()),
+            (
+                "b",
+                Utf8,
+                (0..=9).map(|i| Some(format!("r{}", i))).collect::<Vec<_>>()
+            )
+        )
+        .unwrap()
+    }
+
+    fn make_test_opener(
+        object_store: Arc<dyn ObjectStore>,
+        schema: SchemaRef,
+        projection: Option<Arc<[usize]>>,
+    ) -> VortexOpener {
+        VortexOpener {
+            session: SESSION.clone(),
+            object_store,
+            projection,
+            filter: None,
+            file_pruning_predicate: None,
+            expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
+            schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
+            file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
+            table_schema: TableSchema::from_file_schema(schema),
+            batch_size: 100,
+            limit: None,
+            metrics: Default::default(),
+            layout_readers: Default::default(),
+            has_output_ordering: false,
+        }
+    }
+
+    #[tokio::test]
+    // Test that Selection::IncludeByIndex filters to specific row indices.
+    async fn test_selection_include_by_index() -> anyhow::Result<()> {
+        use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
+        use vortex::buffer::Buffer;
+        use vortex::scan::Selection;
+
+        let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let file_path = "/path/file.vortex";
+
+        let batch = make_test_batch_with_10_rows();
+        let data_size =
+            write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
+
+        let table_schema = batch.schema();
+        let mut file = PartitionedFile::new(file_path.to_string(), data_size);
+        file.extensions = Some(Arc::new(VortexAccessPlan::default().with_selection(
+            Selection::IncludeByIndex(Buffer::from_iter(vec![1, 3, 5, 7])),
+        )));
+
+        let opener = make_test_opener(
+            object_store.clone(),
+            table_schema.clone(),
+            Some(vec![0, 1].into()),
+        );
+
+        let stream = opener.open(file)?.await?;
+        let data = stream.try_collect::<Vec<_>>().await?;
+        let format_opts = FormatOptions::new().with_types_info(true);
+
+        assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
+        +-------+------+
+        | a     | b    |
+        | Int32 | Utf8 |
+        +-------+------+
+        | 1     | r1   |
+        | 3     | r3   |
+        | 5     | r5   |
+        | 7     | r7   |
+        +-------+------+
+        ");
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    // Test that Selection::ExcludeByIndex excludes specific row indices.
+    async fn test_selection_exclude_by_index() -> anyhow::Result<()> {
+        use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
+        use vortex::buffer::Buffer;
+        use vortex::scan::Selection;
+
+        let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let file_path = "/path/file.vortex";
+
+        let batch = make_test_batch_with_10_rows();
+        let data_size =
+            write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
+
+        let table_schema = batch.schema();
+        let mut file = PartitionedFile::new(file_path.to_string(), data_size);
+        file.extensions = Some(Arc::new(VortexAccessPlan::default().with_selection(
+            Selection::ExcludeByIndex(Buffer::from_iter(vec![0, 2, 4, 6, 8])),
+        )));
+
+        let opener = make_test_opener(
+            object_store.clone(),
+            table_schema.clone(),
+            Some(vec![0, 1].into()),
+        );
+
+        let stream = opener.open(file)?.await?;
+        let data = stream.try_collect::<Vec<_>>().await?;
+        let format_opts = FormatOptions::new().with_types_info(true);
+
+        assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
+        +-------+------+
+        | a     | b    |
+        | Int32 | Utf8 |
+        +-------+------+
+        | 1     | r1   |
+        | 3     | r3   |
+        | 5     | r5   |
+        | 7     | r7   |
+        | 9     | r9   |
+        +-------+------+
+        ");
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    // Test that Selection::All returns all rows.
+    async fn test_selection_all() -> anyhow::Result<()> {
+        use vortex::scan::Selection;
+
+        let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let file_path = "/path/file.vortex";
+
+        let batch = make_test_batch_with_10_rows();
+        let data_size =
+            write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
+
+        let table_schema = batch.schema();
+        let mut file = PartitionedFile::new(file_path.to_string(), data_size);
+        file.extensions = Some(Arc::new(
+            VortexAccessPlan::default().with_selection(Selection::All),
+        ));
+
+        let opener = make_test_opener(
+            object_store.clone(),
+            table_schema.clone(),
+            Some(vec![0].into()),
+        );
+
+        let stream = opener.open(file)?.await?;
+        let data = stream.try_collect::<Vec<_>>().await?;
+
+        let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
+        assert_eq!(total_rows, 10);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    // Test that when no extensions are provided, all rows are returned (backward compatibility).
+    async fn test_selection_no_extensions() -> anyhow::Result<()> {
+        let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let file_path = "/path/file.vortex";
+
+        let batch = make_test_batch_with_10_rows();
+        let data_size =
+            write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
+
+        let table_schema = batch.schema();
+        let file = PartitionedFile::new(file_path.to_string(), data_size);
+        // file.extensions is None by default
+
+        let opener = make_test_opener(
+            object_store.clone(),
+            table_schema.clone(),
+            Some(vec![0].into()),
+        );
+
+        let stream = opener.open(file)?.await?;
+        let data = stream.try_collect::<Vec<_>>().await?;
+
+        let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
+        assert_eq!(total_rows, 10);
+
+        Ok(())
+    }
 }
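
As a usage note, here is a sketch (not part of this commit) of how a caller attaches a plan to a single file, mirroring the tests above: the plan travels on DataFusion's `PartitionedFile::extensions` field, which the Vortex opener consults when it scans that file. The import paths, the `file_with_access_plan` helper name, and the literal path and size are assumptions for illustration.

```rust
use std::sync::Arc;

use datafusion::datasource::listing::PartitionedFile;
use vortex::buffer::Buffer;
use vortex::scan::Selection;
// Assumed external path; within the crate, the tests import `crate::VortexAccessPlan`.
use vortex_datafusion::VortexAccessPlan;

fn file_with_access_plan(path: &str, size: u64) -> PartitionedFile {
    // Restrict the scan of this one file to rows 1, 3, 5 and 7.
    let plan = VortexAccessPlan::default()
        .with_selection(Selection::IncludeByIndex(Buffer::from_iter(vec![1, 3, 5, 7])));

    // `extensions` carries opaque per-file metadata; the opener recovers the plan
    // from it when building the scan, as the tests above demonstrate.
    let mut file = PartitionedFile::new(path.to_string(), size);
    file.extensions = Some(Arc::new(plan));
    file
}
```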
