From 6f11a1974ece90e70f9eaab50c44932dd6618740 Mon Sep 17 00:00:00 2001 From: Pxl Date: Sun, 4 Jan 2026 11:28:57 +0800 Subject: [PATCH 1/2] [Chore](query) add _query_ctx_map_delay_delete (#59262) This pull request introduces a delayed deletion mechanism for `QueryContext` objects in the fragment manager to ensure that runtime filter merging works correctly after a query reaches its end-of-stream (EOS). The main changes involve adding a new map to retain query contexts temporarily, modifying the lifecycle management of query contexts, and introducing utility methods to support this behavior. **Query Context Lifecycle Management:** * Added a new member `_query_ctx_map_delay_delete` to `FragmentMgr` to keep `QueryContext` objects alive for a short period after query EOS, ensuring that the runtime filter coordinator can complete its work. * Modified `FragmentMgr::remove_query_context` to erase entries from `_query_ctx_map_delay_delete` as part of the cleanup process. * Updated the `QueryContext` destructor to call `remove_query_context` only if the fragment manager is available, preventing issues during unit tests. **Runtime Filter Merge Coordination:** * In `FragmentMgr::_get_or_create_query_ctx`, now inserts the `QueryContext` into `_query_ctx_map_delay_delete` if the runtime filter merge handler is not empty, ensuring the context is retained for merging. * Added an `empty()` method to `RuntimeFilterMergeControllerEntity` to check if the filter map is empty, which is used to determine if delayed deletion is needed. 
--- be/src/runtime/fragment_mgr.cpp | 4 ++++ be/src/runtime/fragment_mgr.h | 4 +++- be/src/runtime/query_context.cpp | 9 ++++---- be/src/runtime_filter/runtime_filter_mgr.cpp | 24 -------------------- be/src/runtime_filter/runtime_filter_mgr.h | 5 +++- 5 files changed, 15 insertions(+), 31 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 14e2751035f7e8..3f5daeb11886a8 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -670,6 +670,7 @@ void FragmentMgr::remove_pipeline_context(std::pair key) { } void FragmentMgr::remove_query_context(const TUniqueId& key) { + _query_ctx_map_delay_delete.erase(key); #ifndef BE_TEST _query_ctx_map.erase(key); #endif @@ -768,6 +769,9 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para query_ctx->runtime_filter_mgr()->set_runtime_filter_params( info.runtime_filter_params); + if (!handler->empty()) { + _query_ctx_map_delay_delete.insert(query_id, query_ctx); + } } if (info.__isset.topn_filter_descs) { query_ctx->init_runtime_predicates(info.topn_filter_descs); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index b0c1a3ad59267d..63cb4dc080804f 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -209,7 +209,9 @@ class FragmentMgr : public RestMonitorIface { // query id -> QueryContext ConcurrentContextMap, QueryContext> _query_ctx_map; - std::unordered_map> _bf_size_map; + // keep query ctx do not delete immediately to make rf coordinator merge filter work well after query eos + ConcurrentContextMap, QueryContext> + _query_ctx_map_delay_delete; CountDownLatch _stop_background_threads_latch; std::shared_ptr _cancel_thread; diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index ba633da034d5ed..a5755aa275d5b7 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -231,14 +231,13 @@ 
QueryContext::~QueryContext() { _runtime_predicates.clear(); file_scan_range_params_map.clear(); obj_pool.clear(); - if (_merge_controller_handler) { - _merge_controller_handler->release_undone_filters(this); - } _merge_controller_handler.reset(); DorisMetrics::instance()->query_ctx_cnt->increment(-1); - // TODO(gabriel): we need to clear outdated query contexts on time - // ExecEnv::GetInstance()->fragment_mgr()->remove_query_context(this->_query_id); + // fragment_mgr is nullptr in unittest + if (ExecEnv::GetInstance()->fragment_mgr()) { + ExecEnv::GetInstance()->fragment_mgr()->remove_query_context(this->_query_id); + } // the only one msg shows query's end. any other msg should append to it if need. LOG_INFO("Query {} deconstructed, mem_tracker: {}", print_id(this->_query_id), mem_tracker_msg); } diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp b/be/src/runtime_filter/runtime_filter_mgr.cpp index 48645ffdb8e3b0..b3c1cfa6c93c86 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/runtime_filter/runtime_filter_mgr.cpp @@ -433,30 +433,6 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext return st; } -void RuntimeFilterMergeControllerEntity::release_undone_filters(QueryContext* query_ctx) { - std::unique_lock guard(_filter_map_mutex); - for (auto& [filter_id, ctx] : _filter_map) { - if (!ctx.done && !ctx.targetv2_info.empty()) { - { - std::lock_guard l(ctx.mtx); - ctx.merger->set_wrapper_state_and_ready_to_apply( - RuntimeFilterWrapper::State::DISABLED, - "rf coordinator's query context released before runtime filter is ready to " - "apply"); - } - auto st = _send_rf_to_target(ctx, std::weak_ptr {}, 0, - UniqueId(query_ctx->query_id()).to_proto(), - query_ctx->execution_timeout()); - if (!st.ok()) { - LOG(WARNING) - << "Failed to send runtime filter to target before query done. 
filter_id:" - << filter_id << " " << ctx.merger->debug_string() << " reason:" << st; - } - } - } - _filter_map.clear(); -} - std::string RuntimeFilterMergeControllerEntity::debug_string() { std::string result = "RuntimeFilterMergeControllerEntity Info:\n"; std::shared_lock guard(_filter_map_mutex); diff --git a/be/src/runtime_filter/runtime_filter_mgr.h b/be/src/runtime_filter/runtime_filter_mgr.h index 160babf278d119..2dcea3f7a4a6cb 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.h +++ b/be/src/runtime_filter/runtime_filter_mgr.h @@ -150,7 +150,10 @@ class RuntimeFilterMergeControllerEntity { std::string debug_string(); - void release_undone_filters(QueryContext* query_ctx); + bool empty() { + std::shared_lock read_lock(_filter_map_mutex); + return _filter_map.empty(); + } private: Status _init_with_desc(std::shared_ptr query_ctx, From 02aa671dc405ee8a3012043b5c591cc842a67c8e Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 14 Oct 2025 17:36:22 +0800 Subject: [PATCH 2/2] [refactor](query) Clear query context once it is finished (#56911) Currently, query contexts are not removed from the FragmentMgr eagerly, which may lead to a very large map holding all query contexts. 
--- be/src/runtime/fragment_mgr.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 3f5daeb11886a8..8ceb04aa90917a 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -974,10 +974,12 @@ void FragmentMgr::cancel_worker() { std::unordered_map, BrpcItem> brpc_stub_with_queries; { + std::vector> contexts; _query_ctx_map.apply([&](phmap::flat_hash_map>& map) -> Status { for (auto it = map.begin(); it != map.end();) { if (auto q_ctx = it->second.lock()) { + contexts.push_back(q_ctx); if (q_ctx->is_timeout(now)) { LOG_WARNING("Query {} is timeout", print_id(it->first)); queries_timeout.push_back(it->first); @@ -999,6 +1001,7 @@ void FragmentMgr::cancel_worker() { } return Status::OK(); }); + std::vector> {}.swap(contexts); // We use a very conservative cancel strategy. // 0. If there are no running frontends, do not cancel any queries. @@ -1011,11 +1014,13 @@ void FragmentMgr::cancel_worker() { "starting? " << "We will not cancel any outdated queries in this situation."; } else { + std::vector> q_contexts; _query_ctx_map.apply([&](phmap::flat_hash_map>& map) -> Status { for (const auto& it : map) { if (auto q_ctx = it.second.lock()) { + q_contexts.push_back(q_ctx); const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid(); if (fe_process_uuid == 0) { @@ -1369,11 +1374,13 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, void FragmentMgr::get_runtime_query_info( std::vector>* _resource_ctx_list) { + std::vector> contexts; _query_ctx_map.apply( [&](phmap::flat_hash_map>& map) -> Status { for (auto iter = map.begin(); iter != map.end();) { if (auto q_ctx = iter->second.lock()) { _resource_ctx_list->push_back(q_ctx->resource_ctx()); + contexts.push_back(q_ctx); iter++; } else { iter = map.erase(iter);