Skip to content

Commit 3d89dda

Browse files
committed
[feature](vparquet-reader) Implements parquet file page cache.
1 parent 6bcd55e commit 3d89dda

File tree

10 files changed

+1199
-15
lines changed

10 files changed

+1199
-15
lines changed

be/src/common/config.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,6 +1000,11 @@ DEFINE_mInt32(parquet_header_max_size_mb, "1");
10001000
DEFINE_mInt32(parquet_rowgroup_max_buffer_mb, "128");
10011001
// Max buffer size for parquet chunk column
10021002
DEFINE_mInt32(parquet_column_max_buffer_mb, "8");
1003+
// Enable Parquet file page cache for Parquet page reader
1004+
DEFINE_Bool(enable_parquet_file_page_cache, "true");
1005+
// Threshold ratio that decides which form of a parquet page is cached. If uncompressed_size <=
1006+
// parquet_page_cache_decompress_threshold * compressed_size, the decompressed page is cached; otherwise the compressed page is cached.
1007+
DEFINE_mDouble(parquet_page_cache_decompress_threshold, "1.5");
10031008
DEFINE_mDouble(max_amplified_read_ratio, "0.8");
10041009
DEFINE_mInt32(merged_oss_min_io_size, "1048576");
10051010
DEFINE_mInt32(merged_hdfs_min_io_size, "8192");

be/src/common/config.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,13 @@ DECLARE_String(memory_mode);
144144
// if true, process memory limit and memory usage based on cgroup memory info.
145145
DECLARE_mBool(enable_use_cgroup_memory_info);
146146

147+
// Enable Parquet file page cache for Parquet page reader
148+
DECLARE_Bool(enable_parquet_file_page_cache);
149+
150+
// Threshold ratio that decides which form of a parquet page is cached. If uncompressed_size <=
151+
// parquet_page_cache_decompress_threshold * compressed_size, the decompressed page is cached; otherwise the compressed page is cached.
152+
DECLARE_mDouble(parquet_page_cache_decompress_threshold);
153+
147154
// process memory limit specified as number of bytes
148155
// ('<int>[bB]?'), megabytes ('<float>[mM]'), gigabytes ('<float>[gG]'),
149156
// or percentage of the physical memory ('<int>%').

be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp

Lines changed: 169 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
#include <utility>
2626

