Commit af4e2dd

[enhancement](spilldisk) Cancel query fast when reserve memory failed and could not find revocable tasks (#59330)
### What problem does this PR solve?

The previous code had two problems:

1. The revocable memory size is meaningless for non-spill operators and is always 0. As a result, if spill is disabled and reserve is enabled, the memory limit appears to stop working entirely.
2. The handling of a failed reserve was too complicated.

This PR simplifies the logic:

1. If spill is not enabled for the query and reserving memory fails, disable reserve memory. This enables the memory check immediately and lets the query run.
2. If reserving memory fails and no revocable tasks are found, cancel the query.
3. If reserving memory fails, add the query to the paused queries immediately, because the failed reserve may be for the last block of the join build. If the query is not paused, it may try to allocate a lot of memory.
4. If some queries are in the cancel stage, do not check paused queries for a short while, because the cancelling queries may release some memory.

Related PR: #xxx

Problem Summary:

### Release note

None

### Check List (For Author)

- Test <!-- At least one of them must be included. -->
    - [ ] Regression test
    - [ ] Unit Test
    - [ ] Manual test (add detailed scripts or steps below)
    - [ ] No need to test or manual test. Explain why:
        - [ ] This is a refactor/code format and no logic has been changed.
        - [ ] Previous test can cover this change.
        - [ ] No code files have been changed.
        - [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
    - [ ] No.
    - [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
    - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 -->

### Check List (For Reviewer who merge this PR)

- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
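The four rules in the commit message can be read as two small decisions: one made by the task when a reserve fails, and one made later for a paused query. The sketch below is only an illustration of that reading. Every name in it (`TaskAction`, `PausedQueryAction`, `on_reserve_failed`, `handle_paused_query`) is hypothetical and does not exist in Doris, and the ordering of the checks for a paused query is an assumption, since that code is not part of the diff shown here.

```cpp
// Hypothetical names throughout; this is not the Doris API.

// What a pipeline task does when a memory reservation fails.
enum class TaskAction {
    ContinueWithoutReserve, // rule 1: disable reserve, rely on the hard limit check
    BlockAndPause           // rule 3: add the query to the paused list and block
};

// What might happen to a query that sits in the paused list.
enum class PausedQueryAction {
    WaitForCancellingQueries, // rule 4: a cancelling query may free memory soon
    RevokeMemory,             // some task can spill, so revoke its memory
    CancelQuery               // rule 2: nothing is revocable, so cancel fast
};

// Task side: without spill there is nothing to revoke, so the task keeps
// running and the hard memory limit check takes over.
TaskAction on_reserve_failed(bool enable_spill) {
    return enable_spill ? TaskAction::BlockAndPause
                        : TaskAction::ContinueWithoutReserve;
}

// Paused-query side (assumed ordering): wait while other queries are still
// cancelling, otherwise spill if possible, otherwise cancel.
PausedQueryAction handle_paused_query(bool any_query_cancelling,
                                      bool has_revocable_tasks) {
    if (any_query_cancelling) {
        return PausedQueryAction::WaitForCancellingQueries;
    }
    if (has_revocable_tasks) {
        return PausedQueryAction::RevokeMemory;
    }
    return PausedQueryAction::CancelQuery;
}
```

For example, `on_reserve_failed(false)` yields `ContinueWithoutReserve`, while `handle_paused_query(false, false)` yields `CancelQuery`, which corresponds to the "cancel query fast" case in the commit title.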
1 parent 6377254 commit af4e2dd

10 files changed: +405 -243 lines changed
be/src/pipeline/pipeline_task.cpp

Lines changed: 28 additions & 6 deletions
@@ -640,6 +640,14 @@ Status PipelineTask::do_revoke_memory(const std::shared_ptr<SpillContext>& spill
 
 bool PipelineTask::_try_to_reserve_memory(const size_t reserve_size, OperatorBase* op) {
     auto st = thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size);
+    // If reserve memory failed and the query is not enable spill, just disable reserve memory(this will enable
+    // memory hard limit check, and will cancel the query if allocate memory failed) and let it run.
+    if (!st.ok() && !_state->enable_spill()) {
+        LOG(INFO) << print_id(_query_id) << " reserve memory failed due to " << st
+                  << ", and it is not enable spill, disable reserve memory and let it run";
+        _state->get_query_ctx()->resource_ctx()->task_controller()->disable_reserve_memory();
+        return true;
+    }
     COUNTER_UPDATE(_memory_reserve_times, 1);
     auto sink_revocable_mem_size = _sink->revocable_mem_size(_state);
     if (st.ok() && _state->enable_force_spill() && _sink->is_spillable() &&
@@ -656,13 +664,30 @@ bool PipelineTask::_try_to_reserve_memory(const size_t reserve_size, OperatorBas
                 op->node_id(), _state->task_id(),
                 PrettyPrinter::print_bytes(op->revocable_mem_size(_state)),
                 PrettyPrinter::print_bytes(sink_revocable_mem_size), st.to_string());
-        // PROCESS_MEMORY_EXCEEDED error msg alread contains process_mem_log_str
+        // PROCESS_MEMORY_EXCEEDED error msg already contains process_mem_log_str
         if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
             debug_msg +=
                     fmt::format(", debug info: {}", GlobalMemoryArbitrator::process_mem_log_str());
         }
-        LOG_EVERY_N(INFO, 100) << debug_msg;
         // If sink has enough revocable memory, trigger revoke memory
+        LOG(INFO) << fmt::format(
+                "Query: {} sink: {}, node id: {}, task id: "
+                "{}, revocable mem size: {}",
+                print_id(_query_id), _sink->get_name(), _sink->node_id(), _state->task_id(),
+                PrettyPrinter::print_bytes(sink_revocable_mem_size));
+        ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+                _state->get_query_ctx()->resource_ctx()->shared_from_this(), reserve_size, st);
+        _spilling = true;
+        return false;
+        // !!! Attention:
+        // In the past, if reserve failed, not add this query to paused list, because it is very small, will not
+        // consume a lot of memory. But need set low memory mode to indicate that the system should
+        // not use too much memory.
+        // But if we only set _state->get_query_ctx()->set_low_memory_mode() here, and return true, the query will
+        // continue to run and not blocked, and this reserve maybe the last block of join sink opertorator, and it will
+        // build hash table directly and will consume a lot of memory. So that should return false directly.
+        // TODO: we should using a global system buffer management logic to deal with low memory mode.
+        /**
         if (sink_revocable_mem_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
             LOG(INFO) << fmt::format(
                     "Query: {} sink: {}, node id: {}, task id: "
@@ -674,11 +699,8 @@ bool PipelineTask::_try_to_reserve_memory(const size_t reserve_size, OperatorBas
             _spilling = true;
             return false;
         } else {
-            // If reserve failed, not add this query to paused list, because it is very small, will not
-            // consume a lot of memory. But need set low memory mode to indicate that the system should
-            // not use too much memory.
             _state->get_query_ctx()->set_low_memory_mode();
-        }
+        } */
     }
     return true;
 }

be/src/runtime/memory/mem_tracker_limiter.h

Lines changed: 1 addition & 2 deletions
@@ -126,7 +126,6 @@ class MemTrackerLimiter final {
     int64_t group_num() const { return _group_num; }
     int64_t limit() const { return _limit; }
     void set_limit(int64_t new_mem_limit) { _limit = new_mem_limit; }
-    bool enable_check_limit() const { return _enable_check_limit; }
     void set_enable_check_limit(bool enable_check_limit) {
         _enable_check_limit = enable_check_limit;
     }
@@ -298,7 +297,7 @@ inline void MemTrackerLimiter::cache_consume(int64_t bytes) {
 }
 
 inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
-    if (bytes <= 0 || !enable_check_limit() || _limit <= 0) {
+    if (bytes <= 0 || !_enable_check_limit || _limit <= 0) {
         return Status::OK();
     }
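The early return in `check_limit` is what makes the "disable reserve, then rely on the hard limit" fallback meaningful: while the check is switched off, every allocation passes. The following is a minimal sketch of that gate, assuming a stand-in class `MiniTracker` that is not the real `MemTrackerLimiter`. The `consume` method, the default value of the flag, and the comparison after the early return are assumptions, because the diff only shows the first condition of `check_limit`.

```cpp
#include <cstdint>

// Hypothetical stand-in for a memory tracker; not the real MemTrackerLimiter.
class MiniTracker {
public:
    explicit MiniTracker(int64_t limit) : _limit(limit) {}

    void set_enable_check_limit(bool enable) { _enable_check_limit = enable; }

    // Records memory as used (assumed helper for this sketch).
    void consume(int64_t bytes) { _consumed += bytes; }

    // Returns true when an allocation of `bytes` is allowed.
    bool check_limit(int64_t bytes) const {
        // Same early exit as in the diff: skip the check for non-positive
        // sizes, a disabled check, or an unset limit.
        if (bytes <= 0 || !_enable_check_limit || _limit <= 0) {
            return true;
        }
        // Assumption: the remainder compares projected usage with the limit.
        return _consumed + bytes <= _limit;
    }

private:
    int64_t _limit;
    int64_t _consumed = 0;
    bool _enable_check_limit = true; // assumed default for this sketch
};
```

With a limit of 100 and 90 already consumed, `check_limit(20)` is rejected while the check is enabled and accepted once `set_enable_check_limit(false)` has been called. How `disable_reserve_memory()` toggles this flag in Doris is not visible in the diff above.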
