diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 5f78c7d9294a30..8bf940c7b64e67 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -398,6 +398,11 @@ DEFINE_Int32(index_page_cache_percentage, "10"); DEFINE_mBool(disable_storage_page_cache, "false"); // whether to disable row cache feature in storage DEFINE_mBool(disable_storage_row_cache, "true"); +// Parquet page cache: threshold ratio for caching decompressed vs compressed pages +// If uncompressed_size / compressed_size <= threshold, cache decompressed; otherwise cache compressed +DEFINE_Double(parquet_page_cache_decompress_threshold, "1.5"); +// Parquet page cache: whether to enable caching compressed pages (when ratio exceeds threshold) +DEFINE_Bool(enable_parquet_cache_compressed_pages, "false"); // whether to disable pk page cache feature in storage DEFINE_Bool(disable_pk_storage_page_cache, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index c9d9fe94ffbdca..f32e3cb0be22c3 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -447,6 +447,10 @@ DECLARE_Int32(index_page_cache_percentage); DECLARE_Bool(disable_storage_page_cache); // whether to disable row cache feature in storage DECLARE_mBool(disable_storage_row_cache); +// Parquet page cache: threshold ratio for caching decompressed vs compressed pages +DECLARE_Double(parquet_page_cache_decompress_threshold); +// Parquet page cache: whether to enable caching compressed pages +DECLARE_Bool(enable_parquet_cache_compressed_pages); // whether to disable pk page cache feature in storage DECLARE_Bool(disable_pk_storage_page_cache); diff --git a/be/src/io/cache/cached_remote_file_reader.h b/be/src/io/cache/cached_remote_file_reader.h index 939471b62ea41d..20c1a47ce881c3 100644 --- a/be/src/io/cache/cached_remote_file_reader.h +++ b/be/src/io/cache/cached_remote_file_reader.h @@ -55,6 +55,8 @@ class CachedRemoteFileReader final : public FileReader { static std::pair s_align_size(size_t offset, size_t size, size_t length); + int64_t mtime() const override { return _remote_file_reader->mtime(); } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index bd08bc20461016..074849356438dc 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -203,6 +203,21 @@ Result FileFactory::create_file_reader( const io::FileSystemProperties& system_properties, const io::FileDescription& file_description, const io::FileReaderOptions& reader_options, RuntimeProfile* profile) { + auto reader_res = _create_file_reader_internal(system_properties, file_description, + reader_options, profile); + if (!reader_res.has_value()) { + return unexpected(std::move(reader_res).error()); + } + auto file_reader = std::move(reader_res).value(); + LOG_INFO("create file reader for path={}, size={}, mtime={}", file_description.path, + file_description.file_size, file_description.mtime); + return file_reader; +} + +Result FileFactory::_create_file_reader_internal( + const io::FileSystemProperties& system_properties, + const io::FileDescription& file_description, const io::FileReaderOptions& reader_options, + RuntimeProfile* profile) { TFileType::type type = system_properties.system_type; switch (type) { case TFileType::FILE_LOCAL: { diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h index 0ba791bd0a3dc9..61e322ca0af02c 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h 
@@ -126,6 +126,12 @@ class FileFactory { private: static std::string _get_fs_name(const io::FileDescription& file_description); + + /// Create FileReader without FS + static Result _create_file_reader_internal( + const io::FileSystemProperties& system_properties, + const io::FileDescription& file_description, + const io::FileReaderOptions& reader_options, RuntimeProfile* profile = nullptr); }; } // namespace doris diff --git a/be/src/io/fs/broker_file_reader.cpp b/be/src/io/fs/broker_file_reader.cpp index 102ea3e247778a..41b2992f70008a 100644 --- a/be/src/io/fs/broker_file_reader.cpp +++ b/be/src/io/fs/broker_file_reader.cpp @@ -39,12 +39,14 @@ struct IOContext; BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size, TBrokerFD fd, - std::shared_ptr connection) + std::shared_ptr connection, + int64_t mtime) : _path(std::move(path)), _file_size(file_size), _broker_addr(broker_addr), _fd(fd), - _connection(std::move(connection)) { + _connection(std::move(connection)), + _mtime(mtime) { DorisMetrics::instance()->broker_file_open_reading->increment(1); DorisMetrics::instance()->broker_file_reader_total->increment(1); } diff --git a/be/src/io/fs/broker_file_reader.h b/be/src/io/fs/broker_file_reader.h index 7d19edb32c0dea..2f6bd94b652bcb 100644 --- a/be/src/io/fs/broker_file_reader.h +++ b/be/src/io/fs/broker_file_reader.h @@ -38,7 +38,7 @@ struct IOContext; class BrokerFileReader final : public FileReader { public: BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size, TBrokerFD fd, - std::shared_ptr connection); + std::shared_ptr connection, int64_t mtime = 0); ~BrokerFileReader() override; @@ -50,6 +50,8 @@ class BrokerFileReader final : public FileReader { bool closed() const override { return _closed.load(std::memory_order_acquire); } + int64_t mtime() const override { return _mtime; } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; @@ -62,6 +64,7 @@ class BrokerFileReader final : public FileReader { TBrokerFD _fd; std::shared_ptr _connection; + int64_t _mtime; std::atomic _closed = false; }; } // namespace doris::io diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp index 8b0d5db23e2116..b0dc89dc277ad1 100644 --- a/be/src/io/fs/broker_file_system.cpp +++ b/be/src/io/fs/broker_file_system.cpp @@ -139,7 +139,7 @@ Status BrokerFileSystem::open_file_internal(const Path& file, FileReaderSPtr* re error_msg(response->opStatus.message)); } *reader = std::make_shared(_broker_addr, file, fsize, response->fd, - _connection); + _connection, opts.mtime); return Status::OK(); } diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 5fe071762351d8..6ddcca02067ddb 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -160,6 +160,8 @@ class RangeCacheFileReader : public io::FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return _inner_reader->mtime(); } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; @@ -329,6 +331,8 @@ class MergeRangeFileReader : public io::FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return _reader->mtime(); } + // for test only size_t buffer_remaining() const { return _remaining; } @@ -532,6 +536,8 @@ class PrefetchBufferedReader final : public io::FileReader { bool closed() const override 
{ return _closed; } + int64_t mtime() const override { return _reader->mtime(); } + void set_random_access_ranges(const std::vector* random_access_ranges) { _random_access_ranges = random_access_ranges; for (auto& _pre_buffer : _pre_buffers) { @@ -592,6 +598,8 @@ class InMemoryFileReader final : public io::FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return _reader->mtime(); } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; @@ -626,6 +634,8 @@ class BufferedStreamReader { virtual ~BufferedStreamReader() = default; // return the file path virtual std::string path() = 0; + + virtual int64_t mtime() const = 0; }; class BufferedFileStreamReader : public BufferedStreamReader, public ProfileCollector { @@ -639,6 +649,8 @@ class BufferedFileStreamReader : public BufferedStreamReader, public ProfileColl Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) override; std::string path() override { return _file->path(); } + int64_t mtime() const override { return _file->mtime(); } + protected: void _collect_profile_before_close() override { if (_file != nullptr) { diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h index e6d8527e831906..3df912cbad4af9 100644 --- a/be/src/io/fs/file_reader.h +++ b/be/src/io/fs/file_reader.h @@ -90,6 +90,9 @@ class FileReader : public doris::ProfileCollector { virtual const std::string& get_data_dir_path() { return VIRTUAL_REMOTE_DATA_DIR; } + // File modification time (seconds since epoch). Default to 0 meaning unknown. + virtual int64_t mtime() const = 0; + protected: virtual Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) = 0; diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index 0e278dff0c8847..b1d65a63ba0529 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -66,16 +66,17 @@ Result HdfsFileReader::create(Path full_path, const hdfsFS& fs, auto path = convert_path(full_path, fs_name); return get_file(fs, path, opts.mtime, opts.file_size).transform([&](auto&& accessor) { return std::make_shared(std::move(path), std::move(fs_name), - std::move(accessor), profile); + std::move(accessor), profile, opts.mtime); }); } HdfsFileReader::HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor accessor, - RuntimeProfile* profile) + RuntimeProfile* profile, int64_t mtime) : _path(std::move(path)), _fs_name(std::move(fs_name)), _accessor(std::move(accessor)), - _profile(profile) { + _profile(profile), + _mtime(mtime) { _handle = _accessor.get(); DorisMetrics::instance()->hdfs_file_open_reading->increment(1); diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h index 8556eea0de6ac5..08f98bca29af0c 100644 --- a/be/src/io/fs/hdfs_file_reader.h +++ b/be/src/io/fs/hdfs_file_reader.h @@ -45,7 +45,7 @@ class HdfsFileReader final : public FileReader { const FileReaderOptions& opts, RuntimeProfile* profile); HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor accessor, - RuntimeProfile* profile); + RuntimeProfile* profile, int64_t mtime = 0); ~HdfsFileReader() override; @@ -57,6 +57,8 @@ class HdfsFileReader final : public FileReader { bool closed() const override { return _closed.load(std::memory_order_acquire); } + int64_t mtime() const override { return _mtime; } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const 
IOContext* io_ctx) override; @@ -86,6 +88,7 @@ class HdfsFileReader final : public FileReader { CachedHdfsFileHandle* _handle = nullptr; // owned by _cached_file_handle std::atomic _closed = false; RuntimeProfile* _profile = nullptr; + int64_t _mtime; #ifdef USE_HADOOP_HDFS HDFSProfile _hdfs_profile; #endif diff --git a/be/src/io/fs/http_file_reader.cpp b/be/src/io/fs/http_file_reader.cpp index fb243179baf557..5ad984039fc475 100644 --- a/be/src/io/fs/http_file_reader.cpp +++ b/be/src/io/fs/http_file_reader.cpp @@ -34,7 +34,7 @@ Result HttpFileReader::create(const std::string& url, ofi.path = Path(url); ofi.extend_info = props; - auto reader = std::make_shared(ofi, url); + auto reader = std::make_shared(ofi, url, opts.mtime); // Open the file to detect Range support and validate configuration RETURN_IF_ERROR_RESULT(reader->open(opts)); @@ -42,11 +42,12 @@ Result HttpFileReader::create(const std::string& url, return reader; } -HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url) +HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url, int64_t mtime) : _extend_kv(fileInfo.extend_info), _path(fileInfo.path), _url(std::move(url)), - _client(std::make_unique()) { + _client(std::make_unique()), + _mtime(mtime) { auto etag_iter = _extend_kv.find("etag"); if (etag_iter != _extend_kv.end()) { _etag = etag_iter->second; diff --git a/be/src/io/fs/http_file_reader.h b/be/src/io/fs/http_file_reader.h index 607eedf3d1a50b..982e65905aa691 100644 --- a/be/src/io/fs/http_file_reader.h +++ b/be/src/io/fs/http_file_reader.h @@ -41,7 +41,7 @@ class HttpFileReader final : public FileReader { const std::map& props, const FileReaderOptions& opts, RuntimeProfile* profile); - explicit HttpFileReader(const OpenFileInfo& fileInfo, std::string url); + explicit HttpFileReader(const OpenFileInfo& fileInfo, std::string url, int64_t mtime); ~HttpFileReader() override; Status open(const FileReaderOptions& opts); @@ -52,6 +52,8 @@ class HttpFileReader final : public FileReader { bool closed() const override { return _closed.load(std::memory_order_acquire); } size_t size() const override { return _file_size; } + int64_t mtime() const override { return _mtime; } + private: // Prepare and initialize the HTTP client for a new request Status prepare_client(bool set_fail_on_error = true); @@ -78,6 +80,7 @@ class HttpFileReader final : public FileReader { int64_t _last_modified = 0; std::atomic _closed = false; std::unique_ptr _client; + int64_t _mtime; // Configuration for non-Range request handling bool _enable_range_request = true; // Whether Range request is required diff --git a/be/src/io/fs/http_file_system.cpp b/be/src/io/fs/http_file_system.cpp index 92e175ca774041..b1e8de354ad9b3 100644 --- a/be/src/io/fs/http_file_system.cpp +++ b/be/src/io/fs/http_file_system.cpp @@ -56,7 +56,7 @@ Status HttpFileSystem::open_file_internal(const Path& path, FileReaderSPtr* read // Pass properties (including HTTP headers) to the file reader file_info.extend_info = _properties; - auto http_reader = std::make_shared(file_info, path.native()); + auto http_reader = std::make_shared(file_info, path.native(), opts.mtime); RETURN_IF_ERROR(http_reader->open(opts)); *reader = http_reader; return Status::OK(); diff --git a/be/src/io/fs/local_file_reader.h b/be/src/io/fs/local_file_reader.h index 38b4cfa55af3bf..e11bb152b672d8 100644 --- a/be/src/io/fs/local_file_reader.h +++ b/be/src/io/fs/local_file_reader.h @@ -63,6 +63,8 @@ class LocalFileReader final : public FileReader { const std::string& 
get_data_dir_path() override { return _data_dir_path; } + int64_t mtime() const override { return 0; } + private: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; diff --git a/be/src/io/fs/packed_file_reader.h b/be/src/io/fs/packed_file_reader.h index 79cd23c8cd71a8..b6b423fbbfefdb 100644 --- a/be/src/io/fs/packed_file_reader.h +++ b/be/src/io/fs/packed_file_reader.h @@ -48,6 +48,8 @@ class PackedFileReader final : public FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return _inner_reader->mtime(); } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; diff --git a/be/src/io/fs/s3_file_reader.h b/be/src/io/fs/s3_file_reader.h index 58294ec1891cb8..40e3ac61d3aca6 100644 --- a/be/src/io/fs/s3_file_reader.h +++ b/be/src/io/fs/s3_file_reader.h @@ -53,6 +53,8 @@ class S3FileReader final : public FileReader { bool closed() const override { return _closed.load(std::memory_order_acquire); } + int64_t mtime() const override { return 0; } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h index cedab0b6c17a7b..df137a9267cb29 100644 --- a/be/src/io/fs/stream_load_pipe.h +++ b/be/src/io/fs/stream_load_pipe.h @@ -57,6 +57,8 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { size_t size() const override { return 0; } + int64_t mtime() const override { return 0; } + // called when consumer finished Status close() override { if (!(_finished || _cancelled)) { diff --git a/be/src/io/fs/tracing_file_reader.h b/be/src/io/fs/tracing_file_reader.h index 39b70dfbb63bef..7a6651afd21a2a 100644 --- a/be/src/io/fs/tracing_file_reader.h +++ b/be/src/io/fs/tracing_file_reader.h @@ -47,6 +47,8 @@ class TracingFileReader : public FileReader { void _collect_profile_at_runtime() override { return _inner->collect_profile_at_runtime(); } void _collect_profile_before_close() override { return _inner->collect_profile_before_close(); } + int64_t mtime() const override { return _inner->mtime(); } + FileReaderStats* stats() const { return _stats; } doris::io::FileReaderSPtr inner_reader() { return _inner; } diff --git a/be/src/vec/exec/format/orc/orc_file_reader.h b/be/src/vec/exec/format/orc/orc_file_reader.h index 503777e67c2946..15aeed332428b7 100644 --- a/be/src/vec/exec/format/orc/orc_file_reader.h +++ b/be/src/vec/exec/format/orc/orc_file_reader.h @@ -54,6 +54,8 @@ class OrcMergeRangeFileReader : public io::FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return _inner_reader->mtime(); } + // for test only const Statistics& statistics() const { return _statistics; } diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index a474b76a51814e..807975def2ff9e 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -25,6 +25,8 @@ #include #include "common/compiler_util.h" // IWYU pragma: keep +#include "io/fs/buffered_reader.h" +#include "olap/page_cache.h" #include "util/bit_util.h" #include "util/block_compression.h" #include "util/runtime_profile.h" @@ -51,7 +53,7 @@ template ColumnChunkReader::ColumnChunkReader( io::BufferedStreamReader* reader, tparquet::ColumnChunk* 
column_chunk, FieldSchema* field_schema, const tparquet::OffsetIndex* offset_index, size_t total_rows, - io::IOContext* io_ctx) + io::IOContext* io_ctx, const ParquetPageReadContext& ctx) : _field_schema(field_schema), _max_rep_level(field_schema->repetition_level), _max_def_level(field_schema->definition_level), @@ -59,7 +61,8 @@ ColumnChunkReader::ColumnChunkReader( _metadata(column_chunk->meta_data), _offset_index(offset_index), _total_rows(total_rows), - _io_ctx(io_ctx) {} + _io_ctx(io_ctx), + _ctx(ctx) {} template Status ColumnChunkReader::init() { @@ -68,7 +71,8 @@ Status ColumnChunkReader::init() { size_t chunk_size = _metadata.total_compressed_size; // create page reader _page_reader = create_page_reader( - _stream_reader, _io_ctx, start_offset, chunk_size, _total_rows, _offset_index); + _stream_reader, _io_ctx, start_offset, chunk_size, _total_rows, _metadata, + _offset_index, _ctx); // get the block compression codec RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, &_block_compress_codec)); _state = INITIALIZED; @@ -103,7 +107,7 @@ Status ColumnChunkReader::_parse_first_page_header( RETURN_IF_ERROR(parse_page_header()); const tparquet::PageHeader* header = nullptr; - RETURN_IF_ERROR(_page_reader->get_page_header(header)); + RETURN_IF_ERROR(_page_reader->get_page_header(&header)); if (header->type == tparquet::PageType::DICTIONARY_PAGE) { // the first page maybe directory page even if _metadata.__isset.dictionary_page_offset == false, // so we should parse the directory page in next_page() @@ -124,8 +128,7 @@ Status ColumnChunkReader::parse_page_header() { RETURN_IF_ERROR(_page_reader->parse_page_header()); const tparquet::PageHeader* header = nullptr; - ; - RETURN_IF_ERROR(_page_reader->get_page_header(header)); + RETURN_IF_ERROR(_page_reader->get_page_header(&header)); int32_t page_num_values = _page_reader->is_header_v2() ? header->data_page_header_v2.num_values : header->data_page_header.num_values; _remaining_rep_nums = page_num_values; @@ -168,37 +171,151 @@ Status ColumnChunkReader::load_page_data() { } const tparquet::PageHeader* header = nullptr; - RETURN_IF_ERROR(_page_reader->get_page_header(header)); + RETURN_IF_ERROR(_page_reader->get_page_header(&header)); int32_t uncompressed_size = header->uncompressed_page_size; + bool page_loaded = false; + + // First, try to reuse a cache handle previously discovered by PageReader + // (header-only lookup) to avoid a second lookup here. If no handle is + // attached, fall back to reading and decoding the page from the file below.
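+    // For reference, a cached entry produced by _insert_page_into_cache() below is keyed by
+    // StoragePageCache::CacheKey("<file path>::<mtime>", <column chunk end offset>, <page header offset>)
+    // and laid out roughly as:
+    //   | thrift-encoded PageHeader | v2 rep/def level bytes (if any) | page payload |
+    // The payload is stored decompressed when the uncompressed/compressed size ratio is within
+    // config::parquet_page_cache_decompress_threshold; above that ratio the page is only cached
+    // (in its original compressed form) if config::enable_parquet_cache_compressed_pages is true.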
+ if (_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache && + StoragePageCache::instance() != nullptr) { + if (_page_reader->has_page_cache_handle()) { + const PageCacheHandle& handle = _page_reader->page_cache_handle(); + Slice cached = handle.data(); + size_t header_size = _page_reader->header_bytes().size(); + size_t levels_size = 0; + if (header->__isset.data_page_header_v2) { + const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2; + size_t rl = header_v2.repetition_levels_byte_length; + size_t dl = header_v2.definition_levels_byte_length; + levels_size = rl + dl; + _v2_rep_levels = + Slice(reinterpret_cast(cached.data) + header_size, rl); + _v2_def_levels = + Slice(reinterpret_cast(cached.data) + header_size + rl, dl); + } + // payload_slice points to the bytes after header and levels + Slice payload_slice(cached.data + header_size + levels_size, + cached.size - header_size - levels_size); - if (_block_compress_codec != nullptr) { - Slice compressed_data; - RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data)); - if (header->__isset.data_page_header_v2) { - const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2; - // uncompressed_size = rl + dl + uncompressed_data_size - // compressed_size = rl + dl + compressed_data_size - uncompressed_size -= header_v2.repetition_levels_byte_length + - header_v2.definition_levels_byte_length; - _get_uncompressed_levels(header_v2, compressed_data); + bool cache_payload_is_decompressed = _page_reader->is_cache_payload_decompressed(); + + if (cache_payload_is_decompressed) { + // Cached payload is already uncompressed + _page_data = payload_slice; + } else { + CHECK(_block_compress_codec); + // Decompress cached payload into _decompress_buf for decoding + size_t uncompressed_payload_size = + header->__isset.data_page_header_v2 + ? static_cast(header->uncompressed_page_size) - levels_size + : static_cast(header->uncompressed_page_size); + _reserve_decompress_buf(uncompressed_payload_size); + _page_data = Slice(_decompress_buf.get(), uncompressed_payload_size); + SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time); + _chunk_statistics.decompress_cnt++; + RETURN_IF_ERROR(_block_compress_codec->decompress(payload_slice, &_page_data)); + } + // page cache counters were incremented when PageReader did the header-only + // cache lookup. Do not increment again to avoid double-counting.
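+            // (Accounting split, roughly: PageReader::parse_page_header() bumps page_cache_hit_counter
+            // plus the compressed/decompressed hit breakdown on a successful lookup, or
+            // page_cache_missing_counter on a miss; the page_cache_*_write_counter family is bumped in
+            // ColumnChunkReader when _insert_page_into_cache() stores a page.)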
+ page_loaded = true; } - bool is_v2_compressed = - header->__isset.data_page_header_v2 && header->data_page_header_v2.is_compressed; - if (header->__isset.data_page_header || is_v2_compressed) { - // check decompressed buffer size - _reserve_decompress_buf(uncompressed_size); - _page_data = Slice(_decompress_buf.get(), uncompressed_size); - SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time); - _chunk_statistics.decompress_cnt++; - RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data)); + } + + if (!page_loaded) { + if (_block_compress_codec != nullptr) { + Slice compressed_data; + RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data)); + std::vector level_bytes; + if (header->__isset.data_page_header_v2) { + const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2; + // uncompressed_size = rl + dl + uncompressed_data_size + // compressed_size = rl + dl + compressed_data_size + uncompressed_size -= header_v2.repetition_levels_byte_length + + header_v2.definition_levels_byte_length; + // copy level bytes (rl + dl) so that we can cache header + levels + uncompressed payload + size_t rl = header_v2.repetition_levels_byte_length; + size_t dl = header_v2.definition_levels_byte_length; + size_t level_sz = rl + dl; + if (level_sz > 0) { + level_bytes.resize(level_sz); + memcpy(level_bytes.data(), compressed_data.data, level_sz); + } + // now remove levels from compressed_data for decompression + _get_uncompressed_levels(header_v2, compressed_data); + } + bool is_v2_compressed = header->__isset.data_page_header_v2 && + header->data_page_header_v2.is_compressed; + bool page_has_compression = header->__isset.data_page_header || is_v2_compressed; + + if (page_has_compression) { + // Decompress payload for immediate decoding + _reserve_decompress_buf(uncompressed_size); + _page_data = Slice(_decompress_buf.get(), uncompressed_size); + SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time); + _chunk_statistics.decompress_cnt++; + RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data)); + + // Decide whether to cache decompressed payload or compressed payload based on threshold + bool should_cache_decompressed = false; + if (header->compressed_page_size > 0) { + should_cache_decompressed = + (_metadata.codec == tparquet::CompressionCodec::UNCOMPRESSED) || + (static_cast(header->uncompressed_page_size) <= + static_cast(config::parquet_page_cache_decompress_threshold) * + static_cast(header->compressed_page_size)); + } + + if (_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache && + StoragePageCache::instance() != nullptr && + !_page_reader->header_bytes().empty()) { + if (should_cache_decompressed) { + _insert_page_into_cache(level_bytes, _page_data); + _chunk_statistics.page_cache_decompressed_write_counter += 1; + } else { + if (config::enable_parquet_cache_compressed_pages) { + // cache the compressed payload as-is (header | levels | compressed_payload) + _insert_page_into_cache( + level_bytes, Slice(compressed_data.data, compressed_data.size)); + _chunk_statistics.page_cache_compressed_write_counter += 1; + } + } + } + } else { + // no compression on this page, use the data directly + _page_data = Slice(compressed_data.data, compressed_data.size); + if (_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache && + StoragePageCache::instance() != nullptr) { + _insert_page_into_cache(level_bytes, _page_data); + _chunk_statistics.page_cache_decompressed_write_counter += 1; + } + } } else { 
- // Don't need decompress - _page_data = Slice(compressed_data.data, compressed_data.size); - } - } else { - RETURN_IF_ERROR(_page_reader->get_page_data(_page_data)); - if (header->__isset.data_page_header_v2) { - _get_uncompressed_levels(header->data_page_header_v2, _page_data); + // For uncompressed page, we may still need to extract v2 levels + std::vector level_bytes; + Slice uncompressed_data; + RETURN_IF_ERROR(_page_reader->get_page_data(uncompressed_data)); + if (header->__isset.data_page_header_v2) { + const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2; + size_t rl = header_v2.repetition_levels_byte_length; + size_t dl = header_v2.definition_levels_byte_length; + size_t level_sz = rl + dl; + if (level_sz > 0) { + level_bytes.resize(level_sz); + memcpy(level_bytes.data(), uncompressed_data.data, level_sz); + } + _get_uncompressed_levels(header_v2, uncompressed_data); + } + // copy page data out + _page_data = Slice(uncompressed_data.data, uncompressed_data.size); + // Optionally cache uncompressed data for uncompressed pages + if (_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache && + StoragePageCache::instance() != nullptr) { + _insert_page_into_cache(level_bytes, _page_data); + _chunk_statistics.page_cache_decompressed_write_counter += 1; + } } } @@ -243,7 +360,15 @@ Status ColumnChunkReader::load_page_data() { _decoders[static_cast(encoding)] = std::move(page_decoder); _page_decoder = _decoders[static_cast(encoding)].get(); } - // Reset page data for each page + // Reset page data for each page. + // At this point _page_data holds only the page payload: for v2 data pages the + // rep/def level bytes have already been stripped above, either when slicing the + // cached entry or via _get_uncompressed_levels(). RETURN_IF_ERROR(_page_decoder->set_data(&_page_data)); _state = DATA_LOADED; @@ -253,7 +378,7 @@ template Status ColumnChunkReader::_decode_dict_page() { const tparquet::PageHeader* header = nullptr; - RETURN_IF_ERROR(_page_reader->get_page_header(header)); + RETURN_IF_ERROR(_page_reader->get_page_header(&header)); DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header->type); SCOPED_RAW_TIMER(&_chunk_statistics.decode_dict_time); @@ -270,16 +395,88 @@ // Prepare dictionary data int32_t uncompressed_size = header->uncompressed_page_size; auto dict_data = make_unique_buffer(uncompressed_size); - if (_block_compress_codec != nullptr) { - Slice compressed_data; - RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data)); - Slice dict_slice(dict_data.get(), uncompressed_size); - RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &dict_slice)); - } else { - Slice dict_slice; - RETURN_IF_ERROR(_page_reader->get_page_data(dict_slice)); - // The data is stored by BufferedStreamReader, we should copy it out - memcpy(dict_data.get(), dict_slice.data, dict_slice.size); + bool dict_loaded = false; + + // Try to load dictionary page from cache + if (_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache && + StoragePageCache::instance() != nullptr) { + if (_page_reader->has_page_cache_handle()) { + const PageCacheHandle& handle = _page_reader->page_cache_handle(); + Slice cached = handle.data(); + size_t header_size = _page_reader->header_bytes().size(); + // Dictionary page layout in cache: header | payload (compressed or uncompressed) + Slice payload_slice(cached.data + header_size, cached.size - header_size); + + bool cache_payload_is_decompressed = _page_reader->is_cache_payload_decompressed(); + + if (cache_payload_is_decompressed) { + // Use cached decompressed dictionary data + memcpy(dict_data.get(), payload_slice.data, payload_slice.size); + dict_loaded = true; + } else { + CHECK(_block_compress_codec); + // Decompress cached compressed dictionary data + Slice dict_slice(dict_data.get(), uncompressed_size); + RETURN_IF_ERROR(_block_compress_codec->decompress(payload_slice, &dict_slice)); + dict_loaded = true; + } + + // When dictionary page is loaded from cache, we need to skip the page data + // to update the offset correctly (similar to calling get_page_data()) + if (dict_loaded) { + _page_reader->skip_page_data(); + } + } + } + + if (!dict_loaded) { + // Load and decompress dictionary page from file + if (_block_compress_codec != nullptr) { + Slice compressed_data; + RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data)); + Slice dict_slice(dict_data.get(), uncompressed_size); + RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &dict_slice)); + + // Decide whether to cache decompressed or compressed dictionary based on threshold + bool should_cache_decompressed = false; + if (header->compressed_page_size > 0) { + should_cache_decompressed = + (static_cast(header->uncompressed_page_size) <= + static_cast(config::parquet_page_cache_decompress_threshold) * + static_cast(header->compressed_page_size)); + } + + if (_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache && +
StoragePageCache::instance() != nullptr && !_page_reader->header_bytes().empty()) { + std::vector empty_levels; // Dictionary pages don't have levels + if (should_cache_decompressed) { + // Cache the decompressed dictionary page + _insert_page_into_cache(empty_levels, dict_slice); + _chunk_statistics.page_cache_decompressed_write_counter += 1; + } else { + if (config::enable_parquet_cache_compressed_pages) { + // Cache the compressed dictionary page + _insert_page_into_cache(empty_levels, + Slice(compressed_data.data, compressed_data.size)); + _chunk_statistics.page_cache_compressed_write_counter += 1; + } + } + } + } else { + Slice dict_slice; + RETURN_IF_ERROR(_page_reader->get_page_data(dict_slice)); + // The data is stored by BufferedStreamReader, we should copy it out + memcpy(dict_data.get(), dict_slice.data, dict_slice.size); + + // Cache the uncompressed dictionary page + if (_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache && + StoragePageCache::instance() != nullptr && !_page_reader->header_bytes().empty()) { + std::vector empty_levels; + Slice payload(dict_data.get(), uncompressed_size); + _insert_page_into_cache(empty_levels, payload); + _chunk_statistics.page_cache_decompressed_write_counter += 1; + } + } } // Cache page decoder @@ -305,6 +502,32 @@ void ColumnChunkReader::_reserve_decompress_buf(siz } } +template +void ColumnChunkReader::_insert_page_into_cache( + const std::vector& level_bytes, const Slice& payload) { + StoragePageCache::CacheKey key( + fmt::format("{}::{}", _stream_reader->path(), _stream_reader->mtime()), + _page_reader->file_end_offset(), _page_reader->header_start_offset()); + const std::vector& header_bytes = _page_reader->header_bytes(); + size_t total = header_bytes.size() + level_bytes.size() + payload.size; + auto* page = new DataPage(total, true, segment_v2::DATA_PAGE); + size_t pos = 0; + memcpy(page->data() + pos, header_bytes.data(), header_bytes.size()); + pos += header_bytes.size(); + if (!level_bytes.empty()) { + memcpy(page->data() + pos, level_bytes.data(), level_bytes.size()); + pos += level_bytes.size(); + } + if (payload.size > 0) { + memcpy(page->data() + pos, payload.data, payload.size); + pos += payload.size; + } + page->reset_size(total); + PageCacheHandle handle; + StoragePageCache::instance()->insert(key, page, &handle, segment_v2::DATA_PAGE); + _chunk_statistics.page_cache_write_counter += 1; +} + template Status ColumnChunkReader::skip_values(size_t num_values, bool skip_data) { diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index 1270e5e37fcd1e..544fb781de4f64 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -61,6 +61,18 @@ struct ColumnChunkReaderStatistics { int64_t skip_page_header_num = 0; int64_t parse_page_header_num = 0; int64_t read_page_header_time = 0; + // page cache metrics + // total pages read (from cache or file) + int64_t page_read_counter = 0; + int64_t page_cache_write_counter = 0; + int64_t page_cache_compressed_write_counter = 0; + int64_t page_cache_decompressed_write_counter = 0; + // number of cache hits (either compressed or decompressed) + int64_t page_cache_hit_counter = 0; + int64_t page_cache_missing_counter = 0; + // per-hit breakdown + int64_t page_cache_compressed_hit_counter = 0; + int64_t page_cache_decompressed_hit_counter = 0; }; /** @@ -74,6 +86,17 @@ struct 
ColumnChunkReaderStatistics { int64_t decode_level_time = 0; int64_t skip_page_header_num = 0; int64_t parse_page_header_num = 0; + // page cache metrics + // total pages read (from cache or file) + int64_t page_read_counter = 0; + int64_t page_cache_write_counter = 0; + int64_t page_cache_compressed_write_counter = 0; + int64_t page_cache_decompressed_write_counter = 0; + // number of cache hits (either compressed or decompressed) + int64_t page_cache_hit_counter = 0; + // per-hit breakdown + int64_t page_cache_compressed_hit_counter = 0; + int64_t page_cache_decompressed_hit_counter = 0; }; * // Create chunk reader * ColumnChunkReader chunk_reader(BufferedStreamReader* reader, @@ -97,7 +120,8 @@ class ColumnChunkReader { public: ColumnChunkReader(io::BufferedStreamReader* reader, tparquet::ColumnChunk* column_chunk, FieldSchema* field_schema, const tparquet::OffsetIndex* offset_index, - size_t total_row, io::IOContext* io_ctx); + size_t total_row, io::IOContext* io_ctx, + const ParquetPageReadContext& ctx = ParquetPageReadContext()); ~ColumnChunkReader() = default; // Initialize chunk reader, will generate the decoder and codec. @@ -155,6 +179,21 @@ class ColumnChunkReader { _page_reader->page_statistics().parse_page_header_num; _chunk_statistics.read_page_header_time = _page_reader->page_statistics().read_page_header_time; + _chunk_statistics.page_read_counter += _page_reader->page_statistics().page_read_counter; + _chunk_statistics.page_cache_write_counter += + _page_reader->page_statistics().page_cache_write_counter; + _chunk_statistics.page_cache_compressed_write_counter += + _page_reader->page_statistics().page_cache_compressed_write_counter; + _chunk_statistics.page_cache_decompressed_write_counter += + _page_reader->page_statistics().page_cache_decompressed_write_counter; + _chunk_statistics.page_cache_hit_counter += + _page_reader->page_statistics().page_cache_hit_counter; + _chunk_statistics.page_cache_missing_counter += + _page_reader->page_statistics().page_cache_missing_counter; + _chunk_statistics.page_cache_compressed_hit_counter += + _page_reader->page_statistics().page_cache_compressed_hit_counter; + _chunk_statistics.page_cache_decompressed_hit_counter += + _page_reader->page_statistics().page_cache_decompressed_hit_counter; return _chunk_statistics; } @@ -193,6 +232,12 @@ class ColumnChunkReader { size_t* result_rows, bool* cross_page); Status load_cross_page_nested_row(std::vector& rep_levels, bool* cross_page); + // Test helpers / accessors + Slice get_page_data() const { return _page_data; } + const Slice& v2_rep_levels() const { return _v2_rep_levels; } + const Slice& v2_def_levels() const { return _v2_def_levels; } + ColumnChunkReaderStatistics& statistics() { return chunk_statistics(); } + private: enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED, PAGE_SKIPPED }; @@ -202,6 +247,7 @@ class ColumnChunkReader { void _reserve_decompress_buf(size_t size); int32_t _get_type_length(); + void _insert_page_into_cache(const std::vector& level_bytes, const Slice& payload); void _get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2, Slice& page_data); Status _skip_nested_rows_in_page(size_t num_rows); @@ -221,6 +267,9 @@ class ColumnChunkReader { std::unique_ptr> _page_reader; BlockCompressionCodec* _block_compress_codec = nullptr; + // Session-level parquet page cache options + ParquetPageReadContext _ctx; + LevelDecoder _rep_level_decoder; LevelDecoder _def_level_decoder; size_t _chunk_parsed_values = 0; diff --git 
a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 0917ca7cd06fb2..a2820b515d859c 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -111,13 +111,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size, std::unordered_map& col_offsets, bool in_collection, const std::set& column_ids, - const std::set& filter_column_ids) { + const std::set& filter_column_ids, RuntimeState* state, + const std::string& created_by) { size_t total_rows = row_group.num_rows; if (field->data_type->get_primitive_type() == TYPE_ARRAY) { std::unique_ptr element_reader; RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx, element_reader, max_buf_size, col_offsets, true, column_ids, - filter_column_ids)); + filter_column_ids, state, created_by)); // element_reader->set_nested_column(); auto array_reader = ArrayColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx); RETURN_IF_ERROR(array_reader->init(std::move(element_reader), field)); @@ -132,7 +133,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, // Create key reader RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx, key_reader, max_buf_size, col_offsets, true, column_ids, - filter_column_ids)); + filter_column_ids, state, created_by)); // key_reader->set_nested_column(); } else { auto skip_reader = std::make_unique(row_ranges, total_rows, ctz, @@ -145,7 +146,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, // Create value reader RETURN_IF_ERROR(create(file, &field->children[1], row_group, row_ranges, ctz, io_ctx, value_reader, max_buf_size, col_offsets, true, column_ids, - filter_column_ids)); + filter_column_ids, state, created_by)); // value_reader->set_nested_column(); } else { auto skip_reader = std::make_unique(row_ranges, total_rows, ctz, @@ -208,14 +209,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, auto scalar_reader = ScalarColumnReader::create_unique( row_ranges, total_rows, chunk, offset_index, ctz, io_ctx); - RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size)); + RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state, created_by)); scalar_reader->_filter_column_ids = filter_column_ids; reader.reset(scalar_reader.release()); } else { auto scalar_reader = ScalarColumnReader::create_unique( row_ranges, total_rows, chunk, offset_index, ctz, io_ctx); - RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size)); + RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state, created_by)); scalar_reader->_filter_column_ids = filter_column_ids; reader.reset(scalar_reader.release()); } @@ -224,14 +225,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, auto scalar_reader = ScalarColumnReader::create_unique( row_ranges, total_rows, chunk, offset_index, ctz, io_ctx); - RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size)); + RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state, created_by)); scalar_reader->_filter_column_ids = filter_column_ids; reader.reset(scalar_reader.release()); } else { auto scalar_reader = ScalarColumnReader::create_unique( row_ranges, total_rows, chunk, offset_index, ctz, io_ctx); - RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size)); + 
RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state, created_by)); scalar_reader->_filter_column_ids = filter_column_ids; reader.reset(scalar_reader.release()); } @@ -249,7 +250,9 @@ void ParquetColumnReader::_generate_read_ranges(RowRange page_row_range, template Status ScalarColumnReader::init(io::FileReaderSPtr file, FieldSchema* field, - size_t max_buf_size) { + size_t max_buf_size, + RuntimeState* state, + const std::string& created_by) { _field_schema = field; auto& chunk_meta = _chunk_meta.meta_data; int64_t chunk_start = has_dict_page(chunk_meta) ? chunk_meta.dictionary_page_offset @@ -265,8 +268,13 @@ Status ScalarColumnReader::init(io::FileReaderSPtr } _stream_reader = std::make_unique(file, chunk_start, chunk_len, prefetch_buffer_size); + // Build Parquet page read context: enable_parquet_file_page_cache from session, others from BE config + ParquetPageReadContext ctx( + (state == nullptr) ? true : state->query_options().enable_parquet_file_page_cache, + created_by); + _chunk_reader = std::make_unique>( - _stream_reader.get(), &_chunk_meta, field, _offset_index, _total_rows, _io_ctx); + _stream_reader.get(), &_chunk_meta, field, _offset_index, _total_rows, _io_ctx, ctx); RETURN_IF_ERROR(_chunk_reader->init()); return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index 4a49473a69fe26..8bcd9712716962 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -65,7 +65,15 @@ class ParquetColumnReader { decode_null_map_time(0), skip_page_header_num(0), parse_page_header_num(0), - read_page_header_time(0) {} + read_page_header_time(0), + page_read_counter(0), + page_cache_write_counter(0), + page_cache_compressed_write_counter(0), + page_cache_decompressed_write_counter(0), + page_cache_hit_counter(0), + page_cache_missing_counter(0), + page_cache_compressed_hit_counter(0), + page_cache_decompressed_hit_counter(0) {} ColumnStatistics(ColumnChunkReaderStatistics& cs, int64_t null_map_time) : page_index_read_calls(0), @@ -78,7 +86,15 @@ class ParquetColumnReader { decode_null_map_time(null_map_time), skip_page_header_num(cs.skip_page_header_num), parse_page_header_num(cs.parse_page_header_num), - read_page_header_time(cs.read_page_header_time) {} + read_page_header_time(cs.read_page_header_time), + page_read_counter(cs.page_read_counter), + page_cache_write_counter(cs.page_cache_write_counter), + page_cache_compressed_write_counter(cs.page_cache_compressed_write_counter), + page_cache_decompressed_write_counter(cs.page_cache_decompressed_write_counter), + page_cache_hit_counter(cs.page_cache_hit_counter), + page_cache_missing_counter(cs.page_cache_missing_counter), + page_cache_compressed_hit_counter(cs.page_cache_compressed_hit_counter), + page_cache_decompressed_hit_counter(cs.page_cache_decompressed_hit_counter) {} int64_t page_index_read_calls; int64_t decompress_time; @@ -91,6 +107,14 @@ class ParquetColumnReader { int64_t skip_page_header_num; int64_t parse_page_header_num; int64_t read_page_header_time; + int64_t page_read_counter; + int64_t page_cache_write_counter; + int64_t page_cache_compressed_write_counter; + int64_t page_cache_decompressed_write_counter; + int64_t page_cache_hit_counter; + int64_t page_cache_missing_counter; + int64_t page_cache_compressed_hit_counter; + int64_t page_cache_decompressed_hit_counter; void merge(ColumnStatistics& col_statistics) { page_index_read_calls += 
col_statistics.page_index_read_calls; @@ -104,6 +128,17 @@ class ParquetColumnReader { skip_page_header_num += col_statistics.skip_page_header_num; parse_page_header_num += col_statistics.parse_page_header_num; read_page_header_time += col_statistics.read_page_header_time; + page_read_counter += col_statistics.page_read_counter; + page_cache_write_counter += col_statistics.page_cache_write_counter; + page_cache_compressed_write_counter += + col_statistics.page_cache_compressed_write_counter; + page_cache_decompressed_write_counter += + col_statistics.page_cache_decompressed_write_counter; + page_cache_hit_counter += col_statistics.page_cache_hit_counter; + page_cache_missing_counter += col_statistics.page_cache_missing_counter; + page_cache_compressed_hit_counter += col_statistics.page_cache_compressed_hit_counter; + page_cache_decompressed_hit_counter += + col_statistics.page_cache_decompressed_hit_counter; } }; @@ -132,7 +167,8 @@ class ParquetColumnReader { std::unique_ptr& reader, size_t max_buf_size, std::unordered_map& col_offsets, bool in_collection = false, const std::set& column_ids = {}, - const std::set& filter_column_ids = {}); + const std::set& filter_column_ids = {}, + RuntimeState* state = nullptr, const std::string& created_by = ""); virtual const std::vector& get_rep_level() const = 0; virtual const std::vector& get_def_level() const = 0; virtual ColumnStatistics column_statistics() = 0; @@ -169,7 +205,8 @@ class ScalarColumnReader : public ParquetColumnReader { _chunk_meta(chunk_meta), _offset_index(offset_index) {} ~ScalarColumnReader() override { close(); } - Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size); + Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size, + RuntimeState* state, const std::string& created_by = ""); Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, const std::shared_ptr& root_node, FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 9952cf570a9f85..91f092b0848c8a 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -84,7 +84,8 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader, const PositionDeleteContext& position_delete_ctx, const LazyReadContext& lazy_read_ctx, RuntimeState* state, const std::set& column_ids, - const std::set& filter_column_ids) + const std::set& filter_column_ids, + const std::string& created_by) : _file_reader(file_reader), _read_table_columns(read_columns), _row_group_id(row_group_id), @@ -97,7 +98,8 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader, _state(state), _obj_pool(new ObjectPool()), _column_ids(column_ids), - _filter_column_ids(filter_column_ids) {} + _filter_column_ids(filter_column_ids), + _created_by(created_by) {} RowGroupReader::~RowGroupReader() { _column_readers.clear(); @@ -130,9 +132,10 @@ Status RowGroupReader::init( auto read_file_col = _table_info_node_ptr->children_file_column_name(read_table_col); auto* field = schema.get_column(read_file_col); std::unique_ptr reader; - RETURN_IF_ERROR(ParquetColumnReader::create( - _file_reader, field, _row_group_meta, _read_ranges, _ctz, _io_ctx, reader, - max_buf_size, col_offsets, false, _column_ids, _filter_column_ids)); + RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, _row_group_meta, + _read_ranges, _ctz, 
_io_ctx, reader, + max_buf_size, col_offsets, false, _column_ids, + _filter_column_ids, _state, _created_by)); if (reader == nullptr) { VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader failed"; return Status::Corruption("Init row group reader failed"); diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index 0ea390c9bff86b..4afea3b6f84afa 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -166,7 +166,7 @@ class RowGroupReader : public ProfileCollector { const PositionDeleteContext& position_delete_ctx, const LazyReadContext& lazy_read_ctx, RuntimeState* state, const std::set& column_ids, - const std::set& filter_column_ids); + const std::set& filter_column_ids, const std::string& created_by = ""); ~RowGroupReader(); Status init(const FieldDescriptor& schema, RowRanges& row_ranges, @@ -274,6 +274,7 @@ class RowGroupReader : public ProfileCollector { std::shared_ptr _obj_pool; const std::set& _column_ids; const std::set& _filter_column_ids; + std::string _created_by; // Parquet file's created_by field bool _is_row_group_filtered = false; RowGroupIndex _current_row_group_idx {0, 0, 0}; diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp index 3b6d7fdcb9bbb2..38478ab2863f52 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp @@ -26,6 +26,8 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "io/fs/buffered_reader.h" +#include "olap/page_cache.h" +#include "parquet_common.h" #include "util/runtime_profile.h" #include "util/slice.h" #include "util/thrift_util.h" @@ -40,18 +42,57 @@ namespace doris::vectorized { #include "common/compile_check_begin.h" static constexpr size_t INIT_PAGE_HEADER_SIZE = 128; +// // Check if the file was created by a version that always marks pages as compressed +// // regardless of the actual compression state (parquet-cpp < 2.0.0) +// static bool _is_always_compressed(const ParquetPageReadContext& ctx) { +// if (ctx.created_by.empty()) { +// return false; +// } + +// // Parse the version string +// std::unique_ptr parsed_version; +// Status status = VersionParser::parse(ctx.created_by, &parsed_version); +// if (!status.ok()) { +// return false; +// } + +// // Check if it's parquet-cpp +// if (parsed_version->application() != "parquet-cpp") { +// return false; +// } + +// // Check if version < 2.0.0 +// if (!parsed_version->version().has_value()) { +// return false; +// } + +// std::unique_ptr semantic_version; +// status = SemanticVersion::parse(parsed_version->version().value(), &semantic_version); +// if (!status.ok()) { +// return false; +// } + +// // parquet-cpp versions < 2.0.0 always report compressed +// static const SemanticVersion PARQUET_CPP_FIXED_VERSION(2, 0, 0); +// return semantic_version->compare_to(PARQUET_CPP_FIXED_VERSION) < 0; +// } + template PageReader::PageReader(io::BufferedStreamReader* reader, io::IOContext* io_ctx, uint64_t offset, uint64_t length, size_t total_rows, - const tparquet::OffsetIndex* offset_index) + const tparquet::ColumnMetaData& metadata, + const tparquet::OffsetIndex* offset_index, + const ParquetPageReadContext& ctx) : _reader(reader), _io_ctx(io_ctx), _offset(offset), _start_offset(offset), _end_offset(offset + length), _total_rows(total_rows), - 
_offset_index(offset_index) { + _metadata(metadata), + _offset_index(offset_index), + _ctx(ctx) { _next_header_offset = _offset; _state = INITIALIZED; @@ -77,11 +118,104 @@ Status PageReader::parse_page_header() { return Status::IOError("Should skip or load current page to get next page"); } + _page_statistics.page_read_counter += 1; + + // Parse page header from file; header bytes are saved for possible cache insertion const uint8_t* page_header_buf = nullptr; size_t max_size = _end_offset - _offset; size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size); const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20; uint32_t real_header_size = 0; + + // Try a header-only lookup in the page cache. Cached pages store + // header + optional v2 levels + uncompressed payload, so we can + // parse the page header directly from the cached bytes and avoid + // a file read for the header. + if (_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache && + StoragePageCache::instance() != nullptr) { + PageCacheHandle handle; + StoragePageCache::CacheKey key(fmt::format("{}::{}", _reader->path(), _reader->mtime()), + _end_offset, _offset); + if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) { + // Parse header directly from cached data + _page_cache_handle = std::move(handle); + Slice s = _page_cache_handle.data(); + real_header_size = cast_set(s.size); + SCOPED_RAW_TIMER(&_page_statistics.decode_header_time); + auto st = deserialize_thrift_msg(reinterpret_cast(s.data), + &real_header_size, true, &_cur_page_header); + if (!st.ok()) return st; + // Increment page cache counters for a true cache hit on header+payload + _page_statistics.page_cache_hit_counter += 1; + // Detect whether the cached payload is compressed or decompressed and record + // the appropriate counter. Cached layout is: header | optional levels | payload + + // Determine if payload is compressed using V2 is_compressed field if available + // bool payload_is_compressed = true; + // if (_cur_page_header.type == tparquet::PageType::DATA_PAGE_V2) { + // const auto& page_header_v2 = _cur_page_header.data_page_header_v2; + // if (page_header_v2.__isset.is_compressed) { + // payload_is_compressed = page_header_v2.is_compressed; + // } + // } + + // // ARROW-17100: [C++][Parquet] Fix backwards compatibility for ParquetV2 data pages written prior to 3.0.0 per ARROW-10353 #13665 + // // https://github.com/apache/arrow/pull/13665/files + // // Prior to parquet-cpp version 2.0.0, is_compressed was always set to false in column headers, + // // even if compression was used. See ARROW-17100. 
+ // bool always_compressed = _is_always_compressed(_ctx); + // payload_is_compressed |= always_compressed; + + // // Apply codec check: if codec is UNCOMPRESSED, payload cannot be compressed + // payload_is_compressed = payload_is_compressed && + // (_metadata.codec != tparquet::CompressionCodec::UNCOMPRESSED); + + // // Save the computed result for use by ColumnChunkReader + // _payload_is_compressed = payload_is_compressed; + + bool is_cache_payload_decompressed = true; + if (_cur_page_header.compressed_page_size > 0) { + is_cache_payload_decompressed = + (_metadata.codec == tparquet::CompressionCodec::UNCOMPRESSED) || + (static_cast(_cur_page_header.uncompressed_page_size) <= + static_cast(config::parquet_page_cache_decompress_threshold) * + static_cast(_cur_page_header.compressed_page_size)); + } + + if (is_cache_payload_decompressed) { + _page_statistics.page_cache_decompressed_hit_counter += 1; + } else { + _page_statistics.page_cache_compressed_hit_counter += 1; + } + + _is_cache_payload_decompressed = is_cache_payload_decompressed; + + if constexpr (OFFSET_INDEX == false) { + if (is_header_v2()) { + _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows; + } else if constexpr (!IN_COLLECTION) { + _end_row = _start_row + _cur_page_header.data_page_header.num_values; + } + } + + // Save header bytes for later use (e.g., to insert updated cache entries) + _header_buf.assign(s.data, s.data + real_header_size); + _last_header_size = real_header_size; + _page_statistics.parse_page_header_num++; + _offset += real_header_size; + _next_header_offset = _offset + _cur_page_header.compressed_page_size; + _state = HEADER_PARSED; + return Status::OK(); + } else { + _page_statistics.page_cache_missing_counter += 1; + // Clear any existing cache handle on miss to avoid holding stale handle + _page_cache_handle = PageCacheHandle(); + } + } + // NOTE: page cache lookup for *decompressed* page data is handled in + // ColumnChunkReader::load_page_data(). PageReader should only be + // responsible for parsing the header bytes from the file and saving + // them in `_header_buf` for possible later insertion into the cache. while (true) { if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) { return Status::EndOfFile("stop"); @@ -115,6 +249,9 @@ Status PageReader::parse_page_header() { } } + // Save header bytes for possible cache insertion later + _header_buf.assign(page_header_buf, page_header_buf + real_header_size); + _last_header_size = real_header_size; _page_statistics.parse_page_header_num++; _offset += real_header_size; _next_header_offset = _offset + _cur_page_header.compressed_page_size; diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h b/be/src/vec/exec/format/parquet/vparquet_page_reader.h index 9246819d59c399..a722507c7423a3 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h @@ -23,7 +23,9 @@ #include #include "common/cast_set.h" +#include "common/config.h" #include "common/status.h" +#include "olap/page_cache.h" #include "util/block_compression.h" #include "vec/exec/format/parquet/parquet_common.h" namespace doris { @@ -50,6 +52,15 @@ namespace doris::vectorized { * Use to deserialize parquet page header, and get the page data in iterator interface. */ +// Session-level options for parquet page reading/caching. 
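+// Construction sketch, mirroring ScalarColumnReader::init(): the enable flag comes from the
+// session variable enable_parquet_file_page_cache (defaulting to true when no RuntimeState is
+// available) and created_by from the Parquet file metadata, e.g.
+//   ParquetPageReadContext ctx(
+//           state == nullptr ? true : state->query_options().enable_parquet_file_page_cache,
+//           created_by);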
+struct ParquetPageReadContext { + bool enable_parquet_file_page_cache = true; + std::string created_by; // Parquet file's created_by field for version checking + ParquetPageReadContext() = default; + ParquetPageReadContext(bool enable, const std::string& created_by_str = "") + : enable_parquet_file_page_cache(enable), created_by(created_by_str) {} +}; + template class PageReader { public: @@ -58,11 +69,29 @@ class PageReader { int64_t skip_page_header_num = 0; int64_t parse_page_header_num = 0; int64_t read_page_header_time = 0; + // page cache metrics + // page_cache_hit_counter: number of times a cached page was used + int64_t page_cache_hit_counter = 0; + // page_cache_missing_counter: number of times a cached page was not found + int64_t page_cache_missing_counter = 0; + // page_cache_compressed_hit_counter: number of cache hits where the cached payload is compressed + int64_t page_cache_compressed_hit_counter = 0; + // page_cache_decompressed_hit_counter: number of cache hits where the cached payload is decompressed + int64_t page_cache_decompressed_hit_counter = 0; + // page_cache_write_counter: number of times this reader wrote an entry into the cache + int64_t page_cache_write_counter = 0; + // page_cache_compressed_write_counter: number of times this reader wrote compressed page into the cache + int64_t page_cache_compressed_write_counter = 0; + // page_cache_decompressed_write_counter: number of times this reader wrote decompressed page into the cache + int64_t page_cache_decompressed_write_counter = 0; + // page_read_counter: total pages read by this PageReader (includes cache hits and file reads) + int64_t page_read_counter = 0; }; PageReader(io::BufferedStreamReader* reader, io::IOContext* io_ctx, uint64_t offset, - uint64_t length, size_t total_rows, - const tparquet::OffsetIndex* offset_index = nullptr); + uint64_t length, size_t total_rows, const tparquet::ColumnMetaData& metadata, + const tparquet::OffsetIndex* offset_index = nullptr, + const ParquetPageReadContext& ctx = ParquetPageReadContext()); ~PageReader() = default; bool has_next_page() const { @@ -123,24 +152,61 @@ class PageReader { } } - Status get_page_header(const tparquet::PageHeader*& page_header) { + Status get_page_header(const tparquet::PageHeader** page_header) { if (UNLIKELY(_state != HEADER_PARSED)) { return Status::InternalError("Page header not parsed"); } - page_header = &_cur_page_header; + *page_header = &_cur_page_header; return Status::OK(); } Status get_page_data(Slice& slice); + // Skip page data and update offset (used when data is loaded from cache) + void skip_page_data() { + if (_state == HEADER_PARSED) { + _offset += _cur_page_header.compressed_page_size; + _state = DATA_LOADED; + } + } + + // Expose header bytes info for cache insertion + uint32_t last_header_size() const { return _last_header_size; } + const std::vector& header_bytes() const { return _header_buf; } + // header start offset for current page + int64_t header_start_offset() const { + return static_cast(_next_header_offset) - static_cast(_last_header_size) - + static_cast(_cur_page_header.compressed_page_size); + } + uint64_t file_end_offset() const { return _end_offset; } + bool cached_decompressed() const { + return static_cast(_cur_page_header.uncompressed_page_size) <= + static_cast(config::parquet_page_cache_decompress_threshold) * + static_cast(_cur_page_header.compressed_page_size); + } + PageStatistics& page_statistics() { return _page_statistics; } bool is_header_v2() { return 
_cur_page_header.__isset.data_page_header_v2; } + // Returns whether the current page's cache payload is decompressed + bool is_cache_payload_decompressed() const { return _is_cache_payload_decompressed; } + size_t start_row() const { return _start_row; } size_t end_row() const { return _end_row; } + // Accessors for cache handle + bool has_page_cache_handle() const { return _page_cache_handle.cache() != nullptr; } + const doris::PageCacheHandle& page_cache_handle() const { return _page_cache_handle; } + + // Page cache members + doris::PageCacheHandle _page_cache_handle; + // stored header bytes when cache miss so we can insert header+payload into cache + std::vector _header_buf; + // last parsed header size in bytes + uint32_t _last_header_size = 0; + private: enum PageReaderState { INITIALIZED, HEADER_PARSED, DATA_LOADED }; PageReaderState _state = INITIALIZED; @@ -159,19 +225,27 @@ class PageReader { size_t _end_row = 0; // total rows in this column chunk size_t _total_rows = 0; + // Column metadata for this column chunk + const tparquet::ColumnMetaData& _metadata; // for page index size_t _page_index = 0; const tparquet::OffsetIndex* _offset_index; + // Session-level parquet page cache options + ParquetPageReadContext _ctx; + tparquet::PageHeader _cur_page_header; + bool _is_cache_payload_decompressed = true; }; template std::unique_ptr> create_page_reader( io::BufferedStreamReader* reader, io::IOContext* io_ctx, uint64_t offset, uint64_t length, - size_t total_rows, const tparquet::OffsetIndex* offset_index = nullptr) { - return std::make_unique>(reader, io_ctx, offset, length, - total_rows, offset_index); + size_t total_rows, const tparquet::ColumnMetaData& metadata, + const tparquet::OffsetIndex* offset_index = nullptr, + const ParquetPageReadContext& ctx = ParquetPageReadContext()) { + return std::make_unique>( + reader, io_ctx, offset, length, total_rows, metadata, offset_index, ctx); } #include "common/compile_check_end.h" diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 21a49673f0837f..da5797324a845c 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -194,6 +194,22 @@ void ParquetReader::_init_profile() { ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecompressTime", parquet_profile, 1); _parquet_profile.decompress_cnt = ADD_CHILD_COUNTER_WITH_LEVEL( _profile, "DecompressCount", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.page_read_counter = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "PageReadCount", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.page_cache_write_counter = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "PageCacheWriteCount", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.page_cache_compressed_write_counter = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "PageCacheCompressedWriteCount", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.page_cache_decompressed_write_counter = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "PageCacheDecompressedWriteCount", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.page_cache_hit_counter = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "PageCacheHitCount", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.page_cache_missing_counter = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "PageCacheMissingCount", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.page_cache_compressed_hit_counter = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "PageCacheCompressedHitCount", TUnit::UNIT, 
parquet_profile, 1); + _parquet_profile.page_cache_decompressed_hit_counter = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "PageCacheDecompressedHitCount", TUnit::UNIT, parquet_profile, 1); _parquet_profile.decode_header_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageHeaderDecodeTime", parquet_profile, 1); _parquet_profile.read_page_header_time = @@ -796,12 +812,13 @@ Status ParquetReader::_next_row_group_reader() { _profile, _file_reader, io_ranges, merged_read_slice_size) : _file_reader; } - _current_group_reader.reset(new RowGroupReader( - _io_ctx ? std::make_shared(group_file_reader, - _io_ctx->file_reader_stats) - : group_file_reader, - _read_table_columns, _current_row_group_index.row_group_id, row_group, _ctz, _io_ctx, - position_delete_ctx, _lazy_read_ctx, _state, _column_ids, _filter_column_ids)); + _current_group_reader.reset( + new RowGroupReader(_io_ctx ? std::make_shared( + group_file_reader, _io_ctx->file_reader_stats) + : group_file_reader, + _read_table_columns, _current_row_group_index.row_group_id, + row_group, _ctz, _io_ctx, position_delete_ctx, _lazy_read_ctx, + _state, _column_ids, _filter_column_ids, _t_metadata->created_by)); _row_group_eof = false; _current_group_reader->set_current_row_group_idx(_current_row_group_index); @@ -1320,6 +1337,21 @@ void ParquetReader::_collect_profile() { _column_statistics.page_index_read_calls); COUNTER_UPDATE(_parquet_profile.decompress_time, _column_statistics.decompress_time); COUNTER_UPDATE(_parquet_profile.decompress_cnt, _column_statistics.decompress_cnt); + COUNTER_UPDATE(_parquet_profile.page_read_counter, _column_statistics.page_read_counter); + COUNTER_UPDATE(_parquet_profile.page_cache_write_counter, + _column_statistics.page_cache_write_counter); + COUNTER_UPDATE(_parquet_profile.page_cache_compressed_write_counter, + _column_statistics.page_cache_compressed_write_counter); + COUNTER_UPDATE(_parquet_profile.page_cache_decompressed_write_counter, + _column_statistics.page_cache_decompressed_write_counter); + COUNTER_UPDATE(_parquet_profile.page_cache_hit_counter, + _column_statistics.page_cache_hit_counter); + COUNTER_UPDATE(_parquet_profile.page_cache_missing_counter, + _column_statistics.page_cache_missing_counter); + COUNTER_UPDATE(_parquet_profile.page_cache_compressed_hit_counter, + _column_statistics.page_cache_compressed_hit_counter); + COUNTER_UPDATE(_parquet_profile.page_cache_decompressed_hit_counter, + _column_statistics.page_cache_decompressed_hit_counter); COUNTER_UPDATE(_parquet_profile.decode_header_time, _column_statistics.decode_header_time); COUNTER_UPDATE(_parquet_profile.read_page_header_time, _column_statistics.read_page_header_time); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index d6f16d595b8931..7c400f590267d3 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -200,6 +200,14 @@ class ParquetReader : public GenericReader { RuntimeProfile::Counter* file_footer_hit_cache = nullptr; RuntimeProfile::Counter* decompress_time = nullptr; RuntimeProfile::Counter* decompress_cnt = nullptr; + RuntimeProfile::Counter* page_read_counter = nullptr; + RuntimeProfile::Counter* page_cache_write_counter = nullptr; + RuntimeProfile::Counter* page_cache_compressed_write_counter = nullptr; + RuntimeProfile::Counter* page_cache_decompressed_write_counter = nullptr; + RuntimeProfile::Counter* page_cache_hit_counter = nullptr; + RuntimeProfile::Counter* page_cache_missing_counter = 
nullptr; + RuntimeProfile::Counter* page_cache_compressed_hit_counter = nullptr; + RuntimeProfile::Counter* page_cache_decompressed_hit_counter = nullptr; RuntimeProfile::Counter* decode_header_time = nullptr; RuntimeProfile::Counter* read_page_header_time = nullptr; RuntimeProfile::Counter* decode_value_time = nullptr; diff --git a/be/test/io/fs/buffered_reader_test.cpp b/be/test/io/fs/buffered_reader_test.cpp index 3874b06c68c592..bc92d22b178d05 100644 --- a/be/test/io/fs/buffered_reader_test.cpp +++ b/be/test/io/fs/buffered_reader_test.cpp @@ -68,6 +68,8 @@ class SyncLocalFileReader : public io::FileReader { bool closed() const override { return _reader->closed(); } + int64_t mtime() const override { return _reader->mtime(); } + private: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const io::IOContext* io_ctx) override { @@ -96,6 +98,8 @@ class MockOffsetFileReader : public io::FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return 0; } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const io::IOContext* io_ctx) override { @@ -130,6 +134,8 @@ class TestingRangeCacheFileReader : public io::FileReader { bool closed() const override { return _delegate->closed(); } + int64_t mtime() const override { return _delegate->mtime(); } + const io::PrefetchRange& last_read_range() const { return *_last_read_range; } protected: diff --git a/be/test/io/fs/packed_file_concurrency_test.cpp b/be/test/io/fs/packed_file_concurrency_test.cpp index 06db472b95e24d..371b1dfbb1f253 100644 --- a/be/test/io/fs/packed_file_concurrency_test.cpp +++ b/be/test/io/fs/packed_file_concurrency_test.cpp @@ -411,6 +411,8 @@ class MockRemoteReader : public FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return 0; } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* /*io_ctx*/) override { diff --git a/be/test/io/fs/packed_file_reader_test.cpp b/be/test/io/fs/packed_file_reader_test.cpp index feaf5c37f3d104..2b0472c38a5795 100644 --- a/be/test/io/fs/packed_file_reader_test.cpp +++ b/be/test/io/fs/packed_file_reader_test.cpp @@ -69,6 +69,8 @@ class MockFileReader : public FileReader { return Status::OK(); } + int64_t mtime() const override { return 0; } + private: Path _path = Path("mock_file"); std::string _content; diff --git a/be/test/io/fs/packed_file_system_test.cpp b/be/test/io/fs/packed_file_system_test.cpp index 7f8a4af63df0b7..096bb8c4a93516 100644 --- a/be/test/io/fs/packed_file_system_test.cpp +++ b/be/test/io/fs/packed_file_system_test.cpp @@ -53,6 +53,8 @@ class MockFileReader : public FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return 0; } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* /*io_ctx*/) override { diff --git a/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp b/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp index 3aef8db8459f39..5ea6335d2b9d18 100644 --- a/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp +++ b/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp @@ -38,6 +38,8 @@ class MockFileReader : public io::FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return 0; } + Status close() override { _closed = true; return Status::OK(); diff --git a/be/test/vec/exec/format/parquet/parquet_page_cache_test.cpp 
b/be/test/vec/exec/format/parquet/parquet_page_cache_test.cpp new file mode 100644 index 00000000000000..21f2fa4e82b0c8 --- /dev/null +++ b/be/test/vec/exec/format/parquet/parquet_page_cache_test.cpp @@ -0,0 +1,859 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "common/config.h" +#include "io/fs/buffered_reader.h" +#include "olap/page_cache.h" +#include "runtime/exec_env.h" +#include "runtime/memory/cache_manager.h" +#include "util/block_compression.h" +#include "util/faststring.h" +#include "util/thrift_util.h" +#include "vec/exec/format/parquet/schema_desc.h" +#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h" +#include "vec/exec/format/parquet/vparquet_page_reader.h" + +using namespace doris; +using namespace doris::vectorized; + +class FakeBufferedReader : public io::BufferedStreamReader { +public: + FakeBufferedReader(std::string path, std::vector data) + : _path(std::move(path)), _data(std::move(data)) {} + Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read, + const doris::io::IOContext* io_ctx) override { + if (offset + bytes_to_read > _data.size()) return Status::IOError("Out of bounds"); + *buf = _data.data() + offset; + return Status::OK(); + } + Status read_bytes(Slice& slice, uint64_t offset, const doris::io::IOContext* io_ctx) override { + if (offset + slice.size > _data.size()) return Status::IOError("Out of bounds"); + slice.data = reinterpret_cast(_data.data() + offset); + return Status::OK(); + } + std::string path() override { return _path; } + + int64_t mtime() const override { return 0; } + +private: + std::string _path; + std::vector _data; +}; + +TEST(ParquetPageCacheTest, CacheHitReturnsDecompressedPayload) { + // setup storage page cache + // ExecEnv::GetInstance()->set_storage_page_cache(StoragePageCache::create_global_cache(1 << 20, 10, 0)); + + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + + // construct thrift PageHeader (uncompressed payload) and payload + tparquet::PageHeader header; + header.type = tparquet::PageType::DATA_PAGE; + header.__set_compressed_page_size(4); + header.__set_uncompressed_page_size(4); + header.__isset.data_page_header = true; + header.data_page_header.__set_num_values(1); + + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok()); + + std::vector payload = {0x11, 0x22, 0x33, 0x44}; + std::vector cached_data; + cached_data.insert(cached_data.end(), header_bytes.begin(), header_bytes.end()); + cached_data.insert(cached_data.end(), payload.begin(), payload.end()); + + std::string path = "test_parquet_cache_file"; + int64_t header_offset = 128; + // make file_end_offset consistent with 
reader/page reader end offset used in test + int64_t file_end_offset = header_offset + static_cast(cached_data.size()); + + // insert into cache + int64_t mtime = 0; + StoragePageCache::CacheKey key(fmt::format("{}::{}", path, mtime), + static_cast(file_end_offset), header_offset); + size_t total = cached_data.size(); + auto* page = new DataPage(total, true, segment_v2::DATA_PAGE); + memcpy(page->data(), cached_data.data(), total); + page->reset_size(total); + PageCacheHandle handle; + StoragePageCache::instance()->insert(key, page, &handle, segment_v2::DATA_PAGE); + + // create fake reader and a ColumnChunkReader to verify cache hit + // ensure the reader contains the same header+payload at the header offset so header parsing succeeds + std::vector backing(256, 0); + memcpy(backing.data() + header_offset, cached_data.data(), total); + FakeBufferedReader reader(path, backing); + // prepare column chunk metadata so ColumnChunkReader uses same offsets + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(header_offset); + cc.meta_data.__set_total_compressed_size(total); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED); + + FieldSchema field_schema; + field_schema.repetition_level = 0; + field_schema.definition_level = 0; + + ColumnChunkReader ccr(&reader, &cc, &field_schema, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr.init().ok()); + // ASSERT_TRUE(ccr.next_page().ok()); + // load_page_data should hit the cache and return decompressed payload + ASSERT_TRUE(ccr.load_page_data().ok()); + Slice s = ccr.get_page_data(); + ASSERT_EQ(s.size, payload.size()); + ASSERT_EQ(0, memcmp(s.data, payload.data(), payload.size())); + // stats: ensure there was a page read and at least one hit recorded + auto& statistics = ccr.statistics(); + EXPECT_EQ(statistics.page_read_counter, 1); + EXPECT_EQ(statistics.page_cache_hit_counter, 1); + EXPECT_EQ(statistics.page_cache_decompressed_hit_counter, 1); + // now safe to cleanup cache + // { + // StoragePageCache* _cache_ptr = StoragePageCache::instance(); + // // Ensure any outstanding PageCacheHandle is released before destroying cache + // handle = PageCacheHandle(); + // ExecEnv::GetInstance()->set_storage_page_cache(nullptr); + // delete _cache_ptr; + // // Clear CacheManager registration so tests do not leave global state + // // behind when unregister_cache is a no-op under BE_TEST. 
+ // CacheManager::instance()->clear_for_test(); + // } +} + +TEST(ParquetPageCacheTest, DecompressedPageInsertedByColumnChunkReader) { + // ExecEnv::GetInstance()->set_storage_page_cache(StoragePageCache::create_global_cache(1 << 20, 10, 0)); + + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + // ensure decompressed pages are cached via BE config + double old_thresh = config::parquet_page_cache_decompress_threshold; + bool old_enable_compressed = config::enable_parquet_cache_compressed_pages; + config::parquet_page_cache_decompress_threshold = 100.0; + config::enable_parquet_cache_compressed_pages = false; + + // construct uncompressed header + payload in file buffer + tparquet::PageHeader header; + header.type = tparquet::PageType::DATA_PAGE; + header.__set_compressed_page_size(4); + header.__set_uncompressed_page_size(4); + header.__isset.data_page_header = true; + header.data_page_header.__set_num_values(1); + + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok()); + + std::vector payload = {0x55, 0x66, 0x77, 0x88}; + std::vector file_data; + file_data.insert(file_data.end(), header_bytes.begin(), header_bytes.end()); + file_data.insert(file_data.end(), payload.begin(), payload.end()); + + std::string path = "test_parquet_insert_file"; + int64_t header_offset = 0; + + FakeBufferedReader reader(path, file_data); + + // prepare column chunk metadata + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(header_offset); + cc.meta_data.__set_total_compressed_size(file_data.size()); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED); + + { + FieldSchema field_schema; + field_schema.repetition_level = 0; + field_schema.definition_level = 0; + ColumnChunkReader ccr(&reader, &cc, &field_schema, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr.init().ok()); + // ASSERT_TRUE(ccr.next_page().ok()); + ASSERT_TRUE(ccr.load_page_data().ok()); + + // Now cache should have an entry; verify by creating a fresh ColumnChunkReader and hitting cache + ColumnChunkReader ccr_check(&reader, &cc, &field_schema, nullptr, 0, nullptr, + ctx); + ASSERT_TRUE(ccr_check.init().ok()); + // ASSERT_TRUE(ccr_check.next_page().ok()); + ASSERT_TRUE(ccr_check.load_page_data().ok()); + Slice s = ccr_check.get_page_data(); + ASSERT_EQ(s.size, payload.size()); + EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size())); + EXPECT_EQ(ccr_check.statistics().page_cache_hit_counter, 1); + } + // cleanup cache after readers go out of scope + // { + // StoragePageCache* _cache_ptr = StoragePageCache::instance(); + // ExecEnv::GetInstance()->set_storage_page_cache(nullptr); + // delete _cache_ptr; + // CacheManager::instance()->clear_for_test(); + // } + // restore config + config::parquet_page_cache_decompress_threshold = old_thresh; + config::enable_parquet_cache_compressed_pages = old_enable_compressed; +} + +TEST(ParquetPageCacheTest, V2LevelsPreservedInCache) { + // ExecEnv::GetInstance()->set_storage_page_cache(StoragePageCache::create_global_cache(1 << 20, 10, 0)); + + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + // ensure decompressed pages are cached via BE config + double old_thresh = config::parquet_page_cache_decompress_threshold; + bool old_enable_compressed = config::enable_parquet_cache_compressed_pages; + config::parquet_page_cache_decompress_threshold = 100.0; + config::enable_parquet_cache_compressed_pages = 
false; + + // construct v2 header + levels + payload in file buffer (uncompressed) + tparquet::PageHeader header; + header.type = tparquet::PageType::DATA_PAGE_V2; + int rl = 2; + int dl = 1; + int payload_sz = 2; + header.__set_compressed_page_size(rl + dl + payload_sz); + header.__set_uncompressed_page_size(rl + dl + payload_sz); + header.__isset.data_page_header_v2 = true; + header.data_page_header_v2.__set_repetition_levels_byte_length(rl); + header.data_page_header_v2.__set_definition_levels_byte_length(dl); + header.data_page_header_v2.__set_is_compressed(false); + header.data_page_header_v2.__set_num_values(1); + + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok()); + + std::vector level_bytes = {0x11, 0x22, 0x33}; + std::vector payload = {0xAA, 0xBB}; + std::vector file_data; + file_data.insert(file_data.end(), header_bytes.begin(), header_bytes.end()); + file_data.insert(file_data.end(), level_bytes.begin(), level_bytes.end()); + file_data.insert(file_data.end(), payload.begin(), payload.end()); + + std::string path = "test_v2_levels_file"; + FakeBufferedReader reader(path, file_data); + + // prepare column chunk metadata + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(0); + cc.meta_data.__set_total_compressed_size(file_data.size()); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED); + + FieldSchema field_schema; + field_schema.repetition_level = 0; + field_schema.definition_level = 0; + { + ColumnChunkReader ccr(&reader, &cc, &field_schema, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr.init().ok()); + // ASSERT_TRUE(ccr.next_page().ok()); + ASSERT_TRUE(ccr.load_page_data().ok()); + + // Now cache should have entry; verify by creating a ColumnChunkReader and hitting cache + ColumnChunkReader ccr_check(&reader, &cc, &field_schema, nullptr, 0, nullptr, + ctx); + ASSERT_TRUE(ccr_check.init().ok()); + // ASSERT_TRUE(ccr_check.next_page().ok()); + ASSERT_TRUE(ccr_check.load_page_data().ok()); + Slice s = ccr_check.get_page_data(); + ASSERT_EQ(s.size, payload.size()); + EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size())); + } + + // Verify that a fresh ColumnChunkReader reusing cache gets level bytes preserved + FieldSchema field_schema2; + field_schema2.repetition_level = 2; // v2 levels present + field_schema2.definition_level = 1; + ColumnChunkReader ccr2(&reader, &cc, &field_schema2, nullptr, 0, nullptr); + ASSERT_TRUE(ccr2.init().ok()); + // ASSERT_TRUE(ccr2.next_page().ok()); + ASSERT_TRUE(ccr2.load_page_data().ok()); + // Level slices should equal the original level bytes + const Slice& rep = ccr2.v2_rep_levels(); + const Slice& def = ccr2.v2_def_levels(); + auto& statistics = ccr2.statistics(); + EXPECT_GT(statistics.page_cache_hit_counter, 0); + // because threshold is set to cache decompressed, we should see decompressed hits + EXPECT_GT(statistics.page_cache_decompressed_hit_counter, 0); + ASSERT_EQ(def.size, dl); + EXPECT_EQ(0, memcmp(rep.data, level_bytes.data(), rl)); + EXPECT_EQ(0, memcmp(def.data, level_bytes.data() + rl, dl)); + // // cleanup cache after readers have been destroyed + // { + // StoragePageCache* _cache_ptr = StoragePageCache::instance(); + // ExecEnv::GetInstance()->set_storage_page_cache(nullptr); + // delete _cache_ptr; + // CacheManager::instance()->clear_for_test(); + // } + // restore config + config::parquet_page_cache_decompress_threshold = old_thresh; + 
config::enable_parquet_cache_compressed_pages = old_enable_compressed; +} + +TEST(ParquetPageCacheTest, CompressedV1PageCachedAndHit) { + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + // Note: parquet_page_cache_decompress_threshold and enable_parquet_cache_compressed_pages + // are now BE config variables, not context fields + + // construct compressed v1 header + compressed payload in file buffer + tparquet::PageHeader header; + header.type = tparquet::PageType::DATA_PAGE; + header.__isset.data_page_header = true; + header.data_page_header.__set_num_values(1); + + std::vector payload = {0x01, 0x02, 0x03, 0x04}; + + // compress payload using a block codec + BlockCompressionCodec* codec = nullptr; + ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, &codec).ok()); + faststring compressed_fast; + std::vector inputs; + inputs.emplace_back(payload.data(), payload.size()); + ASSERT_TRUE(codec->compress(inputs, payload.size(), &compressed_fast).ok()); + + header.__set_compressed_page_size(static_cast(compressed_fast.size())); + header.__set_uncompressed_page_size(static_cast(payload.size())); + + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok()); + + std::vector file_data; + file_data.insert(file_data.end(), header_bytes.begin(), header_bytes.end()); + file_data.insert(file_data.end(), compressed_fast.data(), + compressed_fast.data() + compressed_fast.size()); + + std::string path = "test_compressed_v1_file"; + FakeBufferedReader reader(path, file_data); + + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(0); + cc.meta_data.__set_total_compressed_size(file_data.size()); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY); + + FieldSchema field_schema; + field_schema.repetition_level = 0; + field_schema.definition_level = 0; + + // Load page to trigger decompression + cache insert + ColumnChunkReader ccr(&reader, &cc, &field_schema, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr.init().ok()); + // ASSERT_TRUE(ccr.next_page().ok()); + ASSERT_TRUE(ccr.load_page_data().ok()); + EXPECT_EQ(ccr.statistics().page_cache_write_counter, 1); + + // Now verify a fresh reader hits the cache and returns payload + ColumnChunkReader ccr_check(&reader, &cc, &field_schema, nullptr, 0, nullptr, + ctx); + ASSERT_TRUE(ccr_check.init().ok()); + // ASSERT_TRUE(ccr_check.next_page().ok()); + ASSERT_TRUE(ccr_check.load_page_data().ok()); + Slice s = ccr_check.get_page_data(); + ASSERT_EQ(s.size, payload.size()); + EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size())); + EXPECT_EQ(ccr_check.statistics().page_cache_hit_counter, 1); +} + +TEST(ParquetPageCacheTest, CompressedV2LevelsPreservedInCache) { + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + // Note: parquet_page_cache_decompress_threshold and enable_parquet_cache_compressed_pages + // are now BE config variables, not context fields + + // construct v2 header + levels + compressed payload in file buffer + tparquet::PageHeader header; + header.type = tparquet::PageType::DATA_PAGE_V2; + int rl = 2; + int dl = 1; + //int payload_sz = 2; + header.__isset.data_page_header_v2 = true; + header.data_page_header_v2.__set_repetition_levels_byte_length(rl); + header.data_page_header_v2.__set_definition_levels_byte_length(dl); + header.data_page_header_v2.__set_is_compressed(true); + header.data_page_header_v2.__set_num_values(1); + + 
std::vector level_bytes = {0x11, 0x22, 0x33}; + std::vector payload = {0xAA, 0xBB}; + + // compress payload + BlockCompressionCodec* codec = nullptr; + ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, &codec).ok()); + faststring compressed_fast; + std::vector inputs; + inputs.emplace_back(payload.data(), payload.size()); + ASSERT_TRUE(codec->compress(inputs, payload.size(), &compressed_fast).ok()); + + // compressed page: levels (uncompressed) followed by compressed payload + std::vector compressed_page; + compressed_page.insert(compressed_page.end(), level_bytes.begin(), level_bytes.end()); + compressed_page.insert(compressed_page.end(), compressed_fast.data(), + compressed_fast.data() + compressed_fast.size()); + + header.__set_compressed_page_size(static_cast(compressed_page.size())); + header.__set_uncompressed_page_size(static_cast(level_bytes.size() + payload.size())); + + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok()); + + std::vector file_data; + file_data.insert(file_data.end(), header_bytes.begin(), header_bytes.end()); + file_data.insert(file_data.end(), compressed_page.begin(), compressed_page.end()); + + std::string path = "test_compressed_v2_file"; + FakeBufferedReader reader(path, file_data); + + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(0); + cc.meta_data.__set_total_compressed_size(file_data.size()); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY); + + FieldSchema field_schema; + field_schema.repetition_level = 0; + field_schema.definition_level = 0; + + // Load page to trigger decompression + cache insert + ColumnChunkReader ccr(&reader, &cc, &field_schema, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr.init().ok()); + // ASSERT_TRUE(ccr.next_page().ok()); + ASSERT_TRUE(ccr.load_page_data().ok()); + EXPECT_EQ(ccr.statistics().page_cache_write_counter, 1); + + // Now verify a fresh reader hits the cache and v2 levels are preserved + FieldSchema field_schema2; + field_schema2.repetition_level = rl; + field_schema2.definition_level = dl; + ColumnChunkReader ccr_check(&reader, &cc, &field_schema2, nullptr, 0, nullptr, + ctx); + ASSERT_TRUE(ccr_check.init().ok()); + // ASSERT_TRUE(ccr_check.next_page().ok()); + ASSERT_TRUE(ccr_check.load_page_data().ok()); + Slice s = ccr_check.get_page_data(); + ASSERT_EQ(s.size, payload.size()); + EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size())); + const Slice& rep = ccr_check.v2_rep_levels(); + const Slice& def = ccr_check.v2_def_levels(); + ASSERT_EQ(rep.size, rl); + ASSERT_EQ(def.size, dl); + // cached v2 page is stored decompressed (threshold=100), make sure counter reflects it + EXPECT_GT(ccr_check.statistics().page_cache_decompressed_hit_counter, 0); + EXPECT_EQ(0, memcmp(rep.data, level_bytes.data(), rl)); + EXPECT_EQ(0, memcmp(def.data, level_bytes.data() + rl, dl)); +} + +TEST(ParquetPageCacheTest, MultiPagesMixedV1V2CacheHit) { + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + // Note: parquet_page_cache_decompress_threshold and enable_parquet_cache_compressed_pages + // are now BE config variables, not context fields + + // Prepare a v1 uncompressed page and a v2 uncompressed page and insert both into cache + std::string path = "test_multi_pages_file"; + + // v1 page + tparquet::PageHeader hdr1; + hdr1.type = tparquet::PageType::DATA_PAGE; + hdr1.__set_compressed_page_size(4); + 
hdr1.__set_uncompressed_page_size(4); + hdr1.__isset.data_page_header = true; + hdr1.data_page_header.__set_num_values(1); + std::vector header1_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&hdr1, &header1_bytes).ok()); + std::vector payload1 = {0x10, 0x20, 0x30, 0x40}; + std::vector cached1; + cached1.insert(cached1.end(), header1_bytes.begin(), header1_bytes.end()); + cached1.insert(cached1.end(), payload1.begin(), payload1.end()); + + // v2 page + tparquet::PageHeader hdr2; + hdr2.type = tparquet::PageType::DATA_PAGE_V2; + int rl = 2; + int dl = 1; + int payload2_sz = 2; + hdr2.__set_compressed_page_size(rl + dl + payload2_sz); + hdr2.__set_uncompressed_page_size(rl + dl + payload2_sz); + hdr2.__isset.data_page_header_v2 = true; + hdr2.data_page_header_v2.__set_repetition_levels_byte_length(rl); + hdr2.data_page_header_v2.__set_definition_levels_byte_length(dl); + hdr2.data_page_header_v2.__set_is_compressed(false); + hdr2.data_page_header_v2.__set_num_values(1); + std::vector header2_bytes; + ASSERT_TRUE(ts.serialize(&hdr2, &header2_bytes).ok()); + std::vector level_bytes = {0x11, 0x22, 0x33}; + std::vector payload2 = {0xAA, 0xBB}; + std::vector cached2; + cached2.insert(cached2.end(), header2_bytes.begin(), header2_bytes.end()); + cached2.insert(cached2.end(), level_bytes.begin(), level_bytes.end()); + cached2.insert(cached2.end(), payload2.begin(), payload2.end()); + + // Insert both pages into cache under different header offsets + size_t total1 = cached1.size(); + auto* page1 = new DataPage(total1, true, segment_v2::DATA_PAGE); + memcpy(page1->data(), cached1.data(), total1); + page1->reset_size(total1); + PageCacheHandle h1; + size_t header1_start = 128; + int64_t mtime = 0; + StoragePageCache::CacheKey key1(fmt::format("{}::{}", path, mtime), + static_cast(header1_start + total1), header1_start); + StoragePageCache::instance()->insert(key1, page1, &h1, segment_v2::DATA_PAGE); + + size_t total2 = cached2.size(); + auto* page2 = new DataPage(total2, true, segment_v2::DATA_PAGE); + memcpy(page2->data(), cached2.data(), total2); + page2->reset_size(total2); + PageCacheHandle h2; + size_t header2_start = 256; + StoragePageCache::CacheKey key2(fmt::format("{}::{}", path, mtime), + static_cast(header2_start + total2), header2_start); + StoragePageCache::instance()->insert(key2, page2, &h2, segment_v2::DATA_PAGE); + + // Now create readers that would lookup those cache keys + // Reader1 must expose header+page bytes at offset header1_start + std::vector reader_backing1(3000, 0); + memcpy(reader_backing1.data() + header1_start, cached1.data(), total1); + FakeBufferedReader reader1(path, reader_backing1); + tparquet::ColumnChunk cc1; + cc1.meta_data.__set_data_page_offset(128); + cc1.meta_data.__set_total_compressed_size(total1); + cc1.meta_data.__set_num_values(1); + cc1.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED); + FieldSchema field_schema1; + field_schema1.repetition_level = 0; + field_schema1.definition_level = 0; + ColumnChunkReader ccr1(&reader1, &cc1, &field_schema1, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr1.init().ok()); + // ASSERT_TRUE(ccr1.next_page().ok()); + ASSERT_TRUE(ccr1.load_page_data().ok()); + Slice s1 = ccr1.get_page_data(); + ASSERT_EQ(s1.size, payload1.size()); + EXPECT_EQ(0, memcmp(s1.data, payload1.data(), payload1.size())); + + std::vector reader_backing2(3000, 0); + memcpy(reader_backing2.data() + header2_start, cached2.data(), total2); + FakeBufferedReader reader2(path, reader_backing2); + 
tparquet::ColumnChunk cc2; + cc2.meta_data.__set_data_page_offset(256); + cc2.meta_data.__set_total_compressed_size(total2); + cc2.meta_data.__set_num_values(1); + cc2.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED); + FieldSchema field_schema2; + field_schema2.repetition_level = rl; + field_schema2.definition_level = dl; + ColumnChunkReader ccr2(&reader2, &cc2, &field_schema2, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr2.init().ok()); + // ASSERT_TRUE(ccr2.next_page().ok()); + ASSERT_TRUE(ccr2.load_page_data().ok()); + Slice s2 = ccr2.get_page_data(); + ASSERT_EQ(s2.size, payload2.size()); + EXPECT_EQ(0, memcmp(s2.data, payload2.data(), payload2.size())); + const Slice& rep = ccr2.v2_rep_levels(); + const Slice& def = ccr2.v2_def_levels(); + ASSERT_EQ(rep.size, rl); + ASSERT_EQ(def.size, dl); + EXPECT_EQ(0, memcmp(rep.data, level_bytes.data(), rl)); + EXPECT_EQ(0, memcmp(def.data, level_bytes.data() + rl, dl)); +} + +TEST(ParquetPageCacheTest, CacheMissThenHit) { + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + // Note: parquet_page_cache_decompress_threshold and enable_parquet_cache_compressed_pages + // are now BE config variables, not context fields + + // uncompressed v1 page + tparquet::PageHeader header; + header.type = tparquet::PageType::DATA_PAGE; + header.__set_compressed_page_size(4); + header.__set_uncompressed_page_size(4); + header.__isset.data_page_header = true; + header.data_page_header.__set_num_values(1); + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok()); + std::vector payload = {0xDE, 0xAD, 0xBE, 0xEF}; + std::vector backing(256, 0); + std::vector cached; + cached.insert(cached.end(), header_bytes.begin(), header_bytes.end()); + cached.insert(cached.end(), payload.begin(), payload.end()); + int64_t header_offset = 64; + memcpy(backing.data() + header_offset, cached.data(), cached.size()); + + std::string path = "test_miss_then_hit"; + FakeBufferedReader reader(path, backing); + + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(header_offset); + cc.meta_data.__set_total_compressed_size(cached.size()); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED); + + FieldSchema fs; + fs.repetition_level = 0; + fs.definition_level = 0; + + // First reader: should not hit cache, but should write cache + ColumnChunkReader ccr(&reader, &cc, &fs, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr.init().ok()); + // ASSERT_TRUE(ccr.next_page().ok()); + ASSERT_TRUE(ccr.load_page_data().ok()); + auto& statistics = ccr.statistics(); + EXPECT_EQ(statistics.page_cache_hit_counter, 0); + EXPECT_EQ(statistics.page_cache_write_counter, 1); + + // Second reader: should hit cache + ColumnChunkReader ccr2(&reader, &cc, &fs, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr2.init().ok()); + // ASSERT_TRUE(ccr2.next_page().ok()); + ASSERT_TRUE(ccr2.load_page_data().ok()); + auto& statistics2 = ccr2.statistics(); + EXPECT_EQ(statistics2.page_cache_hit_counter, 1); + EXPECT_EQ(statistics2.page_cache_decompressed_hit_counter, 1); +} + +TEST(ParquetPageCacheTest, DecompressThresholdCachesCompressed) { + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + // Note: enable_parquet_cache_compressed_pages is now a BE config variable, not a context field + + // prepare a compressible payload (lots of zeros) + std::vector payload(1024, 0); + + // compress payload using snappy + 
BlockCompressionCodec* codec = nullptr; + ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, &codec).ok()); + faststring compressed_fast; + std::vector inputs; + inputs.emplace_back(payload.data(), payload.size()); + ASSERT_TRUE(codec->compress(inputs, payload.size(), &compressed_fast).ok()); + + tparquet::PageHeader header; + header.type = tparquet::PageType::DATA_PAGE; + header.__set_compressed_page_size(static_cast(compressed_fast.size())); + header.__set_uncompressed_page_size(static_cast(payload.size())); + header.__isset.data_page_header = true; + header.data_page_header.__set_num_values(1); + + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok()); + + std::vector file_data; + file_data.insert(file_data.end(), header_bytes.begin(), header_bytes.end()); + file_data.insert(file_data.end(), compressed_fast.data(), + compressed_fast.data() + compressed_fast.size()); + + std::string path = "test_threshold_file_compressed"; + FakeBufferedReader reader(path, file_data); + + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(0); + cc.meta_data.__set_total_compressed_size(file_data.size()); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY); + + FieldSchema fs; + fs.repetition_level = 0; + fs.definition_level = 0; + + // Case: very small threshold -> cache the compressed payload (smaller footprint) + double old_thresh = config::parquet_page_cache_decompress_threshold; + bool old_enable_compressed = config::enable_parquet_cache_compressed_pages; + config::parquet_page_cache_decompress_threshold = 0.1; + config::enable_parquet_cache_compressed_pages = true; + ColumnChunkReader ccr_small_thresh(&reader, &cc, &fs, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr_small_thresh.init().ok()); + // ASSERT_TRUE(ccr_small_thresh.next_page().ok()); + ASSERT_TRUE(ccr_small_thresh.load_page_data().ok()); + EXPECT_EQ(ccr_small_thresh.statistics().page_cache_write_counter, 1); + + // Inspect cache entry: payload stored should be compressed size + PageCacheHandle handle_small; + size_t file_end = header_bytes.size() + compressed_fast.size(); + int64_t mtime = 0; + StoragePageCache::CacheKey key_small(fmt::format("{}::{}", path, mtime), + /*file_end_offset*/ file_end, /*header_start*/ 0); + bool found_small = + StoragePageCache::instance()->lookup(key_small, &handle_small, segment_v2::DATA_PAGE); + ASSERT_TRUE(found_small); + Slice cached_small = handle_small.data(); + size_t header_size = header_bytes.size(); + size_t payload_in_cache_size = cached_small.size - header_size; // no levels here + ASSERT_EQ(payload_in_cache_size, compressed_fast.size()); + + // restore config + config::parquet_page_cache_decompress_threshold = old_thresh; + config::enable_parquet_cache_compressed_pages = old_enable_compressed; +} + +TEST(ParquetPageCacheTest, DecompressThresholdCachesDecompressed) { + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + // Note: enable_parquet_cache_compressed_pages is now a BE config variable, not a context field + + // prepare a compressible payload (lots of zeros) + std::vector payload(1024, 0); + + // compress payload using snappy + BlockCompressionCodec* codec = nullptr; + ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, &codec).ok()); + faststring compressed_fast; + std::vector inputs; + inputs.emplace_back(payload.data(), payload.size()); + 
ASSERT_TRUE(codec->compress(inputs, payload.size(), &compressed_fast).ok()); + + tparquet::PageHeader header; + header.type = tparquet::PageType::DATA_PAGE; + header.__set_compressed_page_size(static_cast(compressed_fast.size())); + header.__set_uncompressed_page_size(static_cast(payload.size())); + header.__isset.data_page_header = true; + header.data_page_header.__set_num_values(1); + + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok()); + + std::vector file_data; + file_data.insert(file_data.end(), header_bytes.begin(), header_bytes.end()); + file_data.insert(file_data.end(), compressed_fast.data(), + compressed_fast.data() + compressed_fast.size()); + + std::string path = "test_threshold_file_decompressed"; + FakeBufferedReader reader(path, file_data); + + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(0); + cc.meta_data.__set_total_compressed_size(file_data.size()); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY); + + FieldSchema fs; + fs.repetition_level = 0; + fs.definition_level = 0; + + // Case: very large threshold -> cache decompressed payload + double old_thresh = config::parquet_page_cache_decompress_threshold; + bool old_enable_compressed = config::enable_parquet_cache_compressed_pages; + config::parquet_page_cache_decompress_threshold = 100.0; + config::enable_parquet_cache_compressed_pages = false; + ColumnChunkReader ccr_large_thresh(&reader, &cc, &fs, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr_large_thresh.init().ok()); + // ASSERT_TRUE(ccr_large_thresh.next_page().ok()); + ASSERT_TRUE(ccr_large_thresh.load_page_data().ok()); + EXPECT_EQ(ccr_large_thresh.statistics().page_cache_write_counter, 1); + + // Inspect cache entry for large threshold: payload stored should be uncompressed size + PageCacheHandle handle_large; + size_t file_end = header_bytes.size() + compressed_fast.size(); + int64_t mtime = 0; + StoragePageCache::CacheKey key_large(fmt::format("{}::{}", path, mtime), + /*file_end_offset*/ file_end, /*header_start*/ 0); + bool found_large = + StoragePageCache::instance()->lookup(key_large, &handle_large, segment_v2::DATA_PAGE); + ASSERT_TRUE(found_large); + Slice cached_large = handle_large.data(); + size_t payload_in_cache_size_large = cached_large.size - header_bytes.size(); + ASSERT_EQ(payload_in_cache_size_large, payload.size()); + + // Verify cache hit for a new reader (should hit the decompressed entry we just created) + ColumnChunkReader ccr_check(&reader, &cc, &fs, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr_check.init().ok()); + // ASSERT_TRUE(ccr_check.next_page().ok()); + ASSERT_TRUE(ccr_check.load_page_data().ok()); + EXPECT_EQ(ccr_check.statistics().page_cache_hit_counter, 1); + // restore config + config::parquet_page_cache_decompress_threshold = old_thresh; + config::enable_parquet_cache_compressed_pages = old_enable_compressed; +} + +TEST(ParquetPageCacheTest, MultipleReadersShareCachedEntry) { + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + double old_thresh = config::parquet_page_cache_decompress_threshold; + bool old_enable_compressed = config::enable_parquet_cache_compressed_pages; + config::parquet_page_cache_decompress_threshold = 100.0; + config::enable_parquet_cache_compressed_pages = false; + + // Create a v2 cached page and then instantiate multiple readers that hit the cache + std::string path = "test_shared_handles"; + tparquet::PageHeader hdr; + 
hdr.type = tparquet::PageType::DATA_PAGE_V2; + int rl = 2; + int dl = 1; + hdr.__isset.data_page_header_v2 = true; + hdr.data_page_header_v2.__set_repetition_levels_byte_length(rl); + hdr.data_page_header_v2.__set_definition_levels_byte_length(dl); + hdr.data_page_header_v2.__set_is_compressed(false); + hdr.data_page_header_v2.__set_num_values(1); + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&hdr, &header_bytes).ok()); + std::vector level_bytes = {0x11, 0x22, 0x33}; + std::vector payload = {0x0A, 0x0B}; + std::vector cached; + cached.insert(cached.end(), header_bytes.begin(), header_bytes.end()); + cached.insert(cached.end(), level_bytes.begin(), level_bytes.end()); + cached.insert(cached.end(), payload.begin(), payload.end()); + + size_t total = cached.size(); + auto* page = new DataPage(total, true, segment_v2::DATA_PAGE); + memcpy(page->data(), cached.data(), total); + page->reset_size(total); + PageCacheHandle handle; + size_t header_start = 512; + int64_t mtime = 0; + StoragePageCache::CacheKey key(fmt::format("{}::{}", path, mtime), + static_cast(header_start + total), header_start); + StoragePageCache::instance()->insert(key, page, &handle, segment_v2::DATA_PAGE); + + // Create multiple readers that will hit cache + const int N = 4; + for (int i = 0; i < N; ++i) { + std::vector reader_backing(5000, 0); + memcpy(reader_backing.data() + header_start, cached.data(), total); + FakeBufferedReader reader(path, reader_backing); + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(512); + cc.meta_data.__set_total_compressed_size(total); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED); + FieldSchema fs; + fs.repetition_level = rl; + fs.definition_level = dl; + ColumnChunkReader ccr(&reader, &cc, &fs, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr.init().ok()); + // ASSERT_TRUE(ccr.next_page().ok()); + ASSERT_TRUE(ccr.load_page_data().ok()); + Slice s = ccr.get_page_data(); + ASSERT_EQ(s.size, payload.size()); + EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size())); + const Slice& rep = ccr.v2_rep_levels(); + const Slice& def = ccr.v2_def_levels(); + ASSERT_EQ(rep.size, rl); + ASSERT_EQ(def.size, dl); + } + // restore config + config::parquet_page_cache_decompress_threshold = old_thresh; + config::enable_parquet_cache_compressed_pages = old_enable_compressed; +} diff --git a/be/test/vec/exec/orc/orc_file_reader_test.cpp b/be/test/vec/exec/orc/orc_file_reader_test.cpp index 9e1003c397f07f..4c71129cdbbc3d 100644 --- a/be/test/vec/exec/orc/orc_file_reader_test.cpp +++ b/be/test/vec/exec/orc/orc_file_reader_test.cpp @@ -41,6 +41,8 @@ class MockFileReader : public io::FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return 0; } + void set_data(const std::string& data) { _data = data; } protected: diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 11df0e53eb109c..c6915af6eede47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -496,6 +496,9 @@ public class SessionVariable implements Serializable, Writable { public static final String SHOW_USER_DEFAULT_ROLE = "show_user_default_role"; public static final String ENABLE_PAGE_CACHE = "enable_page_cache"; + public static final String ENABLE_PARQUET_FILE_PAGE_CACHE 
= "enable_parquet_file_page_cache"; + // public static final String PARQUET_PAGE_CACHE_DECOMPRESS_THRESHOLD = "parquet_page_cache_decompress_threshold"; + // public static final String ENABLE_PARQUET_CACHE_COMPRESSED_PAGES = "enable_parquet_cache_compressed_pages"; public static final String MINIDUMP_PATH = "minidump_path"; @@ -2166,6 +2169,27 @@ public boolean isEnableHboNonStrictMatchingMode() { needForward = true) public boolean enablePageCache = true; + @VariableMgr.VarAttr( + name = ENABLE_PARQUET_FILE_PAGE_CACHE, + description = {"控制是否启用 Parquet file page cache。默认为 true。", + "Controls whether to use Parquet file page cache. The default is true."}, + needForward = true) + public boolean enableParquetFilePageCache = true; + + // @VariableMgr.VarAttr( + // name = PARQUET_PAGE_CACHE_DECOMPRESS_THRESHOLD, + // description = {"决定是否缓存解压后 page 的阈值,默认 1.5。", + // "Threshold ratio to decide caching decompressed parquet page, default 1.5."}, + // needForward = true) + // public double parquetPageCacheDecompressThreshold = 1.5; + + // @VariableMgr.VarAttr( + // name = ENABLE_PARQUET_CACHE_COMPRESSED_PAGES, + // description = {"控制是否缓存压缩的 Parquet 页面,默认为 false", + // "Controls whether to cache compressed parquet pages. Default false."}, + // needForward = true) + // public boolean enableParquetCacheCompressedPages = false; + @VariableMgr.VarAttr(name = ENABLE_FOLD_NONDETERMINISTIC_FN) public boolean enableFoldNondeterministicFn = false; @@ -4877,6 +4901,10 @@ public TQueryOptions toThrift() { tResult.setEnablePageCache(enablePageCache); + tResult.setEnableParquetFilePageCache(enableParquetFilePageCache); + // tResult.setParquetPageCacheDecompressThreshold(parquetPageCacheDecompressThreshold); + // tResult.setEnableParquetCacheCompressedPages(enableParquetCacheCompressedPages); + tResult.setFileCacheBasePath(fileCacheBasePath); tResult.setEnableInvertedIndexQuery(enableInvertedIndexQuery); @@ -4891,6 +4919,11 @@ public TQueryOptions toThrift() { tResult.setEnableOrcLazyMat(enableOrcLazyMat); tResult.setEnableParquetFilterByMinMax(enableParquetFilterByMinMax); tResult.setEnableParquetFilterByBloomFilter(enableParquetFilterByBloomFilter); + + // Parquet page cache session options + tResult.setEnableParquetFilePageCache(enableParquetFilePageCache); + // tResult.setParquetPageCacheDecompressThreshold(parquetPageCacheDecompressThreshold); + // tResult.setEnableParquetCacheCompressedPages(enableParquetCacheCompressedPages); tResult.setEnableOrcFilterByMinMax(enableOrcFilterByMinMax); tResult.setCheckOrcInitSargsSuccess(checkOrcInitSargsSuccess); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 5622929166ef5f..46b4a4423b4b99 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -426,6 +426,10 @@ struct TQueryOptions { // runtime profiling to choose the most efficient algorithm for the data pattern 183: optional bool enable_use_hybrid_sort = false; + // Parquet page cache session options + // Whether to enable parquet file page cache on BE for this query + 184: optional bool enable_parquet_file_page_cache = true; + // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query.