Skip to content

Commit 20dcce0

Browse files
committed
scanner merge after projection
1 parent 6e937ad commit 20dcce0

File tree

2 files changed

+41
-2
lines changed

2 files changed

+41
-2
lines changed

be/src/vec/exec/scan/scanner.cpp

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,38 @@ Status Scanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
7878
Status Scanner::get_block_after_projects(RuntimeState* state, vectorized::Block* block, bool* eos) {
7979
auto& row_descriptor = _local_state->_parent->row_descriptor();
8080
if (_output_row_descriptor) {
81-
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
82-
RETURN_IF_ERROR(get_block(state, &_origin_block, eos));
81+
if (_alreay_eos) {
82+
*eos = true;
83+
_padding_block.swap(_origin_block);
84+
} else {
85+
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
86+
while (_padding_block.rows() < state->batch_size() / 4 && !*eos) {
87+
RETURN_IF_ERROR(get_block(state, &_origin_block, eos));
88+
if (_origin_block.rows() >= state->batch_size() / 4) {
89+
break;
90+
}
91+
92+
if (_origin_block.rows() + _padding_block.rows() <= state->batch_size()) {
93+
_merge_padding_block();
94+
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
95+
} else {
96+
if (_origin_block.rows() < _padding_block.rows()) {
97+
_padding_block.swap(_origin_block);
98+
}
99+
break;
100+
}
101+
}
102+
}
103+
104+
// first output the origin block change eos = false, next time output padding block
105+
// set the eos to true
106+
if (*eos && !_padding_block.empty() && !_origin_block.empty()) {
107+
_alreay_eos = true;
108+
*eos = false;
109+
}
110+
if (_origin_block.empty() && !_padding_block.empty()) {
111+
_padding_block.swap(_origin_block);
112+
}
83113
return _do_projections(&_origin_block, block);
84114
} else {
85115
return get_block(state, block, eos);

be/src/vec/exec/scan/scanner.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,13 @@ class Scanner {
103103
// Subclass should implement this to return data.
104104
virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) = 0;
105105

106+
void _merge_padding_block() {
107+
if (_padding_block.empty()) {
108+
_padding_block.swap(_origin_block);
109+
}
110+
(void)MutableBlock::build_mutable_block(&_padding_block).merge(_origin_block);
111+
}
112+
106113
// Update the counters before closing this scanner
107114
virtual void _collect_profile_before_close();
108115

@@ -217,6 +224,8 @@ class Scanner {
217224
// Used in common subexpression elimination to compute intermediate results.
218225
std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
219226
vectorized::Block _origin_block;
227+
vectorized::Block _padding_block;
228+
bool _alreay_eos = false;
220229

221230
VExprContextSPtrs _common_expr_ctxs_push_down;
222231
// Late arriving runtime filters will update _conjuncts.

0 commit comments

Comments
 (0)