File tree Expand file tree Collapse file tree 2 files changed +10
-8
lines changed
Expand file tree Collapse file tree 2 files changed +10
-8
lines changed Original file line number Diff line number Diff line change @@ -118,7 +118,7 @@ class DistinctStreamingAggOperatorX final
118118 ? DataDistribution (ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
119119 : DataDistribution (ExchangeType::HASH_SHUFFLE, _partition_exprs);
120120 }
121- return StatefulOperatorX<DistinctStreamingAggLocalState>:: required_data_distribution (state) ;
121+ return {ExchangeType::PASSTHROUGH} ;
122122 }
123123
124124 bool require_data_distribution () const override { return _is_colocate; }
Original file line number Diff line number Diff line change @@ -133,14 +133,16 @@ class HashJoinProbeOperatorX MOCK_REMOVE(final)
133133 DataDistribution required_data_distribution (RuntimeState* /* state*/ ) const override {
134134 if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
135135 return {ExchangeType::NOOP};
136+ } else if (_is_broadcast_join) {
137+ return _child && _child->is_serial_operator ()
138+ ? DataDistribution (ExchangeType::PASSTHROUGH)
139+ : DataDistribution (ExchangeType::NOOP);
136140 }
137- return _is_broadcast_join
138- ? DataDistribution (ExchangeType::PASSTHROUGH)
139- : (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
140- _join_distribution == TJoinDistributionType::COLOCATE
141- ? DataDistribution (ExchangeType::BUCKET_HASH_SHUFFLE,
142- _partition_exprs)
143- : DataDistribution (ExchangeType::HASH_SHUFFLE, _partition_exprs));
141+
142+ return (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
143+ _join_distribution == TJoinDistributionType::COLOCATE
144+ ? DataDistribution (ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
145+ : DataDistribution (ExchangeType::HASH_SHUFFLE, _partition_exprs));
144146 }
145147 bool is_broadcast_join () const { return _is_broadcast_join; }
146148
You can’t perform that action at this time.
0 commit comments