Skip to content

Commit 76d7ef8

Browse files
committed
update some fix, todo and comment
fix fix fix Revert "fix" This reverts commit 41bba83. fix update some todo and comment update fix update fix test tets tets remove nullable_tuples update format update update fix update fix fix fix Revert "fix" This reverts commit ce86242. Reapply "fix" This reverts commit 5c58d3d. fix update fix
1 parent 847bb6c commit 76d7ef8

21 files changed

+176
-71
lines changed

be/src/pipeline/exec/exchange_sink_buffer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ namespace pipeline {
9292
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
9393
PlanNodeId node_id, RuntimeState* state,
9494
const std::vector<InstanceLoId>& sender_ins_ids)
95-
: HasTaskExecutionCtx(state),
95+
: HasTaskExecutionCtx(state, false),
9696
_queue_capacity(0),
9797
_is_failed(false),
9898
_query_id(std::move(query_id)),

be/src/pipeline/exec/exchange_sink_buffer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx {
269269
RuntimeState* state, const std::vector<InstanceLoId>& sender_ins_ids);
270270
#ifdef BE_TEST
271271
ExchangeSinkBuffer(RuntimeState* state, int64_t sinknum)
272-
: HasTaskExecutionCtx(state), _state(state), _exchange_sink_num(sinknum) {};
272+
: HasTaskExecutionCtx(state, false), _state(state), _exchange_sink_num(sinknum) {};
273273
#endif
274274

275275
~ExchangeSinkBuffer() override = default;

be/src/pipeline/exec/exchange_source_operator.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ ExchangeLocalState::~ExchangeLocalState() {
5050
std::string ExchangeLocalState::debug_string(int indentation_level) const {
5151
fmt::memory_buffer debug_string_buffer;
5252
fmt::format_to(debug_string_buffer, "{}, recvr: ({})", Base::debug_string(indentation_level),
53-
stream_recvr->debug_string());
53+
stream_recvr ? stream_recvr->debug_string() : "null");
5454
return fmt::to_string(debug_string_buffer);
5555
}
5656

@@ -210,6 +210,7 @@ Status ExchangeLocalState::close(RuntimeState* state) {
210210
}
211211
if (stream_recvr != nullptr) {
212212
stream_recvr->close();
213+
stream_recvr.reset();
213214
}
214215
if (_parent->cast<ExchangeSourceOperatorX>()._is_merging) {
215216
vsort_exec_exprs.close(state);

be/src/pipeline/exec/rec_cte_anchor_sink_operator.h

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,7 @@ class RecCTEAnchorSinkOperatorX MOCK_REMOVE(final)
5858
friend class RecCTEAnchorSinkLocalState;
5959
RecCTEAnchorSinkOperatorX(int sink_id, int dest_id, const TPlanNode& tnode,
6060
const DescriptorTbl& descs)
61-
: Base(sink_id, tnode.node_id, dest_id),
62-
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
61+
: Base(sink_id, tnode.node_id, dest_id), _row_descriptor(descs, tnode.row_tuples) {}
6362

6463
~RecCTEAnchorSinkOperatorX() override = default;
6564

@@ -72,13 +71,20 @@ class RecCTEAnchorSinkOperatorX MOCK_REMOVE(final)
7271
return {ExchangeType::NOOP};
7372
}
7473

74+
Status terminate(RuntimeState* state) override {
75+
RETURN_IF_ERROR(_notify_rec_side_ready_if_needed(state));
76+
return Base::terminate(state);
77+
}
78+
79+
Status close(RuntimeState* state) override {
80+
RETURN_IF_ERROR(_notify_rec_side_ready_if_needed(state));
81+
return Base::close(state);
82+
}
83+
7584
Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override {
7685
auto& local_state = get_local_state(state);
7786

78-
if (_need_notify_rec_side_ready) {
79-
RETURN_IF_ERROR(get_local_state(state)._shared_state->send_data_to_targets(state, 0));
80-
_need_notify_rec_side_ready = false;
81-
}
87+
RETURN_IF_ERROR(_notify_rec_side_ready_if_needed(state));
8288

8389
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows());
8490
if (input_block->rows() != 0) {
@@ -103,6 +109,14 @@ class RecCTEAnchorSinkOperatorX MOCK_REMOVE(final)
103109
}
104110

105111
private:
112+
Status _notify_rec_side_ready_if_needed(RuntimeState* state) {
113+
if (_need_notify_rec_side_ready) {
114+
_need_notify_rec_side_ready = false;
115+
RETURN_IF_ERROR(get_local_state(state)._shared_state->send_data_to_targets(state, 0));
116+
}
117+
return Status::OK();
118+
}
119+
106120
const RowDescriptor _row_descriptor;
107121
vectorized::VExprContextSPtrs _child_expr;
108122

