Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 0 additions & 34 deletions be/src/pipeline/exec/data_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ namespace pipeline {
DataQueue::DataQueue(int child_count)
: _queue_blocks_lock(child_count),
_queue_blocks(child_count),
_free_blocks_lock(child_count),
_free_blocks(child_count),
_child_count(child_count),
_is_finished(child_count),
_is_canceled(child_count),
Expand All @@ -42,7 +40,6 @@ DataQueue::DataQueue(int child_count)
_flag_queue_idx(0) {
for (int i = 0; i < child_count; ++i) {
_queue_blocks_lock[i].reset(new std::mutex());
_free_blocks_lock[i].reset(new std::mutex());
_is_finished[i] = false;
_is_canceled[i] = false;
_cur_bytes_in_queue[i] = 0;
Expand All @@ -52,36 +49,6 @@ DataQueue::DataQueue(int child_count)
_sink_dependencies.resize(child_count, nullptr);
}

std::unique_ptr<vectorized::Block> DataQueue::get_free_block(int child_idx) {
{
INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]));
if (!_free_blocks[child_idx].empty()) {
auto block = std::move(_free_blocks[child_idx].front());
_free_blocks[child_idx].pop_front();
return block;
}
}

return vectorized::Block::create_unique();
}

void DataQueue::push_free_block(std::unique_ptr<vectorized::Block> block, int child_idx) {
DCHECK(block->rows() == 0);

if (!_is_low_memory_mode) {
INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]));
_free_blocks[child_idx].emplace_back(std::move(block));
}
}

void DataQueue::clear_free_blocks() {
for (size_t child_idx = 0; child_idx < _free_blocks.size(); ++child_idx) {
std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]);
std::deque<std::unique_ptr<vectorized::Block>> tmp_queue;
_free_blocks[child_idx].swap(tmp_queue);
}
}

void DataQueue::terminate() {
for (int i = 0; i < _queue_blocks.size(); i++) {
set_finish(i);
Expand All @@ -93,7 +60,6 @@ void DataQueue::terminate() {
_sink_dependencies[i]->set_always_ready();
}
}
clear_free_blocks();
}

//check which queue have data, and save the idx in _flag_queue_idx,
Expand Down
8 changes: 0 additions & 8 deletions be/src/pipeline/exec/data_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,8 @@ class DataQueue {

Status push_block(std::unique_ptr<vectorized::Block> block, int child_idx = 0);

std::unique_ptr<vectorized::Block> get_free_block(int child_idx = 0);

void push_free_block(std::unique_ptr<vectorized::Block> output_block, int child_idx = 0);

void clear_free_blocks();

void set_finish(int child_idx = 0);
void set_canceled(int child_idx = 0); // should set before finish
bool is_finish(int child_idx = 0);
Expand Down Expand Up @@ -76,7 +72,6 @@ class DataQueue {
void set_low_memory_mode() {
_is_low_memory_mode = true;
_max_blocks_in_sub_queue = 1;
clear_free_blocks();
}

void terminate();
Expand All @@ -85,9 +80,6 @@ class DataQueue {
std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _queue_blocks;

std::vector<std::unique_ptr<std::mutex>> _free_blocks_lock;
std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _free_blocks;

//how many deque will be init, always will be one
int _child_count = 0;
std::vector<std::atomic_bool> _is_finished;
Expand Down
49 changes: 19 additions & 30 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "operator.h"

#include <glog/logging.h>

#include "common/status.h"
#include "pipeline/dependency.h"
#include "pipeline/exec/aggregation_sink_operator.h"
Expand Down Expand Up @@ -84,6 +86,8 @@
#include "util/debug_util.h"
#include "util/runtime_profile.h"
#include "util/string_util.h"
#include "vec/columns/column_nullable.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/utils/util.hpp"
Expand Down Expand Up @@ -322,46 +326,31 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori
}

DCHECK_EQ(rows, input_block.rows());
auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
if (to->is_nullable() && !from->is_nullable()) {
if (_keep_origin || !from->is_exclusive()) {
auto& null_column = reinterpret_cast<vectorized::ColumnNullable&>(*to);
null_column.get_nested_column().insert_range_from(*from, 0, rows);
null_column.get_null_map_column().get_data().resize_fill(rows, 0);
bytes_usage += null_column.allocated_bytes();
} else {
to = make_nullable(from, false)->assume_mutable();
}
} else {
if (_keep_origin || !from->is_exclusive()) {
to->insert_range_from(*from, 0, rows);
bytes_usage += from->allocated_bytes();
} else {
to = from->assume_mutable();
}
}
};

using namespace vectorized;
vectorized::MutableBlock mutable_block =
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
*_output_row_descriptor);
if (rows != 0) {
using namespace vectorized;
MutableBlock mutable_block = VectorizedUtils::build_mutable_mem_reuse_block(
output_block, *_output_row_descriptor);
auto& mutable_columns = mutable_block.mutable_columns();
const size_t origin_columns_count = input_block.columns();
DCHECK_EQ(mutable_columns.size(), local_state->_projections.size()) << debug_string();
for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
ColumnPtr column_ptr;
RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block, column_ptr));
column_ptr = column_ptr->convert_to_full_column_if_const();
if (result_column_id >= origin_columns_count) {
bytes_usage += column_ptr->allocated_bytes();

if (mutable_columns[i]->is_nullable() && !column_ptr->is_nullable()) {
const auto size = column_ptr->size();
mutable_columns[i] = ColumnNullable::create(column_ptr->assume_mutable(),
ColumnUInt8::create(size, 0))
->assume_mutable();
} else {
mutable_columns[i] = column_ptr->assume_mutable();
}
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
DCHECK(mutable_block.rows() == rows);
DCHECK_EQ(mutable_block.rows(), rows);
auto empty_columns = origin_block->clone_empty_columns();
origin_block->set_columns(std::move(empty_columns));
output_block->set_columns(std::move(mutable_columns));
DCHECK_EQ(output_block->rows(), rows);
}

