Skip to content

Commit 1af4b45

Browse files
committed
[fix](runtime) Avoid merging results into one large result in BufferControlBlock (#49571)
1 parent e8553f8 commit 1af4b45

File tree

4 files changed

+40
-3
lines changed

4 files changed

+40
-3
lines changed

be/src/runtime/buffer_control_block.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <glog/logging.h>
2424
#include <google/protobuf/stubs/callback.h>
2525
// IWYU pragma: no_include <bits/chrono.h>
26+
#include <algorithm>
2627
#include <chrono> // IWYU pragma: keep
2728
#include <ostream>
2829
#include <string>
@@ -130,16 +131,21 @@ Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result)
130131
}
131132

132133
if (_waiting_rpc.empty()) {
134+
size_t bytes = 0;
135+
std::for_each(result->result_batch.rows.cbegin(), result->result_batch.rows.cend(),
136+
[&bytes](const std::string& row) { bytes += row.size(); });
133137
// Merge result into batch to reduce rpc times
134138
if (!_batch_queue.empty() &&
135139
((_batch_queue.back()->result_batch.rows.size() + num_rows) < _buffer_limit) &&
136-
!result->eos) {
140+
!result->eos && (bytes + _last_batch_bytes) <= config::thrift_max_message_size) {
137141
std::vector<std::string>& back_rows = _batch_queue.back()->result_batch.rows;
138142
std::vector<std::string>& result_rows = result->result_batch.rows;
139143
back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()),
140144
std::make_move_iterator(result_rows.end()));
145+
_last_batch_bytes += bytes;
141146
} else {
142147
_batch_queue.push_back(std::move(result));
148+
_last_batch_bytes = bytes;
143149
}
144150
_buffer_rows += num_rows;
145151
_data_arrival.notify_one();

be/src/runtime/buffer_control_block.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ class BufferControlBlock {
103103
std::atomic_int _buffer_rows;
104104
const int _buffer_limit;
105105
int64_t _packet_num;
106+
size_t _last_batch_bytes = 0;
106107

107108
// blocking queue for batch
108109
ResultQueue _batch_queue;

be/src/vec/sink/vmysql_result_writer.cpp

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -601,9 +601,8 @@ int VMysqlResultWriter<is_binary_format>::_add_one_cell(const ColumnPtr& column_
601601
template <bool is_binary_format>
602602
Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
603603
SCOPED_TIMER(_append_row_batch_timer);
604-
Status status = Status::OK();
605604
if (UNLIKELY(input_block.rows() == 0)) {
606-
return status;
605+
return Status::OK();
607606
}
608607

609608
DCHECK(_output_vexpr_ctxs.empty() != true);
@@ -613,7 +612,36 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
613612
Block block;
614613
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
615614
input_block, &block));
615+
616+
const auto total_bytes = block.bytes();
617+
if (total_bytes > config::thrift_max_message_size) [[unlikely]] {
618+
const auto total_rows = block.rows();
619+
const auto sub_block_count = (total_bytes + config::thrift_max_message_size - 1) /
620+
config::thrift_max_message_size;
621+
const auto sub_block_rows = (total_rows + sub_block_count - 1) / sub_block_count;
622+
623+
size_t offset = 0;
624+
while (offset < total_rows) {
625+
size_t rows = std::min(static_cast<size_t>(sub_block_rows), total_rows - offset);
626+
auto sub_block = block.clone_empty();
627+
for (size_t i = 0; i != block.columns(); ++i) {
628+
sub_block.get_by_position(i).column =
629+
block.get_by_position(i).column->cut(offset, rows);
630+
}
631+
offset += rows;
632+
633+
RETURN_IF_ERROR(_append_block(sub_block));
634+
}
635+
return Status::OK();
636+
}
637+
638+
return _append_block(block);
639+
}
640+
641+
template <bool is_binary_format>
642+
Status VMysqlResultWriter<is_binary_format>::_append_block(Block& block) {
616643
// convert one batch
644+
Status status = Status::OK();
617645
auto result = std::make_unique<TFetchDataResult>();
618646
auto num_rows = block.rows();
619647
result->result_batch.rows.resize(num_rows);

be/src/vec/sink/vmysql_result_writer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ class VMysqlResultWriter final : public ResultWriter {
6666
int _add_one_cell(const ColumnPtr& column_ptr, size_t row_idx, const DataTypePtr& type,
6767
MysqlRowBuffer<is_binary_format>& buffer, int scale = -1);
6868

69+
Status _append_block(Block& block);
70+
6971
BufferControlBlock* _sinker;
7072

7173
const VExprContextSPtrs& _output_vexpr_ctxs;

0 commit comments

Comments
 (0)