Skip to content

Commit 8b16014

Browse files
trueleonitisht
andauthored
Transform with table reference (#570)
While transforming logical plan to add external time field, the column name that was provided is unqualified ( without table reference ) this caused mismatch in column name in datafusion query runtime. ``` Error during planning: required columns can't push down, columns: {Column { relation: Some(Bare { table: "integrity" }), name: "p_timestamp" }, Column { relation: None, name: " p_timestamp" }} ``` This PR adds the filter expr `start_time_filter` and `end_time_filter` for logical plan is created with table reference so that push down predicate can work properly. Fixes #572 Co-authored-by: Nitish Tiwari <[email protected]>
1 parent 7c83641 commit 8b16014

File tree

1 file changed

+73
-58
lines changed

1 file changed

+73
-58
lines changed

server/src/query.rs

Lines changed: 73 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ mod listing_table_builder;
2121
mod stream_schema_provider;
2222
mod table_provider;
2323

24-
use chrono::TimeZone;
2524
use chrono::{DateTime, Utc};
25+
use chrono::{NaiveDateTime, TimeZone};
2626
use datafusion::arrow::record_batch::RecordBatch;
2727

2828
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeVisitor, VisitRecursion};
@@ -122,61 +122,7 @@ impl Query {
122122

123123
/// return logical plan with all time filters applied through
124124
fn final_logical_plan(&self) -> LogicalPlan {
125-
fn tag_filter(filters: Vec<String>) -> Option<Expr> {
126-
filters
127-
.iter()
128-
.map(|literal| {
129-
Expr::Column(Column::from_name(event::DEFAULT_TAGS_KEY))
130-
.like(lit(format!("%{}%", literal)))
131-
})
132-
.reduce(or)
133-
}
134-
135-
fn transform(
136-
plan: LogicalPlan,
137-
start_time_filter: Expr,
138-
end_time_filter: Expr,
139-
filters: Option<Expr>,
140-
) -> LogicalPlan {
141-
plan.transform(&|plan| match plan {
142-
LogicalPlan::TableScan(table) => {
143-
let mut new_filters = vec![];
144-
if !table.filters.iter().any(|expr| {
145-
let Expr::BinaryExpr(binexpr) = expr else {return false};
146-
matches!(&*binexpr.left, Expr::Column(Column { name, .. }) if name == event::DEFAULT_TIMESTAMP_KEY)
147-
}) {
148-
new_filters.push(start_time_filter.clone());
149-
new_filters.push(end_time_filter.clone());
150-
}
151-
152-
if let Some(tag_filters) = filters.clone() {
153-
new_filters.push(tag_filters)
154-
}
155-
156-
let new_filter = new_filters.into_iter().reduce(and);
157-
158-
if let Some(new_filter) = new_filter {
159-
let filter = Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))).unwrap();
160-
Ok(Transformed::Yes(LogicalPlan::Filter(filter)))
161-
} else {
162-
Ok(Transformed::No(LogicalPlan::TableScan(table)))
163-
}
164-
},
165-
x => Ok(Transformed::No(x)),
166-
})
167-
.expect("transform only transforms the tablescan")
168-
}
169-
170125
let filters = self.filter_tag.clone().and_then(tag_filter);
171-
let start_time_filter =
172-
PartialTimeFilter::Low(std::ops::Bound::Included(self.start.naive_utc())).binary_expr(
173-
Expr::Column(Column::from_name(event::DEFAULT_TIMESTAMP_KEY)),
174-
);
175-
let end_time_filter =
176-
PartialTimeFilter::High(std::ops::Bound::Excluded(self.end.naive_utc())).binary_expr(
177-
Expr::Column(Column::from_name(event::DEFAULT_TIMESTAMP_KEY)),
178-
);
179-
180126
// see https://github.com/apache/arrow-datafusion/pull/8400
181127
// this can be eliminated in later version of datafusion but with slight caveat
182128
// transform cannot modify stringified plans by itself
@@ -185,8 +131,8 @@ impl Query {
185131
LogicalPlan::Explain(plan) => {
186132
let transformed = transform(
187133
plan.plan.as_ref().clone(),
188-
start_time_filter,
189-
end_time_filter,
134+
self.start.naive_utc(),
135+
self.end.naive_utc(),
190136
filters,
191137
);
192138
LogicalPlan::Explain(Explain {
@@ -199,7 +145,7 @@ impl Query {
199145
logical_optimization_succeeded: plan.logical_optimization_succeeded,
200146
})
201147
}
202-
x => transform(x, start_time_filter, end_time_filter, filters),
148+
x => transform(x, self.start.naive_utc(), self.end.naive_utc(), filters),
203149
}
204150
}
205151

@@ -235,6 +181,75 @@ impl TreeNodeVisitor for TableScanVisitor {
235181
}
236182
}
237183

184+
fn tag_filter(filters: Vec<String>) -> Option<Expr> {
185+
filters
186+
.iter()
187+
.map(|literal| {
188+
Expr::Column(Column::from_name(event::DEFAULT_TAGS_KEY))
189+
.like(lit(format!("%{}%", literal)))
190+
})
191+
.reduce(or)
192+
}
193+
194+
fn transform(
195+
plan: LogicalPlan,
196+
start_time: NaiveDateTime,
197+
end_time: NaiveDateTime,
198+
filters: Option<Expr>,
199+
) -> LogicalPlan {
200+
plan.transform(&|plan| match plan {
201+
LogicalPlan::TableScan(table) => {
202+
let mut new_filters = vec![];
203+
if !table_contains_any_time_filters(&table) {
204+
let start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(
205+
start_time,
206+
))
207+
.binary_expr(Expr::Column(Column::new(
208+
Some(table.table_name.to_owned_reference()),
209+
event::DEFAULT_TIMESTAMP_KEY,
210+
)));
211+
let end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
212+
.binary_expr(Expr::Column(Column::new(
213+
Some(table.table_name.to_owned_reference()),
214+
event::DEFAULT_TIMESTAMP_KEY,
215+
)));
216+
new_filters.push(start_time_filter);
217+
new_filters.push(end_time_filter);
218+
}
219+
220+
if let Some(tag_filters) = filters.clone() {
221+
new_filters.push(tag_filters)
222+
}
223+
224+
let new_filter = new_filters.into_iter().reduce(and);
225+
226+
if let Some(new_filter) = new_filter {
227+
let filter =
228+
Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))).unwrap();
229+
Ok(Transformed::Yes(LogicalPlan::Filter(filter)))
230+
} else {
231+
Ok(Transformed::No(LogicalPlan::TableScan(table)))
232+
}
233+
}
234+
x => Ok(Transformed::No(x)),
235+
})
236+
.expect("transform only transforms the tablescan")
237+
}
238+
239+
fn table_contains_any_time_filters(table: &datafusion::logical_expr::TableScan) -> bool {
240+
table
241+
.filters
242+
.iter()
243+
.filter_map(|x| {
244+
if let Expr::BinaryExpr(binexpr) = x {
245+
Some(binexpr)
246+
} else {
247+
None
248+
}
249+
})
250+
.any(|expr| matches!(&*expr.left, Expr::Column(Column { name, .. }) if (name == event::DEFAULT_TIMESTAMP_KEY)))
251+
}
252+
238253
#[allow(dead_code)]
239254
fn get_staging_prefixes(
240255
stream_name: &str,

0 commit comments

Comments
 (0)