Skip to content

Commit e7a16d7

Browse files
committed
Address code review comments
1 parent 399c9f6 commit e7a16d7

File tree

5 files changed

+76
-29
lines changed

5 files changed

+76
-29
lines changed

src/Analyzer/QueryNode.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,8 @@ class QueryNode final : public IQueryTreeNode
647647

648648
void addCorrelatedColumn(const QueryTreeNodePtr & correlated_column);
649649

650+
/// Returns result type of projection expression if query is correlated
651+
/// or throws an exception otherwise.
650652
DataTypePtr getResultType() const override;
651653

652654
QueryTreeNodeType getNodeType() const override

src/Interpreters/JoinInfo.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,16 @@ JoinActionRef JoinActionRef::deserialize(ReadBuffer & in, const ActionsDAGRawPtr
356356
return res;
357357
}
358358

359+
JoinActionRef JoinActionRef::clone(const ActionsDAG * actions_dag_) const
360+
{
361+
return JoinActionRef{actions_dag_, column_name};
362+
}
363+
364+
JoinActionRef::JoinActionRef(const ActionsDAG * actions_dag_, const String & column_name_)
365+
: actions_dag(actions_dag_)
366+
, column_name(column_name_)
367+
{}
368+
359369
void JoinPredicate::serialize(WriteBuffer & out, const JoinActionRef::ActionsDAGRawPtrs & dags) const
360370
{
361371
serializePredicateOperator(op, out);
@@ -435,6 +445,40 @@ JoinCondition JoinCondition::deserialize(ReadBuffer & in, const JoinActionRef::A
435445
};
436446
}
437447

