Skip to content

Commit 8b26ec1

Browse files
committed
support rec cte (be part / proto part)
1 parent b2a218d commit 8b26ec1

35 files changed

+1554
-126
lines changed

be/src/pipeline/exec/exchange_source_operator.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,4 +224,9 @@ Status ExchangeSourceOperatorX::close(RuntimeState* state) {
224224
_is_closed = true;
225225
return OperatorX<ExchangeLocalState>::close(state);
226226
}
227+
228+
Status ExchangeSourceOperatorX::reset(RuntimeState* state) {
229+
auto& local_state = get_local_state(state);
230+
return local_state.reset(state);
231+
}
227232
} // namespace doris::pipeline

be/src/pipeline/exec/exchange_source_operator.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,23 @@ class ExchangeLocalState : public PipelineXLocalState<> {
4848
Status close(RuntimeState* state) override;
4949
std::string debug_string(int indentation_level) const override;
5050

51+
Status reset(RuntimeState* state) {
52+
if (stream_recvr) {
53+
stream_recvr->close();
54+
}
55+
create_stream_recvr(state);
56+
57+
is_ready = false;
58+
num_rows_skipped = 0;
59+
60+
const auto& queues = stream_recvr->sender_queues();
61+
for (size_t i = 0; i < queues.size(); i++) {
62+
deps[i]->block();
63+
queues[i]->set_dependency(deps[i]);
64+
}
65+
return Status::OK();
66+
}
67+
5168
std::vector<Dependency*> dependencies() const override {
5269
std::vector<Dependency*> dep_vec;
5370
std::for_each(deps.begin(), deps.end(),
@@ -83,6 +100,8 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
83100
Status init(const TPlanNode& tnode, RuntimeState* state) override;
84101
Status prepare(RuntimeState* state) override;
85102

103+
Status reset(RuntimeState* state);
104+
86105
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;
87106

88107
std::string debug_string(int indentation_level = 0) const override;

be/src/pipeline/exec/operator.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@
6262
#include "pipeline/exec/partitioned_aggregation_source_operator.h"
6363
#include "pipeline/exec/partitioned_hash_join_probe_operator.h"
6464
#include "pipeline/exec/partitioned_hash_join_sink_operator.h"
65+
#include "pipeline/exec/rec_cte_anchor_sink_operator.h"
66+
#include "pipeline/exec/rec_cte_scan_operator.h"
67+
#include "pipeline/exec/rec_cte_sink_operator.h"
68+
#include "pipeline/exec/rec_cte_source_operator.h"
6569
#include "pipeline/exec/repeat_operator.h"
6670
#include "pipeline/exec/result_file_sink_operator.h"
6771
#include "pipeline/exec/result_sink_operator.h"
@@ -803,6 +807,8 @@ DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState)
803807
DECLARE_OPERATOR(GroupCommitBlockSinkLocalState)
804808
DECLARE_OPERATOR(CacheSinkLocalState)
805809
DECLARE_OPERATOR(DictSinkLocalState)
810+
DECLARE_OPERATOR(RecCTESinkLocalState)
811+
DECLARE_OPERATOR(RecCTEAnchorSinkLocalState)
806812

807813
#undef DECLARE_OPERATOR
808814

@@ -836,6 +842,8 @@ DECLARE_OPERATOR(MetaScanLocalState)
836842
DECLARE_OPERATOR(LocalExchangeSourceLocalState)
837843
DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState)
838844
DECLARE_OPERATOR(CacheSourceLocalState)
845+
DECLARE_OPERATOR(RecCTESourceLocalState)
846+
DECLARE_OPERATOR(RecCTEScanLocalState)
839847

840848
#ifdef BE_TEST
841849
DECLARE_OPERATOR(MockLocalState)
@@ -871,6 +879,7 @@ template class PipelineXSinkLocalState<SetSharedState>;
871879
template class PipelineXSinkLocalState<LocalExchangeSharedState>;
872880
template class PipelineXSinkLocalState<BasicSharedState>;
873881
template class PipelineXSinkLocalState<DataQueueSharedState>;
882+
template class PipelineXSinkLocalState<RecCTESharedState>;
874883

875884
template class PipelineXLocalState<HashJoinSharedState>;
876885
template class PipelineXLocalState<PartitionedHashJoinSharedState>;
@@ -888,6 +897,7 @@ template class PipelineXLocalState<PartitionSortNodeSharedState>;
888897
template class PipelineXLocalState<SetSharedState>;
889898
template class PipelineXLocalState<LocalExchangeSharedState>;
890899
template class PipelineXLocalState<BasicSharedState>;
900+
template class PipelineXLocalState<RecCTESharedState>;
891901

892902
template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>;
893903
template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>;

be/src/pipeline/exec/operator.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,8 @@ class DataSinkOperatorXBase : public OperatorBase {
613613
// For agg/sort/join sink.
614614
virtual Status init(const TPlanNode& tnode, RuntimeState* state);
615615

616+
virtual bool need_rerun(RuntimeState* state) const { return false; }
617+
616618
Status init(const TDataSink& tsink) override;
617619
[[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets,
618620
const bool use_global_hash_shuffle,
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "pipeline/exec/rec_cte_anchor_sink_operator.h"
19+
20+
namespace doris::pipeline {
21+
#include "common/compile_check_begin.h"
22+
23+
Status RecCTEAnchorSinkLocalState::open(RuntimeState* state) {
24+
SCOPED_TIMER(exec_time_counter());
25+
SCOPED_TIMER(_open_timer);
26+
RETURN_IF_ERROR(Base::open(state));
27+
auto& p = _parent->cast<Parent>();
28+
_child_expr.resize(p._child_expr.size());
29+
for (size_t i = 0; i < p._child_expr.size(); i++) {
30+
RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i]));
31+
}
32+
return Status::OK();
33+
}
34+
35+
Status RecCTEAnchorSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
36+
RETURN_IF_ERROR(Base::init(tnode, state));
37+
DCHECK(tnode.__isset.rec_cte_node);
38+
{
39+
const auto& texprs = tnode.rec_cte_node.result_expr_lists[0];
40+
vectorized::VExprContextSPtrs ctxs;
41+
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs));
42+
_child_expr = ctxs;
43+
}
44+
_name = "REC_CTE_ANCHOR_SINK_OPERATOR";
45+
return Status::OK();
46+
}
47+
48+
Status RecCTEAnchorSinkOperatorX::prepare(RuntimeState* state) {
49+
RETURN_IF_ERROR(Base::prepare(state));
50+
RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc()));
51+
RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _child->row_desc()));
52+
RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state));
53+
return Status::OK();
54+
}
55+
56+
} // namespace doris::pipeline
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <memory>
21+
22+
#include "common/status.h"
23+
#include "operator.h"
24+
#include "pipeline/exec/union_sink_operator.h"
25+
#include "pipeline/rec_cte_shared_state.h"
26+
#include "vec/core/block.h"
27+
28+
namespace doris {
29+
#include "common/compile_check_begin.h"
30+
class RuntimeState;
31+
32+
namespace pipeline {
33+
class DataQueue;
34+
35+
class RecCTEAnchorSinkOperatorX;
36+
class RecCTEAnchorSinkLocalState final : public PipelineXSinkLocalState<RecCTESharedState> {
37+
public:
38+
ENABLE_FACTORY_CREATOR(RecCTEAnchorSinkLocalState);
39+
RecCTEAnchorSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
40+
: Base(parent, state) {}
41+
Status open(RuntimeState* state) override;
42+
43+
bool is_blockable() const override { return true; }
44+
45+
private:
46+
friend class RecCTEAnchorSinkOperatorX;
47+
using Base = PipelineXSinkLocalState<RecCTESharedState>;
48+
using Parent = RecCTEAnchorSinkOperatorX;
49+
50+
vectorized::VExprContextSPtrs _child_expr;
51+
};
52+
53+
class RecCTEAnchorSinkOperatorX MOCK_REMOVE(final)
54+
: public DataSinkOperatorX<RecCTEAnchorSinkLocalState> {
55+
public:
56+
using Base = DataSinkOperatorX<RecCTEAnchorSinkLocalState>;
57+
58+
friend class RecCTEAnchorSinkLocalState;
59+
RecCTEAnchorSinkOperatorX(int sink_id, int dest_id, const TPlanNode& tnode,
60+
const DescriptorTbl& descs)
61+
: Base(sink_id, tnode.node_id, dest_id),
62+
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
63+
64+
~RecCTEAnchorSinkOperatorX() override = default;
65+
66+
Status init(const TPlanNode& tnode, RuntimeState* state) override;
67+
Status prepare(RuntimeState* state) override;
68+
69+
bool is_serial_operator() const override { return true; }
70+
71+
DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
72+
return {ExchangeType::NOOP};
73+
}
74+
75+
Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override {
76+
auto& local_state = get_local_state(state);
77+
78+
if (_need_notify_rec_side_ready) {
79+
RETURN_IF_ERROR(get_local_state(state)._shared_state->send_data_to_targets(state, 0));
80+
_need_notify_rec_side_ready = false;
81+
}
82+
83+
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows());
84+
if (input_block->rows() != 0) {
85+
vectorized::Block block;
86+
RETURN_IF_ERROR(materialize_block(local_state._child_expr, input_block, &block, true));
87+
RETURN_IF_ERROR(local_state._shared_state->emplace_block(state, std::move(block)));
88+
}
89+
90+
if (eos) {
91+
local_state._shared_state->anchor_dep->set_ready();
92+
}
93+
return Status::OK();
94+
}
95+
96+
std::shared_ptr<BasicSharedState> create_shared_state() const override {
97+
std::shared_ptr<BasicSharedState> ss = std::make_shared<RecCTESharedState>();
98+
ss->id = operator_id();
99+
for (const auto& dest : dests_id()) {
100+
ss->related_op_ids.insert(dest);
101+
}
102+
return ss;
103+
}
104+
105+
private:
106+
const RowDescriptor _row_descriptor;
107+
vectorized::VExprContextSPtrs _child_expr;
108+
109+
bool _need_notify_rec_side_ready = true;
110+
};
111+
112+
} // namespace pipeline
113+
#include "common/compile_check_end.h"
114+
} // namespace doris
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include "common/status.h"
21+
#include "pipeline/exec/operator.h"
22+
23+
namespace doris {
24+
#include "common/compile_check_begin.h"
25+
class RuntimeState;
26+
} // namespace doris
27+
28+
namespace doris::pipeline {
29+
30+
class RecCTEScanOperatorX;
31+
class RecCTEScanLocalState final : public PipelineXLocalState<> {
32+
public:
33+
ENABLE_FACTORY_CREATOR(RecCTEScanLocalState);
34+
35+
RecCTEScanLocalState(RuntimeState* state, OperatorXBase* parent)
36+
: PipelineXLocalState<>(state, parent) {
37+
_scan_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
38+
_parent->get_name() + "_DEPENDENCY");
39+
state->get_query_ctx()->registe_cte_scan(state->fragment_instance_id(), parent->node_id(),
40+
this);
41+
}
42+
~RecCTEScanLocalState() override {
43+
state()->get_query_ctx()->deregiste_cte_scan(state()->fragment_instance_id(),
44+
parent()->node_id());
45+
}
46+
47+
Status add_block(const PBlock& pblock) {
48+
vectorized::Block block;
49+
size_t uncompressed_bytes;
50+
int64_t decompress_time;
51+
RETURN_IF_ERROR(block.deserialize(pblock, &uncompressed_bytes, &decompress_time));
52+
_blocks.emplace_back(std::move(block));
53+
return Status::OK();
54+
}
55+
56+
void set_ready() { _scan_dependency->set_ready(); }
57+
58+
std::vector<Dependency*> dependencies() const override { return {_scan_dependency.get()}; }
59+
60+
private:
61+
friend class RecCTEScanOperatorX;
62+
std::vector<vectorized::Block> _blocks;
63+
DependencySPtr _scan_dependency = nullptr;
64+
};
65+
66+
class RecCTEScanOperatorX final : public OperatorX<RecCTEScanLocalState> {
67+
public:
68+
RecCTEScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
69+
const DescriptorTbl& descs)
70+
: OperatorX<RecCTEScanLocalState>(pool, tnode, operator_id, descs) {}
71+
72+
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override {
73+
auto& local_state = get_local_state(state);
74+
75+
if (local_state._blocks.empty()) {
76+
*eos = true;
77+
return Status::OK();
78+
}
79+
*block = std::move(local_state._blocks.back());
80+
RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(), block, block->columns()));
81+
local_state._blocks.pop_back();
82+
return Status::OK();
83+
}
84+
85+
bool is_source() const override { return true; }
86+
};
87+
88+
#include "common/compile_check_end.h"
89+
} // namespace doris::pipeline

0 commit comments

Comments
 (0)