23 changes: 19 additions & 4 deletions datafusion/catalog-listing/src/helpers.rs
@@ -156,6 +156,7 @@ pub fn split_files(
chunks
}

#[derive(Debug)]
pub struct Partition {
/// The path to the partition, including the table prefix
path: Path,
@@ -245,7 +246,16 @@ async fn prune_partitions(
partition_cols: &[(String, DataType)],
) -> Result<Vec<Partition>> {
if filters.is_empty() {
-        return Ok(partitions);
+        // prune partitions which don't contain the partition columns
+        return Ok(partitions
+            .into_iter()
+            .filter(|p| {
+                let cols = partition_cols.iter().map(|x| x.0.as_str());
+                !parse_partitions_for_path(table_path, &p.path, cols)
Contributor:
Do we have to worry about filters such as <partition column> IS NULL?

Contributor Author:

This is a good question, and has made me open a little can of worms.

For the filtering case, it looks like we do not infer the filter for <partition column> IS NULL, and the column is defined as non-nullable when the listing table is built. We should probably fix this to support null columns, but we'll need to introduce configuration for users to specify a null fallback value other than the default __HIVE_DEFAULT_PARTITION__.

For the non-filtering case, this PR will indeed match null partition column values in the current implementation - but because we don't have any special treatment for them, the value is returned as the literal text "__HIVE_DEFAULT_PARTITION__". If your partition column is declared as an Int32, for example, the query will then fail.

I think implementing proper support for nulls will need more work outside of this PR. Because we already define the column as non-nullable, what do you think about manually excluding __HIVE_DEFAULT_PARTITION__ values in parse_partitions_for_path, to prevent query errors like the one described above until proper null support is added? I can raise an issue and start working on it as well.

This won't help people with custom null fallback values, but would help for all default cases.
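A minimal sketch of the exclusion proposed above. The helper name and signature here are hypothetical (not the PR's actual parse_partitions_for_path): it parses name=value path segments against the expected partition columns and treats the default null sentinel as a non-match, so files under that partition are skipped instead of failing a later typed cast.

```rust
/// The default Hive null sentinel; a configurable fallback is what the
/// comment above suggests would be needed for non-default setups.
const HIVE_DEFAULT_PARTITION: &str = "__HIVE_DEFAULT_PARTITION__";

/// Hypothetical helper: parse `name=value` path segments for the expected
/// partition columns, returning None if any segment does not match or
/// carries the null sentinel.
fn parse_partition_segments<'a>(
    segments: &[&'a str],
    expected_cols: &[&str],
) -> Option<Vec<&'a str>> {
    segments
        .iter()
        .zip(expected_cols)
        .map(|(segment, expected)| match segment.split_once('=') {
            Some((name, val)) if name == *expected && val != HIVE_DEFAULT_PARTITION => Some(val),
            _ => None,
        })
        .collect()
}

fn main() {
    assert_eq!(
        parse_partition_segments(&["pid=1", "other=a"], &["pid", "other"]),
        Some(vec!["1", "a"])
    );
    // Files under the null-fallback partition are ignored entirely.
    assert_eq!(
        parse_partition_segments(&["pid=__HIVE_DEFAULT_PARTITION__"], &["pid"]),
        None
    );
}
```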

Contributor Author:

It looks like this is an existing issue before this PR as well, a non-filter scan (e.g. SELECT <partition column> FROM blah) will also match all files including the null fallback partition:

DataSourceExec: file_groups={1 group: [[peasee-hive-test/pid=1/data.parquet, peasee-hive-test/pid=2/data.parquet, peasee-hive-test/pid=__HIVE_DEFAULT_PARTITION__/data.parquet]]

Contributor:

It looks like this is an existing issue before this PR as well, a non-filter scan (e.g. SELECT <partition column> FROM blah) will also match all files including the null fallback partition:

It seems like reasonable behavior to me that SELECT <partition column> FROM blah would also return data without a value for the partition column (as a value of NULL).

But that being said I am not sure even what defines the "expected" behavior in this case (e.g. what does Hive do in this case 🤔 )

Contributor Author (@peasee, Oct 11, 2025):

Yes, I should've been clearer: the SELECT <partition column> FROM blah works, but it returns the value as the literal __HIVE_DEFAULT_PARTITION__ - we don't convert such values to NULL yet, which I think we probably ought to.

Hive itself seems not to convert the value at all, as suggested by this open issue. I struggled to find direct references to HIVE_DEFAULT_PARTITION in the Hive docs; I only came across this ingestion guide, which explains how the Hive writer rewrites NULL into the HIVE_DEFAULT_PARTITION string.
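For illustration, the write-side rewrite that guide describes amounts to substituting the sentinel for NULL when forming the directory segment. This is a hedged sketch with a hypothetical helper name, not Hive's actual code:

```rust
/// Hypothetical sketch of the Hive-style write path: a NULL partition value
/// is rewritten to the literal sentinel when building the directory name.
fn partition_path_segment(col: &str, value: Option<&str>) -> String {
    match value {
        Some(v) => format!("{col}={v}"),
        None => format!("{col}=__HIVE_DEFAULT_PARTITION__"),
    }
}

fn main() {
    assert_eq!(partition_path_segment("pid", Some("1")), "pid=1");
    // NULL never reaches the path as-is; only the sentinel does.
    assert_eq!(
        partition_path_segment("pid", None),
        "pid=__HIVE_DEFAULT_PARTITION__"
    );
}
```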

I guess it is up to the implementer to decide what to do with that, because a text partition on a partition column you expect to be an integer could be annoying 😕 It looks like Impala decided to treat them as NULL values: IMPALA-252.

I found the Impala ticket via this Spark issue and associated GitHub PR apache/spark#17277 which also had an interesting discussion about this.

The side effect both of these tickets mention is that Hive does not differentiate between an empty string and a null value, so there is no way to tell which one a HIVE_DEFAULT_PARTITION represents.
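Impala's choice (IMPALA-252) amounts to mapping the sentinel back to NULL on read, before any typed cast. A sketch of that read path for an Int32 partition column, assuming the default sentinel (the function name is hypothetical):

```rust
/// Hypothetical read-side treatment in the Impala/Spark style: map the
/// sentinel to NULL, otherwise attempt the typed cast (Int32 here).
fn read_int32_partition_value(raw: &str) -> Result<Option<i32>, std::num::ParseIntError> {
    if raw == "__HIVE_DEFAULT_PARTITION__" {
        // The sentinel becomes NULL; an originally empty string is
        // indistinguishable once written, as noted above.
        Ok(None)
    } else {
        raw.parse().map(Some)
    }
}

fn main() {
    assert_eq!(read_int32_partition_value("2"), Ok(Some(2)));
    assert_eq!(read_int32_partition_value("__HIVE_DEFAULT_PARTITION__"), Ok(None));
    // Without the mapping, the literal sentinel fails the Int32 cast.
    assert!("__HIVE_DEFAULT_PARTITION__".parse::<i32>().is_err());
}
```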

+                    .unwrap_or_default()
+                    .is_empty()
+            })
+            .collect());
}

let mut builders: Vec<_> = (0..partition_cols.len())
@@ -432,6 +442,7 @@ pub async fn pruned_partition_list<'a>(
}

let partition_prefix = evaluate_partition_prefix(partition_cols, filters);

let partitions =
list_partitions(store, table_path, partition_cols.len(), partition_prefix)
.await?;
@@ -502,12 +513,12 @@
let subpath = table_path.strip_prefix(file_path)?;

let mut part_values = vec![];
-    for (part, pn) in subpath.zip(table_partition_cols) {
+    for (part, expected_partition) in subpath.zip(table_partition_cols) {
         match part.split_once('=') {
-            Some((name, val)) if name == pn => part_values.push(val),
+            Some((name, val)) if name == expected_partition => part_values.push(val),
             _ => {
                 debug!(
-                    "Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{pn}'",
+                    "Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{expected_partition}'",
);
return None;
}
@@ -594,6 +605,8 @@ mod tests {
("tablepath/mypartition=val1/notparquetfile", 100),
("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
("tablepath/file.parquet", 100),
("tablepath/notapartition/file.parquet", 100),
("tablepath/notmypartition=val1/file.parquet", 100),
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
let pruned = pruned_partition_list(
@@ -619,6 +632,8 @@
("tablepath/mypartition=val2/file.parquet", 100),
("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
("tablepath/mypartition=val1/other=val3/file.parquet", 100),
("tablepath/notapartition/file.parquet", 100),
("tablepath/notmypartition=val1/file.parquet", 100),
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
let pruned = pruned_partition_list(
47 changes: 47 additions & 0 deletions datafusion/core/src/datasource/listing/table.rs
@@ -2732,6 +2732,52 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_listing_table_prunes_extra_files_in_hive() -> Result<()> {
let files = [
"bucket/test/pid=1/file1",
"bucket/test/pid=1/file2",
"bucket/test/pid=2/file3",
"bucket/test/pid=2/file4",
"bucket/test/other/file5",
];

let ctx = SessionContext::new();
register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());

let opt = ListingOptions::new(Arc::new(JsonFormat::default()))
.with_file_extension_opt(Some(""))
.with_table_partition_cols(vec![("pid".to_string(), DataType::Int32)]);

let table_path = ListingTableUrl::parse("test:///bucket/test/").unwrap();
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
let config = ListingTableConfig::new(table_path)
.with_listing_options(opt)
.with_schema(Arc::new(schema));

let table = ListingTable::try_new(config)?;

let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
assert_eq!(file_list.len(), 1);

let files = file_list[0].clone();

assert_eq!(
files
.iter()
.map(|f| f.path().to_string())
.collect::<Vec<_>>(),
vec![
"bucket/test/pid=1/file1",
"bucket/test/pid=1/file2",
"bucket/test/pid=2/file3",
"bucket/test/pid=2/file4",
]
);

Ok(())
}

#[cfg(feature = "parquet")]
#[tokio::test]
async fn test_table_stats_behaviors() -> Result<()> {
@@ -2750,6 +2796,7 @@
let config_default = ListingTableConfig::new(table_path.clone())
.with_listing_options(opt_default)
.with_schema(schema_default);

let table_default = ListingTable::try_new(config_default)?;

let exec_default = table_default.scan(&state, None, &[], None).await?;