Skip to content

Commit bf7164a

Browse files
committed
[refactor] Remove free blocks and block reuse
1 parent e3e0590 commit bf7164a

23 files changed

+81
-276
lines changed

be/src/pipeline/exec/data_queue.cpp

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ namespace pipeline {
3232
DataQueue::DataQueue(int child_count)
3333
: _queue_blocks_lock(child_count),
3434
_queue_blocks(child_count),
35-
_free_blocks_lock(child_count),
36-
_free_blocks(child_count),
3735
_child_count(child_count),
3836
_is_finished(child_count),
3937
_is_canceled(child_count),
@@ -42,7 +40,6 @@ DataQueue::DataQueue(int child_count)
4240
_flag_queue_idx(0) {
4341
for (int i = 0; i < child_count; ++i) {
4442
_queue_blocks_lock[i].reset(new std::mutex());
45-
_free_blocks_lock[i].reset(new std::mutex());
4643
_is_finished[i] = false;
4744
_is_canceled[i] = false;
4845
_cur_bytes_in_queue[i] = 0;
@@ -52,36 +49,6 @@ DataQueue::DataQueue(int child_count)
5249
_sink_dependencies.resize(child_count, nullptr);
5350
}
5451

55-
std::unique_ptr<vectorized::Block> DataQueue::get_free_block(int child_idx) {
56-
{
57-
INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]));
58-
if (!_free_blocks[child_idx].empty()) {
59-
auto block = std::move(_free_blocks[child_idx].front());
60-
_free_blocks[child_idx].pop_front();
61-
return block;
62-
}
63-
}
64-
65-
return vectorized::Block::create_unique();
66-
}
67-
68-
void DataQueue::push_free_block(std::unique_ptr<vectorized::Block> block, int child_idx) {
69-
DCHECK(block->rows() == 0);
70-
71-
if (!_is_low_memory_mode) {
72-
INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]));
73-
_free_blocks[child_idx].emplace_back(std::move(block));
74-
}
75-
}
76-
77-
void DataQueue::clear_free_blocks() {
78-
for (size_t child_idx = 0; child_idx < _free_blocks.size(); ++child_idx) {
79-
std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]);
80-
std::deque<std::unique_ptr<vectorized::Block>> tmp_queue;
81-
_free_blocks[child_idx].swap(tmp_queue);
82-
}
83-
}
84-
8552
void DataQueue::terminate() {
8653
for (int i = 0; i < _queue_blocks.size(); i++) {
8754
set_finish(i);
@@ -93,7 +60,6 @@ void DataQueue::terminate() {
9360
_sink_dependencies[i]->set_always_ready();
9461
}
9562
}
96-
clear_free_blocks();
9763
}
9864

9965
//check which queue have data, and save the idx in _flag_queue_idx,

