diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs index 00e9c71df348..82cc36867939 100644 --- a/datafusion/catalog-listing/src/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -156,6 +156,7 @@ pub fn split_files( chunks } +#[derive(Debug)] pub struct Partition { /// The path to the partition, including the table prefix path: Path, @@ -245,7 +246,16 @@ async fn prune_partitions( partition_cols: &[(String, DataType)], ) -> Result> { if filters.is_empty() { - return Ok(partitions); + // prune partitions which don't contain the partition columns + return Ok(partitions + .into_iter() + .filter(|p| { + let cols = partition_cols.iter().map(|x| x.0.as_str()); + !parse_partitions_for_path(table_path, &p.path, cols) + .unwrap_or_default() + .is_empty() + }) + .collect()); } let mut builders: Vec<_> = (0..partition_cols.len()) @@ -432,6 +442,7 @@ pub async fn pruned_partition_list<'a>( } let partition_prefix = evaluate_partition_prefix(partition_cols, filters); + let partitions = list_partitions(store, table_path, partition_cols.len(), partition_prefix) .await?; @@ -502,12 +513,12 @@ where let subpath = table_path.strip_prefix(file_path)?; let mut part_values = vec![]; - for (part, pn) in subpath.zip(table_partition_cols) { + for (part, expected_partition) in subpath.zip(table_partition_cols) { match part.split_once('=') { - Some((name, val)) if name == pn => part_values.push(val), + Some((name, val)) if name == expected_partition => part_values.push(val), _ => { debug!( - "Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{pn}'", + "Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{expected_partition}'", ); return None; } @@ -594,6 +605,8 @@ mod tests { ("tablepath/mypartition=val1/notparquetfile", 100), ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), ("tablepath/file.parquet", 100), + ("tablepath/notapartition/file.parquet", 100), + ("tablepath/notmypartition=val1/file.parquet", 100), ]); let filter = Expr::eq(col("mypartition"), lit("val1")); let pruned = pruned_partition_list( @@ -619,6 +632,8 @@ mod tests { ("tablepath/mypartition=val2/file.parquet", 100), ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), ("tablepath/mypartition=val1/other=val3/file.parquet", 100), + ("tablepath/notapartition/file.parquet", 100), + ("tablepath/notmypartition=val1/file.parquet", 100), ]); let filter = Expr::eq(col("mypartition"), lit("val1")); let pruned = pruned_partition_list( diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 3ce58938d77e..4ffb6d41864f 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -2732,6 +2732,52 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_listing_table_prunes_extra_files_in_hive() -> Result<()> { + let files = [ + "bucket/test/pid=1/file1", + "bucket/test/pid=1/file2", + "bucket/test/pid=2/file3", + "bucket/test/pid=2/file4", + "bucket/test/other/file5", + ]; + + let ctx = SessionContext::new(); + register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); + + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + .with_file_extension_opt(Some("")) + .with_table_partition_cols(vec![("pid".to_string(), DataType::Int32)]); + + let table_path = ListingTableUrl::parse("test:///bucket/test/").unwrap(); + let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]); + let config = ListingTableConfig::new(table_path) + .with_listing_options(opt) + .with_schema(Arc::new(schema)); + + let table = ListingTable::try_new(config)?; + + let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?; + assert_eq!(file_list.len(), 1); + + let files = file_list[0].clone(); + + assert_eq!( + files + .iter() + .map(|f| f.path().to_string()) + .collect::>(), + vec![ + "bucket/test/pid=1/file1", + "bucket/test/pid=1/file2", + "bucket/test/pid=2/file3", + "bucket/test/pid=2/file4", + ] + ); + + Ok(()) + } + #[cfg(feature = "parquet")] #[tokio::test] async fn test_table_stats_behaviors() -> Result<()> { @@ -2750,6 +2796,7 @@ mod tests { let config_default = ListingTableConfig::new(table_path.clone()) .with_listing_options(opt_default) .with_schema(schema_default); + let table_default = ListingTable::try_new(config_default)?; let exec_default = table_default.scan(&state, None, &[], None).await?;