Skip to content

Commit 2b90e70

Browse files
authored
[Bug](runtime-filter) add lock for RuntimeFilterConsumer::acquire_expr/signal to avoid mult… (apache#49739)
…ithread read and write wrapper ### What problem does this PR solve? ```cpp ================================================================= ==28655==ERROR: AddressSanitizer: heap-use-after-free on address 0x61000331f350 at pc 0x5577863cea6c bp 0x7fac1d2a9ef0 sp 0x7fac1d2a9ee8 READ of size 1 at 0x61000331f350 thread T1933 (Pipe_normal [wo) #0 0x5577863cea6b in doris::RuntimeFilterWrapper::debug_string[abi:cxx11]() /root/doris/be/src/runtime_filter/runtime_filter_wrapper.cpp:585:46 #1 0x55778632f8f5 in doris::RuntimeFilter::_debug_string[abi:cxx11]() const /root/doris/be/src/runtime_filter/runtime_filter.cpp:128:61 #2 0x55778638d946 in doris::RuntimeFilterConsumer::debug_string[abi:cxx11]() const /root/doris/be/src/runtime_filter/runtime_filter_consumer.h:61:57 #3 0x55778634ed24 in doris::RuntimeFilterConsumer::_set_state(doris::RuntimeFilterConsumer::State) /root/doris/be/src/runtime_filter/runtime_filter_consumer.h:125:43 #4 0x55778634c6fa in doris::RuntimeFilterConsumer::acquire_expr(std::vector<std::shared_ptr<doris::vectorized::VRuntimeFilterWrapper>, std::allocator<std::shared_ptr<doris::vectorized::VRuntimeFilterWrapper> > >&) /root/doris/be/src/runtime_filter/runtime_filter_consumer.cpp:67:9 #5 0x5577a1fbdd1b in doris::RuntimeFilterConsumerHelper::acquire_runtime_filter(std::vector<std::shared_ptr<doris::vectorized::VExprContext>, std::allocator<std::shared_ptr<doris::vectorized::VExprContext> > >&) /root/doris/be/src/runtime_filter/runtime_filter_consumer_helper.cpp:90:9 #6 0x5577b9aec26c in doris::pipeline::ScanLocalState<doris::pipeline::OlapScanLocalState>::open(doris::RuntimeState*) /root/doris/be/src/pipeline/exec/scan_operator.cpp:100:5 #7 0x5577ba853e05 in doris::pipeline::PipelineTask::_open() /root/doris/be/src/pipeline/pipeline_task.cpp:217:32 #8 0x5577ba8608e0 in doris::pipeline::PipelineTask::execute(bool*) /root/doris/be/src/pipeline/pipeline_task.cpp:407:9 #9 0x5577ba8b4bb5 in doris::pipeline::TaskScheduler::_do_work(int) /root/doris/be/src/pipeline/task_scheduler.cpp:147:9 #10 0x5577869c537c in doris::ThreadPool::dispatch_thread() /root/doris/be/src/util/threadpool.cpp:619:24 #11 0x55778699ba1e in std::function<void ()>::operator()() const /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:560:9 #12 0x55778699ba1e in doris::Thread::supervise_thread(void*) /root/doris/be/src/util/thread.cpp:496:5 #13 0x7fb57bef3608 in start_thread /build/glibc-SzIz7B/glibc-2.31/nptl/pthread_create.c:477:8 #14 0x7fb57c1a0132 in __clone /build/glibc-SzIz7B/glibc-2.31/misc/../sysdeps/unix/sysv/linux/x86_64/clone.S:95 0x61000331f350 is located 16 bytes inside of 192-byte region [0x61000331f340,0x61000331f400) freed by thread T817 (brpc_light) here: #0 0x557781e1c80d in operator delete(void*) (/mnt/ssd01/pipline/OpenSourceDoris/clusterEnv/P0/Cluster0/be/lib/doris_be+0x3395280d) (BuildId: 45c9f8cc52f91b28) #1 0x557781e778b8 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::operator=(std::__shared_count<(__gnu_cxx::_Lock_policy)2> const&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:721:15 #2 0x55778634d1d1 in std::__shared_ptr<doris::RuntimeFilterWrapper, (__gnu_cxx::_Lock_policy)2>::operator=(std::__shared_ptr<doris::RuntimeFilterWrapper, (__gnu_cxx::_Lock_policy)2> const&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:1148:69 #3 0x55778634d1d1 in std::shared_ptr<doris::RuntimeFilterWrapper>::operator=(std::shared_ptr<doris::RuntimeFilterWrapper> const&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr.h:359:65 #4 0x55778634d1d1 in doris::RuntimeFilterConsumer::signal(doris::RuntimeFilter*) /root/doris/be/src/runtime_filter/runtime_filter_consumer.cpp:76:14 #5 0x557785dd52c3 in auto doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0::operator()<std::shared_ptr<doris::RuntimeFilterConsumer> >(std::shared_ptr<doris::RuntimeFilterConsumer>&) const /root/doris/be/src/runtime/fragment_mgr.cpp:1295:72 #6 0x557785dd52c3 in void std::__invoke_impl<void, doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0&, std::shared_ptr<doris::RuntimeFilterConsumer>&>(std::__invoke_other, doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0&, std::shared_ptr<doris::RuntimeFilterConsumer>&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:61:14 #7 0x557785dd52c3 in std::__invoke_result<doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0&, std::shared_ptr<doris::RuntimeFilterConsumer>&>::type std::__invoke<doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0&, std::shared_ptr<doris::RuntimeFilterConsumer>&>(doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0&, std::shared_ptr<doris::RuntimeFilterConsumer>&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14 #8 0x557785dd52c3 in std::ranges::in_fun_result<__gnu_cxx::__normal_iterator<std::shared_ptr<doris::RuntimeFilterConsumer>*, std::vector<std::shared_ptr<doris::RuntimeFilterConsumer>, std::allocator<std::shared_ptr<doris::RuntimeFilterConsumer> > > >, doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0> std::ranges::__for_each_fn::operator()<__gnu_cxx::__normal_iterator<std::shared_ptr<doris::RuntimeFilterConsumer>*, std::vector<std::shared_ptr<doris::RuntimeFilterConsumer>, std::allocator<std::shared_ptr<doris::RuntimeFilterConsumer> > > >, __gnu_cxx::__normal_iterator<std::shared_ptr<doris::RuntimeFilterConsumer>*, std::vector<std::shared_ptr<doris::RuntimeFilterConsumer>, std::allocator<std::shared_ptr<doris::RuntimeFilterConsumer> > > >, std::identity, doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0>(__gnu_cxx::__normal_iterator<std::shared_ptr<doris::RuntimeFilterConsumer>*, std::vector<std::shared_ptr<doris::RuntimeFilterConsumer>, std::allocator<std::shared_ptr<doris::RuntimeFilterConsumer> > > >, __gnu_cxx::__normal_iterator<std::shared_ptr<doris::RuntimeFilterConsumer>*, std::vector<std::shared_ptr<doris::RuntimeFilterConsumer>, std::allocator<std::shared_ptr<doris::RuntimeFilterConsumer> > > >, doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0, std::identity) const /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/ranges_algo.h:187:4 #9 0x557785dd52c3 in _ZNKSt6ranges13__for_each_fnclIRSt6vectorISt10shared_ptrIN5doris21RuntimeFilterConsumerEESaIS6_EESt8identityZNS4_11FragmentMgr14apply_filterv2EPKNS4_23PPublishFilterRequestV2EPN5butil26IOBufAsZeroCopyInputStreamEE3$_0EENS_13in_fun_resultINSt11conditionalIL_ZNS_14borrowed_rangeIT_EEEDTclsr8__detailE14__ranges_beginclsr3stdE7declvalIRSM_EEEENS_8danglingEE4typeET1_EEOSM_SS_T0_ /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/ranges_algo.h:197:9 #10 0x557785dd52c3 in doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*) /root/doris/be/src/runtime/fragment_mgr.cpp:1295:13 #11 0x5577865a517e in doris::PInternalService::apply_filterv2(google::protobuf::RpcController*, doris::PPublishFilterRequestV2 const*, doris::PPublishFilterResponse*, google::protobuf::Closure*)::$_0::operator()() const /root/doris/be/src/service/internal_service.cpp:1444:48 #12 0x5577865a517e in void std::__invoke_impl<void, doris::PInternalService::apply_filterv2(google::protobuf::RpcController*, doris::PPublishFilterRequestV2 const*, doris::PPublishFilterResponse*, google::protobuf::Closure*)::$_0&>(std::__invoke_other, doris::PInternalService::apply_filterv2(google::protobuf::RpcController*, doris::PPublishFilterRequestV2 const*, doris::PPublishFilterResponse*, google::protobuf::Closure*)::$_0&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:61:14 #13 0x5577865a517e in std::enable_if<is_invocable_r_v<void, doris::PInternalService::apply_filterv2(google::protobuf::RpcController*, doris::PPublishFilterRequestV2 const*, doris::PPublishFilterResponse*, google::protobuf::Closure*)::$_0&>, void>::type std::__invoke_r<void, doris::PInternalService::apply_filterv2(google::protobuf::RpcController*, doris::PPublishFilterRequestV2 const*, doris::PPublishFilterResponse*, google::protobuf::Closure*)::$_0&>(doris::PInternalService::apply_filterv2(google::protobuf::RpcController*, doris::PPublishFilterRequestV2 const*, doris::PPublishFilterResponse*, google::protobuf::Closure*)::$_0&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:111:2 #14 0x5577865a517e in std::_Function_handler<void (), doris::PInternalService::apply_filterv2(google::protobuf::RpcController*, doris::PPublishFilterRequestV2 const*, doris::PPublishFilterResponse*, google::protobuf::Closure*)::$_0>::_M_invoke(std::_Any_data const&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:291:9 #15 0x5577865f07c1 in std::function<void ()>::operator()() const /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:560:9 #16 0x5577865f07c1 in doris::WorkThreadPool<false>::work_thread(int) /root/doris/be/src/util/work_thread_pool.hpp:158:17 #17 0x5577bddb233f in execute_native_thread_routine /data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/src/c++11/../../../../../libstdc++-v3/src/c++11/thread.cc:82:18 previously allocated by thread T781 (brpc_light) here: #0 0x557781e1bfad in operator new(unsigned long) (/mnt/ssd01/pipline/OpenSourceDoris/clusterEnv/P0/Cluster0/be/lib/doris_be+0x33951fad) (BuildId: 45c9f8cc52f91b28) #1 0x5577863411ae in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, (__gnu_cxx::_Lock_policy)2> >::allocate(unsigned long, void const*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/ext/new_allocator.h:121:27 #2 0x5577863411ae in std::allocator<std::_Sp_counted_ptr_inplace<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, (__gnu_cxx::_Lock_policy)2> >::allocate(unsigned long) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/allocator.h:173:32 #3 0x5577863411ae in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, (__gnu_cxx::_Lock_policy)2> > >::allocate(std::allocator<std::_Sp_counted_ptr_inplace<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, (__gnu_cxx::_Lock_policy)2> >&, unsigned long) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/alloc_traits.h:460:20 #4 0x5577863411ae in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, (__gnu_cxx::_Lock_policy)2> > > std::__allocate_guarded<std::allocator<std::_Sp_counted_ptr_inplace<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, (__gnu_cxx::_Lock_policy)2> > >(std::allocator<std::_Sp_counted_ptr_inplace<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, (__gnu_cxx::_Lock_policy)2> >&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/allocated_ptr.h:97:21 #5 0x5577863411ae in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, doris::RuntimeFilterParams*>(doris::RuntimeFilterWrapper*&, std::_Sp_alloc_shared_tag<std::allocator<doris::RuntimeFilterWrapper> >, doris::RuntimeFilterParams*&&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:648:19 #6 0x55778632e555 in std::__shared_ptr<doris::RuntimeFilterWrapper, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<doris::RuntimeFilterWrapper>, doris::RuntimeFilterParams*>(std::_Sp_alloc_shared_tag<std::allocator<doris::RuntimeFilterWrapper> >, doris::RuntimeFilterParams*&&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:1337:14 #7 0x55778632e555 in std::shared_ptr<doris::RuntimeFilterWrapper>::shared_ptr<std::allocator<doris::RuntimeFilterWrapper>, doris::RuntimeFilterParams*>(std::_Sp_alloc_shared_tag<std::allocator<doris::RuntimeFilterWrapper> >, doris::RuntimeFilterParams*&&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr.h:409:4 #8 0x55778632e555 in std::shared_ptr<doris::RuntimeFilterWrapper> std::allocate_shared<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, doris::RuntimeFilterParams*>(std::allocator<doris::RuntimeFilterWrapper> const&, doris::RuntimeFilterParams*&&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr.h:860:14 #9 0x55778632e555 in std::shared_ptr<doris::RuntimeFilterWrapper> std::make_shared<doris::RuntimeFilterWrapper, doris::RuntimeFilterParams*>(doris::RuntimeFilterParams*&&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr.h:876:14 #10 0x55778632e555 in doris::RuntimeFilter::_init_with_desc(doris::TRuntimeFilterDesc const*, doris::TQueryOptions const*) /root/doris/be/src/runtime_filter/runtime_filter.cpp:123:16 #11 0x5577863794de in doris::RuntimeFilterConsumer::create(doris::RuntimeFilterParamsContext*, doris::TRuntimeFilterDesc const*, int, std::shared_ptr<doris::RuntimeFilterConsumer>*, doris::RuntimeProfile*) /root/doris/be/src/runtime_filter/runtime_filter_consumer.h:46:9 #12 0x55778636bb9f in doris::RuntimeFilterMgr::register_consumer_filter(doris::TRuntimeFilterDesc const&, int, std::shared_ptr<doris::RuntimeFilterConsumer>*, doris::RuntimeProfile*) /root/doris/be/src/runtime_filter/runtime_filter_mgr.cpp:80:5 #13 0x557786147c10 in doris::RuntimeState::register_consumer_runtime_filter(doris::TRuntimeFilterDesc const&, bool, int, std::shared_ptr<doris::RuntimeFilterConsumer>*, doris::RuntimeProfile*) /root/doris/be/src/runtime/runtime_state.cpp:514:17 #14 0x5577a1fbc3e1 in doris::RuntimeFilterConsumerHelper::_register_runtime_filter(bool) /root/doris/be/src/runtime_filter/runtime_filter_consumer_helper.cpp:51:9 ``` ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [x] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [x] Other reason <!-- Add your reason? --> - Behavior changed: - [x] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [x] No. - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
1 parent 30aece2 commit 2b90e70

File tree

6 files changed

+66
-23
lines changed

6 files changed

+66
-23
lines changed

be/src/pipeline/exec/hashjoin_build_sink.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
220220
}
221221

