Skip to content

Commit 8ddc32a

Browse files
committed
[fix](cloud) Fix lifecycle in lambda function (apache#53605)
==15185==ERROR: AddressSanitizer: heap-use-after-free on address 0x7da44ced96d0 at pc 0x55795dac5f46 bp 0x7bc26b9a3e50 sp 0x7bc26b9a3e48 READ of size 8 at 0x7da44ced96d0 thread T1357 #0 0x55795dac5f45 in std::_Function_base::_M_empty() const /usr/local/ldb-toolchain-v0.25/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:249:37 apache#1 0x55795dac5f45 in std::function<doris::Status ()>::operator()() const /usr/local/ldb-toolchain-v0.25/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:591:6 apache#2 0x55795dac5f45 in doris::cloud::bthread_fork_join(std::vector<std::function<doris::Status ()>, std::allocator<std::function<doris::Status ()>>> const&, int)::$_0::operator()() const /root/doris/be/src/cloud/cloud_meta_mgr.cpp:106:23 apache#3 0x55795dac5f45 in void std::_invoke_impl<void, doris::cloud::bthread_fork_join(std::vector<std::function<doris::Status ()>, std::allocator<std::function<doris::Status ()>>> const&, int)::$_0&>(std::_invoke_other, doris::cloud::bthread_fork_join(std::vector<std::function<doris::Status ()>, std::allocator<std::function<doris::Status ()>>> const&, int)::$_0&) /usr/local/ldb-toolchain-v0.25/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/invoke.h:63:14 apache#4 0x55795dac5f45 in std::enable_if<is_invocable_r_v<void, doris::cloud::bthread_fork_join(std::vector<std::function<doris::Status ()>, std::allocator<std::function<doris::Status ()>>> const&, int)::$0&>, void>::type std::_invoke_r<void, doris::cloud::bthread_fork_join(std::vector<std::function<doris::Status ()>, std::allocator<std::function<doris::Status ()>>> const&, int)::$_0&>(doris::cloud::bthread_fork_join(std::vector<std::function<doris::Status ()>, std::allocator<std::function<doris::Status ()>>> const&, int)::$_0&) /usr/local/ldb-toolchain-v0.25/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/invoke.h:113:2 apache#5 0x55795dac5f45 in std::_Function_handler<void (), doris::cloud::bthread_fork_join(std::vector<std::function<doris::Status ()>, std::allocator<std::function<doris::Status ()>>> const&, int)::$_0>::_M_invoke(std::_Any_data const&) /usr/local/ldb-toolchain-v0.25/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:292:9 apache#6 0x55795da78d1d in std::function<void ()>::operator()() const /usr/local/ldb-toolchain-v0.25/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:593:9 apache#7 0x55795da78d1d in doris::cloud::run_bthread_work(void*) /root/doris/be/src/cloud/cloud_meta_mgr.cpp:74:5 apache#8 0x55795f3da334 in bthread::TaskGroup::task_runner(long) (/home/work/unlimit_teamcity/TeamCity/Agents/20250718121603agent_172.16.0.202_1/work/60183217f6ee2a9c/output/be/lib/doris_be+0x50a4e334) apache#9 0x55795f3ccc80 in bthread_make_fcontext (/home/work/unlimit_teamcity/TeamCity/Agents/20250718121603agent_172.16.0.202_1/work/60183217f6ee2a9c/output/be/lib/doris_be+0x50a40c80) 0x7da44ced96d0 is located 2640 bytes inside of 2688-byte region [0x7da44ced8c80,0x7da44ced9700) freed by thread T1431 here: #0 0x5579380e1f12 in operator delete(void*, unsigned long) (/home/work/unlimit_teamcity/TeamCity/Agents/20250718121603agent_172.16.0.202_1/work/60183217f6ee2a9c/output/be/lib/doris_be+0x29755f12) previously allocated by thread T795 (brpc_light) here: #0 0x5579380e12ad in operator new(unsigned long) (/home/work/unlimit_teamcity/TeamCity/Agents/20250718121603agent_172.16.0.202_1/work/60183217f6ee2a9c/output/be/lib/doris_be+0x297552ad) apache#1 0x55795c7b57ab in std::__new_allocator<std::function<doris::Status ()>>::allocate(unsigned long, void const*) /usr/local/ldb-toolchain-v0.25/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/new_allocator.h:151:27 apache#2 0x55795c7b57ab in std::allocator<std::function<doris::Status ()>>::allocate(unsigned long) /usr/local/ldb-toolchain-v0.25/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/allocator.h:203:32 apache#3 0x55795c7b57ab in std::allocator_traits<std::allocator<std::function<doris::Status ()>>>::allocate(std::allocator<std::function<doris::Status ()>>&, unsigned long) /usr/local/ldb-toolchain-v0.25/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/alloc_traits.h:614:20 apache#4 0x55795c7b57ab in std::_Vector_base<std::function<doris::Status ()>, std::allocator<std::function<doris::Status ()>>>::_M_allocate(unsigned long) /usr/local/ldb-toolchain-v0.25/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/stl_vector.h:387:20 apache#5 0x55795c7b57ab in std::vector<std::function<doris::Status ()>, std::allocator<std::function<doris::Status ()>>>::reserve(unsigned long) /usr/local/ldb-toolchain-v0.25/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/vector.tcc:79:22 apache#6 0x55795c7787f6 in doris::pipeline::OlapScanLocalState::_sync_cloud_tablets(doris::RuntimeState*) /root/doris/be/src/pipeline/exec/olap_scan_operator.cpp:466:20 apache#7 0x55795c777e7d in doris::pipeline::OlapScanLocalState::init(doris::RuntimeState*, doris::pipeline::LocalStateInfo&) /root/doris/be/src/pipeline/exec/olap_scan_operator.cpp:51:5 apache#8 0x55795b459bf9 in doris::pipeline::OperatorX<doris::pipeline::OlapScanLocalState>::setup_local_state(doris::RuntimeState*, doris::pipeline::LocalStateInfo&) /root/doris/be/src/pipeline/exec/operator.cpp:486:5 apache#9 0x55795d9c9a36 in doris::pipeline::PipelineTask::prepare(std::vector<doris::TScanRangeParams, std::allocator<doris::TScanRangeParams>> const&, int, doris::TDataSink const&) /root/doris/be/src/pipeline/pipeline_task.cpp:118:9
1 parent 0e63e36 commit 8ddc32a

File tree

5 files changed

+13
-11
lines changed

5 files changed

+13
-11
lines changed

be/src/cloud/cloud_meta_mgr.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,13 @@ Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int
132132
return status;
133133
}
134134

