Skip to content

Commit b4f5bec

Browse files
committed
[refactor](pipeline) Re-construct ownership of pipeline components (apache#49753)
In this PR, ownership of core components in pipeline engine is re-constructed following the rules below: ![image](https://github.com/user-attachments/assets/04f842b7-f052-4e3c-9005-7df3103b4d7b)
1 parent e78970f commit b4f5bec

File tree

10 files changed

+124
-96
lines changed

10 files changed

+124
-96
lines changed

be/src/pipeline/dependency.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, s
4646
return sink_deps.back().get();
4747
}
4848

49-
void Dependency::_add_block_task(PipelineTask* task) {
50-
DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1] != task)
49+
void Dependency::_add_block_task(std::shared_ptr<PipelineTask> task) {
50+
DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1].lock() == nullptr ||
51+
_blocked_task[_blocked_task.size() - 1].lock().get() != task.get())
5152
<< "Duplicate task: " << task->debug_string();
5253
_blocked_task.push_back(task);
5354
}
@@ -57,7 +58,7 @@ void Dependency::set_ready() {
5758
return;
5859
}
5960
_watcher.stop();
60-
std::vector<PipelineTask*> local_block_task {};
61+
std::vector<std::weak_ptr<PipelineTask>> local_block_task {};
6162
{
6263
std::unique_lock<std::mutex> lc(_task_lock);
6364
if (_ready) {
@@ -66,12 +67,15 @@ void Dependency::set_ready() {
6667
_ready = true;
6768
local_block_task.swap(_blocked_task);
6869
}
69-
for (auto* task : local_block_task) {
70-
task->wake_up();
70+
for (auto task : local_block_task) {
71+
if (auto t = task.lock()) {
72+
std::unique_lock<std::mutex> lc(_task_lock);
73+
THROW_IF_ERROR(t->wake_up(this));
74+
}
7175
}
7276
}
7377

74-
Dependency* Dependency::is_blocked_by(PipelineTask* task) {
78+
Dependency* Dependency::is_blocked_by(std::shared_ptr<PipelineTask> task) {
7579
std::unique_lock<std::mutex> lc(_task_lock);
7680
auto ready = _ready.load();
7781
if (!ready && task) {

be/src/pipeline/dependency.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
106106
[[nodiscard]] int64_t watcher_elapse_time() { return _watcher.elapsed_time(); }
107107

108108
// Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
109-
[[nodiscard]] virtual Dependency* is_blocked_by(PipelineTask* task = nullptr);
109+
[[nodiscard]] virtual Dependency* is_blocked_by(std::shared_ptr<PipelineTask> task = nullptr);
110110
// Notify downstream pipeline tasks this dependency is ready.
111111
void set_ready();
112112
void set_ready_to_read() {
@@ -151,7 +151,7 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
151151
}
152152

153153
protected:
154-
void _add_block_task(PipelineTask* task);
154+
void _add_block_task(std::shared_ptr<PipelineTask> task);
155155

156156
const int _id;
157157
const int _node_id;
@@ -162,7 +162,7 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
162162
MonotonicStopWatch _watcher;
163163

164164
std::mutex _task_lock;
165-
std::vector<PipelineTask*> _blocked_task;
165+
std::vector<std::weak_ptr<PipelineTask>> _blocked_task;
166166

167167
// If `_always_ready` is true, `block()` will never block tasks.
168168
std::atomic<bool> _always_ready = false;

be/src/pipeline/pipeline_fragment_context.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -133,18 +133,17 @@ PipelineFragmentContext::~PipelineFragmentContext() {
133133
// The memory released by the query end is recorded in the query mem tracker.
134134
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker);
135135
auto st = _query_ctx->exec_status();
136-
_query_ctx.reset();
137136
for (size_t i = 0; i < _tasks.size(); i++) {
138137
if (!_tasks[i].empty()) {
139138
_call_back(_tasks[i].front()->runtime_state(), &st);
140139
}
141140
}
142-
_tasks.clear();
143141
for (auto& runtime_states : _task_runtime_states) {
144142
for (auto& runtime_state : runtime_states) {
145143
runtime_state.reset();
146144
}
147145
}
146+
_tasks.clear();
148147
_dag.clear();
149148
_pip_id_to_pipeline.clear();
150149
_pipelines.clear();
@@ -154,6 +153,8 @@ PipelineFragmentContext::~PipelineFragmentContext() {
154153
_runtime_filter_states.clear();
155154
_runtime_filter_mgr_map.clear();
156155
_op_id_to_le_state.clear();
156+
_op_id_to_shared_state.clear();
157+
_query_ctx.reset();
157158
}
158159

159160
bool PipelineFragmentContext::is_timeout(timespec now) const {
@@ -445,10 +446,10 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
445446
auto cur_task_id = _total_tasks++;
446447
task_runtime_state->set_task_id(cur_task_id);
447448
task_runtime_state->set_task_num(pipeline->num_tasks());
448-
auto task = std::make_unique<PipelineTask>(pipeline, cur_task_id,
449-
task_runtime_state.get(), this,
450-
pipeline_id_to_profile[pip_idx].get(),
451-
get_local_exchange_state(pipeline), i);
449+
auto task = std::make_shared<PipelineTask>(
450+
pipeline, cur_task_id, task_runtime_state.get(), shared_from_this(),
451+
pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline),
452+
i);
452453
pipeline->incr_created_tasks(i, task.get());
453454
task_runtime_state->set_task(task.get());
454455
pipeline_id_to_task.insert({pipeline->id(), task.get()});
@@ -1671,7 +1672,7 @@ Status PipelineFragmentContext::submit() {
16711672
auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
16721673
for (auto& task : _tasks) {
16731674
for (auto& t : task) {
1674-
st = scheduler->schedule_task(t.get());
1675+
st = scheduler->schedule_task(t);
16751676
if (!st) {
16761677
cancel(Status::InternalError("submit context to executor fail"));
16771678
std::lock_guard<std::mutex> l(_task_mutex);

be/src/pipeline/pipeline_fragment_context.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
228228

229229
OperatorPtr _root_op = nullptr;
230230
// this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
231-
std::vector<std::vector<std::unique_ptr<PipelineTask>>> _tasks;
231+
std::vector<std::vector<std::shared_ptr<PipelineTask>>> _tasks;
232232

233233
// TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
234234
// of it in pipeline task not the fragment_context

be/src/pipeline/pipeline_task.cpp

Lines changed: 44 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,18 @@ namespace doris::pipeline {
4848

4949
PipelineTask::PipelineTask(
5050
PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
51-
PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile,
51+
std::shared_ptr<PipelineFragmentContext> fragment_context, RuntimeProfile* parent_profile,
5252
std::map<int,
5353
std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
5454
le_state_map,
5555
int task_idx)
56-
: _index(task_id),
56+
:
57+
#ifdef BE_TEST
58+
_query_id(fragment_context ? fragment_context->get_query_id() : TUniqueId()),
59+
#else
60+
_query_id(fragment_context->get_query_id()),
61+
#endif
62+
_index(task_id),
5763
_pipeline(pipeline),
5864
_opened(false),
5965
_state(state),
@@ -120,8 +126,10 @@ Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const
120126
std::unique_lock<std::mutex> lc(_dependency_lock);
121127
filter_dependencies.swap(_filter_dependencies);
122128
}
123-
if (query_context()->is_cancelled()) {
124-
clear_blocking_state();
129+
if (auto fragment = _fragment_context.lock()) {
130+
if (fragment->get_query_ctx()->is_cancelled()) {
131+
clear_blocking_state();
132+
}
125133
}
126134
return Status::OK();
127135
}
@@ -225,14 +233,14 @@ bool PipelineTask::_wait_to_start() {
225233
// Before task starting, we should make sure
226234
// 1. Execution dependency is ready (which is controlled by FE 2-phase commit)
227235
// 2. Runtime filter dependencies are ready
228-
_blocked_dep = _execution_dep->is_blocked_by(this);
236+
_blocked_dep = _execution_dep->is_blocked_by(shared_from_this());
229237
if (_blocked_dep != nullptr) {
230238
static_cast<Dependency*>(_blocked_dep)->start_watcher();
231239
return true;
232240
}
233241

234242
for (auto* op_dep : _filter_dependencies) {
235-
_blocked_dep = op_dep->is_blocked_by(this);
243+
_blocked_dep = op_dep->is_blocked_by(shared_from_this());
236244
if (_blocked_dep != nullptr) {
237245
_blocked_dep->start_watcher();
238246
return true;
@@ -253,7 +261,7 @@ bool PipelineTask::_is_blocked() {
253261
for (int i = _read_dependencies.size() - 1; i >= 0; i--) {
254262
// `_read_dependencies` is organized according to operators. For each operator, running condition is met iff all dependencies are ready.
255263
for (auto* dep : _read_dependencies[i]) {
256-
_blocked_dep = dep->is_blocked_by(this);
264+
_blocked_dep = dep->is_blocked_by(shared_from_this());
257265
if (_blocked_dep != nullptr) {
258266
_blocked_dep->start_watcher();
259267
return true;
@@ -272,7 +280,7 @@ bool PipelineTask::_is_blocked() {
272280
}
273281

274282
for (auto* op_dep : _write_dependencies) {
275-
_blocked_dep = op_dep->is_blocked_by(this);
283+
_blocked_dep = op_dep->is_blocked_by(shared_from_this());
276284
if (_blocked_dep != nullptr) {
277285
_blocked_dep->start_watcher();
278286
return true;
@@ -282,6 +290,8 @@ bool PipelineTask::_is_blocked() {
282290
}
283291

284292
Status PipelineTask::execute(bool* eos) {
293+
auto fragment_context = _fragment_context.lock();
294+
DCHECK(fragment_context);
285295
if (_eos) {
286296
*eos = true;
287297
return Status::OK();
@@ -318,7 +328,7 @@ Status PipelineTask::execute(bool* eos) {
318328
}
319329

320330
// The status must be runnable
321-
if (!_opened && !_fragment_context->is_canceled()) {
331+
if (!_opened && !fragment_context->is_canceled()) {
322332
DBUG_EXECUTE_IF("PipelineTask::execute.open_sleep", {
323333
auto required_pipeline_id =
324334
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
@@ -350,7 +360,7 @@ Status PipelineTask::execute(bool* eos) {
350360

351361
_task_profile->add_info_string("TaskState", "Runnable");
352362
_task_profile->add_info_string("BlockedByDependency", "");
353-
while (!_fragment_context->is_canceled()) {
363+
while (!fragment_context->is_canceled()) {
354364
SCOPED_RAW_TIMER(&time_spent);
355365
if (_is_blocked()) {
356366
return Status::OK();
@@ -359,7 +369,7 @@ Status PipelineTask::execute(bool* eos) {
359369
/// When a task is cancelled,
360370
/// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready).
361371
/// Here, checking whether it is cancelled to prevent tasks in a blocking state from being re-executed.
362-
if (_fragment_context->is_canceled()) {
372+
if (fragment_context->is_canceled()) {
363373
break;
364374
}
365375

@@ -428,7 +438,7 @@ Status PipelineTask::execute(bool* eos) {
428438
}
429439
}
430440

431-
RETURN_IF_ERROR(get_task_queue()->push_back(this));
441+
RETURN_IF_ERROR(get_task_queue()->push_back(shared_from_this()));
432442
return Status::OK();
433443
}
434444

@@ -490,6 +500,10 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m
490500
}
491501

492502
void PipelineTask::finalize() {
503+
auto fragment = _fragment_context.lock();
504+
if (!fragment) {
505+
return Status::OK();
506+
}
493507
std::unique_lock<std::mutex> lc(_dependency_lock);
494508
_finalized = true;
495509
_sink_shared_state.reset();
@@ -532,28 +546,34 @@ std::string PipelineTask::debug_string() {
532546
std::unique_lock<std::mutex> lc(_dependency_lock);
533547
fmt::memory_buffer debug_string_buffer;
534548

535-
fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id()));
549+
fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(_query_id));
536550
fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
537551
print_id(_state->fragment_instance_id()));
538552

553+
fmt::format_to(
554+
debug_string_buffer,
555+
"PipelineTask[this = {}, id = {}, open = {}, eos = {}, finalized = {}, dry run = "
556+
"{}, _wake_up_early = {}, is running = {}]",
557+
(void*)this, _index, _opened, _eos, _finalized, _dry_run, _wake_up_early.load(),
558+
is_running());
559+
std::unique_lock<std::mutex> lc(_dependency_lock);
539560
auto* cur_blocked_dep = _blocked_dep;
540-
auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
561+
auto fragment = _fragment_context.lock();
562+
if (is_finalized() || !fragment) {
563+
return fmt::to_string(debug_string_buffer);
564+
}
565+
auto elapsed = fragment->elapsed_time() / NANOS_PER_SEC;
541566
fmt::format_to(debug_string_buffer,
542-
"PipelineTask[this = {}, id = {}, open = {}, eos = {}, finish = {}, dry run = "
543-
"{}, elapse time = {}s, _wake_up_early = {}], block dependency = {}, is "
544-
"running = {}\noperators: ",
545-
(void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed,
546-
_wake_up_early.load(),
547-
cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
548-
is_running());
567+
" elapse time = {}s, block dependency = [{}]\noperators: ", elapsed,
568+
cur_blocked_dep && !is_finalized() ? cur_blocked_dep->debug_string() : "NULL");
549569
for (size_t i = 0; i < _operators.size(); i++) {
550570
fmt::format_to(debug_string_buffer, "\n{}",
551571
_opened && !_finalized ? _operators[i]->debug_string(_state, i)
552572
: _operators[i]->debug_string(i));
553573
}
554574
fmt::format_to(debug_string_buffer, "\n{}\n",
555-
_opened && !_finalized ? _sink->debug_string(_state, _operators.size())
556-
: _sink->debug_string(_operators.size()));
575+
_opened && !is_finalized() ? _sink->debug_string(_state, _operators.size())
576+
: _sink->debug_string(_operators.size()));
557577
if (_finalized) {
558578
return fmt::to_string(debug_string_buffer);
559579
}
@@ -588,10 +608,7 @@ std::string PipelineTask::debug_string() {
588608

589609
void PipelineTask::wake_up() {
590610
// call by dependency
591-
static_cast<void>(get_task_queue()->push_back(this));
611+
static_cast<void>(get_task_queue()->push_back(shared_from_this()));
592612
}
593613

594-
QueryContext* PipelineTask::query_context() {
595-
return _fragment_context->get_query_ctx();
596-
}
597614
} // namespace doris::pipeline

0 commit comments

Comments
 (0)