222222
if (p._use_shared_hash_table) {
223-
std::unique_lock(p._mutex);
223+
std::unique_lock lock(p._mutex);
224224
p._signaled = true;
225225
for (auto& dep : _shared_state->sink_deps) {
226226
dep->set_ready();

be/src/runtime_filter/runtime_filter_consumer.cpp

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -65,23 +65,13 @@ Status RuntimeFilterConsumer::acquire_expr(std::vector<vectorized::VRuntimeFilte
6565
}
6666
if (_rf_state != State::APPLIED && _rf_state != State::TIMEOUT) {
6767
_set_state(State::TIMEOUT);
68-
DorisMetrics::instance()->runtime_filter_consumer_timeout_num->increment(1);
69-
_profile->add_info_string("ReachTimeoutLimit", "true");
7068
}
7169
return Status::OK();
7270
}
7371

7472
void RuntimeFilterConsumer::signal(RuntimeFilter* other) {
7573
COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - _registration_time) * NANOS_PER_MILLIS));
76-
_wrapper = other->_wrapper;
77-
_check_wrapper_state({RuntimeFilterWrapper::State::DISABLED,
78-
RuntimeFilterWrapper::State::IGNORED,
79-
RuntimeFilterWrapper::State::READY});
80-
_check_state({State::NOT_READY, State::TIMEOUT});
81-
_set_state(State::READY);
82-
DorisMetrics::instance()->runtime_filter_consumer_ready_num->increment(1);
83-
DorisMetrics::instance()->runtime_filter_consumer_wait_ready_ms->increment(MonotonicMillis() -
84-
_registration_time);
74+
_set_state(State::READY, other->_wrapper);
8575
if (!_filter_timer.empty()) {
8676
for (auto& timer : _filter_timer) {
8777
timer->call_ready();

be/src/runtime_filter/runtime_filter_consumer.h

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ class RuntimeFilterConsumer : public RuntimeFilter {
5858
Status acquire_expr(std::vector<vectorized::VRuntimeFilterPtr>& push_exprs);
5959

6060
std::string debug_string() const override {
61-
return fmt::format("Consumer: ({}, state: {})", _debug_string(), to_string(_rf_state));
61+
return fmt::format("Consumer: ({}, state: {}, reached_timeout: {}, timeout_limit: {}ms)",
62+
_debug_string(), to_string(_rf_state),
63+
_reached_timeout ? "true" : "false", std::to_string(_rf_wait_time_ms));
6264
}
6365

6466
bool is_applied() { return _rf_state == State::APPLIED; }
@@ -120,7 +122,25 @@ class RuntimeFilterConsumer : public RuntimeFilter {
120122
}
121123
}
122124

123-
void _set_state(State rf_state) {
125+
void _set_state(State rf_state, std::shared_ptr<RuntimeFilterWrapper> other = nullptr) {
126+
std::unique_lock<std::mutex> l(_mtx);
127+
if (rf_state == State::TIMEOUT) {
128+
DorisMetrics::instance()->runtime_filter_consumer_timeout_num->increment(1);
129+
_reached_timeout = true;
130+
if (_rf_state != State::NOT_READY) {
131+
// reach timeout but do not change State::ready to State::timeout
132+
return;
133+
}
134+
} else if (rf_state == State::READY) {
135+
DorisMetrics::instance()->runtime_filter_consumer_ready_num->increment(1);
136+
DorisMetrics::instance()->runtime_filter_consumer_wait_ready_ms->increment(
137+
MonotonicMillis() - _registration_time);
138+
_wrapper = other;
139+
_check_wrapper_state({RuntimeFilterWrapper::State::DISABLED,
140+
RuntimeFilterWrapper::State::IGNORED,
141+
RuntimeFilterWrapper::State::READY});
142+
_check_state({State::NOT_READY, State::TIMEOUT});
143+
}
124144
_rf_state = rf_state;
125145
_profile->add_info_string("Info", debug_string());
126146
}
@@ -142,6 +162,11 @@ class RuntimeFilterConsumer : public RuntimeFilter {
142162
const int64_t _registration_time;
143163

144164
std::atomic<State> _rf_state;
165+
// only used to lock _set_state() to make _wrapper and _rf_state is protected
166+
// signal and acquire_expr are called in different threads at the same time
167+
std::mutex _mtx;
168+
169+
bool _reached_timeout = false;
145170

146171
friend class RuntimeFilterProducer;
147172
};

be/test/io/fs/s3_obj_storage_client_test.cpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,16 @@ class S3ObjStorageClientTest : public testing::Test {
3939

4040
S3ObjStorageClientTest::bucket = std::getenv("AWS_BUCKET");
4141

42-
S3ObjStorageClientTest::obj_storage_client =
43-
S3ClientFactory::instance().create({.endpoint = endpoint,
44-
.region = "dummy-region",
45-
.ak = access_key,
46-
.sk = secret_key,
47-
.bucket = bucket,
48-
.provider = io::ObjStorageType::AWS,
49-
.use_virtual_addressing = false});
42+
S3ObjStorageClientTest::obj_storage_client = S3ClientFactory::instance().create({
43+
.endpoint = endpoint,
44+
.region = "dummy-region",
45+
.ak = access_key,
46+
.sk = secret_key,
47+
.token = "",
48+
.bucket = bucket,
49+
.provider = io::ObjStorageType::AWS,
50+
.use_virtual_addressing = false,
51+
});
5052

5153
ASSERT_TRUE(S3ObjStorageClientTest::obj_storage_client != nullptr);
5254
}

be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,9 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, InitLocalState) {
119119

120120
RuntimeProfile runtime_profile("test");
121121
TDataSink t_sink;
122-
LocalSinkStateInfo info {.parent_profile = &runtime_profile,
122+
LocalSinkStateInfo info {.task_idx = 0,
123+
.parent_profile = &runtime_profile,
124+
.sender_id = 0,
123125
.shared_state = shared_state.get(),
124126
.shared_state_map = {},
125127
.tsink = t_sink};
@@ -222,6 +224,7 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, SinkEosAndSpill) {
222224

223225
LocalSinkStateInfo sink_info {.task_idx = 0,
224226
.parent_profile = _helper.runtime_profile.get(),
227+
.sender_id = 0,
225228
.shared_state = shared_state.get(),
226229
.shared_state_map = {},
227230
.tsink = TDataSink()};

be/test/runtime_filter/runtime_filter_consumer_test.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,4 +237,27 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) {
237237
RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer, &_profile));
238238
}
239239

240+
TEST_F(RuntimeFilterConsumerTest, aquire_signal_at_same_time) {
241+
for (int i = 0; i < 100; i++) {
242+
std::shared_ptr<RuntimeFilterConsumer> consumer;
243+
auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
244+
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
245+
RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()),
246+
&desc, 0, &consumer, &_profile));
247+
248+
std::shared_ptr<RuntimeFilterProducer> producer;
249+
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
250+
RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile));
251+
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
252+
253+
std::vector<vectorized::VRuntimeFilterPtr> push_exprs;
254+
std::thread thread1(
255+
[&]() { [[maybe_unused]] auto res = consumer->acquire_expr(push_exprs); });
256+
std::thread thread2([&]() { consumer->signal(producer.get()); });
257+
thread1.join();
258+
thread2.join();
259+
260+
ASSERT_NE(consumer->_rf_state, RuntimeFilterConsumer::State::TIMEOUT);
261+
}
262+
}
240263
} // namespace doris

0 commit comments

Comments
 (0)