Skip to content

Commit 3983fe0

Browse files
committed
[fix](union) Local shuffle for union operator (apache#56048)
1 parent b96116b commit 3983fe0

File tree

2 files changed

+15
-2
lines changed

2 files changed

+15
-2
lines changed

be/src/pipeline/exec/union_sink_operator.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ UnionSinkOperatorX::UnionSinkOperatorX(int child_id, int sink_id, ObjectPool* po
5757
_first_materialized_child_idx(tnode.union_node.first_materialized_child_idx),
5858
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
5959
_cur_child_id(child_id),
60-
_child_size(tnode.num_children) {}
60+
_child_size(tnode.num_children),
61+
_distribute_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
62+
: std::vector<TExpr> {}) {}
6163

6264
Status UnionSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
6365
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));

be/src/pipeline/exec/union_sink_operator.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,16 @@ class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> {
9494
return _followed_by_shuffled_operator;
9595
}
9696

97+
// Reports the data distribution this union sink requires from its input.
//  - child is serial AND this sink is followed by a shuffled operator:
//    HASH_SHUFFLE keyed on _distribute_exprs;
//  - child is serial only: PASSTHROUGH;
//  - otherwise: NOOP.
// NOTE(review): presumably consumed by the local-exchange planner to re-distribute
// the output of a serial child — confirm against the caller.
DataDistribution required_data_distribution() const override {
98+
// Serial child feeding a shuffled downstream operator: request a hash shuffle.
if (_child->is_serial_operator() && _followed_by_shuffled_operator) {
99+
return DataDistribution(ExchangeType::HASH_SHUFFLE, _distribute_exprs);
100+
}
101+
// Serial child with no shuffled operator downstream: request a passthrough exchange.
if (_child->is_serial_operator()) {
102+
return DataDistribution(ExchangeType::PASSTHROUGH);
103+
}
104+
// Child is not serial: no exchange is requested.
return DataDistribution(ExchangeType::NOOP);
105+
}
106+
97107
// Returns _followed_by_shuffled_operator unchanged: this sink reports itself as a
// shuffled operator exactly when that flag is set.
bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }
98108

99109
private:
@@ -113,6 +123,7 @@ class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> {
113123
const RowDescriptor _row_descriptor;
114124
const int _cur_child_id;
115125
const int _child_size;
126+
// Expressions passed as the HASH_SHUFFLE key by required_data_distribution().
// The constructor copies tnode.distribute_expr_lists[0] when that thrift field is
// set, and uses an empty vector otherwise.
// NOTE(review): index 0 is used for every child sink regardless of child_id, and the
// list is indexed without an emptiness check — confirm the planner always sends at
// least one entry and that sharing entry 0 across children is intended.
const std::vector<TExpr> _distribute_exprs;
116127
// Returns _child_size, which the constructor sets from tnode.num_children.
int children_count() const { return _child_size; }
117128
bool is_child_passthrough(int child_idx) const {
118129
DCHECK_LT(child_idx, _child_size);
@@ -152,4 +163,4 @@ class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> {
152163
};
153164

154165
} // namespace pipeline
155-
} // namespace doris
166+
} // namespace doris

0 commit comments

Comments
 (0)