Skip to content

Commit 78a5e52

Browse files
authored
Merge pull request ClickHouse#63233 from CurtizJ/return-back-61551
Return back ClickHouse#61551 (More optimal loading of marks)
2 parents d5b8d9b + ccfe3f8 commit 78a5e52

17 files changed

+212
-41
lines changed

src/Storages/MergeTree/MarkRange.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ size_t MarkRanges::getNumberOfMarks() const
8181
return result;
8282
}
8383

84+
/// Returns true only if this collection holds exactly one range, and that range
/// starts at mark 0 and ends at num_marks_in_part.
/// The size() check comes first, so front() is not evaluated unless there is exactly one range.
bool MarkRanges::isOneRangeForWholePart(size_t num_marks_in_part) const
85+
{
86+
return size() == 1 && front().begin == 0 && front().end == num_marks_in_part;
87+
}
88+
8489
void MarkRanges::serialize(WriteBuffer & out) const
8590
{
8691
writeBinaryLittleEndian(this->size(), out);

src/Storages/MergeTree/MarkRange.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ struct MarkRanges : public std::deque<MarkRange>
3636
using std::deque<MarkRange>::deque; /// NOLINT(modernize-type-traits)
3737

3838
size_t getNumberOfMarks() const;
39+
bool isOneRangeForWholePart(size_t num_marks_in_part) const;
3940

4041
void serialize(WriteBuffer & out) const;
4142
String describe() const;

src/Storages/MergeTree/MergeTreeIOSettings.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ struct MergeTreeReaderSettings
4444
bool enable_multiple_prewhere_read_steps = false;
4545
/// If true, try to lower size of read buffer according to granule size and compressed block size.
4646
bool adjust_read_buffer_size = true;
47+
/// If true, it's allowed to read the whole part without reading marks.
48+
bool can_read_part_without_marks = false;
4749
};
4850

4951
struct MergeTreeWriterSettings

src/Storages/MergeTree/MergeTreeIndexReader.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ std::unique_ptr<MergeTreeReaderStream> makeIndexReader(
3131
load_marks_threadpool,
3232
/*num_columns_in_mark=*/ 1);
3333

