Skip to content

Commit 1a32b1b

Browse files
branch-4.0: [enhance](memory) back pressure writing when memory usage is high in sink operation #58530 (#58704)
Cherry-picked from #58530 Co-authored-by: hui lai <[email protected]>
1 parent 71d21d3 commit 1a32b1b

File tree

2 files changed

+8
-1
lines changed

2 files changed

+8
-1
lines changed

be/src/vec/sink/writer/vtablet_writer.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,7 @@ VNodeChannel::VNodeChannel(VTabletWriter* parent, IndexChannel* index_channel, i
509509
_node_channel_tracker = std::make_shared<MemTracker>(
510510
fmt::format("NodeChannel:indexID={}:threadId={}",
511511
std::to_string(_index_channel->_index_id), ThreadContext::get_thread_id()));
512+
_load_mem_limit = MemInfo::mem_limit() * config::load_process_max_memory_limit_percent / 100;
512513
}
513514

514515
VNodeChannel::~VNodeChannel() = default;
@@ -747,8 +748,13 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload)
747748
// But there is still some unfinished things, we do mem limit here temporarily.
748749
// _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
749750
// It's fine to do a fake add_block() and return OK, because we will check _cancelled in next add_block() or mark_close().
751+
bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
752+
auto current_load_mem_value = MemoryProfile::load_current_usage();
753+
bool mem_limit_exceeded = is_exceed_soft_mem_limit ||
754+
current_load_mem_value > _load_mem_limit ||
755+
_pending_batches_bytes > _max_pending_batches_bytes;
750756
while (!_cancelled && !_state->is_cancelled() && _pending_batches_num > 0 &&
751-
_pending_batches_bytes > _max_pending_batches_bytes) {
757+
mem_limit_exceeded) {
752758
SCOPED_RAW_TIMER(&_stat.mem_exceeded_block_ns);
753759
std::this_thread::sleep_for(std::chrono::milliseconds(10));
754760
}

be/src/vec/sink/writer/vtablet_writer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ class VNodeChannel {
357357
std::string _name;
358358

359359
std::shared_ptr<MemTracker> _node_channel_tracker;
360+
int64_t _load_mem_limit = -1;
360361

361362
TupleDescriptor* _tuple_desc = nullptr;
362363
NodeInfo _node_info;

0 commit comments

Comments
 (0)