Skip to content

Commit 5a9f0ae

Browse files
committed
test
1 parent 73fe371 commit 5a9f0ae

File tree

4 files changed

+177
-23
lines changed

4 files changed

+177
-23
lines changed

be/src/olap/memtable_memory_limiter.cpp

Lines changed: 84 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ bool MemTableMemoryLimiter::_soft_limit_reached() {
9898
}
9999

100100
// Reports whether any hard-limit condition currently holds. In the post-commit form the
// result is true when tracked load memory plus outstanding reservations
// (_reserved_mem_usage) is above _load_hard_mem_limit, or when either
// _sys_avail_mem_less_than_warning_water_mark() or
// _process_used_mem_more_than_soft_mem_limit() returns a positive value.
// NOTE(review): this span is a rendered diff. The statement following the "101-" marker
// is the removed pre-commit return (consumption only); the lines following "101+" and
// "102+" are its replacement.
bool MemTableMemoryLimiter::_hard_limit_reached() {
101-
return _mem_tracker->consumption() > _load_hard_mem_limit ||
101+
// Include reserved memory in the check to ensure strict limit enforcement
102+
return (_mem_tracker->consumption() + _reserved_mem_usage) > _load_hard_mem_limit ||
102103
_sys_avail_mem_less_than_warning_water_mark() > 0 ||
103104
_process_used_mem_more_than_soft_mem_limit() > 0;
104105
}
@@ -114,30 +115,39 @@ int64_t MemTableMemoryLimiter::_need_flush() {
114115
return 1;
115116
}
116117
});
117-
int64_t limit1 = _mem_tracker->consumption() - _load_soft_mem_limit;
118+
// Include reserved memory in the calculation
119+
int64_t total_consumption = _mem_tracker->consumption() + _reserved_mem_usage;
120+
int64_t limit1 = total_consumption - _load_soft_mem_limit;
118121
int64_t limit2 = _sys_avail_mem_less_than_warning_water_mark();
119122
int64_t limit3 = _process_used_mem_more_than_soft_mem_limit();
120123
int64_t need_flush = std::max({limit1, limit2, limit3});
121124
return need_flush - _queue_mem_usage - _flush_mem_usage;
122125
}
123126

124-
void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> cancel_check) {
127+
void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> cancel_check,
128+
int64_t estimated_mem_size) {
125129
// Check the soft limit.
126130
DCHECK(_load_soft_mem_limit > 0);
127-
do {
128-
DBUG_EXECUTE_IF("MemTableMemoryLimiter.handle_memtable_flush.limit_reached", {
129-
LOG(INFO) << "debug memtable limit reached";
130-
break;
131-
});
132-
if (!_soft_limit_reached() || _load_usage_low()) {
133-
return;
134-
}
135-
} while (false);
131+
132+
// Fast path: if no reservation needed and under soft limit, return immediately
133+
if (estimated_mem_size <= 0) {
134+
do {
135+
DBUG_EXECUTE_IF("MemTableMemoryLimiter.handle_memtable_flush.limit_reached", {
136+
LOG(INFO) << "debug memtable limit reached";
137+
break;
138+
});
139+
if (!_soft_limit_reached() || _load_usage_low()) {
140+
return;
141+
}
142+
} while (false);
143+
}
144+
136145
MonotonicStopWatch timer;
137146
timer.start();
138147
std::unique_lock<std::mutex> l(_lock);
139148
g_memtable_memory_limit_waiting_threads << 1;
140149
bool first = true;
150+
141151
do {
142152
if (!first) {
143153
auto st = _hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(1000));
@@ -151,13 +161,38 @@ void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> cancel_c
151161
return;
152162
}
153163
first = false;
164+
165+
// Check if we can reserve the requested memory
166+
bool can_reserve = false;
167+
if (estimated_mem_size > 0) {
168+
// Try to reserve memory. This ensures we don't exceed hard limit even with
169+
// multiple concurrent writes.
170+
int64_t total_after_reserve = _mem_tracker->consumption() + _reserved_mem_usage +
171+
estimated_mem_size;
172+
// Also check process hard limit, not just soft limit to prevent allocation failures
173+
int64_t process_hard_limit_exceeded = GlobalMemoryArbitrator::process_memory_usage() +
174+
estimated_mem_size -
175+
MemInfo::mem_limit();
176+
can_reserve = (total_after_reserve <= _load_hard_mem_limit &&
177+
_sys_avail_mem_less_than_warning_water_mark() <= 0 &&
178+
_process_used_mem_more_than_soft_mem_limit() <= 0 &&
179+
process_hard_limit_exceeded <= 0);
180+
181+
if (can_reserve) {
182+
// Successfully reserved, we can proceed
183+
_reserved_mem_usage += estimated_mem_size;
184+
break;
185+
}
186+
}
187+
154188
int64_t need_flush = _need_flush();
155-
if (need_flush > 0) {
189+
if (need_flush > 0 || (estimated_mem_size > 0 && !can_reserve)) {
156190
auto limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT;
157191
LOG(INFO) << "reached memtable memory " << (limit == Limit::HARD ? "hard" : "soft")
158192
<< ", " << GlobalMemoryArbitrator::process_memory_used_details_str() << ", "
159193
<< GlobalMemoryArbitrator::sys_mem_available_details_str()
160194
<< ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
195+
<< ", reserved: " << PrettyPrinter::print_bytes(_reserved_mem_usage)
161196
<< ", memtable writers num: " << _writers.size()
162197
<< ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
163198
<< ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
@@ -176,9 +211,11 @@ void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> cancel_c
176211
} else {
177212
// will not reach here
178213
}
179-
_flush_active_memtables(need_flush);
214+
int64_t flush_size = std::max(need_flush, estimated_mem_size);
215+
_flush_active_memtables(flush_size);
180216
}
181-
} while (_hard_limit_reached() && !_load_usage_low());
217+
} while ((_hard_limit_reached() || (estimated_mem_size > 0)) && !_load_usage_low());
218+
182219
g_memtable_memory_limit_waiting_threads << -1;
183220
timer.stop();
184221
int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
@@ -189,6 +226,7 @@ void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> cancel_c
189226
<< ", " << GlobalMemoryArbitrator::process_memory_used_details_str() << ", "
190227
<< GlobalMemoryArbitrator::sys_mem_available_details_str()
191228
<< ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
229+
<< ", reserved: " << PrettyPrinter::print_bytes(_reserved_mem_usage)
192230
<< ", memtable writers num: " << _writers.size()
193231
<< ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
194232
<< ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
@@ -249,6 +287,36 @@ int64_t MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) {
249287
return mem_flushed;
250288
}
251289

