Skip to content

Commit 8a8bb63

Browse files
authored
[feature](vparquet-reader) Implements parquet file page cache. (#59307)
### What problem does this PR solve? Problem Summary: ### Release note [Feature] Implementation of Parquet File Page Cache and Integration with Unified Page Cache Framework #### Solution Overview This PR implements a page-level caching mechanism for Parquet files and integrates it with Apache Doris's existing unified page cache framework, significantly improving query performance by caching decompressed (or compressed) data pages in memory. Key Features 1. Unified Page Cache Integration • Leverages Existing Framework: Directly integrates with Doris's StoragePageCache infrastructure used for internal tables • Shared Resource Management: Parquet cache shares memory pool and eviction policies with internal table caches • Consistent Monitoring: Reuses existing cache statistics and RuntimeProfile for unified performance monitoring • Cache Type Identification: Uses segment_v2::DATA_PAGE as cache page type, consistent with internal table data page caching 2. Smart Caching Strategy • Compression Ratio Awareness: Automatically chooses between caching compressed or decompressed data based on parquet_page_cache_decompress_threshold (default: 1.5) • Flexible Storage: Caches decompressed data when uncompressed_size/compressed_size ≤ threshold, otherwise caches compressed data if enable_parquet_cache_compressed_pages=true • Cache Key Design: Uses file_path::mtime::offset as key to ensure cache consistency across file modifications
1 parent 86ebb7c commit 8a8bb63

40 files changed

+1469
-94
lines changed

be/src/common/config.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,12 @@ DEFINE_Int32(index_page_cache_percentage, "10");
404404
DEFINE_mBool(disable_storage_page_cache, "false");
405405
// whether to disable row cache feature in storage
406406
DEFINE_mBool(disable_storage_row_cache, "true");
407+
// Parquet page cache: threshold ratio for caching decompressed vs compressed pages
408+
// If uncompressed_size / compressed_size <= threshold, cache decompressed;
409+
// otherwise cache compressed if enable_parquet_cache_compressed_pages = true
410+
DEFINE_Double(parquet_page_cache_decompress_threshold, "1.5");
411+
// Parquet page cache: whether to enable caching compressed pages (when ratio exceeds threshold)
412+
DEFINE_Bool(enable_parquet_cache_compressed_pages, "false");
407413
// whether to disable pk page cache feature in storage
408414
DEFINE_Bool(disable_pk_storage_page_cache, "false");
409415

be/src/common/config.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,12 @@ DECLARE_Int32(index_page_cache_percentage);
453453
DECLARE_Bool(disable_storage_page_cache);
454454
// whether to disable row cache feature in storage
455455
DECLARE_mBool(disable_storage_row_cache);
456+
// Parquet page cache: threshold ratio for caching decompressed vs compressed pages
457+
// If uncompressed_size / compressed_size <= threshold, cache decompressed;
458+
// otherwise cache compressed if enable_parquet_cache_compressed_pages = true
459+
DECLARE_Double(parquet_page_cache_decompress_threshold);
460+
// Parquet page cache: whether to enable caching compressed pages (when ratio exceeds threshold)
461+
DECLARE_Bool(enable_parquet_cache_compressed_pages);
456462
// whether to disable pk page cache feature in storage
457463
DECLARE_Bool(disable_pk_storage_page_cache);
458464

be/src/io/cache/cached_remote_file_reader.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ class CachedRemoteFileReader final : public FileReader {
5555

5656
static std::pair<size_t, size_t> s_align_size(size_t offset, size_t size, size_t length);
5757

58+
int64_t mtime() const override { return _remote_file_reader->mtime(); }
59+
5860
protected:
5961
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
6062
const IOContext* io_ctx) override;

be/src/io/file_factory.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,18 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
203203
const io::FileSystemProperties& system_properties,
204204
const io::FileDescription& file_description, const io::FileReaderOptions& reader_options,
205205
RuntimeProfile* profile) {
206+
auto reader_res = _create_file_reader_internal(system_properties, file_description,
207+
reader_options, profile);
208+
if (!reader_res.has_value()) {
209+
return unexpected(std::move(reader_res).error());
210+
}
211+
return std::move(reader_res).value();
212+
}
213+
214+
Result<io::FileReaderSPtr> FileFactory::_create_file_reader_internal(
215+
const io::FileSystemProperties& system_properties,
216+
const io::FileDescription& file_description, const io::FileReaderOptions& reader_options,
217+
RuntimeProfile* profile) {
206218
TFileType::type type = system_properties.system_type;
207219
switch (type) {
208220
case TFileType::FILE_LOCAL: {

be/src/io/file_factory.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,12 @@ class FileFactory {
126126

127127
private:
128128
static std::string _get_fs_name(const io::FileDescription& file_description);
129+
130+
/// Create FileReader without FS
131+
static Result<io::FileReaderSPtr> _create_file_reader_internal(
132+
const io::FileSystemProperties& system_properties,
133+
const io::FileDescription& file_description,
134+
const io::FileReaderOptions& reader_options, RuntimeProfile* profile = nullptr);
129135
};
130136

131137
} // namespace doris

be/src/io/fs/broker_file_reader.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,14 @@ struct IOContext;
3939

4040
BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size,
4141
TBrokerFD fd,
42-
std::shared_ptr<BrokerServiceConnection> connection)
42+
std::shared_ptr<BrokerServiceConnection> connection,
43+
int64_t mtime)
4344
: _path(std::move(path)),
4445
_file_size(file_size),
4546
_broker_addr(broker_addr),
4647
_fd(fd),
47-
_connection(std::move(connection)) {
48+
_connection(std::move(connection)),
49+
_mtime(mtime) {
4850
DorisMetrics::instance()->broker_file_open_reading->increment(1);
4951
DorisMetrics::instance()->broker_file_reader_total->increment(1);
5052
}

be/src/io/fs/broker_file_reader.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ struct IOContext;
3838
class BrokerFileReader final : public FileReader {
3939
public:
4040
BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size, TBrokerFD fd,
41-
std::shared_ptr<BrokerServiceConnection> connection);
41+
std::shared_ptr<BrokerServiceConnection> connection, int64_t mtime = 0);
4242