be/src/pipeline/exec/data_queue.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,8 @@ class DataQueue {
4242

4343
Status push_block(std::unique_ptr<vectorized::Block> block, int child_idx = 0);
4444

45-
std::unique_ptr<vectorized::Block> get_free_block(int child_idx = 0);
46-
4745
void push_free_block(std::unique_ptr<vectorized::Block> output_block, int child_idx = 0);
4846

49-
void clear_free_blocks();
50-
5147
void set_finish(int child_idx = 0);
5248
void set_canceled(int child_idx = 0); // should set before finish
5349
bool is_finish(int child_idx = 0);
@@ -76,7 +72,6 @@ class DataQueue {
7672
void set_low_memory_mode() {
7773
_is_low_memory_mode = true;
7874
_max_blocks_in_sub_queue = 1;
79-
clear_free_blocks();
8075
}
8176

8277
void terminate();
@@ -85,9 +80,6 @@ class DataQueue {
8580
std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
8681
std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _queue_blocks;
8782

88-
std::vector<std::unique_ptr<std::mutex>> _free_blocks_lock;
89-
std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _free_blocks;
90-
9183
//how many deque will be init, always will be one
9284
int _child_count = 0;
9385
std::vector<std::atomic_bool> _is_finished;

be/src/pipeline/exec/operator.cpp

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
#include "operator.h"
1919

20+
#include <glog/logging.h>
21+
2022
#include "common/status.h"
2123
#include "pipeline/dependency.h"
2224
#include "pipeline/exec/aggregation_sink_operator.h"
@@ -84,6 +86,8 @@
8486
#include "util/debug_util.h"
8587
#include "util/runtime_profile.h"
8688
#include "util/string_util.h"
89+
#include "vec/columns/column_nullable.h"
90+
#include "vec/core/block.h"
8791
#include "vec/exprs/vexpr.h"
8892
#include "vec/exprs/vexpr_context.h"
8993
#include "vec/utils/util.hpp"
@@ -322,46 +326,31 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori
322326
}
323327

324328
DCHECK_EQ(rows, input_block.rows());
325-
auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
326-
if (to->is_nullable() && !from->is_nullable()) {
327-
if (_keep_origin || !from->is_exclusive()) {
328-
auto& null_column = reinterpret_cast<vectorized::ColumnNullable&>(*to);
329-
null_column.get_nested_column().insert_range_from(*from, 0, rows);
330-
null_column.get_null_map_column().get_data().resize_fill(rows, 0);
331-
bytes_usage += null_column.allocated_bytes();
332-
} else {
333-
to = make_nullable(from, false)->assume_mutable();
334-
}
335-
} else {
336-
if (_keep_origin || !from->is_exclusive()) {
337-
to->insert_range_from(*from, 0, rows);
338-
bytes_usage += from->allocated_bytes();
339-
} else {
340-
to = from->assume_mutable();
341-
}
342-
}
343-
};
344-
345-
using namespace vectorized;
346-
vectorized::MutableBlock mutable_block =
347-
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
348-
*_output_row_descriptor);
349329
if (rows != 0) {
330+
using namespace vectorized;
331+
MutableBlock mutable_block =
332+
VectorizedUtils::build_mutable_block(output_block, *_output_row_descriptor);
350333
auto& mutable_columns = mutable_block.mutable_columns();
351-
const size_t origin_columns_count = input_block.columns();
352334
DCHECK_EQ(mutable_columns.size(), local_state->_projections.size()) << debug_string();
353335
for (int i = 0; i < mutable_columns.size(); ++i) {
354-
auto result_column_id = -1;
355336
ColumnPtr column_ptr;
356337
RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block, column_ptr));
357338
column_ptr = column_ptr->convert_to_full_column_if_const();
358-
if (result_column_id >= origin_columns_count) {
359-
bytes_usage += column_ptr->allocated_bytes();
339+
340+
if (mutable_columns[i]->is_nullable() && !column_ptr->is_nullable()) {
341+
const auto size = column_ptr->size();
342+
mutable_columns[i] = ColumnNullable::create(column_ptr->assume_mutable(),
343+
ColumnUInt8::create(size, 0))
344+
->assume_mutable();
345+
} else {
346+
mutable_columns[i] = column_ptr->assume_mutable();
360347
}
361-
insert_column_datas(mutable_columns[i], column_ptr, rows);
362348
}
363-
DCHECK(mutable_block.rows() == rows);
349+
DCHECK_EQ(mutable_block.rows(), rows);
350+
auto empty_columns = origin_block->clone_empty_columns();
351+
origin_block->set_columns(std::move(empty_columns));
364352
output_block->set_columns(std::move(mutable_columns));
353+
DCHECK_EQ(output_block->rows(), rows);
365354
}
366355

367356
local_state->_estimate_memory_usage += bytes_usage;

