
Commit c334ffb

fix: a corner case for join match one (openobserve#7016)
Fixes the query below:

```sql
with abc as (
    select histogram(_timestamp, '1 minute') AS zo_sql_key, count(*) AS zo_sql_num
    from default GROUP BY zo_sql_key ORDER BY zo_sql_key
),
edf as (
    select histogram(_timestamp, '1 minute') AS zo_sql_key, sum(_timestamp) AS zo_sql_num
    from default GROUP BY zo_sql_key ORDER BY zo_sql_key
)
select abc.zo_sql_key from abc where zo_sql_key in (select zo_sql_key from edf)
```

---------

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
1 parent f523d56 commit c334ffb

File tree

4 files changed: +125 −91 lines


src/service/search/datafusion/optimizer/add_sort_and_limit.rs

Lines changed: 3 additions & 0 deletions
@@ -54,6 +54,9 @@ impl OptimizerRule for AddSortAndLimitRule {
         plan: LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
+        if self.limit == 0 {
+            return Ok(Transformed::new(plan, false, TreeNodeRecursion::Stop));
+        }
         let mut plan = plan.rewrite(&mut AddSortAndLimit::new(self.limit, self.offset))?;
         plan.tnr = TreeNodeRecursion::Stop;
         Ok(plan)
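The guard above makes the rule an explicit no-op when no limit is configured. As a rough illustration of the pattern only (not the OpenObserve rule), a standalone DataFusion optimizer rule with the same early return might look like the sketch below; `NoopWhenZeroLimit` is a hypothetical name, and the exact `OptimizerRule` method set varies between DataFusion releases.

```rust
// Minimal sketch of the zero-limit guard pattern, assuming a recent DataFusion
// release (trait methods differ slightly across versions). Not the OpenObserve
// rule, just the same early-return shape in isolation.
use datafusion::{
    common::{
        Result,
        tree_node::{Transformed, TreeNodeRecursion},
    },
    logical_expr::LogicalPlan,
    optimizer::{OptimizerConfig, OptimizerRule, optimizer::ApplyOrder},
};

#[derive(Debug)]
struct NoopWhenZeroLimit {
    limit: usize,
}

impl OptimizerRule for NoopWhenZeroLimit {
    fn name(&self) -> &str {
        "noop_when_zero_limit"
    }

    fn apply_order(&self) -> Option<ApplyOrder> {
        // None: the optimizer hands the rule the root plan and the rule is
        // expected to drive any recursion itself (a choice made for this sketch)
        None
    }

    fn supports_rewrite(&self) -> bool {
        true
    }

    fn rewrite(
        &self,
        plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>> {
        if self.limit == 0 {
            // transformed = false plus Stop: the plan is returned untouched and
            // traversal ends, because a limit of 0 leaves nothing to push down
            return Ok(Transformed::new(plan, false, TreeNodeRecursion::Stop));
        }
        // a real rule would rewrite the plan here; the sketch leaves it unchanged
        Ok(Transformed::no(plan))
    }
}
```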

src/service/search/datafusion/optimizer/limit_join_right_side.rs

Lines changed: 100 additions & 22 deletions
@@ -15,18 +15,20 @@
 
 use std::sync::Arc;
 
+use config::TIMESTAMP_COL_NAME;
 use datafusion::{
     common::{
-        Result,
-        tree_node::{Transformed, TreeNode},
+        Column, Result,
+        tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter},
     },
-    logical_expr::LogicalPlan,
+    logical_expr::{Extension, LogicalPlan, Sort, SortExpr},
     optimizer::{OptimizerConfig, OptimizerRule, optimizer::ApplyOrder},
-    prelude::Expr,
+    prelude::{Expr, col},
 };
 use itertools::Itertools;
 
-use super::utils::AddSortAndLimit;
+use super::utils::{AddSortAndLimit, is_contain_deduplication_plan};
+use crate::service::search::datafusion::plan::deduplication::DeduplicationLogicalNode;
 
 #[derive(Default, Debug)]
 pub struct LimitJoinRightSide {
@@ -57,6 +59,9 @@ impl OptimizerRule for LimitJoinRightSide
         plan: LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
+        if self.limit == 0 {
+            return Ok(Transformed::new(plan, false, TreeNodeRecursion::Stop));
+        }
         match plan {
             LogicalPlan::Join(mut join) => {
                 let right_column = join
@@ -70,31 +75,104 @@ impl OptimizerRule for LimitJoinRightSide {
                         }
                     })
                     .collect_vec();
-                if right_column.is_empty() {
-                    let plan = (*join.right)
-                        .clone()
-                        .rewrite(&mut AddSortAndLimit::new(self.limit, 0))?
-                        .data;
-                    join.right = Arc::new(plan);
-                    Ok(Transformed::yes(LogicalPlan::Join(join)))
-                } else {
-                    let plan = (*join.right)
-                        .clone()
-                        .rewrite(&mut AddSortAndLimit::new_with_deduplication(
-                            self.limit,
-                            0,
-                            right_column,
-                        ))?
+                // limit the right side output size
+                let mut plan = (*join.right)
+                    .clone()
+                    .rewrite(&mut AddSortAndLimit::new(self.limit, 0))?
+                    .data;
+                if !right_column.is_empty() {
+                    // deduplication on join key
+                    plan = plan
+                        .rewrite(&mut DeduplicationRewriter::new(right_column))?
                         .data;
-                    join.right = Arc::new(plan);
-                    Ok(Transformed::yes(LogicalPlan::Join(join)))
                 }
+                join.right = Arc::new(plan);
+                Ok(Transformed::yes(LogicalPlan::Join(join)))
             }
             _ => Ok(Transformed::no(plan)),
         }
     }
 }
 
+struct DeduplicationRewriter {
+    pub deduplication_columns: Vec<Column>,
+}
+
+impl DeduplicationRewriter {
+    pub fn new(deduplication_columns: Vec<Column>) -> Self {
+        Self {
+            deduplication_columns,
+        }
+    }
+}
+
+impl TreeNodeRewriter for DeduplicationRewriter {
+    type Node = LogicalPlan;
+
+    fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
+        if is_contain_deduplication_plan(&node) {
+            return Ok(Transformed::new(node, false, TreeNodeRecursion::Stop));
+        }
+
+        // insert deduplication to first plan that contains deduplication columns
+        let plan = match node {
+            LogicalPlan::Projection(_) | LogicalPlan::SubqueryAlias(_) => {
+                let schema = node.inputs().first().unwrap().schema();
+                for column in self.deduplication_columns.iter() {
+                    if schema.field_with_name(None, column.name()).is_err() {
+                        let plan = generate_deduplication_plan(
+                            Arc::new(node),
+                            self.deduplication_columns.clone(),
+                        );
+                        return Ok(Transformed::new(plan, true, TreeNodeRecursion::Stop));
+                    }
+                }
+                Transformed::no(node)
+            }
+            _ => {
+                let plan =
+                    generate_deduplication_plan(Arc::new(node), self.deduplication_columns.clone());
+                Transformed::new(plan, true, TreeNodeRecursion::Stop)
+            }
+        };
+
+        Ok(plan)
+    }
+}
+
+fn generate_deduplication_plan(
+    node: Arc<LogicalPlan>,
+    deduplication_columns: Vec<Column>,
+) -> LogicalPlan {
+    let mut sort_columns = Vec::with_capacity(deduplication_columns.len() + 1);
+    let schema = node.schema().clone();
+
+    for column in deduplication_columns.iter() {
+        sort_columns.push(SortExpr {
+            expr: col(column.name()),
+            asc: false,
+            nulls_first: false,
+        });
+    }
+
+    if schema.field_with_name(None, TIMESTAMP_COL_NAME).is_ok() {
+        sort_columns.push(SortExpr {
+            expr: col(TIMESTAMP_COL_NAME.to_string()),
+            asc: false,
+            nulls_first: false,
+        });
+    }
+
+    let sort = LogicalPlan::Sort(Sort {
+        expr: sort_columns,
+        input: node,
+        fetch: None,
+    });
+    LogicalPlan::Extension(Extension {
+        node: Arc::new(DeduplicationLogicalNode::new(sort, deduplication_columns)),
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
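Conceptually, the new `DeduplicationRewriter` is a standard DataFusion `TreeNodeRewriter`: it walks the join's right-hand plan top-down, wraps the first suitable node in a `Sort` plus `DeduplicationLogicalNode`, and stops so the wrapper is inserted only once. A stripped-down sketch of that wrap-and-stop shape (hypothetical names `SortFirstTableScan` and `add_sort`, not the code above) could look like:

```rust
// Minimal sketch of the rewriter pattern used by DeduplicationRewriter: visit the
// plan top-down with a TreeNodeRewriter, wrap the first node of interest, and
// return TreeNodeRecursion::Stop so the wrapper is inserted exactly once.
use std::sync::Arc;

use datafusion::{
    common::{
        Result,
        tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter},
    },
    logical_expr::{LogicalPlan, Sort, SortExpr},
    prelude::col,
};

struct SortFirstTableScan;

impl TreeNodeRewriter for SortFirstTableScan {
    type Node = LogicalPlan;

    fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
        match node {
            LogicalPlan::TableScan(_) => {
                // wrap the scan in a descending sort on the timestamp column
                // ("_timestamp" here; the real code uses config::TIMESTAMP_COL_NAME)
                let sort = LogicalPlan::Sort(Sort {
                    expr: vec![SortExpr {
                        expr: col("_timestamp"),
                        asc: false,
                        nulls_first: false,
                    }],
                    input: Arc::new(node),
                    fetch: None,
                });
                // `true` marks the plan as changed; Stop ends the traversal
                Ok(Transformed::new(sort, true, TreeNodeRecursion::Stop))
            }
            _ => Ok(Transformed::no(node)),
        }
    }
}

// usage: TreeNode::rewrite drives f_down/f_up and returns the rewritten plan
fn add_sort(plan: LogicalPlan) -> Result<LogicalPlan> {
    Ok(plan.rewrite(&mut SortFirstTableScan)?.data)
}
```

The real rewriter also bails out if a deduplication node is already present, and for `Projection`/`SubqueryAlias` nodes it only keeps descending while the join-key columns remain visible in the input schema, wrapping the node at the point where descending further would lose them.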

src/service/search/datafusion/optimizer/utils.rs

Lines changed: 5 additions & 66 deletions
@@ -18,22 +18,18 @@ use std::sync::Arc;
 use config::TIMESTAMP_COL_NAME;
 use datafusion::{
     common::{
-        Column, DFSchema, Result,
+        DFSchema, Result,
         tree_node::{
             Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
         },
     },
     datasource::DefaultTableSource,
-    logical_expr::{
-        Extension, Limit, LogicalPlan, Projection, Sort, SortExpr, TableScan, TableSource, col,
-    },
+    logical_expr::{Limit, LogicalPlan, Projection, Sort, SortExpr, TableScan, TableSource, col},
     prelude::Expr,
     scalar::ScalarValue,
 };
 
-use crate::service::search::datafusion::{
-    plan::deduplication::DeduplicationLogicalNode, table_provider::empty_table::NewEmptyTable,
-};
+use crate::service::search::datafusion::table_provider::empty_table::NewEmptyTable;
 
 // check if the plan is a complex query that we can't add sort _timestamp
 pub fn is_complex_query(plan: &LogicalPlan) -> bool {
@@ -53,38 +49,18 @@ pub fn is_complex_query(plan: &LogicalPlan) -> bool {
 pub struct AddSortAndLimit {
     pub limit: usize,
     pub offset: usize,
-    pub deduplication_columns: Vec<Column>,
 }
 
 impl AddSortAndLimit {
     pub fn new(limit: usize, offset: usize) -> Self {
-        Self {
-            limit,
-            offset,
-            deduplication_columns: vec![],
-        }
-    }
-
-    pub fn new_with_deduplication(
-        limit: usize,
-        offset: usize,
-        deduplication_columns: Vec<Column>,
-    ) -> Self {
-        Self {
-            limit,
-            offset,
-            deduplication_columns,
-        }
+        Self { limit, offset }
     }
 }
 
 impl TreeNodeRewriter for AddSortAndLimit {
     type Node = LogicalPlan;
 
     fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
-        if self.limit == 0 {
-            return Ok(Transformed::new(node, false, TreeNodeRecursion::Stop));
-        }
         if is_contain_deduplication_plan(&node) {
             return Ok(Transformed::new(node, false, TreeNodeRecursion::Stop));
         }
@@ -145,43 +121,6 @@ impl TreeNodeRewriter for AddSortAndLimit {
             transformed.tnr = TreeNodeRecursion::Stop;
         }
 
-        // support deduplication on join key
-        // sort -> deduplication
-        // only add when is_stop == true
-        if !self.deduplication_columns.is_empty() && is_stop {
-            let mut sort_columns = Vec::with_capacity(self.deduplication_columns.len() + 1);
-            let schema = transformed.data.schema().clone();
-
-            for column in self.deduplication_columns.iter() {
-                sort_columns.push(SortExpr {
-                    expr: col(column.name()),
-                    asc: false,
-                    nulls_first: false,
-                });
-            }
-
-            if schema.field_with_name(None, TIMESTAMP_COL_NAME).is_ok() {
-                sort_columns.push(SortExpr {
-                    expr: col(TIMESTAMP_COL_NAME.to_string()),
-                    asc: false,
-                    nulls_first: false,
-                });
-            }
-
-            let sort = LogicalPlan::Sort(Sort {
-                expr: sort_columns,
-                input: Arc::new(transformed.data),
-                fetch: None,
-            });
-            let dedup = LogicalPlan::Extension(Extension {
-                node: Arc::new(DeduplicationLogicalNode::new(
-                    sort,
-                    self.deduplication_columns.clone(),
-                )),
-            });
-            transformed.data = dedup;
-        }
-
         if let Some(schema) = schema {
             let plan = transformed.data;
             let proj = LogicalPlan::Projection(Projection::new_from_schema(Arc::new(plan), schema));
@@ -358,7 +297,7 @@ fn generate_table_source_with_sorted_by_time(
     }
 }
 
-fn is_contain_deduplication_plan(plan: &LogicalPlan) -> bool {
+pub fn is_contain_deduplication_plan(plan: &LogicalPlan) -> bool {
     plan.exists(|plan| Ok(matches!(plan, LogicalPlan::Extension(_))))
         .unwrap()
 }
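`is_contain_deduplication_plan` is now `pub` so `limit_join_right_side.rs` can use it as an idempotence guard before inserting another deduplication node. It relies on `TreeNode::exists`, which probes every node in the plan and short-circuits on the first match; the same idiom works for any plan-shape check, as in the hypothetical helper below (`has_limit` is not part of the patch):

```rust
// Sketch of the TreeNode::exists idiom used by is_contain_deduplication_plan:
// the closure is called for the node and all of its inputs, and traversal
// stops as soon as it returns Ok(true).
use datafusion::{
    common::{Result, tree_node::TreeNode},
    logical_expr::LogicalPlan,
};

fn has_limit(plan: &LogicalPlan) -> Result<bool> {
    plan.exists(|node| Ok(matches!(node, LogicalPlan::Limit(_))))
}
```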

src/service/search/datafusion/plan/deduplication_exec.rs

Lines changed: 17 additions & 3 deletions
@@ -19,8 +19,11 @@ use std::{
     task::{Context, Poll},
 };
 
-use arrow::array::{BooleanArray, Float64Array, Int64Array, RecordBatch, StringArray, UInt64Array};
-use arrow_schema::{DataType, SortOptions};
+use arrow::array::{
+    BooleanArray, Float64Array, Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray,
+    UInt64Array,
+};
+use arrow_schema::{DataType, SortOptions, TimeUnit};
 use config::TIMESTAMP_COL_NAME;
 use datafusion::{
     arrow::datatypes::SchemaRef,
@@ -255,6 +258,7 @@ enum Array {
     UInt64(UInt64Array),
     Boolean(BooleanArray),
     Float64(Float64Array),
+    TimestampMicrosecond(TimestampMicrosecondArray),
 }
 
 impl Array {
@@ -265,6 +269,7 @@ impl Array {
             Array::UInt64(array) => Value::UInt64(array.value(i)),
             Array::Boolean(array) => Value::Boolean(array.value(i)),
             Array::Float64(array) => Value::Float64(array.value(i)),
+            Array::TimestampMicrosecond(array) => Value::Int64(array.value(i)),
         }
     }
 }
@@ -318,7 +323,16 @@ fn generate_deduplication_arrays(
                        .unwrap()
                        .clone(),
                ),
-               _ => panic!("Unsupported data type"),
+               DataType::Timestamp(TimeUnit::Microsecond, None) => Array::TimestampMicrosecond(
+                   array
+                       .as_any()
+                       .downcast_ref::<TimestampMicrosecondArray>()
+                       .unwrap()
+                       .clone(),
+               ),
+               _ => {
+                   panic!("Unsupported data type: {}", array.data_type());
+               }
            }
        })
        .collect_vec();
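The executor change covers the case where the join key or timestamp column arrives as Arrow `Timestamp(Microsecond, None)` rather than `Int64`: previously that hit the `panic!("Unsupported data type")` arm, while now it is downcast to `TimestampMicrosecondArray` and folded into the existing `Int64` value representation, since the underlying storage is `i64` microseconds either way. A self-contained sketch of that downcast (with a hypothetical helper name, not the exec itself):

```rust
// Standalone sketch of the downcast added above: a Timestamp(Microsecond) column
// reads back as plain i64 microseconds, which is why the exec can reuse its Int64
// value variant for timestamps. `first_value_micros` is a hypothetical helper.
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, TimestampMicrosecondArray};
use arrow_schema::{DataType, TimeUnit};

fn first_value_micros(array: &ArrayRef) -> Option<i64> {
    match array.data_type() {
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            let ts = array
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .expect("data type was checked above");
            (!ts.is_empty() && !ts.is_null(0)).then(|| ts.value(0))
        }
        _ => None,
    }
}

fn main() {
    // 2021-01-01T00:00:00Z expressed in microseconds since the epoch
    let col: ArrayRef = Arc::new(TimestampMicrosecondArray::from(vec![1_609_459_200_000_000i64]));
    assert_eq!(first_value_micros(&col), Some(1_609_459_200_000_000));
}
```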
