Skip to content

Commit 16a60b0

Browse files
Storage: should seek before read (#7161)
close pingcap/tidb#42555
1 parent 348d424 commit 16a60b0

File tree

4 files changed

+33
-3
lines changed

4 files changed

+33
-3
lines changed

dbms/src/Flash/Coprocessor/TiDBTableScan.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ void TiDBTableScan::constructTableScanForRemoteRead(tipb::TableScan * tipb_table
7070
tipb_table_scan->set_table_id(table_id);
7171
for (const auto & column : partition_table_scan.columns())
7272
*tipb_table_scan->add_columns() = column;
73+
for (const auto & filter : partition_table_scan.pushed_down_filter_conditions())
74+
*tipb_table_scan->add_pushed_down_filter_conditions() = filter;
7375
tipb_table_scan->set_desc(partition_table_scan.desc());
7476
for (auto id : partition_table_scan.primary_column_ids())
7577
tipb_table_scan->add_primary_column_ids(id);

dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ size_t DMFileReader::skipNextBlock()
295295

296296
scan_context->total_dmfile_skipped_rows += read_rows;
297297
next_row_offset += read_rows;
298+
last_read_skipped = true;
298299
return read_rows;
299300
}
300301

@@ -396,6 +397,8 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter)
396397
// merge blocks
397398
Block res = vstackBlocks(std::move(blocks));
398399
res.setStartOffset(start_row_offset);
400+
401+
last_read_skipped = false;
399402
return res;
400403
}
401404

@@ -639,6 +642,8 @@ Block DMFileReader::read()
639642
e.rethrow();
640643
}
641644
}
645+
646+
last_read_skipped = false;
642647
return res;
643648
}
644649

@@ -654,7 +659,7 @@ void DMFileReader::readFromDisk(
654659
if (auto iter = column_streams.find(stream_name); iter != column_streams.end())
655660
{
656661
auto & top_stream = iter->second;
657-
bool should_seek = force_seek || shouldSeek(start_pack_id) || skip_packs > 0;
662+
bool should_seek = force_seek || shouldSeek(start_pack_id) || skip_packs > 0 || last_read_skipped;
658663

659664
auto data_type = dmfile->getColumnStat(column_define.id).type;
660665
data_type->deserializeBinaryBulkWithMultipleStreams( //

dbms/src/Storages/DeltaMerge/File/DMFileReader.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,9 @@ class DMFileReader
180180

181181
std::unique_ptr<ColumnSharingCacheMap> col_data_cache{};
182182
std::unordered_map<ColId, bool> last_read_from_cache{};
183+
184+
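/// True when the most recent call was skipNextBlock(); the next read from disk must then seek first. Reset by read() and readWithFilter().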
185+
bool last_read_skipped{false};
183186
};
184187

185188
} // namespace DM

dbms/src/Storages/DeltaMerge/LateMaterializationBlockInputStream.cpp

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,26 @@ Block LateMaterializationBlockInputStream::readImpl()
5555
// If filter is nullptr, it means that these push down filters are always true.
5656
if (!filter)
5757
{
58-
Block rest_column_block = rest_column_stream->read();
58+
IColumn::Filter col_filter;
59+
col_filter.resize(filter_column_block.rows());
60+
Block rest_column_block;
61+
if (bitmap_filter->get(col_filter, filter_column_block.startOffset(), filter_column_block.rows()))
62+
{
63+
rest_column_block = rest_column_stream->read();
64+
}
65+
else
66+
{
67+
rest_column_block = rest_column_stream->read();
68+
size_t passed_count = countBytesInFilter(col_filter);
69+
for (auto & col : rest_column_block)
70+
{
71+
col.column = col.column->filter(col_filter, passed_count);
72+
}
73+
for (auto & col : filter_column_block)
74+
{
75+
col.column = col.column->filter(col_filter, passed_count);
76+
}
77+
}
5978
return hstackBlocks({std::move(filter_column_block), std::move(rest_column_block)}, header);
6079
}
6180

@@ -66,7 +85,7 @@ Block LateMaterializationBlockInputStream::readImpl()
6685
if (size_t passed_count = countBytesInFilter(*filter); passed_count == 0)
6786
{
6887
// if all rows are filtered, skip the next block of rest_column_stream
69-
if (rest_column_stream->skipNextBlock() == 0)
88+
if (size_t skipped_rows = rest_column_stream->skipNextBlock(); skipped_rows == 0)
7089
{
7190
// if we fail to skip, we need to call read() of rest_column_stream, but ignore the result
7291
// NOTE: skipNextBlock() returns 0 only if it failed to skip or reached the end of the stream,
@@ -77,6 +96,7 @@ Block LateMaterializationBlockInputStream::readImpl()
7796
}
7897
else
7998
{
99+
RUNTIME_CHECK(skipped_rows == rows);
80100
LOG_DEBUG(log, "Late materialization skip read block at start_offset: {}, rows: {}", filter_column_block.startOffset(), filter_column_block.rows());
81101
}
82102
}

0 commit comments

Comments
 (0)