Skip to content

Commit 4930599

Browse files
committed
[refactor] Remove free blocks and block reuse
1 parent e3e0590 commit 4930599

File tree

15 files changed

+51
-251
lines changed

15 files changed

+51
-251
lines changed

be/src/pipeline/exec/data_queue.cpp

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ namespace pipeline {
3232
DataQueue::DataQueue(int child_count)
3333
: _queue_blocks_lock(child_count),
3434
_queue_blocks(child_count),
35-
_free_blocks_lock(child_count),
36-
_free_blocks(child_count),
3735
_child_count(child_count),
3836
_is_finished(child_count),
3937
_is_canceled(child_count),
@@ -42,7 +40,6 @@ DataQueue::DataQueue(int child_count)
4240
_flag_queue_idx(0) {
4341
for (int i = 0; i < child_count; ++i) {
4442
_queue_blocks_lock[i].reset(new std::mutex());
45-
_free_blocks_lock[i].reset(new std::mutex());
4643
_is_finished[i] = false;
4744
_is_canceled[i] = false;
4845
_cur_bytes_in_queue[i] = 0;
@@ -52,36 +49,6 @@ DataQueue::DataQueue(int child_count)
5249
_sink_dependencies.resize(child_count, nullptr);
5350
}
5451

55-
std::unique_ptr<vectorized::Block> DataQueue::get_free_block(int child_idx) {
56-
{
57-
INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]));
58-
if (!_free_blocks[child_idx].empty()) {
59-
auto block = std::move(_free_blocks[child_idx].front());
60-
_free_blocks[child_idx].pop_front();
61-
return block;
62-
}
63-
}
64-
65-
return vectorized::Block::create_unique();
66-
}
67-
68-
void DataQueue::push_free_block(std::unique_ptr<vectorized::Block> block, int child_idx) {
69-
DCHECK(block->rows() == 0);
70-
71-
if (!_is_low_memory_mode) {
72-
INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]));
73-
_free_blocks[child_idx].emplace_back(std::move(block));
74-
}
75-
}
76-
77-
void DataQueue::clear_free_blocks() {
78-
for (size_t child_idx = 0; child_idx < _free_blocks.size(); ++child_idx) {
79-
std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]);
80-
std::deque<std::unique_ptr<vectorized::Block>> tmp_queue;
81-
_free_blocks[child_idx].swap(tmp_queue);
82-
}
83-
}
84-
8552
void DataQueue::terminate() {
8653
for (int i = 0; i < _queue_blocks.size(); i++) {
8754
set_finish(i);
@@ -93,7 +60,6 @@ void DataQueue::terminate() {
9360
_sink_dependencies[i]->set_always_ready();
9461
}
9562
}
96-
clear_free_blocks();
9763
}
9864

9965
//check which queue has data, and save the idx in _flag_queue_idx,