be/src/pipeline/exec/rec_cte_sink_operator.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,7 @@ class RecCTESinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<RecCTESi
5555
friend class RecCTESinkLocalState;
5656
RecCTESinkOperatorX(int sink_id, int dest_id, const TPlanNode& tnode,
5757
const DescriptorTbl& descs)
58-
: Base(sink_id, tnode.node_id, dest_id),
59-
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
58+
: Base(sink_id, tnode.node_id, dest_id), _row_descriptor(descs, tnode.row_tuples) {}
6059

6160
~RecCTESinkOperatorX() override = default;
6261

be/src/pipeline/exec/rec_cte_source_operator.cpp

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,10 @@ Status RecCTESourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
4141
_shared_state->anchor_dep = _anchor_dependency.get();
4242

4343
auto& p = _parent->cast<Parent>();
44-
_child_expr.resize(p._child_expr.size());
45-
for (size_t i = 0; i < p._child_expr.size(); i++) {
46-
RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i]));
47-
}
4844
if (!_parent->cast<RecCTESourceOperatorX>()._is_union_all) {
4945
_shared_state->agg_data = std::make_unique<DistinctDataVariants>();
5046
RETURN_IF_ERROR(init_hash_method<DistinctDataVariants>(_shared_state->agg_data.get(),
51-
get_data_types(_child_expr), false));
47+
p._hash_table_key_types, false));
5248
}
5349

5450
_shared_state->hash_table_compute_timer =
@@ -70,16 +66,19 @@ Status RecCTESourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
7066
const auto& texprs = tnode.rec_cte_node.result_expr_lists[1];
7167
vectorized::VExprContextSPtrs ctxs;
7268
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs));
73-
_child_expr = ctxs;
69+
_hash_table_key_types = get_data_types(ctxs);
7470
}
7571
return Status::OK();
7672
}
7773

7874
Status RecCTESourceOperatorX::prepare(RuntimeState* state) {
7975
RETURN_IF_ERROR(Base::prepare(state));
80-
RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc()));
81-
RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _child->row_desc()));
82-
RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state));
76+
for (auto fragment : _fragments_to_reset) {
77+
if (state->fragment_id() == fragment.fragment_id) {
78+
return Status::InternalError("Fragment {} contains a recursive CTE node",
79+
fragment.fragment_id);
80+
}
81+
}
8382
return Status::OK();
8483
}
8584

be/src/pipeline/exec/rec_cte_source_operator.h

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ class RecCTESourceLocalState final : public PipelineXLocalState<RecCTESharedStat
5858
friend class RecCTESourceOperatorX;
5959
friend class OperatorX<RecCTESourceLocalState>;
6060

61-
vectorized::VExprContextSPtrs _child_expr;
62-
6361
std::shared_ptr<Dependency> _anchor_dependency = nullptr;
6462
};
6563

@@ -135,13 +133,13 @@ class RecCTESourceOperatorX : public OperatorX<RecCTESourceLocalState> {
135133

136134
private:
137135
Status _send_close(RuntimeState* state) {
138-
if (!_aready_send_close && !_is_used_by_other_rec_cte) {
139-
RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::close));
140-
_aready_send_close = true;
141-
136+
if (!_already_send_close && !_is_used_by_other_rec_cte) {
137+
_already_send_close = true;
142138
auto* round_counter = ADD_COUNTER(get_local_state(state).Base::custom_profile(),
143139
"RecursiveRound", TUnit::UNIT);
144140
round_counter->set(int64_t(get_local_state(state)._shared_state->current_round));
141+
142+
RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::close));
145143
}
146144
return Status::OK();
147145
}
@@ -179,10 +177,6 @@ class RecCTESourceOperatorX : public OperatorX<RecCTESourceLocalState> {
179177

180178
Status _send_rerun_fragments(RuntimeState* state, PRerunFragmentParams_Opcode stage) const {
181179
for (auto fragment : _fragments_to_reset) {
182-
if (state->fragment_id() == fragment.fragment_id) {
183-
return Status::InternalError("Fragment {} contains a recursive CTE node",
184-
fragment.fragment_id);
185-
}
186180
auto stub =
187181
state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(
188182
fragment.addr);
@@ -202,14 +196,16 @@ class RecCTESourceOperatorX : public OperatorX<RecCTESourceLocalState> {
202196
return Status::InternalError(controller.ErrorText());
203197
}
204198

205-
RETURN_IF_ERROR(Status::create(result.status()));
199+
auto rpc_st = Status::create(result.status());
200+
if (!rpc_st.ok()) {
201+
return rpc_st;
202+
}
206203
}
207204
return Status::OK();
208205
}
209206

210207
friend class RecCTESourceLocalState;
211-
212-
vectorized::VExprContextSPtrs _child_expr;
208+
std::vector<vectorized::DataTypePtr> _hash_table_key_types;
213209

214210
const bool _is_union_all = false;
215211
std::vector<TRecCTETarget> _targets;
@@ -218,7 +214,7 @@ class RecCTESourceOperatorX : public OperatorX<RecCTESourceLocalState> {
218214

219215
int _max_recursion_depth = 0;
220216

221-
bool _aready_send_close = false;
217+
bool _already_send_close = false;
222218

223219
bool _is_used_by_other_rec_cte = false;
224220
};

be/src/pipeline/exec/scan_operator.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1366,6 +1366,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
13661366
SCOPED_TIMER(exec_time_counter());
13671367
if (_scanner_ctx) {
13681368
_scanner_ctx->stop_scanners(state);
1369+
_scanner_ctx.reset();
13691370
}
13701371
std::list<std::shared_ptr<vectorized::ScannerDelegate>> {}.swap(_scanners);
13711372
COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time());

