Skip to content

Commit 7114393

Browse files
Merge pull request #6 from polygon-io/rebase-upstream
Rebase upstream
2 parents fa0c4de + 54201fa commit 7114393

File tree

10 files changed

+295
-103
lines changed

10 files changed

+295
-103
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,3 +23,6 @@ Cargo.lock
2323
# Added by cargo
2424

2525
/target
26+
27+
.idea
28+
.DS_Store

deny.toml

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ allow = [
2323
"BSD-2-Clause",
2424
"BSD-3-Clause",
2525
"CC0-1.0",
26-
"Unicode-3.0"
26+
"Unicode-3.0",
27+
"Zlib"
2728
]
2829
version = 2

src/lib.rs

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -43,3 +43,18 @@ pub mod materialized;
4343

4444
/// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views.
4545
pub mod rewrite;
46+
47+
/// Configuration options for materialized view related features.
48+
#[derive(Debug, Clone)]
49+
pub struct MaterializedConfig {
50+
/// Whether or not query rewriting should exploit this materialized view.
51+
pub use_in_query_rewrite: bool,
52+
}
53+
54+
impl Default for MaterializedConfig {
55+
fn default() -> Self {
56+
Self {
57+
use_in_query_rewrite: true,
58+
}
59+
}
60+
}

src/materialized.rs

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,8 @@ use datafusion::{
4444
use datafusion_expr::LogicalPlan;
4545
use itertools::Itertools;
4646

47+
use crate::MaterializedConfig;
48+
4749
/// The identifier of the column that [`RowMetadataSource`](row_metadata::RowMetadataSource) implementations should store row metadata in.
4850
pub const META_COLUMN: &str = "__meta";
4951

@@ -102,6 +104,12 @@ pub fn cast_to_listing_table(table: &dyn TableProvider) -> Option<&dyn ListingTa
102104
pub trait Materialized: ListingTableLike {
103105
/// The query that defines this materialized view.
104106
fn query(&self) -> LogicalPlan;
107+
108+
/// Configuration to control materialized view related features.
109+
/// By default, returns the default value for [`MaterializedConfig`]
110+
fn config(&self) -> MaterializedConfig {
111+
MaterializedConfig::default()
112+
}
105113
}
106114

107115
/// Register a [`Materialized`] implementation in this registry.

src/materialized/dependencies.rs

Lines changed: 45 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,6 @@ use datafusion_expr::{
3131
col, lit, utils::split_conjunction, Expr, LogicalPlan, LogicalPlanBuilder, TableScan,
3232
};
3333
use datafusion_functions::string::expr_fn::{concat, concat_ws};
34-
use datafusion_optimizer::{analyzer::expand_wildcard_rule::ExpandWildcardRule, AnalyzerRule};
3534
use datafusion_sql::TableReference;
3635
use itertools::{Either, Itertools};
3736
use std::{collections::HashSet, sync::Arc};
@@ -188,9 +187,18 @@ impl TableFunctionImpl for StaleFilesUdtf {
188187
)?
189188
.aggregate(
190189
vec![
191-
// Trim the final path element
192-
regexp_replace(col("file_path"), lit(r"/[^/]*$"), lit("/"), None)
193-
.alias("existing_target"),
190+
// We want to omit the file name along with any "special" partitions
191+
// from the path before comparing it to the target partition. Special
192+
// partitions must be leaf most nodes and are designated by a leading
193+
// underscore. These are useful for adding additional information to a
194+
// filename without affecting partitioning or staleness checks.
195+
regexp_replace(
196+
col("file_path"),
197+
lit(r"(/_[^/=]+=[^/]+)*/[^/]*$"),
198+
lit("/"),
199+
None,
200+
)
201+
.alias("existing_target"),
194202
],
195203
vec![max(col("last_modified")).alias("target_last_modified")],
196204
)?
@@ -250,9 +258,6 @@ pub fn mv_dependencies_plan(
250258
.filter_map(|(i, f)| partition_cols.contains(f.name()).then_some(i))
251259
.collect();
252260

253-
// First expand all wildcards
254-
let plan = ExpandWildcardRule {}.analyze(plan, config_options)?;
255-
256261
let pruned_plan_with_source_files = if partition_cols.is_empty() {
257262
get_source_files_all_partitions(
258263
materialized_view,
@@ -1173,8 +1178,7 @@ mod test {
11731178
}
11741179

11751180
let cases = &[
1176-
TestCase {
1177-
name: "un-transformed partition column",
1181+
TestCase { name: "un-transformed partition column",
11781182
query_to_analyze:
11791183
"SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1",
11801184
table_name: "m1",
@@ -1206,6 +1210,38 @@ mod test {
12061210
"+--------------------------------+----------------------+-----------------------+----------+",
12071211
],
12081212
},
1213+
TestCase { name: "omit 'special' partition columns",
1214+
query_to_analyze:
1215+
"SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1",
1216+
table_name: "m1",
1217+
table_path: ListingTableUrl::parse("s3://m1/").unwrap(),
1218+
partition_cols: vec!["partition_column"],
1219+
file_extension: ".parquet",
1220+
expected_output: vec![
1221+
"+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
1222+
"| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |",
1223+
"+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
1224+
"| s3://m1/partition_column=2021/ | datafusion | test | t1 | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26 |",
1225+
"| s3://m1/partition_column=2022/ | datafusion | test | t1 | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22 |",
1226+
"| s3://m1/partition_column=2023/ | datafusion | test | t1 | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44 |",
1227+
"+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
1228+
],
1229+
// second file is old
1230+
file_metadata: "
1231+
('datafusion', 'test', 'm1', 's3://m1/partition_column=2021/_v=123/data.01.parquet', '2023-07-12T16:00:00Z', 0),
1232+
('datafusion', 'test', 'm1', 's3://m1/partition_column=2022/_v=123/data.01.parquet', '2023-07-10T16:00:00Z', 0),
1233+
('datafusion', 'test', 'm1', 's3://m1/partition_column=2023/_v=123/data.01.parquet', '2023-07-12T16:00:00Z', 0)
1234+
",
1235+
expected_stale_files_output: vec![
1236+
"+--------------------------------+----------------------+-----------------------+----------+",
1237+
"| target | target_last_modified | sources_last_modified | is_stale |",
1238+
"+--------------------------------+----------------------+-----------------------+----------+",
1239+
"| s3://m1/partition_column=2021/ | 2023-07-12T16:00:00 | 2023-07-11T16:29:26 | false |",
1240+
"| s3://m1/partition_column=2022/ | 2023-07-10T16:00:00 | 2023-07-11T16:45:22 | true |",
1241+
"| s3://m1/partition_column=2023/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:44 | false |",
1242+
"+--------------------------------+----------------------+-----------------------+----------+",
1243+
],
1244+
},
12091245
TestCase {
12101246
name: "transform year/month/day partition into timestamp partition",
12111247
query_to_analyze: "

0 commit comments

Comments (0)