local_state->_estimate_memory_usage += bytes_usage;
Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1151,8 +1151,6 @@ Status ScanLocalState<Derived>::_init_profile() {
_scanner_profile.reset(new RuntimeProfile("Scanner"));
custom_profile()->add_child(_scanner_profile.get(), true, nullptr);

_newly_create_free_blocks_num =
ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT);
_scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime");
_scan_cpu_timer = ADD_TIMER(_scanner_profile, "ScannerCpuTime");
_filter_timer = ADD_TIMER(_scanner_profile, "ScannerFilterTime");
Expand Down
10 changes: 1 addition & 9 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ class ScanLocalStateBase : public PipelineXLocalState<> {

std::shared_ptr<RuntimeProfile> _scanner_profile;
RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr;
// Num of newly created free blocks when running query
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
// Max num of scanner thread
RuntimeProfile::Counter* _max_scan_concurrency = nullptr;
RuntimeProfile::Counter* _min_scan_concurrency = nullptr;
Expand Down Expand Up @@ -361,13 +359,7 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
return {ExchangeType::BUCKET_HASH_SHUFFLE};
}

void set_low_memory_mode(RuntimeState* state) override {
auto& local_state = get_local_state(state);

if (local_state._scanner_ctx) {
local_state._scanner_ctx->clear_free_blocks();
}
}
void set_low_memory_mode(RuntimeState* state) override {}

using OperatorX<LocalStateType>::node_id;
using OperatorX<LocalStateType>::operator_id;
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/union_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
if (local_state._output_block == nullptr) {
local_state._output_block =
local_state._shared_state->data_queue.get_free_block(_cur_child_id);
local_state._output_block = vectorized::Block::create_unique();
}
if (_cur_child_id < _get_first_materialized_child_idx()) { //pass_through
if (in_block->rows() > 0) {
Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/union_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,6 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b
return Status::OK();
}
block->swap(*output_block);
output_block->clear_column_data(row_descriptor().num_materialized_slots());
local_state._shared_state->data_queue.push_free_block(std::move(output_block), child_idx);
}
local_state.reached_limit(block, eos);
return Status::OK();
Expand Down
34 changes: 6 additions & 28 deletions be/src/pipeline/local_exchange/local_exchanger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
}
}

vectorized::Block data_block;
vectorized::Block data_block = block->clone_empty();
std::shared_ptr<BlockWrapper> new_block_wrapper;
if (!_free_blocks.try_dequeue(data_block)) {
data_block = block->clone_empty();
}
data_block.swap(*block);
new_block_wrapper =
BlockWrapper::create_shared(std::move(data_block), local_state->_shared_state, -1);
Expand Down Expand Up @@ -262,11 +259,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
}
}

