Skip to content

Commit 0075df5

Browse files
authored
[workloadgroup](memory) flush memtable when memory is not enough (apache#54642)
### What problem does this PR solve? 1. Flush memtables when memory is not enough. 2. Cancel queries that use more memory than min_memory_percent.
1 parent f894a59 commit 0075df5

25 files changed

+389
-548
lines changed

be/src/common/config.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1391,6 +1391,8 @@ DEFINE_Int32(spill_io_thread_pool_queue_size, "102400");
13911391
// A paused query that stays in the queue longer than this timeout (ms) will be resumed or canceled
13921392
DEFINE_Int64(spill_in_paused_queue_timeout_ms, "60000");
13931393

1394+
DEFINE_Int64(wait_cancel_release_memory_ms, "5000");
1395+
13941396
DEFINE_mBool(check_segment_when_build_rowset_meta, "false");
13951397

13961398
DEFINE_mBool(force_azure_blob_global_endpoint, "false");

be/src/common/config.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1447,6 +1447,7 @@ DECLARE_mInt32(spill_gc_work_time_ms);
14471447
DECLARE_Int32(spill_io_thread_pool_thread_num);
14481448
DECLARE_Int32(spill_io_thread_pool_queue_size);
14491449
DECLARE_Int64(spill_in_paused_queue_timeout_ms);
1450+
DECLARE_Int64(wait_cancel_release_memory_ms);
14501451

14511452
DECLARE_mBool(check_segment_when_build_rowset_meta);
14521453

be/src/common/daemon.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ void Daemon::memory_maintenance_thread() {
348348

349349
// step 6. Refresh weighted memory ratio of workload groups.
350350
doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep();
351-
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();
351+
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_workload_group_memory_state();
352352

353353
// step 7: handle paused queries(caused by memory insufficient)
354354
doris::ExecEnv::GetInstance()->workload_group_mgr()->handle_paused_queries();

be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendWorkloadGroupResourceUsage::
3838
{"CPU_USAGE_PERCENT", TYPE_DOUBLE, sizeof(double), false},
3939
{"LOCAL_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
4040
{"REMOTE_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
41-
{"WRITE_BUFFER_USAGE_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
4241
};
4342

4443
SchemaBackendWorkloadGroupResourceUsage::SchemaBackendWorkloadGroupResourceUsage()

be/src/olap/memtable_memory_limiter.cpp

Lines changed: 8 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -119,43 +119,11 @@ int64_t MemTableMemoryLimiter::_need_flush() {
119119
return need_flush - _queue_mem_usage - _flush_mem_usage;
120120
}
121121

122-
void MemTableMemoryLimiter::handle_workload_group_memtable_flush(
123-
WorkloadGroupPtr wg, std::function<bool()> cancel_check) {
124-
// It means some query is pending on here to flush memtable and to continue running.
125-
// So that should wait here.
126-
// Wait at most 3s, because this code is not aware of the cancel flag. If the load task is cancelled
127-
// Should release memory quickly.
128-
using namespace std::chrono_literals;
129-
int32_t max_sleep_times = 30;
130-
int32_t sleep_times = max_sleep_times;
131-
MonotonicStopWatch timer;
132-
timer.start();
133-
while (wg != nullptr && wg->enable_write_buffer_limit() && wg->exceed_write_buffer_limit() &&
134-
sleep_times > 0) {
135-
if (cancel_check && cancel_check()) {
136-
LOG(INFO) << "cancelled when waiting for memtable flush, wg: "
137-
<< (wg == nullptr ? "null" : wg->debug_string());
138-
return;
139-
}
140-
std::this_thread::sleep_for(100ms);
141-
--sleep_times;
142-
}
143-
if (sleep_times < max_sleep_times) {
144-
timer.stop();
145-
VLOG_DEBUG << "handle_workload_group_memtable_flush waited "
146-
<< PrettyPrinter::print(timer.elapsed_time(), TUnit::TIME_NS)
147-
<< ", wg: " << wg->debug_string();
148-
}
149-
// Check process memory again.
150-
_handle_memtable_flush(wg, cancel_check);
151-
}
152-
153-
void MemTableMemoryLimiter::_handle_memtable_flush(WorkloadGroupPtr wg,
154-
std::function<bool()> cancel_check) {
122+
void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> cancel_check) {
155123
// Check the soft limit.
156124
DCHECK(_load_soft_mem_limit > 0);
157125
do {
158-
DBUG_EXECUTE_IF("MemTableMemoryLimiter._handle_memtable_flush.limit_reached", {
126+
DBUG_EXECUTE_IF("MemTableMemoryLimiter.handle_memtable_flush.limit_reached", {
159127
LOG(INFO) << "debug memtable limit reached";
160128
break;
161129
});
@@ -176,8 +144,7 @@ void MemTableMemoryLimiter::_handle_memtable_flush(WorkloadGroupPtr wg,
176144
}
177145
}
178146
if (cancel_check && cancel_check()) {
179-
LOG(INFO) << "cancelled when waiting for memtable flush, wg: "
180-
<< (wg == nullptr ? "null" : wg->debug_string());
147+
LOG(INFO) << "cancelled when waiting for memtable flush";
181148
return;
182149
}
183150
first = false;
@@ -192,15 +159,14 @@ void MemTableMemoryLimiter::_handle_memtable_flush(WorkloadGroupPtr wg,
192159
<< ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
193160
<< ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
194161
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage)
195-
<< ", need flush: " << PrettyPrinter::print_bytes(need_flush)
196-
<< ", wg: " << (wg ? wg->debug_string() : "null");
162+
<< ", need flush: " << PrettyPrinter::print_bytes(need_flush);
197163
if (VLOG_DEBUG_IS_ON) {
198164
auto log_str = doris::ProcessProfile::instance()
199165
->memory_profile()
200166
->process_memory_detail_str();
201167
LOG_LONG_STRING(INFO, log_str);
202168
}
203-
_flush_active_memtables(0, need_flush);
169+
_flush_active_memtables(need_flush);
204170
}
205171
} while (_hard_limit_reached() && !_load_usage_low());
206172
g_memtable_memory_limit_waiting_threads << -1;
@@ -216,37 +182,11 @@ void MemTableMemoryLimiter::_handle_memtable_flush(WorkloadGroupPtr wg,
216182
<< ", memtable writers num: " << _writers.size()
217183
<< ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
218184
<< ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
219-
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage)
220-
<< ", wg: " << (wg ? wg->debug_string() : "null.");
221-
}
222-
}
223-
224-
int64_t MemTableMemoryLimiter::flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush) {
225-
std::unique_lock<std::mutex> l(_lock);
226-
return _flush_active_memtables(wg_id, need_flush);
227-
}
228-
229-
void MemTableMemoryLimiter::get_workload_group_memtable_usage(uint64_t wg_id, int64_t* active_bytes,
230-
int64_t* queue_bytes,
231-
int64_t* flush_bytes) {
232-
std::unique_lock<std::mutex> l(_lock);
233-
*active_bytes = 0;
234-
*queue_bytes = 0;
235-
*flush_bytes = 0;
236-
for (auto it = _writers.begin(); it != _writers.end(); ++it) {
237-
if (auto writer = it->lock()) {
238-
// If wg id is specified, but wg id not match, then not need flush
239-
if (writer->workload_group_id() != wg_id) {
240-
continue;
241-
}
242-
*active_bytes += writer->active_memtable_mem_consumption();
243-
*queue_bytes += writer->mem_consumption(MemType::WRITE_FINISHED);
244-
*flush_bytes += writer->mem_consumption(MemType::FLUSH);
245-
}
185+
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage);
246186
}
247187
}
248188

249-
int64_t MemTableMemoryLimiter::_flush_active_memtables(uint64_t wg_id, int64_t need_flush) {
189+
int64_t MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) {
250190
if (need_flush <= 0) {
251191
return 0;
252192
}
@@ -278,10 +218,7 @@ int64_t MemTableMemoryLimiter::_flush_active_memtables(uint64_t wg_id, int64_t n
278218
if (w == nullptr) {
279219
continue;
280220
}
281-
// If wg id is specified, but wg id not match, then not need flush
282-
if (wg_id != 0 && w->workload_group_id() != wg_id) {
283-
continue;
284-
}
221+
285222
int64_t mem = w->active_memtable_mem_consumption();
286223
if (mem < sort_mem * 0.9) {
287224
// if the memtable writer just got flushed, don't flush it again

be/src/olap/memtable_memory_limiter.h

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,11 @@ class MemTableMemoryLimiter {
4040

4141
Status init(int64_t process_mem_limit);
4242

43-
void handle_workload_group_memtable_flush(WorkloadGroupPtr wg,
44-
std::function<bool()> cancel_check);
45-
46-
int64_t flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush_bytes);
47-
48-
void get_workload_group_memtable_usage(uint64_t wg_id, int64_t* active_bytes,
49-
int64_t* queue_bytes, int64_t* flush_bytes);
43+
// check if the total mem consumption exceeds limit.
44+
// If yes, it will flush memtable to try to reduce memory consumption.
45+
// Every write operation will call this API to check if need flush memtable OR hang
46+
// when memory is not available.
47+
void handle_memtable_flush(std::function<bool()> cancel_check);
5048

5149
void register_writer(std::weak_ptr<MemTableWriter> writer);
5250

@@ -57,22 +55,15 @@ class MemTableMemoryLimiter {
5755
int64_t mem_usage() const { return _mem_usage; }
5856

5957
private:
60-
// check if the total mem consumption exceeds limit.
61-
// If yes, it will flush memtable to try to reduce memory consumption.
62-
// Every write operation will call this API to check if need flush memtable OR hang
63-
// when memory is not available.
64-
void _handle_memtable_flush(WorkloadGroupPtr wg, std::function<bool()> cancel_check);
65-
6658
static inline int64_t _sys_avail_mem_less_than_warning_water_mark();
6759
static inline int64_t _process_used_mem_more_than_soft_mem_limit();
6860

6961
bool _soft_limit_reached();
7062
bool _hard_limit_reached();
7163
bool _load_usage_low();
7264
int64_t _need_flush();
73-
int64_t _flush_active_memtables(uint64_t wg_id, int64_t need_flush);
65+
int64_t _flush_active_memtables(int64_t need_flush);
7466
void _refresh_mem_tracker();
75-
7667
std::mutex _lock;
7768
std::condition_variable _hard_limit_end_cond;
7869
int64_t _mem_usage = 0;

be/src/runtime/load_channel_mgr.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,8 @@ Status LoadChannelMgr::add_batch(const PTabletWriterAddBlockRequest& request,
152152
// If this is a high priority load task, do not handle this.
153153
// because this may block for a while, which may lead to rpc timeout.
154154
SCOPED_TIMER(channel->get_handle_mem_limit_timer());
155-
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_workload_group_memtable_flush(
156-
channel->workload_group(), [channel]() { return channel->is_cancelled(); });
155+
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
156+
[channel]() { return channel->is_cancelled(); });
157157
if (channel->is_cancelled()) {
158158
return Status::Cancelled("LoadChannel has been cancelled: {}.", load_id.to_string());
159159
}

be/src/runtime/memory/global_memory_arbitrator.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ std::atomic<double> GlobalMemoryArbitrator::last_periodic_refreshed_cache_capaci
6262
std::atomic<double> GlobalMemoryArbitrator::last_memory_exceeded_cache_capacity_adjust_weighted {1};
6363
// The value that takes effect
6464
std::atomic<double> GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted {1};
65-
std::atomic<bool> GlobalMemoryArbitrator::any_workload_group_exceed_limit {false};
6665
std::mutex GlobalMemoryArbitrator::memtable_memory_refresh_lock;
6766
std::condition_variable GlobalMemoryArbitrator::memtable_memory_refresh_cv;
6867
std::atomic<bool> GlobalMemoryArbitrator::memtable_memory_refresh_notify {false};

be/src/runtime/memory/global_memory_arbitrator.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,6 @@ class GlobalMemoryArbitrator {
166166
static std::atomic<double> last_memory_exceeded_cache_capacity_adjust_weighted;
167167
// The value that takes effect
168168
static std::atomic<double> last_affected_cache_capacity_adjust_weighted;
169-
static std::atomic<bool> any_workload_group_exceed_limit;
170169

171170
static void notify_cache_adjust_capacity() {
172171
cache_adjust_capacity_notify.store(true, std::memory_order_relaxed);

be/src/runtime/memory/mem_tracker_limiter.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,6 @@ class MemTrackerLimiter final {
220220
static void make_top_consumption_tasks_tracker_profile(RuntimeProfile* profile, int top_num);
221221
static void make_all_tasks_tracker_profile(RuntimeProfile* profile);
222222

223-
int64_t write_buffer_size() const { return _write_tracker->consumption(); }
224-
225223
std::shared_ptr<MemTrackerLimiter> write_tracker() { return _write_tracker; }
226224

227225
void print_log_usage(const std::string& msg);

0 commit comments

Comments
 (0)