be/src/pipeline/pipeline_fragment_context.cpp

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
320320

321321
auto* fragment_context = this;
322322

323-
if (!_need_notify_close && _params.query_options.__isset.is_report_success) {
323+
if (_params.query_options.__isset.is_report_success) {
324324
fragment_context->set_is_report_success(_params.query_options.is_report_success);
325325
}
326326

@@ -1793,10 +1793,16 @@ void PipelineFragmentContext::_close_fragment_instance() {
17931793
}
17941794
Defer defer_op {[&]() {
17951795
_is_fragment_instance_closed = true;
1796-
_close_cv.notify_all();
1796+
_notify_cv.notify_all();
17971797
}};
17981798
_fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
1799-
static_cast<void>(send_report(true));
1799+
if (!_need_notify_close) {
1800+
auto st = send_report(true);
1801+
if (!st) {
1802+
LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
1803+
print_id(_query_id), _fragment_id, st.to_string());
1804+
}
1805+
}
18001806
// Print profile content in info log is a tempoeray solution for stream load and external_connector.
18011807
// Since stream load does not have someting like coordinator on FE, so
18021808
// backend can not report profile to FE, ant its profile can not be shown
@@ -1971,8 +1977,11 @@ std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const
19711977
std::string PipelineFragmentContext::debug_string() {
19721978
std::lock_guard<std::mutex> l(_task_mutex);
19731979
fmt::memory_buffer debug_string_buffer;
1974-
fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info:\nneed_notify_close: {}\n",
1975-
_need_notify_close);
1980+
fmt::format_to(debug_string_buffer,
1981+
"PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
1982+
"need_notify_close={}, has_task_execution_ctx_ref_count={}\n",
1983+
_closed_tasks, _total_tasks, _need_notify_close,
1984+
_has_task_execution_ctx_ref_count);
19761985
for (size_t j = 0; j < _tasks.size(); j++) {
19771986
fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
19781987
for (size_t i = 0; i < _tasks[j].size(); i++) {
@@ -2057,10 +2066,17 @@ Status PipelineFragmentContext::wait_close(bool close) {
20572066

20582067
{
20592068
std::unique_lock<std::mutex> lock(_task_mutex);
2060-
_close_cv.wait(lock, [this] { return _is_fragment_instance_closed.load(); });
2069+
_notify_cv.wait(lock, [this] {
2070+
return _is_fragment_instance_closed.load() && !_has_task_execution_ctx_ref_count;
2071+
});
20612072
}
20622073

20632074
if (close) {
2075+
auto st = send_report(true);
2076+
if (!st) {
2077+
LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
2078+
print_id(_query_id), _fragment_id, st.to_string());
2079+
}
20642080
_exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
20652081
}
20662082
return Status::OK();

be/src/pipeline/pipeline_fragment_context.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,6 @@ class PipelineFragmentContext : public TaskExecutionContext {
195195
Pipelines _pipelines;
196196
PipelineId _next_pipeline_id = 0;
197197
std::mutex _task_mutex;
198-
std::condition_variable _close_cv;
199198
int _closed_tasks = 0;
200199
// After prepared, `_total_tasks` is equal to the size of `_tasks`.
201200
// When submit fail, `_total_tasks` is equal to the number of tasks submitted.

0 commit comments

Comments
 (0)