be/src/pipeline/exec/repeat_operator.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,7 @@ Status RepeatLocalState::get_repeated_block(vectorized::Block* input_block, int
114114
size_t input_column_size = input_block->columns();
115115
size_t output_column_size = p._output_slots.size();
116116
DCHECK_LT(input_column_size, output_column_size);
117-
auto m_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
118-
p._output_slots);
117+
auto m_block = vectorized::VectorizedUtils::build_mutable_block(output_block, p._output_slots);
119118
auto& output_columns = m_block.mutable_columns();
120119
/* Fill all slots according to child, for example:select tc1,tc2,sum(tc3) from t1 group by grouping sets((tc1),(tc2));
121120
* insert into t1 values(1,2,1),(1,3,1),(2,1,1),(3,1,1);
@@ -238,8 +237,8 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp
238237
_repeat_id_idx = 0;
239238
}
240239
} else if (local_state._expr_ctxs.empty()) {
241-
auto m_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
242-
output_block, _output_slots);
240+
auto m_block =
241+
vectorized::VectorizedUtils::build_mutable_block(output_block, _output_slots);
243242
auto rows = _child_block.rows();
244243
auto& columns = m_block.mutable_columns();
245244

be/src/pipeline/exec/scan_operator.h

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -361,13 +361,7 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
361361
return {ExchangeType::BUCKET_HASH_SHUFFLE};
362362
}
363363

364-
void set_low_memory_mode(RuntimeState* state) override {
365-
auto& local_state = get_local_state(state);
366-
367-
if (local_state._scanner_ctx) {
368-
local_state._scanner_ctx->clear_free_blocks();
369-
}
370-
}
364+
void set_low_memory_mode(RuntimeState* state) override {}
371365

372366
using OperatorX<LocalStateType>::node_id;
373367
using OperatorX<LocalStateType>::operator_id;

be/src/pipeline/exec/table_function_operator.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,8 @@ bool TableFunctionLocalState::_is_inner_and_empty() {
155155
Status TableFunctionLocalState::get_expanded_block(RuntimeState* state,
156156
vectorized::Block* output_block, bool* eos) {
157157
auto& p = _parent->cast<TableFunctionOperatorX>();
158-
vectorized::MutableBlock m_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
159-
output_block, p._output_slots);
158+
vectorized::MutableBlock m_block =
159+
vectorized::VectorizedUtils::build_mutable_block(output_block, p._output_slots);
160160
vectorized::MutableColumns& columns = m_block.mutable_columns();
161161

162162
for (int i = 0; i < p._fn_num; i++) {

be/src/pipeline/exec/union_sink_operator.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,7 @@ Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block
102102
SCOPED_TIMER(local_state.exec_time_counter());
103103
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
104104
if (local_state._output_block == nullptr) {
105-
local_state._output_block =
106-
local_state._shared_state->data_queue.get_free_block(_cur_child_id);
105+
local_state._output_block = vectorized::Block::create_unique();
107106
}
108107
if (_cur_child_id < _get_first_materialized_child_idx()) { //pass_through
109108
if (in_block->rows() > 0) {

be/src/pipeline/exec/union_sink_operator.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,8 @@ class UnionSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<UnionSink
149149
DCHECK_LT(child_id, _child_size);
150150
DCHECK(!is_child_passthrough(child_id));
151151
if (input_block->rows() > 0) {
152-
vectorized::MutableBlock mblock =
153-
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
154-
row_descriptor());
152+
vectorized::MutableBlock mblock = vectorized::VectorizedUtils::build_mutable_block(
153+
output_block, row_descriptor());
155154
vectorized::Block res;
156155
RETURN_IF_ERROR(materialize_block(state, input_block, child_id, &res));
157156
RETURN_IF_ERROR(mblock.merge(res));

be/src/pipeline/exec/union_source_operator.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,6 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b
137137
return Status::OK();
138138
}
139139
block->swap(*output_block);
140-
output_block->clear_column_data(row_descriptor().num_materialized_slots());
141-
local_state._shared_state->data_queue.push_free_block(std::move(output_block), child_idx);
142140
}
143141
local_state.reached_limit(block, eos);
144142
return Status::OK();
@@ -153,7 +151,7 @@ Status UnionSourceOperatorX::get_next_const(RuntimeState* state, vectorized::Blo
153151

154152
auto& _const_expr_list_idx = local_state._const_expr_list_idx;
155153
vectorized::MutableBlock mblock =
156-
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(block, row_descriptor());
154+
vectorized::VectorizedUtils::build_mutable_block(block, row_descriptor());
157155

158156
vectorized::ColumnsWithTypeAndName tmp_block_columns;
159157
for (; _const_expr_list_idx < _const_expr_lists.size() && mblock.rows() < state->batch_size();

be/src/pipeline/local_exchange/local_exchanger.cpp

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block
165165
if (_dequeue_data(source_info.local_state, partitioned_block, eos, block,
166166
source_info.channel_id)) {
167167
SCOPED_TIMER(profile.copy_data_timer);
168-
mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
168+
mutable_block = vectorized::VectorizedUtils::build_mutable_block(
169169
block, partitioned_block.first->_data_block);
170170
RETURN_IF_ERROR(get_data());
171171
}
@@ -198,9 +198,6 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
198198

199199
vectorized::Block data_block;
200200
std::shared_ptr<BlockWrapper> new_block_wrapper;
201-
if (!_free_blocks.try_dequeue(data_block)) {
202-
data_block = block->clone_empty();
203-
}
204201
data_block.swap(*block);
205202
new_block_wrapper =
206203
BlockWrapper::create_shared(std::move(data_block), local_state->_shared_state, -1);
@@ -264,9 +261,6 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
264261

265262
vectorized::Block data_block;
266263
std::shared_ptr<BlockWrapper> new_block_wrapper;
267-
if (!_free_blocks.try_dequeue(data_block)) {
268-
data_block = block->clone_empty();
269-
}
270264
data_block.swap(*block);
271265
new_block_wrapper = BlockWrapper::create_shared(std::move(data_block), nullptr, -1);
272266
if (new_block_wrapper->_data_block.empty()) {
@@ -289,9 +283,6 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
289283
return Status::OK();
290284
}
291285
vectorized::Block new_block;
292-
if (!_free_blocks.try_dequeue(new_block)) {
293-
new_block = {in_block->clone_empty()};
294-
}
295286
new_block.swap(*in_block);
296287
auto channel_id = ((*sink_info.channel_id)++) % _num_partitions;
297288
BlockWrapperSPtr wrapper = BlockWrapper::create_shared(
@@ -341,9 +332,6 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block
341332
return Status::OK();
342333
}
343334
vectorized::Block new_block;
344-
if (!_free_blocks.try_dequeue(new_block)) {
345-
new_block = {in_block->clone_empty()};
346-
}
347335
new_block.swap(*in_block);
348336

349337
BlockWrapperSPtr wrapper = BlockWrapper::create_shared(
@@ -370,10 +358,6 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo
370358

371359
void ExchangerBase::finalize() {
372360
DCHECK(_running_source_operators == 0);
373-
vectorized::Block block;
374-
while (_free_blocks.try_dequeue(block)) {
375-
// do nothing
376-
}
377361
}
378362

379363
Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
@@ -382,9 +366,6 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block
382366
return Status::OK();
383367
}
384368
vectorized::Block new_block;
385-
if (!_free_blocks.try_dequeue(new_block)) {
386-
new_block = {in_block->clone_empty()};
387-
}
388369
new_block.swap(*in_block);
389370
auto wrapper = BlockWrapper::create_shared(
390371
std::move(new_block),
@@ -415,9 +396,8 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo
415396
if (_dequeue_data(source_info.local_state, partitioned_block, eos, block,
416397
source_info.channel_id)) {
417398
SCOPED_TIMER(profile.copy_data_timer);
418-
vectorized::MutableBlock mutable_block =
419-
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
420-
block, partitioned_block.first->_data_block);
399+
vectorized::MutableBlock mutable_block = vectorized::VectorizedUtils::build_mutable_block(
400+
block, partitioned_block.first->_data_block);
421401
auto block_wrapper = partitioned_block.first;
422402
RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->_data_block,
423403
partitioned_block.second.offset_start,
@@ -431,9 +411,6 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
431411
vectorized::Block* in_block,
432412
SinkInfo&& sink_info) {
433413
vectorized::Block new_block;
434-
if (!_free_blocks.try_dequeue(new_block)) {
435-
new_block = {in_block->clone_empty()};
436-
}
437414
new_block.swap(*in_block);
438415
auto channel_id = ((*sink_info.channel_id)++) % _num_partitions;
439416
_enqueue_data_and_set_ready(

0 commit comments

Comments
 (0)