135-
Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency,
135+
Status bthread_fork_join(std::vector<std::function<Status()>>&& tasks, int concurrency,
136136
std::future<Status>* fut) {
137137
// std::function will cause `copy`, we need to use heap memory to avoid copy ctor called
138138
auto prom = std::make_shared<std::promise<Status>>();
139139
*fut = prom->get_future();
140-
std::function<void()>* fn =
141-
new std::function<void()>([&tasks, concurrency, p = std::move(prom)]() mutable {
140+
std::function<void()>* fn = new std::function<void()>(
141+
[tasks = std::move(tasks), concurrency, p = std::move(prom)]() mutable {
142142
p->set_value(bthread_fork_join(tasks, concurrency));
143143
});
144144

be/src/cloud/cloud_meta_mgr.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int
5757

5858
// An async wrap of `bthread_fork_join` declared previously using promise-future
5959
// return OK if fut successfully created, otherwise return error
60-
Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency,
60+
Status bthread_fork_join(std::vector<std::function<Status()>>&& tasks, int concurrency,
6161
std::future<Status>* fut);
6262

6363
class CloudMetaMgr {

be/src/pipeline/exec/olap_scan_operator.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) {
466466
_cloud_tablet_dependency = Dependency::create_shared(
467467
_parent->operator_id(), _parent->node_id(), "CLOUD_TABLET_DEP");
468468
_tablets.resize(_scan_ranges.size());
469-
_tasks.reserve(_scan_ranges.size());
469+
std::vector<std::function<Status()>> tasks;
470470
_sync_statistics.resize(_scan_ranges.size());
471471
for (size_t i = 0; i < _scan_ranges.size(); i++) {
472472
auto* sync_stats = &_sync_statistics[i];
@@ -475,7 +475,7 @@ Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) {
475475
_scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(),
476476
version);
477477
auto task_ctx = state->get_task_execution_context();
478-
_tasks.emplace_back([this, sync_stats, version, i, task_ctx]() {
478+
tasks.emplace_back([this, sync_stats, version, i, task_ctx]() {
479479
auto task_lock = task_ctx.lock();
480480
if (task_lock == nullptr) {
481481
return Status::OK();
@@ -497,8 +497,9 @@ Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) {
497497
return Status::OK();
498498
});
499499
}
500-
RETURN_IF_ERROR(cloud::bthread_fork_join(
501-
_tasks, config::init_scanner_sync_rowsets_parallelism, &_cloud_tablet_future));
500+
RETURN_IF_ERROR(cloud::bthread_fork_join(std::move(tasks),
501+
config::init_scanner_sync_rowsets_parallelism,
502+
&_cloud_tablet_future));
502503
}
503504
_sync_tablet = true;
504505
}

be/src/pipeline/exec/olap_scan_operator.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
102102

103103
std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges;
104104
std::vector<SyncRowsetStats> _sync_statistics;
105-
std::vector<std::function<Status()>> _tasks;
106105
MonotonicStopWatch _sync_cloud_tablets_watcher;
107106
std::shared_ptr<Dependency> _cloud_tablet_dependency;
108107
std::atomic<size_t> _pending_tablets_num = 0;

be/test/cloud/cloud_meta_mgr_test.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ TEST_F(CloudMetaMgrTest, bthread_fork_join_test) {
5252
{
5353
std::future<Status> fut;
5454
auto start = steady_clock::now();
55-
EXPECT_TRUE(bthread_fork_join(tasks, 3, &fut).ok()); // return immediately
55+
auto t = tasks;
56+
EXPECT_TRUE(bthread_fork_join(std::move(t), 3, &fut).ok()); // return immediately
5657
auto end = steady_clock::now();
5758
auto elapsed = duration_cast<milliseconds>(end - start).count();
5859
EXPECT_LE(elapsed, 40); // async
@@ -74,7 +75,8 @@ TEST_F(CloudMetaMgrTest, bthread_fork_join_test) {
7475
{
7576
std::future<Status> fut;
7677
auto start = steady_clock::now();
77-
EXPECT_TRUE(bthread_fork_join(tasks, 3, &fut).ok()); // return immediately
78+
auto t = tasks;
79+
EXPECT_TRUE(bthread_fork_join(std::move(t), 3, &fut).ok()); // return immediately
7880
auto end = steady_clock::now();
7981
auto elapsed = duration_cast<milliseconds>(end - start).count();
8082
EXPECT_LE(elapsed, 40); // async

0 commit comments

Comments
 (0)