be/src/pipeline/exec/data_queue.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,8 @@ class DataQueue {
4242

4343
Status push_block(std::unique_ptr<vectorized::Block> block, int child_idx = 0);
4444

45-
std::unique_ptr<vectorized::Block> get_free_block(int child_idx = 0);
46-
4745
void push_free_block(std::unique_ptr<vectorized::Block> output_block, int child_idx = 0);
4846

49-
void clear_free_blocks();
50-
5147
void set_finish(int child_idx = 0);
5248
void set_canceled(int child_idx = 0); // should set before finish
5349
bool is_finish(int child_idx = 0);
@@ -76,7 +72,6 @@ class DataQueue {
7672
void set_low_memory_mode() {
7773
_is_low_memory_mode = true;
7874
_max_blocks_in_sub_queue = 1;
79-
clear_free_blocks();
8075
}
8176

8277
void terminate();
@@ -85,9 +80,6 @@ class DataQueue {
8580
std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
8681
std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _queue_blocks;
8782

88-
std::vector<std::unique_ptr<std::mutex>> _free_blocks_lock;
89-
std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _free_blocks;
90-
9183
//how many deques will be initialized; this will always be one
9284
int _child_count = 0;
9385
std::vector<std::atomic_bool> _is_finished;

be/src/pipeline/exec/operator.cpp

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
#include "operator.h"
1919

20+
#include <glog/logging.h>
21+
2022
#include "common/status.h"
2123
#include "pipeline/dependency.h"
2224
#include "pipeline/exec/aggregation_sink_operator.h"
@@ -84,6 +86,8 @@
8486
#include "util/debug_util.h"
8587
#include "util/runtime_profile.h"
8688
#include "util/string_util.h"
89+
#include "vec/columns/column_nullable.h"
90+
#include "vec/core/block.h"
8791
#include "vec/exprs/vexpr.h"
8892
#include "vec/exprs/vexpr_context.h"
8993
#include "vec/utils/util.hpp"
@@ -322,46 +326,31 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori
322326
}
323327

324328
DCHECK_EQ(rows, input_block.rows());
325-
auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
326-
if (to->is_nullable() && !from->is_nullable()) {
327-
if (_keep_origin || !from->is_exclusive()) {
328-
auto& null_column = reinterpret_cast<vectorized::ColumnNullable&>(*to);
329-
null_column.get_nested_column().insert_range_from(*from, 0, rows);
330-
null_column.get_null_map_column().get_data().resize_fill(rows, 0);
331-
bytes_usage += null_column.allocated_bytes();
332-
} else {
333-
to = make_nullable(from, false)->assume_mutable();
334-
}
335-
} else {
336-
if (_keep_origin || !from->is_exclusive()) {
337-
to->insert_range_from(*from, 0, rows);
338-
bytes_usage += from->allocated_bytes();
339-
} else {
340-
to = from->assume_mutable();
341-
}
342-
}
343-
};
344-
345-
using namespace vectorized;
346-
vectorized::MutableBlock mutable_block =
347-
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
348-
*_output_row_descriptor);
349329
if (rows != 0) {
330+
using namespace vectorized;
331+
MutableBlock mutable_block = VectorizedUtils::build_mutable_mem_reuse_block(
332+
output_block, *_output_row_descriptor);
350333
auto& mutable_columns = mutable_block.mutable_columns();
351-
const size_t origin_columns_count = input_block.columns();
352334
DCHECK_EQ(mutable_columns.size(), local_state->_projections.size()) << debug_string();
353335
for (int i = 0; i < mutable_columns.size(); ++i) {
354-
auto result_column_id = -1;
355336
ColumnPtr column_ptr;
356337
RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block, column_ptr));
357338
column_ptr = column_ptr->convert_to_full_column_if_const();
358-
if (result_column_id >= origin_columns_count) {
359-
bytes_usage += column_ptr->allocated_bytes();
339+
340+
if (mutable_columns[i]->is_nullable() && !column_ptr->is_nullable()) {
341+
const auto size = column_ptr->size();
342+
mutable_columns[i] = ColumnNullable::create(column_ptr->assume_mutable(),
343+
ColumnUInt8::create(size, 0))
344+
->assume_mutable();
345+
} else {
346+
mutable_columns[i] = column_ptr->assume_mutable();
360347
}
361-
insert_column_datas(mutable_columns[i], column_ptr, rows);
362348
}
363-
DCHECK(mutable_block.rows() == rows);
349+
DCHECK_EQ(mutable_block.rows(), rows);
350+
auto empty_columns = origin_block->clone_empty_columns();
351+
origin_block->set_columns(std::move(empty_columns));
364352
output_block->set_columns(std::move(mutable_columns));
353+
DCHECK_EQ(output_block->rows(), rows);
365354
}
366355

367356
local_state->_estimate_memory_usage += bytes_usage;

be/src/pipeline/exec/scan_operator.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1151,8 +1151,6 @@ Status ScanLocalState<Derived>::_init_profile() {
11511151
_scanner_profile.reset(new RuntimeProfile("Scanner"));
11521152
custom_profile()->add_child(_scanner_profile.get(), true, nullptr);
11531153

1154-
_newly_create_free_blocks_num =
1155-
ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT);
11561154
_scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime");
11571155
_scan_cpu_timer = ADD_TIMER(_scanner_profile, "ScannerCpuTime");
11581156
_filter_timer = ADD_TIMER(_scanner_profile, "ScannerFilterTime");

be/src/pipeline/exec/scan_operator.h

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,6 @@ class ScanLocalStateBase : public PipelineXLocalState<> {
100100

101101
std::shared_ptr<RuntimeProfile> _scanner_profile;
102102
RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr;
103-
// Num of newly created free blocks when running query
104-
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
105103
// Max num of scanner thread
106104
RuntimeProfile::Counter* _max_scan_concurrency = nullptr;
107105
RuntimeProfile::Counter* _min_scan_concurrency = nullptr;
@@ -361,13 +359,7 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
361359
return {ExchangeType::BUCKET_HASH_SHUFFLE};
362360
}
363361

