Commit b25ce38

[feature](vparquet-reader) Implements parquet file page cache.
1 parent 6bcd55e commit b25ce38

10 files changed, +1185 -15 lines changed

be/src/common/config.cpp

Lines changed: 5 additions & 0 deletions

@@ -1000,6 +1000,11 @@ DEFINE_mInt32(parquet_header_max_size_mb, "1");
 DEFINE_mInt32(parquet_rowgroup_max_buffer_mb, "128");
 // Max buffer size for parquet chunk column
 DEFINE_mInt32(parquet_column_max_buffer_mb, "8");
+// Enable the Parquet file page cache for the Parquet page reader.
+DEFINE_Bool(enable_parquet_file_page_cache, "true");
+// Threshold ratio that decides whether to cache the decompressed parquet page: if uncompressed_size
+// <= parquet_page_cache_decompress_threshold * compressed_size, the decompressed page is cached.
+DEFINE_mDouble(parquet_page_cache_decompress_threshold, "1.5");
 DEFINE_mDouble(max_amplified_read_ratio, "0.8");
 DEFINE_mInt32(merged_oss_min_io_size, "1048576");
 DEFINE_mInt32(merged_hdfs_min_io_size, "8192");
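
In effect the threshold is a single size comparison. Below is a minimal sketch of that decision with the default value of 1.5; the free-function form and its name are illustrative only, not part of the patch:

#include <cstdint>

// Returns true when the decompressed form of a page should go into the cache,
// i.e. uncompressed_size <= threshold * compressed_size. For example, with the
// default threshold a 100 KB compressed page that decompresses to 140 KB is
// cached decompressed, while one that decompresses to 200 KB is cached compressed.
bool should_cache_decompressed(int64_t uncompressed_size, int64_t compressed_size,
                               double threshold = 1.5) {
    if (compressed_size <= 0) {
        return false;
    }
    return static_cast<double>(uncompressed_size) <=
           threshold * static_cast<double>(compressed_size);
}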

be/src/common/config.h

Lines changed: 7 additions & 0 deletions

@@ -144,6 +144,13 @@ DECLARE_String(memory_mode);
 // if true, process memory limit and memory usage based on cgroup memory info.
 DECLARE_mBool(enable_use_cgroup_memory_info);

+// Enable the Parquet file page cache for the Parquet page reader.
+DECLARE_Bool(enable_parquet_file_page_cache);
+
+// Threshold ratio that decides whether to cache the decompressed parquet page: if uncompressed_size
+// <= parquet_page_cache_decompress_threshold * compressed_size, the decompressed page is cached.
+DECLARE_mDouble(parquet_page_cache_decompress_threshold);
+
 // process memory limit specified as number of bytes
 // ('<int>[bB]?'), megabytes ('<float>[mM]'), gigabytes ('<float>[gG]'),
 // or percentage of the physical memory ('<int>%').

be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp

Lines changed: 168 additions & 11 deletions

@@ -25,6 +25,8 @@
 #include <utility>

 #include "common/compiler_util.h" // IWYU pragma: keep
+#include "io/fs/buffered_reader.h"
+#include "olap/page_cache.h"
 #include "util/bit_util.h"
 #include "util/block_compression.h"
 #include "util/runtime_profile.h"
@@ -103,7 +105,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_parse_first_page_header(
     RETURN_IF_ERROR(parse_page_header());

     const tparquet::PageHeader* header = nullptr;
-    RETURN_IF_ERROR(_page_reader->get_page_header(header));
+    RETURN_IF_ERROR(_page_reader->get_page_header(&header));
     if (header->type == tparquet::PageType::DICTIONARY_PAGE) {
         // the first page maybe directory page even if _metadata.__isset.dictionary_page_offset == false,
         // so we should parse the directory page in next_page()
@@ -124,8 +126,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() {
     RETURN_IF_ERROR(_page_reader->parse_page_header());

     const tparquet::PageHeader* header = nullptr;
-    ;
-    RETURN_IF_ERROR(_page_reader->get_page_header(header));
+    RETURN_IF_ERROR(_page_reader->get_page_header(&header));
     int32_t page_num_values = _page_reader->is_header_v2() ? header->data_page_header_v2.num_values
                                                            : header->data_page_header.num_values;
     _remaining_rep_nums = page_num_values;
@@ -168,40 +169,161 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_data() {
     }

     const tparquet::PageHeader* header = nullptr;
-    RETURN_IF_ERROR(_page_reader->get_page_header(header));
+    RETURN_IF_ERROR(_page_reader->get_page_header(&header));
     int32_t uncompressed_size = header->uncompressed_page_size;

+    // First, try to reuse a cache handle previously discovered by PageReader
+    // (header-only lookup) to avoid a second lookup here. If no handle is
+    // attached, fall back to a StoragePageCache lookup for a decompressed page.
+    if (config::enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
+        StoragePageCache::instance() != nullptr) {
+        if (_page_reader->has_page_cache_handle()) {
+            const PageCacheHandle& handle = _page_reader->page_cache_handle();
+            Slice cached = handle.data();
+            size_t header_size = _page_reader->header_bytes().size();
+            size_t levels_size = 0;
+            if (header->__isset.data_page_header_v2) {
+                const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2;
+                size_t rl = header_v2.repetition_levels_byte_length;
+                size_t dl = header_v2.definition_levels_byte_length;
+                levels_size = rl + dl;
+                _v2_rep_levels =
+                        Slice(reinterpret_cast<const uint8_t*>(cached.data) + header_size, rl);
+                _v2_def_levels =
+                        Slice(reinterpret_cast<const uint8_t*>(cached.data) + header_size + rl, dl);
+            }
+            // payload_slice points to the bytes after the header and the levels
+            Slice payload_slice(cached.data + header_size + levels_size,
+                                cached.size - header_size - levels_size);
+            // Determine whether the cached payload is compressed by comparing its size
+            // against the page sizes recorded in the header; compressed_page_size is
+            // stored on the top-level PageHeader.
+            bool payload_is_compressed = false;
+            if (header->__isset.data_page_header_v2) {
+                payload_is_compressed =
+                        (payload_slice.size !=
+                         static_cast<size_t>(header->uncompressed_page_size) - levels_size) &&
+                        (payload_slice.size ==
+                         static_cast<size_t>(header->compressed_page_size) - levels_size);
+            } else if (header->__isset.data_page_header) {
+                payload_is_compressed =
+                        (payload_slice.size !=
+                         static_cast<size_t>(header->uncompressed_page_size)) &&
+                        (payload_slice.size == static_cast<size_t>(header->compressed_page_size));
+            }
+
+            if (payload_is_compressed && _block_compress_codec != nullptr) {
+                // Decompress the cached payload into _decompress_buf for decoding
+                size_t uncompressed_payload_size =
+                        header->__isset.data_page_header_v2
+                                ? static_cast<size_t>(header->uncompressed_page_size) - levels_size
+                                : static_cast<size_t>(header->uncompressed_page_size);
+                _reserve_decompress_buf(uncompressed_payload_size);
+                _page_data = Slice(_decompress_buf.get(), uncompressed_payload_size);
+                RETURN_IF_ERROR(_block_compress_codec->decompress(payload_slice, &_page_data));
+            } else {
+                // The cached payload is already uncompressed
+                _page_data = payload_slice;
+            }
+            // The page cache counters were incremented when PageReader did the header-only
+            // cache lookup. Do not increment them again to avoid double-counting.
+            goto data_loaded;
+        }
+    }
+
     if (_block_compress_codec != nullptr) {
         Slice compressed_data;
         RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
+        std::vector<uint8_t> level_bytes;
         if (header->__isset.data_page_header_v2) {
             const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2;
             // uncompressed_size = rl + dl + uncompressed_data_size
             // compressed_size = rl + dl + compressed_data_size
             uncompressed_size -= header_v2.repetition_levels_byte_length +
                                  header_v2.definition_levels_byte_length;
+            // copy the level bytes (rl + dl) so that header + levels + payload can be cached
+            size_t rl = header_v2.repetition_levels_byte_length;
+            size_t dl = header_v2.definition_levels_byte_length;
+            level_bytes.resize(rl + dl);
+            memcpy(level_bytes.data(), compressed_data.data, rl + dl);
+            // now strip the levels from compressed_data before decompression
             _get_uncompressed_levels(header_v2, compressed_data);
         }
         bool is_v2_compressed =
                 header->__isset.data_page_header_v2 && header->data_page_header_v2.is_compressed;
-        if (header->__isset.data_page_header || is_v2_compressed) {
-            // check decompressed buffer size
+        bool page_has_compression = header->__isset.data_page_header || is_v2_compressed;
+
+        if (page_has_compression) {
+            // Decompress the payload for immediate decoding
             _reserve_decompress_buf(uncompressed_size);
             _page_data = Slice(_decompress_buf.get(), uncompressed_size);
             SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time);
             _chunk_statistics.decompress_cnt++;
             RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data));
+
+            // Decide whether to cache the decompressed or the compressed payload,
+            // based on parquet_page_cache_decompress_threshold.
+            bool should_cache_decompressed = false;
+            if (header->compressed_page_size > 0) {
+                should_cache_decompressed =
+                        (static_cast<double>(header->uncompressed_page_size) <=
+                         static_cast<double>(config::parquet_page_cache_decompress_threshold) *
+                                 static_cast<double>(header->compressed_page_size));
+            }
+
+            if (config::enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
+                StoragePageCache::instance() != nullptr && !_page_reader->header_bytes().empty()) {
+                if (should_cache_decompressed) {
+                    _insert_page_into_cache(level_bytes, _page_data);
+                } else {
+                    // cache the compressed payload as-is (header | levels | compressed_payload)
+                    _insert_page_into_cache(level_bytes,
+                                            Slice(compressed_data.data, compressed_data.size));
+                }
+            }
         } else {
-            // Don't need decompress
+            // no compression on this page, use the data directly
             _page_data = Slice(compressed_data.data, compressed_data.size);
+            if (config::enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
+                StoragePageCache::instance() != nullptr) {
+                _insert_page_into_cache(level_bytes, _page_data);
+            }
         }
     } else {
-        RETURN_IF_ERROR(_page_reader->get_page_data(_page_data));
+        // For an uncompressed page we may still need to extract the v2 levels
+        std::vector<uint8_t> level_bytes;
+        Slice uncompressed_data;
+        RETURN_IF_ERROR(_page_reader->get_page_data(uncompressed_data));
         if (header->__isset.data_page_header_v2) {
-            _get_uncompressed_levels(header->data_page_header_v2, _page_data);
+            const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2;
+            size_t rl = header_v2.repetition_levels_byte_length;
+            size_t dl = header_v2.definition_levels_byte_length;
+            level_bytes.resize(rl + dl);
+            memcpy(level_bytes.data(), uncompressed_data.data, rl + dl);
+            _get_uncompressed_levels(header_v2, uncompressed_data);
         }
+        // copy the page data out
+        _page_data = Slice(uncompressed_data.data, uncompressed_data.size);
+        // Optionally cache the uncompressed page as well
+        if (config::enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
+            StoragePageCache::instance() != nullptr) {
+            _insert_page_into_cache(level_bytes, _page_data);
+        }
     }

+data_loaded:
     // Initialize repetition level and definition level. Skip when level = 0, which means required field.
     if (_max_rep_level > 0) {
         SCOPED_RAW_TIMER(&_chunk_statistics.decode_level_time);
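
In the cache-hit branch above, a cached entry may hold either the compressed or the decompressed payload, and the reader distinguishes the two purely by byte count. A minimal standalone sketch of that check (the helper name and free-function form are illustrative, not part of the patch):

#include <cstddef>

// A cached payload is treated as compressed only when its size matches the
// compressed page size and differs from the uncompressed one; for v2 pages the
// repetition/definition level bytes are subtracted from both sizes first.
bool cached_payload_is_compressed(size_t payload_size, size_t compressed_page_size,
                                  size_t uncompressed_page_size, size_t levels_size = 0) {
    return payload_size != uncompressed_page_size - levels_size &&
           payload_size == compressed_page_size - levels_size;
}
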
@@ -243,7 +365,15 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_data() {
         _decoders[static_cast<int>(encoding)] = std::move(page_decoder);
         _page_decoder = _decoders[static_cast<int>(encoding)].get();
     }
-    // Reset page data for each page
+    // Reset page data for each page. At this point _page_data holds only the payload
+    // the decoder expects: for v2 pages the repetition/definition level bytes were
+    // already stripped above, and for cache hits the slice points either into memory
+    // owned by the attached cache handle or into _decompress_buf.
     RETURN_IF_ERROR(_page_decoder->set_data(&_page_data));

     _state = DATA_LOADED;
@@ -253,7 +383,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_data() {
 template <bool IN_COLLECTION, bool OFFSET_INDEX>
 Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_decode_dict_page() {
     const tparquet::PageHeader* header = nullptr;
-    RETURN_IF_ERROR(_page_reader->get_page_header(header));
+    RETURN_IF_ERROR(_page_reader->get_page_header(&header));
     DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header->type);
     SCOPED_RAW_TIMER(&_chunk_statistics.decode_dict_time);

@@ -305,6 +435,33 @@ void ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_reserve_decompress_buf(siz
     }
 }

+template <bool IN_COLLECTION, bool OFFSET_INDEX>
+void ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_insert_page_into_cache(
+        const std::vector<uint8_t>& level_bytes, const Slice& payload) {
+    int64_t header_start = _page_reader->header_start_offset();
+    StoragePageCache::CacheKey key(_stream_reader->path(), _page_reader->file_end_offset(),
+                                   header_start);
+    const std::vector<uint8_t>& header_bytes = _page_reader->header_bytes();
+    size_t total = header_bytes.size() + level_bytes.size() + payload.size;
+    auto* page = new DataPage(total, true, segment_v2::DATA_PAGE);
+    size_t pos = 0;
+    memcpy(page->data() + pos, header_bytes.data(), header_bytes.size());
+    pos += header_bytes.size();
+    if (!level_bytes.empty()) {
+        memcpy(page->data() + pos, level_bytes.data(), level_bytes.size());
+        pos += level_bytes.size();
+    }
+    if (payload.size > 0) {
+        memcpy(page->data() + pos, payload.data, payload.size);
+        pos += payload.size;
+    }
+    page->reset_size(total);
+    PageCacheHandle handle;
+    StoragePageCache::instance()->insert(key, page, &handle, segment_v2::DATA_PAGE);
+    _page_reader->attach_page_cache_handle(std::move(handle));
+    _chunk_statistics.page_cache_write_counter += 1;
+}
+
 template <bool IN_COLLECTION, bool OFFSET_INDEX>
 Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::skip_values(size_t num_values,
                                                                    bool skip_data) {
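
Each cache entry is keyed by the file path, the page reader's file_end_offset(), and the page header's start offset, and it stores the header bytes, any v2 level bytes, and the payload back to back. A minimal sketch of that layout using plain standard-library types instead of Doris' DataPage/StoragePageCache (the struct and function names here are illustrative only):

#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Illustrative stand-in for StoragePageCache::CacheKey as used above.
struct ParquetPageCacheKey {
    std::string file_path;   // _stream_reader->path()
    int64_t file_end_offset; // _page_reader->file_end_offset()
    int64_t header_start;    // _page_reader->header_start_offset()
};

// Concatenate header | levels | payload, mirroring _insert_page_into_cache.
std::vector<uint8_t> build_cache_entry(const std::vector<uint8_t>& header_bytes,
                                       const std::vector<uint8_t>& level_bytes,
                                       const uint8_t* payload, size_t payload_size) {
    std::vector<uint8_t> entry;
    entry.reserve(header_bytes.size() + level_bytes.size() + payload_size);
    entry.insert(entry.end(), header_bytes.begin(), header_bytes.end());
    entry.insert(entry.end(), level_bytes.begin(), level_bytes.end());
    entry.insert(entry.end(), payload, payload + payload_size);
    return entry;
}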

be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h

Lines changed: 34 additions & 0 deletions

@@ -61,6 +61,15 @@ struct ColumnChunkReaderStatistics {
     int64_t skip_page_header_num = 0;
     int64_t parse_page_header_num = 0;
     int64_t read_page_header_time = 0;
+    // page cache metrics
+    // total pages read (from cache or file)
+    int64_t page_read_counter = 0;
+    int64_t page_cache_write_counter = 0;
+    // number of cache hits (either compressed or decompressed)
+    int64_t page_cache_hit_counter = 0;
+    // per-hit breakdown
+    int64_t page_cache_compressed_hit_counter = 0;
+    int64_t page_cache_decompressed_hit_counter = 0;
 };

 /**
@@ -74,6 +83,15 @@ struct ColumnChunkReaderStatistics {
     int64_t decode_level_time = 0;
     int64_t skip_page_header_num = 0;
     int64_t parse_page_header_num = 0;
+    // page cache metrics
+    // total pages read (from cache or file)
+    int64_t page_read_counter = 0;
+    int64_t page_cache_write_counter = 0;
+    // number of cache hits (either compressed or decompressed)
+    int64_t page_cache_hit_counter = 0;
+    // per-hit breakdown
+    int64_t page_cache_compressed_hit_counter = 0;
+    int64_t page_cache_decompressed_hit_counter = 0;
 };
 * // Create chunk reader
 * ColumnChunkReader chunk_reader(BufferedStreamReader* reader,
@@ -155,6 +173,15 @@ class ColumnChunkReader {
                 _page_reader->page_statistics().parse_page_header_num;
         _chunk_statistics.read_page_header_time =
                 _page_reader->page_statistics().read_page_header_time;
+        _chunk_statistics.page_read_counter += _page_reader->page_statistics().page_read_counter;
+        _chunk_statistics.page_cache_write_counter +=
+                _page_reader->page_statistics().page_cache_write_counter;
+        _chunk_statistics.page_cache_hit_counter +=
+                _page_reader->page_statistics().page_cache_hit_counter;
+        _chunk_statistics.page_cache_compressed_hit_counter +=
+                _page_reader->page_statistics().page_cache_compressed_hit_counter;
+        _chunk_statistics.page_cache_decompressed_hit_counter +=
+                _page_reader->page_statistics().page_cache_decompressed_hit_counter;
         return _chunk_statistics;
     }

@@ -193,6 +220,12 @@ class ColumnChunkReader {
                                   size_t* result_rows, bool* cross_page);
     Status load_cross_page_nested_row(std::vector<level_t>& rep_levels, bool* cross_page);

+    // Test helpers / accessors
+    Slice get_page_data() const { return _page_data; }
+    const Slice& v2_rep_levels() const { return _v2_rep_levels; }
+    const Slice& v2_def_levels() const { return _v2_def_levels; }
+    ColumnChunkReaderStatistics& statistics() { return chunk_statistics(); }
+
 private:
     enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED, PAGE_SKIPPED };

@@ -202,6 +235,7 @@ class ColumnChunkReader {

     void _reserve_decompress_buf(size_t size);
     int32_t _get_type_length();
+    void _insert_page_into_cache(const std::vector<uint8_t>& level_bytes, const Slice& payload);

     void _get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2, Slice& page_data);
     Status _skip_nested_rows_in_page(size_t num_rows);
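
The new counters are folded into the chunk-level statistics each time statistics() is called, so a caller can derive a per-chunk page cache hit rate directly from them. A small hypothetical helper over the struct declared above (not part of the patch):

// Hypothetical: fraction of page reads that were served from the page cache.
double page_cache_hit_rate(const ColumnChunkReaderStatistics& stats) {
    if (stats.page_read_counter == 0) {
        return 0.0;
    }
    return static_cast<double>(stats.page_cache_hit_counter) /
           static_cast<double>(stats.page_read_counter);
}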
