8282#include " pipeline/exec/partitioned_aggregation_source_operator.h"
8383#include " pipeline/exec/partitioned_hash_join_probe_operator.h"
8484#include " pipeline/exec/partitioned_hash_join_sink_operator.h"
85- #include " pipeline/exec/queue_sink_operator.h"
86- #include " pipeline/exec/queue_source_operator.h"
8785#include " pipeline/exec/rec_cte_anchor_sink_operator.h"
8886#include " pipeline/exec/rec_cte_scan_operator.h"
8987#include " pipeline/exec/rec_cte_sink_operator.h"
@@ -1316,53 +1314,12 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
13161314 RETURN_IF_ERROR (cur_pipe->operators ().front ()->set_child (op));
13171315 cur_pipe = new_pipe;
13181316 } else {
1319- // Check if parent is SetProbeSinkOperatorX by checking if the sink is a SetProbeSinkOperatorX
1320- bool parent_is_set_probe = false ;
1321- if (cur_pipe->sink () != nullptr ) {
1322- auto sink = cur_pipe->sink ();
1323- // Try to dynamic_cast to both template instances of SetProbeSinkOperatorX
1324- if (dynamic_cast <SetProbeSinkOperatorX<true >*>(sink) ||
1325- dynamic_cast <SetProbeSinkOperatorX<false >*>(sink)) {
1326- parent_is_set_probe = true ;
1327- }
1328- }
1329-
1330- if (parent_is_set_probe) {
1331- // Create QueueSourceOperatorX
1332- auto queue_source_id = next_operator_id ();
1333- OperatorPtr queue_source_op = std::make_shared<QueueSourceOperatorX>(
1334- pool, tnode.node_id , queue_source_id);
1335- RETURN_IF_ERROR (cur_pipe->add_operator (queue_source_op, _parallel_instances));
1336-
1337- // Create new pipeline for QueueSinkOperatorX
1338- const auto downstream_pipeline_id = cur_pipe->id ();
1339- if (!_dag.contains (downstream_pipeline_id)) {
1340- _dag.insert ({downstream_pipeline_id, {}});
1341- }
1342- PipelinePtr queue_side_pipe = add_pipeline (cur_pipe);
1343- _dag[downstream_pipeline_id].push_back (queue_side_pipe->id ());
1344-
1345- // Create QueueSinkOperatorX
1346- auto queue_sink_id = next_sink_operator_id ();
1347- DataSinkOperatorPtr queue_sink_op = std::make_shared<QueueSinkOperatorX>(
1348- queue_sink_id, queue_source_id, queue_source_op->operator_id ());
1349- RETURN_IF_ERROR (queue_side_pipe->set_sink (queue_sink_op));
1350-
1351- // Create DistinctStreamingAggOperatorX in the new pipeline
1352- // Note: we assign to op so that _create_tree_helper will set its child correctly
1353- op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id (),
1354- tnode, descs, _require_bucket_distribution);
1355- RETURN_IF_ERROR (queue_side_pipe->add_operator (op, _parallel_instances));
1356- RETURN_IF_ERROR (queue_source_op->set_child (op));
1357- cur_pipe = queue_side_pipe;
1358- } else {
1359- op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id (),
1360- tnode, descs, _require_bucket_distribution);
1361- op->set_followed_by_shuffled_operator (followed_by_shuffled_operator);
1362- _require_bucket_distribution =
1363- _require_bucket_distribution || op->require_data_distribution ();
1364- RETURN_IF_ERROR (cur_pipe->add_operator (op, _parallel_instances));
1365- }
1317+ op = std::make_shared<DistinctStreamingAggOperatorX>(
1318+ pool, next_operator_id (), tnode, descs, _require_bucket_distribution);
1319+ op->set_followed_by_shuffled_operator (followed_by_shuffled_operator);
1320+ _require_bucket_distribution =
1321+ _require_bucket_distribution || op->require_data_distribution ();
1322+ RETURN_IF_ERROR (cur_pipe->add_operator (op, _parallel_instances));
13661323 }
13671324 } else if (is_streaming_agg) {
13681325 if (need_create_cache_op) {
@@ -1764,7 +1721,6 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(
17641721 PipelinePtr probe_side_pipe = add_pipeline (cur_pipe);
17651722 _dag[downstream_pipeline_id].push_back (probe_side_pipe->id ());
17661723
1767- // Create appropriate sink operator based on child_id
17681724 DataSinkOperatorPtr sink;
17691725 if (child_id == 0 ) {
17701726 sink.reset (new SetSinkOperatorX<is_intersect>(child_id, next_sink_operator_id (),
@@ -1773,7 +1729,6 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(
17731729 sink.reset (new SetProbeSinkOperatorX<is_intersect>(
17741730 child_id, next_sink_operator_id (), op->operator_id (), pool, tnode, descs));
17751731 }
1776- // Common code for both cases
17771732 sink->set_followed_by_shuffled_operator (followed_by_shuffled_operator);
17781733 RETURN_IF_ERROR (probe_side_pipe->set_sink (sink));
17791734 RETURN_IF_ERROR (probe_side_pipe->sink ()->init (tnode, _runtime_state.get ()));
0 commit comments