diff --git a/.github/workflows/test_with_sanitizer.yaml b/.github/workflows/test_with_sanitizer.yaml index a083a773b..709f82b2a 100644 --- a/.github/workflows/test_with_sanitizer.yaml +++ b/.github/workflows/test_with_sanitizer.yaml @@ -52,6 +52,7 @@ jobs: env: CC: clang CXX: clang++ + PAIMON_CI_LINK_JOBS: 1 run: ci/scripts/build_paimon.sh $(pwd) true - name: Show ccache statistics if: always() diff --git a/ci/scripts/build_paimon.sh b/ci/scripts/build_paimon.sh index 5584d84f5..f54afa935 100755 --- a/ci/scripts/build_paimon.sh +++ b/ci/scripts/build_paimon.sh @@ -21,6 +21,9 @@ enable_sanitizer=${2:-false} check_clang_tidy=${3:-false} build_type=${4:-Debug} build_dir="${source_dir}/build" +build_jobs=${PAIMON_CI_BUILD_JOBS:-$(nproc)} +test_jobs=${PAIMON_CI_TEST_JOBS:-$(nproc)} +link_jobs=${PAIMON_CI_LINK_JOBS:-} # Display ccache status if available if command -v ccache &> /dev/null; then @@ -65,9 +68,16 @@ if [[ "${enable_sanitizer}" == "true" ]]; then ) fi +if [[ -n "${link_jobs}" ]]; then + CMAKE_ARGS+=( + "-DCMAKE_JOB_POOLS=paimon_link_pool=${link_jobs}" + "-DCMAKE_JOB_POOL_LINK=paimon_link_pool" + ) +fi + cmake "${CMAKE_ARGS[@]}" "${source_dir}" -cmake --build . -- -j "$(nproc)" -ctest --output-on-failure -j "$(nproc)" +cmake --build . -- -j "${build_jobs}" +ctest --output-on-failure -j "${test_jobs}" if [[ "${check_clang_tidy}" == "true" ]]; then cmake --build . --target check-clang-tidy diff --git a/docs/source/user_guide.rst b/docs/source/user_guide.rst index 4c497b5c3..e9d109cea 100644 --- a/docs/source/user_guide.rst +++ b/docs/source/user_guide.rst @@ -25,6 +25,7 @@ User Guide user_guide/snapshot user_guide/manifest user_guide/manifest_cache + user_guide/manifest_entry_cache user_guide/parquet_metadata_cache user_guide/data_types user_guide/primary_key_table diff --git a/docs/source/user_guide/manifest_entry_cache.rst b/docs/source/user_guide/manifest_entry_cache.rst new file mode 100644 index 000000000..a70e4d9ae --- /dev/null +++ b/docs/source/user_guide/manifest_entry_cache.rst @@ -0,0 +1,88 @@ +.. Copyright 2026-present Alibaba Inc. + +.. Licensed under the Apache License, Version 2.0 (the "License"); +.. you may not use this file except in compliance with the License. +.. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, software +.. distributed under the License is distributed on an "AS IS" BASIS, +.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +.. See the License for the specific language governing permissions and +.. limitations under the License. + +Manifest Entry Cache +==================== + +Overview +-------- + +Large tables may contain many manifest entries, while a scan may only need a +small subset after bucket, partition, and statistics pruning. The snapshot live +manifest entry cache reduces repeated manifest decoding cost for successive full +scans that target the same bucket. + +The cache stores decoded and merged live manifest entries by table path, branch, +and bucket for ``ScanMode::ALL``. Each cache value can retain several snapshot +results for that bucket. Exact snapshot hits are served from the cache; cache +misses rebuild the target snapshot bucket from the target snapshot's data +manifests and store the rebuilt live entries. + +Request-specific filters are not stored in the cache. Partition, level, and +predicate filters are still evaluated for each scan, so cached entries can be +reused safely across different scan predicates for the same bucket. + +Configuration +------------- + +Manifest entry caching reuses the cache instance provided by +``ScanContextBuilder::WithCache()`` and stores bucket-scoped snapshot entries +under +``CacheKind::SNAPSHOT_LIVE_MANIFEST``: + +.. code-block:: cpp + + auto cache = std::make_shared(128 * 1024 * 1024); + ScanContextBuilder context_builder(table_path); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr scan_context, + context_builder + .WithCache(cache) + .AddOption(Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS, "3") + .Finish()); + +Cache entries are scoped by table path, branch, and bucket, so they can be +reused across newly created ``TableScan`` and ``FileStoreScan`` instances as +long as they share the same cache object and scan the same bucket. + +``Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS`` controls how many snapshot +results are retained in each table/branch/bucket cache value. Older snapshots in +the same bucket are evicted first. The default value is ``0``, which disables +the cache path. Set it to a positive value to enable the cache when +``ScanContextBuilder::WithCache()`` is also configured. Physical cache eviction +is still controlled by the configured ``Cache`` implementation, for example the +capacity of ``LruCache``. + +If no cache is provided through ``ScanContextBuilder::WithCache()``, this +optimization is skipped. The snapshot manifest entry cache shares the same +``Cache`` interface with raw manifest and data-file footer caches, but it uses a +dedicated ``CacheKind`` and a table/branch/bucket key instead of file byte +ranges. + +Limitations +----------- + +The cache is currently used only for ``ScanMode::ALL`` scans that can determine +a single target bucket. It is skipped for scans without a bucket filter because +reading or deserializing all buckets would be too expensive for selective +queries. It is also skipped for row-range scans because row-range pruning is +applied at manifest-meta level. + +Metrics +------- + +The scan metrics expose existing counters for the last scan: + +- ``lastScannedManifests``: how many manifest files were loaded during this + scan before manifest entry decoding. diff --git a/include/paimon/cache/cache.h b/include/paimon/cache/cache.h index 89ed69047..1742bf187 100644 --- a/include/paimon/cache/cache.h +++ b/include/paimon/cache/cache.h @@ -33,6 +33,7 @@ enum class CacheKind { DEFAULT, MANIFEST, DATA_FILE_FOOTER, + SNAPSHOT_LIVE_MANIFEST, }; class PAIMON_EXPORT CacheKey { @@ -41,6 +42,9 @@ class PAIMON_EXPORT CacheKey { int32_t length, bool is_index); static std::shared_ptr ForKind(const std::string& file_path, int64_t position, int32_t length, CacheKind kind); + static std::shared_ptr ForSnapshotLiveManifestEntries(const std::string& table_path, + const std::string& branch, + int32_t bucket); public: virtual ~CacheKey() = default; diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 720cc521a..6e8e3c789 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -166,6 +166,11 @@ struct PAIMON_EXPORT Options { /// "latest-full", "latest", "from-snapshot", "from-snapshot-full". Default value is "default". static const char SCAN_MODE[]; + /// "scan.manifest-entry-cache.max-snapshots" - Maximum number of snapshot live manifest entry + /// results retained per table, branch, and bucket. Setting it to 0 disables manifest entry + /// cache. Default value is 0. + static const char SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS[]; + /// "read.batch-size" - Read batch size for any file format if it supports. /// The default value is 1024. static const char READ_BATCH_SIZE[]; diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index c883ec012..81036e53d 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -305,6 +305,7 @@ set(PAIMON_CORE_SRCS core/operation/raw_file_split_read.cpp core/operation/read_context.cpp core/operation/scan_context.cpp + core/manifest/snapshot_live_manifest_entries.cpp core/operation/write_context.cpp core/operation/write_restore.cpp core/postpone/postpone_bucket_writer.cpp diff --git a/src/paimon/common/defs.cpp b/src/paimon/common/defs.cpp index c0496d10a..4ee78a735 100644 --- a/src/paimon/common/defs.cpp +++ b/src/paimon/common/defs.cpp @@ -47,6 +47,8 @@ const char Options::SOURCE_SPLIT_TARGET_SIZE[] = "source.split.target-size"; const char Options::SOURCE_SPLIT_OPEN_FILE_COST[] = "source.split.open-file-cost"; const char Options::SCAN_SNAPSHOT_ID[] = "scan.snapshot-id"; const char Options::SCAN_MODE[] = "scan.mode"; +const char Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS[] = + "scan.manifest-entry-cache.max-snapshots"; const char Options::READ_BATCH_SIZE[] = "read.batch-size"; const char Options::WRITE_BATCH_SIZE[] = "write.batch-size"; const char Options::WRITE_BUFFER_SIZE[] = "write-buffer-size"; diff --git a/src/paimon/common/io/cache/cache_key.cpp b/src/paimon/common/io/cache/cache_key.cpp index 12732bb27..c457c6067 100644 --- a/src/paimon/common/io/cache/cache_key.cpp +++ b/src/paimon/common/io/cache/cache_key.cpp @@ -17,6 +17,49 @@ #include "paimon/common/io/cache/cache_key.h" namespace paimon { +namespace { + +class SnapshotLiveManifestEntriesCacheKey : public CacheKey { + public: + SnapshotLiveManifestEntriesCacheKey(const std::string& table_path, const std::string& branch, + int32_t bucket) + : CacheKey(CacheKind::SNAPSHOT_LIVE_MANIFEST), + table_path_(table_path), + branch_(branch), + bucket_(bucket) {} + + bool IsIndex() const override { + return false; + } + + bool Equals(const CacheKey& other) const override { + const auto* rhs = dynamic_cast(&other); + if (!rhs) { + return false; + } + return table_path_ == rhs->table_path_ && branch_ == rhs->branch_ && + bucket_ == rhs->bucket_ && GetKind() == rhs->GetKind(); + } + + size_t HashCode() const override { + size_t seed = 0; + seed ^= std::hash{}(table_path_) + HASH_CONSTANT + (seed << 6) + (seed >> 2); + seed ^= std::hash{}(branch_) + HASH_CONSTANT + (seed << 6) + (seed >> 2); + seed ^= std::hash{}(bucket_) + HASH_CONSTANT + (seed << 6) + (seed >> 2); + seed ^= std::hash{}(static_cast(GetKind())) + HASH_CONSTANT + + (seed << 6) + (seed >> 2); + return seed; + } + + private: + static constexpr uint64_t HASH_CONSTANT = 0x9e3779b97f4a7c15ULL; + + const std::string table_path_; + const std::string branch_; + const int32_t bucket_; +}; + +} // namespace std::shared_ptr CacheKey::ForPosition(const std::string& file_path, int64_t position, int32_t length, bool is_index) { @@ -31,6 +74,12 @@ std::shared_ptr CacheKey::ForKind(const std::string& file_path, int64_ return key; } +std::shared_ptr CacheKey::ForSnapshotLiveManifestEntries(const std::string& table_path, + const std::string& branch, + int32_t bucket) { + return std::make_shared(table_path, branch, bucket); +} + bool PositionCacheKey::IsIndex() const { return is_index_; } diff --git a/src/paimon/common/io/cache/lru_cache_test.cpp b/src/paimon/common/io/cache/lru_cache_test.cpp index 3864c8224..c96697952 100644 --- a/src/paimon/common/io/cache/lru_cache_test.cpp +++ b/src/paimon/common/io/cache/lru_cache_test.cpp @@ -381,6 +381,23 @@ TEST_F(LruCacheTest, TestForKindSetsKeyKind) { ASSERT_EQ(CacheKind::MANIFEST, put_key->GetKind()); } +TEST_F(LruCacheTest, TestForSnapshotLiveManifestEntries) { + auto main_key = CacheKey::ForSnapshotLiveManifestEntries("table_path", "main", 0); + auto same_key = CacheKey::ForSnapshotLiveManifestEntries("table_path", "main", 0); + auto branch_key = CacheKey::ForSnapshotLiveManifestEntries("table_path", "dev", 0); + auto table_key = CacheKey::ForSnapshotLiveManifestEntries("other_table_path", "main", 0); + auto bucket_key = CacheKey::ForSnapshotLiveManifestEntries("table_path", "main", 1); + auto hash_in_path_key = CacheKey::ForSnapshotLiveManifestEntries("table#path", "main", 0); + auto hash_in_branch_key = CacheKey::ForSnapshotLiveManifestEntries("table", "path#main", 0); + + ASSERT_EQ(CacheKind::SNAPSHOT_LIVE_MANIFEST, main_key->GetKind()); + ASSERT_TRUE(CacheKeyEqual()(main_key, same_key)); + ASSERT_FALSE(CacheKeyEqual()(main_key, branch_key)); + ASSERT_FALSE(CacheKeyEqual()(main_key, table_key)); + ASSERT_FALSE(CacheKeyEqual()(main_key, bucket_key)); + ASSERT_FALSE(CacheKeyEqual()(hash_in_path_key, hash_in_branch_key)); +} + /// Verifies that multiple evictions happen when a single large entry is inserted. TEST_F(LruCacheTest, TestMultipleEvictions) { LruCache cache(300); diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp b/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp index c960d4563..0e7511231 100644 --- a/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp +++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp @@ -153,6 +153,7 @@ PrefetchFileBatchReaderImpl::~PrefetchFileBatchReaderImpl() { Status PrefetchFileBatchReaderImpl::SetReadSchema( ::ArrowSchema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap) { + PAIMON_RETURN_NOT_OK(CleanUp()); PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr schema, arrow::ImportSchema(read_schema)); for (const auto& reader : readers_) { @@ -162,11 +163,15 @@ Status PrefetchFileBatchReaderImpl::SetReadSchema( } selection_bitmap_ = selection_bitmap; predicate_ = predicate; - return RefreshReadRanges(); + return RefreshReadRangesInternal(); } Status PrefetchFileBatchReaderImpl::RefreshReadRanges() { PAIMON_RETURN_NOT_OK(CleanUp()); + return RefreshReadRangesInternal(); +} + +Status PrefetchFileBatchReaderImpl::RefreshReadRangesInternal() { bool need_prefetch; PAIMON_ASSIGN_OR_RAISE(auto read_ranges, readers_[0]->GenReadRanges(&need_prefetch)); diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_impl.h b/src/paimon/common/reader/prefetch_file_batch_reader_impl.h index 19f936c8f..bb29f6279 100644 --- a/src/paimon/common/reader/prefetch_file_batch_reader_impl.h +++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl.h @@ -115,6 +115,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader { PrefetchCacheMode cache_mode); Status CleanUp(); + Status RefreshReadRangesInternal(); void Workloop(); void SetReadStatus(const Status& status); Status GetReadStatus() const; diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp index 2ad623d84..044bc5f83 100644 --- a/src/paimon/core/core_options.cpp +++ b/src/paimon/core/core_options.cpp @@ -403,6 +403,7 @@ struct CoreOptions::Impl { int32_t bucket = -1; int32_t manifest_merge_min_count = 30; + int32_t scan_manifest_entry_cache_max_snapshots = 0; int32_t read_batch_size = 1024; int32_t write_batch_size = 1024; int32_t local_sort_max_num_file_handles = 128; @@ -717,6 +718,13 @@ struct CoreOptions::Impl { } // Parse scan.mode - scanning behavior of the source, default "default" PAIMON_RETURN_NOT_OK(parser.ParseStartupMode(&startup_mode)); + // Parse scan.manifest-entry-cache.max-snapshots - cached snapshots per bucket. + PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS, + &scan_manifest_entry_cache_max_snapshots)); + if (scan_manifest_entry_cache_max_snapshots < 0) { + return Status::Invalid(fmt::format("{} must be non-negative", + Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS)); + } // Parse scan.fallback-branch - fallback branch when partition not found PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_FALLBACK_BRANCH, &scan_fallback_branch)); // Parse branch - branch name, default "main" @@ -968,6 +976,11 @@ std::optional CoreOptions::GetScanSnapshotId() const { std::optional CoreOptions::GetScanTimestampMillis() const { return impl_->scan_timestamp_millis; } + +int32_t CoreOptions::GetScanManifestEntryCacheMaxSnapshots() const { + return impl_->scan_manifest_entry_cache_max_snapshots; +} + int64_t CoreOptions::GetManifestTargetFileSize() const { return impl_->manifest_target_file_size; } diff --git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h index 2b08dec2b..45ffa9364 100644 --- a/src/paimon/core/core_options.h +++ b/src/paimon/core/core_options.h @@ -79,6 +79,7 @@ class PAIMON_EXPORT CoreOptions { int64_t GetSourceSplitOpenFileCost() const; std::optional GetScanSnapshotId() const; std::optional GetScanTimestampMillis() const; + int32_t GetScanManifestEntryCacheMaxSnapshots() const; int64_t GetManifestTargetFileSize() const; std::shared_ptr GetCache() const; diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index fb8a29bc6..fd61f7d32 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -55,6 +55,7 @@ TEST(CoreOptionsTest, TestDefaultValue) { ASSERT_EQ(8 * 1024 * 1024L, core_options.GetManifestTargetFileSize()); ASSERT_EQ(16 * 1024 * 1024L, core_options.GetManifestFullCompactionThresholdSize()); ASSERT_EQ(30, core_options.GetManifestMergeMinCount()); + ASSERT_EQ(0, core_options.GetScanManifestEntryCacheMaxSnapshots()); ASSERT_EQ(nullptr, core_options.GetCache()); ASSERT_EQ(128 * 1024 * 1024L, core_options.GetSourceSplitTargetSize()); ASSERT_EQ(4 * 1024 * 1024L, core_options.GetSourceSplitOpenFileCost()); @@ -186,6 +187,7 @@ TEST(CoreOptionsTest, TestFromMap) { {Options::COMMIT_MAX_RETRIES, "20"}, {Options::SCAN_SNAPSHOT_ID, "5"}, {Options::SCAN_MODE, "from-snapshot-full"}, + {Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS, "7"}, {Options::SNAPSHOT_NUM_RETAINED_MIN, "15"}, {Options::SNAPSHOT_NUM_RETAINED_MAX, "30"}, {Options::SNAPSHOT_EXPIRE_LIMIT, "20"}, @@ -308,6 +310,7 @@ TEST(CoreOptionsTest, TestFromMap) { ASSERT_EQ(120 * 1000, core_options.GetCommitTimeout()); ASSERT_EQ(20, core_options.GetCommitMaxRetries()); ASSERT_EQ(5, core_options.GetScanSnapshotId().value_or(-1)); + ASSERT_EQ(7, core_options.GetScanManifestEntryCacheMaxSnapshots()); ExpireConfig expire_config = core_options.GetExpireConfig(); ASSERT_EQ(15, expire_config.GetSnapshotRetainMin()); ASSERT_EQ(30, expire_config.GetSnapshotRetainMax()); @@ -437,6 +440,9 @@ TEST(CoreOptionsTest, TestInvalidCase) { "invalid lookup mode: invalid"); ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::LOOKUP_COMPACT_MAX_INTERVAL, "invalid"}}), "Invalid Config [lookup-compact.max-interval: invalid]"); + ASSERT_NOK_WITH_MSG( + CoreOptions::FromMap({{Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS, "-1"}}), + "scan.manifest-entry-cache.max-snapshots must be non-negative"); ASSERT_NOK_WITH_MSG( CoreOptions::FromMap({{Options::LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO, "1.1"}}), "The high priority pool ratio should in the range [0, 1), while input is 1.1"); diff --git a/src/paimon/core/manifest/snapshot_live_manifest_entries.cpp b/src/paimon/core/manifest/snapshot_live_manifest_entries.cpp new file mode 100644 index 000000000..8ae3bb3a5 --- /dev/null +++ b/src/paimon/core/manifest/snapshot_live_manifest_entries.cpp @@ -0,0 +1,136 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/manifest/snapshot_live_manifest_entries.h" + +#include +#include +#include + +#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/core/manifest/manifest_entry_serializer.h" +#include "paimon/io/byte_array_input_stream.h" +#include "paimon/io/data_input_stream.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/memory/memory_segment.h" + +namespace paimon { +namespace { + +constexpr int32_t kMagic = 0x534d4543; // SMEC + +size_t NormalizeMaxSnapshots(int32_t max_snapshots) { + return static_cast(std::max(0, max_snapshots)); +} + +std::shared_ptr ToBytes(const MemorySegmentOutputStream& out, + const std::shared_ptr& pool) { + auto bytes = Bytes::AllocateBytes(static_cast(out.CurrentSize()), pool.get()); + int64_t offset = 0; + for (const auto& segment : out.Segments()) { + int64_t copy_size = + std::min(segment.Size(), static_cast(bytes->size()) - offset); + if (copy_size <= 0) { + break; + } + std::memcpy(bytes->data() + offset, segment.Data(), static_cast(copy_size)); + offset += copy_size; + } + return bytes; +} + +} // namespace + +SnapshotLiveManifestEntries::SnapshotLiveManifestEntries(int32_t max_snapshots) + : max_snapshots_(max_snapshots) {} + +std::optional SnapshotLiveManifestEntries::LatestBeforeOrEqual( + int64_t snapshot_id) const { + auto iter = entries_by_snapshot_.upper_bound(snapshot_id); + if (iter == entries_by_snapshot_.begin()) { + return std::optional(); + } + --iter; + return Entry{iter->first, iter->second}; +} + +void SnapshotLiveManifestEntries::Put(int64_t snapshot_id, std::vector&& entries) { + if (NormalizeMaxSnapshots(max_snapshots_) == 0) { + return; + } + entries_by_snapshot_[snapshot_id] = + std::make_shared>(std::move(entries)); + EvictIfNeeded(); +} + +size_t SnapshotLiveManifestEntries::Size() const { + return entries_by_snapshot_.size(); +} + +Result> SnapshotLiveManifestEntries::Serialize( + const std::shared_ptr& pool) const { + MemorySegmentOutputStream out(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool); + out.WriteValue(kMagic); + out.WriteValue(static_cast(entries_by_snapshot_.size())); + + ManifestEntrySerializer serializer(pool); + for (const auto& [snapshot_id, entries] : entries_by_snapshot_) { + out.WriteValue(snapshot_id); + PAIMON_RETURN_NOT_OK(serializer.SerializeList(*entries, &out)); + } + return ToBytes(out, pool); +} + +Result SnapshotLiveManifestEntries::Deserialize( + const MemorySegment& segment, int32_t max_snapshots, const std::shared_ptr& pool) { + SnapshotLiveManifestEntries snapshot_live_manifest_entries(max_snapshots); + if (segment.Data() == nullptr || segment.Size() == 0) { + return snapshot_live_manifest_entries; + } + + auto bytes = segment.GetOrCreateHeapMemory(pool.get()); + auto input_stream = std::make_shared(bytes->data(), bytes->size()); + DataInputStream in(input_stream); + + PAIMON_ASSIGN_OR_RAISE(int32_t magic, in.ReadValue()); + if (magic != kMagic) { + return Status::Invalid("invalid snapshot live manifest entries magic"); + } + PAIMON_ASSIGN_OR_RAISE(int32_t snapshot_count, in.ReadValue()); + if (snapshot_count < 0) { + return Status::Invalid("snapshot live manifest entries snapshot count is negative"); + } + + ManifestEntrySerializer serializer(pool); + for (int32_t i = 0; i < snapshot_count; i++) { + PAIMON_ASSIGN_OR_RAISE(int64_t snapshot_id, in.ReadValue()); + PAIMON_ASSIGN_OR_RAISE(std::vector entries, serializer.DeserializeList(&in)); + snapshot_live_manifest_entries.entries_by_snapshot_[snapshot_id] = + std::make_shared>(std::move(entries)); + } + snapshot_live_manifest_entries.EvictIfNeeded(); + return snapshot_live_manifest_entries; +} + +void SnapshotLiveManifestEntries::EvictIfNeeded() { + size_t max_snapshots = NormalizeMaxSnapshots(max_snapshots_); + while (entries_by_snapshot_.size() > max_snapshots) { + entries_by_snapshot_.erase(entries_by_snapshot_.begin()); + } +} + +} // namespace paimon diff --git a/src/paimon/core/manifest/snapshot_live_manifest_entries.h b/src/paimon/core/manifest/snapshot_live_manifest_entries.h new file mode 100644 index 000000000..0372b7428 --- /dev/null +++ b/src/paimon/core/manifest/snapshot_live_manifest_entries.h @@ -0,0 +1,63 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/core/manifest/manifest_entry.h" +#include "paimon/result.h" + +namespace paimon { +class Bytes; +class MemoryPool; +class MemorySegment; + +/// Live manifest entries retained for one bucket across multiple snapshots. +/// +/// This value object owns merged live manifest entries by snapshot id. It does not own or access a +/// cache; callers are responsible for storing the serialized bytes in the cache layer. +class SnapshotLiveManifestEntries { + public: + struct Entry { + int64_t snapshot_id; + std::shared_ptr> entries; + }; + + explicit SnapshotLiveManifestEntries(int32_t max_snapshots); + + std::optional LatestBeforeOrEqual(int64_t snapshot_id) const; + void Put(int64_t snapshot_id, std::vector&& entries); + size_t Size() const; + + Result> Serialize(const std::shared_ptr& pool) const; + static Result Deserialize(const MemorySegment& segment, + int32_t max_snapshots, + const std::shared_ptr& pool); + + private: + void EvictIfNeeded(); + + std::map>> entries_by_snapshot_; + int32_t max_snapshots_; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/append_only_file_store_scan_test.cpp b/src/paimon/core/operation/append_only_file_store_scan_test.cpp index ce3824e60..d6ab1f8ad 100644 --- a/src/paimon/core/operation/append_only_file_store_scan_test.cpp +++ b/src/paimon/core/operation/append_only_file_store_scan_test.cpp @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -25,6 +26,7 @@ #include "gtest/gtest.h" #include "paimon/common/data/binary_row.h" #include "paimon/common/data/binary_row_writer.h" +#include "paimon/common/io/cache/lru_cache.h" #include "paimon/core/manifest/partition_entry.h" #include "paimon/core/operation/metrics/scan_metrics.h" #include "paimon/core/schema/schema_manager.h" @@ -175,4 +177,72 @@ TEST(AppendOnlyFileStoreScanTest, TestScanDurationMetric) { ASSERT_LE(stats.p50, stats.p99); ASSERT_LE(stats.p99, stats.max); } + +namespace { + +std::shared_ptr BuildScan(const std::string& table_path, + const std::shared_ptr& cache, + const std::optional& bucket = std::nullopt) { + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::FILE_FORMAT, "orc") + .AddOption(Options::MANIFEST_FORMAT, "orc") + .AddOption(Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS, "8") + .WithCache(cache); + if (bucket) { + context_builder.SetBucketFilter(bucket.value()); + } + EXPECT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + EXPECT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + auto typed_table_scan = dynamic_cast(table_scan.get()); + EXPECT_TRUE(typed_table_scan); + return typed_table_scan->snapshot_reader_->scan_; +} + +} // namespace + +TEST(AppendOnlyFileStoreScanTest, TestSnapshotLiveManifestCachePath) { + TimezoneGuard guard("Asia/Shanghai"); + std::string table_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09/"; + auto cache = std::make_shared(/*max_weight=*/16 * 1024 * 1024); + + // First scan on snapshot 5: cache miss, entries rebuilt from all manifests. + auto scan_first = BuildScan(table_path, cache, /*bucket=*/0); + ASSERT_OK_AND_ASSIGN(Snapshot snapshot_5, + scan_first->GetSnapshotManager()->LoadSnapshot(/*snapshot_id=*/5)); + scan_first->WithSnapshot(snapshot_5); + ASSERT_OK_AND_ASSIGN(auto plan_first, scan_first->CreatePlan()); + size_t first_size = plan_first->Files().size(); + + // Second scan on the same snapshot should read the same bucket live entries from cache. + auto scan_second = BuildScan(table_path, cache, /*bucket=*/0); + scan_second->WithSnapshot(snapshot_5); + ASSERT_OK_AND_ASSIGN(auto plan_second, scan_second->CreatePlan()); + ASSERT_EQ(first_size, plan_second->Files().size()); +} + +TEST(AppendOnlyFileStoreScanTest, TestSnapshotLiveManifestCacheRebuildOnMiss) { + TimezoneGuard guard("Asia/Shanghai"); + std::string table_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09/"; + auto cache = std::make_shared(/*max_weight=*/16 * 1024 * 1024); + + // Seed the cache with an earlier snapshot. + auto scan_base = BuildScan(table_path, cache, /*bucket=*/0); + ASSERT_OK_AND_ASSIGN(Snapshot snapshot_3, + scan_base->GetSnapshotManager()->LoadSnapshot(/*snapshot_id=*/3)); + scan_base->WithSnapshot(snapshot_3); + ASSERT_OK_AND_ASSIGN(auto plan_base, scan_base->CreatePlan()); + (void)plan_base; + + // Advance to a newer snapshot: cache miss rebuilds the target snapshot bucket. + auto scan_next = BuildScan(table_path, cache, /*bucket=*/0); + ASSERT_OK_AND_ASSIGN(Snapshot snapshot_5, + scan_next->GetSnapshotManager()->LoadSnapshot(/*snapshot_id=*/5)); + scan_next->WithSnapshot(snapshot_5); + ASSERT_OK_AND_ASSIGN(auto plan_next, scan_next->CreatePlan()); + + auto scan_expected = BuildScan(table_path, /*cache=*/nullptr, /*bucket=*/0); + scan_expected->WithSnapshot(snapshot_5); + ASSERT_OK_AND_ASSIGN(auto plan_expected, scan_expected->CreatePlan()); + ASSERT_EQ(plan_expected->Files().size(), plan_next->Files().size()); +} } // namespace paimon::test diff --git a/src/paimon/core/operation/file_store_scan.cpp b/src/paimon/core/operation/file_store_scan.cpp index 876fa6276..44f56ffc1 100644 --- a/src/paimon/core/operation/file_store_scan.cpp +++ b/src/paimon/core/operation/file_store_scan.cpp @@ -16,6 +16,7 @@ #include "paimon/core/operation/file_store_scan.h" +#include #include #include #include @@ -26,6 +27,7 @@ #include "arrow/type.h" #include "fmt/format.h" +#include "paimon/cache/cache.h" #include "paimon/common/data/binary_array.h" #include "paimon/common/data/blob_utils.h" #include "paimon/common/executor/future.h" @@ -38,13 +40,17 @@ #include "paimon/core/manifest/manifest_file.h" #include "paimon/core/manifest/manifest_file_meta.h" #include "paimon/core/manifest/manifest_list.h" +#include "paimon/core/manifest/snapshot_live_manifest_entries.h" #include "paimon/core/operation/metrics/scan_metrics.h" #include "paimon/core/partition/partition_info.h" #include "paimon/core/stats/simple_stats.h" #include "paimon/core/stats/simple_stats_evolution.h" +#include "paimon/core/utils/branch_manager.h" #include "paimon/core/utils/duration.h" #include "paimon/core/utils/field_mapping.h" #include "paimon/core/utils/snapshot_manager.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_segment.h" #include "paimon/predicate/literal.h" #include "paimon/predicate/predicate.h" #include "paimon/predicate/predicate_builder.h" @@ -111,7 +117,8 @@ Result> FileStoreScan::ReadPartitionEntries() const PAIMON_RETURN_NOT_OK( ReadManifests(&snapshot, &all_manifest_file_metas, &filtered_manifest_file_metas)); std::vector manifest_entries; - PAIMON_RETURN_NOT_OK(ReadFileEntries(filtered_manifest_file_metas, &manifest_entries)); + PAIMON_RETURN_NOT_OK(ReadFileEntries(filtered_manifest_file_metas, &manifest_entries, + /*apply_scan_filter=*/true)); std::unordered_map partitions; PAIMON_RETURN_NOT_OK(PartitionEntry::Merge(manifest_entries, &partitions)); @@ -134,7 +141,26 @@ Result> FileStoreScan::CreatePlan() cons ReadManifests(&snapshot, &all_manifest_file_metas, &filtered_manifest_file_metas)); std::vector manifest_entries; - PAIMON_RETURN_NOT_OK(ReadManifestEntries(filtered_manifest_file_metas, &manifest_entries)); + const bool use_snapshot_live_manifest_cache = + snapshot.has_value() && scan_mode_ == ScanMode::ALL && + core_options_.GetScanManifestEntryCacheMaxSnapshots() > 0 && + core_options_.GetCache() != nullptr && !table_path_.empty() && + !row_range_index_.has_value() && bucket_filter_.has_value(); + if (use_snapshot_live_manifest_cache) { + PAIMON_RETURN_NOT_OK(ReadManifestEntriesWithCache( + snapshot.value(), all_manifest_file_metas, bucket_filter_.value(), &manifest_entries)); + std::vector filtered_entries; + filtered_entries.reserve(manifest_entries.size()); + for (auto& entry : manifest_entries) { + PAIMON_ASSIGN_OR_RAISE(bool keep, FilterManifestEntry(entry)); + if (keep) { + filtered_entries.emplace_back(std::move(entry)); + } + } + manifest_entries = std::move(filtered_entries); + } else { + PAIMON_RETURN_NOT_OK(ReadManifestEntries(filtered_manifest_file_metas, &manifest_entries)); + } PAIMON_ASSIGN_OR_RAISE(manifest_entries, PostFilterManifestEntries(std::move(manifest_entries))); @@ -169,7 +195,8 @@ Result> FileStoreScan::CreatePlan() cons metrics_->ObserveHistogram(ScanMetrics::SCAN_DURATION, static_cast(scan_duration_ms)); metrics_->SetCounter(ScanMetrics::LAST_SCANNED_SNAPSHOT_ID, snapshot.has_value() ? snapshot.value().Id() : int64_t{0}); - metrics_->SetCounter(ScanMetrics::LAST_SCANNED_MANIFESTS, filtered_manifest_file_metas.size()); + metrics_->SetCounter(ScanMetrics::LAST_SCANNED_MANIFESTS, + static_cast(filtered_manifest_file_metas.size())); metrics_->SetCounter( ScanMetrics::LAST_SCAN_SKIPPED_TABLE_FILES, std::max(int64_t{0}, all_data_files - static_cast(manifest_entries.size()))); @@ -218,12 +245,19 @@ Status FileStoreScan::ReadManifestsWithSnapshot(const Snapshot& snapshot, } Status FileStoreScan::ReadFileEntries(const std::vector& manifest_metas, - std::vector* manifest_entries) const { + std::vector* manifest_entries, + bool apply_scan_filter) const { std::vector>>> futures; for (const auto& meta : manifest_metas) { - auto read_meta_task = [this, meta]() -> Result> { + auto read_meta_task = [this, meta, + apply_scan_filter]() -> Result> { std::vector tmp_entries; - PAIMON_RETURN_NOT_OK(ReadManifestFileMeta(meta, &tmp_entries)); + if (apply_scan_filter) { + PAIMON_RETURN_NOT_OK(ReadManifestFileMeta(meta, &tmp_entries)); + } else { + PAIMON_RETURN_NOT_OK( + manifest_file_->Read(meta.FileName(), /*filter=*/nullptr, &tmp_entries)); + } return tmp_entries; }; futures.push_back(Via(executor_.get(), read_meta_task)); @@ -251,29 +285,129 @@ Status FileStoreScan::ReadManifestEntries(const std::vector& m return ReadAndNoMergeFileEntries(manifest_metas, manifest_entries); } -Status FileStoreScan::ReadAndMergeFileEntries(const std::vector& manifest_metas, - std::vector* merged_entries) const { +// Cache merged live manifest entries for one bucket before applying scan filters. Each cache value +// keeps a bounded number of snapshot results for the same table/branch/bucket. Exact snapshot hits +// can be returned directly; cache misses rebuild the target snapshot bucket from the target +// snapshot's data manifests. +Status FileStoreScan::ReadManifestEntriesWithCache( + const Snapshot& snapshot, const std::vector& all_manifest_metas, + int32_t bucket, std::vector* manifest_entries) const { + PAIMON_ASSIGN_OR_RAISE(SnapshotLiveManifestEntries cached_entries, + LoadSnapshotLiveManifestEntries(bucket)); + std::optional cached = + cached_entries.LatestBeforeOrEqual(snapshot.Id()); + if (cached && cached->snapshot_id == snapshot.Id()) { + *manifest_entries = *cached->entries; + return Status::OK(); + } + + // Rebuild the target snapshot bucket from all manifests and write the live entries back to the + // cache. + std::vector bucket_manifest_metas; + for (const auto& meta : all_manifest_metas) { + if (MayContainBucket(meta, bucket)) { + bucket_manifest_metas.push_back(meta); + } + } + PAIMON_RETURN_NOT_OK( + ReadAndMergeBucketFileEntries(bucket_manifest_metas, bucket, manifest_entries)); + std::vector cache_entries = *manifest_entries; + cached_entries.Put(snapshot.Id(), std::move(cache_entries)); + PAIMON_RETURN_NOT_OK(StoreSnapshotLiveManifestEntries(bucket, cached_entries)); + return Status::OK(); +} + +std::shared_ptr FileStoreScan::SnapshotLiveManifestEntriesCacheKey(int32_t bucket) const { + return CacheKey::ForSnapshotLiveManifestEntries( + table_path_, BranchManager::NormalizeBranch(core_options_.GetBranch()), bucket); +} + +Result FileStoreScan::LoadSnapshotLiveManifestEntries( + int32_t bucket) const { + auto supplier = [](const std::shared_ptr&) -> Result> { + return std::shared_ptr(); + }; + std::shared_ptr cache_key = SnapshotLiveManifestEntriesCacheKey(bucket); + const auto max_snapshots = core_options_.GetScanManifestEntryCacheMaxSnapshots(); + Result> cache_result = + core_options_.GetCache()->Get(cache_key, supplier); + if (!cache_result.ok() || !cache_result.value()) { + return SnapshotLiveManifestEntries(max_snapshots); + } + Result deserialized = SnapshotLiveManifestEntries::Deserialize( + cache_result.value()->GetSegment(), max_snapshots, pool_); + if (!deserialized.ok()) { + return SnapshotLiveManifestEntries(max_snapshots); + } + return std::move(deserialized.value()); +} + +Status FileStoreScan::StoreSnapshotLiveManifestEntries( + int32_t bucket, const SnapshotLiveManifestEntries& entries) const { + Result> bytes_result = entries.Serialize(pool_); + if (!bytes_result.ok()) { + return Status::OK(); + } + auto cache_value = + std::make_shared(MemorySegment::Wrap(bytes_result.value()), CacheCallback()); + Status status = + core_options_.GetCache()->Put(SnapshotLiveManifestEntriesCacheKey(bucket), cache_value); + return status.ok() ? status : Status::OK(); +} + +Status FileStoreScan::ReadAndMergeBucketFileEntries( + const std::vector& manifest_metas, int32_t bucket, + std::vector* merged_entries) const { std::vector unmerged_entries; - PAIMON_RETURN_NOT_OK(ReadFileEntries(manifest_metas, &unmerged_entries)); + std::vector entries; + PAIMON_RETURN_NOT_OK(ReadFileEntries(manifest_metas, &entries, /*apply_scan_filter=*/false)); + unmerged_entries.reserve(entries.size()); + for (auto& entry : entries) { + if (entry.Bucket() == bucket) { + unmerged_entries.emplace_back(std::move(entry)); + } + } + return MergeLiveEntries(unmerged_entries, merged_entries); +} + +Status FileStoreScan::MergeLiveEntries(const std::vector& unmerged_entries, + std::vector* live_entries) { std::unordered_set deleted_entries; for (const auto& entry : unmerged_entries) { if (entry.Kind() == FileKind::Delete()) { deleted_entries.insert(entry.CreateIdentifier()); } } - for (auto& entry : unmerged_entries) { + for (const auto& entry : unmerged_entries) { if (entry.Kind() == FileKind::Add() && deleted_entries.find(entry.CreateIdentifier()) == deleted_entries.end()) { - merged_entries->push_back(std::move(entry)); + live_entries->push_back(entry); } } return Status::OK(); } +Status FileStoreScan::ReadAndMergeFileEntries(const std::vector& manifest_metas, + std::vector* merged_entries) const { + std::vector unmerged_entries; + PAIMON_RETURN_NOT_OK( + ReadFileEntries(manifest_metas, &unmerged_entries, /*apply_scan_filter=*/true)); + return MergeLiveEntries(unmerged_entries, merged_entries); +} + Status FileStoreScan::ReadAndNoMergeFileEntries( const std::vector& manifest_metas, std::vector* manifest_entries) const { - return ReadFileEntries(manifest_metas, manifest_entries); + return ReadFileEntries(manifest_metas, manifest_entries, /*apply_scan_filter=*/true); +} + +bool FileStoreScan::MayContainBucket(const ManifestFileMeta& manifest, int32_t bucket) const { + const std::optional& min_bucket = manifest.MinBucket(); + const std::optional& max_bucket = manifest.MaxBucket(); + if (min_bucket && max_bucket) { + return bucket >= min_bucket.value() && bucket <= max_bucket.value(); + } + return true; } Result FileStoreScan::FilterManifestFileMeta(const ManifestFileMeta& manifest) const { @@ -319,37 +453,38 @@ bool FileStoreScan::FilterManifestByRowRanges(const ManifestFileMeta& manifest) Status FileStoreScan::ReadManifestFileMeta(const ManifestFileMeta& manifest, std::vector* entries) const { - auto filter = [&](const ManifestEntry& entry) -> Result { - if (partition_filter_) { - PAIMON_ASSIGN_OR_RAISE(bool res, - partition_filter_->Test(partition_schema_, entry.Partition())); - if (!res) { - return false; - } - } - if (only_read_real_buckets_ && entry.Bucket() < 0) { - return false; - } - if (bucket_filter_ != std::nullopt && entry.Bucket() != bucket_filter_.value()) { - return false; - } - if (level_filter_ != nullptr && !level_filter_(entry.Level())) { - return false; - } - return true; - }; std::vector unfiltered_entries; - PAIMON_RETURN_NOT_OK(manifest_file_->Read(manifest.FileName(), filter, &unfiltered_entries)); + PAIMON_RETURN_NOT_OK(manifest_file_->Read( + manifest.FileName(), + [this](const ManifestEntry& entry) -> Result { return FilterManifestEntry(entry); }, + &unfiltered_entries)); entries->reserve(entries->size() + unfiltered_entries.size()); for (auto& entry : unfiltered_entries) { - PAIMON_ASSIGN_OR_RAISE(bool res, FilterByStats(entry)); - if (res) { - entries->emplace_back(std::move(entry)); - } + entries->emplace_back(std::move(entry)); } return Status::OK(); } +Result FileStoreScan::FilterManifestEntry(const ManifestEntry& entry) const { + if (partition_filter_) { + PAIMON_ASSIGN_OR_RAISE(bool res, + partition_filter_->Test(partition_schema_, entry.Partition())); + if (!res) { + return false; + } + } + if (only_read_real_buckets_ && entry.Bucket() < 0) { + return false; + } + if (bucket_filter_ != std::nullopt && entry.Bucket() != bucket_filter_.value()) { + return false; + } + if (level_filter_ != nullptr && !level_filter_(entry.Level())) { + return false; + } + return FilterByStats(entry); +} + Status FileStoreScan::SplitAndSetFilter(const std::vector& partition_keys, const std::shared_ptr& arrow_schema, const std::shared_ptr& scan_filters) { diff --git a/src/paimon/core/operation/file_store_scan.h b/src/paimon/core/operation/file_store_scan.h index 46a06e4a7..cfd426393 100644 --- a/src/paimon/core/operation/file_store_scan.h +++ b/src/paimon/core/operation/file_store_scan.h @@ -21,7 +21,6 @@ #include #include #include -#include #include #include #include @@ -41,6 +40,7 @@ #include "paimon/core/manifest/manifest_file_meta.h" #include "paimon/core/manifest/manifest_list.h" #include "paimon/core/manifest/partition_entry.h" +#include "paimon/core/manifest/snapshot_live_manifest_entries.h" #include "paimon/core/schema/schema_manager.h" #include "paimon/core/snapshot.h" #include "paimon/core/table/source/scan_mode.h" @@ -57,6 +57,7 @@ class Schema; } // namespace arrow namespace paimon { +class CacheKey; class Executor; class FileKind; class ManifestFile; @@ -120,6 +121,11 @@ class FileStoreScan { return this; } + FileStoreScan* WithTablePath(const std::string& table_path) { + table_path_ = table_path; + return this; + } + virtual FileStoreScan* EnableValueFilter() { return this; } @@ -237,6 +243,26 @@ class FileStoreScan { Status ReadManifestEntries(const std::vector& manifest_metas, std::vector* manifest_entries) const; + Status ReadManifestEntriesWithCache(const Snapshot& snapshot, + const std::vector& bucket_manifest_metas, + int32_t bucket, + std::vector* manifest_entries) const; + std::shared_ptr SnapshotLiveManifestEntriesCacheKey(int32_t bucket) const; + Result LoadSnapshotLiveManifestEntries(int32_t bucket) const; + Status StoreSnapshotLiveManifestEntries(int32_t bucket, + const SnapshotLiveManifestEntries& entries) const; + + Status ReadAndMergeBucketFileEntries(const std::vector& manifest_metas, + int32_t bucket, + std::vector* merged_entries) const; + + /// Merge raw manifest entries into the set of currently-live files. Entries are deduplicated + /// by identifier (matching Add cancels a prior or following Delete), and lingering Delete + /// entries are dropped so the caller receives Add-only output, matching the semantics of + /// `ReadAndMergeFileEntries`. + static Status MergeLiveEntries(const std::vector& unmerged_entries, + std::vector* live_entries); + Status ReadAndMergeFileEntries(const std::vector& manifest_metas, std::vector* merged_entries) const; @@ -244,7 +270,10 @@ class FileStoreScan { std::vector* manifest_entries) const; Status ReadFileEntries(const std::vector& manifest_metas, - std::vector* manifest_entries) const; + std::vector* manifest_entries, + bool apply_scan_filter) const; + + bool MayContainBucket(const ManifestFileMeta& manifest, int32_t bucket) const; Result FilterManifestFileMeta(const ManifestFileMeta& manifest) const; @@ -253,6 +282,8 @@ class FileStoreScan { Status ReadManifestFileMeta(const ManifestFileMeta& manifest, std::vector* entries) const; + Result FilterManifestEntry(const ManifestEntry& entry) const; + protected: std::shared_ptr pool_; std::shared_ptr schema_manager_; @@ -265,7 +296,6 @@ class FileStoreScan { CoreOptions core_options_; private: - mutable std::mutex lock_; bool only_read_real_buckets_ = false; std::shared_ptr snapshot_manager_; std::shared_ptr manifest_list_; @@ -277,5 +307,6 @@ class FileStoreScan { std::function level_filter_; std::optional specified_snapshot_; std::shared_ptr metrics_; + std::string table_path_; }; } // namespace paimon diff --git a/src/paimon/core/operation/file_store_scan_test.cpp b/src/paimon/core/operation/file_store_scan_test.cpp index d2ef0db13..c7f171699 100644 --- a/src/paimon/core/operation/file_store_scan_test.cpp +++ b/src/paimon/core/operation/file_store_scan_test.cpp @@ -16,13 +16,17 @@ #include "paimon/core/operation/file_store_scan.h" +#include + #include "arrow/type.h" #include "gtest/gtest.h" #include "paimon/core/io/data_file_meta.h" #include "paimon/core/manifest/file_kind.h" #include "paimon/core/manifest/file_source.h" +#include "paimon/core/manifest/snapshot_live_manifest_entries.h" #include "paimon/core/stats/simple_stats.h" #include "paimon/data/timestamp.h" +#include "paimon/memory/memory_segment.h" #include "paimon/testing/utils/testharness.h" namespace paimon::test { @@ -183,4 +187,88 @@ TEST_F(FileStoreScanTest, TestFilterManifestByRowRanges) { file_store_scan->WithRowRangeIndex(row_range_index); ASSERT_TRUE(file_store_scan->FilterManifestByRowRanges(manifest2)); } + +TEST_F(FileStoreScanTest, TestSnapshotLiveManifestEntries) { + std::vector snapshot1; + ASSERT_OK_AND_ASSIGN( + auto file1, + DataFileMeta::ForAppend("file-1", /*file_size=*/10, /*row_count=*/1, + SimpleStats::EmptyStats(), /*min_sequence_number=*/0, + /*max_sequence_number=*/0, /*schema_id=*/0, + /*file_source=*/std::nullopt, /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt)); + snapshot1.emplace_back(FileKind::Add(), BinaryRow::EmptyRow(), /*bucket=*/0, + /*total_buckets=*/1, file1); + SnapshotLiveManifestEntries entries(/*max_snapshots=*/2); + entries.Put(/*snapshot_id=*/1, std::move(snapshot1)); + ASSERT_EQ(entries.Size(), 1); + auto hit = entries.LatestBeforeOrEqual(/*snapshot_id=*/1); + ASSERT_TRUE(hit); + ASSERT_EQ(hit->snapshot_id, 1); + ASSERT_EQ(hit->entries->size(), 1); + ASSERT_EQ((*hit->entries)[0].FileName(), "file-1"); + auto latest_before_2 = entries.LatestBeforeOrEqual(/*snapshot_id=*/2); + ASSERT_TRUE(latest_before_2); + ASSERT_EQ(latest_before_2->snapshot_id, 1); + + std::vector snapshot3; + ASSERT_OK_AND_ASSIGN( + auto file3, + DataFileMeta::ForAppend("file-3", /*file_size=*/10, /*row_count=*/1, + SimpleStats::EmptyStats(), /*min_sequence_number=*/0, + /*max_sequence_number=*/0, /*schema_id=*/0, + /*file_source=*/std::nullopt, /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt)); + snapshot3.emplace_back(FileKind::Add(), BinaryRow::EmptyRow(), /*bucket=*/0, + /*total_buckets=*/1, file3); + entries.Put(/*snapshot_id=*/3, std::move(snapshot3)); + + auto latest_before_4 = entries.LatestBeforeOrEqual(/*snapshot_id=*/4); + ASSERT_TRUE(latest_before_4); + ASSERT_EQ(latest_before_4->snapshot_id, 3); + + entries.Put(/*snapshot_id=*/5, {}); + ASSERT_EQ(entries.Size(), 2); + ASSERT_FALSE(entries.LatestBeforeOrEqual(/*snapshot_id=*/1)); + ASSERT_TRUE(entries.LatestBeforeOrEqual(/*snapshot_id=*/3)); + ASSERT_TRUE(entries.LatestBeforeOrEqual(/*snapshot_id=*/5)); +} + +TEST_F(FileStoreScanTest, TestSnapshotLiveManifestEntriesSerialization) { + std::vector manifest_entries; + ASSERT_OK_AND_ASSIGN( + auto file1, + DataFileMeta::ForAppend("file-1", /*file_size=*/10, /*row_count=*/1, + SimpleStats::EmptyStats(), /*min_sequence_number=*/0, + /*max_sequence_number=*/0, /*schema_id=*/0, + /*file_source=*/std::nullopt, /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt)); + manifest_entries.emplace_back(FileKind::Add(), BinaryRow::EmptyRow(), /*bucket=*/0, + /*total_buckets=*/1, file1); + SnapshotLiveManifestEntries entries(/*max_snapshots=*/2); + entries.Put(/*snapshot_id=*/1, std::move(manifest_entries)); + entries.Put(/*snapshot_id=*/3, {}); + + ASSERT_OK_AND_ASSIGN(auto bytes, entries.Serialize(GetDefaultPool())); + ASSERT_OK_AND_ASSIGN(auto deserialized, + SnapshotLiveManifestEntries::Deserialize( + MemorySegment::Wrap(bytes), /*max_snapshots=*/2, GetDefaultPool())); + ASSERT_EQ(deserialized.Size(), 2); + auto hit = deserialized.LatestBeforeOrEqual(/*snapshot_id=*/2); + ASSERT_TRUE(hit); + ASSERT_EQ(hit->snapshot_id, 1); + ASSERT_EQ(hit->entries->size(), 1); + ASSERT_EQ((*hit->entries)[0].FileName(), "file-1"); + ASSERT_EQ(deserialized.LatestBeforeOrEqual(/*snapshot_id=*/4)->snapshot_id, 3); + + ASSERT_OK_AND_ASSIGN(auto evicted_deserialized, SnapshotLiveManifestEntries::Deserialize( + MemorySegment::Wrap(bytes), + /*max_snapshots=*/1, GetDefaultPool())); + ASSERT_EQ(evicted_deserialized.Size(), 1); + ASSERT_FALSE(evicted_deserialized.LatestBeforeOrEqual(/*snapshot_id=*/1)); + ASSERT_EQ(evicted_deserialized.LatestBeforeOrEqual(/*snapshot_id=*/3)->snapshot_id, 3); +} } // namespace paimon::test diff --git a/src/paimon/core/table/source/table_scan.cpp b/src/paimon/core/table/source/table_scan.cpp index 7869cf8b5..62155aaad 100644 --- a/src/paimon/core/table/source/table_scan.cpp +++ b/src/paimon/core/table/source/table_scan.cpp @@ -17,6 +17,7 @@ #include "paimon/table/source/table_scan.h" #include +#include #include #include #include @@ -95,19 +96,35 @@ class TableScanImpl { ManifestFile::Create(fs, manifest_file_format, core_options.GetManifestCompression(), path_factory, core_options.GetManifestTargetFileSize(), memory_pool, core_options, partition_schema)); + std::unique_ptr scan; if (table_schema->PrimaryKeys().empty()) { if (core_options.DataEvolutionEnabled()) { - return DataEvolutionFileStoreScan::Create( - snapshot_manager, schema_manager, manifest_list, manifest_file, table_schema, - arrow_schema, context->GetScanFilters(), core_options, executor, memory_pool); + PAIMON_ASSIGN_OR_RAISE( + scan, DataEvolutionFileStoreScan::Create( + snapshot_manager, schema_manager, manifest_list, manifest_file, + table_schema, arrow_schema, context->GetScanFilters(), core_options, + executor, memory_pool)); + } else { + PAIMON_ASSIGN_OR_RAISE( + scan, AppendOnlyFileStoreScan::Create( + snapshot_manager, schema_manager, manifest_list, manifest_file, + table_schema, arrow_schema, context->GetScanFilters(), core_options, + executor, memory_pool)); } - return AppendOnlyFileStoreScan::Create( - snapshot_manager, schema_manager, manifest_list, manifest_file, table_schema, - arrow_schema, context->GetScanFilters(), core_options, executor, memory_pool); + } else { + PAIMON_ASSIGN_OR_RAISE( + scan, KeyValueFileStoreScan::Create(snapshot_manager, schema_manager, manifest_list, + manifest_file, table_schema, arrow_schema, + context->GetScanFilters(), core_options, + executor, memory_pool)); } - return KeyValueFileStoreScan::Create( - snapshot_manager, schema_manager, manifest_list, manifest_file, table_schema, - arrow_schema, context->GetScanFilters(), core_options, executor, memory_pool); + return WithTablePath(std::move(scan), context); + } + + static std::unique_ptr WithTablePath(std::unique_ptr&& scan, + const ScanContext* context) { + scan->WithTablePath(context->GetPath()); + return std::move(scan); } static Result> CreateSplitGenerator( diff --git a/src/paimon/testing/utils/counting_cache_test_utils.h b/src/paimon/testing/utils/counting_cache_test_utils.h index 5d4b88fc3..ae683e732 100644 --- a/src/paimon/testing/utils/counting_cache_test_utils.h +++ b/src/paimon/testing/utils/counting_cache_test_utils.h @@ -47,12 +47,14 @@ class CountingRoutingCache : public Cache { supplier) override { ++get_count_; last_kind_ = key->GetKind(); + ++get_count_by_kind_[key->GetKind()]; PAIMON_ASSIGN_OR_RAISE(std::shared_ptr cache, GetCache(key)); return cache->Get( key, [this, supplier = std::move(supplier)](const std::shared_ptr& supplier_key) -> Result> { ++supplier_call_count_; + ++supplier_call_count_by_kind_[supplier_key->GetKind()]; return supplier(supplier_key); }); } @@ -88,15 +90,31 @@ class CountingRoutingCache : public Cache { return get_count_; } + int64_t GetCount(CacheKind kind) const { + return GetCount(get_count_by_kind_, kind); + } + int64_t SupplierCallCount() const { return supplier_call_count_; } + int64_t SupplierCallCount(CacheKind kind) const { + return GetCount(supplier_call_count_by_kind_, kind); + } + CacheKind LastKind() const { return last_kind_; } private: + static int64_t GetCount(const std::map& counts, CacheKind kind) { + auto iter = counts.find(kind); + if (iter == counts.end()) { + return 0; + } + return iter->second; + } + Result> GetCache(const std::shared_ptr& key) const { auto iter = caches_.find(key->GetKind()); if (iter == caches_.end()) { @@ -106,6 +124,8 @@ class CountingRoutingCache : public Cache { } std::map> caches_; + std::map get_count_by_kind_; + std::map supplier_call_count_by_kind_; int64_t get_count_ = 0; int64_t supplier_call_count_ = 0; CacheKind last_kind_ = CacheKind::DEFAULT; diff --git a/test/inte/data_evolution_table_test.cpp b/test/inte/data_evolution_table_test.cpp index 83e0c232b..d4df6d500 100644 --- a/test/inte/data_evolution_table_test.cpp +++ b/test/inte/data_evolution_table_test.cpp @@ -13,9 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include + #include "arrow/type.h" #include "gtest/gtest.h" #include "paimon/common/factories/io_hook.h" +#include "paimon/common/io/cache/lru_cache.h" #include "paimon/common/table/special_fields.h" #include "paimon/common/types/data_field.h" #include "paimon/common/utils/date_time_utils.h" @@ -35,9 +38,11 @@ #include "paimon/testing/utils/test_helper.h" #include "paimon/testing/utils/testharness.h" namespace paimon::test { +using DataEvolutionTableParam = std::tuple; + // This is a sdk end-to-end test for data evolution class DataEvolutionTableTest : public ::testing::Test, - public ::testing::WithParamInterface { + public ::testing::WithParamInterface { void SetUp() override { dir_ = UniqueTestDirectory::Create("local"); int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); @@ -68,7 +73,7 @@ class DataEvolutionTableTest : public ::testing::Test, void CreateTable(const std::vector& partition_keys) const { std::map options = {{Options::MANIFEST_FORMAT, "orc"}, - {Options::FILE_FORMAT, GetParam()}, + {Options::FILE_FORMAT, FileFormat()}, {Options::FILE_SYSTEM, "local"}, {Options::ROW_TRACKING_ENABLED, "true"}, {Options::DATA_EVOLUTION_ENABLED, "true"}}; @@ -142,7 +147,7 @@ class DataEvolutionTableTest : public ::testing::Test, auto global_index_result = BitmapGlobalIndexResult::FromRanges(row_ranges); scan_context_builder.SetGlobalIndexResult(global_index_result); } - PAIMON_ASSIGN_OR_RAISE(auto scan_context, scan_context_builder.Finish()); + PAIMON_ASSIGN_OR_RAISE(auto scan_context, FinishScanContext(scan_context_builder)); PAIMON_ASSIGN_OR_RAISE(auto table_scan, TableScan::Create(std::move(scan_context))); PAIMON_ASSIGN_OR_RAISE(auto result_plan, table_scan->CreatePlan()); if (!expected_array && check_scan_plan_when_empty_result) { @@ -208,7 +213,7 @@ class DataEvolutionTableTest : public ::testing::Test, auto global_index_result = BitmapGlobalIndexResult::FromRanges(row_ranges); scan_context_builder.SetGlobalIndexResult(global_index_result); } - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); const auto& result_splits = result_plan->Splits(); @@ -234,6 +239,26 @@ class DataEvolutionTableTest : public ::testing::Test, ASSERT_EQ(result_row_counts, expected_row_counts); } + Result> FinishScanContext(ScanContextBuilder& builder) const { + if (EnableSnapshotLiveManifestCache()) { + if (!snapshot_live_manifest_cache_) { + snapshot_live_manifest_cache_ = + std::make_shared(/*max_weight=*/64 * 1024 * 1024); + } + builder.AddOption(Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS, "3") + .WithCache(snapshot_live_manifest_cache_); + } + return builder.Finish(); + } + + std::string FileFormat() const { + return std::get<0>(GetParam()); + } + + bool EnableSnapshotLiveManifestCache() const { + return std::get<1>(GetParam()); + } + std::shared_ptr PrepareBulkData( int32_t write_batch_size, std::function data_generator, const arrow::FieldVector& fields) const { @@ -253,6 +278,7 @@ class DataEvolutionTableTest : public ::testing::Test, private: std::unique_ptr dir_; + mutable std::shared_ptr snapshot_live_manifest_cache_; arrow::FieldVector fields_ = { arrow::field("f0", arrow::int32()), arrow::field("f1", arrow::utf8()), @@ -293,7 +319,7 @@ TEST_P(DataEvolutionTableTest, TestBasic) { .ValueOrDie()); ASSERT_OK(ScanAndRead(table_path, schema->field_names(), expected_array)); - if (GetParam() != "lance") { + if (FileFormat() != "lance") { // read with row tracking auto expected_row_tracking_array = std::dynamic_pointer_cast( arrow::ipc::internal::json::ArrayFromJSON( @@ -422,7 +448,7 @@ TEST_P(DataEvolutionTableTest, TestMultipleAppends) { /*predicate=*/nullptr, /*row_ranges=*/row_ranges)); } - if (GetParam() != "lance") { + if (FileFormat() != "lance") { // read with row tracking auto expected_row_tracking_array = std::dynamic_pointer_cast( arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({ @@ -497,7 +523,7 @@ TEST_P(DataEvolutionTableTest, TestOnlySomeColumns) { .ValueOrDie()); ASSERT_OK(ScanAndRead(table_path, schema->field_names(), expected_array)); - if (GetParam() != "lance") { + if (FileFormat() != "lance") { // read with row tracking auto expected_row_tracking_array = std::dynamic_pointer_cast( arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({ @@ -518,7 +544,7 @@ TEST_P(DataEvolutionTableTest, TestOnlySomeColumns) { } TEST_P(DataEvolutionTableTest, TestMultipleSharedShreddingMapsPartialOverwrite) { - if (GetParam() != "parquet" && GetParam() != "orc") { + if (FileFormat() != "parquet" && FileFormat() != "orc") { return; } @@ -530,7 +556,7 @@ TEST_P(DataEvolutionTableTest, TestMultipleSharedShreddingMapsPartialOverwrite) }; std::map options = { {Options::MANIFEST_FORMAT, "orc"}, - {Options::FILE_FORMAT, GetParam()}, + {Options::FILE_FORMAT, FileFormat()}, {Options::FILE_SYSTEM, "local"}, {Options::ROW_TRACKING_ENABLED, "true"}, {Options::DATA_EVOLUTION_ENABLED, "true"}, @@ -608,7 +634,7 @@ TEST_P(DataEvolutionTableTest, TestMultipleSharedShreddingMapsPartialOverwrite) ASSERT_TRUE(arrow::ExportSchema(*read_schema, c_schema.get()).ok()); ScanContextBuilder scan_context_builder(table_path); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); @@ -709,7 +735,7 @@ TEST_P(DataEvolutionTableTest, TestNullValues) { .ValueOrDie()); ASSERT_OK(ScanAndRead(table_path, schema->field_names(), expected_array)); - if (GetParam() != "lance") { + if (FileFormat() != "lance") { // read with row tracking auto expected_row_tracking_array = std::dynamic_pointer_cast( arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({ @@ -792,7 +818,7 @@ TEST_P(DataEvolutionTableTest, TestMultipleAppendsDifferentFirstRowIds) { .ValueOrDie()); ASSERT_OK(ScanAndRead(table_path, schema->field_names(), expected_array)); - if (GetParam() != "lance") { + if (FileFormat() != "lance") { // read with row tracking auto expected_row_tracking_array = std::dynamic_pointer_cast( arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({ @@ -862,7 +888,7 @@ TEST_P(DataEvolutionTableTest, TestMoreData) { TEST_P(DataEvolutionTableTest, TestOnlyRowTrackingEnabled) { std::map options = { {Options::MANIFEST_FORMAT, "orc"}, - {Options::FILE_FORMAT, GetParam()}, + {Options::FILE_FORMAT, FileFormat()}, {Options::FILE_SYSTEM, "local"}, {Options::ROW_TRACKING_ENABLED, "true"}, {Options::DATA_EVOLUTION_ENABLED, "false"}, @@ -883,7 +909,7 @@ TEST_P(DataEvolutionTableTest, TestOnlyRowTrackingEnabled) { ASSERT_OK_AND_ASSIGN(auto commit_msgs, WriteArray(table_path, write_cols0, src_array0)); ASSERT_OK(Commit(table_path, commit_msgs)); - if (GetParam() != "lance") { + if (FileFormat() != "lance") { // read with row tracking auto expected_row_tracking_array = std::dynamic_pointer_cast( arrow::ipc::internal::json::ArrayFromJSON( @@ -908,7 +934,7 @@ TEST_P(DataEvolutionTableTest, TestExternalPath) { std::map options = { {Options::MANIFEST_FORMAT, "orc"}, - {Options::FILE_FORMAT, GetParam()}, + {Options::FILE_FORMAT, FileFormat()}, {Options::FILE_SYSTEM, "local"}, {Options::ROW_TRACKING_ENABLED, "true"}, {Options::DATA_EVOLUTION_ENABLED, "true"}, @@ -951,7 +977,7 @@ TEST_P(DataEvolutionTableTest, TestExternalPath) { .ValueOrDie()); ASSERT_OK(ScanAndRead(table_path, schema->field_names(), expected_array)); - if (GetParam() != "lance") { + if (FileFormat() != "lance") { // read with row tracking auto expected_row_tracking_array = std::dynamic_pointer_cast( arrow::ipc::internal::json::ArrayFromJSON( @@ -1029,7 +1055,7 @@ TEST_P(DataEvolutionTableTest, TestWithPartitionSimple) { .ValueOrDie()); ASSERT_OK(ScanAndRead(table_path, schema->field_names(), expected_array)); - if (GetParam() != "lance") { + if (FileFormat() != "lance") { // test only read partition fields auto expected_array_only_partition = std::dynamic_pointer_cast( arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({fields_[1]}), R"([ @@ -1140,7 +1166,7 @@ TEST_P(DataEvolutionTableTest, TestWithPartitionWithoutPartitionFieldsInFile) { /*row_ranges=*/row_ranges)); } - if (GetParam() != "lance") { + if (FileFormat() != "lance") { // read with row tracking auto expected_row_tracking_array = std::dynamic_pointer_cast( arrow::ipc::internal::json::ArrayFromJSON( @@ -1159,13 +1185,13 @@ TEST_P(DataEvolutionTableTest, TestWithPartitionWithoutPartitionFieldsInFile) { } TEST_P(DataEvolutionTableTest, TestPartitionWithPredicate) { - auto file_format = GetParam(); + auto file_format = FileFormat(); if (file_format == "lance" || file_format == "avro") { return; } std::vector partition_keys = {"f1"}; std::map options = { - {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, GetParam()}, + {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, FileFormat()}, {Options::FILE_SYSTEM, "local"}, {Options::ROW_TRACKING_ENABLED, "true"}, {Options::DATA_EVOLUTION_ENABLED, "true"}, {"parquet.write.max-row-group-length", "1"}}; @@ -1332,7 +1358,7 @@ TEST_P(DataEvolutionTableTest, TestPartitionWithPredicate) { } TEST_P(DataEvolutionTableTest, TestAlterTable) { - auto file_format = GetParam(); + auto file_format = FileFormat(); if (file_format == "lance" || file_format == "avro") { return; } @@ -1429,7 +1455,7 @@ TEST_P(DataEvolutionTableTest, TestAlterTable) { } TEST_P(DataEvolutionTableTest, TestReadCompactFiles) { - auto file_format = GetParam(); + auto file_format = FileFormat(); if (file_format == "lance" || file_format == "avro") { return; } @@ -1459,7 +1485,7 @@ TEST_P(DataEvolutionTableTest, TestReadCompactFiles) { } TEST_P(DataEvolutionTableTest, TestReadTableWithDenseStats) { - auto file_format = GetParam(); + auto file_format = FileFormat(); if (file_format == "lance" || file_format == "avro") { return; } @@ -1540,7 +1566,7 @@ TEST_P(DataEvolutionTableTest, TestReadTableWithDenseStats) { } TEST_P(DataEvolutionTableTest, TestScanAndReadWithIndex) { - auto file_format = GetParam(); + auto file_format = FileFormat(); if (file_format == "lance" || file_format == "avro") { return; } @@ -1681,7 +1707,7 @@ TEST_P(DataEvolutionTableTest, TestScanAndReadWithIndex) { } TEST_P(DataEvolutionTableTest, TestPredicate) { - if (GetParam() == "lance" || GetParam() == "avro") { + if (FileFormat() == "lance" || FileFormat() == "avro") { // lance and avro do not have stats return; } @@ -1750,7 +1776,7 @@ TEST_P(DataEvolutionTableTest, TestPredicate) { } TEST_P(DataEvolutionTableTest, TestIOException) { - if (GetParam() == "lance") { + if (FileFormat() == "lance") { return; } std::string table_path; @@ -1825,7 +1851,7 @@ TEST_P(DataEvolutionTableTest, TestIOException) { TEST_P(DataEvolutionTableTest, TestWithRowIds) { std::map options = {{Options::MANIFEST_FORMAT, "orc"}, - {Options::FILE_FORMAT, GetParam()}, + {Options::FILE_FORMAT, FileFormat()}, {Options::FILE_SYSTEM, "local"}, {Options::ROW_TRACKING_ENABLED, "true"}, {Options::DATA_EVOLUTION_ENABLED, "true"}}; @@ -1987,7 +2013,7 @@ TEST_P(DataEvolutionTableTest, TestWithRowIds) { /*predicate=*/nullptr, /*row_ranges=*/row_ranges)); } - if (GetParam() == "lance" || GetParam() == "avro") { + if (FileFormat() == "lance" || FileFormat() == "avro") { // as lance and avro do not support stats return; } @@ -2047,18 +2073,20 @@ TEST_P(DataEvolutionTableTest, TestWithRowIds) { } } -std::vector GetTestValuesForDataEvolutionTableTest() { - std::vector values; - values.emplace_back("parquet"); +std::vector GetTestValuesForDataEvolutionTableTest() { + std::vector values; + for (bool enable_snapshot_live_manifest_cache : {false, true}) { + values.emplace_back("parquet", enable_snapshot_live_manifest_cache); #ifdef PAIMON_ENABLE_ORC - values.emplace_back("orc"); + values.emplace_back("orc", enable_snapshot_live_manifest_cache); #endif #ifdef PAIMON_ENABLE_LANCE - values.emplace_back("lance"); + values.emplace_back("lance", enable_snapshot_live_manifest_cache); #endif #ifdef PAIMON_ENABLE_AVRO - values.emplace_back("avro"); + values.emplace_back("avro", enable_snapshot_live_manifest_cache); #endif + } return values; } diff --git a/test/inte/scan_and_read_inte_test.cpp b/test/inte/scan_and_read_inte_test.cpp index ad3d8e7fb..5f891089e 100644 --- a/test/inte/scan_and_read_inte_test.cpp +++ b/test/inte/scan_and_read_inte_test.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -28,6 +29,7 @@ #include "arrow/ipc/json_simple.h" #include "gtest/gtest.h" #include "paimon/common/factories/io_hook.h" +#include "paimon/common/io/cache/lru_cache.h" #include "paimon/common/table/special_fields.h" #include "paimon/common/types/data_field.h" #include "paimon/common/utils/date_time_utils.h" @@ -65,8 +67,10 @@ class DataSplit; } // namespace paimon namespace paimon::test { +using ScanAndReadParam = std::tuple; + class ScanAndReadInteTest : public testing::Test, - public ::testing::WithParamInterface> { + public ::testing::WithParamInterface { public: void CheckStreamScanResult( const std::unique_ptr& table_scan, const std::unique_ptr& table_read, @@ -129,13 +133,36 @@ class ScanAndReadInteTest : public testing::Test, } void AddReadOptionsForPrefetch(ReadContextBuilder* read_context_builder) { - auto [file_format, enable_prefetch] = GetParam(); read_context_builder->AddOption("test.enable-adaptive-prefetch-strategy", "false"); - if (enable_prefetch) { + if (EnablePrefetch()) { read_context_builder->EnablePrefetch(true).SetPrefetchBatchCount(3); } } + Result> FinishScanContext(ScanContextBuilder& builder) { + if (EnableSnapshotLiveManifestCache()) { + if (!snapshot_live_manifest_cache_) { + snapshot_live_manifest_cache_ = + std::make_shared(/*max_weight=*/64 * 1024 * 1024); + } + builder.AddOption(Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS, "3") + .WithCache(snapshot_live_manifest_cache_); + } + return builder.Finish(); + } + + std::string FileFormat() const { + return std::get<0>(GetParam()); + } + + bool EnablePrefetch() const { + return std::get<1>(GetParam()); + } + + bool EnableSnapshotLiveManifestCache() const { + return std::get<2>(GetParam()); + } + void AdjustSplitWithExternalPath(const std::string& src_path, const std::string& target_path, bool adjust_index, std::vector>* splits_ptr) { @@ -165,6 +192,8 @@ class ScanAndReadInteTest : public testing::Test, } private: + std::shared_ptr snapshot_live_manifest_cache_; + std::shared_ptr arrow_data_type_ = std::dynamic_pointer_cast(DataField::ConvertDataFieldsToArrowStructType( {SpecialFields::ValueKind(), DataField(0, arrow::field("f0", arrow::utf8())), @@ -173,21 +202,24 @@ class ScanAndReadInteTest : public testing::Test, DataField(3, arrow::field("f3", arrow::float64()))})); }; -std::vector> GetTestValuesForScanAndReadInteTest() { - std::vector> values = {{"parquet", false}, {"parquet", true}}; +std::vector GetTestValuesForScanAndReadInteTest() { + std::vector values; + for (bool enable_snapshot_live_manifest_cache : {false, true}) { + values.emplace_back("parquet", false, enable_snapshot_live_manifest_cache); + values.emplace_back("parquet", true, enable_snapshot_live_manifest_cache); #ifdef PAIMON_ENABLE_ORC - values.emplace_back("orc", false); - values.emplace_back("orc", true); + values.emplace_back("orc", false, enable_snapshot_live_manifest_cache); + values.emplace_back("orc", true, enable_snapshot_live_manifest_cache); #endif + } return values; } INSTANTIATE_TEST_SUITE_P(FileFormatAndEnablePaimonPrefetch, ScanAndReadInteTest, - ::testing::ValuesIn(std::vector>( - GetTestValuesForScanAndReadInteTest()))); + ::testing::ValuesIn(GetTestValuesForScanAndReadInteTest())); TEST_P(ScanAndReadInteTest, TestWithAppendSnapshotIOException) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = GetDataDir() + "/" + file_format + "/append_09.db/append_09"; bool run_complete = false; @@ -198,7 +230,7 @@ TEST_P(ScanAndReadInteTest, TestWithAppendSnapshotIOException) { // scan ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1"); - Result> scan_context = scan_context_builder.Finish(); + Result> scan_context = FinishScanContext(scan_context_builder); CHECK_HOOK_STATUS(scan_context.status(), i); Result> table_scan = TableScan::Create(std::move(scan_context).value()); @@ -242,7 +274,7 @@ TEST_P(ScanAndReadInteTest, TestWithAppendSnapshotIOException) { } TEST_P(ScanAndReadInteTest, TestWithPkSnapshotIOException) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = GetDataDir() + "/" + file_format + "/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/"; @@ -254,7 +286,7 @@ TEST_P(ScanAndReadInteTest, TestWithPkSnapshotIOException) { // scan ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "6"); - Result> scan_context = scan_context_builder.Finish(); + Result> scan_context = FinishScanContext(scan_context_builder); CHECK_HOOK_STATUS(scan_context.status(), i); Result> table_scan = TableScan::Create(std::move(scan_context).value()); @@ -300,13 +332,13 @@ TEST_P(ScanAndReadInteTest, TestWithPkSnapshotIOException) { } TEST_P(ScanAndReadInteTest, TestWithAppendSnapshot1) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = GetDataDir() + "/" + file_format + "/append_09.db/append_09"; // scan ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1"); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 1); @@ -341,13 +373,13 @@ TEST_P(ScanAndReadInteTest, TestWithAppendSnapshot1) { } TEST_P(ScanAndReadInteTest, TestWithAppendSnapshot3) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = GetDataDir() + "/" + file_format + "/append_09.db/append_09"; // scan ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "3"); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 3); @@ -383,13 +415,13 @@ TEST_P(ScanAndReadInteTest, TestWithAppendSnapshot3) { } TEST_P(ScanAndReadInteTest, TestWithAppendSnapshot5) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = GetDataDir() + "/" + file_format + "/append_09.db/append_09"; // scan ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "5"); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 5); @@ -430,13 +462,13 @@ TEST_P(ScanAndReadInteTest, TestWithAppendSnapshot5) { } TEST_P(ScanAndReadInteTest, TestWithAppendSnapshotWithStreamWithDefaultMode) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = GetDataDir() + "/" + file_format + "/append_09.db/append_09"; // scan ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1").WithStreamingMode(true); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -479,13 +511,13 @@ TEST_P(ScanAndReadInteTest, TestWithAppendSnapshotWithStreamWithDefaultMode) { } TEST_P(ScanAndReadInteTest, TestJavaPaimon1WithAppendSnapshot1) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = GetDataDir() + "/" + file_format + "/append_10.db/append_10"; // scan ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1"); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 1); @@ -516,14 +548,14 @@ TEST_P(ScanAndReadInteTest, TestJavaPaimon1WithAppendSnapshot1) { } TEST_P(ScanAndReadInteTest, TestJavaPaimon1WithAppendSnapshotOfNestedType) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = GetDataDir() + "/" + file_format + "/append_complex_build_in_fieldid.db/" "append_complex_build_in_fieldid/"; // scan ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1"); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 1); @@ -569,30 +601,29 @@ TEST_P(ScanAndReadInteTest, TestJavaPaimon1WithAppendSnapshotOfNestedType) { } // test pk with dv -TEST_F(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot6) { - auto check_result = [&](const std::string& file_format) { - std::string table_path = GetDataDir() + "/" + file_format + - "/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/"; +TEST_P(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot6) { + auto file_format = FileFormat(); + std::string table_path = GetDataDir() + "/" + file_format + + "/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/"; - // normal batch scan case for pk+dv, all data in level 0 is filtered out - ScanContextBuilder scan_context_builder(table_path); - scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "6"); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); - ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + // normal batch scan case for pk+dv, all data in level 0 is filtered out + ScanContextBuilder scan_context_builder(table_path); + scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "6"); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); - ReadContextBuilder read_context_builder(table_path); - ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); - ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); + ReadContextBuilder read_context_builder(table_path); + ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); - ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); - ASSERT_EQ(result_plan->SnapshotId().value(), 6); - ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(result_plan->Splits())); - ASSERT_OK_AND_ASSIGN(auto read_result, - ReadResultCollector::CollectResult(batch_reader.get())); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + ASSERT_EQ(result_plan->SnapshotId().value(), 6); + ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(result_plan->Splits())); + ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get())); - // check result - auto expected = std::make_shared( - arrow::ipc::internal::json::ArrayFromJSON(arrow_data_type_, R"([ + // check result + auto expected = std::make_shared( + arrow::ipc::internal::json::ArrayFromJSON(arrow_data_type_, R"([ [0, "Two roads diverged in a wood, and I took the one less traveled by, And that has made all the difference.", 10, 1, 11.0], [0, "Alice", 10, 1, 19.1], [0, "Alex", 10, 0, 16.1], @@ -602,29 +633,23 @@ TEST_F(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot6) { [0, "Lucy", 20, 1, 14.1], [0, "Paul", 20, 1, 18.1] ])") - .ValueOrDie()); - ASSERT_TRUE(expected); - ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString(); + .ValueOrDie()); + ASSERT_TRUE(expected); + ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString(); - // CountRows should match the number of visible rows returned by CreateReader. - ASSERT_OK_AND_ASSIGN(auto count_reader, - table_read->CreateCountReader(result_plan->Splits())); - ASSERT_OK_AND_ASSIGN(int64_t count, count_reader->CountRows()); - ASSERT_EQ(count, read_result->length()); - }; - for (auto [file_format, enable_prefetch] : GetTestValuesForScanAndReadInteTest()) { - check_result(file_format); - } - check_result("avro"); + // CountRows should match the number of visible rows returned by CreateReader. + ASSERT_OK_AND_ASSIGN(auto count_reader, table_read->CreateCountReader(result_plan->Splits())); + ASSERT_OK_AND_ASSIGN(int64_t count, count_reader->CountRows()); + ASSERT_EQ(count, read_result->length()); } TEST_P(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot1) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = GetDataDir() + "/" + file_format + "/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/"; ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1"); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -640,7 +665,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot1) { } TEST_P(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot6WithPartitionAndBucketFilter) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); // all data in level 0 & not in partition 10, bucket 1 is filtered out std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/"; @@ -648,7 +673,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot6WithPartitionAndBu ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "6"); scan_context_builder.SetBucketFilter(1).SetPartitionFilter({{{"f1", "10"}}}); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -675,7 +700,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot6WithPartitionAndBu } TEST_P(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot6WithPredicate) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/"; // predicate: f0 != "Alice" (key predicate) and f3 > 18 (value predicate) and all data in level @@ -691,7 +716,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot6WithPredicate) { FieldType::DOUBLE, Literal(18.0)); ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({not_equal, greater_than})); scan_context_builder.SetPredicate(predicate); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -717,7 +742,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot6WithPredicate) { } TEST_P(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot4WithPredicate) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/"; @@ -728,7 +753,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot4WithPredicate) { auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/3, /*field_name=*/"f3", FieldType::DOUBLE, Literal(20.0)); scan_context_builder.SetPredicate(predicate); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); @@ -737,7 +762,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot4WithPredicate) { } TEST_P(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot6WithLimit) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/"; @@ -745,7 +770,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot6WithLimit) { // data in partition 20 is truncated ScanContextBuilder scan_context_builder(table_path); scan_context_builder.SetLimit(6); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -774,7 +799,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot6WithLimit) { } TEST_P(ScanAndReadInteTest, TestWithPKWithDvStreamFromSnapshot4) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/"; @@ -782,7 +807,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvStreamFromSnapshot4) { scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "4") .AddOption(Options::SCAN_MODE, "from-snapshot-full") .WithStreamingMode(true); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -826,7 +851,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvStreamFromSnapshot4) { } TEST_P(ScanAndReadInteTest, TestWithPKWithDvStreamFromSnapshot5) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/"; @@ -834,7 +859,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvStreamFromSnapshot5) { scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "5") .AddOption(Options::SCAN_MODE, "from-snapshot-full") .WithStreamingMode(true); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -872,13 +897,13 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvStreamFromSnapshot5) { } TEST_P(ScanAndReadInteTest, TestWithPKWithDvStreamFromSnapshot6) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/"; ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_MODE, "latest-full").WithStreamingMode(true); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -909,7 +934,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvStreamFromSnapshot6) { } TEST_P(ScanAndReadInteTest, TestWithPKWithDvStreamFromSnapshot1) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/"; @@ -917,7 +942,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvStreamFromSnapshot1) { scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1") .AddOption(Options::SCAN_MODE, "from-snapshot-full") .WithStreamingMode(true); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -968,7 +993,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvStreamFromSnapshot1) { } TEST_P(ScanAndReadInteTest, TestWithPKWithDvStreamFromSnapshot2) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/"; @@ -976,7 +1001,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvStreamFromSnapshot2) { scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "2") .AddOption(Options::SCAN_MODE, "from-snapshot") .WithStreamingMode(true); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1013,12 +1038,12 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvStreamFromSnapshot2) { } TEST_P(ScanAndReadInteTest, TestWithPKWithNestedType) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_nested_type.db/pk_table_nested_type/"; ScanContextBuilder scan_context_builder(table_path); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1065,13 +1090,13 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithNestedType) { // test pk with mor TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanLatestSnapshot) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/"; // normal batch scan case for pk+mor, use latest snapshot if not specified ScanContextBuilder scan_context_builder(table_path); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1110,7 +1135,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanLatestSnapshot) { } TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot2) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/"; @@ -1118,7 +1143,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot2) { // with merge read ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "2"); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1154,7 +1179,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot2) { } TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot5WithPartitionAndBucketFilter) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/"; @@ -1162,7 +1187,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot5WithPartitionAndB ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "5"); scan_context_builder.SetBucketFilter(1).SetPartitionFilter({{{"f1", "10"}}}); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1192,7 +1217,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot5WithPartitionAndB } TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot5WithPredicate) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/"; @@ -1218,7 +1243,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot5WithPredicate) { PredicateBuilder::And({not_equal, less_than, less_or_equal})); scan_context_builder.SetPredicate(predicate); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1252,7 +1277,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot5WithPredicate) { } TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot3WithPredicate) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/"; @@ -1264,7 +1289,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot3WithPredicate) { auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/3, /*field_name=*/"f3", FieldType::DOUBLE, Literal(20.0)); scan_context_builder.SetPredicate(predicate); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); @@ -1273,7 +1298,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot3WithPredicate) { } TEST_P(ScanAndReadInteTest, TestWithPKWithDvWithInvalidAggregateBatchScanSnapshot3) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/"; @@ -1282,7 +1307,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvWithInvalidAggregateBatchScanSnapsho .AddOption(Options::MERGE_ENGINE, "aggregation") .AddOption("fields.f3.aggregate-function", "rbm32"); scan_context_builder.SetBucketFilter(1).SetPartitionFilter({{{"f1", "10"}}}); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1312,7 +1337,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithDvWithInvalidAggregateBatchScanSnapsho } TEST_P(ScanAndReadInteTest, TestWithPKWithMorWithInvalidAggregateBatchScanSnapshot3) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/"; @@ -1321,7 +1346,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorWithInvalidAggregateBatchScanSnapsh .AddOption(Options::MERGE_ENGINE, "aggregation") .AddOption("fields.f3.aggregate-function", "rbm32"); scan_context_builder.SetBucketFilter(1).SetPartitionFilter({{{"f1", "10"}}}); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1339,7 +1364,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorWithInvalidAggregateBatchScanSnapsh } TEST_P(ScanAndReadInteTest, TestWithPKWithAggregateBatchScanSnapshot3WithPredicate) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/"; @@ -1352,7 +1377,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithAggregateBatchScanSnapshot3WithPredica auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/3, /*field_name=*/"f3", FieldType::DOUBLE, Literal(20.0)); scan_context_builder.SetPredicate(predicate); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1382,7 +1407,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithAggregateBatchScanSnapshot3WithPredica } TEST_P(ScanAndReadInteTest, TestWithPKWithPartialUpdateBatchScanSnapshot3WithPredicate) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/"; @@ -1395,7 +1420,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithPartialUpdateBatchScanSnapshot3WithPre auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/3, /*field_name=*/"f3", FieldType::DOUBLE, Literal(20.0)); scan_context_builder.SetPredicate(predicate); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1425,7 +1450,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithPartialUpdateBatchScanSnapshot3WithPre } TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot5WithLimit) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/"; @@ -1433,7 +1458,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot5WithLimit) { // merging ScanContextBuilder scan_context_builder(table_path); scan_context_builder.SetLimit(6); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1466,7 +1491,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot5WithLimit) { } TEST_P(ScanAndReadInteTest, TestWithPKWithMorStreamFromSnapshot4) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/"; @@ -1474,7 +1499,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorStreamFromSnapshot4) { scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "4") .AddOption(Options::SCAN_MODE, "from-snapshot-full") .WithStreamingMode(true); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1511,7 +1536,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorStreamFromSnapshot4) { } TEST_P(ScanAndReadInteTest, TestWithPKWithMorStreamFromSnapshot1) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/"; @@ -1519,7 +1544,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorStreamFromSnapshot1) { scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1") .AddOption(Options::SCAN_MODE, "from-snapshot-full") .WithStreamingMode(true); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1570,7 +1595,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorStreamFromSnapshot1) { } TEST_P(ScanAndReadInteTest, TestWithPKWithMorStreamFromSnapshot2) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/"; @@ -1580,7 +1605,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorStreamFromSnapshot2) { scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "2") .AddOption(Options::SCAN_MODE, "from-snapshot") .WithStreamingMode(true); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1617,7 +1642,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorStreamFromSnapshot2) { } TEST_P(ScanAndReadInteTest, TestWithPKWithMorStreamFromSnapshot5WithPredicate) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/"; @@ -1628,7 +1653,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorStreamFromSnapshot5WithPredicate) { auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/3, /*field_name=*/"f3", FieldType::DOUBLE, Literal(50.0)); scan_context_builder.SetPredicate(predicate); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1655,7 +1680,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorStreamFromSnapshot5WithPredicate) { // test first row merge engine TEST_P(ScanAndReadInteTest, TestWithPKWithFirstRowBatchScanSnapshot5) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_first_row.db/pk_table_scan_and_read_first_row/"; @@ -1663,7 +1688,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithFirstRowBatchScanSnapshot5) { // normal batch scan case for pk+first row, all data in level 0 is filtered out ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "5"); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1695,7 +1720,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithFirstRowBatchScanSnapshot5) { } TEST_P(ScanAndReadInteTest, TestWithPKWithFirstRowStreamFromSnapshot3) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_first_row.db/pk_table_scan_and_read_first_row/"; @@ -1704,7 +1729,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithFirstRowStreamFromSnapshot3) { scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "3") .AddOption(Options::SCAN_MODE, "from-snapshot-full") .WithStreamingMode(true); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1748,14 +1773,14 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithFirstRowStreamFromSnapshot3) { } TEST_P(ScanAndReadInteTest, TestWithPKWithFirstRowStreamFromSnapshot5) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_first_row.db/pk_table_scan_and_read_first_row/"; ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_MODE, "latest-full").WithStreamingMode(true); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1788,13 +1813,13 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithFirstRowStreamFromSnapshot5) { } TEST_P(ScanAndReadInteTest, TestWithPKWith09VersionDvBatchScanLatestSnapshot) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_09.db/pk_09/"; // normal batch scan case for pk+dv (09 version) ScanContextBuilder scan_context_builder(table_path); scan_context_builder.SetLimit(2); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1826,7 +1851,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKWith09VersionDvBatchScanLatestSnapshot) { } TEST_P(ScanAndReadInteTest, TestWithEmptyPartitionValue) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); auto check_result = [&](const std::string& table_path, @@ -1834,7 +1859,7 @@ TEST_P(ScanAndReadInteTest, TestWithEmptyPartitionValue) { const std::shared_ptr& expected) { ScanContextBuilder scan_context_builder(table_path); scan_context_builder.SetPartitionFilter(partition_filters); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1893,7 +1918,7 @@ TEST_P(ScanAndReadInteTest, TestWithEmptyPartitionValue) { } TEST_P(ScanAndReadInteTest, TestWithMultipleEmptyPartitionValue) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/append_with_empty_partition_with_empty_value.db/" "append_with_empty_partition_with_empty_value/"; @@ -1903,7 +1928,7 @@ TEST_P(ScanAndReadInteTest, TestWithMultipleEmptyPartitionValue) { const std::shared_ptr& expected) { ScanContextBuilder scan_context_builder(table_path); scan_context_builder.SetPartitionFilter(partition_filters); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -1946,13 +1971,13 @@ TEST_P(ScanAndReadInteTest, TestWithMultipleEmptyPartitionValue) { } TEST_P(ScanAndReadInteTest, TestMemoryUse) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/append_09.db/append_09/"; // scan ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1"); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 1); @@ -1991,7 +2016,7 @@ TEST_P(ScanAndReadInteTest, TestMemoryUse) { } TEST_P(ScanAndReadInteTest, TestPkScanWithPostponeBucket) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); auto test_dir = UniqueTestDirectory::Create("local"); arrow::FieldVector fields = {arrow::field("f0", arrow::utf8()), @@ -2054,7 +2079,7 @@ TEST_P(ScanAndReadInteTest, TestPkScanWithPostponeBucket) { // batch scan ScanContextBuilder scan_context_builder(table_path); scan_context_builder.WithStreamingMode(false); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 2); @@ -2064,7 +2089,7 @@ TEST_P(ScanAndReadInteTest, TestPkScanWithPostponeBucket) { // stream scan: from snapshot 1 ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1").WithStreamingMode(true); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -2098,7 +2123,7 @@ TEST_P(ScanAndReadInteTest, TestPkScanWithPostponeBucket) { // stream scan: from snapshot 2 ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "2").WithStreamingMode(true); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -2126,7 +2151,7 @@ TEST_P(ScanAndReadInteTest, TestPkScanWithPostponeBucket) { scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1") .AddOption(Options::SCAN_MODE, "from-snapshot-full") .WithStreamingMode(true); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -2158,7 +2183,7 @@ TEST_P(ScanAndReadInteTest, TestPkScanWithPostponeBucket) { } TEST_P(ScanAndReadInteTest, TestScanWithPredicateAndReadWithUnorderedFieldForParquet) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); if (file_format != "parquet") { return; } @@ -2169,7 +2194,7 @@ TEST_P(ScanAndReadInteTest, TestScanWithPredicateAndReadWithUnorderedFieldForPar auto predicate = PredicateBuilder::LessThan( /*field_index=*/3, /*field_name=*/"f4", FieldType::INT, Literal(300006)); scan_context_builder.SetPredicate(predicate); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 2); @@ -2208,7 +2233,7 @@ TEST_P(ScanAndReadInteTest, TestScanWithPredicateAndReadWithUnorderedFieldForPar } TEST_P(ScanAndReadInteTest, TestPkSchemaEvolutionScanWithRenamedPkPredicate) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_with_alter_table.db/pk_table_with_alter_table/"; @@ -2217,7 +2242,7 @@ TEST_P(ScanAndReadInteTest, TestPkSchemaEvolutionScanWithRenamedPkPredicate) { ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "6"); scan_context_builder.SetPredicate(predicate); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 6); @@ -2319,7 +2344,7 @@ TEST_F(ScanAndReadInteTest, TestScanWithPredicateAndReadWithUnorderedFieldForLan #endif TEST_P(ScanAndReadInteTest, TestAppendTableWithMultipleFileFormat) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); if (file_format != "parquet") { return; } @@ -2329,7 +2354,7 @@ TEST_P(ScanAndReadInteTest, TestAppendTableWithMultipleFileFormat) { // scan ScanContextBuilder scan_context_builder(table_path); scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "2"); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 2); @@ -2361,12 +2386,12 @@ TEST_P(ScanAndReadInteTest, TestAppendTableWithMultipleFileFormat) { } TEST_P(ScanAndReadInteTest, TestPkDvTableIndexInDataAndNoExternalPath) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_dv_index_in_data_no_external.db/pk_dv_index_in_data_no_external"; // scan ScanContextBuilder scan_context_builder(table_path); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 4); @@ -2399,13 +2424,13 @@ TEST_P(ScanAndReadInteTest, TestPkDvTableIndexInDataAndNoExternalPath) { } TEST_P(ScanAndReadInteTest, TestPkDvTableIndexNotInDataAndNoExternalPath) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_dv_index_not_in_data_no_external.db/pk_dv_index_not_in_data_no_external"; // scan ScanContextBuilder scan_context_builder(table_path); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 4); @@ -2438,7 +2463,7 @@ TEST_P(ScanAndReadInteTest, TestPkDvTableIndexNotInDataAndNoExternalPath) { } TEST_P(ScanAndReadInteTest, TestPkDvTableIndexNotInDataAndWithExternalPath) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_dv_index_not_in_data_with_external.db/pk_dv_index_not_in_data_with_external"; @@ -2446,7 +2471,7 @@ TEST_P(ScanAndReadInteTest, TestPkDvTableIndexNotInDataAndWithExternalPath) { "/pk_dv_index_not_in_data_with_external.db/external"; // scan ScanContextBuilder scan_context_builder(table_path); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 4); @@ -2481,7 +2506,7 @@ TEST_P(ScanAndReadInteTest, TestPkDvTableIndexNotInDataAndWithExternalPath) { } TEST_P(ScanAndReadInteTest, TestScanAndReadWithDisableIndex) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/append_with_bitmap.db/append_with_bitmap"; auto predicate = @@ -2526,7 +2551,7 @@ TEST_P(ScanAndReadInteTest, TestScanAndReadWithDisableIndex) { } TEST_P(ScanAndReadInteTest, TestPkDvTableIndexInDataAndWithExternalPath) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_dv_index_in_data_with_external.db/pk_dv_index_in_data_with_external"; @@ -2534,7 +2559,7 @@ TEST_P(ScanAndReadInteTest, TestPkDvTableIndexInDataAndWithExternalPath) { paimon::test::GetDataDir() + file_format + "/pk_dv_index_in_data_with_external.db/external"; // scan ScanContextBuilder scan_context_builder(table_path); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 4); @@ -2569,12 +2594,12 @@ TEST_P(ScanAndReadInteTest, TestPkDvTableIndexInDataAndWithExternalPath) { } TEST_P(ScanAndReadInteTest, TestTimestampType) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/append_with_multiple_ts_precision_and_timezone.db" "/append_with_multiple_ts_precision_and_timezone/"; ScanContextBuilder scan_context_builder(table_path); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 1); @@ -2615,13 +2640,13 @@ TEST_P(ScanAndReadInteTest, TestTimestampType) { TEST_P(ScanAndReadInteTest, TestCastTimestampType) { TimezoneGuard tz_guard("Asia/Shanghai"); - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/append_with_cast_timestamp.db" "/append_with_cast_timestamp/"; // scan ScanContextBuilder scan_context_builder(table_path); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 1); @@ -2800,7 +2825,7 @@ TEST_F(ScanAndReadInteTest, TestAvroWithPkTable) { } TEST_P(ScanAndReadInteTest, TestWithPKBucketSelectByPredicate) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); // Verify BucketSelectConverter: an EQUAL predicate on bucket key f2 should automatically // derive the target bucket, without explicitly calling SetBucketFilter. // From the existing test f2=0 maps to bucket 1, f2=1 maps to bucket 0. @@ -2815,7 +2840,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKBucketSelectByPredicate) { scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "6"); scan_context_builder.SetPartitionFilter({{{"f1", "10"}}}); scan_context_builder.SetPredicate(predicate); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ReadContextBuilder read_context_builder(table_path); @@ -2853,7 +2878,7 @@ TEST_P(ScanAndReadInteTest, TestWithPKBucketSelectByPredicate) { } TEST_P(ScanAndReadInteTest, TestCountRowsEmptySplits) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/"; @@ -2870,13 +2895,13 @@ TEST_P(ScanAndReadInteTest, TestCountRowsEmptySplits) { } TEST_P(ScanAndReadInteTest, TestCountRowsConsistencyWithCreateReader) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/"; // Scan latest snapshot ScanContextBuilder scan_context_builder(table_path); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); @@ -2903,13 +2928,13 @@ TEST_P(ScanAndReadInteTest, TestCountRowsConsistencyWithCreateReader) { } TEST_P(ScanAndReadInteTest, TestCreateCountReaderWithPredicateNotSupported) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/"; // Create splits from latest snapshot. ScanContextBuilder scan_context_builder(table_path); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); @@ -2927,13 +2952,13 @@ TEST_P(ScanAndReadInteTest, TestCreateCountReaderWithPredicateNotSupported) { } TEST_P(ScanAndReadInteTest, TestCreateCountReaderWithForceKeepDeleteNotSupported) { - auto [file_format, enable_prefetch] = GetParam(); + auto file_format = FileFormat(); std::string table_path = paimon::test::GetDataDir() + file_format + "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/"; // Create splits from latest snapshot. ScanContextBuilder scan_context_builder(table_path); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(scan_context_builder)); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); diff --git a/test/inte/scan_inte_test.cpp b/test/inte/scan_inte_test.cpp index 75c4c0e36..4c7f6c015 100644 --- a/test/inte/scan_inte_test.cpp +++ b/test/inte/scan_inte_test.cpp @@ -63,10 +63,12 @@ class ScanInteTest : public testing::TestWithParam { Result> FinishScanContext(ScanContextBuilder& builder) { if (GetParam() == ManifestCacheMode::Cache) { if (!cache_) { - cache_ = - std::make_shared(CacheKind::MANIFEST, 64 * 1024 * 1024); + cache_ = std::make_shared(std::map{ + {CacheKind::MANIFEST, 64 * 1024 * 1024}, + {CacheKind::SNAPSHOT_LIVE_MANIFEST, 64 * 1024 * 1024}}); } - builder.WithCache(cache_); + builder.AddOption(Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS, "3") + .WithCache(cache_); } return builder.Finish(); } @@ -284,6 +286,110 @@ TEST(ScanInteManifestCacheTest, TestRepeatedScanReusesManifestCache) { ASSERT_EQ(supplier_calls_after_first_scan, manifest_cache->SupplierCallCount()); } +Result>> RunBucketSnapshotScan( + const std::string& table_path, int64_t snapshot_id, int32_t bucket, + const std::shared_ptr& cache, int32_t max_snapshot_live_manifest_versions) { + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, std::to_string(snapshot_id)) + .SetBucketFilter(bucket); + if (max_snapshot_live_manifest_versions > 0) { + context_builder.AddOption(Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS, + std::to_string(max_snapshot_live_manifest_versions)); + } + if (cache) { + context_builder.WithCache(cache); + } + + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr scan_context, context_builder.Finish()); + PAIMON_ASSIGN_OR_RAISE(auto table_scan, TableScan::Create(std::move(scan_context))); + PAIMON_ASSIGN_OR_RAISE(auto plan, table_scan->CreatePlan()); + + std::vector> data_splits; + for (const auto& split : plan->Splits()) { + auto data_split = std::dynamic_pointer_cast(split); + if (!data_split) { + return Status::Invalid("expected DataSplitImpl from bucket snapshot scan"); + } + data_splits.push_back(data_split); + } + return data_splits; +} + +void AssertDataSplitsEqual(const std::vector>& expected, + const std::vector>& actual) { + ASSERT_EQ(actual.size(), expected.size()); + for (size_t i = 0; i < actual.size(); ++i) { + ASSERT_EQ(*actual[i], *expected[i]) << actual[i]->ToString() << std::endl + << expected[i]->ToString(); + } +} + +std::shared_ptr CreateSnapshotLiveManifestTestCache() { + return std::make_shared( + std::map{{CacheKind::MANIFEST, 64 * 1024 * 1024}, + {CacheKind::SNAPSHOT_LIVE_MANIFEST, 64 * 1024 * 1024}}); +} + +TEST(ScanInteManifestCacheTest, TestRepeatedBucketSnapshotScanReusesSnapshotLiveManifestCache) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + auto cache = CreateSnapshotLiveManifestTestCache(); + + ASSERT_OK_AND_ASSIGN(auto expected, + RunBucketSnapshotScan(table_path, /*snapshot_id=*/5, + /*bucket=*/1, nullptr, + /*max_snapshot_live_manifest_versions=*/0)); + ASSERT_OK_AND_ASSIGN(auto first, + RunBucketSnapshotScan(table_path, /*snapshot_id=*/5, + /*bucket=*/1, cache, + /*max_snapshot_live_manifest_versions=*/3)); + AssertDataSplitsEqual(expected, first); + ASSERT_GT(cache->SupplierCallCount(CacheKind::SNAPSHOT_LIVE_MANIFEST), 0); + int64_t get_count_after_first_scan = cache->GetCount(CacheKind::SNAPSHOT_LIVE_MANIFEST); + int64_t supplier_calls_after_first_scan = + cache->SupplierCallCount(CacheKind::SNAPSHOT_LIVE_MANIFEST); + + ASSERT_OK_AND_ASSIGN(auto second, + RunBucketSnapshotScan(table_path, /*snapshot_id=*/5, + /*bucket=*/1, cache, + /*max_snapshot_live_manifest_versions=*/3)); + AssertDataSplitsEqual(expected, second); + ASSERT_EQ(get_count_after_first_scan + 1, cache->GetCount(CacheKind::SNAPSHOT_LIVE_MANIFEST)); + ASSERT_EQ(supplier_calls_after_first_scan, + cache->SupplierCallCount(CacheKind::SNAPSHOT_LIVE_MANIFEST)); +} + +TEST(ScanInteManifestCacheTest, TestSnapshotLiveManifestCacheRetainsSnapshotsPerBucketValue) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + auto cache = CreateSnapshotLiveManifestTestCache(); + + ASSERT_OK_AND_ASSIGN(auto expected_snapshot3, + RunBucketSnapshotScan(table_path, /*snapshot_id=*/3, /*bucket=*/0, nullptr, + /*max_snapshot_live_manifest_versions=*/0)); + ASSERT_OK_AND_ASSIGN(auto cached_snapshot3, + RunBucketSnapshotScan(table_path, /*snapshot_id=*/3, /*bucket=*/0, cache, + /*max_snapshot_live_manifest_versions=*/3)); + AssertDataSplitsEqual(expected_snapshot3, cached_snapshot3); + + ASSERT_OK_AND_ASSIGN(auto expected_snapshot5, + RunBucketSnapshotScan(table_path, /*snapshot_id=*/5, /*bucket=*/0, nullptr, + /*max_snapshot_live_manifest_versions=*/0)); + ASSERT_OK_AND_ASSIGN(auto cached_snapshot5, + RunBucketSnapshotScan(table_path, /*snapshot_id=*/5, /*bucket=*/0, cache, + /*max_snapshot_live_manifest_versions=*/3)); + AssertDataSplitsEqual(expected_snapshot5, cached_snapshot5); + int64_t get_count_after_snapshot5 = cache->GetCount(CacheKind::SNAPSHOT_LIVE_MANIFEST); + int64_t supplier_calls_after_snapshot5 = + cache->SupplierCallCount(CacheKind::SNAPSHOT_LIVE_MANIFEST); + + ASSERT_OK_AND_ASSIGN(auto cached_snapshot3_again, + RunBucketSnapshotScan(table_path, /*snapshot_id=*/3, /*bucket=*/0, cache, + /*max_snapshot_live_manifest_versions=*/3)); + AssertDataSplitsEqual(expected_snapshot3, cached_snapshot3_again); + ASSERT_EQ(get_count_after_snapshot5 + 1, cache->GetCount(CacheKind::SNAPSHOT_LIVE_MANIFEST)); + ASSERT_EQ(supplier_calls_after_snapshot5, + cache->SupplierCallCount(CacheKind::SNAPSHOT_LIVE_MANIFEST)); +} + TEST_P(ScanInteTest, TestScanAppendWithSnapshot1) { std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; ScanContextBuilder context_builder(table_path);