4343
~BrokerFileReader() override;
4444

@@ -50,6 +50,8 @@ class BrokerFileReader final : public FileReader {
5050

5151
bool closed() const override { return _closed.load(std::memory_order_acquire); }
5252

53+
int64_t mtime() const override { return _mtime; }
54+
5355
protected:
5456
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
5557
const IOContext* io_ctx) override;
@@ -62,6 +64,7 @@ class BrokerFileReader final : public FileReader {
6264
TBrokerFD _fd;
6365

6466
std::shared_ptr<BrokerServiceConnection> _connection;
67+
int64_t _mtime;
6568
std::atomic<bool> _closed = false;
6669
};
6770
} // namespace doris::io

be/src/io/fs/broker_file_system.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ Status BrokerFileSystem::open_file_internal(const Path& file, FileReaderSPtr* re
139139
error_msg(response->opStatus.message));
140140
}
141141
*reader = std::make_shared<BrokerFileReader>(_broker_addr, file, fsize, response->fd,
142-
_connection);
142+
_connection, opts.mtime);
143143
return Status::OK();
144144
}
145145

be/src/io/fs/buffered_reader.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ class RangeCacheFileReader : public io::FileReader {
160160

161161
bool closed() const override { return _closed; }
162162

163+
int64_t mtime() const override { return _inner_reader->mtime(); }
164+
163165
protected:
164166
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
165167
const IOContext* io_ctx) override;
@@ -329,6 +331,8 @@ class MergeRangeFileReader : public io::FileReader {
329331

330332
bool closed() const override { return _closed; }
331333

334+
int64_t mtime() const override { return _reader->mtime(); }
335+
332336
// for test only
333337
size_t buffer_remaining() const { return _remaining; }
334338

@@ -532,6 +536,8 @@ class PrefetchBufferedReader final : public io::FileReader {
532536

533537
bool closed() const override { return _closed; }
534538

539+
int64_t mtime() const override { return _reader->mtime(); }
540+
535541
void set_random_access_ranges(const std::vector<PrefetchRange>* random_access_ranges) {
536542
_random_access_ranges = random_access_ranges;
537543
for (auto& _pre_buffer : _pre_buffers) {
@@ -592,6 +598,8 @@ class InMemoryFileReader final : public io::FileReader {
592598

593599
bool closed() const override { return _closed; }
594600

601+
int64_t mtime() const override { return _reader->mtime(); }
602+
595603
protected:
596604
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
597605
const IOContext* io_ctx) override;
@@ -626,6 +634,8 @@ class BufferedStreamReader {
626634
virtual ~BufferedStreamReader() = default;
627635
// return the file path
628636
virtual std::string path() = 0;
637+
638+
virtual int64_t mtime() const = 0;
629639
};
630640

631641
class BufferedFileStreamReader : public BufferedStreamReader, public ProfileCollector {
@@ -639,6 +649,8 @@ class BufferedFileStreamReader : public BufferedStreamReader, public ProfileColl
639649
Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) override;
640650
std::string path() override { return _file->path(); }
641651

652+
int64_t mtime() const override { return _file->mtime(); }
653+
642654
protected:
643655
void _collect_profile_before_close() override {
644656
if (_file != nullptr) {

be/src/io/fs/file_reader.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ class FileReader : public doris::ProfileCollector {
9090

9191
virtual const std::string& get_data_dir_path() { return VIRTUAL_REMOTE_DATA_DIR; }
9292

93+
// File modification time (seconds since epoch). Default to 0 meaning unknown.
94+
virtual int64_t mtime() const = 0;
95+
9396
protected:
9497
virtual Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
9598
const IOContext* io_ctx) = 0;

0 commit comments

Comments
 (0)