Skip to content

Commit 82d1bff

Browse files
committed
feat: Add tests for inferred and specified schema handling for multiple files in ListingTableConfig
1 parent a0ed876 commit 82d1bff

File tree

1 file changed

+247
-5
lines changed
  • datafusion/core/src/datasource/listing

1 file changed

+247
-5
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 247 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1340,6 +1340,7 @@ mod tests {
13401340
use datafusion_physical_plan::ExecutionPlanProperties;
13411341

13421342
use crate::test::object_store::{ensure_head_concurrency, make_test_store_and_state};
1343+
use std::io::Write;
13431344
use tempfile::TempDir;
13441345
use url::Url;
13451346

@@ -2486,11 +2487,7 @@ mod tests {
24862487
.await?;
24872488

24882489
// check count
2489-
let batches = session_ctx
2490-
.sql("select * from foo")
2491-
.await?
2492-
.collect()
2493-
.await?;
2490+
let batches = session_ctx.sql("select * from t").await?.collect().await?;
24942491

24952492
insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&batches),@r###"
24962493
+-----+-----+---+
@@ -2658,4 +2655,249 @@ mod tests {
26582655

26592656
Ok(())
26602657
}
2658+
2659+
#[tokio::test]
2660+
async fn test_listing_table_config_with_multiple_files_inferred() -> Result<()> {
2661+
// Test case 1: Inferred schema with multiple files having different schemas
2662+
let ctx = SessionContext::new();
2663+
2664+
// Create two test files with different schemas
2665+
let tmp_dir = TempDir::new()?;
2666+
let file_path1 = tmp_dir.path().join("file1.csv");
2667+
let file_path2 = tmp_dir.path().join("file2.csv");
2668+
2669+
// File 1: c1,c2,c3
2670+
let mut file1 = std::fs::File::create(&file_path1)?;
2671+
writeln!(file1, "c1,c2,c3")?;
2672+
writeln!(file1, "1,2,3")?;
2673+
writeln!(file1, "4,5,6")?;
2674+
2675+
// File 2: c1,c2,c3,c4
2676+
let mut file2 = std::fs::File::create(&file_path2)?;
2677+
writeln!(file2, "c1,c2,c3,c4")?;
2678+
writeln!(file2, "7,8,9,10")?;
2679+
writeln!(file2, "11,12,13,14")?;
2680+
2681+
// Parse paths
2682+
let table_path1 = ListingTableUrl::parse(file_path1.to_str().unwrap())?;
2683+
let table_path2 = ListingTableUrl::parse(file_path2.to_str().unwrap())?;
2684+
2685+
// Create config with both paths
2686+
let config =
2687+
ListingTableConfig::new_with_multi_paths(vec![table_path1, table_path2]);
2688+
assert_eq!(*config.schema_source(), SchemaSource::None);
2689+
2690+
// Set up options
2691+
let format = csv::CsvFormat::default()
2692+
.with_schema(None)
2693+
.with_has_header(true);
2694+
let options = ListingOptions::new(Arc::new(format));
2695+
let config = config.with_listing_options(options);
2696+
2697+
// Infer schema (should use first file's schema)
2698+
let config = config.infer_schema(&ctx.state()).await?;
2699+
assert_eq!(*config.schema_source(), SchemaSource::Inferred);
2700+
2701+
// Verify that the inferred schema matches the first file's schema (3 columns)
2702+
let schema = config.file_schema.unwrap();
2703+
assert_eq!(schema.fields().len(), 3);
2704+
assert_eq!(schema.field(0).name(), "c1");
2705+
assert_eq!(schema.field(1).name(), "c2");
2706+
assert_eq!(schema.field(2).name(), "c3");
2707+
2708+
Ok(())
2709+
}
2710+
2711+
#[tokio::test]
2712+
async fn test_listing_table_config_with_multiple_files_specified_schema1(
2713+
) -> Result<()> {
2714+
// Test case 2: Specified schema matching first file schema
2715+
let ctx = SessionContext::new();
2716+
2717+
// Create two test files with different schemas
2718+
let tmp_dir = TempDir::new()?;
2719+
let file_path1 = tmp_dir.path().join("file1.csv");
2720+
let file_path2 = tmp_dir.path().join("file2.csv");
2721+
2722+
// File 1: c1,c2,c3
2723+
let mut file1 = std::fs::File::create(&file_path1)?;
2724+
writeln!(file1, "c1,c2,c3")?;
2725+
writeln!(file1, "1,2,3")?;
2726+
writeln!(file1, "4,5,6")?;
2727+
2728+
// File 2: c1,c2,c3,c4
2729+
let mut file2 = std::fs::File::create(&file_path2)?;
2730+
writeln!(file2, "c1,c2,c3,c4")?;
2731+
writeln!(file2, "7,8,9,10")?;
2732+
writeln!(file2, "11,12,13,14")?;
2733+
2734+
// Parse paths
2735+
let table_path1 = ListingTableUrl::parse(file_path1.to_str().unwrap())?;
2736+
let table_path2 = ListingTableUrl::parse(file_path2.to_str().unwrap())?;
2737+
2738+
// Create specified schema matching first file
2739+
let specified_schema = Arc::new(Schema::new(vec![
2740+
Field::new("c1", DataType::Utf8, true),
2741+
Field::new("c2", DataType::Utf8, true),
2742+
Field::new("c3", DataType::Utf8, true),
2743+
]));
2744+
2745+
// Create config with both paths and specified schema
2746+
let config =
2747+
ListingTableConfig::new_with_multi_paths(vec![table_path1, table_path2])
2748+
.with_schema(specified_schema);
2749+
2750+
assert_eq!(*config.schema_source(), SchemaSource::Specified);
2751+
2752+
// Set up options
2753+
let format = csv::CsvFormat::default()
2754+
.with_schema(None)
2755+
.with_has_header(true);
2756+
let options = ListingOptions::new(Arc::new(format));
2757+
let config = config.with_listing_options(options);
2758+
2759+
// Infer should not change the schema because it's already specified
2760+
let config = config.infer_schema(&ctx.state()).await?;
2761+
assert_eq!(*config.schema_source(), SchemaSource::Specified);
2762+
2763+
// Verify that the schema is still the one we specified (3 columns)
2764+
let schema = config.file_schema.unwrap();
2765+
assert_eq!(schema.fields().len(), 3);
2766+
assert_eq!(schema.field(0).name(), "c1");
2767+
assert_eq!(schema.field(1).name(), "c2");
2768+
assert_eq!(schema.field(2).name(), "c3");
2769+
2770+
// Create the ListingTable and verify it maintains the schema source
2771+
let table = ListingTable::try_new(config)?;
2772+
assert_eq!(*table.schema_source(), SchemaSource::Specified);
2773+
2774+
Ok(())
2775+
}
2776+
2777+
#[tokio::test]
2778+
async fn test_listing_table_config_with_multiple_files_specified_schema2(
2779+
) -> Result<()> {
2780+
// Test case 3: Specified schema matching second file schema
2781+
let ctx = SessionContext::new();
2782+
2783+
// Create two test files with different schemas
2784+
let tmp_dir = TempDir::new()?;
2785+
let file_path1 = tmp_dir.path().join("file1.csv");
2786+
let file_path2 = tmp_dir.path().join("file2.csv");
2787+
2788+
// File 1: c1,c2,c3
2789+
let mut file1 = std::fs::File::create(&file_path1)?;
2790+
writeln!(file1, "c1,c2,c3")?;
2791+
writeln!(file1, "1,2,3")?;
2792+
writeln!(file1, "4,5,6")?;
2793+
2794+
// File 2: c1,c2,c3,c4
2795+
let mut file2 = std::fs::File::create(&file_path2)?;
2796+
writeln!(file2, "c1,c2,c3,c4")?;
2797+
writeln!(file2, "7,8,9,10")?;
2798+
writeln!(file2, "11,12,13,14")?;
2799+
2800+
// Parse paths
2801+
let table_path1 = ListingTableUrl::parse(file_path1.to_str().unwrap())?;
2802+
let table_path2 = ListingTableUrl::parse(file_path2.to_str().unwrap())?;
2803+
2804+
// Create specified schema matching second file (with 4 columns)
2805+
let specified_schema = Arc::new(Schema::new(vec![
2806+
Field::new("c1", DataType::Utf8, true),
2807+
Field::new("c2", DataType::Utf8, true),
2808+
Field::new("c3", DataType::Utf8, true),
2809+
Field::new("c4", DataType::Utf8, true),
2810+
]));
2811+
2812+
// Create config with both paths and specified schema
2813+
let config =
2814+
ListingTableConfig::new_with_multi_paths(vec![table_path1, table_path2])
2815+
.with_schema(specified_schema.clone());
2816+
2817+
assert_eq!(*config.schema_source(), SchemaSource::Specified);
2818+
2819+
// Set up options
2820+
let format = csv::CsvFormat::default()
2821+
.with_schema(None)
2822+
.with_has_header(true);
2823+
let options = ListingOptions::new(Arc::new(format));
2824+
let config = config.with_listing_options(options);
2825+
2826+
// Infer should not change the schema because it's already specified
2827+
let config = config.infer_schema(&ctx.state()).await?;
2828+
assert_eq!(*config.schema_source(), SchemaSource::Specified);
2829+
2830+
// Verify that the schema is still the one we specified (4 columns)
2831+
let schema = config.file_schema.unwrap();
2832+
assert_eq!(schema.fields().len(), 4);
2833+
assert_eq!(schema.field(0).name(), "c1");
2834+
assert_eq!(schema.field(1).name(), "c2");
2835+
assert_eq!(schema.field(2).name(), "c3");
2836+
assert_eq!(schema.field(3).name(), "c4");
2837+
2838+
// Create the ListingTable and verify it maintains the schema source
2839+
let table = ListingTable::try_new(config)?;
2840+
assert_eq!(*table.schema_source(), SchemaSource::Specified);
2841+
2842+
Ok(())
2843+
}
2844+
2845+
#[tokio::test]
2846+
async fn test_listing_table_config_with_multiple_files_inferred_reversed(
2847+
) -> Result<()> {
2848+
// Test case: Inferred schema with multiple files having different schemas,
2849+
// but with the order reversed (schema2 first, then schema1)
2850+
let ctx = SessionContext::new();
2851+
2852+
// Create two test files with different schemas
2853+
let tmp_dir = TempDir::new()?;
2854+
let file_path1 = tmp_dir.path().join("file1.csv");
2855+
let file_path2 = tmp_dir.path().join("file2.csv");
2856+
2857+
// File 1: c1,c2,c3,c4 (now schema2 with 4 columns)
2858+
let mut file1 = std::fs::File::create(&file_path1)?;
2859+
writeln!(file1, "c1,c2,c3,c4")?;
2860+
writeln!(file1, "7,8,9,10")?;
2861+
writeln!(file1, "11,12,13,14")?;
2862+
2863+
// File 2: c1,c2,c3 (now schema1 with 3 columns)
2864+
let mut file2 = std::fs::File::create(&file_path2)?;
2865+
writeln!(file2, "c1,c2,c3")?;
2866+
writeln!(file2, "1,2,3")?;
2867+
writeln!(file2, "4,5,6")?;
2868+
2869+
// Parse paths
2870+
let table_path1 = ListingTableUrl::parse(file_path1.to_str().unwrap())?;
2871+
let table_path2 = ListingTableUrl::parse(file_path2.to_str().unwrap())?;
2872+
2873+
// Create config with both paths, with schema2 (4-column) file first
2874+
let config =
2875+
ListingTableConfig::new_with_multi_paths(vec![table_path1, table_path2]);
2876+
assert_eq!(*config.schema_source(), SchemaSource::None);
2877+
2878+
// Set up options
2879+
let format = csv::CsvFormat::default()
2880+
.with_schema(None)
2881+
.with_has_header(true);
2882+
let options = ListingOptions::new(Arc::new(format));
2883+
let config = config.with_listing_options(options);
2884+
2885+
// Infer schema (should use first file's schema which is now schema2 with 4 columns)
2886+
let config = config.infer_schema(&ctx.state()).await?;
2887+
assert_eq!(*config.schema_source(), SchemaSource::Inferred);
2888+
2889+
// Verify that the inferred schema matches the first file's schema, which now has 4 columns
2890+
let schema = config.file_schema.unwrap();
2891+
assert_eq!(schema.fields().len(), 4);
2892+
assert_eq!(schema.field(0).name(), "c1");
2893+
assert_eq!(schema.field(1).name(), "c2");
2894+
assert_eq!(schema.field(2).name(), "c3");
2895+
assert_eq!(schema.field(3).name(), "c4");
2896+
2897+
// Create a ListingTable and verify it maintains the schema source
2898+
let table = ListingTable::try_new(config)?;
2899+
assert_eq!(*table.schema_source(), SchemaSource::Inferred);
2900+
2901+
Ok(())
2902+
}
26612903
}

0 commit comments

Comments
 (0)