2727
#include "common/compiler_util.h" // IWYU pragma: keep
28+
#include "io/fs/buffered_reader.h"
29+
#include "olap/page_cache.h"
2830
#include "util/bit_util.h"
2931
#include "util/block_compression.h"
3032
#include "util/runtime_profile.h"
@@ -103,7 +105,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_parse_first_page_header(
103105
RETURN_IF_ERROR(parse_page_header());
104106

105107
const tparquet::PageHeader* header = nullptr;
106-
RETURN_IF_ERROR(_page_reader->get_page_header(header));
108+
RETURN_IF_ERROR(_page_reader->get_page_header(&header));
107109
if (header->type == tparquet::PageType::DICTIONARY_PAGE) {
108110
// the first page may be a dictionary page even if _metadata.__isset.dictionary_page_offset == false,
109111
// so we should parse the dictionary page in next_page()
@@ -124,8 +126,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() {
124126
RETURN_IF_ERROR(_page_reader->parse_page_header());
125127

126128
const tparquet::PageHeader* header = nullptr;
127-
;
128-
RETURN_IF_ERROR(_page_reader->get_page_header(header));
129+
RETURN_IF_ERROR(_page_reader->get_page_header(&header));
129130
int32_t page_num_values = _page_reader->is_header_v2() ? header->data_page_header_v2.num_values
130131
: header->data_page_header.num_values;
131132
_remaining_rep_nums = page_num_values;
@@ -168,40 +169,163 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_data() {
168169
}
169170

170171
const tparquet::PageHeader* header = nullptr;
171-
RETURN_IF_ERROR(_page_reader->get_page_header(header));
172+
RETURN_IF_ERROR(_page_reader->get_page_header(&header));
172173
int32_t uncompressed_size = header->uncompressed_page_size;
173174

175+
// First, try to reuse a cache handle previously discovered by PageReader
176+
// (header-only lookup) to avoid a second lookup here. If no handle is
177+
// attached, fall through and read the page data via PageReader below.
178+
if (config::enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
179+
StoragePageCache::instance() != nullptr) {
180+
if (_page_reader->has_page_cache_handle()) {
181+
const PageCacheHandle& handle = _page_reader->page_cache_handle();
182+
Slice cached = handle.data();
183+
size_t header_size = _page_reader->header_bytes().size();
184+
//size_t levels_size = 0;
185+
size_t levels_size = 0;
186+
if (header->__isset.data_page_header_v2) {
187+
const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2;
188+
size_t rl = header_v2.repetition_levels_byte_length;
189+
size_t dl = header_v2.definition_levels_byte_length;
190+
levels_size = rl + dl;
191+
_v2_rep_levels =
192+
Slice(reinterpret_cast<const uint8_t*>(cached.data) + header_size, rl);
193+
_v2_def_levels =
194+
Slice(reinterpret_cast<const uint8_t*>(cached.data) + header_size + rl, dl);
195+
}
196+
// payload_slice points to the bytes after header and levels
197+
Slice payload_slice(cached.data + header_size + levels_size,
198+
cached.size - header_size - levels_size);
199+
// Determine whether cached payload is compressed by comparing sizes
200+
bool payload_is_compressed = false;
201+
// if (header->__isset.data_page_header_v2) {
202+
// const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2;
203+
// levels_size = header_v2.repetition_levels_byte_length + header_v2.definition_levels_byte_length;
204+
// // compressed_page_size is stored on the top-level PageHeader
205+
// payload_is_compressed = (payload_slice.size == static_cast<size_t>(header->compressed_page_size) - levels_size);
206+
// } else if (header->__isset.data_page_header) {
207+
// payload_is_compressed = (payload_slice.size == static_cast<size_t>(header->compressed_page_size));
208+
// }
209+
210+
if (header->__isset.data_page_header_v2) {
211+
const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2;
212+
levels_size = header_v2.repetition_levels_byte_length +
213+
header_v2.definition_levels_byte_length;
214+
// compressed_page_size is stored on the top-level PageHeader
215+
payload_is_compressed =
216+
(!(payload_slice.size ==
217+
static_cast<size_t>(header->uncompressed_page_size) - levels_size)) &&
218+
(payload_slice.size ==
219+
static_cast<size_t>(header->compressed_page_size) - levels_size);
220+
} else if (header->__isset.data_page_header) {
221+
payload_is_compressed =
222+
(!(payload_slice.size ==
223+
static_cast<size_t>(header->uncompressed_page_size))) &&
224+
(payload_slice.size == static_cast<size_t>(header->compressed_page_size));
225+
}
226+
227+
if (payload_is_compressed && _block_compress_codec != nullptr) {
228+
// Decompress cached payload into _decompress_buf for decoding
229+
size_t uncompressed_payload_size =
230+
header->__isset.data_page_header_v2
231+
? static_cast<size_t>(header->uncompressed_page_size) - levels_size
232+
: static_cast<size_t>(header->uncompressed_page_size);
233+
_reserve_decompress_buf(uncompressed_payload_size);
234+
_page_data = Slice(_decompress_buf.get(), uncompressed_payload_size);
235+
SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time);
236+
_chunk_statistics.decompress_cnt++;
237+
RETURN_IF_ERROR(_block_compress_codec->decompress(payload_slice, &_page_data));
238+
} else {
239+
// Cached payload is already uncompressed
240+
_page_data = payload_slice;
241+
}
242+
// page cache counters were incremented when PageReader did the header-only
243+
// cache lookup. Do not increment again to avoid double-counting.
244+
goto data_loaded;
245+
} else {
246+
}
247+
}
248+
174249
if (_block_compress_codec != nullptr) {
175250
Slice compressed_data;
176251
RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
252+
std::vector<uint8_t> level_bytes;
177253
if (header->__isset.data_page_header_v2) {
178254
const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2;
179255
// uncompressed_size = rl + dl + uncompressed_data_size
180256
// compressed_size = rl + dl + compressed_data_size
181257
uncompressed_size -= header_v2.repetition_levels_byte_length +
182258
header_v2.definition_levels_byte_length;
259+
// copy level bytes (rl + dl) so that we can cache header + levels + uncompressed payload
260+
size_t rl = header_v2.repetition_levels_byte_length;
261+
size_t dl = header_v2.definition_levels_byte_length;
262+
level_bytes.resize(rl + dl);
263+
memcpy(level_bytes.data(), compressed_data.data, rl + dl);
264+
// now remove levels from compressed_data for decompression
183265
_get_uncompressed_levels(header_v2, compressed_data);
184266
}
185267
bool is_v2_compressed =
186268
header->__isset.data_page_header_v2 && header->data_page_header_v2.is_compressed;
187-
if (header->__isset.data_page_header || is_v2_compressed) {
188-
// check decompressed buffer size
269+
bool page_has_compression = header->__isset.data_page_header || is_v2_compressed;
270+
271+
if (page_has_compression) {
272+
// Decompress payload for immediate decoding
189273
_reserve_decompress_buf(uncompressed_size);
190274
_page_data = Slice(_decompress_buf.get(), uncompressed_size);
191275
SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time);
192276
_chunk_statistics.decompress_cnt++;
193277
RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data));
278+
279+
// Decide whether to cache decompressed payload or compressed payload based on threshold
280+
bool should_cache_decompressed = false;
281+
if (header->compressed_page_size > 0) {
282+
should_cache_decompressed =
283+
(static_cast<double>(header->uncompressed_page_size) <=
284+
static_cast<double>(config::parquet_page_cache_decompress_threshold) *
285+
static_cast<double>(header->compressed_page_size));
286+
}
287+
288+
if (config::enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
289+
StoragePageCache::instance() != nullptr && !_page_reader->header_bytes().empty()) {
290+
if (should_cache_decompressed) {
291+
_insert_page_into_cache(level_bytes, _page_data);
292+
} else {
293+
// cache the compressed payload as-is (header | levels | compressed_payload)
294+
_insert_page_into_cache(level_bytes,
295+
Slice(compressed_data.data, compressed_data.size));
296+
}
297+
}
194298
} else {
195-
// Don't need decompress
299+
// no compression on this page, use the data directly
196300
_page_data = Slice(compressed_data.data, compressed_data.size);
301+
if (config::enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
302+
StoragePageCache::instance() != nullptr) {
303+
_insert_page_into_cache(level_bytes, _page_data);
304+
}
197305
}
198306
} else {
199-
RETURN_IF_ERROR(_page_reader->get_page_data(_page_data));
307+
// For uncompressed page, we may still need to extract v2 levels
308+
std::vector<uint8_t> level_bytes;
309+
Slice uncompressed_data;
310+
RETURN_IF_ERROR(_page_reader->get_page_data(uncompressed_data));
200311
if (header->__isset.data_page_header_v2) {
201-
_get_uncompressed_levels(header->data_page_header_v2, _page_data);
312+
const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2;
313+
size_t rl = header_v2.repetition_levels_byte_length;
314+
size_t dl = header_v2.definition_levels_byte_length;
315+
level_bytes.resize(rl + dl);
316+
memcpy(level_bytes.data(), uncompressed_data.data, rl + dl);
317+
_get_uncompressed_levels(header_v2, uncompressed_data);
318+
}
319+
// copy page data out
320+
_page_data = Slice(uncompressed_data.data, uncompressed_data.size);
321+
// Optionally cache uncompressed data for uncompressed pages
322+
if (config::enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
323+
StoragePageCache::instance() != nullptr) {
324+
_insert_page_into_cache(level_bytes, _page_data);
202325
}
203326
}
204327

