diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index 3afbd96..5b42692 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -187,9 +187,18 @@ impl TableFunctionImpl for StaleFilesUdtf { )? .aggregate( vec![ - // Trim the final path element - regexp_replace(col("file_path"), lit(r"/[^/]*$"), lit("/"), None) - .alias("existing_target"), + // We want to omit the file name along with any "special" partitions + // from the path before comparing it to the target partition. Special + // partitions must be leaf most nodes and are designated by a leading + // underscore. These are useful for adding additional information to a + // filename without affecting partitioning or staleness checks. + regexp_replace( + col("file_path"), + lit(r"(/_[^/=]+=[^/]+)*/[^/]*$"), + lit("/"), + None, + ) + .alias("existing_target"), ], vec![max(col("last_modified")).alias("target_last_modified")], )? @@ -1169,8 +1178,7 @@ mod test { } let cases = &[ - TestCase { - name: "un-transformed partition column", + TestCase { name: "un-transformed partition column", query_to_analyze: "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1", table_name: "m1", @@ -1202,6 +1210,38 @@ mod test { "+--------------------------------+----------------------+-----------------------+----------+", ], }, + TestCase { name: "omit 'special' partition columns", + query_to_analyze: + "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1", + table_name: "m1", + table_path: ListingTableUrl::parse("s3://m1/").unwrap(), + partition_cols: vec!["partition_column"], + file_extension: ".parquet", + expected_output: vec![ + "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |", + "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + "| s3://m1/partition_column=2021/ | datafusion | test | t1 | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m1/partition_column=2022/ | datafusion | test | t1 | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m1/partition_column=2023/ | datafusion | test | t1 | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44 |", + "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + ], + // second file is old + file_metadata: " + ('datafusion', 'test', 'm1', 's3://m1/partition_column=2021/_v=123/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm1', 's3://m1/partition_column=2022/_v=123/data.01.parquet', '2023-07-10T16:00:00Z', 0), + ('datafusion', 'test', 'm1', 's3://m1/partition_column=2023/_v=123/data.01.parquet', '2023-07-12T16:00:00Z', 0) + ", + expected_stale_files_output: vec![ + "+--------------------------------+----------------------+-----------------------+----------+", + "| target | target_last_modified | sources_last_modified | is_stale |", + "+--------------------------------+----------------------+-----------------------+----------+", + "| s3://m1/partition_column=2021/ | 2023-07-12T16:00:00 | 2023-07-11T16:29:26 | false |", + "| s3://m1/partition_column=2022/ | 2023-07-10T16:00:00 | 2023-07-11T16:45:22 | true |", + "| s3://m1/partition_column=2023/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:44 | false |", + "+--------------------------------+----------------------+-----------------------+----------+", + ], + }, TestCase { name: "transform year/month/day partition into timestamp partition", query_to_analyze: "