@@ -187,9 +187,18 @@ impl TableFunctionImpl for StaleFilesUdtf {
187187 ) ?
188188 . aggregate (
189189 vec ! [
190- // Trim the final path element
191- regexp_replace( col( "file_path" ) , lit( r"/[^/]*$" ) , lit( "/" ) , None )
192- . alias( "existing_target" ) ,
190+ // We want to omit the file name along with any "special" partitions
191+ // from the path before comparing it to the target partition. Special
192+ // partitions must be leaf most nodes and are designated by a leading
193+ // underscore. These are useful for adding additional information to a
194+ // filename without affecting partitioning or staleness checks.
195+ regexp_replace(
196+ col( "file_path" ) ,
197+ lit( r"(/_[^/=]+=[^/]+)*/[^/]*$" ) ,
198+ lit( "/" ) ,
199+ None ,
200+ )
201+ . alias( "existing_target" ) ,
193202 ] ,
194203 vec ! [ max( col( "last_modified" ) ) . alias( "target_last_modified" ) ] ,
195204 ) ?
@@ -1169,8 +1178,7 @@ mod test {
11691178 }
11701179
11711180 let cases = & [
1172- TestCase {
1173- name : "un-transformed partition column" ,
1181+ TestCase { name : "un-transformed partition column" ,
11741182 query_to_analyze :
11751183 "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1" ,
11761184 table_name : "m1" ,
@@ -1202,6 +1210,38 @@ mod test {
12021210 "+--------------------------------+----------------------+-----------------------+----------+" ,
12031211 ] ,
12041212 } ,
1213+ TestCase { name : "omit 'special' partition columns" ,
1214+ query_to_analyze :
1215+ "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1" ,
1216+ table_name : "m1" ,
1217+ table_path : ListingTableUrl :: parse ( "s3://m1/" ) . unwrap ( ) ,
1218+ partition_cols : vec ! [ "partition_column" ] ,
1219+ file_extension : ".parquet" ,
1220+ expected_output : vec ! [
1221+ "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+" ,
1222+ "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |" ,
1223+ "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+" ,
1224+ "| s3://m1/partition_column=2021/ | datafusion | test | t1 | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26 |" ,
1225+ "| s3://m1/partition_column=2022/ | datafusion | test | t1 | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22 |" ,
1226+ "| s3://m1/partition_column=2023/ | datafusion | test | t1 | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44 |" ,
1227+ "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+" ,
1228+ ] ,
1229+ // second file is old
1230+ file_metadata : "
1231+ ('datafusion', 'test', 'm1', 's3://m1/partition_column=2021/_v=123/data.01.parquet', '2023-07-12T16:00:00Z', 0),
1232+ ('datafusion', 'test', 'm1', 's3://m1/partition_column=2022/_v=123/data.01.parquet', '2023-07-10T16:00:00Z', 0),
1233+ ('datafusion', 'test', 'm1', 's3://m1/partition_column=2023/_v=123/data.01.parquet', '2023-07-12T16:00:00Z', 0)
1234+ " ,
1235+ expected_stale_files_output : vec ! [
1236+ "+--------------------------------+----------------------+-----------------------+----------+" ,
1237+ "| target | target_last_modified | sources_last_modified | is_stale |" ,
1238+ "+--------------------------------+----------------------+-----------------------+----------+" ,
1239+ "| s3://m1/partition_column=2021/ | 2023-07-12T16:00:00 | 2023-07-11T16:29:26 | false |" ,
1240+ "| s3://m1/partition_column=2022/ | 2023-07-10T16:00:00 | 2023-07-11T16:45:22 | true |" ,
1241+ "| s3://m1/partition_column=2023/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:44 | false |" ,
1242+ "+--------------------------------+----------------------+-----------------------+----------+" ,
1243+ ] ,
1244+ } ,
12051245 TestCase {
12061246 name : "transform year/month/day partition into timestamp partition" ,
12071247 query_to_analyze : "
0 commit comments