Commit cd458b2

chore: refactor planner read schema tests
1 parent adcfce4 commit cd458b2

File tree: 1 file changed (+68 −184 lines)

native/core/src/execution/planner.rs

Lines changed: 68 additions & 184 deletions
@@ -2553,7 +2553,7 @@ mod tests {
     use futures::{poll, StreamExt};
     use std::{sync::Arc, task::Poll};
 
-    use arrow::array::{Array, DictionaryArray, Int32Array, StringArray};
+    use arrow::array::{Array, DictionaryArray, Int32Array, RecordBatch, StringArray};
     use arrow::datatypes::{DataType, Field, Fields, Schema};
     use datafusion::catalog::memory::DataSourceExec;
     use datafusion::datasource::listing::PartitionedFile;
@@ -3133,49 +3133,30 @@ mod tests {
         });
     }
 
-    /*
-    Testing a nested types scenario
-
-    select arr[0].a, arr[0].c from (
-    select array(named_struct('a', 1, 'b', 'n', 'c', 'x')) arr)
-    */
-    #[tokio::test]
-    async fn test_nested_types_list_of_struct_by_index() -> Result<(), DataFusionError> {
+    /// Executes a `test_data_query` SQL query,
+    /// saves the result into a temp folder in Parquet format,
+    /// and reads the file back into memory using a custom schema
+    async fn make_parquet_data(
+        test_data_query: &str,
+        read_schema: Schema,
+    ) -> Result<RecordBatch, DataFusionError> {
         let session_ctx = SessionContext::new();
 
         // generate test data in the temp folder
-        let test_data = "select make_array(named_struct('a', 1, 'b', 'n', 'c', 'x')) c0";
         let tmp_dir = TempDir::new()?;
         let test_path = tmp_dir.path().to_str().unwrap().to_string();
 
         let plan = session_ctx
-            .sql(test_data)
+            .sql(test_data_query)
             .await?
             .create_physical_plan()
             .await?;
 
-        // Write parquet file into temp folder
+        // Write a parquet file into temp folder
         session_ctx
             .write_parquet(plan, test_path.clone(), None)
             .await?;
 
-        // Define schema Comet reads with
-        let required_schema = Schema::new(Fields::from(vec![Field::new(
-            "c0",
-            DataType::List(
-                Field::new(
-                    "element",
-                    DataType::Struct(Fields::from(vec![
-                        Field::new("a", DataType::Int32, true),
-                        Field::new("c", DataType::Utf8, true),
-                    ] as Vec<Field>)),
-                    true,
-                )
-                .into(),
-            ),
-            true,
-        )]));
-
         // Register all parquet with temp data as file groups
         let mut file_groups: Vec<FileGroup> = vec![];
         for entry in std::fs::read_dir(&test_path)? {
@@ -3200,16 +3181,44 @@ mod tests {
 
         let object_store_url = ObjectStoreUrl::local_filesystem();
         let file_scan_config =
-            FileScanConfigBuilder::new(object_store_url, required_schema.into(), source)
+            FileScanConfigBuilder::new(object_store_url, read_schema.into(), source)
                 .with_file_groups(file_groups)
                 .build();
 
         // Run native read
         let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
-        let stream = scan.execute(0, session_ctx.task_ctx())?;
-        let result: Vec<_> = stream.collect().await;
+        let result: Vec<_> = scan.execute(0, session_ctx.task_ctx())?.collect().await;
+        Ok(result.first().unwrap().as_ref().unwrap().clone())
+    }
+
+    /*
+    Testing a nested types scenario
+
+    select arr[0].a, arr[0].c from (
+    select array(named_struct('a', 1, 'b', 'n', 'c', 'x')) arr)
+    */
+    #[tokio::test]
+    async fn test_nested_types_list_of_struct_by_index() -> Result<(), DataFusionError> {
+        let test_data = "select make_array(named_struct('a', 1, 'b', 'n', 'c', 'x')) c0";
+
+        // Define schema Comet reads with
+        let required_schema = Schema::new(Fields::from(vec![Field::new(
+            "c0",
+            DataType::List(
+                Field::new(
+                    "element",
+                    DataType::Struct(Fields::from(vec![
+                        Field::new("a", DataType::Int32, true),
+                        Field::new("c", DataType::Utf8, true),
+                    ] as Vec<Field>)),
+                    true,
+                )
+                .into(),
+            ),
+            true,
+        )]));
 
-        let actual = result.first().unwrap().as_ref().unwrap();
+        let actual = make_parquet_data(test_data, required_schema).await?;
 
         let expected = [
             "+----------------+",
@@ -3218,7 +3227,7 @@ mod tests {
             "| [{a: 1, c: x}] |",
             "+----------------+",
         ];
-        assert_batches_eq!(expected, &[actual.clone()]);
+        assert_batches_eq!(expected, &[actual]);
 
         Ok(())
     }
@@ -3231,47 +3240,7 @@ mod tests {
     */
     #[tokio::test]
     async fn test_nested_types_map_keys() -> Result<(), DataFusionError> {
-        let session_ctx = SessionContext::new();
-
-        // generate test data in the temp folder
         let test_data = "select map([named_struct('a', 1, 'b', 'n', 'c', 'x')], [named_struct('a', 2, 'b', 'm', 'c', 'y')]) c0";
-        let tmp_dir = TempDir::new()?;
-        let test_path = tmp_dir.path().to_str().unwrap().to_string();
-
-        let plan = session_ctx
-            .sql(test_data)
-            .await?
-            .create_physical_plan()
-            .await?;
-
-        // Write a parquet file into temp folder
-        session_ctx
-            .write_parquet(plan, test_path.clone(), None)
-            .await?;
-
-        // Register all parquet with temp data as file groups
-        let mut file_groups: Vec<FileGroup> = vec![];
-        for entry in std::fs::read_dir(&test_path)? {
-            let entry = entry?;
-            let path = entry.path();
-
-            if path.extension().and_then(|ext| ext.to_str()) == Some("parquet") {
-                if let Some(path_str) = path.to_str() {
-                    file_groups.push(FileGroup::new(vec![PartitionedFile::from_path(
-                        path_str.into(),
-                    )?]));
-                }
-            }
-        }
-
-        let source = ParquetSource::default().with_schema_adapter_factory(Arc::new(
-            SparkSchemaAdapterFactory::new(
-                SparkParquetOptions::new(EvalMode::Ansi, "", false),
-                None,
-            ),
-        ))?;
-
-        // Define schema Comet reads with
         let required_schema = Schema::new(Fields::from(vec![Field::new(
             "c0",
             DataType::Map(
@@ -3305,19 +3274,7 @@ mod tests {
             true,
         )]));
 
-        let object_store_url = ObjectStoreUrl::local_filesystem();
-        let file_scan_config =
-            FileScanConfigBuilder::new(object_store_url, required_schema.into(), source)
-                .with_file_groups(file_groups)
-                .build();
-
-        // Run native read
-        let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
-        let stream = scan.execute(0, session_ctx.task_ctx())?;
-        let result: Vec<_> = stream.collect().await;
-
-        let actual = result.first().unwrap().as_ref().unwrap();
-
+        let actual = make_parquet_data(test_data, required_schema).await?;
         let expected = [
             "+------------------------------+",
             "| c0                           |",
@@ -3330,51 +3287,12 @@ mod tests {
         Ok(())
     }
 
+    // Read struct using schema where schema fields do not overlap with
+    // struct fields
     #[tokio::test]
-    async fn test_nested_types_extract_missing_struct_names() -> Result<(), DataFusionError> {
-        let session_ctx = SessionContext::new();
-
-        // generate test data in the temp folder
+    async fn test_nested_types_extract_missing_struct_names_non_overlap(
+    ) -> Result<(), DataFusionError> {
         let test_data = "select named_struct('a', 1, 'b', 'abc') c0";
-        let tmp_dir = TempDir::new()?;
-        let test_path = tmp_dir.path().to_str().unwrap().to_string();
-
-        let plan = session_ctx
-            .sql(test_data)
-            .await?
-            .create_physical_plan()
-            .await?;
-
-        // Write a parquet file into temp folder
-        session_ctx
-            .write_parquet(plan, test_path.clone(), None)
-            .await?;
-
-        // Register all parquet with temp data as file groups
-        let mut file_groups: Vec<FileGroup> = vec![];
-        for entry in std::fs::read_dir(&test_path)? {
-            let entry = entry?;
-            let path = entry.path();
-
-            if path.extension().and_then(|ext| ext.to_str()) == Some("parquet") {
-                if let Some(path_str) = path.to_str() {
-                    file_groups.push(FileGroup::new(vec![PartitionedFile::from_path(
-                        path_str.into(),
-                    )?]));
-                }
-            }
-        }
-
-        let source = ParquetSource::default().with_schema_adapter_factory(Arc::new(
-            SparkSchemaAdapterFactory::new(
-                SparkParquetOptions::new(EvalMode::Ansi, "", false),
-                None,
-            ),
-        ))?;
-
-        let object_store_url = ObjectStoreUrl::local_filesystem();
-
-        // Define schema Comet reads with
         let required_schema = Schema::new(Fields::from(vec![Field::new(
             "c0",
             DataType::Struct(Fields::from(vec![
@@ -3383,57 +3301,39 @@ mod tests {
             ])),
             true,
         )]));
-
-        let file_scan_config = FileScanConfigBuilder::new(
-            object_store_url.clone(),
-            required_schema.into(),
-            Arc::clone(&source),
-        )
-        .with_file_groups(file_groups.clone())
-        .build();
-
-        // Run native read
-        let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
-        let stream = scan.execute(0, session_ctx.task_ctx())?;
-        let result: Vec<_> = stream.collect().await;
-
-        let actual = result.first().unwrap().as_ref().unwrap();
-
+        let actual = make_parquet_data(test_data, required_schema).await?;
         let expected = ["+----+", "| c0 |", "+----+", "|    |", "+----+"];
-        assert_batches_eq!(expected, &[actual.clone()]);
+        assert_batches_eq!(expected, &[actual]);
+        Ok(())
+    }
 
-        // Define schema Comet reads with
+    // Read struct using custom schema to read just a single field from the struct
+    #[tokio::test]
+    async fn test_nested_types_extract_missing_struct_names_single_field(
+    ) -> Result<(), DataFusionError> {
+        let test_data = "select named_struct('a', 1, 'b', 'abc') c0";
         let required_schema = Schema::new(Fields::from(vec![Field::new(
             "c0",
             DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int64, true)])),
             true,
         )]));
-
-        let file_scan_config = FileScanConfigBuilder::new(
-            object_store_url.clone(),
-            required_schema.into(),
-            Arc::clone(&source),
-        )
-        .with_file_groups(file_groups.clone())
-        .build();
-
-        // Run native read
-        let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
-        let stream = scan.execute(0, session_ctx.task_ctx())?;
-        let result: Vec<_> = stream.collect().await;
-
-        let actual = result.first().unwrap().as_ref().unwrap();
-
+        let actual = make_parquet_data(test_data, required_schema).await?;
         let expected = [
             "+--------+",
             "| c0     |",
             "+--------+",
             "| {a: 1} |",
             "+--------+",
         ];
-        assert_batches_eq!(expected, &[actual.clone()]);
+        assert_batches_eq!(expected, &[actual]);
+        Ok(())
+    }
 
-        // Define schema Comet reads with
+    // Read struct using custom schema to handle a missing field
+    #[tokio::test]
+    async fn test_nested_types_extract_missing_struct_names_missing_field(
+    ) -> Result<(), DataFusionError> {
+        let test_data = "select named_struct('a', 1, 'b', 'abc') c0";
         let required_schema = Schema::new(Fields::from(vec![Field::new(
             "c0",
             DataType::Struct(Fields::from(vec![
@@ -3442,31 +3342,15 @@ mod tests {
             ])),
             true,
         )]));
-
-        let file_scan_config = FileScanConfigBuilder::new(
-            object_store_url.clone(),
-            required_schema.into(),
-            Arc::clone(&source),
-        )
-        .with_file_groups(file_groups.clone())
-        .build();
-
-        // Run native read
-        let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
-        let stream = scan.execute(0, session_ctx.task_ctx())?;
-        let result: Vec<_> = stream.collect().await;
-
-        let actual = result.first().unwrap().as_ref().unwrap();
-
+        let actual = make_parquet_data(test_data, required_schema).await?;
         let expected = [
             "+-------------+",
             "| c0          |",
             "+-------------+",
             "| {a: 1, x: } |",
             "+-------------+",
         ];
-        assert_batches_eq!(expected, &[actual.clone()]);
-
+        assert_batches_eq!(expected, &[actual]);
         Ok(())
     }
 }
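
With the helper extracted, a new read-schema scenario boils down to a query string, a read schema, and an expected table. Below is a minimal sketch of how a follow-up test could use `make_parquet_data`; the test name, the cast query, and the assumption that the Spark schema adapter widens an Int32 parquet column to an Int64 read schema are illustrations only, not part of this commit:

    // Hypothetical follow-up test (illustration only): the parquet file
    // stores an Int32 column, the read schema asks for Int64, and the
    // Spark schema adapter is assumed to widen the values on read.
    #[tokio::test]
    async fn test_read_schema_widens_int_column() -> Result<(), DataFusionError> {
        let test_data = "select cast(1 as int) c0";

        // Read schema deliberately differs from the file schema
        let required_schema = Schema::new(Fields::from(vec![Field::new(
            "c0",
            DataType::Int64,
            true,
        )]));

        let actual = make_parquet_data(test_data, required_schema).await?;
        let expected = ["+----+", "| c0 |", "+----+", "| 1  |", "+----+"];
        assert_batches_eq!(expected, &[actual]);
        Ok(())
    }

Everything filesystem- and scan-related stays inside the helper, which is the point of the refactor.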
