Skip to content

Commit 4203462

Browse files
committed
[refactor](pipeline) Re-construct ownership of pipeline components (apache#49753)
In this PR, the ownership of the pipeline engine's core components is restructured according to the following rules: ![image](https://github.com/user-attachments/assets/04f842b7-f052-4e3c-9005-7df3103b4d7b)
1 parent 8031a86 commit 4203462

File tree

11 files changed

+159
-107
lines changed

11 files changed

+159
-107
lines changed

be/src/pipeline/dependency.cpp

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, s
4646
return sink_deps.back().get();
4747
}
4848

49-
void Dependency::_add_block_task(PipelineTask* task) {
50-
DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1] != task)
49+
void Dependency::_add_block_task(std::shared_ptr<PipelineTask> task) {
50+
DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1].lock() == nullptr ||
51+
_blocked_task[_blocked_task.size() - 1].lock().get() != task.get())
5152
<< "Duplicate task: " << task->debug_string();
5253
_blocked_task.push_back(task);
5354
}
@@ -57,7 +58,7 @@ void Dependency::set_ready() {
5758
return;
5859
}
5960
_watcher.stop();
60-
std::vector<PipelineTask*> local_block_task {};
61+
std::vector<std::weak_ptr<PipelineTask>> local_block_task {};
6162
{
6263
std::unique_lock<std::mutex> lc(_task_lock);
6364
if (_ready) {
@@ -66,12 +67,15 @@ void Dependency::set_ready() {
6667
_ready = true;
6768
local_block_task.swap(_blocked_task);
6869
}
69-
for (auto* task : local_block_task) {
70-
task->wake_up();
70+
for (auto task : local_block_task) {
71+
if (auto t = task.lock()) {
72+
std::unique_lock<std::mutex> lc(_task_lock);
73+
t->wake_up();
74+
}
7175
}
7276
}
7377

74-
Dependency* Dependency::is_blocked_by(PipelineTask* task) {
78+
Dependency* Dependency::is_blocked_by(std::shared_ptr<PipelineTask> task) {
7579
std::unique_lock<std::mutex> lc(_task_lock);
7680
auto ready = _ready.load();
7781
if (!ready && task) {
@@ -105,7 +109,7 @@ std::string RuntimeFilterDependency::debug_string(int indentation_level) {
105109
return fmt::to_string(debug_string_buffer);
106110
}
107111

108-
Dependency* RuntimeFilterDependency::is_blocked_by(PipelineTask* task) {
112+
Dependency* RuntimeFilterDependency::is_blocked_by(std::shared_ptr<PipelineTask> task) {
109113
std::unique_lock<std::mutex> lc(_task_lock);
110114
auto ready = _ready.load();
111115
if (!ready && task) {

be/src/pipeline/dependency.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
106106
[[nodiscard]] int64_t watcher_elapse_time() { return _watcher.elapsed_time(); }
107107

108108
// Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
109-
[[nodiscard]] virtual Dependency* is_blocked_by(PipelineTask* task = nullptr);
109+
[[nodiscard]] virtual Dependency* is_blocked_by(std::shared_ptr<PipelineTask> task = nullptr);
110110
// Notify downstream pipeline tasks this dependency is ready.
111111
void set_ready();
112112
void set_ready_to_read() {
@@ -151,7 +151,7 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
151151
}
152152

153153
protected:
154-
void _add_block_task(PipelineTask* task);
154+
void _add_block_task(std::shared_ptr<PipelineTask> task);
155155

156156
const int _id;
157157
const int _node_id;
@@ -162,7 +162,7 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
162162
MonotonicStopWatch _watcher;
163163

164164
std::mutex _task_lock;
165-
std::vector<PipelineTask*> _blocked_task;
165+
std::vector<std::weak_ptr<PipelineTask>> _blocked_task;
166166

167167
// If `_always_ready` is true, `block()` will never block tasks.
168168
std::atomic<bool> _always_ready = false;
@@ -282,7 +282,7 @@ class RuntimeFilterDependency final : public Dependency {
282282
: Dependency(id, node_id, name), _runtime_filter(runtime_filter) {}
283283
std::string debug_string(int indentation_level = 0) override;
284284

285-
Dependency* is_blocked_by(PipelineTask* task) override;
285+
Dependency* is_blocked_by(std::shared_ptr<PipelineTask> task) override;
286286

287287
private:
288288
const IRuntimeFilter* _runtime_filter = nullptr;

be/src/pipeline/pipeline.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
135135
}
136136

137137
int num_tasks_of_parent() const { return _num_tasks_of_parent; }
138+
std::string& name() { return _name; }
138139

139140
private:
140141
void _init_profile();

be/src/pipeline/pipeline_fragment_context.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -133,18 +133,17 @@ PipelineFragmentContext::~PipelineFragmentContext() {
133133
// The memory released by the query end is recorded in the query mem tracker.
134134
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker);
135135
auto st = _query_ctx->exec_status();
136-
_query_ctx.reset();
137136
for (size_t i = 0; i < _tasks.size(); i++) {
138137
if (!_tasks[i].empty()) {
139138
_call_back(_tasks[i].front()->runtime_state(), &st);
140139
}
141140
}
142-
_tasks.clear();
143141
for (auto& runtime_states : _task_runtime_states) {
144142
for (auto& runtime_state : runtime_states) {
145143
runtime_state.reset();
146144
}
147145
}
146+
_tasks.clear();
148147
_dag.clear();
149148
_pip_id_to_pipeline.clear();
150149
_pipelines.clear();
@@ -154,6 +153,7 @@ PipelineFragmentContext::~PipelineFragmentContext() {
154153
_runtime_filter_states.clear();
155154
_runtime_filter_mgr_map.clear();
156155
_op_id_to_le_state.clear();
156+
_query_ctx.reset();
157157
}
158158

159159
bool PipelineFragmentContext::is_timeout(timespec now) const {
@@ -449,10 +449,11 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
449449
auto cur_task_id = _total_tasks++;
450450
task_runtime_state->set_task_id(cur_task_id);
451451
task_runtime_state->set_task_num(pipeline->num_tasks());
452-
auto task = std::make_unique<PipelineTask>(pipeline, cur_task_id,
453-
task_runtime_state.get(), this,
454-
pipeline_id_to_profile[pip_idx].get(),
455-
get_local_exchange_state(pipeline), i);
452+
auto task = std::make_shared<PipelineTask>(
453+
pipeline, cur_task_id, task_runtime_state.get(),
454+
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
455+
pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline),
456+
i);
456457
pipeline->incr_created_tasks(i, task.get());
457458
task_runtime_state->set_task(task.get());
458459
pipeline_id_to_task.insert({pipeline->id(), task.get()});
@@ -1675,7 +1676,7 @@ Status PipelineFragmentContext::submit() {
16751676
auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
16761677
for (auto& task : _tasks) {
16771678
for (auto& t : task) {
1678-
st = scheduler->schedule_task(t.get());
1679+
st = scheduler->schedule_task(t);
16791680
if (!st) {
16801681
cancel(Status::InternalError("submit context to executor fail"));
16811682
std::lock_guard<std::mutex> l(_task_mutex);

be/src/pipeline/pipeline_fragment_context.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
228228

229229
OperatorPtr _root_op = nullptr;
230230
// this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
231-
std::vector<std::vector<std::unique_ptr<PipelineTask>>> _tasks;
231+
std::vector<std::vector<std::shared_ptr<PipelineTask>>> _tasks;
232232

233233
// TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
234234
// of it in pipeline task not the fragment_context

be/src/pipeline/pipeline_task.cpp

Lines changed: 72 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,19 @@ namespace doris::pipeline {
4848

4949
PipelineTask::PipelineTask(
5050
PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
51-
PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile,
51+
std::shared_ptr<PipelineFragmentContext> fragment_context, RuntimeProfile* parent_profile,
5252
std::map<int,
5353
std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
5454
le_state_map,
5555
int task_idx)
56-
: _index(task_id),
56+
:
57+
#ifdef BE_TEST
58+
_query_id(fragment_context ? fragment_context->get_query_id() : TUniqueId()),
59+
#else
60+
_query_id(fragment_context->get_query_id()),
61+
#endif
62+
_pip_id(pipeline->id()),
63+
_index(task_id),
5764
_pipeline(pipeline),
5865
_opened(false),
5966
_state(state),
@@ -64,7 +71,9 @@ PipelineTask::PipelineTask(
6471
_root(_operators.back().get()),
6572
_sink(pipeline->sink_shared_pointer()),
6673
_le_state_map(std::move(le_state_map)),
67-
_task_idx(task_idx) {
74+
_task_idx(task_idx),
75+
_execution_dep(state->get_query_ctx()->get_execution_dependency()),
76+
_pipeline_name(_pipeline->name()) {
6877
_pipeline_task_watcher.start();
6978
_execution_dependencies.push_back(state->get_query_ctx()->get_execution_dependency());
7079
auto shared_state = _sink->create_shared_state();
@@ -117,8 +126,12 @@ Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const
117126
std::copy(deps.begin(), deps.end(),
118127
std::inserter(_execution_dependencies, _execution_dependencies.end()));
119128
}
120-
if (query_context()->is_cancelled()) {
121-
clear_blocking_state();
129+
if (auto fragment = _fragment_context.lock()) {
130+
if (fragment->get_query_ctx()->is_cancelled()) {
131+
clear_blocking_state();
132+
}
133+
} else {
134+
return Status::InternalError("Fragment already finished! Query: {}", print_id(_query_id));
122135
}
123136
return Status::OK();
124137
}
@@ -231,7 +244,7 @@ bool PipelineTask::_wait_to_start() {
231244
// 2. Runtime filter dependencies are ready
232245
// 3. All tablets are loaded into local storage
233246
for (auto* op_dep : _execution_dependencies) {
234-
_blocked_dep = op_dep->is_blocked_by(this);
247+
_blocked_dep = op_dep->is_blocked_by(shared_from_this());
235248
if (_blocked_dep != nullptr) {
236249
_blocked_dep->start_watcher();
237250
return true;
@@ -252,7 +265,7 @@ bool PipelineTask::_is_blocked() {
252265
for (int i = _read_dependencies.size() - 1; i >= 0; i--) {
253266
// `_read_dependencies` is organized according to operators. For each operator, running condition is met iff all dependencies are ready.
254267
for (auto* dep : _read_dependencies[i]) {
255-
_blocked_dep = dep->is_blocked_by(this);
268+
_blocked_dep = dep->is_blocked_by(shared_from_this());
256269
if (_blocked_dep != nullptr) {
257270
_blocked_dep->start_watcher();
258271
return true;
@@ -271,7 +284,7 @@ bool PipelineTask::_is_blocked() {
271284
}
272285

273286
for (auto* op_dep : _write_dependencies) {
274-
_blocked_dep = op_dep->is_blocked_by(this);
287+
_blocked_dep = op_dep->is_blocked_by(shared_from_this());
275288
if (_blocked_dep != nullptr) {
276289
_blocked_dep->start_watcher();
277290
return true;
@@ -281,6 +294,10 @@ bool PipelineTask::_is_blocked() {
281294
}
282295

283296
Status PipelineTask::execute(bool* eos) {
297+
auto fragment_context = _fragment_context.lock();
298+
if (!fragment_context) {
299+
return Status::InternalError("Fragment already finished! Query: {}", print_id(_query_id));
300+
}
284301
if (_eos) {
285302
*eos = true;
286303
return Status::OK();
@@ -303,11 +320,11 @@ Status PipelineTask::execute(bool* eos) {
303320
}
304321
int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
305322
_task_cpu_timer->update(delta_cpu_time);
306-
auto cpu_qs = query_context()->get_cpu_statistics();
323+
auto cpu_qs = fragment_context->get_query_ctx()->get_cpu_statistics();
307324
if (cpu_qs) {
308325
cpu_qs->add_cpu_nanos(delta_cpu_time);
309326
}
310-
query_context()->update_cpu_time(delta_cpu_time);
327+
fragment_context->get_query_ctx()->update_cpu_time(delta_cpu_time);
311328
}};
312329
if (!_wake_up_early) {
313330
RETURN_IF_ERROR(_prepare());
@@ -318,7 +335,7 @@ Status PipelineTask::execute(bool* eos) {
318335
RETURN_IF_ERROR(_prepare());
319336

320337
// The status must be runnable
321-
if (!_opened && !_fragment_context->is_canceled()) {
338+
if (!_opened && !fragment_context->is_canceled()) {
322339
DBUG_EXECUTE_IF("PipelineTask::execute.open_sleep", {
323340
auto required_pipeline_id =
324341
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
@@ -350,7 +367,7 @@ Status PipelineTask::execute(bool* eos) {
350367

351368
_task_profile->add_info_string("TaskState", "Runnable");
352369
_task_profile->add_info_string("BlockedByDependency", "");
353-
while (!_fragment_context->is_canceled()) {
370+
while (!fragment_context->is_canceled()) {
354371
SCOPED_RAW_TIMER(&time_spent);
355372
if (_is_blocked()) {
356373
return Status::OK();
@@ -359,7 +376,7 @@ Status PipelineTask::execute(bool* eos) {
359376
/// When a task is cancelled,
360377
/// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready).
361378
/// Here, checking whether it is cancelled to prevent tasks in a blocking state from being re-executed.
362-
if (_fragment_context->is_canceled()) {
379+
if (fragment_context->is_canceled()) {
363380
break;
364381
}
365382

@@ -428,7 +445,7 @@ Status PipelineTask::execute(bool* eos) {
428445
}
429446
}
430447

431-
RETURN_IF_ERROR(get_task_queue()->push_back(this));
448+
RETURN_IF_ERROR(get_task_queue()->push_back(shared_from_this()));
432449
return Status::OK();
433450
}
434451

@@ -489,12 +506,34 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m
489506
}
490507
}
491508

509+
void PipelineTask::stop_if_finished() {
510+
auto fragment = _fragment_context.lock();
511+
if (!fragment) {
512+
return;
513+
}
514+
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker);
515+
if (auto sink = _sink) {
516+
if (sink->is_finished(_state)) {
517+
clear_blocking_state();
518+
}
519+
}
520+
}
521+
492522
void PipelineTask::finalize() {
523+
auto fragment = _fragment_context.lock();
524+
if (!fragment) {
525+
return;
526+
}
527+
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker);
493528
std::unique_lock<std::mutex> lc(_dependency_lock);
494529
_finalized = true;
495530
_sink_shared_state.reset();
496531
_op_shared_states.clear();
497532
_le_state_map.clear();
533+
_block.reset();
534+
_operators.clear();
535+
_sink.reset();
536+
_pipeline.reset();
498537
}
499538

500539
Status PipelineTask::close(Status exec_status, bool close_sink) {
@@ -529,31 +568,37 @@ Status PipelineTask::close(Status exec_status, bool close_sink) {
529568
}
530569

531570
std::string PipelineTask::debug_string() {
532-
std::unique_lock<std::mutex> lc(_dependency_lock);
533571
fmt::memory_buffer debug_string_buffer;
534572

535-
fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id()));
573+
fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(_query_id));
536574
fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
537575
print_id(_state->fragment_instance_id()));
538576

577+
fmt::format_to(
578+
debug_string_buffer,
579+
"PipelineTask[this = {}, id = {}, open = {}, eos = {}, finalized = {}, dry run = "
580+
"{}, _wake_up_early = {}, is running = {}]",
581+
(void*)this, _index, _opened, _eos, _finalized, _dry_run, _wake_up_early.load(),
582+
is_running());
583+
std::unique_lock<std::mutex> lc(_dependency_lock);
539584
auto* cur_blocked_dep = _blocked_dep;
540-
auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
585+
auto fragment = _fragment_context.lock();
586+
if (is_finalized() || !fragment) {
587+
fmt::format_to(debug_string_buffer, " pipeline name = {}", _pipeline_name);
588+
return fmt::to_string(debug_string_buffer);
589+
}
590+
auto elapsed = fragment->elapsed_time() / NANOS_PER_SEC;
541591
fmt::format_to(debug_string_buffer,
542-
"PipelineTask[this = {}, id = {}, open = {}, eos = {}, finish = {}, dry run = "
543-
"{}, elapse time = {}s, _wake_up_early = {}], block dependency = {}, is "
544-
"running = {}\noperators: ",
545-
(void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed,
546-
_wake_up_early.load(),
547-
cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
548-
is_running());
592+
" elapse time = {}s, block dependency = [{}]\noperators: ", elapsed,
593+
cur_blocked_dep && !is_finalized() ? cur_blocked_dep->debug_string() : "NULL");
549594
for (size_t i = 0; i < _operators.size(); i++) {
550595
fmt::format_to(debug_string_buffer, "\n{}",
551596
_opened && !_finalized ? _operators[i]->debug_string(_state, i)
552597
: _operators[i]->debug_string(i));
553598
}
554599
fmt::format_to(debug_string_buffer, "\n{}\n",
555-
_opened && !_finalized ? _sink->debug_string(_state, _operators.size())
556-
: _sink->debug_string(_operators.size()));
600+
_opened && !is_finalized() ? _sink->debug_string(_state, _operators.size())
601+
: _sink->debug_string(_operators.size()));
557602
if (_finalized) {
558603
return fmt::to_string(debug_string_buffer);
559604
}
@@ -588,10 +633,7 @@ std::string PipelineTask::debug_string() {
588633

589634
void PipelineTask::wake_up() {
590635
// call by dependency
591-
static_cast<void>(get_task_queue()->push_back(this));
636+
static_cast<void>(get_task_queue()->push_back(shared_from_this()));
592637
}
593638

594-
QueryContext* PipelineTask::query_context() {
595-
return _fragment_context->get_query_ctx();
596-
}
597639
} // namespace doris::pipeline

0 commit comments

Comments (0)