Skip to content

Commit 752f7b6

Browse files
committed
update
1 parent b4f5bec commit 752f7b6

File tree

8 files changed

+34
-21
lines changed

8 files changed

+34
-21
lines changed

be/src/pipeline/dependency.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ void Dependency::set_ready() {
7070
for (auto task : local_block_task) {
7171
if (auto t = task.lock()) {
7272
std::unique_lock<std::mutex> lc(_task_lock);
73-
THROW_IF_ERROR(t->wake_up(this));
73+
t->wake_up();
7474
}
7575
}
7676
}
@@ -109,7 +109,7 @@ std::string RuntimeFilterDependency::debug_string(int indentation_level) {
109109
return fmt::to_string(debug_string_buffer);
110110
}
111111

112-
Dependency* RuntimeFilterDependency::is_blocked_by(PipelineTask* task) {
112+
Dependency* RuntimeFilterDependency::is_blocked_by(std::shared_ptr<PipelineTask> task) {
113113
std::unique_lock<std::mutex> lc(_task_lock);
114114
auto ready = _ready.load();
115115
if (!ready && task) {

be/src/pipeline/dependency.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ class RuntimeFilterDependency final : public Dependency {
282282
: Dependency(id, node_id, name), _runtime_filter(runtime_filter) {}
283283
std::string debug_string(int indentation_level = 0) override;
284284

285-
Dependency* is_blocked_by(PipelineTask* task) override;
285+
Dependency* is_blocked_by(std::shared_ptr<PipelineTask> task) override;
286286

287287
private:
288288
const IRuntimeFilter* _runtime_filter = nullptr;

be/src/pipeline/pipeline.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
135135
}
136136

137137
int num_tasks_of_parent() const { return _num_tasks_of_parent; }
138+
std::string& name() { return _name; }
138139

139140
private:
140141
void _init_profile();

be/src/pipeline/pipeline_fragment_context.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@ PipelineFragmentContext::~PipelineFragmentContext() {
153153
_runtime_filter_states.clear();
154154
_runtime_filter_mgr_map.clear();
155155
_op_id_to_le_state.clear();
156-
_op_id_to_shared_state.clear();
157156
_query_ctx.reset();
158157
}
159158

@@ -447,7 +446,8 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
447446
task_runtime_state->set_task_id(cur_task_id);
448447
task_runtime_state->set_task_num(pipeline->num_tasks());
449448
auto task = std::make_shared<PipelineTask>(
450-
pipeline, cur_task_id, task_runtime_state.get(), shared_from_this(),
449+
pipeline, cur_task_id, task_runtime_state.get(),
450+
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
451451
pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline),
452452
i);
453453
pipeline->incr_created_tasks(i, task.get());

be/src/pipeline/pipeline_task.cpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ PipelineTask::PipelineTask(
5959
#else
6060
_query_id(fragment_context->get_query_id()),
6161
#endif
62+
_pip_id(pipeline->id()),
6263
_index(task_id),
6364
_pipeline(pipeline),
6465
_opened(false),
@@ -71,7 +72,8 @@ PipelineTask::PipelineTask(
7172
_sink(pipeline->sink_shared_pointer()),
7273
_le_state_map(std::move(le_state_map)),
7374
_task_idx(task_idx),
74-
_execution_dep(state->get_query_ctx()->get_execution_dependency()) {
75+
_execution_dep(state->get_query_ctx()->get_execution_dependency()),
76+
_pipeline_name(_pipeline->name()){
7577
_pipeline_task_watcher.start();
7678

7779
auto shared_state = _sink->create_shared_state();
@@ -314,11 +316,11 @@ Status PipelineTask::execute(bool* eos) {
314316
}
315317
int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
316318
_task_cpu_timer->update(delta_cpu_time);
317-
auto cpu_qs = query_context()->get_cpu_statistics();
319+
auto cpu_qs = fragment_context->get_query_ctx()->get_cpu_statistics();
318320
if (cpu_qs) {
319321
cpu_qs->add_cpu_nanos(delta_cpu_time);
320322
}
321-
query_context()->update_cpu_time(delta_cpu_time);
323+
fragment_context->get_query_ctx()->update_cpu_time(delta_cpu_time);
322324
}};
323325
if (_wait_to_start()) {
324326
if (config::enable_prefetch_tablet) {
@@ -502,13 +504,18 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m
502504
void PipelineTask::finalize() {
503505
auto fragment = _fragment_context.lock();
504506
if (!fragment) {
505-
return Status::OK();
507+
return;
506508
}
509+
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker);
507510
std::unique_lock<std::mutex> lc(_dependency_lock);
508511
_finalized = true;
509512
_sink_shared_state.reset();
510513
_op_shared_states.clear();
511514
_le_state_map.clear();
515+
_block.reset();
516+
_operators.clear();
517+
_sink.reset();
518+
_pipeline.reset();
512519
}
513520

514521
Status PipelineTask::close(Status exec_status, bool close_sink) {
@@ -543,7 +550,6 @@ Status PipelineTask::close(Status exec_status, bool close_sink) {
543550
}
544551

545552
std::string PipelineTask::debug_string() {
546-
std::unique_lock<std::mutex> lc(_dependency_lock);
547553
fmt::memory_buffer debug_string_buffer;
548554

549555
fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(_query_id));
@@ -560,6 +566,7 @@ std::string PipelineTask::debug_string() {
560566
auto* cur_blocked_dep = _blocked_dep;
561567
auto fragment = _fragment_context.lock();
562568
if (is_finalized() || !fragment) {
569+
fmt::format_to(debug_string_buffer, " pipeline name = {}", _pipeline_name);
563570
return fmt::to_string(debug_string_buffer);
564571
}
565572
auto elapsed = fragment->elapsed_time() / NANOS_PER_SEC;

be/src/pipeline/pipeline_task.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ class Dependency;
4848
class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
4949
public:
5050
PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
51-
std::shared_ptr<PipelineFragmentContext> fragment_context, RuntimeProfile* parent_profile,
51+
std::shared_ptr<PipelineFragmentContext> fragment_context,
52+
RuntimeProfile* parent_profile,
5253
std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
5354
std::shared_ptr<Dependency>>>
5455
le_state_map,
@@ -224,6 +225,8 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
224225

225226
RuntimeState* runtime_state() const { return _state; }
226227

228+
RuntimeProfile* get_task_profile() const { return _task_profile.get(); }
229+
227230
std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); }
228231

229232
void stop_if_finished() {
@@ -235,7 +238,7 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
235238
}
236239
}
237240

238-
PipelineId pipeline_id() const { return _pipeline->id(); }
241+
PipelineId pipeline_id() const { return _pip_id; }
239242

240243
bool wake_up_early() const { return _wake_up_early; }
241244

@@ -250,6 +253,7 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
250253
Status _open();
251254

252255
const TUniqueId _query_id;
256+
const PipelineId _pip_id;
253257
const uint32_t _index;
254258
PipelinePtr _pipeline;
255259
bool _has_exceed_timeout = false;
@@ -258,7 +262,7 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
258262
int _previous_schedule_id = -1;
259263
uint32_t _schedule_time = 0;
260264
std::unique_ptr<doris::vectorized::Block> _block;
261-
std::weak_ptr<PipelineFragmentContext> _fragment_context = nullptr;
265+
std::weak_ptr<PipelineFragmentContext> _fragment_context;
262266
TaskQueue* _task_queue = nullptr;
263267

264268
// used for priority queue
@@ -321,6 +325,7 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
321325
std::atomic<bool> _running = false;
322326
std::atomic<bool> _eos = false;
323327
std::atomic<bool> _wake_up_early = false;
328+
const std::string _pipeline_name;
324329
};
325330

326331
using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;

be/src/pipeline/task_queue.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,13 @@ class TaskQueue {
4040
virtual void close() = 0;
4141
// Get the task by core id.
4242
// TODO: To think the logic is useful?
43-
virtual PipelineTask* take(int core_id) = 0;
43+
virtual PipelineTaskSPtr take(int core_id) = 0;
4444

4545
// push from scheduler
46-
virtual Status push_back(PipelineTask* task) = 0;
46+
virtual Status push_back(PipelineTaskSPtr task) = 0;
4747

4848
// push from worker
49-
virtual Status push_back(PipelineTask* task, int core_id) = 0;
49+
virtual Status push_back(PipelineTaskSPtr task, int core_id) = 0;
5050

5151
virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
5252

@@ -133,7 +133,7 @@ class MultiCoreTaskQueue : public TaskQueue {
133133
void close() override;
134134

135135
// Get the task by core id.
136-
PipelineTask* take(int core_id) override;
136+
PipelineTaskSPtr take(int core_id) override;
137137

138138
// TODO combine these methods to `push_back(task, core_id = -1)`
139139
Status push_back(PipelineTaskSPtr task) override;

be/src/pipeline/task_scheduler.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ void _close_task(PipelineTask* task, Status exec_status, PipelineFragmentContext
9292

9393
void TaskScheduler::_do_work(size_t index) {
9494
while (_markers[index]) {
95-
auto task = _task_queue.take(index);
95+
auto task = _task_queue->take(index);
9696
if (!task) {
9797
continue;
9898
}
@@ -122,7 +122,7 @@ void TaskScheduler::_do_work(size_t index) {
122122
// If pipeline is canceled, it will report after pipeline closed, and will propagate
123123
// errors to downstream through exchange. So, here we needn't send_report.
124124
// fragment_ctx->send_report(true);
125-
_close_task(task, fragment_context->get_query_ctx()->exec_status(),
125+
_close_task(task.get(), fragment_context->get_query_ctx()->exec_status(),
126126
fragment_context.get());
127127
continue;
128128
}
@@ -166,7 +166,7 @@ void TaskScheduler::_do_work(size_t index) {
166166
LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
167167
print_id(fragment_context->get_query_ctx()->query_id()),
168168
status.to_string());
169-
_close_task(task, status, fragment_context.get());
169+
_close_task(task.get(), status, fragment_context.get());
170170
continue;
171171
}
172172
fragment_context->trigger_report_if_necessary();
@@ -179,7 +179,7 @@ void TaskScheduler::_do_work(size_t index) {
179179
task->set_running(false);
180180
} else {
181181
Status exec_status = fragment_context->get_query_ctx()->exec_status();
182-
_close_task(task, exec_status, fragment_context.get());
182+
_close_task(task.get(), exec_status, fragment_context.get());
183183
}
184184
continue;
185185
}

0 commit comments

Comments
 (0)