@@ -385,6 +385,16 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
385385 return Status::OK ();
386386 }
387387
388+ auto get_serializer_mem_bytes = [&local_state]() -> int64_t {
389+ int64_t mem_usage = local_state._serializer .mem_usage ();
390+ for (auto & channel : local_state.channels ) {
391+ mem_usage += channel->mem_usage ();
392+ }
393+ return mem_usage;
394+ };
395+
396+ int64_t before_serializer_mem_bytes = get_serializer_mem_bytes ();
397+
388398 auto send_to_current_channel = [&]() -> Status {
389399 // 1. select channel
390400 auto & current_channel = local_state.channels [local_state.current_channel_idx ];
@@ -427,11 +437,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
427437 auto block_holder = vectorized::BroadcastPBlockHolder::create_shared ();
428438 {
429439 bool serialized = false ;
430- int64_t old_block_mem_bytes = local_state._serializer .mem_usage ();
431- Defer update_mem ([&]() {
432- COUNTER_UPDATE (local_state.memory_used_counter (),
433- local_state._serializer .mem_usage () - old_block_mem_bytes);
434- });
435440 RETURN_IF_ERROR (local_state._serializer .next_serialized_block (
436441 block, block_holder->get_block (), local_state._rpc_channels_num ,
437442 &serialized, eos));
@@ -505,13 +510,16 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
505510 // 2. dispatch rows to channel
506511 }
507512
513+ int64_t after_serializer_mem_bytes = get_serializer_mem_bytes ();
514+
515+ int64_t delta_mem_bytes = after_serializer_mem_bytes - before_serializer_mem_bytes;
516+ COUNTER_UPDATE (local_state.memory_used_counter (), delta_mem_bytes);
517+
508518 Status final_st = Status::OK ();
509519 if (eos) {
510- int64_t block_mem_bytes = local_state._serializer .mem_usage ();
511- COUNTER_UPDATE (local_state.memory_used_counter (), -block_mem_bytes);
520+ COUNTER_UPDATE (local_state.memory_used_counter (), -after_serializer_mem_bytes);
512521 local_state._serializer .reset_block ();
513522 for (auto & channel : local_state.channels ) {
514- COUNTER_UPDATE (local_state.memory_used_counter (), -channel->mem_usage ());
515523 Status st = channel->close (state);
516524 /* *
517525 * Consider this case below:
0 commit comments