290+
bool MemTableMemoryLimiter::reserve_memory(int64_t size) {
291+
if (size <= 0) {
292+
return true;
293+
}
294+
int64_t total_after_reserve = _mem_tracker->consumption() + _reserved_mem_usage + size;
295+
if (total_after_reserve > _load_hard_mem_limit ||
296+
_sys_avail_mem_less_than_warning_water_mark() > 0 ||
297+
_process_used_mem_more_than_soft_mem_limit() > 0) {
298+
return false;
299+
}
300+
_reserved_mem_usage += size;
301+
return true;
302+
}
303+
304+
void MemTableMemoryLimiter::release_memory(int64_t size) {
305+
if (size <= 0) {
306+
return;
307+
}
308+
_reserved_mem_usage -= size;
309+
if (_reserved_mem_usage < 0) {
310+
LOG(WARNING) << "reserved memory usage is negative: " << _reserved_mem_usage
311+
<< ", reset to 0";
312+
_reserved_mem_usage = 0;
313+
}
314+
// Notify waiting threads that memory has been released
315+
if (!_hard_limit_reached()) {
316+
_hard_limit_end_cond.notify_all();
317+
}
318+
}
319+
252320
void MemTableMemoryLimiter::refresh_mem_tracker() {
253321
std::lock_guard<std::mutex> l(_lock);
254322
_refresh_mem_tracker();
@@ -271,6 +339,7 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {
271339
_log_timer.reset();
272340
LOG(INFO) << ss.str()
273341
<< ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
342+
<< ", reserved: " << PrettyPrinter::print_bytes(_reserved_mem_usage)
274343
<< ", memtable writers num: " << _writers.size()
275344
<< ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
276345
<< ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)

be/src/olap/memtable_memory_limiter.h

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,17 @@ class MemTableMemoryLimiter {
4444
// If yes, it will flush memtable to try to reduce memory consumption.
4545
// Every write operation will call this API to check if need flush memtable OR hang
4646
// when memory is not available.
47-
void handle_memtable_flush(std::function<bool()> cancel_check);
47+
// estimated_mem_size: estimated memory size that will be allocated after this call.
48+
// If > 0, will reserve this amount before returning to ensure
49+
// strict memory limit enforcement.
50+
void handle_memtable_flush(std::function<bool()> cancel_check, int64_t estimated_mem_size = 0);
51+
52+
// Reserve memory space. Returns true if successful, false if the reservation would exceed limits.
53+
// Must be called under lock.
54+
bool reserve_memory(int64_t size);
55+
56+
// Release reserved memory. Must be called under lock.
57+
void release_memory(int64_t size);
4858

4959
void register_writer(std::weak_ptr<MemTableWriter> writer);
5060

@@ -54,7 +64,42 @@ class MemTableMemoryLimiter {
5464

5565
int64_t mem_usage() const { return _mem_usage; }
5666

67+
// RAII guard for memory reservation
68+
class MemoryReservationGuard {
69+
public:
70+
MemoryReservationGuard(MemTableMemoryLimiter* limiter, int64_t size)
71+
: _limiter(limiter), _size(size), _reserved(false) {}
72+
73+
~MemoryReservationGuard() {
74+
if (_reserved && _limiter != nullptr) {
75+
std::lock_guard<std::mutex> l(_limiter->_lock);
76+
_limiter->release_memory(_size);
77+
}
78+
}
79+
80+
// Mark as reserved. Should be called after successful reservation.
81+
void set_reserved(bool reserved) { _reserved = reserved; }
82+
83+
// Release the reservation explicitly
84+
void release() {
85+
if (_reserved && _limiter != nullptr) {
86+
std::lock_guard<std::mutex> l(_limiter->_lock);
87+
_limiter->release_memory(_size);
88+
_reserved = false;
89+
}
90+
}
91+
92+
MemoryReservationGuard(const MemoryReservationGuard&) = delete;
93+
MemoryReservationGuard& operator=(const MemoryReservationGuard&) = delete;
94+
95+
private:
96+
MemTableMemoryLimiter* _limiter;
97+
int64_t _size;
98+
bool _reserved;
99+
};
100+
57101
private:
102+
friend class MemoryReservationGuard;
58103
static inline int64_t _sys_avail_mem_less_than_warning_water_mark();
59104
static inline int64_t _process_used_mem_more_than_soft_mem_limit();
60105

@@ -70,6 +115,7 @@ class MemTableMemoryLimiter {
70115
int64_t _flush_mem_usage = 0;
71116
int64_t _queue_mem_usage = 0;
72117
int64_t _active_mem_usage = 0;
118+
int64_t _reserved_mem_usage = 0; // Memory reserved but not yet allocated
73119

74120
// sum of all mem table memory.
75121
std::unique_ptr<MemTracker> _mem_tracker;

be/src/runtime/load_channel_mgr.cpp

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,14 +165,43 @@ Status LoadChannelMgr::add_batch(const PTabletWriterAddBlockRequest& request,
165165
// If this is a high priority load task, do not handle this.
166166
// because this may block for a while, which may lead to rpc timeout.
167167
SCOPED_TIMER(channel->get_handle_mem_limit_timer());
168-
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
169-
[channel]() { return channel->is_cancelled(); });
168+
169+
// Estimate memory size: use protobuf message size * 2 as a rough, mid-range estimate (not an upper bound)
170+
// The actual deserialized block typically uses 1.5-3x the compressed protobuf size
171+
int64_t estimated_mem = 0;
172+
if (request.has_block()) {
173+
estimated_mem = request.block().ByteSizeLong() * 2;
174+
}
175+
176+
auto mem_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
177+
mem_limiter->handle_memtable_flush([channel]() { return channel->is_cancelled(); },
178+
estimated_mem);
170179
if (channel->is_cancelled()) {
171180
return Status::Cancelled("LoadChannel has been cancelled: {}.", load_id.to_string());
172181
}
182+
183+
// Use RAII guard to ensure memory reservation is released after add_batch
184+
MemTableMemoryLimiter::MemoryReservationGuard guard(mem_limiter, estimated_mem);
185+
guard.set_reserved(true); // Mark as reserved (already done in handle_memtable_flush)
186+
187+
// 3. add batch to load channel
188+
// batch may not exist in request(eg: eos request without batch),
189+
// this case will be handled in load channel's add batch method.
190+
Status st = channel->add_batch(request, response);
191+
// Guard will automatically release the reservation when going out of scope
192+
if (UNLIKELY(!st.ok())) {
193+
RETURN_IF_ERROR(channel->cancel());
194+
return st;
195+
}
196+
197+
// 4. handle finish
198+
if (channel->is_finished()) {
199+
_finish_load_channel(load_id);
200+
}
201+
return Status::OK();
173202
}
174203

175-
// 3. add batch to load channel
204+
// 3. add batch to load channel (high priority path without memory limit check)
176205
// batch may not exist in request(eg: eos request without batch),
177206
// this case will be handled in load channel's add batch method.
178207
Status st = channel->add_batch(request, response);

be/src/vec/sink/writer/vtablet_writer_v2.cpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -579,14 +579,24 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
579579
}
580580
{
581581
SCOPED_TIMER(_wait_mem_limit_timer);
582-
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
583-
[state = _state]() { return state->is_cancelled(); });
582+
// Estimate memory size: use block's allocated bytes as a conservative estimate
583+
// This accounts for the memory that will be allocated during the write operation
584+
int64_t estimated_mem = block->allocated_bytes();
585+
auto mem_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
586+
mem_limiter->handle_memtable_flush([state = _state]() { return state->is_cancelled(); },
587+
estimated_mem);
584588
if (_state->is_cancelled()) {
585589
return _state->cancel_reason();
586590
}
591+
592+
// Use RAII guard to ensure memory reservation is released after write
593+
MemTableMemoryLimiter::MemoryReservationGuard guard(mem_limiter, estimated_mem);
594+
guard.set_reserved(true); // Mark as reserved (already done in handle_memtable_flush)
595+
596+
SCOPED_TIMER(_write_memtable_timer);
597+
st = delta_writer->write(block.get(), rows.row_idxes);
598+
// Guard will automatically release the reservation when going out of scope
587599
}
588-
SCOPED_TIMER(_write_memtable_timer);
589-
st = delta_writer->write(block.get(), rows.row_idxes);
590600
return st;
591601
}
592602

0 commit comments

Comments
 (0)