Skip to content

Commit a133581

Browse files
branch-4.0: [fix](iterator) Use explicit output schema in new_merge_iterator and new_union_iterator #60772 (#60804)
Cherry-picked from #60772 Co-authored-by: ivin <uchenily@qq.com>
1 parent 5a243f5 commit a133581

File tree

9 files changed

+161
-43
lines changed

9 files changed

+161
-43
lines changed

be/src/olap/rowset/beta_rowset_reader.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,10 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
148148
}
149149
VLOG_NOTICE << "read columns size: " << read_columns.size();
150150
_input_schema = std::make_shared<Schema>(_read_context->tablet_schema->columns(), read_columns);
151+
// output_schema only contains return_columns (excludes extra columns like delete-predicate columns).
152+
// It is used by merge/union iterators to determine how many columns to copy to the output block.
153+
_output_schema = std::make_shared<Schema>(_read_context->tablet_schema->columns(),
154+
*(_read_context->return_columns));
151155
if (_read_context->predicates != nullptr) {
152156
_read_options.column_predicates.insert(_read_options.column_predicates.end(),
153157
_read_context->predicates->begin(),
@@ -316,15 +320,16 @@ Status BetaRowsetReader::_init_iterator() {
316320
}
317321
}
318322
}
319-
_iterator = vectorized::new_merge_iterator(
320-
std::move(iterators), sequence_loc, _read_context->is_unique,
321-
_read_context->read_orderby_key_reverse, _read_context->merged_rows);
323+
_iterator = vectorized::new_merge_iterator(std::move(iterators), sequence_loc,
324+
_read_context->is_unique,
325+
_read_context->read_orderby_key_reverse,
326+
_read_context->merged_rows, _output_schema);
322327
} else {
323328
if (_read_context->read_orderby_key_reverse) {
324329
// reverse iterators to read backward for ORDER BY key DESC
325330
std::reverse(iterators.begin(), iterators.end());
326331
}
327-
_iterator = vectorized::new_union_iterator(std::move(iterators));
332+
_iterator = vectorized::new_union_iterator(std::move(iterators), _output_schema);
328333
}
329334

330335
auto s = _iterator->init(_read_options);