34+
marks_loader->startAsyncLoad();
35+
3436
return std::make_unique<MergeTreeReaderStreamSingleColumn>(
3537
part->getDataPartStoragePtr(),
3638
index->getFileName(), extension, marks_count,
@@ -65,6 +67,7 @@ MergeTreeIndexReader::MergeTreeIndexReader(
6567
mark_cache,
6668
uncompressed_cache,
6769
std::move(settings));
70+
6871
version = index_format.version;
6972

7073
stream->adjustRightMark(getLastMark(all_mark_ranges_));

src/Storages/MergeTree/MergeTreeMarksLoader.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ MergeTreeMarksLoader::MergeTreeMarksLoader(
6464
, read_settings(read_settings_)
6565
, num_columns_in_mark(num_columns_in_mark_)
6666
, load_marks_threadpool(load_marks_threadpool_)
67+
{
68+
}
69+
70+
void MergeTreeMarksLoader::startAsyncLoad()
6771
{
6872
if (load_marks_threadpool)
6973
future = loadMarksAsync();
@@ -102,6 +106,8 @@ MergeTreeMarksGetterPtr MergeTreeMarksLoader::loadMarks()
102106

103107
MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
104108
{
109+
LOG_TEST(getLogger("MergeTreeMarksLoader"), "Loading marks from path {}", mrk_path);
110+
105111
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
106112
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
107113

@@ -218,7 +224,9 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksSync()
218224
}
219225
}
220226
else
227+
{
221228
loaded_marks = loadMarksImpl();
229+
}
222230

223231
if (!loaded_marks)
224232
{

src/Storages/MergeTree/MergeTreeMarksLoader.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class MergeTreeMarksLoader
5050

5151
~MergeTreeMarksLoader();
5252

53+
void startAsyncLoad();
5354
MergeTreeMarksGetterPtr loadMarks();
5455
size_t getNumColumns() const { return num_columns_in_mark; }
5556

src/Storages/MergeTree/MergeTreeReaderCompact.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
4848
, profile_callback(profile_callback_)
4949
, clock_type(clock_type_)
5050
{
51+
marks_loader->startAsyncLoad();
5152
}
5253

5354
void MergeTreeReaderCompact::fillColumnPositions()

src/Storages/MergeTree/MergeTreeReaderStream.cpp

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ namespace ErrorCodes
1313
{
1414
extern const int ARGUMENT_OUT_OF_BOUND;
1515
extern const int CANNOT_READ_ALL_DATA;
16+
extern const int LOGICAL_ERROR;
1617
}
1718

1819
MergeTreeReaderStream::MergeTreeReaderStream(
@@ -41,14 +42,17 @@ MergeTreeReaderStream::MergeTreeReaderStream(
4142
{
4243
}
4344

45+
/// Obtains marks on first use: if marks_getter is not set yet, assigns it the result of
/// marks_loader->loadMarks(). Later calls are no-ops once marks_getter is non-null.
void MergeTreeReaderStream::loadMarks()
46+
{
47+
if (!marks_getter)
48+
marks_getter = marks_loader->loadMarks();
49+
}
50+
4451
void MergeTreeReaderStream::init()
4552
{
4653
if (initialized)
4754
return;
4855

49-
initialized = true;
50-
marks_getter = marks_loader->loadMarks();
51-
5256
/// Compute the size of the buffer.
5357
auto [max_mark_range_bytes, sum_mark_range_bytes] = estimateMarkRangeBytes(all_mark_ranges);
5458

@@ -110,11 +114,15 @@ void MergeTreeReaderStream::init()
110114
data_buffer = non_cached_buffer.get();
111115
compressed_data_buffer = non_cached_buffer.get();
112116
}
117+
118+
initialized = true;
113119
}
114120

115121
void MergeTreeReaderStream::seekToMarkAndColumn(size_t row_index, size_t column_position)
116122
{
117123
init();
124+
loadMarks();
125+
118126
const auto & mark = marks_getter->getMark(row_index, column_position);
119127

120128
try
@@ -193,7 +201,7 @@ CompressedReadBufferBase * MergeTreeReaderStream::getCompressedDataBuffer()
193201
return compressed_data_buffer;
194202
}
195203

196-
size_t MergeTreeReaderStreamSingleColumn::getRightOffset(size_t right_mark) const
204+
size_t MergeTreeReaderStreamSingleColumn::getRightOffset(size_t right_mark)
197205
{
198206
/// NOTE: if we are reading the whole file, then right_mark == marks_count
199207
/// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks.
@@ -202,7 +210,8 @@ size_t MergeTreeReaderStreamSingleColumn::getRightOffset(size_t right_mark) cons
202210
if (marks_count == 0)
203211
return 0;
204212

205-
assert(right_mark <= marks_count);
213+
chassert(right_mark <= marks_count);
214+
loadMarks();
206215

207216
if (right_mark == 0)
208217
return marks_getter->getMark(right_mark, 0).offset_in_compressed_file;
@@ -281,9 +290,9 @@ size_t MergeTreeReaderStreamSingleColumn::getRightOffset(size_t right_mark) cons
281290
return file_size;
282291
}
283292

284-
std::pair<size_t, size_t> MergeTreeReaderStreamSingleColumn::estimateMarkRangeBytes(const MarkRanges & mark_ranges) const
293+
std::pair<size_t, size_t> MergeTreeReaderStreamSingleColumn::estimateMarkRangeBytes(const MarkRanges & mark_ranges)
285294
{
286-
assert(marks_getter != nullptr);
295+
loadMarks();
287296

288297
size_t max_range_bytes = 0;
289298
size_t sum_range_bytes = 0;
@@ -302,7 +311,34 @@ std::pair<size_t, size_t> MergeTreeReaderStreamSingleColumn::estimateMarkRangeBy
302311
return {max_range_bytes, sum_range_bytes};
303312
}
304313

305-
size_t MergeTreeReaderStreamMultipleColumns::getRightOffsetOneColumn(size_t right_mark_non_included, size_t column_position) const
314+
/// Returns file_size as the right offset.
/// The only accepted argument is right_mark == marks_count; any other value throws
/// an Exception with ErrorCodes::LOGICAL_ERROR.
/// This body does not call loadMarks() and does not use marks_getter.
size_t MergeTreeReaderStreamSingleColumnWholePart::getRightOffset(size_t right_mark)
315+
{
316+
if (right_mark != marks_count)
317+
{
318+
throw Exception(ErrorCodes::LOGICAL_ERROR,
319+
"Expected one right mark: {}, got: {}",
320+
marks_count, right_mark);
321+
}
322+
return file_size;
323+
}
324+
325+
/// Returns {file_size, file_size}: both elements of the resulting pair are the file size.
/// Throws an Exception with ErrorCodes::LOGICAL_ERROR unless
/// mark_ranges.isOneRangeForWholePart(marks_count) holds; the message includes mark_ranges.describe().
/// This body does not call loadMarks() and does not use marks_getter.
std::pair<size_t, size_t> MergeTreeReaderStreamSingleColumnWholePart::estimateMarkRangeBytes(const MarkRanges & mark_ranges)
326+
{
327+
if (!mark_ranges.isOneRangeForWholePart(marks_count))
328+
{
329+
throw Exception(ErrorCodes::LOGICAL_ERROR,
330+
"Expected one mark range that covers the whole part, got: {}",
331+
mark_ranges.describe());
332+
}
333+
return {file_size, file_size};
334+
}
335+
336+
/// Seeking to a mark is not supported by this stream type.
/// Always throws an Exception with ErrorCodes::LOGICAL_ERROR; the unnamed mark argument is ignored.
void MergeTreeReaderStreamSingleColumnWholePart::seekToMark(size_t)
337+
{
338+
throw Exception(ErrorCodes::LOGICAL_ERROR, "MergeTreeReaderStreamSingleColumnWholePart cannot seek to marks");
339+
}
340+
341+
size_t MergeTreeReaderStreamMultipleColumns::getRightOffsetOneColumn(size_t right_mark_non_included, size_t column_position)
306342
{
307343
/// NOTE: if we are reading the whole file, then right_mark == marks_count
308344
/// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks.
@@ -311,7 +347,8 @@ size_t MergeTreeReaderStreamMultipleColumns::getRightOffsetOneColumn(size_t righ
311347
if (marks_count == 0)
312348
return 0;
313349

314-
assert(right_mark_non_included <= marks_count);
350+
chassert(right_mark_non_included <= marks_count);
351+
loadMarks();
315352

316353
if (right_mark_non_included == 0)
317354
return marks_getter->getMark(right_mark_non_included, column_position).offset_in_compressed_file;
@@ -347,9 +384,9 @@ size_t MergeTreeReaderStreamMultipleColumns::getRightOffsetOneColumn(size_t righ
347384
}
348385

349386
std::pair<size_t, size_t>
350-
MergeTreeReaderStreamMultipleColumns::estimateMarkRangeBytesOneColumn(const MarkRanges & mark_ranges, size_t column_position) const
387+
MergeTreeReaderStreamMultipleColumns::estimateMarkRangeBytesOneColumn(const MarkRanges & mark_ranges, size_t column_position)
351388
{
352-
assert(marks_getter != nullptr);
389+
loadMarks();
353390

354391
/// As a maximal range we return the maximal size of a whole stripe.
355392
size_t max_range_bytes = 0;
@@ -386,8 +423,9 @@ MergeTreeReaderStreamMultipleColumns::estimateMarkRangeBytesOneColumn(const Mark
386423
return {max_range_bytes, sum_range_bytes};
387424
}
388425

389-
MarkInCompressedFile MergeTreeReaderStreamMultipleColumns::getStartOfNextStripeMark(size_t row_index, size_t column_position) const
426+
MarkInCompressedFile MergeTreeReaderStreamMultipleColumns::getStartOfNextStripeMark(size_t row_index, size_t column_position)
390427
{
428+
loadMarks();
391429
const auto & current_mark = marks_getter->getMark(row_index, column_position);
392430

393431
if (marks_getter->getNumColumns() == 1)
@@ -434,27 +472,27 @@ MarkInCompressedFile MergeTreeReaderStreamMultipleColumns::getStartOfNextStripeM
434472
return marks_getter->getMark(mark_index + 1, column_position + 1);
435473
}
436474

437-
size_t MergeTreeReaderStreamOneOfMultipleColumns::getRightOffset(size_t right_mark_non_included) const
475+
/// Forwards to getRightOffsetOneColumn() with this stream's column_position and returns its result.
size_t MergeTreeReaderStreamOneOfMultipleColumns::getRightOffset(size_t right_mark_non_included)
438476
{
439477
return getRightOffsetOneColumn(right_mark_non_included, column_position);
440478
}
441479

442-
std::pair<size_t, size_t> MergeTreeReaderStreamOneOfMultipleColumns::estimateMarkRangeBytes(const MarkRanges & mark_ranges) const
480+
/// Forwards to estimateMarkRangeBytesOneColumn() with this stream's column_position and returns its result.
std::pair<size_t, size_t> MergeTreeReaderStreamOneOfMultipleColumns::estimateMarkRangeBytes(const MarkRanges & mark_ranges)
443481
{
444482
return estimateMarkRangeBytesOneColumn(mark_ranges, column_position);
445483
}
446484

447-
size_t MergeTreeReaderStreamAllOfMultipleColumns::getRightOffset(size_t right_mark_non_included) const
485+
/// Forwards to getRightOffsetOneColumn() using column index marks_loader->getNumColumns() - 1,
/// i.e. the last column index reported by the marks loader.
/// NOTE(review): presumably the last column's right offset bounds the data of all columns — confirm.
size_t MergeTreeReaderStreamAllOfMultipleColumns::getRightOffset(size_t right_mark_non_included)
448486
{
449487
return getRightOffsetOneColumn(right_mark_non_included, marks_loader->getNumColumns() - 1);
450488
}
451489

452-
std::pair<size_t, size_t> MergeTreeReaderStreamAllOfMultipleColumns::estimateMarkRangeBytes(const MarkRanges & mark_ranges) const
490+
std::pair<size_t, size_t> MergeTreeReaderStreamAllOfMultipleColumns::estimateMarkRangeBytes(const MarkRanges & mark_ranges)
453491
{
454492
size_t max_range_bytes = 0;
455493
size_t sum_range_bytes = 0;
456494

457-
for (size_t i = 0; i < marks_getter->getNumColumns(); ++i)
495+
for (size_t i = 0; i < marks_loader->getNumColumns(); ++i)
458496
{
459497
auto [current_max, current_sum] = estimateMarkRangeBytesOneColumn(mark_ranges, i);
460498

src/Storages/MergeTree/MergeTreeReaderStream.h

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class MergeTreeReaderStream
4040
/// Seeks to exact mark in file.
4141
void seekToMarkAndColumn(size_t row_index, size_t column_position);
4242

43+
/// Seeks to the start of the file.
4344
void seekToStart();
4445

4546
/**
@@ -53,11 +54,11 @@ class MergeTreeReaderStream
5354

5455
private:
5556
/// Returns offset in file up to which it's needed to read file to read all rows up to @right_mark mark.
56-
virtual size_t getRightOffset(size_t right_mark) const = 0;
57+
virtual size_t getRightOffset(size_t right_mark) = 0;
5758

5859
/// Returns estimated max amount of bytes to read among mark ranges (which is used as size for read buffer)
5960
/// and total amount of bytes to read in all mark ranges.
60-
virtual std::pair<size_t, size_t> estimateMarkRangeBytes(const MarkRanges & mark_ranges) const = 0;
61+
virtual std::pair<size_t, size_t> estimateMarkRangeBytes(const MarkRanges & mark_ranges) = 0;
6162

6263
const ReadBufferFromFileBase::ProfileCallback profile_callback;
6364
const clockid_t clock_type;
@@ -80,6 +81,7 @@ class MergeTreeReaderStream
8081

8182
protected:
8283
void init();
84+
void loadMarks();
8385

8486
const MergeTreeReaderSettings settings;
8587
const size_t marks_count;
@@ -100,11 +102,25 @@ class MergeTreeReaderStreamSingleColumn : public MergeTreeReaderStream
100102
{
101103
}
102104

103-
size_t getRightOffset(size_t right_mark_non_included) const override;
104-
std::pair<size_t, size_t> estimateMarkRangeBytes(const MarkRanges & mark_ranges) const override;
105+
size_t getRightOffset(size_t right_mark_non_included) override;
106+
std::pair<size_t, size_t> estimateMarkRangeBytes(const MarkRanges & mark_ranges) override;
105107
void seekToMark(size_t row_index) override { seekToMarkAndColumn(row_index, 0); }
106108
};
107109

110+
/// Reader stream that only supports reading its file in full.
/// Its overrides (defined in MergeTreeReaderStream.cpp) return file_size based values and throw
/// LOGICAL_ERROR for anything but the whole range; none of them calls loadMarks().
/// NOTE(review): presumably selected when MergeTreeReaderSettings::can_read_part_without_marks is set — confirm against the call site.
class MergeTreeReaderStreamSingleColumnWholePart : public MergeTreeReaderStream
111+
{
112+
public:
/// Forwards all constructor arguments unchanged to the MergeTreeReaderStream base.
113+
template <typename... Args>
114+
explicit MergeTreeReaderStreamSingleColumnWholePart(Args &&... args)
115+
: MergeTreeReaderStream{std::forward<Args>(args)...}
116+
{
117+
}
118+
119+
size_t getRightOffset(size_t right_mark_non_included) override;
120+
std::pair<size_t, size_t> estimateMarkRangeBytes(const MarkRanges & mark_ranges) override;
121+
void seekToMark(size_t row_index) override;
122+
};
123+
108124
/// Base class for reading from file that contains multiple columns.
109125
/// It is used to read from compact parts.
110126
/// See more details about data layout in MergeTreeDataPartCompact.h.
@@ -118,9 +134,9 @@ class MergeTreeReaderStreamMultipleColumns : public MergeTreeReaderStream
118134
}
119135

120136
protected:
121-
size_t getRightOffsetOneColumn(size_t right_mark_non_included, size_t column_position) const;
122-
std::pair<size_t, size_t> estimateMarkRangeBytesOneColumn(const MarkRanges & mark_ranges, size_t column_position) const;
123-
MarkInCompressedFile getStartOfNextStripeMark(size_t row_index, size_t column_position) const;
137+
size_t getRightOffsetOneColumn(size_t right_mark_non_included, size_t column_position);
138+
std::pair<size_t, size_t> estimateMarkRangeBytesOneColumn(const MarkRanges & mark_ranges, size_t column_position);
139+
MarkInCompressedFile getStartOfNextStripeMark(size_t row_index, size_t column_position);
124140
};
125141

126142
/// Class for reading a single column from file that contains multiple columns
@@ -135,8 +151,8 @@ class MergeTreeReaderStreamOneOfMultipleColumns : public MergeTreeReaderStreamMu
135151
{
136152
}
137153

138-
size_t getRightOffset(size_t right_mark_non_included) const override;
139-
std::pair<size_t, size_t> estimateMarkRangeBytes(const MarkRanges & mark_ranges) const override;
154+
size_t getRightOffset(size_t right_mark_non_included) override;
155+
std::pair<size_t, size_t> estimateMarkRangeBytes(const MarkRanges & mark_ranges) override;
140156
void seekToMark(size_t row_index) override { seekToMarkAndColumn(row_index, column_position); }
141157

142158
private:
@@ -154,8 +170,8 @@ class MergeTreeReaderStreamAllOfMultipleColumns : public MergeTreeReaderStreamMu
154170
{
155171
}
156172

157-
size_t getRightOffset(size_t right_mark_non_included) const override;
158-
std::pair<size_t, size_t> estimateMarkRangeBytes(const MarkRanges & mark_ranges) const override;
173+
size_t getRightOffset(size_t right_mark_non_included) override;
174+
std::pair<size_t, size_t> estimateMarkRangeBytes(const MarkRanges & mark_ranges) override;
159175
void seekToMark(size_t row_index) override { seekToMarkAndColumn(row_index, 0); }
160176
};
161177

0 commit comments

Comments
 (0)