vectorized::Block data_block;
vectorized::Block data_block = block->clone_empty();
std::shared_ptr<BlockWrapper> new_block_wrapper;
if (!_free_blocks.try_dequeue(data_block)) {
data_block = block->clone_empty();
}
data_block.swap(*block);
new_block_wrapper = BlockWrapper::create_shared(std::move(data_block), nullptr, -1);
if (new_block_wrapper->_data_block.empty()) {
Expand All @@ -288,10 +282,7 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
if (in_block->empty()) {
return Status::OK();
}
vectorized::Block new_block;
if (!_free_blocks.try_dequeue(new_block)) {
new_block = {in_block->clone_empty()};
}
vectorized::Block new_block = in_block->clone_empty();
new_block.swap(*in_block);
auto channel_id = ((*sink_info.channel_id)++) % _num_partitions;
BlockWrapperSPtr wrapper = BlockWrapper::create_shared(
Expand Down Expand Up @@ -340,10 +331,7 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block
if (in_block->empty()) {
return Status::OK();
}
vectorized::Block new_block;
if (!_free_blocks.try_dequeue(new_block)) {
new_block = {in_block->clone_empty()};
}
vectorized::Block new_block = in_block->clone_empty();
new_block.swap(*in_block);

BlockWrapperSPtr wrapper = BlockWrapper::create_shared(
Expand All @@ -370,21 +358,14 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo

void ExchangerBase::finalize() {
DCHECK(_running_source_operators == 0);
vectorized::Block block;
while (_free_blocks.try_dequeue(block)) {
// do nothing
}
}

Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
Profile&& profile, SinkInfo&& sink_info) {
if (in_block->empty()) {
return Status::OK();
}
vectorized::Block new_block;
if (!_free_blocks.try_dequeue(new_block)) {
new_block = {in_block->clone_empty()};
}
vectorized::Block new_block = in_block->clone_empty();
new_block.swap(*in_block);
auto wrapper = BlockWrapper::create_shared(
std::move(new_block),
Expand Down Expand Up @@ -430,10 +411,7 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo
Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
vectorized::Block* in_block,
SinkInfo&& sink_info) {
vectorized::Block new_block;
if (!_free_blocks.try_dequeue(new_block)) {
new_block = {in_block->clone_empty()};
}
vectorized::Block new_block = in_block->clone_empty();
new_block.swap(*in_block);
auto channel_id = ((*sink_info.channel_id)++) % _num_partitions;
_enqueue_data_and_set_ready(
Expand Down
29 changes: 4 additions & 25 deletions be/src/pipeline/local_exchange/local_exchanger.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ class ExchangerBase {
/**
* `BlockWrapper` is used to wrap a data block with a reference count.
*
* In function `unref()`, if `ref_count` decremented to 0, which means this block is not needed by
* operators, so we put it into `_free_blocks` to reuse its memory if needed and refresh memory usage
* in current queue.
*
* Note: `ref_count` will be larger than 1 only if this block is shared between multiple queues in
* shuffle exchanger.
*/
Expand All @@ -93,15 +89,6 @@ class ExchangerBase {
// `_channel_ids` may be empty if exchanger is shuffled exchanger and channel id is
// not used by `sub_total_mem_usage`. So we just pass -1 here.
_shared_state->sub_total_mem_usage(_allocated_bytes);
if (_shared_state->exchanger->_free_block_limit == 0 ||
_shared_state->exchanger->_free_blocks.size_approx() <
_shared_state->exchanger->_free_block_limit *
_shared_state->exchanger->_num_sources) {
_data_block.clear_column_data();
// Free blocks is used to improve memory efficiency. Failure during pushing back
// free block will not incur any bad result so just ignore the return value.
_shared_state->exchanger->_free_blocks.enqueue(std::move(_data_block));
}
};
}
void record_channel_id(int channel_id) {
Expand Down Expand Up @@ -131,16 +118,14 @@ class ExchangerBase {
_running_source_operators(num_partitions),
_num_partitions(num_partitions),
_num_senders(running_sink_operators),
_num_sources(num_partitions),
_free_block_limit(free_block_limit) {}
_num_sources(num_partitions) {}
ExchangerBase(int running_sink_operators, int num_sources, int num_partitions,
int free_block_limit)
: _running_sink_operators(running_sink_operators),
_running_source_operators(num_sources),
_num_partitions(num_partitions),
_num_senders(running_sink_operators),
_num_sources(num_sources),
_free_block_limit(free_block_limit) {}
_num_sources(num_sources) {}
virtual ~ExchangerBase() = default;
virtual Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
Profile&& profile, SourceInfo&& source_info) = 0;
Expand All @@ -149,16 +134,12 @@ class ExchangerBase {
virtual ExchangeType get_type() const = 0;
// Called if a local exchanger source operator are closed. Free the unused data block in data_queue.
virtual void close(SourceInfo&& source_info) = 0;
// Called if all local exchanger source operators are closed. We free the memory in
// `_free_blocks` here.

virtual void finalize();

virtual std::string data_queue_debug_string(int i) = 0;

void set_low_memory_mode() {
_free_block_limit = 0;
clear_blocks(_free_blocks);
}
void set_low_memory_mode() {}

protected:
friend struct LocalExchangeSharedState;
Expand All @@ -170,8 +151,6 @@ class ExchangerBase {
const int _num_partitions;
const int _num_senders;
const int _num_sources;
std::atomic_int _free_block_limit = 0;
moodycamel::ConcurrentQueue<vectorized::Block> _free_blocks;
};

struct PartitionedRowIdxs {
Expand Down
Loading
Loading