328+
data_loaded:
205329
// Initialize repetition level and definition level. Skip when level = 0, which means required field.
206330
if (_max_rep_level > 0) {
207331
SCOPED_RAW_TIMER(&_chunk_statistics.decode_level_time);
@@ -243,7 +367,15 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_data() {
243367
_decoders[static_cast<int>(encoding)] = std::move(page_decoder);
244368
_page_decoder = _decoders[static_cast<int>(encoding)].get();
245369
}
246-
// Reset page data for each page
370+
// Reset page data for each page.
371+
// If this is a v2 data page, _page_data currently contains rl+dl followed by payload.
372+
// The decoder expects payload-only, so strip the level bytes into a temporary Slice
373+
// that points into the same cached memory (so ownership remains with the cache handle).
374+
// Slice payload_slice = _page_data;
375+
// if (header->__isset.data_page_header_v2) {
376+
// const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2;
377+
// _get_uncompressed_levels(header_v2, payload_slice);
378+
// }
247379
RETURN_IF_ERROR(_page_decoder->set_data(&_page_data));
248380

249381
_state = DATA_LOADED;
@@ -253,7 +385,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_data() {
253385
template <bool IN_COLLECTION, bool OFFSET_INDEX>
254386
Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_decode_dict_page() {
255387
const tparquet::PageHeader* header = nullptr;
256-
RETURN_IF_ERROR(_page_reader->get_page_header(header));
388+
RETURN_IF_ERROR(_page_reader->get_page_header(&header));
257389
DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header->type);
258390
SCOPED_RAW_TIMER(&_chunk_statistics.decode_dict_time);
259391

@@ -305,6 +437,32 @@ void ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_reserve_decompress_buf(siz
305437
}
306438
}
307439

440+
template <bool IN_COLLECTION, bool OFFSET_INDEX>
441+
void ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_insert_page_into_cache(
442+
const std::vector<uint8_t>& level_bytes, const Slice& payload) {
443+
int64_t header_start = _page_reader->header_start_offset();
444+
StoragePageCache::CacheKey key(_stream_reader->path(), _page_reader->file_end_offset(),
445+
header_start);
446+
const std::vector<uint8_t>& header_bytes = _page_reader->header_bytes();
447+
size_t total = header_bytes.size() + level_bytes.size() + payload.size;
448+
auto* page = new DataPage(total, true, segment_v2::DATA_PAGE);
449+
size_t pos = 0;
450+
memcpy(page->data() + pos, header_bytes.data(), header_bytes.size());
451+
pos += header_bytes.size();
452+
if (!level_bytes.empty()) {
453+
memcpy(page->data() + pos, level_bytes.data(), level_bytes.size());
454+
pos += level_bytes.size();
455+
}
456+
if (payload.size > 0) {
457+
memcpy(page->data() + pos, payload.data, payload.size);
458+
pos += payload.size;
459+
}
460+
page->reset_size(total);
461+
PageCacheHandle handle;
462+
StoragePageCache::instance()->insert(key, page, &handle, segment_v2::DATA_PAGE);
463+
_chunk_statistics.page_cache_write_counter += 1;
464+
}
465+
308466
template <bool IN_COLLECTION, bool OFFSET_INDEX>
309467
Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::skip_values(size_t num_values,
310468
bool skip_data) {

be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,16 @@ struct ColumnChunkReaderStatistics {
6161
int64_t skip_page_header_num = 0;
6262
int64_t parse_page_header_num = 0;
6363
int64_t read_page_header_time = 0;
64+
// page cache metrics
65+
// total pages read (from cache or file)
66+
int64_t page_read_counter = 0;
67+
int64_t page_cache_write_counter = 0;
68+
// number of cache hits (either compressed or decompressed)
69+
int64_t page_cache_hit_counter = 0;
70+
int64_t page_cache_missing_counter = 0;
71+
// per-hit breakdown
72+
int64_t page_cache_compressed_hit_counter = 0;
73+
int64_t page_cache_decompressed_hit_counter = 0;
6474
};
6575

6676
/**
@@ -74,6 +84,15 @@ struct ColumnChunkReaderStatistics {
7484
int64_t decode_level_time = 0;
7585
int64_t skip_page_header_num = 0;
7686
int64_t parse_page_header_num = 0;
87+
// page cache metrics
88+
// total pages read (from cache or file)
89+
int64_t page_read_counter = 0;
90+
int64_t page_cache_write_counter = 0;
91+
// number of cache hits (either compressed or decompressed)
92+
int64_t page_cache_hit_counter = 0;
93+
// per-hit breakdown
94+
int64_t page_cache_compressed_hit_counter = 0;
95+
int64_t page_cache_decompressed_hit_counter = 0;
7796
};
7897
* // Create chunk reader
7998
* ColumnChunkReader chunk_reader(BufferedStreamReader* reader,
@@ -155,6 +174,17 @@ class ColumnChunkReader {
155174
_page_reader->page_statistics().parse_page_header_num;
156175
_chunk_statistics.read_page_header_time =
157176
_page_reader->page_statistics().read_page_header_time;
177+
_chunk_statistics.page_read_counter += _page_reader->page_statistics().page_read_counter;
178+
_chunk_statistics.page_cache_write_counter +=
179+
_page_reader->page_statistics().page_cache_write_counter;
180+
_chunk_statistics.page_cache_hit_counter +=
181+
_page_reader->page_statistics().page_cache_hit_counter;
182+
_chunk_statistics.page_cache_missing_counter +=
183+
_page_reader->page_statistics().page_cache_missing_counter;
184+
_chunk_statistics.page_cache_compressed_hit_counter +=
185+
_page_reader->page_statistics().page_cache_compressed_hit_counter;
186+
_chunk_statistics.page_cache_decompressed_hit_counter +=
187+
_page_reader->page_statistics().page_cache_decompressed_hit_counter;
158188
return _chunk_statistics;
159189
}
160190

@@ -193,6 +223,12 @@ class ColumnChunkReader {
193223
size_t* result_rows, bool* cross_page);
194224
Status load_cross_page_nested_row(std::vector<level_t>& rep_levels, bool* cross_page);
195225

226+
// Test helpers / accessors
227+
// Test accessor: returns the current _page_data slice by value.
Slice get_page_data() const { return _page_data; }
228+
// Test accessor: returns a reference to _v2_rep_levels (the repetition-level slice set for v2 data pages).
const Slice& v2_rep_levels() const { return _v2_rep_levels; }
229+
// Test accessor: returns a reference to _v2_def_levels (the definition-level slice set for v2 data pages).
const Slice& v2_def_levels() const { return _v2_def_levels; }
230+
// Test accessor: forwards to chunk_statistics().
// NOTE(review): chunk_statistics() adds the PageReader page-cache counters with `+=` on each
// call, so calling this repeatedly may double-count those counters - confirm.
ColumnChunkReaderStatistics& statistics() { return chunk_statistics(); }
231+
196232
private:
197233
enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED, PAGE_SKIPPED };
198234

@@ -202,6 +238,7 @@ class ColumnChunkReader {
202238

203239
void _reserve_decompress_buf(size_t size);
204240
int32_t _get_type_length();
241+
void _insert_page_into_cache(const std::vector<uint8_t>& level_bytes, const Slice& payload);
205242

206243
void _get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2, Slice& page_data);
207244
Status _skip_nested_rows_in_page(size_t num_rows);

0 commit comments

Comments
 (0)