Skip to content

Commit b3ff405

Browse files
authored
[Chore](runtime-filter) catch Exception from runtime filter rpc (apache#57085)
catch Exception from runtime filter rpc ```cpp 12340; stack trace: *** what(): [E6] consumer meet invalid state, Consumer: ([id: 1, state: [DISABLED], type: BLOOM_FILTER, column_type: INT, reason: get disabled from remote], mode: GLOBAL, state: READY, reached_timeout: false, timeout_limit: 600000ms), assumed_states is [NOT_READY, TIMEOUT] 0# doris::Exception::Exception(int, std::basic_string_view > const&, bool) at /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/unique_ptr.h:193 1# doris::Exception::Exception(int, std::basic_string_view > const&) at /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/basic_string.h:239 2# doris::Exception::Exception, std::allocator >, std::__cxx11::basic_string, std::allocator > >(int, std::basic_string_view > const&, std::__cxx11::basic_string, std::allocator >&&, std::__cxx11::basic_string, std::allocator >&&) at /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/basic_string.h:239 3# doris::RuntimeFilterConsumer::_check_state(std::vector >) at /home/zcp/repo_center/doris_branch-4.0/doris/be/src/runtime_filter/runtime_filter_consumer.h:0 4# doris::RuntimeFilterConsumer::_set_state(doris::RuntimeFilterConsumer::State, std::shared_ptr) at /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/stl_vector.h:375 5# doris::RuntimeFilterConsumer::signal(doris::RuntimeFilter*) at /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:1068 6# doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*) at /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/stl_iterator.h:1103 7# std::_Function_handler::_M_invoke(std::_Any_data const&) at /home/zcp/repo_center/doris_branch-4.0/doris/be/src/common/status.h:524 8# doris::WorkThreadPool::work_thread(int) at /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/atomic_base.h:641 9# execute_native_thread_routine ```
1 parent 6b41b1b commit b3ff405

File tree

1 file changed

+24
-4
lines changed

1 file changed

+24
-4
lines changed

be/src/service/internal_service.cpp

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1389,7 +1389,12 @@ void PInternalService::merge_filter(::google::protobuf::RpcController* controlle
13891389
brpc::ClosureGuard closure_guard(done);
13901390
auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
13911391
butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
1392-
Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream);
1392+
Status st;
1393+
try {
1394+
st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream);
1395+
} catch (Exception& e) {
1396+
st = e.to_status();
1397+
}
13931398
st.to_protobuf(response->mutable_status());
13941399
});
13951400
if (!ret) {
@@ -1405,7 +1410,12 @@ void PInternalService::send_filter_size(::google::protobuf::RpcController* contr
14051410
bool ret = _light_work_pool.try_offer([this, request, response, done]() {
14061411
signal::SignalTaskIdKeeper keeper(request->query_id());
14071412
brpc::ClosureGuard closure_guard(done);
1408-
Status st = _exec_env->fragment_mgr()->send_filter_size(request);
1413+
Status st;
1414+
try {
1415+
st = _exec_env->fragment_mgr()->send_filter_size(request);
1416+
} catch (Exception& e) {
1417+
st = e.to_status();
1418+
}
14091419
st.to_protobuf(response->mutable_status());
14101420
});
14111421
if (!ret) {
@@ -1421,7 +1431,12 @@ void PInternalService::sync_filter_size(::google::protobuf::RpcController* contr
14211431
bool ret = _light_work_pool.try_offer([this, request, response, done]() {
14221432
signal::SignalTaskIdKeeper keeper(request->query_id());
14231433
brpc::ClosureGuard closure_guard(done);
1424-
Status st = _exec_env->fragment_mgr()->sync_filter_size(request);
1434+
Status st;
1435+
try {
1436+
st = _exec_env->fragment_mgr()->sync_filter_size(request);
1437+
} catch (Exception& e) {
1438+
st = e.to_status();
1439+
}
14251440
st.to_protobuf(response->mutable_status());
14261441
});
14271442
if (!ret) {
@@ -1440,7 +1455,12 @@ void PInternalService::apply_filterv2(::google::protobuf::RpcController* control
14401455
auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
14411456
butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
14421457
VLOG_NOTICE << "rpc apply_filterv2 recv";
1443-
Status st = _exec_env->fragment_mgr()->apply_filterv2(request, &zero_copy_input_stream);
1458+
Status st;
1459+
try {
1460+
st = _exec_env->fragment_mgr()->apply_filterv2(request, &zero_copy_input_stream);
1461+
} catch (Exception& e) {
1462+
st = e.to_status();
1463+
}
14441464
if (!st.ok()) {
14451465
LOG(WARNING) << "apply filter meet error: " << st.to_string();
14461466
}

0 commit comments

Comments
 (0)