364-
void set_low_memory_mode(RuntimeState* state) override {
365-
auto& local_state = get_local_state(state);
366-
367-
if (local_state._scanner_ctx) {
368-
local_state._scanner_ctx->clear_free_blocks();
369-
}
370-
}
362+
void set_low_memory_mode(RuntimeState* state) override {}
371363

372364
using OperatorX<LocalStateType>::node_id;
373365
using OperatorX<LocalStateType>::operator_id;

be/src/pipeline/exec/union_sink_operator.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,7 @@ Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block
102102
SCOPED_TIMER(local_state.exec_time_counter());
103103
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
104104
if (local_state._output_block == nullptr) {
105-
local_state._output_block =
106-
local_state._shared_state->data_queue.get_free_block(_cur_child_id);
105+
local_state._output_block = vectorized::Block::create_unique();
107106
}
108107
if (_cur_child_id < _get_first_materialized_child_idx()) { //pass_through
109108
if (in_block->rows() > 0) {

be/src/pipeline/exec/union_source_operator.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,6 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b
137137
return Status::OK();
138138
}
139139
block->swap(*output_block);
140-
output_block->clear_column_data(row_descriptor().num_materialized_slots());
141-
local_state._shared_state->data_queue.push_free_block(std::move(output_block), child_idx);
142140
}
143141
local_state.reached_limit(block, eos);
144142
return Status::OK();

be/src/pipeline/local_exchange/local_exchanger.cpp

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -196,11 +196,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
196196
}
197197
}
198198

199-
vectorized::Block data_block;
199+
vectorized::Block data_block = block->clone_empty();
200200
std::shared_ptr<BlockWrapper> new_block_wrapper;
201-
if (!_free_blocks.try_dequeue(data_block)) {
202-
data_block = block->clone_empty();
203-
}
204201
data_block.swap(*block);
205202
new_block_wrapper =
206203
BlockWrapper::create_shared(std::move(data_block), local_state->_shared_state, -1);
@@ -262,11 +259,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
262259
}
263260
}
264261

