Commit 2cda863

a10yAdamGS authored and committed

almost there...
Signed-off-by: Andrew Duffy <[email protected]>
1 parent 5c4bf69 · commit 2cda863

2 files changed: +68 -85 lines

vortex-datafusion/src/persistent/opener.rs

Lines changed: 4 additions & 0 deletions
@@ -272,6 +272,10 @@ impl FileOpener for VortexOpener {
                 )));
             }
 
+            for filter in pushed.iter() {
+                println!("- pushing: {filter}");
+            }
+
             make_vortex_predicate(&pushed).transpose()
         })
         .transpose()
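
Aside: the stacked transpose() calls above rely on std's Option::transpose,
which flips Option<Result<T, E>> into Result<Option<T>, E>, so that "no
predicate" remains a valid, non-error outcome while errors still propagate.
A minimal standalone sketch of that pattern (plain std Rust, not Vortex API):

    fn main() {
        // Some(Ok(v)) -> Ok(Some(v)); Some(Err(e)) -> Err(e); None -> Ok(None)
        let built: Option<Result<i32, String>> = Some(Ok(42));
        let flipped: Result<Option<i32>, String> = built.transpose();
        assert_eq!(flipped, Ok(Some(42)));

        // An absent predicate is not an error: it simply becomes Ok(None).
        let absent: Option<Result<i32, String>> = None;
        assert_eq!(absent.transpose(), Ok(None));
    }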

vortex-datafusion/tests/schema_evolution.rs

Lines changed: 64 additions & 85 deletions
@@ -15,19 +15,16 @@ use std::sync::LazyLock;
 use arrow_schema::DataType;
 use arrow_schema::Field;
 use arrow_schema::Schema;
-use arrow_schema::SchemaRef;
 use datafusion::arrow::array::Array;
 use datafusion::arrow::array::ArrayRef as ArrowArrayRef;
-use datafusion::arrow::array::Int32Array;
 use datafusion::arrow::array::RecordBatch;
-use datafusion::arrow::array::StringViewArray;
 use datafusion::arrow::array::StructArray;
-use datafusion::arrow::compute::concat_batches;
 use datafusion::datasource::listing::ListingOptions;
 use datafusion::datasource::listing::ListingTable;
 use datafusion::datasource::listing::ListingTableConfig;
 use datafusion::execution::SessionStateBuilder;
 use datafusion::execution::context::SessionContext;
+use datafusion_common::assert_batches_eq;
 use datafusion_common::create_array;
 use datafusion_common::record_batch;
 use datafusion_datasource::ListingTableUrl;
@@ -146,29 +143,17 @@ async fn test_filter_with_schema_evolution() {
         .collect()
         .await
         .unwrap();
-    let table = concat_batches(&table_schema, result.iter()).unwrap();
 
-    // We read back the full table, with nulls filled in for missing fields
-    assert_eq!(
-        table,
-        record_batch(
-            &table_schema,
-            vec![
-                // a
-                Arc::new(StringViewArray::from(vec![
-                    Some("one"),
-                    Some("two"),
-                    Some("three"),
-                ])) as ArrowArrayRef,
-                // b
-                Arc::new(StringViewArray::from(vec![
-                    Option::<&str>::None,
-                    None,
-                    None
-                ])) as ArrowArrayRef,
-            ]
-        )
-    );
+    let expected = [
+        "+-------+---+",
+        "| a     | b |",
+        "+-------+---+",
+        "| one   |   |",
+        "| two   |   |",
+        "| three |   |",
+        "+-------+---+",
+    ];
+    assert_batches_eq!(expected, &result);
 }
 
 #[tokio::test]
@@ -236,19 +221,16 @@ async fn test_filter_schema_evolution_order() {
         .collect()
         .await
         .unwrap();
-    let result = concat_batches(&table_schema, result.iter()).unwrap();
 
-    assert_eq!(
-        result,
-        record_batch(
-            &table_schema,
-            vec![
-                // a
-                Arc::new(Int32Array::from(vec![Some(2)])) as ArrowArrayRef,
-                // b
-                Arc::new(StringViewArray::from(vec![Some("two"),])) as ArrowArrayRef,
-            ]
-        )
+    assert_batches_eq!(
+        &[
+            "+---+-----+",
+            "| a | b   |",
+            "+---+-----+",
+            "| 2 | two |",
+            "+---+-----+",
+        ],
+        &result
     );
 
     // Filter on the "a" column, which has different types for each file
@@ -260,26 +242,22 @@ async fn test_filter_schema_evolution_order() {
         .collect()
         .await
         .unwrap();
-    let table = concat_batches(&table_schema, result.iter()).unwrap();
+    // let table = concat_batches(&table_schema, result.iter()).unwrap();
 
-    // file1, then file2
-    assert_eq!(
-        table,
-        record_batch(
-            &table_schema,
-            vec![
-                // a field: present in both files
-                Arc::new(Int32Array::from(vec![Some(3), Some(5), Some(4), Some(6)]))
-                    as ArrowArrayRef,
-                // b field: only present in file2, file1 fills with nulls
-                Arc::new(StringViewArray::from(vec![
-                    None,
-                    None,
-                    Some("four"),
-                    Some("six")
-                ])) as ArrowArrayRef,
-            ]
-        )
+    // a field: present in both files
+    // b field: only present in file2, file1 fills with nulls
+    assert_batches_eq!(
+        &[
+            "+---+------+",
+            "| a | b    |",
+            "+---+------+",
+            "| 3 |      |",
+            "| 5 |      |",
+            "| 4 | four |",
+            "| 6 | six  |",
+            "+---+------+",
+        ],
+        &result
     );
 }
 
@@ -372,18 +350,24 @@ async fn test_filter_schema_evolution_struct_fields() {
 
     // Scan all the records, NULLs are filled in for nested optional fields.
     let full_scan = df.collect().await.unwrap();
-    let full_scan = concat_batches(&table_schema, full_scan.iter()).unwrap();
 
-    let expected = concat_batches(
-        &table_schema,
+    assert_batches_eq!(
         &[
-            // host01 with extra nulls for the payload.instance field
-            make_metrics("host01.local", vec![1, 2, 3, 4], Some(vec![None; 4])),
-            host02,
+            "+--------------+-----------------------------+",
+            "| hostname     | payload                     |",
+            "+--------------+-----------------------------+",
+            "| host01.local | {uptime: 1, instance: }     |",
+            "| host01.local | {uptime: 2, instance: }     |",
+            "| host01.local | {uptime: 3, instance: }     |",
+            "| host01.local | {uptime: 4, instance: }     |",
+            "| host02.local | {uptime: 10, instance: c6i} |",
+            "| host02.local | {uptime: 20, instance: c6i} |",
+            "| host02.local | {uptime: 30, instance: m5}  |",
+            "| host02.local | {uptime: 40, instance: r5}  |",
+            "+--------------+-----------------------------+",
         ],
-    )
-    .unwrap();
-    assert_eq!(full_scan, expected);
+        &full_scan
+    );
 
     // run a filter that touches both the payload.uptime AND the payload.instance nested fields
     let df = ctx.read_table(table.clone()).unwrap();
@@ -400,25 +384,20 @@
         .collect()
         .await
         .unwrap();
-    let filtered_scan = concat_batches(&table_schema, filtered_scan.iter()).unwrap();
-    let expected = concat_batches(
-        &table_schema,
+
+    assert_batches_eq!(
         &[
-            make_metrics("host01.local", vec![1, 2, 3, 4], Some(vec![None; 4])),
-            make_metrics(
-                "host02.local",
-                vec![10, 20],
-                Some(vec![Some("c6i"), Some("c6i")]),
-            ),
+            "+--------------+-----------------------------+",
+            "| hostname     | payload                     |",
+            "+--------------+-----------------------------+",
+            "| host01.local | {uptime: 1, instance: }     |",
+            "| host01.local | {uptime: 2, instance: }     |",
+            "| host01.local | {uptime: 3, instance: }     |",
+            "| host01.local | {uptime: 4, instance: }     |",
+            "| host02.local | {uptime: 10, instance: c6i} |",
+            "| host02.local | {uptime: 20, instance: c6i} |",
+            "+--------------+-----------------------------+",
        ],
-    )
-    .unwrap();
-    assert_eq!(filtered_scan, expected);
-}
-
-fn record_batch(
-    schema: &SchemaRef,
-    fields: impl IntoIterator<Item = ArrowArrayRef>,
-) -> RecordBatch {
-    RecordBatch::try_new(schema.clone(), fields.into_iter().collect()).unwrap()
+        &filtered_scan
+    );
 }
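
For reference, assert_batches_eq! (from datafusion_common, as imported above)
compares record batches against their pretty-printed table form, so a failing
test shows a readable textual diff rather than two opaque RecordBatch debug
dumps. A minimal self-contained sketch of the pattern this commit migrates to
(a hypothetical one-column example, using only crates the test file already
imports):

    use std::sync::Arc;

    use datafusion::arrow::array::{Int32Array, RecordBatch};
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::assert_batches_eq;

    fn main() {
        // Build a one-column batch by hand, much as the removed
        // record_batch helper used to do.
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2]))])
                .unwrap();

        // The expected value is the pretty-printed table; blank cells are NULLs.
        assert_batches_eq!(
            &["+---+", "| a |", "+---+", "| 1 |", "| 2 |", "+---+"],
            &[batch]
        );
    }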
