From d6b08d58cc5d15a2fbef2f91eeb2db944963888e Mon Sep 17 00:00:00 2001
From: Jared Combs <9115033+jared-m-combs@users.noreply.github.com>
Date: Mon, 16 Jun 2025 23:17:19 -0400
Subject: [PATCH] Allow for 'special' partitions that are omitted from the
 staleness check.

It is sometimes useful to carry additional information in the file path
that we can access programmatically but that is not a 'normal'
partition. For instance, it can be helpful to encode versioning or hash
metadata into the path. However, this can prevent the stale files UDTF
from correctly matching the file metadata against its dependencies.

To work around this, I've added the notion of a 'special' partition.
These partitions are prefixed with an underscore and must be the
leaf-most nodes in the path, which makes it easy to omit them from the
path in the same way that we already omit file names. For example,
's3://m1/partition_column=2021/_v=123/data.01.parquet' now maps to the
target 's3://m1/partition_column=2021/'.
---
 src/materialized/dependencies.rs | 50 ++++++++++++++++++++++++++++----
 1 file changed, 45 insertions(+), 5 deletions(-)

diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs
index 3afbd96..5b42692 100644
--- a/src/materialized/dependencies.rs
+++ b/src/materialized/dependencies.rs
@@ -187,9 +187,18 @@ impl TableFunctionImpl for StaleFilesUdtf {
             )?
             .aggregate(
                 vec![
-                    // Trim the final path element
-                    regexp_replace(col("file_path"), lit(r"/[^/]*$"), lit("/"), None)
-                        .alias("existing_target"),
+                    // We want to omit the file name along with any "special" partitions
+                    // from the path before comparing it to the target partition. Special
+                    // partitions must be leaf-most nodes and are designated by a leading
+                    // underscore. They are useful for adding additional information to a
+                    // file path without affecting partitioning or staleness checks.
+                    regexp_replace(
+                        col("file_path"),
+                        lit(r"(/_[^/=]+=[^/]+)*/[^/]*$"),
+                        lit("/"),
+                        None,
+                    )
+                    .alias("existing_target"),
                 ],
                 vec![max(col("last_modified")).alias("target_last_modified")],
             )?
@@ -1169,8 +1178,7 @@ mod test {
         }
 
         let cases = &[
-            TestCase {
-                name: "un-transformed partition column",
+            TestCase { name: "un-transformed partition column",
                 query_to_analyze:
                     "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1",
                 table_name: "m1",
@@ -1202,6 +1210,38 @@ mod test {
                     "+--------------------------------+----------------------+-----------------------+----------+",
                 ],
             },
+            TestCase { name: "omit 'special' partition columns",
+                query_to_analyze:
+                    "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1",
+                table_name: "m1",
+                table_path: ListingTableUrl::parse("s3://m1/").unwrap(),
+                partition_cols: vec!["partition_column"],
+                file_extension: ".parquet",
+                expected_output: vec![
+                    "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
+                    "| target                         | source_table_catalog | source_table_schema | source_table_name | source_uri                           | source_last_modified |",
+                    "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
+                    "| s3://m1/partition_column=2021/ | datafusion           | test                | t1                | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26  |",
+                    "| s3://m1/partition_column=2022/ | datafusion           | test                | t1                | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22  |",
+                    "| s3://m1/partition_column=2023/ | datafusion           | test                | t1                | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44  |",
+                    "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
+                ],
+                // second file is old
+                file_metadata: "
+                    ('datafusion', 'test', 'm1', 's3://m1/partition_column=2021/_v=123/data.01.parquet', '2023-07-12T16:00:00Z', 0),
+                    ('datafusion', 'test', 'm1', 's3://m1/partition_column=2022/_v=123/data.01.parquet', '2023-07-10T16:00:00Z', 0),
+                    ('datafusion', 'test', 'm1', 's3://m1/partition_column=2023/_v=123/data.01.parquet', '2023-07-12T16:00:00Z', 0)
+                ",
+                expected_stale_files_output: vec![
+                    "+--------------------------------+----------------------+-----------------------+----------+",
+                    "| target                         | target_last_modified | sources_last_modified | is_stale |",
+                    "+--------------------------------+----------------------+-----------------------+----------+",
+                    "| s3://m1/partition_column=2021/ | 2023-07-12T16:00:00  | 2023-07-11T16:29:26   | false    |",
+                    "| s3://m1/partition_column=2022/ | 2023-07-10T16:00:00  | 2023-07-11T16:45:22   | true     |",
+                    "| s3://m1/partition_column=2023/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
+                    "+--------------------------------+----------------------+-----------------------+----------+",
+                ],
+            },
             TestCase {
                 name: "transform year/month/day partition into timestamp partition",
                 query_to_analyze: "
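
For reviewers: the new pattern can be exercised standalone against the
`regex` crate, which to my understanding backs DataFusion's
`regexp_replace` (treat that as an assumption). A minimal sketch, with
sample paths borrowed from the test data above:

    // Demonstrates how the new pattern strips the file name plus any
    // trailing "special" (underscore-prefixed) partitions from a path.
    use regex::Regex;

    fn main() {
        let re = Regex::new(r"(/_[^/=]+=[^/]+)*/[^/]*$").unwrap();

        // The `_v=123` special partition and the file name are both
        // trimmed, so the file resolves to its target partition.
        assert_eq!(
            re.replace("s3://m1/partition_column=2022/_v=123/data.01.parquet", "/"),
            "s3://m1/partition_column=2022/"
        );

        // Paths without special partitions lose only the file name,
        // matching the previous behavior.
        assert_eq!(
            re.replace("s3://t1/column1=2021/data.01.parquet", "/"),
            "s3://t1/column1=2021/"
        );

        // Partitions without a leading underscore are never treated as
        // special, even in the leaf-most position.
        assert_eq!(
            re.replace("s3://m1/partition_column=2021/v=123/data.01.parquet", "/"),
            "s3://m1/partition_column=2021/v=123/"
        );
    }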