265-
vectorized::Block data_block;
262+
vectorized::Block data_block = block->clone_empty();
266263
std::shared_ptr<BlockWrapper> new_block_wrapper;
267-
if (!_free_blocks.try_dequeue(data_block)) {
268-
data_block = block->clone_empty();
269-
}
270264
data_block.swap(*block);
271265
new_block_wrapper = BlockWrapper::create_shared(std::move(data_block), nullptr, -1);
272266
if (new_block_wrapper->_data_block.empty()) {
@@ -288,10 +282,7 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
288282
if (in_block->empty()) {
289283
return Status::OK();
290284
}
291-
vectorized::Block new_block;
292-
if (!_free_blocks.try_dequeue(new_block)) {
293-
new_block = {in_block->clone_empty()};
294-
}
285+
vectorized::Block new_block = in_block->clone_empty();
295286
new_block.swap(*in_block);
296287
auto channel_id = ((*sink_info.channel_id)++) % _num_partitions;
297288
BlockWrapperSPtr wrapper = BlockWrapper::create_shared(
@@ -340,10 +331,7 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block
340331
if (in_block->empty()) {
341332
return Status::OK();
342333
}
343-
vectorized::Block new_block;
344-
if (!_free_blocks.try_dequeue(new_block)) {
345-
new_block = {in_block->clone_empty()};
346-
}
334+
vectorized::Block new_block = in_block->clone_empty();
347335
new_block.swap(*in_block);
348336

349337
BlockWrapperSPtr wrapper = BlockWrapper::create_shared(
@@ -370,21 +358,14 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo
370358

371359
void ExchangerBase::finalize() {
372360
DCHECK(_running_source_operators == 0);
373-
vectorized::Block block;
374-
while (_free_blocks.try_dequeue(block)) {
375-
// do nothing
376-
}
377361
}
378362

379363
Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
380364
Profile&& profile, SinkInfo&& sink_info) {
381365
if (in_block->empty()) {
382366
return Status::OK();
383367
}
384-
vectorized::Block new_block;
385-
if (!_free_blocks.try_dequeue(new_block)) {
386-
new_block = {in_block->clone_empty()};
387-
}
368+
vectorized::Block new_block = in_block->clone_empty();
388369
new_block.swap(*in_block);
389370
auto wrapper = BlockWrapper::create_shared(
390371
std::move(new_block),
@@ -430,10 +411,7 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo
430411
Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
431412
vectorized::Block* in_block,
432413
SinkInfo&& sink_info) {
433-
vectorized::Block new_block;
434-
if (!_free_blocks.try_dequeue(new_block)) {
435-
new_block = {in_block->clone_empty()};
436-
}
414+
vectorized::Block new_block = in_block->clone_empty();
437415
new_block.swap(*in_block);
438416
auto channel_id = ((*sink_info.channel_id)++) % _num_partitions;
439417
_enqueue_data_and_set_ready(

be/src/pipeline/local_exchange/local_exchanger.h

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,6 @@ class ExchangerBase {
6868
/**
6969
* `BlockWrapper` is used to wrap a data block with a reference count.
7070
*
71-
* In function `unref()`, if `ref_count` decremented to 0, which means this block is not needed by
72-
* operators, so we put it into `_free_blocks` to reuse its memory if needed and refresh memory usage
73-
* in current queue.
74-
*
7571
* Note: `ref_count` will be larger than 1 only if this block is shared between multiple queues in
7672
* shuffle exchanger.
7773
*/
@@ -93,15 +89,6 @@ class ExchangerBase {
9389
// `_channel_ids` may be empty if exchanger is shuffled exchanger and channel id is
9490
// not used by `sub_total_mem_usage`. So we just pass -1 here.
9591
_shared_state->sub_total_mem_usage(_allocated_bytes);
96-
if (_shared_state->exchanger->_free_block_limit == 0 ||
97-
_shared_state->exchanger->_free_blocks.size_approx() <
98-
_shared_state->exchanger->_free_block_limit *
99-
_shared_state->exchanger->_num_sources) {
100-
_data_block.clear_column_data();
101-
// Free blocks is used to improve memory efficiency. Failure during pushing back
102-
// free block will not incur any bad result so just ignore the return value.
103-
_shared_state->exchanger->_free_blocks.enqueue(std::move(_data_block));
104-
}
10592
};
10693
}
10794
void record_channel_id(int channel_id) {
@@ -131,16 +118,14 @@ class ExchangerBase {
131118
_running_source_operators(num_partitions),
132119
_num_partitions(num_partitions),
133120
_num_senders(running_sink_operators),
134-
_num_sources(num_partitions),
135-
_free_block_limit(free_block_limit) {}
121+
_num_sources(num_partitions) {}
136122
ExchangerBase(int running_sink_operators, int num_sources, int num_partitions,
137123
int free_block_limit)
138124
: _running_sink_operators(running_sink_operators),
139125
_running_source_operators(num_sources),
140126
_num_partitions(num_partitions),
141127
_num_senders(running_sink_operators),
142-
_num_sources(num_sources),
143-
_free_block_limit(free_block_limit) {}
128+
_num_sources(num_sources) {}
144129
virtual ~ExchangerBase() = default;
145130
virtual Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
146131
Profile&& profile, SourceInfo&& source_info) = 0;
@@ -149,16 +134,12 @@ class ExchangerBase {
149134
virtual ExchangeType get_type() const = 0;
150135
// Called when a local exchanger source operator is closed. Frees the unused data blocks in data_queue.
151136
virtual void close(SourceInfo&& source_info) = 0;
152-
// Called if all local exchanger source operators are closed. We free the memory in
153-
// `_free_blocks` here.
137+
154138
virtual void finalize();
155139

156140
virtual std::string data_queue_debug_string(int i) = 0;
157141

158-
void set_low_memory_mode() {
159-
_free_block_limit = 0;
160-
clear_blocks(_free_blocks);
161-
}
142+
void set_low_memory_mode() {}
162143

163144
protected:
164145
friend struct LocalExchangeSharedState;
@@ -170,8 +151,6 @@ class ExchangerBase {
170151
const int _num_partitions;
171152
const int _num_senders;
172153
const int _num_sources;
173-
std::atomic_int _free_block_limit = 0;
174-
moodycamel::ConcurrentQueue<vectorized::Block> _free_blocks;
175154
};
176155

177156
struct PartitionedRowIdxs {

0 commit comments

Comments
 (0)