Skip to content

Commit d86daef

Browse files
authored
[Chore](query) add _query_ctx_map_delay_delete (#59262)
This pull request introduces a delayed deletion mechanism for `QueryContext` objects in the fragment manager to ensure that runtime filter merging works correctly after a query reaches its end-of-stream (EOS). The main changes involve adding a new map to retain query contexts temporarily, modifying the lifecycle management of query contexts, and introducing utility methods to support this behavior. **Query Context Lifecycle Management:** * Added a new member `_query_ctx_map_delay_delete` to `FragmentMgr` to keep `QueryContext` objects alive for a short period after query EOS, ensuring that the runtime filter coordinator can complete its work. * Modified `FragmentMgr::remove_query_context` to erase entries from `_query_ctx_map_delay_delete` as part of the cleanup process. * Updated the `QueryContext` destructor to call `remove_query_context` only if the fragment manager is available, preventing issues during unit tests. **Runtime Filter Merge Coordination:** * In `FragmentMgr::_get_or_create_query_ctx`, now inserts the `QueryContext` into `_query_ctx_map_delay_delete` if the runtime filter merge handler is not empty, ensuring the context is retained for merging. * Added an `empty()` method to `RuntimeFilterMergeControllerEntity` to check if the filter map is empty, which is used to determine if delayed deletion is needed.
1 parent 1cbe487 commit d86daef

File tree

5 files changed

+15
-29
lines changed

5 files changed

+15
-29
lines changed

be/src/runtime/fragment_mgr.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,7 @@ void FragmentMgr::remove_pipeline_context(std::pair<TUniqueId, int> key) {
670670
}
671671

672672
void FragmentMgr::remove_query_context(const TUniqueId& key) {
673+
_query_ctx_map_delay_delete.erase(key);
673674
#ifndef BE_TEST
674675
_query_ctx_map.erase(key);
675676
#endif
@@ -768,6 +769,9 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para
768769

769770
query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
770771
info.runtime_filter_params);
772+
if (!handler->empty()) {
773+
_query_ctx_map_delay_delete.insert(query_id, query_ctx);
774+
}
771775
}
772776
if (info.__isset.topn_filter_descs) {
773777
query_ctx->init_runtime_predicates(info.topn_filter_descs);

be/src/runtime/fragment_mgr.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,9 @@ class FragmentMgr : public RestMonitorIface {
209209

210210
// query id -> QueryContext
211211
ConcurrentContextMap<TUniqueId, std::weak_ptr<QueryContext>, QueryContext> _query_ctx_map;
212+
// keep query ctx do not delete immediately to make rf coordinator merge filter work well after query eos
213+
ConcurrentContextMap<TUniqueId, std::shared_ptr<QueryContext>, QueryContext>
214+
_query_ctx_map_delay_delete;
212215

213216
CountDownLatch _stop_background_threads_latch;
214217
std::shared_ptr<Thread> _cancel_thread;

be/src/runtime/query_context.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -231,13 +231,13 @@ QueryContext::~QueryContext() {
231231
_runtime_predicates.clear();
232232
file_scan_range_params_map.clear();
233233
obj_pool.clear();
234-
if (_merge_controller_handler) {
235-
_merge_controller_handler->release_undone_filters(this);
236-
}
237234
_merge_controller_handler.reset();
238235

239236
DorisMetrics::instance()->query_ctx_cnt->increment(-1);
240-
ExecEnv::GetInstance()->fragment_mgr()->remove_query_context(this->_query_id);
237+
// fragment_mgr is nullptr in unittest
238+
if (ExecEnv::GetInstance()->fragment_mgr()) {
239+
ExecEnv::GetInstance()->fragment_mgr()->remove_query_context(this->_query_id);
240+
}
241241
// the only one msg shows query's end. any other msg should append to it if need.
242242
LOG_INFO("Query {} deconstructed, mem_tracker: {}", print_id(this->_query_id), mem_tracker_msg);
243243
}

be/src/runtime_filter/runtime_filter_mgr.cpp

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -433,30 +433,6 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
433433
return st;
434434
}
435435

436-
void RuntimeFilterMergeControllerEntity::release_undone_filters(QueryContext* query_ctx) {
437-
std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
438-
for (auto& [filter_id, ctx] : _filter_map) {
439-
if (!ctx.done && !ctx.targetv2_info.empty()) {
440-
{
441-
std::lock_guard<std::mutex> l(ctx.mtx);
442-
ctx.merger->set_wrapper_state_and_ready_to_apply(
443-
RuntimeFilterWrapper::State::DISABLED,
444-
"rf coordinator's query context released before runtime filter is ready to "
445-
"apply");
446-
}
447-
auto st = _send_rf_to_target(ctx, std::weak_ptr<QueryContext> {}, 0,
448-
UniqueId(query_ctx->query_id()).to_proto(),
449-
query_ctx->execution_timeout());
450-
if (!st.ok()) {
451-
LOG(WARNING)
452-
<< "Failed to send runtime filter to target before query done. filter_id:"
453-
<< filter_id << " " << ctx.merger->debug_string() << " reason:" << st;
454-
}
455-
}
456-
}
457-
_filter_map.clear();
458-
}
459-
460436
std::string RuntimeFilterMergeControllerEntity::debug_string() {
461437
std::string result = "RuntimeFilterMergeControllerEntity Info:\n";
462438
std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);

be/src/runtime_filter/runtime_filter_mgr.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,10 @@ class RuntimeFilterMergeControllerEntity {
150150

151151
std::string debug_string();
152152

153-
void release_undone_filters(QueryContext* query_ctx);
153+
bool empty() {
154+
std::shared_lock<std::shared_mutex> read_lock(_filter_map_mutex);
155+
return _filter_map.empty();
156+
}
154157

155158
private:
156159
Status _init_with_desc(std::shared_ptr<QueryContext> query_ctx,

0 commit comments

Comments
 (0)