be/src/olap/rowset/beta_rowset_reader.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,17 @@ class BetaRowsetReader : public RowsetReader {
146146
std::pair<int64_t, int64_t> _segment_offsets;
147147
std::vector<RowRanges> _segment_row_ranges;
148148

149+
// _input_schema: includes return_columns + delete_predicate_columns.
150+
// Used by SegmentIterator internally (iter->schema() returns this). SegmentIterator
151+
// handles the extra delete predicate columns through _current_return_columns and
152+
// _evaluate_short_circuit_predicate(), independent of the block structure.
153+
// e.g. return_columns={c1, c2}, delete_pred on c3 => input_schema={c1, c2, c3}
149154
SchemaSPtr _input_schema;
155+
// _output_schema: includes only return_columns (a subset of input_schema).
156+
// Passed to VMergeIterator/VUnionIterator. block_reset() builds the internal block
157+
// with this schema, and copy_rows() copies exactly these columns to the destination.
158+
// e.g. return_columns={c1, c2} => output_schema={c1, c2}
159+
SchemaSPtr _output_schema;
150160
RowsetReaderContext* _read_context = nullptr;
151161
BetaRowsetSharedPtr _rowset;
152162

be/src/olap/rowset/segment_v2/segment_iterator.cpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1938,8 +1938,19 @@ Status SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
19381938
RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_columns));
19391939
for (auto cid : _non_predicate_columns) {
19401940
auto loc = _schema_block_id_map[cid];
1941-
// if loc > block->columns() means the column is delete column and should
1942-
// not output by block, so just skip the column.
1941+
// Whether a delete predicate column gets output depends on how the caller builds
1942+
// the block passed to next_batch(). Both calling paths now build the block with
1943+
// only the output schema (return_columns), so delete predicate columns are skipped:
1944+
//
1945+
// 1) VMergeIterator path: block_reset() builds _block using the output schema
1946+
// (return_columns only), e.g. block has 2 columns {c1, c2}.
1947+
// Here loc=2 for delete predicate c3, block->columns()=2, so loc < block->columns()
1948+
// is false, and c3 is skipped.
1949+
//
1950+
// 2) VUnionIterator path: the caller's block is built with only return_columns
1951+
// (output schema), e.g. block has 2 columns {c1, c2}.
1952+
// Here loc=2 for c3, block->columns()=2, so loc < block->columns() is false,
1953+
// and c3 is skipped — same behavior as the VMergeIterator path.
19431954
if (loc < block->columns()) {
19441955
bool column_in_block_is_nothing =
19451956
vectorized::check_and_get_column<const vectorized::ColumnNothing>(

be/src/olap/schema_change.cpp

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,14 @@ Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader
560560
bool eof = false;
561561
do {
562562
auto new_block = vectorized::Block::create_unique(new_tablet_schema->create_block());
563-
auto ref_block = vectorized::Block::create_unique(base_tablet_schema->create_block(false));
563+
// create_block() skips dropped columns (from light-weight schema change).
564+
// Dropped columns are only needed for delete predicate evaluation, which
565+
// SegmentIterator handles internally — it creates temporary columns for
566+
// predicate columns not present in the block (via `i >= block->columns()`
567+
// guard in _init_current_block). If dropped columns were included here,
568+
// the block would have more columns than VMergeIterator's output_schema
569+
// expects, causing DCHECK failures in copy_rows.
570+
auto ref_block = vectorized::Block::create_unique(base_tablet_schema->create_block());
564571

565572
Status st = next_batch(rowset_reader, ref_block.get(), _row_same_bit);
566573
if (!st) {
@@ -629,7 +636,14 @@ Status VBaseSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset
629636

630637
bool eof = false;
631638
do {
632-
auto ref_block = vectorized::Block::create_unique(base_tablet_schema->create_block(false));
639+
// create_block() skips dropped columns (from light-weight schema change).
640+
// Dropped columns are only needed for delete predicate evaluation, which
641+
// SegmentIterator handles internally — it creates temporary columns for
642+
// predicate columns not present in the block (via `i >= block->columns()`
643+
// guard in _init_current_block). If dropped columns were included here,
644+
// the block would have more columns than VMergeIterator's output_schema
645+
// expects, causing DCHECK failures in copy_rows.
646+
auto ref_block = vectorized::Block::create_unique(base_tablet_schema->create_block());
633647
Status st = next_batch(rowset_reader, ref_block.get(), _row_same_bit);
634648
if (!st) {
635649
if (st.is<ErrorCode::END_OF_FILE>()) {
@@ -909,6 +923,27 @@ Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& reques
909923
// dropped column during light weight schema change.
910924
// But the tablet schema in base tablet maybe not the latest from FE, so that if fe pass through
911925
// a tablet schema, then use request schema.
926+
//
927+
// return_columns does NOT include dropped columns. It is computed here BEFORE
928+
// merge_dropped_columns() appends dropped columns to _base_tablet_schema below.
929+
// This means return_columns only covers the original (non-dropped) columns.
930+
//
931+
// This is important because:
932+
// - BetaRowsetReader builds _output_schema from return_columns, which determines the
933+
// number of columns in ref_block (via create_block() which also skips dropped cols).
934+
// - VMergeIterator's copy_rows iterates over _output_schema columns, so ref_block
935+
// must match _output_schema exactly.
936+
// - Dropped columns are only needed for delete predicate evaluation, and SegmentIterator
937+
// handles them internally (creates temporary columns for predicate columns not present
938+
// in the block via `i >= block->columns()` guard in _init_current_block).
939+
//
940+
// Example: table has columns [k1, v1, v2], then DROP COLUMN v1, then
941+
// DELETE FROM t WHERE v1 = 'x' was issued before the drop.
942+
// - _base_tablet_schema after merge_dropped_columns: [k1, v2, v1(DROPPED)]
943+
// - return_columns (computed before merge): [0, 1] → [k1, v2]
944+
// - _output_schema / ref_block columns: [k1, v2] (2 columns)
945+
// - SegmentIterator reads v1 internally for delete predicate, but does not
946+
// output it to ref_block. copy_rows only iterates 2 columns — no OOB access.
912947
size_t num_cols =
913948
request.columns.empty() ? _base_tablet_schema->num_columns() : request.columns.size();
914949
return_columns.resize(num_cols);

be/src/olap/tablet_schema.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1752,10 +1752,10 @@ vectorized::Block TabletSchema::create_block(
17521752
return block;
17531753
}
17541754

1755-
vectorized::Block TabletSchema::create_block(bool ignore_dropped_col) const {
1755+
vectorized::Block TabletSchema::create_block() const {
17561756
vectorized::Block block;
17571757
for (const auto& col : _cols) {
1758-
if (ignore_dropped_col && is_dropped_column(*col)) {
1758+
if (is_dropped_column(*col)) {
17591759
continue;
17601760
}
17611761

be/src/olap/tablet_schema.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ class TabletSchema : public MetadataAdder<TabletSchema> {
548548
vectorized::Block create_block(
549549
const std::vector<uint32_t>& return_columns,
550550
const std::unordered_set<uint32_t>* tablet_columns_need_convert_null = nullptr) const;
551-
vectorized::Block create_block(bool ignore_dropped_col = true) const;
551+
vectorized::Block create_block() const;
552552
void set_schema_version(int32_t version) { _schema_version = version; }
553553
void set_auto_increment_column(const std::string& auto_increment_column) {
554554
_auto_increment_column = auto_increment_column;

be/src/vec/olap/vgeneric_iterators.cpp

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,20 @@ Status VStatisticsIterator::next_batch(Block* block) {
9898
return Status::EndOfFile("End of VStatisticsIterator");
9999
}
100100

101+
// Build the block using the output schema, which contains only the columns
102+
// the caller requested (return_columns). Delete predicate columns are excluded
103+
// because SegmentIterator handles them independently:
104+
// - _init_current_block() skips predicate columns (including delete predicates)
105+
// via the _is_pred_column[cid] check, so it never accesses the block by those positions.
106+
// - _output_non_pred_columns() checks loc < block->columns() before filling any column,
107+
// so delete predicate columns (whose loc exceeds block->columns()) are simply skipped.
108+
// - Delete predicate evaluation happens entirely through _current_return_columns and
109+
// _evaluate_short_circuit_predicate(), which are independent of the block structure.
101110
Status VMergeIteratorContext::block_reset(const std::shared_ptr<Block>& block) {
102111
if (!block->columns()) {
103-
const Schema& schema = _iter->schema();
104-
const auto& column_ids = schema.column_ids();
105-
for (size_t i = 0; i < schema.num_column_ids(); ++i) {
106-
auto column_desc = schema.column(column_ids[i]);
112+
const auto& column_ids = _output_schema->column_ids();
113+
for (size_t i = 0; i < _output_schema->num_column_ids(); ++i) {
114+
auto column_desc = _output_schema->column(column_ids[i]);
107115
auto data_type = Schema::get_data_type_ptr(*column_desc);
108116
if (data_type == nullptr) {
109117
return Status::RuntimeError("invalid data type");
@@ -143,9 +151,15 @@ bool VMergeIteratorContext::compare(const VMergeIteratorContext& rhs) const {
143151
return result;
144152
}
145153

154+
// Copy rows from the internal _block to the destination block.
155+
// Both blocks are built with the output schema (return_columns only), so they
156+
// have the same number of columns. We iterate over _output_schema->num_column_ids()
157+
// columns to copy from src to dst.
146158
Status VMergeIteratorContext::copy_rows(Block* block, bool advanced) {
147159
Block& src = *_block;
148160
Block& dst = *block;
161+
DCHECK_EQ(src.columns(), _output_schema->num_column_ids());
162+
DCHECK_EQ(dst.columns(), _output_schema->num_column_ids());
149163
if (_cur_batch_num == 0) {
150164
return Status::OK();
151165
}
@@ -154,7 +168,7 @@ Status VMergeIteratorContext::copy_rows(Block* block, bool advanced) {
154168
size_t start = _index_in_block - _cur_batch_num + 1 - advanced;
155169

156170
RETURN_IF_CATCH_EXCEPTION({
157-
for (size_t i = 0; i < _num_columns; ++i) {
171+
for (size_t i = 0; i < _output_schema->num_column_ids(); ++i) {
158172
auto& s_col = src.get_by_position(i);
159173
auto& d_col = dst.get_by_position(i);
160174

@@ -344,13 +358,12 @@ Status VMergeIterator::init(const StorageReadOptions& opts) {
344358
if (_origin_iters.empty()) {
345359
return Status::OK();
346360
}
347-
_schema = &(_origin_iters[0]->schema());
348361
_record_rowids = opts.record_rowids;
349362

350363
for (auto& iter : _origin_iters) {
351-
auto ctx = std::make_shared<VMergeIteratorContext>(std::move(iter), _sequence_id_idx,
352-
_is_unique, _is_reverse,
353-
opts.read_orderby_key_columns);
364+
auto ctx = std::make_shared<VMergeIteratorContext>(
365+
std::move(iter), _sequence_id_idx, _is_unique, _is_reverse,
366+
opts.read_orderby_key_columns, _output_schema);
354367
RETURN_IF_ERROR(ctx->init(opts));
355368
if (!ctx->valid()) {
356369
continue;
@@ -366,20 +379,26 @@ Status VMergeIterator::init(const StorageReadOptions& opts) {
366379
}
367380

368381
// VUnionIterator will read data from input iterator one by one.
382+
// Unlike VMergeIterator, VUnionIterator does NOT have its own internal block or copy_rows().
383+
// It passes the caller's block directly to the underlying SegmentIterator via next_batch(),
384+
// so there is no input-schema vs output-schema mismatch issue here.
385+
// The output_schema parameter is accepted only so that schema() can return the output schema
386+
// consistently with VMergeIterator.
369387
class VUnionIterator : public RowwiseIterator {
370388
public:
371389
// Iterators' ownership it transferred to this class.
372390
// This class will delete all iterators when destructs
373391
// Client should not use iterators anymore.
374-
VUnionIterator(std::vector<RowwiseIteratorUPtr>&& v) : _origin_iters(std::move(v)) {}
392+
VUnionIterator(std::vector<RowwiseIteratorUPtr>&& v, SchemaSPtr output_schema)
393+
: _output_schema(std::move(output_schema)), _origin_iters(std::move(v)) {}
375394

376395
~VUnionIterator() override = default;
377396

378397
Status init(const StorageReadOptions& opts) override;
379398

380399
Status next_batch(Block* block) override;
381400

382-
const Schema& schema() const override { return *_schema; }
401+
const Schema& schema() const override { return *_output_schema; }
383402

384403
Status current_block_row_locations(std::vector<RowLocation>* locations) override;
385404

@@ -390,7 +409,7 @@ class VUnionIterator : public RowwiseIterator {
390409
}
391410

392411
private:
393-
const Schema* _schema = nullptr;
412+
const SchemaSPtr _output_schema;
394413
RowwiseIteratorUPtr _cur_iter = nullptr;
395414
StorageReadOptions _read_options;
396415
std::vector<RowwiseIteratorUPtr> _origin_iters;
@@ -400,7 +419,6 @@ Status VUnionIterator::init(const StorageReadOptions& opts) {
400419
if (_origin_iters.empty()) {
401420
return Status::OK();
402421
}
403-
404422
// we use back() and pop_back() of std::vector to handle each iterator,
405423
// so reverse the vector here to keep result block of next_batch to be
406424
// in the same order as the original segments.
@@ -409,7 +427,6 @@ Status VUnionIterator::init(const StorageReadOptions& opts) {
409427
_read_options = opts;
410428
_cur_iter = std::move(_origin_iters.back());
411429
RETURN_IF_ERROR(_cur_iter->init(_read_options));
412-
_schema = &_cur_iter->schema();
413430
return Status::OK();
414431
}
415432

@@ -441,19 +458,20 @@ Status VUnionIterator::current_block_row_locations(std::vector<RowLocation>* loc
441458

442459
RowwiseIteratorUPtr new_merge_iterator(std::vector<RowwiseIteratorUPtr>&& inputs,
443460
int sequence_id_idx, bool is_unique, bool is_reverse,
444-
uint64_t* merged_rows) {
461+
uint64_t* merged_rows, SchemaSPtr output_schema) {
445462
// when the size of inputs is 1, we also need to use VMergeIterator, because the
446463
// next_block_view function only be implemented in VMergeIterator. The reason why
447464
// the size of inputs is 1 is that the segment was filtered out by zone map or others.
448465
return std::make_unique<VMergeIterator>(std::move(inputs), sequence_id_idx, is_unique,
449-
is_reverse, merged_rows);
466+
is_reverse, merged_rows, std::move(output_schema));
450467
}
451468

452-
RowwiseIteratorUPtr new_union_iterator(std::vector<RowwiseIteratorUPtr>&& inputs) {
469+
RowwiseIteratorUPtr new_union_iterator(std::vector<RowwiseIteratorUPtr>&& inputs,
470+
SchemaSPtr output_schema) {
453471
if (inputs.size() == 1) {
454472
return std::move(inputs[0]);
455473
}
456-
return std::make_unique<VUnionIterator>(std::move(inputs));
474+
return std::make_unique<VUnionIterator>(std::move(inputs), std::move(output_schema));
457475
}
458476

459477
RowwiseIterator* new_vstatistics_iterator(std::shared_ptr<Segment> segment, const Schema& schema) {

0 commit comments

Comments
 (0)