
Commit 759542a

XLPE authored and committed
[fix](pipeline) premature exit causing core dump during concurrent prepare execution (apache#51492)
Issue Number: close apache#51491

Problem Summary: When the queue of the FragmentMgrAsync thread pool is full, newly submitted tasks are rejected and the caller returns early. However, tasks that were already accepted may still be scheduled for execution later. The early return can destroy objects such as `PipelineFragmentContext` and `TPipelineFragmentParams` while those tasks still reference them, leading to null-pointer dereferences during task execution and ultimately a core dump. The fix in this PR is to wait until all previously submitted tasks have completed before returning.

```
*** SIGSEGV address not mapped to object (@0x1c8) received by PID 3941201 (TID 2115617 OR 0xfe1685bb97f0) from PID 456; stack trace: ***
 0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /home/jenkins_agent/workspace/BigDataComponent_doris-unified-arm-release/be/src/common/signal_handler.h:421
 1# os::Linux::chained_handler(int, siginfo_t*, void*) in /usr/jdk64/current/jre/lib/aarch64/server/libjvm.so
 2# JVM_handle_linux_signal in /usr/jdk64/current/jre/lib/aarch64/server/libjvm.so
 3# signalHandler(int, siginfo_t*, void*) in /usr/jdk64/current/jre/lib/aarch64/server/libjvm.so
 4# 0x0000FFFF6B2A07C0 in linux-vdso.so.1
 5# doris::TUniqueId::TUniqueId(doris::TUniqueId const&) at /home/jenkins_agent/workspace/BigDataComponent_doris-unified-arm-release/gensrc/build/gen_cpp/Types_types.cpp:2354
 6# doris::AttachTask::AttachTask(doris::QueryContext*) at /home/jenkins_agent/workspace/BigDataComponent_doris-unified-arm-release/be/src/runtime/thread_context.cpp:60
 7# std::_Function_handler<void (), doris::pipeline::PipelineXFragmentContext::_build_pipeline_x_tasks(doris::TPipelineFragmentParams const&, doris::ThreadPool*)::$_0>::_M_invoke(std::_Any_data const&) at /usr/lib/gcc/aarch64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:290
 8# doris::ThreadPool::dispatch_thread() at /home/jenkins_agent/workspace/BigDataComponent_doris-unified-arm-release/be/src/util/threadpool.cpp:552
 9# doris::Thread::supervise_thread(void*) at /home/jenkins_agent/workspace/BigDataComponent_doris-unified-arm-release/be/src/util/thread.cpp:499
10# 0x0000FFFF6AF187AC in /lib64/libpthread.so.0
11# 0x0000FFFF6B16548C in /lib64/libc.so.6
```

Co-authored-by: XLPE <weiwh1@chinatelecom.cn>
1 parent 3488cd9 commit 759542a

File tree

2 files changed, +27 -18 lines changed


be/src/pipeline/pipeline_fragment_context.cpp

Lines changed: 20 additions & 18 deletions
```diff
@@ -106,6 +106,7 @@
 #include "runtime/thread_context.h"
 #include "service/backend_options.h"
 #include "util/container_util.hpp"
+#include "util/countdown_latch.h"
 #include "util/debug_util.h"
 #include "util/uid_util.h"
 #include "vec/common/sort/heap_sorter.h"
@@ -513,27 +514,28 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
         // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
         std::vector<Status> prepare_status(target_size);
-        std::mutex m;
-        std::condition_variable cv;
-        int prepare_done = 0;
+        int submitted_tasks = 0;
+        Status submit_status;
+        CountDownLatch latch((int)target_size);
         for (size_t i = 0; i < target_size; i++) {
-            RETURN_IF_ERROR(thread_pool->submit_func([&, i]() {
+            submit_status = thread_pool->submit_func([&, i]() {
                 SCOPED_ATTACH_TASK(_query_ctx.get());
                 prepare_status[i] = pre_and_submit(i, this);
-                std::unique_lock<std::mutex> lock(m);
-                prepare_done++;
-                if (prepare_done == target_size) {
-                    cv.notify_one();
-                }
-            }));
-        }
-        std::unique_lock<std::mutex> lock(m);
-        if (prepare_done != target_size) {
-            cv.wait(lock);
-            for (size_t i = 0; i < target_size; i++) {
-                if (!prepare_status[i].ok()) {
-                    return prepare_status[i];
-                }
+                latch.count_down();
+            });
+            if (LIKELY(submit_status.ok())) {
+                submitted_tasks++;
+            } else {
+                break;
+            }
+        }
+        latch.arrive_and_wait(target_size - submitted_tasks);
+        if (UNLIKELY(!submit_status.ok())) {
+            return submit_status;
+        }
+        for (int i = 0; i < submitted_tasks; i++) {
+            if (!prepare_status[i].ok()) {
+                return prepare_status[i];
             }
         }
     } else {
```
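For intuition, here is a minimal standalone sketch of the pattern this hunk introduces, using C++20's `std::latch` (which also provides `count_down` and `arrive_and_wait`) in place of Doris's `CountDownLatch`. The names `try_submit`, `kQueueCapacity`, and the `int` status vector are hypothetical stand-ins, not Doris API. Build with `g++ -std=c++20 -pthread`:

```cpp
#include <cstddef>
#include <functional>
#include <iostream>
#include <latch>
#include <thread>
#include <vector>

// Hypothetical stand-in for a bounded thread pool: rejects submissions once
// kQueueCapacity tasks have been accepted, otherwise runs the task on a thread.
constexpr std::size_t kQueueCapacity = 4;

bool try_submit(std::vector<std::thread>& workers, std::function<void()> task) {
    if (workers.size() >= kQueueCapacity) {
        return false;  // queue full: this task is rejected and never runs
    }
    workers.emplace_back(std::move(task));
    return true;
}

int main() {
    const int target_size = 8;                        // tasks we plan to submit
    std::vector<int> prepare_status(target_size, 0);  // stand-in for vector<Status>
    std::vector<std::thread> workers;

    int submitted_tasks = 0;
    bool submit_ok = true;
    std::latch latch(target_size);  // counterpart of CountDownLatch(target_size)

    for (int i = 0; i < target_size; ++i) {
        submit_ok = try_submit(workers, [&, i] {
            prepare_status[i] = 1;  // the real code runs pre_and_submit(i, this) here
            latch.count_down();
        });
        if (!submit_ok) {
            break;  // rejected: stop submitting, but do NOT return yet
        }
        ++submitted_tasks;
    }

    // The crux of the fix: retire the slots that were never submitted, then
    // block until every task that WAS submitted has finished. Only after this
    // is it safe to let objects captured by the lambdas go out of scope.
    latch.arrive_and_wait(target_size - submitted_tasks);

    for (auto& w : workers) {
        w.join();
    }
    std::cout << "waited for " << submitted_tasks << "/" << target_size
              << " submitted tasks before returning\n";
    return 0;
}
```

Compared with the old mutex/condition-variable handshake, the latch gives the rejection path and the success path a single wait point, so an early rejection can no longer race ahead of tasks that are still queued.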

be/src/util/countdown_latch.h

Lines changed: 7 additions & 0 deletions
```diff
@@ -91,6 +91,13 @@ class CountDownLatch {
         }
     }
 
+    // decrements the internal counter by n and blocks the calling thread until the counter reaches zero.
+    void arrive_and_wait(uint64_t n) {
+        DCHECK_GE(n, 0);
+        count_down(n);
+        wait();
+    }
+
     uint64_t count() const {
         std::lock_guard<std::mutex> lock(_lock);
         return _count;
```
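A hedged usage sketch of the new helper's semantics, written against C++20 `std::latch` (whose `arrive_and_wait(n)` composes the same way: decrement by `n`, then wait) so the snippet compiles standalone; the task counts are illustrative:

```cpp
#include <iostream>
#include <latch>
#include <thread>

int main() {
    const int planned = 3;  // latch initialized for three planned tasks
    const int started = 2;  // only two tasks were actually submitted

    std::latch latch(planned);

    // Each submitted task arrives at the latch when it finishes.
    std::thread t1([&] { latch.count_down(); });
    std::thread t2([&] { latch.count_down(); });

    // The coordinator retires the slot of the task that was never started
    // (decrement by planned - started), then blocks until both running
    // workers have arrived -- exactly what arrive_and_wait(n) does.
    latch.arrive_and_wait(planned - started);

    t1.join();
    t2.join();
    std::cout << "all submitted work finished\n";
    return 0;
}
```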