448+
JoinCondition JoinCondition::clone(const JoinExpressionActions & expression_actions) const
449+
{
450+
JoinCondition copy;
451+
452+
copy.predicates.reserve(predicates.size());
453+
for (const auto & predicate : predicates)
454+
{
455+
copy.predicates.emplace_back(
456+
predicate.left_node.clone(expression_actions.left_pre_join_actions.get()),
457+
predicate.right_node.clone(expression_actions.right_pre_join_actions.get()),
458+
predicate.op);
459+
}
460+
461+
copy.left_filter_conditions.reserve(left_filter_conditions.size());
462+
for (const auto & condition: left_filter_conditions)
463+
{
464+
copy.left_filter_conditions.emplace_back(condition.clone(expression_actions.left_pre_join_actions.get()));
465+
}
466+
467+
copy.right_filter_conditions.reserve(right_filter_conditions.size());
468+
for (const auto & condition: right_filter_conditions)
469+
{
470+
copy.right_filter_conditions.emplace_back(condition.clone(expression_actions.right_pre_join_actions.get()));
471+
}
472+
473+
copy.residual_conditions.reserve(residual_conditions.size());
474+
for (const auto & condition: residual_conditions)
475+
{
476+
copy.residual_conditions.emplace_back(condition.clone(expression_actions.post_join_actions.get()));
477+
}
478+
479+
return copy;
480+
}
481+
438482
void JoinExpression::serialize(WriteBuffer & out, const JoinActionRef::ActionsDAGRawPtrs & dags) const
439483
{
440484
UInt8 is_using_flag = is_using ? 1 : 0;
@@ -473,6 +517,20 @@ JoinExpression JoinExpression::deserialize(ReadBuffer & in, const JoinActionRef:
473517
return {std::move(condition), std::move(disjunctive_conditions), bool(is_using_flag)};
474518
}
475519

520+
JoinExpression JoinExpression::clone(const JoinExpressionActions & expression_copy) const
521+
{
522+
JoinExpression copy;
523+
copy.condition = condition.clone(expression_copy);
524+
525+
copy.disjunctive_conditions.reserve(disjunctive_conditions.size());
526+
for (const auto & disjunctive_condition : disjunctive_conditions)
527+
copy.disjunctive_conditions.emplace_back(disjunctive_condition.clone(expression_copy));
528+
529+
copy.is_using = is_using;
530+
531+
return copy;
532+
}
533+
476534
void JoinInfo::serialize(WriteBuffer & out, const JoinActionRef::ActionsDAGRawPtrs & dags) const
477535
{
478536
expression.serialize(out, dags);

src/Interpreters/JoinInfo.h

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ struct JoinExpressionActions
7777

7878
JoinExpressionActions clone() const
7979
{
80-
return JoinExpressionActions(
80+
return JoinExpressionActions(
8181
std::make_unique<ActionsDAG>(left_pre_join_actions->clone()),
8282
std::make_unique<ActionsDAG>(right_pre_join_actions->clone()),
8383
std::make_unique<ActionsDAG>(post_join_actions->clone())
@@ -117,7 +117,11 @@ class JoinActionRef
117117
void serialize(WriteBuffer & out, const ActionsDAGRawPtrs & dags) const;
118118
static JoinActionRef deserialize(ReadBuffer & in, const ActionsDAGRawPtrs & dags);
119119

120+
JoinActionRef clone(const ActionsDAG * actions_dag_) const;
121+
120122
private:
123+
JoinActionRef(const ActionsDAG * actions_dag_, const String & column_name_);
124+
121125
const ActionsDAG * actions_dag = nullptr;
122126
String column_name;
123127
};
@@ -148,32 +152,10 @@ struct JoinCondition
148152
/// Unlike the join predicates, these conditions can be arbitrary expressions.
149153
std::vector<JoinActionRef> residual_conditions;
150154

151-
void fixReferences(const JoinExpressionActions & expression_actions)
152-
{
153-
for (auto & predicate : predicates)
154-
{
155-
predicate.left_node = JoinActionRef(&expression_actions.left_pre_join_actions->findInOutputs(predicate.left_node.getColumnName()), expression_actions.left_pre_join_actions.get());
156-
predicate.right_node = JoinActionRef(&expression_actions.right_pre_join_actions->findInOutputs(predicate.right_node.getColumnName()), expression_actions.right_pre_join_actions.get());
157-
}
158-
159-
for (auto & condition: left_filter_conditions)
160-
{
161-
condition = JoinActionRef(&expression_actions.left_pre_join_actions->findInOutputs(condition.getColumnName()), expression_actions.left_pre_join_actions.get());
162-
}
163-
164-
for (auto & condition: right_filter_conditions)
165-
{
166-
condition = JoinActionRef(&expression_actions.right_pre_join_actions->findInOutputs(condition.getColumnName()), expression_actions.right_pre_join_actions.get());
167-
}
168-
169-
for (auto & condition: residual_conditions)
170-
{
171-
condition = JoinActionRef(&expression_actions.post_join_actions->findInOutputs(condition.getColumnName()), expression_actions.post_join_actions.get());
172-
}
173-
}
174-
175155
void serialize(WriteBuffer & out, const JoinActionRef::ActionsDAGRawPtrs & dags) const;
176156
static JoinCondition deserialize(ReadBuffer & in, const JoinActionRef::ActionsDAGRawPtrs & dags);
157+
158+
JoinCondition clone(const JoinExpressionActions & expression_actions) const;
177159
};
178160

179161
struct JoinExpression
@@ -192,6 +174,8 @@ struct JoinExpression
192174

193175
void serialize(WriteBuffer & out, const JoinActionRef::ActionsDAGRawPtrs & dags) const;
194176
static JoinExpression deserialize(ReadBuffer & in, const JoinActionRef::ActionsDAGRawPtrs & dags);
177+
178+
JoinExpression clone(const JoinExpressionActions & expression_copy) const;
195179
};
196180

197181
struct JoinInfo
@@ -208,6 +192,11 @@ struct JoinInfo
208192
/// The locality of the join (e.g., LOCAL, GLOBAL)
209193
JoinLocality locality;
210194

195+
JoinInfo clone(const JoinExpressionActions & expression_actions) const
196+
{
197+
return JoinInfo{ expression.clone(expression_actions), kind, strictness, locality};
198+
}
199+
211200
void serialize(WriteBuffer & out, const JoinActionRef::ActionsDAGRawPtrs & dags) const;
212201
static JoinInfo deserialize(ReadBuffer & in, const JoinActionRef::ActionsDAGRawPtrs & dags);
213202
};

src/Processors/QueryPlan/JoinStepLogical.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -892,10 +892,7 @@ std::unique_ptr<IQueryPlanStep> JoinStepLogical::deserialize(Deserialization & c
892892
QueryPlanStepPtr JoinStepLogical::clone() const
893893
{
894894
auto new_expression_actions = expression_actions.clone();
895-
auto new_join_info = join_info;
896-
new_join_info.expression.condition.fixReferences(new_expression_actions);
897-
for (auto & condition : new_join_info.expression.disjunctive_conditions)
898-
condition.fixReferences(new_expression_actions);
895+
auto new_join_info = join_info.clone(new_expression_actions);
899896

900897
auto result_step = std::make_unique<JoinStepLogical>(
901898
getInputHeaders().front(), getInputHeaders().back(),

tests/queries/0_stateless/03447_analyzer_correlated_subqueries_tpc_h.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ INSERT INTO lineitem SELECT * FROM generateRandom() LIMIT 1;
100100

101101
set enable_analyzer = 1;
102102
set allow_experimental_correlated_subqueries = 1;
103+
SET enable_parallel_replicas = 0;
103104

104105
-- Q2
105106
SELECT

0 commit comments

Comments
 (0)