Commit 6377254

[Chore](shuffle) adjust some local shuffle rules (#59366)
1. Make the distinct streaming agg always shuffle (improves parallelism).
2. Make the broadcast join probe not shuffle.

This pull request updates the logic that determines required data distributions in the aggregation and join pipeline operators. The main focus is on improving the handling of exchange types, especially for passthrough and broadcast scenarios. Key changes include:

**Data distribution logic updates:**

* In `DistinctStreamingAggOperatorX`, `required_data_distribution` now returns `ExchangeType::PASSTHROUGH` on its fall-through path instead of delegating to the base class, which simplifies the distribution requirement when colocation is not needed.
* In `HashJoinProbeOperatorX`, the logic for broadcast joins is refined: if the child is a serial operator, it returns `ExchangeType::PASSTHROUGH`; otherwise, it returns `ExchangeType::NOOP`. Previously a broadcast join always returned `ExchangeType::PASSTHROUGH`. The handling of bucket shuffle and colocate join distributions is unchanged in behavior but now sits in its own return statement.
1 parent d49d620 commit 6377254

2 files changed: +10 -8 lines changed

2 files changed

+10
-8
lines changed

be/src/pipeline/exec/distinct_streaming_aggregation_operator.h

Lines changed: 1 addition & 1 deletion
@@ -120,7 +120,7 @@ class DistinctStreamingAggOperatorX final
                            ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
         }
-        return StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution(state);
+        return {ExchangeType::PASSTHROUGH};
     }

     bool require_data_distribution() const override { return _is_colocate; }
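
The effect of this hunk can be sketched outside of Doris as a small decision function. This is only an illustration under stated assumptions: the condition guarding the hash-shuffle branch lies outside the hunk, so the two boolean parameters (`needs_hash_distribution`, `use_bucket_shuffle`) and the function name are hypothetical stand-ins, and the `ExchangeType` enum here is a local copy rather than the real Doris definition.

```cpp
// Local stand-in for the exchange types named in the diff (not the Doris header).
enum class ExchangeType { NOOP, PASSTHROUGH, HASH_SHUFFLE, BUCKET_HASH_SHUFFLE };

// Hypothetical model of the operator's choice after this commit.
// `needs_hash_distribution` and `use_bucket_shuffle` are assumptions: the real
// conditions are not visible in the hunk.
ExchangeType distinct_agg_exchange_type(bool needs_hash_distribution,
                                        bool use_bucket_shuffle) {
    if (needs_hash_distribution) {
        return use_bucket_shuffle ? ExchangeType::BUCKET_HASH_SHUFFLE
                                  : ExchangeType::HASH_SHUFFLE;
    }
    // Before the commit this path delegated to the base class; it now
    // requests a passthrough local exchange directly.
    return ExchangeType::PASSTHROUGH;
}
```

The only behavioral change modeled here is the final return: the fall-through path no longer depends on whatever the base class would have chosen.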

be/src/pipeline/exec/hashjoin_probe_operator.h

Lines changed: 9 additions & 7 deletions
@@ -133,14 +133,16 @@ class HashJoinProbeOperatorX MOCK_REMOVE(final)
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             return {ExchangeType::NOOP};
+        } else if (_is_broadcast_join) {
+            return _child && _child->is_serial_operator()
+                           ? DataDistribution(ExchangeType::PASSTHROUGH)
+                           : DataDistribution(ExchangeType::NOOP);
         }
-        return _is_broadcast_join
-                       ? DataDistribution(ExchangeType::PASSTHROUGH)
-                       : (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
-                                          _join_distribution == TJoinDistributionType::COLOCATE
-                                  ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
-                                                     _partition_exprs)
-                                  : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
+
+        return (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
+                                _join_distribution == TJoinDistributionType::COLOCATE
+                        ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
+                        : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
     }
     bool is_broadcast_join() const { return _is_broadcast_join; }

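
To make the new branch order easier to follow, the probe-side rule can be restated as a standalone function. This is a minimal sketch, not the Doris implementation: `ProbeInputs`, `JoinDistribution`, and `probe_exchange_type` are hypothetical names, the enums are local copies, the `_child` null check is folded into a single `child_is_serial` flag, and the partition expressions carried by `DataDistribution` are left out.

```cpp
// Local stand-ins for the types named in the diff (not the Doris headers).
enum class ExchangeType { NOOP, PASSTHROUGH, HASH_SHUFFLE, BUCKET_HASH_SHUFFLE };
enum class JoinDistribution { BROADCAST, BUCKET_SHUFFLE, COLOCATE, PARTITIONED };

// Hypothetical bundle of the members the real method reads.
struct ProbeInputs {
    bool null_aware_left_anti = false;  // models _join_op == NULL_AWARE_LEFT_ANTI_JOIN
    bool broadcast_join = false;        // models _is_broadcast_join
    bool child_is_serial = false;       // models _child && _child->is_serial_operator()
    JoinDistribution distribution = JoinDistribution::PARTITIONED;
};

ExchangeType probe_exchange_type(const ProbeInputs& in) {
    if (in.null_aware_left_anti) {
        return ExchangeType::NOOP;
    }
    if (in.broadcast_join) {
        // New in this commit: only a serial child gets a passthrough exchange;
        // otherwise the probe side is left without a local shuffle.
        return in.child_is_serial ? ExchangeType::PASSTHROUGH : ExchangeType::NOOP;
    }
    const bool bucketed = in.distribution == JoinDistribution::BUCKET_SHUFFLE ||
                          in.distribution == JoinDistribution::COLOCATE;
    return bucketed ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE;
}
```

Under these assumptions, a broadcast join with a non-serial child maps to `NOOP`, where the pre-commit code would have produced `PASSTHROUGH`. The non-broadcast cases map to the same exchange types as before.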
