diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp
index 155988582b2ee6..7236722c213ceb 100644
--- a/be/src/pipeline/exec/data_queue.cpp
+++ b/be/src/pipeline/exec/data_queue.cpp
@@ -32,8 +32,6 @@ namespace pipeline {
 DataQueue::DataQueue(int child_count)
         : _queue_blocks_lock(child_count),
           _queue_blocks(child_count),
-          _free_blocks_lock(child_count),
-          _free_blocks(child_count),
           _child_count(child_count),
           _is_finished(child_count),
           _is_canceled(child_count),
@@ -42,7 +40,6 @@ DataQueue::DataQueue(int child_count)
           _flag_queue_idx(0) {
     for (int i = 0; i < child_count; ++i) {
         _queue_blocks_lock[i].reset(new std::mutex());
-        _free_blocks_lock[i].reset(new std::mutex());
         _is_finished[i] = false;
         _is_canceled[i] = false;
         _cur_bytes_in_queue[i] = 0;
@@ -52,36 +49,6 @@ DataQueue::DataQueue(int child_count)
     _sink_dependencies.resize(child_count, nullptr);
 }
 
-std::unique_ptr<vectorized::Block> DataQueue::get_free_block(int child_idx) {
-    {
-        INJECT_MOCK_SLEEP(std::lock_guard l(*_free_blocks_lock[child_idx]));
-        if (!_free_blocks[child_idx].empty()) {
-            auto block = std::move(_free_blocks[child_idx].front());
-            _free_blocks[child_idx].pop_front();
-            return block;
-        }
-    }
-
-    return vectorized::Block::create_unique();
-}
-
-void DataQueue::push_free_block(std::unique_ptr<vectorized::Block> block, int child_idx) {
-    DCHECK(block->rows() == 0);
-
-    if (!_is_low_memory_mode) {
-        INJECT_MOCK_SLEEP(std::lock_guard l(*_free_blocks_lock[child_idx]));
-        _free_blocks[child_idx].emplace_back(std::move(block));
-    }
-}
-
-void DataQueue::clear_free_blocks() {
-    for (size_t child_idx = 0; child_idx < _free_blocks.size(); ++child_idx) {
-        std::lock_guard l(*_free_blocks_lock[child_idx]);
-        std::deque<std::unique_ptr<vectorized::Block>> tmp_queue;
-        _free_blocks[child_idx].swap(tmp_queue);
-    }
-}
-
 void DataQueue::terminate() {
     for (int i = 0; i < _queue_blocks.size(); i++) {
         set_finish(i);
@@ -93,7 +60,6 @@ void DataQueue::terminate() {
             _sink_dependencies[i]->set_always_ready();
         }
     }
-    clear_free_blocks();
 }
 
 //check which queue have data, and save the idx in _flag_queue_idx,
diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h
index de56438faf441b..115e15870dbb46 100644
--- a/be/src/pipeline/exec/data_queue.h
+++ b/be/src/pipeline/exec/data_queue.h
@@ -42,12 +42,8 @@ class DataQueue {
 
     Status push_block(std::unique_ptr<vectorized::Block> block, int child_idx = 0);
 
-    std::unique_ptr<vectorized::Block> get_free_block(int child_idx = 0);
-    void push_free_block(std::unique_ptr<vectorized::Block> output_block, int child_idx = 0);
-    void clear_free_blocks();
-
     void set_finish(int child_idx = 0);
     void set_canceled(int child_idx = 0); // should set before finish
     bool is_finish(int child_idx = 0);
@@ -76,7 +72,6 @@ class DataQueue {
     void set_low_memory_mode() {
         _is_low_memory_mode = true;
        _max_blocks_in_sub_queue = 1;
-        clear_free_blocks();
     }
 
     void terminate();
@@ -85,9 +80,6 @@ class DataQueue {
     std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
     std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _queue_blocks;
 
-    std::vector<std::unique_ptr<std::mutex>> _free_blocks_lock;
-    std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _free_blocks;
-
     //how many deque will be init, always will be one
     int _child_count = 0;
     std::vector _is_finished;
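With the per-child free lists gone, DataQueue callers simply allocate a fresh Block for every batch and let it be destroyed once the consumer has drained it. A minimal sketch of the producer path after this change (`data_queue` and `child_id` are illustrative names, not identifiers from the patch):

    // Allocate a new block per batch instead of recycling one from a free list.
    std::unique_ptr<vectorized::Block> output_block = vectorized::Block::create_unique();
    // ... fill output_block with the child's output rows ...
    RETURN_IF_ERROR(data_queue.push_block(std::move(output_block), child_id));
    // The consumer pops the block, swaps its contents out, and lets it be freed.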
"util/string_util.h" +#include "vec/columns/column_nullable.h" +#include "vec/core/block.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" #include "vec/utils/util.hpp" @@ -322,46 +326,31 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori } DCHECK_EQ(rows, input_block.rows()); - auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) { - if (to->is_nullable() && !from->is_nullable()) { - if (_keep_origin || !from->is_exclusive()) { - auto& null_column = reinterpret_cast(*to); - null_column.get_nested_column().insert_range_from(*from, 0, rows); - null_column.get_null_map_column().get_data().resize_fill(rows, 0); - bytes_usage += null_column.allocated_bytes(); - } else { - to = make_nullable(from, false)->assume_mutable(); - } - } else { - if (_keep_origin || !from->is_exclusive()) { - to->insert_range_from(*from, 0, rows); - bytes_usage += from->allocated_bytes(); - } else { - to = from->assume_mutable(); - } - } - }; - - using namespace vectorized; - vectorized::MutableBlock mutable_block = - vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block, - *_output_row_descriptor); if (rows != 0) { + using namespace vectorized; + MutableBlock mutable_block = VectorizedUtils::build_mutable_mem_reuse_block( + output_block, *_output_row_descriptor); auto& mutable_columns = mutable_block.mutable_columns(); - const size_t origin_columns_count = input_block.columns(); DCHECK_EQ(mutable_columns.size(), local_state->_projections.size()) << debug_string(); for (int i = 0; i < mutable_columns.size(); ++i) { - auto result_column_id = -1; ColumnPtr column_ptr; RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block, column_ptr)); column_ptr = column_ptr->convert_to_full_column_if_const(); - if (result_column_id >= origin_columns_count) { - bytes_usage += column_ptr->allocated_bytes(); + + if (mutable_columns[i]->is_nullable() && !column_ptr->is_nullable()) { + const auto size = column_ptr->size(); + mutable_columns[i] = ColumnNullable::create(column_ptr->assume_mutable(), + ColumnUInt8::create(size, 0)) + ->assume_mutable(); + } else { + mutable_columns[i] = column_ptr->assume_mutable(); } - insert_column_datas(mutable_columns[i], column_ptr, rows); } - DCHECK(mutable_block.rows() == rows); + DCHECK_EQ(mutable_block.rows(), rows); + auto empty_columns = origin_block->clone_empty_columns(); + origin_block->set_columns(std::move(empty_columns)); output_block->set_columns(std::move(mutable_columns)); + DCHECK_EQ(output_block->rows(), rows); } local_state->_estimate_memory_usage += bytes_usage; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 9c61e75d114719..4efc222e874b53 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1151,8 +1151,6 @@ Status ScanLocalState::_init_profile() { _scanner_profile.reset(new RuntimeProfile("Scanner")); custom_profile()->add_child(_scanner_profile.get(), true, nullptr); - _newly_create_free_blocks_num = - ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT); _scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime"); _scan_cpu_timer = ADD_TIMER(_scanner_profile, "ScannerCpuTime"); _filter_timer = ADD_TIMER(_scanner_profile, "ScannerFilterTime"); diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 8a984cedc6cddd..eb0756699f3291 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ 
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 8a984cedc6cddd..eb0756699f3291 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -100,8 +100,6 @@ class ScanLocalStateBase : public PipelineXLocalState<> {
     std::shared_ptr<RuntimeProfile> _scanner_profile;
     RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr;
 
-    // Num of newly created free blocks when running query
-    RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
     // Max num of scanner thread
     RuntimeProfile::Counter* _max_scan_concurrency = nullptr;
     RuntimeProfile::Counter* _min_scan_concurrency = nullptr;
@@ -361,13 +359,7 @@ class ScanOperatorX : public OperatorX {
         return {ExchangeType::BUCKET_HASH_SHUFFLE};
     }
 
-    void set_low_memory_mode(RuntimeState* state) override {
-        auto& local_state = get_local_state(state);
-
-        if (local_state._scanner_ctx) {
-            local_state._scanner_ctx->clear_free_blocks();
-        }
-    }
+    void set_low_memory_mode(RuntimeState* state) override {}
 
     using OperatorX::node_id;
     using OperatorX::operator_id;
diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp
index 5ce4c75ba87bf9..8e59f8e00defde 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -102,8 +102,7 @@ Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
     if (local_state._output_block == nullptr) {
-        local_state._output_block =
-                local_state._shared_state->data_queue.get_free_block(_cur_child_id);
+        local_state._output_block = vectorized::Block::create_unique();
     }
     if (_cur_child_id < _get_first_materialized_child_idx()) { //pass_through
         if (in_block->rows() > 0) {
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 38293f3cae0e87..d8b16a8b32e2f8 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -137,8 +137,6 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b
             return Status::OK();
         }
         block->swap(*output_block);
-        output_block->clear_column_data(row_descriptor().num_materialized_slots());
-        local_state._shared_state->data_queue.push_free_block(std::move(output_block), child_idx);
     }
     local_state.reached_limit(block, eos);
     return Status::OK();
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp
index d1d6b7387a0be8..b31558240686a2 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -196,11 +196,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
         }
     }
 
-    vectorized::Block data_block;
+    vectorized::Block data_block = block->clone_empty();
     std::shared_ptr<BlockWrapper> new_block_wrapper;
-    if (!_free_blocks.try_dequeue(data_block)) {
-        data_block = block->clone_empty();
-    }
     data_block.swap(*block);
     new_block_wrapper = BlockWrapper::create_shared(std::move(data_block),
                                                     local_state->_shared_state, -1);
@@ -262,11 +259,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
         }
     }
 
-    vectorized::Block data_block;
+    vectorized::Block data_block = block->clone_empty();
     std::shared_ptr<BlockWrapper> new_block_wrapper;
-    if (!_free_blocks.try_dequeue(data_block)) {
-        data_block = block->clone_empty();
-    }
     data_block.swap(*block);
     new_block_wrapper = BlockWrapper::create_shared(std::move(data_block), nullptr, -1);
     if (new_block_wrapper->_data_block.empty()) {
@@ -288,10 +282,7 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
     if (in_block->empty()) {
         return Status::OK();
     }
-    vectorized::Block new_block;
-    if (!_free_blocks.try_dequeue(new_block)) {
-        new_block = {in_block->clone_empty()};
-    }
+    vectorized::Block new_block = in_block->clone_empty();
     new_block.swap(*in_block);
     auto channel_id = ((*sink_info.channel_id)++) % _num_partitions;
     BlockWrapperSPtr wrapper = BlockWrapper::create_shared(
@@ -340,10 +331,7 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block
     if (in_block->empty()) {
         return Status::OK();
     }
-    vectorized::Block new_block;
-    if (!_free_blocks.try_dequeue(new_block)) {
-        new_block = {in_block->clone_empty()};
-    }
+    vectorized::Block new_block = in_block->clone_empty();
     new_block.swap(*in_block);
 
     BlockWrapperSPtr wrapper = BlockWrapper::create_shared(
@@ -370,10 +358,6 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo
 
 void ExchangerBase::finalize() {
     DCHECK(_running_source_operators == 0);
-    vectorized::Block block;
-    while (_free_blocks.try_dequeue(block)) {
-        // do nothing
-    }
 }
 
 Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
@@ -381,10 +365,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block
     if (in_block->empty()) {
         return Status::OK();
     }
-    vectorized::Block new_block;
-    if (!_free_blocks.try_dequeue(new_block)) {
-        new_block = {in_block->clone_empty()};
-    }
+    vectorized::Block new_block = in_block->clone_empty();
     new_block.swap(*in_block);
     auto wrapper = BlockWrapper::create_shared(
             std::move(new_block),
@@ -430,10 +411,7 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo
 Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
                                                        vectorized::Block* in_block,
                                                        SinkInfo&& sink_info) {
-    vectorized::Block new_block;
-    if (!_free_blocks.try_dequeue(new_block)) {
-        new_block = {in_block->clone_empty()};
-    }
+    vectorized::Block new_block = in_block->clone_empty();
     new_block.swap(*in_block);
     auto channel_id = ((*sink_info.channel_id)++) % _num_partitions;
     _enqueue_data_and_set_ready(
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h
index e2d9ae6807c28e..497d5560446d37 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -68,10 +68,6 @@ class ExchangerBase {
 /**
  * `BlockWrapper` is used to wrap a data block with a reference count.
  *
- * In function `unref()`, if `ref_count` decremented to 0, which means this block is not needed by
- * operators, so we put it into `_free_blocks` to reuse its memory if needed and refresh memory usage
- * in current queue.
- *
  * Note: `ref_count` will be larger than 1 only if this block is shared between multiple queues in
 * shuffle exchanger.
 */
@@ -93,15 +89,6 @@ class ExchangerBase {
             // `_channel_ids` may be empty if exchanger is shuffled exchanger and channel id is
             // not used by `sub_total_mem_usage`. So we just pass -1 here.
             _shared_state->sub_total_mem_usage(_allocated_bytes);
-            if (_shared_state->exchanger->_free_block_limit == 0 ||
-                _shared_state->exchanger->_free_blocks.size_approx() <
-                        _shared_state->exchanger->_free_block_limit *
-                                _shared_state->exchanger->_num_sources) {
-                _data_block.clear_column_data();
-                // Free blocks is used to improve memory efficiency. Failure during pushing back
-                // free block will not incur any bad result so just ignore the return value.
-                _shared_state->exchanger->_free_blocks.enqueue(std::move(_data_block));
-            }
         };
     }
     void record_channel_id(int channel_id) {
@@ -131,16 +118,14 @@ class ExchangerBase {
               _running_source_operators(num_partitions),
              _num_partitions(num_partitions),
              _num_senders(running_sink_operators),
-             _num_sources(num_partitions),
-             _free_block_limit(free_block_limit) {}
+             _num_sources(num_partitions) {}
     ExchangerBase(int running_sink_operators, int num_sources, int num_partitions,
                   int free_block_limit)
             : _running_sink_operators(running_sink_operators),
              _running_source_operators(num_sources),
              _num_partitions(num_partitions),
              _num_senders(running_sink_operators),
-             _num_sources(num_sources),
-             _free_block_limit(free_block_limit) {}
+             _num_sources(num_sources) {}
     virtual ~ExchangerBase() = default;
     virtual Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
                              Profile&& profile, SourceInfo&& source_info) = 0;
@@ -149,16 +134,12 @@ class ExchangerBase {
     virtual ExchangeType get_type() const = 0;
     // Called if a local exchanger source operator are closed. Free the unused data block in data_queue.
     virtual void close(SourceInfo&& source_info) = 0;
-    // Called if all local exchanger source operators are closed. We free the memory in
-    // `_free_blocks` here.
+
     virtual void finalize();
 
     virtual std::string data_queue_debug_string(int i) = 0;
 
-    void set_low_memory_mode() {
-        _free_block_limit = 0;
-        clear_blocks(_free_blocks);
-    }
+    void set_low_memory_mode() {}
 
 protected:
     friend struct LocalExchangeSharedState;
@@ -170,8 +151,6 @@ class ExchangerBase {
     const int _num_partitions;
     const int _num_senders;
     const int _num_sources;
-    std::atomic_int _free_block_limit = 0;
-    moodycamel::ConcurrentQueue<vectorized::Block> _free_blocks;
 };
 
 struct PartitionedRowIdxs {
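All exchanger sink paths now share the same pattern: clone an empty block with the source block's layout, swap the payload into it, and hand it to a BlockWrapper; once the wrapper's reference count drops to zero the block is simply destroyed instead of being parked in `_free_blocks`. A rough sketch of that common pattern (`shared_state` and `channel_id` stand in for whatever arguments each exchanger actually passes):

    // Take ownership of in_block's data without keeping a reusable shell around.
    vectorized::Block new_block = in_block->clone_empty();
    new_block.swap(*in_block); // in_block keeps its schema but is now empty
    BlockWrapperSPtr wrapper =
            BlockWrapper::create_shared(std::move(new_block), shared_state, channel_id);
    // When the last queue releases `wrapper`, sub_total_mem_usage() runs and the
    // block's memory goes back to the allocator.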
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 4aabf67c2618ad..fc124db09a2471 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -104,7 +104,6 @@ ScannerContext::ScannerContext(
 Status ScannerContext::init() {
 #ifndef BE_TEST
     _scanner_profile = _local_state->_scanner_profile;
-    _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
     _scanner_memory_used_counter = _local_state->_memory_used_counter;
 
     // 3. get thread token
@@ -180,9 +179,6 @@ ScannerContext::~ScannerContext() {
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
     _tasks_queue.clear();
     vectorized::BlockUPtr block;
-    while (_free_blocks.try_dequeue(block)) {
-        // do nothing
-    }
     block.reset();
     DorisMetrics::instance()->scanner_ctx_cnt->increment(-1);
     if (_task_handle) {
@@ -194,33 +190,11 @@ ScannerContext::~ScannerContext() {
     }
 }
 
-vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
-    vectorized::BlockUPtr block = nullptr;
-    if (_free_blocks.try_dequeue(block)) {
-        DCHECK(block->mem_reuse());
-        _block_memory_usage -= block->allocated_bytes();
-        _scanner_memory_used_counter->set(_block_memory_usage);
-        // A free block is reused, so the memory usage should be decreased
-        // The caller of get_free_block will increase the memory usage
-    } else if (_block_memory_usage < _max_bytes_in_queue || force) {
-        _newly_create_free_blocks_num->update(1);
-        block = vectorized::Block::create_unique(_output_tuple_desc->slots(), 0);
-    }
-    return block;
-}
-
-void ScannerContext::return_free_block(vectorized::BlockUPtr block) {
-    // If under low memory mode, should not return the freeblock, it will occupy too much memory.
-    if (!_local_state->low_memory_mode() && block->mem_reuse() &&
-        _block_memory_usage < _max_bytes_in_queue) {
-        size_t block_size_to_reuse = block->allocated_bytes();
-        _block_memory_usage += block_size_to_reuse;
-        _scanner_memory_used_counter->set(_block_memory_usage);
-        block->clear_column_data();
-        // Free blocks is used to improve memory efficiency. Failure during pushing back
-        // free block will not incur any bad result so just ignore the return value.
-        _free_blocks.enqueue(std::move(block));
+vectorized::BlockUPtr ScannerContext::create_output_block(bool force) {
+    if (_block_memory_usage < _max_bytes_in_queue || force) {
+        return vectorized::Block::create_unique(_output_tuple_desc->slots(), 0);
     }
+    return {};
 }
 
 Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task,
@@ -234,10 +208,6 @@ Status ScannerContext::submit_scan_task(std::shared_ptr scan_task,
     return _scanner_scheduler->submit(shared_from_this(), scan_task);
 }
 
-void ScannerContext::clear_free_blocks() {
-    clear_blocks(_free_blocks);
-}
-
 void ScannerContext::push_back_scan_task(std::shared_ptr<ScanTask> scan_task) {
     if (scan_task->status_ok()) {
         for (const auto& [block, _] : scan_task->cached_blocks) {
@@ -299,7 +269,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
             _block_memory_usage -= block_size;
             // consume current block
             block->swap(*current_block);
-            return_free_block(std::move(current_block));
         }
 
         VLOG_DEBUG << fmt::format(
@@ -434,12 +403,12 @@ void ScannerContext::stop_scanners(RuntimeState* state) {
 
 std::string ScannerContext::debug_string() {
     return fmt::format(
             "id: {}, total scanners: {}, pending tasks: {},"
-            " _should_stop: {}, _is_finished: {}, free blocks: {},"
+            " _should_stop: {}, _is_finished: {}, "
             " limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
             " _max_bytes_in_queue: {}, query_id: {}",
-            ctx_id, _all_scanners.size(), _tasks_queue.size(), _should_stop, _is_finished,
-            _free_blocks.size_approx(), limit, _num_scheduled_scanners, _max_scan_concurrency,
-            _max_bytes_in_queue, print_id(_query_id));
+            ctx_id, _all_scanners.size(), _tasks_queue.size(), _should_stop, _is_finished, limit,
+            _num_scheduled_scanners, _max_scan_concurrency, _max_bytes_in_queue,
+            print_id(_query_id));
 }
 
 void ScannerContext::_set_scanner_done() {
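create_output_block keeps only the back-pressure half of the old get_free_block contract: it allocates a new block sized for the output tuple while the queue is under _max_bytes_in_queue (or when force is set) and returns a null pointer otherwise. The caller in scanner_scheduler.cpp reacts to the null case by backing off, roughly as in this sketch (based on the hunks below; `first_read` forces the very first allocation):

    vectorized::BlockUPtr free_block = ctx->create_output_block(/*force=*/first_read);
    if (free_block == nullptr) {
        // Queue is over its byte budget and this is not the first read: stop producing.
        break;
    }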
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index d6c3793ff09c73..01c286a1646fbf 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -133,9 +133,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
     ~ScannerContext() override;
     Status init();
 
-    vectorized::BlockUPtr get_free_block(bool force);
-    void return_free_block(vectorized::BlockUPtr block);
-    void clear_free_blocks();
+    vectorized::BlockUPtr create_output_block(bool force);
 
     inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; }
     int64_t block_memory_usage() { return _block_memory_usage; }
@@ -242,7 +240,6 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
     std::shared_ptr<RuntimeProfile> _scanner_profile;
     // This counter refers to scan operator's local state
     RuntimeProfile::Counter* _scanner_memory_used_counter = nullptr;
-    RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
     RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
     std::shared_ptr _resource_ctx;
     std::shared_ptr _dependency = nullptr;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index b2dc4981003a03..945bb1730db523 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -103,7 +103,6 @@ Status ScannerScheduler::submit(std::shared_ptr ctx,
 
 void handle_reserve_memory_failure(RuntimeState* state, std::shared_ptr<ScannerContext> ctx,
                                    const Status& st, size_t reserve_size) {
-    ctx->clear_free_blocks();
     auto* local_state = ctx->local_state();
 
     auto debug_msg = fmt::format(
@@ -172,9 +171,6 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx,
     bool eos = false;
     ASSIGN_STATUS_IF_CATCH_EXCEPTION(
             RuntimeState* state = ctx->state(); DCHECK(nullptr != state);
-            // scanner->open may alloc plenty amount of memory(read blocks of data),
-            // so better to also check low memory and clear free blocks here.
-            if (ctx->low_memory_mode()) { ctx->clear_free_blocks(); }
 
             if (!scanner->has_prepared()) {
                 status = scanner->prepare();
@@ -199,7 +195,6 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx,
             size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
             if (ctx->low_memory_mode()) {
-                ctx->clear_free_blocks();
                 if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
                     raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
                 }
@@ -227,7 +222,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx,
                 DEFER_RELEASE_RESERVED();
                 BlockUPtr free_block;
                 if (first_read) {
-                    free_block = ctx->get_free_block(first_read);
+                    free_block = ctx->create_output_block(first_read);
                 } else {
                     if (state->get_query_ctx()
                                 ->resource_ctx()
@@ -241,7 +236,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx,
                             break;
                         }
                     }
-                    free_block = ctx->get_free_block(first_read);
+                    free_block = ctx->create_output_block(first_read);
                 }
                 if (free_block == nullptr) {
                     break;
                 }
@@ -276,7 +271,6 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx,
                 // Return block succeed or not, this free_block is not used by this scan task any more.
                 // If block can be reused, its memory usage will be added back.
-                ctx->return_free_block(std::move(free_block));
                 ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
                                      block_size);
             } else {
@@ -307,7 +301,6 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx,
                 scanner->update_block_avg_bytes(block_avg_bytes);
             }
             if (ctx->low_memory_mode()) {
-                ctx->clear_free_blocks();
                 if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
                     raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
                 }
diff --git a/be/src/vec/exprs/vslot_ref.cpp b/be/src/vec/exprs/vslot_ref.cpp
index 2435b114e1e839..19d58e454390ff 100644
--- a/be/src/vec/exprs/vslot_ref.cpp
+++ b/be/src/vec/exprs/vslot_ref.cpp
@@ -78,6 +78,7 @@ Status VSlotRef::open(RuntimeState* state, VExprContext* context,
 Status VSlotRef::execute(VExprContext* context, Block* block, int* result_column_id) const {
     if (_column_id >= 0 && _column_id >= block->columns()) {
+        DCHECK(0);
         return Status::Error(
                 "input block not contain slot column {}, column_id={}, block={}", *_column_name,
                 _column_id, block->dump_structure());
@@ -89,6 +90,7 @@ Status VSlotRef::execute(VExprContext* context, Block* block, int* result_column
 Status VSlotRef::execute_column(VExprContext* context, const Block* block, size_t count,
                                 ColumnPtr& result_column) const {
     if (_column_id >= 0 && _column_id >= block->columns()) {
+        DCHECK(0);
         return Status::Error(
                 "input block not contain slot column {}, column_id={}, block={}", *_column_name,
                 _column_id, block->dump_structure());
@@ -100,6 +102,7 @@ Status VSlotRef::execute_column(VExprContext* context, const Block* block, size_
 
 DataTypePtr VSlotRef::execute_type(const Block* block) const {
     if (_column_id >= 0 && _column_id >= block->columns()) {
+        DCHECK(0);
         throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                                "input block not contain slot column {}, column_id={}, block={}",
                                *_column_name, _column_id, block->dump_structure());
diff --git a/be/test/pipeline/exec/data_queue_test.cpp b/be/test/pipeline/exec/data_queue_test.cpp
index 5f8702ab2610fa..88ad241080cefd 100644
--- a/be/test/pipeline/exec/data_queue_test.cpp
+++ b/be/test/pipeline/exec/data_queue_test.cpp
@@ -113,10 +113,6 @@ TEST_F(DataQueueTest, MultiTest) {
         EXPECT_TRUE(data_queue->is_finish(i));
     }
     EXPECT_TRUE(data_queue->is_all_finish());
-    data_queue->clear_free_blocks();
-    for (int i = 0; i < 3; i++) {
-        EXPECT_TRUE(data_queue->_free_blocks[i].empty());
-    }
 }
 
 // ./run-be-ut.sh --run --filter=DataQueueTest.*
diff --git a/be/test/scan/scanner_context_test.cpp b/be/test/scan/scanner_context_test.cpp
index 09addeb9dcedcd..63b9d13a1bd37d 100644
--- a/be/test/scan/scanner_context_test.cpp
+++ b/be/test/scan/scanner_context_test.cpp
@@ -661,7 +661,7 @@ TEST_F(ScannerContextTest, scan_queue_mem_limit) {
     ASSERT_EQ(scanner_context->_max_bytes_in_queue, (1024 * 1024 * 10) * (1 / 300 + 1));
 }
 
-TEST_F(ScannerContextTest, get_free_block) {
+TEST_F(ScannerContextTest, create_output_block) {
     const int parallel_tasks = 1;
     auto scan_operator = std::make_unique(
             obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {});
@@ -688,74 +688,26 @@ TEST_F(ScannerContextTest, get_free_block) {
     std::shared_ptr scanner_context = ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
             scanners, limit, scan_dependency, parallel_tasks);
-    scanner_context->_newly_create_free_blocks_num = newly_create_free_blocks_num.get();
-    scanner_context->_newly_create_free_blocks_num->set(0L);
     scanner_context->_scanner_memory_used_counter = scanner_memory_used_counter.get();
     scanner_context->_scanner_memory_used_counter->set(0L);
 
-    BlockUPtr block = scanner_context->get_free_block(/*force=*/true);
+    BlockUPtr block = scanner_context->create_output_block(/*force=*/true);
     ASSERT_NE(block, nullptr);
-    ASSERT_TRUE(scanner_context->_newly_create_free_blocks_num->value() == 1);
 
     scanner_context->_max_bytes_in_queue = 200;
     // no free block
     // force is false, _block_memory_usage < _max_bytes_in_queue
-    block = scanner_context->get_free_block(/*force=*/false);
+    block = scanner_context->create_output_block(/*force=*/false);
     ASSERT_NE(block, nullptr);
-    ASSERT_TRUE(scanner_context->_newly_create_free_blocks_num->value() == 2);
 
     std::unique_ptr return_block = std::make_unique();
     EXPECT_CALL(*return_block, allocated_bytes()).WillRepeatedly(testing::Return(100));
     EXPECT_CALL(*return_block, mem_reuse()).WillRepeatedly(testing::Return(true));
 
     scanner_context->_free_blocks.enqueue(std::move(return_block));
     // get free block from queue
-    block = scanner_context->get_free_block(/*force=*/false);
+    block = scanner_context->create_output_block(/*force=*/false);
     ASSERT_NE(block, nullptr);
-    ASSERT_EQ(scanner_context->_block_memory_usage, -100);
-    ASSERT_EQ(scanner_context->_scanner_memory_used_counter->value(), -100);
-}
-
-TEST_F(ScannerContextTest, return_free_block) {
-    const int parallel_tasks = 1;
-    auto scan_operator = std::make_unique(
-            obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {});
-
-    auto olap_scan_local_state =
-            pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get());
-
-    const int64_t limit = 100;
-
-    OlapScanner::Params scanner_params;
-    scanner_params.state = state.get();
-    scanner_params.profile = profile.get();
-    scanner_params.limit = limit;
-    scanner_params.key_ranges = std::vector(); // empty
-
-    std::shared_ptr scanner =
-            OlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params));
-
-    std::list> scanners;
-    for (int i = 0; i < 11; ++i) {
-        scanners.push_back(std::make_shared(scanner));
-    }
-
-    std::shared_ptr scanner_context = ScannerContext::create_shared(
-            state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
-    scanner_context->_newly_create_free_blocks_num = newly_create_free_blocks_num.get();
-    scanner_context->_scanner_memory_used_counter = scanner_memory_used_counter.get();
-    scanner_context->_max_bytes_in_queue = 200;
-    scanner_context->_block_memory_usage = 0;
-
-    std::unique_ptr return_block = std::make_unique();
-    EXPECT_CALL(*return_block, allocated_bytes()).WillRepeatedly(testing::Return(100));
-    EXPECT_CALL(*return_block, mem_reuse()).WillRepeatedly(testing::Return(true));
-    EXPECT_CALL(*return_block, clear_column_data(testing::_)).WillRepeatedly(testing::Return());
-
-    scanner_context->return_free_block(std::move(return_block));
-    ASSERT_EQ(scanner_context->_block_memory_usage, 100);
-    ASSERT_EQ(scanner_context->_scanner_memory_used_counter->value(), 100);
-    // free_block queue is stabilized, so size_approx is accurate.
-    ASSERT_EQ(scanner_context->_free_blocks.size_approx(), 1);
+    ASSERT_EQ(scanner_context->_block_memory_usage, 0);
+    ASSERT_EQ(scanner_context->_scanner_memory_used_counter->value(), 0);
 }
 
 TEST_F(ScannerContextTest, get_block_from_queue) {
@@ -785,7 +737,6 @@ TEST_F(ScannerContextTest, get_block_from_queue) {
     std::shared_ptr scanner_context = ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
             scanners, limit, scan_dependency, parallel_tasks);
-    scanner_context->_newly_create_free_blocks_num = newly_create_free_blocks_num.get();
     scanner_context->_scanner_memory_used_counter = scanner_memory_used_counter.get();
     scanner_context->_max_bytes_in_queue = 200;
     scanner_context->_block_memory_usage = 0;
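A hypothetical follow-up assertion (not part of this patch) that would pin down the back-pressure side of create_output_block in the same fixture: once _block_memory_usage exceeds _max_bytes_in_queue, a non-forced call should return null while a forced call still allocates.

    scanner_context->_max_bytes_in_queue = 200;
    scanner_context->_block_memory_usage = 1024;
    ASSERT_EQ(scanner_context->create_output_block(/*force=*/false), nullptr);
    ASSERT_NE(scanner_context->create_output_block(/*force=*/true), nullptr);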