diff --git a/include/paimon/global_index/bitmap_topk_global_index_result.h b/include/paimon/global_index/bitmap_vector_search_global_index_result.h similarity index 77% rename from include/paimon/global_index/bitmap_topk_global_index_result.h rename to include/paimon/global_index/bitmap_vector_search_global_index_result.h index cecb1fed..48e242a3 100644 --- a/include/paimon/global_index/bitmap_topk_global_index_result.h +++ b/include/paimon/global_index/bitmap_vector_search_global_index_result.h @@ -26,24 +26,25 @@ #include "paimon/visibility.h" namespace paimon { -/// Represents a Top-K global index result that combines a Roaring bitmap of candidate row ids -/// with an array of associated relevance scores. +/// Represents a vector search global index result that combines a Roaring bitmap of candidate row +/// ids with an array of associated relevance scores. /// -/// **Important Ordering Note**: Despite inheriting from TopKGlobalIndexResult, the results are +/// **Important Ordering Note**: Despite inheriting from VectorSearchGlobalIndexResult, the results +/// are /// **NOT sorted by score**. Instead, both the bitmap and the score vector are ordered by /// **ascending row id**. This design enables efficient merging and set operations while preserving /// row id-to-score mapping. -class PAIMON_EXPORT BitmapTopKGlobalIndexResult : public TopKGlobalIndexResult { +class PAIMON_EXPORT BitmapVectorSearchGlobalIndexResult : public VectorSearchGlobalIndexResult { public: - BitmapTopKGlobalIndexResult(RoaringBitmap64&& bitmap, std::vector&& scores) + BitmapVectorSearchGlobalIndexResult(RoaringBitmap64&& bitmap, std::vector&& scores) : bitmap_(std::move(bitmap)), scores_(std::move(scores)) { assert(static_cast(bitmap_.Cardinality()) == scores_.size()); } - class TopKIterator : public TopKGlobalIndexResult::TopKIterator { + class VectorSearchIterator : public VectorSearchGlobalIndexResult::VectorSearchIterator { public: - TopKIterator(const RoaringBitmap64* bitmap, RoaringBitmap64::Iterator&& iter, - const float* scores) + VectorSearchIterator(const RoaringBitmap64* bitmap, RoaringBitmap64::Iterator&& iter, + const float* scores) : bitmap_(bitmap), iter_(std::move(iter)), scores_(scores) {} bool HasNext() const override { @@ -65,8 +66,8 @@ class PAIMON_EXPORT BitmapTopKGlobalIndexResult : public TopKGlobalIndexResult { Result> CreateIterator() const override; - Result> CreateTopKIterator() - const override; + Result> + CreateVectorSearchIterator() const override; Result> And( const std::shared_ptr& other) override; diff --git a/include/paimon/global_index/global_index_reader.h b/include/paimon/global_index/global_index_reader.h index 64851de0..510a0578 100644 --- a/include/paimon/global_index/global_index_reader.h +++ b/include/paimon/global_index/global_index_reader.h @@ -22,6 +22,7 @@ #include "paimon/global_index/global_index_result.h" #include "paimon/predicate/function_visitor.h" +#include "paimon/predicate/vector_search.h" #include "paimon/visibility.h" namespace paimon { @@ -36,36 +37,13 @@ namespace paimon { /// The `GlobalIndexResult` can be converted to global row ids by calling `AddOffset()`. class PAIMON_EXPORT GlobalIndexReader : public FunctionVisitor> { public: - /// TopKPreFilter: A lightweight pre-filtering function applied **before** similarity scoring. - /// It operates solely on **local row ids** and is typically driven by other global index, such - /// as bitmap, or range index. This filter enables early pruning of irrelevant candidates (e.g., - /// "only consider rows with label X"), significantly reducing the search space. Returns true to - /// include the row in Top-K computation; false to exclude it. - /// - /// @note Must be thread-safe. - using TopKPreFilter = std::function; - - /// VisitTopK performs approximate top-k similarity search. - /// - /// @param k Number of top results to return. - /// @param query The query vector (must match the dimensionality of the indexed vectors). - /// @param filter A pre-filter based on **local row ids**, implemented by leveraging other - /// global index - /// structures (e.g., bitmap index) for efficient candidate pruning. - /// @param predicate A runtime filtering condition that may involve graph traversal of - /// structured attributes. **Using this parameter often yields better - /// filtering accuracy** because during index construction, the underlying - /// graph was built with explicit consideration of field connectivity (e.g., - /// relationships between attributes). As a result, predicates can leverage - /// this pre-established semantic structure to perform more meaningful and - /// context-aware filtering at query time. - /// @note All fields referenced in the predicate must have been materialized - /// in the index during build to ensure availability. - /// @note `VisitTopK` is thread-safe (not coroutine-safe) while other `VisitXXX` is not + /// VisitVectorSearch performs approximate vector similarity search. + /// @note `VisitVectorSearch` is thread-safe (not coroutine-safe) while other `VisitXXX` is not /// thread-safe. - virtual Result> VisitTopK( - int32_t k, const std::vector& query, TopKPreFilter filter, - const std::shared_ptr& predicate) = 0; + /// @warning `VisitVectorSearch` may return error status when it is incorrectly invoked (e.g., + /// BitmapGlobalIndexReader call `VisitVectorSearch`). + virtual Result> VisitVectorSearch( + const std::shared_ptr& vector_search) = 0; }; } // namespace paimon diff --git a/include/paimon/global_index/global_index_result.h b/include/paimon/global_index/global_index_result.h index 33d33ffd..76647ac9 100644 --- a/include/paimon/global_index/global_index_result.h +++ b/include/paimon/global_index/global_index_result.h @@ -76,7 +76,7 @@ class PAIMON_EXPORT GlobalIndexResult : public std::enable_shared_from_this NextWithScore() = 0; }; - /// Creates a new iterator for traversing the Top-K results. - virtual Result> CreateTopKIterator() const = 0; + /// Creates a new iterator for traversing the vector search results. + virtual Result> CreateVectorSearchIterator() const = 0; }; } // namespace paimon diff --git a/include/paimon/global_index/row_range_global_index_scanner.h b/include/paimon/global_index/row_range_global_index_scanner.h index 0c6eb04b..996b2c2e 100644 --- a/include/paimon/global_index/row_range_global_index_scanner.h +++ b/include/paimon/global_index/row_range_global_index_scanner.h @@ -52,8 +52,7 @@ class PAIMON_EXPORT RowRangeGlobalIndexScanner { /// - Successful with several readers if the indexes exist and load correctly; /// - Successful with an empty vector if no index was built for the given field; /// - Error returns when loading fails (e.g., file corruption, I/O error, unsupported - /// format) or the predicate method was incorrectly invoked (e.g., VisitTopK was invoked - /// incorrectly). + /// format). virtual Result>> CreateReaders( const std::string& field_name) const = 0; }; diff --git a/include/paimon/predicate/vector_search.h b/include/paimon/predicate/vector_search.h new file mode 100644 index 00000000..9c5eae64 --- /dev/null +++ b/include/paimon/predicate/vector_search.h @@ -0,0 +1,69 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include +#include + +#include "paimon/predicate/predicate.h" +#include "paimon/visibility.h" + +namespace paimon { +/// `VectorSearch` to perform vector similarity search. +struct PAIMON_EXPORT VectorSearch { + /// `PreFilter`: A lightweight pre-filtering function applied **before** similarity + /// scoring. It operates solely on **local row ids** and is typically driven by other global + /// index, such as bitmap, or range index. This filter enables early pruning of irrelevant + /// candidates (e.g., "only consider rows with label X"), significantly reducing the search + /// space. Returns true to include the row in vector search process; false to exclude it. + /// + /// @note Must be thread-safe. + using PreFilter = std::function; + + VectorSearch(const std::string& _field_name, int32_t _limit, const std::vector& _query, + PreFilter _pre_filter, const std::shared_ptr& _predicate) + : field_name(_field_name), + limit(_limit), + query(_query), + pre_filter(_pre_filter), + predicate(_predicate) {} + + std::shared_ptr ReplacePreFilter(PreFilter _pre_filter) const { + return std::make_shared(field_name, limit, query, _pre_filter, predicate); + } + + /// Search field name. + std::string field_name; + /// Number of top results to return. + int32_t limit; + /// The query vector (must match the dimensionality of the indexed vectors). + std::vector query; + /// A pre-filter based on **local row ids**, implemented by leveraging other global index + std::function pre_filter; + /// A runtime filtering condition that may involve graph traversal of + /// structured attributes. **Using this parameter often yields better + /// filtering accuracy** because during index construction, the underlying + /// graph was built with explicit consideration of field connectivity (e.g., + /// relationships between attributes). As a result, predicates can leverage + /// this pre-established semantic structure to perform more meaningful and + /// context-aware filtering at query time. + /// @note All fields referenced in the predicate must have been materialized + /// in the index during build to ensure availability. + std::shared_ptr predicate; +}; +} // namespace paimon diff --git a/include/paimon/scan_context.h b/include/paimon/scan_context.h index 686d5e3d..8c72e949 100644 --- a/include/paimon/scan_context.h +++ b/include/paimon/scan_context.h @@ -25,6 +25,7 @@ #include "paimon/global_index/global_index_result.h" #include "paimon/predicate/predicate.h" +#include "paimon/predicate/vector_search.h" #include "paimon/result.h" #include "paimon/type_fwd.h" #include "paimon/visibility.h" @@ -97,14 +98,19 @@ class PAIMON_EXPORT ScanFilter { public: ScanFilter(const std::shared_ptr& predicate, const std::vector>& partition_filters, - const std::optional& bucket_filter) + const std::optional& bucket_filter, + const std::shared_ptr& vector_search) : predicates_(predicate), + vector_search_(vector_search), bucket_filter_(bucket_filter), partition_filters_(partition_filters) {} std::shared_ptr GetPredicate() const { return predicates_; } + std::shared_ptr GetVectorSearch() const { + return vector_search_; + } std::optional GetBucketFilter() const { return bucket_filter_; } @@ -114,6 +120,7 @@ class PAIMON_EXPORT ScanFilter { private: std::shared_ptr predicates_; + std::shared_ptr vector_search_; std::optional bucket_filter_; std::vector> partition_filters_; }; @@ -141,6 +148,9 @@ class PAIMON_EXPORT ScanContextBuilder { /// data retrieval. ScanContextBuilder& SetGlobalIndexResult( const std::shared_ptr& global_index_result); + + /// Set vector search for similarity search. + ScanContextBuilder& SetVectorSearch(const std::shared_ptr& vector_search); /// The options added or set in `ScanContextBuilder` have high priority and will be merged with /// the options in table schema. ScanContextBuilder& AddOption(const std::string& key, const std::string& value); diff --git a/include/paimon/table/source/table_read.h b/include/paimon/table/source/table_read.h index f8a3a761..f8327da1 100644 --- a/include/paimon/table/source/table_read.h +++ b/include/paimon/table/source/table_read.h @@ -50,6 +50,8 @@ class PAIMON_EXPORT TableRead { /// @param splits A vector of shared pointers to `Split` instances representing the /// data to be read. /// @return A Result containing a unique pointer to the `BatchReader` instance. + /// @note `BatchReader`s created by the same `TableRead` are not thread-safe for + /// concurrent reading. virtual Result> CreateReader( const std::vector>& splits); diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 82d516c0..22eb4788 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -47,7 +47,7 @@ set(PAIMON_COMMON_SRCS common/fs/resolving_file_system.cpp common/fs/file_system_factory.cpp common/global_index/complete_index_score_batch_reader.cpp - common/global_index/bitmap_topk_global_index_result.cpp + common/global_index/bitmap_vector_search_global_index_result.cpp common/global_index/bitmap_global_index_result.cpp common/global_index/global_index_result.cpp common/global_index/global_indexer_factory.cpp @@ -333,7 +333,7 @@ if(PAIMON_BUILD_TESTS) common/global_index/global_index_result_test.cpp common/global_index/global_indexer_factory_test.cpp common/global_index/bitmap_global_index_result_test.cpp - common/global_index/bitmap_topk_global_index_result_test.cpp + common/global_index/bitmap_vector_search_global_index_result_test.cpp common/global_index/bitmap/bitmap_global_index_test.cpp common/io/byte_array_input_stream_test.cpp common/io/data_input_output_stream_test.cpp diff --git a/src/paimon/common/global_index/bitmap/bitmap_global_index_test.cpp b/src/paimon/common/global_index/bitmap/bitmap_global_index_test.cpp index 305b26d4..2563ed06 100644 --- a/src/paimon/common/global_index/bitmap/bitmap_global_index_test.cpp +++ b/src/paimon/common/global_index/bitmap/bitmap_global_index_test.cpp @@ -216,9 +216,10 @@ TEST_F(BitmapGlobalIndexTest, TestStringType) { // result CheckResult(reader->VisitGreaterThan(lit_c).value(), {0, 1, 2, 3, 4}); - // test visit topk - ASSERT_NOK_WITH_MSG(reader->VisitTopK(10, {1.0f, 2.0f}, nullptr, nullptr), - "FileIndexReaderWrapper is not supposed to handle topk query"); + // test visit vector search + ASSERT_NOK_WITH_MSG(reader->VisitVectorSearch(std::make_shared( + "f0", 10, std::vector({1.0f, 2.0f}), nullptr, nullptr)), + "FileIndexReaderWrapper is not supposed to handle vector search query"); }; { diff --git a/src/paimon/common/global_index/bitmap_topk_global_index_result.cpp b/src/paimon/common/global_index/bitmap_vector_search_global_index_result.cpp similarity index 62% rename from src/paimon/common/global_index/bitmap_topk_global_index_result.cpp rename to src/paimon/common/global_index/bitmap_vector_search_global_index_result.cpp index 7d70d6a9..10e999c2 100644 --- a/src/paimon/common/global_index/bitmap_topk_global_index_result.cpp +++ b/src/paimon/common/global_index/bitmap_vector_search_global_index_result.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "paimon/global_index/bitmap_topk_global_index_result.h" +#include "paimon/global_index/bitmap_vector_search_global_index_result.h" #include "fmt/format.h" #include "fmt/ranges.h" @@ -41,28 +41,29 @@ std::vector GetScoresFromMap(const RoaringBitmap64& bitmap, return scores; } } // namespace -Result> BitmapTopKGlobalIndexResult::CreateIterator() - const { +Result> +BitmapVectorSearchGlobalIndexResult::CreateIterator() const { return std::make_unique(&bitmap_, bitmap_.Begin()); } -Result> -BitmapTopKGlobalIndexResult::CreateTopKIterator() const { - return std::make_unique(&bitmap_, bitmap_.Begin(), - scores_.data()); +Result> +BitmapVectorSearchGlobalIndexResult::CreateVectorSearchIterator() const { + return std::make_unique( + &bitmap_, bitmap_.Begin(), scores_.data()); } -Result> BitmapTopKGlobalIndexResult::And( +Result> BitmapVectorSearchGlobalIndexResult::And( const std::shared_ptr& other) { - auto topk_other = std::dynamic_pointer_cast(other); - if (topk_other) { - // If current and other result are both BitmapTopKGlobalIndexResult, return + auto vector_search_other = + std::dynamic_pointer_cast(other); + if (vector_search_other) { + // If current and other result are both BitmapVectorSearchGlobalIndexResult, return // BitmapGlobalIndexResult. Erase scores to prevent the same row id with different // scores in current and other results. - auto supplier = [topk_other, - result = std::dynamic_pointer_cast( + auto supplier = [vector_search_other, + result = std::dynamic_pointer_cast( shared_from_this())]() -> Result { - PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap64* r1, topk_other->GetBitmap()); + PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap64* r1, vector_search_other->GetBitmap()); PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap64* r2, result->GetBitmap()); return RoaringBitmap64::And(*r1, *r2); }; @@ -70,38 +71,40 @@ Result> BitmapTopKGlobalIndexResult::And( } auto bitmap_other = std::dynamic_pointer_cast(other); if (bitmap_other) { - // If other bitmap is BitmapGlobalIndexResult, return BitmapTopKGlobalIndexResult as - // score must exist in current topk result. + // If other bitmap is BitmapGlobalIndexResult, return BitmapVectorSearchGlobalIndexResult as + // score must exist in current vector search result. std::map id_to_score = CreateIdToScoreMap(bitmap_, scores_); PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap64* other_bitmap, bitmap_other->GetBitmap()); auto and_bitmap = RoaringBitmap64::And(bitmap_, *other_bitmap); std::vector and_scores = GetScoresFromMap(and_bitmap, id_to_score); - return std::make_shared(std::move(and_bitmap), - std::move(and_scores)); + return std::make_shared(std::move(and_bitmap), + std::move(and_scores)); } return GlobalIndexResult::And(other); } -Result> BitmapTopKGlobalIndexResult::Or( +Result> BitmapVectorSearchGlobalIndexResult::Or( const std::shared_ptr& other) { - auto topk_other = std::dynamic_pointer_cast(other); - if (topk_other) { - // If current and other result are both BitmapTopKGlobalIndexResult, return - // BitmapTopKGlobalIndexResult when current and other have has no intersection row id. + auto vector_search_other = + std::dynamic_pointer_cast(other); + if (vector_search_other) { + // If current and other result are both BitmapVectorSearchGlobalIndexResult, return + // BitmapVectorSearchGlobalIndexResult when current and other have has no intersection row + // id. std::map id_to_score = CreateIdToScoreMap(bitmap_, scores_); size_t idx = 0; - for (auto iter = topk_other->bitmap_.Begin(); iter != topk_other->bitmap_.End(); - ++iter, ++idx) { + for (auto iter = vector_search_other->bitmap_.Begin(); + iter != vector_search_other->bitmap_.End(); ++iter, ++idx) { if (id_to_score.find(*iter) != id_to_score.end()) { return Status::Invalid( - "not support two BitmapTopKGlobalIndexResult or with same row id"); + "not support two BitmapVectorSearchGlobalIndexResult or with same row id"); } - id_to_score[*iter] = topk_other->scores_[idx]; + id_to_score[*iter] = vector_search_other->scores_[idx]; } - auto or_bitmap = RoaringBitmap64::Or(bitmap_, topk_other->bitmap_); + auto or_bitmap = RoaringBitmap64::Or(bitmap_, vector_search_other->bitmap_); std::vector or_scores = GetScoresFromMap(or_bitmap, id_to_score); - return std::make_shared(std::move(or_bitmap), - std::move(or_scores)); + return std::make_shared(std::move(or_bitmap), + std::move(or_scores)); } auto bitmap_other = std::dynamic_pointer_cast(other); @@ -109,7 +112,7 @@ Result> BitmapTopKGlobalIndexResult::Or( // If other bitmap is BitmapGlobalIndexResult, return BitmapGlobalIndexResult as // score for union row id is unknown. auto supplier = [bitmap_other, - result = std::dynamic_pointer_cast( + result = std::dynamic_pointer_cast( shared_from_this())]() -> Result { PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap64* r1, bitmap_other->GetBitmap()); PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap64* r2, result->GetBitmap()); @@ -120,29 +123,31 @@ Result> BitmapTopKGlobalIndexResult::Or( return GlobalIndexResult::Or(other); } -Result> BitmapTopKGlobalIndexResult::AddOffset(int64_t offset) { +Result> BitmapVectorSearchGlobalIndexResult::AddOffset( + int64_t offset) { PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap64* bitmap, GetBitmap()); RoaringBitmap64 bitmap64; for (auto iter = bitmap->Begin(); iter != bitmap->End(); ++iter) { bitmap64.Add(offset + (*iter)); } auto scores = GetScores(); - return std::make_shared(std::move(bitmap64), std::move(scores)); + return std::make_shared(std::move(bitmap64), + std::move(scores)); } -Result BitmapTopKGlobalIndexResult::IsEmpty() const { +Result BitmapVectorSearchGlobalIndexResult::IsEmpty() const { return bitmap_.IsEmpty(); } -Result BitmapTopKGlobalIndexResult::GetBitmap() const { +Result BitmapVectorSearchGlobalIndexResult::GetBitmap() const { return &bitmap_; } -const std::vector& BitmapTopKGlobalIndexResult::GetScores() const { +const std::vector& BitmapVectorSearchGlobalIndexResult::GetScores() const { return scores_; } -std::string BitmapTopKGlobalIndexResult::ToString() const { +std::string BitmapVectorSearchGlobalIndexResult::ToString() const { std::vector formatted_scores; formatted_scores.reserve(scores_.size()); for (const auto& score : scores_) { diff --git a/src/paimon/common/global_index/bitmap_topk_global_index_result_test.cpp b/src/paimon/common/global_index/bitmap_vector_search_global_index_result_test.cpp similarity index 81% rename from src/paimon/common/global_index/bitmap_topk_global_index_result_test.cpp rename to src/paimon/common/global_index/bitmap_vector_search_global_index_result_test.cpp index 771a62de..e32d8288 100644 --- a/src/paimon/common/global_index/bitmap_topk_global_index_result_test.cpp +++ b/src/paimon/common/global_index/bitmap_vector_search_global_index_result_test.cpp @@ -13,14 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "paimon/global_index/bitmap_topk_global_index_result.h" +#include "paimon/global_index/bitmap_vector_search_global_index_result.h" #include "gtest/gtest.h" #include "paimon/global_index/bitmap_global_index_result.h" #include "paimon/testing/utils/testharness.h" #include "paimon/utils/roaring_bitmap32.h" namespace paimon::test { -class BitmapTopKGlobalIndexResultTest : public ::testing::Test { +class BitmapVectorSearchGlobalIndexResultTest : public ::testing::Test { public: void SetUp() override {} void TearDown() override {} @@ -70,13 +70,13 @@ class BitmapTopKGlobalIndexResultTest : public ::testing::Test { std::vector values_; }; }; -TEST_F(BitmapTopKGlobalIndexResultTest, TestIterator) { +TEST_F(BitmapVectorSearchGlobalIndexResultTest, TestIterator) { auto check_iterator = [](const std::vector& expected_ids, const std::vector& expected_scores) { ASSERT_EQ(expected_ids.size(), expected_scores.size()); auto tmp_scores = expected_scores; - auto index_result = std::make_shared( + auto index_result = std::make_shared( RoaringBitmap64::From(expected_ids), std::move(tmp_scores)); if (expected_ids.empty()) { ASSERT_TRUE(index_result->IsEmpty().value()); @@ -89,15 +89,15 @@ TEST_F(BitmapTopKGlobalIndexResultTest, TestIterator) { } ASSERT_FALSE(iter->HasNext()); - // check topk iterator - ASSERT_OK_AND_ASSIGN(auto topk_iter, index_result->CreateTopKIterator()); + // check vector search iterator + ASSERT_OK_AND_ASSIGN(auto vector_search_iter, index_result->CreateVectorSearchIterator()); for (size_t i = 0; i < expected_ids.size(); i++) { - ASSERT_TRUE(topk_iter->HasNext()); - auto [id, score] = topk_iter->NextWithScore(); + ASSERT_TRUE(vector_search_iter->HasNext()); + auto [id, score] = vector_search_iter->NextWithScore(); ASSERT_EQ(id, expected_ids[i]); ASSERT_NEAR(score, expected_scores[i], 0.01); } - ASSERT_FALSE(topk_iter->HasNext()); + ASSERT_FALSE(vector_search_iter->HasNext()); }; check_iterator({}, {}); @@ -106,15 +106,15 @@ TEST_F(BitmapTopKGlobalIndexResultTest, TestIterator) { check_iterator({100, 101, 102, 103}, {100.1f, 200.2f, 0.12f, 0.34f}); } -TEST_F(BitmapTopKGlobalIndexResultTest, TestAnd) { +TEST_F(BitmapVectorSearchGlobalIndexResultTest, TestAnd) { auto check_and_result = [](const std::vector& left_ids, const std::vector& right_ids, const std::string& expected_str) { std::vector left_scores(left_ids.size(), 1.1f); - auto index_result1 = std::make_shared( + auto index_result1 = std::make_shared( RoaringBitmap64::From(left_ids), std::move(left_scores)); std::vector right_scores(right_ids.size(), 1.2f); - auto index_result2 = std::make_shared( + auto index_result2 = std::make_shared( RoaringBitmap64::From(right_ids), std::move(right_scores)); ASSERT_OK_AND_ASSIGN(auto result, index_result1->And(index_result2)); ASSERT_EQ(result->ToString(), expected_str); @@ -128,11 +128,11 @@ TEST_F(BitmapTopKGlobalIndexResultTest, TestAnd) { "{1,9223372036854775807}"); } -TEST_F(BitmapTopKGlobalIndexResultTest, TestAndBitmapResult) { +TEST_F(BitmapVectorSearchGlobalIndexResultTest, TestAndBitmapResult) { auto check_and_result = [](const std::vector& left_ids, std::vector&& left_scores, const std::vector& right_ids, const std::string& expected_str) { - auto index_result1 = std::make_shared( + auto index_result1 = std::make_shared( RoaringBitmap64::From(left_ids), std::move(left_scores)); auto bitmap_supplier2 = [&]() -> Result { @@ -155,8 +155,8 @@ TEST_F(BitmapTopKGlobalIndexResultTest, TestAndBitmapResult) { "row ids: {1,9223372036854775807}, scores: {0.12,0.15}"); } -TEST_F(BitmapTopKGlobalIndexResultTest, TestAndOtherResult) { - auto index_result1 = std::make_shared( +TEST_F(BitmapVectorSearchGlobalIndexResultTest, TestAndOtherResult) { + auto index_result1 = std::make_shared( RoaringBitmap64::From({1, 2, 3}), std::vector({1.1f, 1.2f, 1.3f})); auto fake_result = std::make_shared(std::vector({1l, 2l, 7l})); @@ -165,14 +165,14 @@ TEST_F(BitmapTopKGlobalIndexResultTest, TestAndOtherResult) { ASSERT_EQ(result->ToString(), "{1,2}"); } -TEST_F(BitmapTopKGlobalIndexResultTest, TestOr) { +TEST_F(BitmapVectorSearchGlobalIndexResultTest, TestOr) { auto check_or_result = [](const std::vector& left_ids, std::vector&& left_scores, const std::vector& right_ids, std::vector&& right_scores, const std::string& expected_str) { - auto index_result1 = std::make_shared( + auto index_result1 = std::make_shared( RoaringBitmap64::From(left_ids), std::move(left_scores)); - auto index_result2 = std::make_shared( + auto index_result2 = std::make_shared( RoaringBitmap64::From(right_ids), std::move(right_scores)); ASSERT_OK_AND_ASSIGN(auto result, index_result1->Or(index_result2)); ASSERT_EQ(result->ToString(), expected_str); @@ -188,12 +188,12 @@ TEST_F(BitmapTopKGlobalIndexResultTest, TestOr) { "row ids: {1,2,3,2147483647,9223372036854775807}, scores: {1.10,1.20,1.30,0.12,1.40}"); } -TEST_F(BitmapTopKGlobalIndexResultTest, TestOrBitmapResult) { +TEST_F(BitmapVectorSearchGlobalIndexResultTest, TestOrBitmapResult) { auto check_or_result = [](const std::vector& left_ids, const std::vector& right_ids, const std::string& expected_str) { std::vector left_scores(left_ids.size(), 1.1f); - auto index_result1 = std::make_shared( + auto index_result1 = std::make_shared( RoaringBitmap64::From(left_ids), std::move(left_scores)); auto bitmap_supplier2 = [&]() -> Result { @@ -214,8 +214,8 @@ TEST_F(BitmapTopKGlobalIndexResultTest, TestOrBitmapResult) { "{1,2,3,9223372036854775807}"); } -TEST_F(BitmapTopKGlobalIndexResultTest, TestOrOtherResult) { - auto index_result1 = std::make_shared( +TEST_F(BitmapVectorSearchGlobalIndexResultTest, TestOrOtherResult) { + auto index_result1 = std::make_shared( RoaringBitmap64::From({1, 2, 3}), std::vector({1.1f, 1.2f, 1.3f})); auto fake_result = std::make_shared(std::vector({1l, 2l, 7l})); @@ -224,24 +224,24 @@ TEST_F(BitmapTopKGlobalIndexResultTest, TestOrOtherResult) { ASSERT_EQ(result->ToString(), "{1,2,3,7}"); } -TEST_F(BitmapTopKGlobalIndexResultTest, TestInvalidOr) { +TEST_F(BitmapVectorSearchGlobalIndexResultTest, TestInvalidOr) { std::vector left_ids = {1, 2, 3}; std::vector left_scores = {1.1f, 1.2f, 1.3f}; - auto index_result1 = std::make_shared( + auto index_result1 = std::make_shared( RoaringBitmap64::From(left_ids), std::move(left_scores)); std::vector right_ids = {1, 2, 7}; std::vector right_scores = {2.1f, 2.2f, 2.3f}; - auto index_result2 = std::make_shared( + auto index_result2 = std::make_shared( RoaringBitmap64::From(right_ids), std::move(right_scores)); ASSERT_NOK_WITH_MSG(index_result1->Or(index_result2), - "not support two BitmapTopKGlobalIndexResult or with same row id"); + "not support two BitmapVectorSearchGlobalIndexResult or with same row id"); } -TEST_F(BitmapTopKGlobalIndexResultTest, TestAddOffset) { +TEST_F(BitmapVectorSearchGlobalIndexResultTest, TestAddOffset) { { std::vector ids = {1, 2, 3}; std::vector scores = {1.1f, 1.2f, 1.3f}; - auto index_result = std::make_shared( + auto index_result = std::make_shared( RoaringBitmap64::From(ids), std::move(scores)); ASSERT_OK_AND_ASSIGN(auto result_with_offset, index_result->AddOffset(10)); ASSERT_EQ(result_with_offset->ToString(), "row ids: {11,12,13}, scores: {1.10,1.20,1.30}"); @@ -249,7 +249,7 @@ TEST_F(BitmapTopKGlobalIndexResultTest, TestAddOffset) { { std::vector ids = {}; std::vector scores = {}; - auto index_result = std::make_shared( + auto index_result = std::make_shared( RoaringBitmap64::From(ids), std::move(scores)); ASSERT_OK_AND_ASSIGN(auto result_with_offset, index_result->AddOffset(10)); ASSERT_EQ(result_with_offset->ToString(), "row ids: {}, scores: {}"); diff --git a/src/paimon/common/global_index/global_index_result.cpp b/src/paimon/common/global_index/global_index_result.cpp index 60c615d3..061dc6fa 100644 --- a/src/paimon/common/global_index/global_index_result.cpp +++ b/src/paimon/common/global_index/global_index_result.cpp @@ -20,7 +20,7 @@ #include "paimon/common/io/memory_segment_output_stream.h" #include "paimon/common/memory/memory_segment_utils.h" #include "paimon/global_index/bitmap_global_index_result.h" -#include "paimon/global_index/bitmap_topk_global_index_result.h" +#include "paimon/global_index/bitmap_vector_search_global_index_result.h" #include "paimon/io/byte_array_input_stream.h" #include "paimon/io/data_input_stream.h" #include "paimon/memory/bytes.h" @@ -89,15 +89,17 @@ Result> GlobalIndexResult::Serialize( std::dynamic_pointer_cast(global_index_result)) { PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap64* bitmap, bitmap_result->GetBitmap()); WriteBitmapAndScores(bitmap, {}, &out, pool.get()); - } else if (auto bitmap_topk_result = - std::dynamic_pointer_cast(global_index_result)) { - PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap64* bitmap, bitmap_topk_result->GetBitmap()); - const auto& scores = bitmap_topk_result->GetScores(); + } else if (auto bitmap_vector_search_result = + std::dynamic_pointer_cast( + global_index_result)) { + PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap64* bitmap, + bitmap_vector_search_result->GetBitmap()); + const auto& scores = bitmap_vector_search_result->GetScores(); WriteBitmapAndScores(bitmap, scores, &out, pool.get()); } else { return Status::Invalid( "invalid GlobalIndexResult, must be BitmapGlobalIndexResult or " - "BitmapTopKGlobalIndexResult"); + "BitmapVectorSearchGlobalIndexResult"); } return MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get()); } @@ -130,7 +132,8 @@ Result> GlobalIndexResult::Deserialize( PAIMON_ASSIGN_OR_RAISE(float score, in.ReadValue()); scores.push_back(score); } - return std::make_shared(std::move(bitmap), std::move(scores)); + return std::make_shared(std::move(bitmap), + std::move(scores)); } Result> GlobalIndexResult::ToRanges() const { diff --git a/src/paimon/common/global_index/global_index_result_test.cpp b/src/paimon/common/global_index/global_index_result_test.cpp index c5650466..84a1a861 100644 --- a/src/paimon/common/global_index/global_index_result_test.cpp +++ b/src/paimon/common/global_index/global_index_result_test.cpp @@ -20,7 +20,7 @@ #include "gtest/gtest.h" #include "paimon/global_index/bitmap_global_index_result.h" -#include "paimon/global_index/bitmap_topk_global_index_result.h" +#include "paimon/global_index/bitmap_vector_search_global_index_result.h" #include "paimon/testing/utils/testharness.h" namespace paimon::test { @@ -127,14 +127,15 @@ TEST_F(GlobalIndexResultTest, TestSerializeAndDeserializeWithScore) { ASSERT_OK_AND_ASSIGN(std::shared_ptr index_result, GlobalIndexResult::Deserialize(reinterpret_cast(byte_buffer.data()), byte_buffer.size(), pool)); - auto typed_result = std::dynamic_pointer_cast(index_result); + auto typed_result = + std::dynamic_pointer_cast(index_result); ASSERT_TRUE(typed_result); auto bitmap = RoaringBitmap64::From( {10l, 2147483647l, 2147483649l, 2147483651l, 2147483653l, 2247483647l}); std::vector scores = {1.01f, -1.32f, 4.23f, 50.74f, -100.25f, 2.10f}; auto expected_result = - std::make_shared(std::move(bitmap), std::move(scores)); + std::make_shared(std::move(bitmap), std::move(scores)); ASSERT_EQ(expected_result->ToString(), typed_result->ToString()); ASSERT_OK_AND_ASSIGN(auto serialize_bytes, GlobalIndexResult::Serialize(index_result, pool)); ASSERT_EQ(byte_buffer, std::vector(serialize_bytes->data(), @@ -146,6 +147,6 @@ TEST_F(GlobalIndexResultTest, TestInvalidSerialize) { auto result = std::make_shared(std::vector({1, 3, 5, 100})); ASSERT_NOK_WITH_MSG(GlobalIndexResult::Serialize(result, pool), "invalid GlobalIndexResult, must be BitmapGlobalIndexResult or " - "BitmapTopKGlobalIndexResult"); + "BitmapVectorSearchGlobalIndexResult"); } } // namespace paimon::test diff --git a/src/paimon/common/global_index/wrap/file_index_reader_wrapper.h b/src/paimon/common/global_index/wrap/file_index_reader_wrapper.h index 176d9942..9f46cc9c 100644 --- a/src/paimon/common/global_index/wrap/file_index_reader_wrapper.h +++ b/src/paimon/common/global_index/wrap/file_index_reader_wrapper.h @@ -114,10 +114,10 @@ class FileIndexReaderWrapper : public GlobalIndexReader { return transform_(file_index_result); } - Result> VisitTopK( - int32_t k, const std::vector& query, TopKPreFilter filter, - const std::shared_ptr& predicate) override { - return Status::Invalid("FileIndexReaderWrapper is not supposed to handle topk query"); + Result> VisitVectorSearch( + const std::shared_ptr& vector_search) override { + return Status::Invalid( + "FileIndexReaderWrapper is not supposed to handle vector search query"); } private: diff --git a/src/paimon/core/global_index/global_index_evaluator.h b/src/paimon/core/global_index/global_index_evaluator.h index b0325800..ed2177fe 100644 --- a/src/paimon/core/global_index/global_index_evaluator.h +++ b/src/paimon/core/global_index/global_index_evaluator.h @@ -20,6 +20,7 @@ #include "paimon/global_index/global_index_result.h" #include "paimon/predicate/predicate.h" +#include "paimon/predicate/vector_search.h" #include "paimon/visibility.h" namespace paimon { @@ -30,16 +31,20 @@ class PAIMON_EXPORT GlobalIndexEvaluator { /// Evaluates a predicate against the global index. /// /// @param predicate The filter predicate to evaluate. + /// @param vector_search The vector similarity search to evaluate. + /// @note When both `predicate` and `vector_search` are present, the predicate + /// is used to constrain the vector search space (for example, via a + /// pre-filter callback that may be applied during vector search), so + /// vector similarity scoring is effectively limited to rows that satisfy + /// the predicate. /// @return A `Result` containing: /// - `std::nullopt` if the predicate cannot be evaluated by this index (e.g., field has /// no index), /// - A `std::shared_ptr` if evaluation succeeds. /// The `GlobalIndexResult` indicates the matching rows (e.g., via row ID bitmaps). - /// - /// @note Top-K predicates are **not handled** by this method. Use - /// `GlobalIndexReader::VisitTopK()` for Top-K specific index evaluation. virtual Result>> Evaluate( - const std::shared_ptr& predicate) = 0; + const std::shared_ptr& predicate, + const std::shared_ptr& vector_search) = 0; }; } // namespace paimon diff --git a/src/paimon/core/global_index/global_index_evaluator_impl.cpp b/src/paimon/core/global_index/global_index_evaluator_impl.cpp index a20e889d..9a7d23e1 100644 --- a/src/paimon/core/global_index/global_index_evaluator_impl.cpp +++ b/src/paimon/core/global_index/global_index_evaluator_impl.cpp @@ -18,11 +18,80 @@ #include "fmt/format.h" #include "paimon/common/predicate/predicate_utils.h" +#include "paimon/global_index/bitmap_global_index_result.h" #include "paimon/predicate/leaf_predicate.h" namespace paimon { Result>> GlobalIndexEvaluatorImpl::Evaluate( - const std::shared_ptr& predicate) { + const std::shared_ptr& predicate, + const std::shared_ptr& vector_search) { + std::optional> compound_result; + if (predicate) { + PAIMON_ASSIGN_OR_RAISE(compound_result, EvaluatePredicate(predicate)); + } + if (vector_search) { + PAIMON_ASSIGN_OR_RAISE( + compound_result, + EvaluateVectorSearch(vector_search, /*predicate_result=*/compound_result)); + } + return compound_result; +} + +Result>> GlobalIndexEvaluatorImpl::GetIndexReaders( + const std::string& field_name) { + PAIMON_ASSIGN_OR_RAISE(DataField data_field, table_schema_->GetField(field_name)); + int32_t field_id = data_field.Id(); + // get or create global index readers for current field + std::vector> readers; + auto iter = index_readers_cache_.find(field_id); + if (iter != index_readers_cache_.end()) { + readers = iter->second; + } else { + PAIMON_ASSIGN_OR_RAISE(readers, create_index_readers_(field_id)); + index_readers_cache_.insert({field_id, readers}); + } + return readers; +} + +Result>> +GlobalIndexEvaluatorImpl::EvaluateVectorSearch( + const std::shared_ptr& vector_search, + const std::optional>& predicate_result) { + PAIMON_ASSIGN_OR_RAISE(std::vector> readers, + GetIndexReaders(vector_search->field_name)); + if (readers.empty()) { + return predicate_result; + } + if (readers.size() > 1) { + return Status::Invalid("Vector search cannot have multiple global indexes"); + } + const auto& vector_search_reader = readers[0]; + if (predicate_result && vector_search->pre_filter != nullptr) { + return Status::Invalid("Predicate result and pre_filter in VectorSearch conflict"); + } + auto final_vector_search = vector_search; + if (predicate_result) { + auto bitmap_global_index_result = + std::dynamic_pointer_cast(predicate_result.value()); + if (!bitmap_global_index_result) { + return Status::Invalid( + "The pre_filter of vector search only supports BitmapGlobalIndexResult"); + } + PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap64* bitmap, + bitmap_global_index_result->GetBitmap()); + assert(bitmap); + final_vector_search = vector_search->ReplacePreFilter( + [bitmap_global_index_result, bitmap](int64_t row_id) -> bool { + return bitmap->Contains(row_id); + }); + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr vector_search_result, + vector_search_reader->VisitVectorSearch(final_vector_search)); + return std::optional>(vector_search_result); +} + +Result>> +GlobalIndexEvaluatorImpl::EvaluatePredicate(const std::shared_ptr& predicate) { if (predicate == nullptr) { return std::optional>(); } @@ -31,18 +100,8 @@ Result>> GlobalIndexEvaluatorIm return EvaluateCompoundPredicate(compound_predicate); } else if (auto leaf_predicate = std::dynamic_pointer_cast(predicate)) { const std::string& field_name = leaf_predicate->FieldName(); - PAIMON_ASSIGN_OR_RAISE(DataField data_field, table_schema_->GetField(field_name)); - int32_t field_id = data_field.Id(); - // get or create global index readers for current field - std::vector> readers; - auto iter = index_readers_cache_.find(field_id); - if (iter != index_readers_cache_.end()) { - readers = iter->second; - } else { - PAIMON_ASSIGN_OR_RAISE(readers, create_index_readers_(field_id)); - index_readers_cache_.insert({field_id, readers}); - } - + PAIMON_ASSIGN_OR_RAISE(std::vector> readers, + GetIndexReaders(field_name)); // calculate compound result as field may has multiple indexes std::optional> compound_result; for (const auto& index_reader : readers) { @@ -76,7 +135,7 @@ GlobalIndexEvaluatorImpl::EvaluateCompoundPredicate( std::optional> compound_result; for (const auto& child : compound_predicate->Children()) { PAIMON_ASSIGN_OR_RAISE(std::optional> sub_result, - Evaluate(child)); + EvaluatePredicate(child)); if (!sub_result) { return std::optional>(); } @@ -93,7 +152,7 @@ GlobalIndexEvaluatorImpl::EvaluateCompoundPredicate( std::optional> compound_result; for (const auto& child : compound_predicate->Children()) { PAIMON_ASSIGN_OR_RAISE(std::optional> sub_result, - Evaluate(child)); + EvaluatePredicate(child)); if (sub_result) { if (!compound_result) { compound_result = sub_result; diff --git a/src/paimon/core/global_index/global_index_evaluator_impl.h b/src/paimon/core/global_index/global_index_evaluator_impl.h index 900906cb..2d3731a1 100644 --- a/src/paimon/core/global_index/global_index_evaluator_impl.h +++ b/src/paimon/core/global_index/global_index_evaluator_impl.h @@ -39,12 +39,23 @@ class GlobalIndexEvaluatorImpl : public GlobalIndexEvaluator { : table_schema_(table_schema), create_index_readers_(std::move(create_index_readers)) {} Result>> Evaluate( - const std::shared_ptr& predicate) override; + const std::shared_ptr& predicate, + const std::shared_ptr& vector_search) override; private: + Result>> EvaluateVectorSearch( + const std::shared_ptr& vector_search, + const std::optional>& predicate_result); + + Result>> EvaluatePredicate( + const std::shared_ptr& predicate); + Result>> EvaluateCompoundPredicate( const std::shared_ptr& compound_predicate); + Result>> GetIndexReaders( + const std::string& field_name); + private: std::shared_ptr table_schema_; // create_index_readers_(field_id) diff --git a/src/paimon/core/global_index/global_index_scan_impl.cpp b/src/paimon/core/global_index/global_index_scan_impl.cpp index 81e20f40..1b48829c 100644 --- a/src/paimon/core/global_index/global_index_scan_impl.cpp +++ b/src/paimon/core/global_index/global_index_scan_impl.cpp @@ -143,7 +143,7 @@ Status GlobalIndexScanImpl::Scan() { Result>> GlobalIndexScanImpl::ParallelScan( const std::vector& ranges, const std::shared_ptr& predicate, - const std::shared_ptr& executor) { + const std::shared_ptr& vector_search, const std::shared_ptr& executor) { std::vector> range_scanners; range_scanners.reserve(ranges.size()); for (const auto& range : ranges) { @@ -163,12 +163,12 @@ Result>> GlobalIndexScanImpl::P const auto& scanner = range_scanners[i]; const auto& range = ranges[i]; auto search_index = - [&scanner, &predicate, + [&scanner, &predicate, &vector_search, &range]() -> Result>> { PAIMON_ASSIGN_OR_RAISE(std::shared_ptr evaluator, scanner->CreateIndexEvaluator()); PAIMON_ASSIGN_OR_RAISE(std::optional> index_result, - evaluator->Evaluate(predicate)); + evaluator->Evaluate(predicate, vector_search)); if (!index_result) { return index_result; } @@ -196,17 +196,16 @@ Result>> GlobalIndexScanImpl::P } // union result from multiple ranges - std::shared_ptr final_global_index_result = - BitmapGlobalIndexResult::FromRanges({}); + std::optional> final_global_index_result; for (size_t i = 0; i < results.size(); ++i) { - if (results[i]) { - PAIMON_ASSIGN_OR_RAISE(final_global_index_result, - final_global_index_result->Or(results[i].value())); + std::shared_ptr result = + results[i] ? results[i].value() : BitmapGlobalIndexResult::FromRanges({ranges[i]}); + if (!final_global_index_result) { + final_global_index_result = result; } else { - PAIMON_ASSIGN_OR_RAISE( - final_global_index_result, - final_global_index_result->Or(BitmapGlobalIndexResult::FromRanges({ranges[i]}))); + PAIMON_ASSIGN_OR_RAISE(final_global_index_result, + final_global_index_result.value()->Or(result)); } } return std::optional>(final_global_index_result); diff --git a/src/paimon/core/global_index/global_index_scan_impl.h b/src/paimon/core/global_index/global_index_scan_impl.h index 510dd2a3..60b44c9b 100644 --- a/src/paimon/core/global_index/global_index_scan_impl.h +++ b/src/paimon/core/global_index/global_index_scan_impl.h @@ -49,6 +49,7 @@ class GlobalIndexScanImpl : public GlobalIndexScan { Result>> ParallelScan( const std::vector& ranges, const std::shared_ptr& predicate, + const std::shared_ptr& vector_search, const std::shared_ptr& executor); private: diff --git a/src/paimon/core/operation/abstract_file_store_write.cpp b/src/paimon/core/operation/abstract_file_store_write.cpp index a5d5d953..669ef806 100644 --- a/src/paimon/core/operation/abstract_file_store_write.cpp +++ b/src/paimon/core/operation/abstract_file_store_write.cpp @@ -245,7 +245,8 @@ Result AbstractFileStoreWrite::ScanExistingFileMetas( partition_filters.push_back(part_values_map); } auto scan_filter = std::make_shared( - /*predicate=*/nullptr, partition_filters, std::optional(bucket)); + /*predicate=*/nullptr, partition_filters, std::optional(bucket), + /*vector_search=*/nullptr); PAIMON_ASSIGN_OR_RAISE(std::unique_ptr scan, CreateFileStoreScan(scan_filter)); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr plan, diff --git a/src/paimon/core/operation/file_store_commit_impl.cpp b/src/paimon/core/operation/file_store_commit_impl.cpp index b6ff9ab1..4c32da34 100644 --- a/src/paimon/core/operation/file_store_commit_impl.cpp +++ b/src/paimon/core/operation/file_store_commit_impl.cpp @@ -300,8 +300,9 @@ Result FileStoreCommitImpl::GetLastCommitTableRequest() { Result> FileStoreCommitImpl::GetAllFiles( const Snapshot& snapshot, const std::vector>& partitions) { - auto scan_filter = std::make_shared(/*predicate=*/nullptr, partitions, - /*bucket_filter=*/std::nullopt); + auto scan_filter = + std::make_shared(/*predicate=*/nullptr, partitions, + /*bucket_filter=*/std::nullopt, /*vector_search=*/nullptr); PAIMON_ASSIGN_OR_RAISE( auto scan, AppendOnlyFileStoreScan::Create( snapshot_manager_, schema_manager_, manifest_list_, manifest_file_, @@ -446,8 +447,9 @@ Result> FileStoreCommitImpl::ReadAllEntriesFromChange const std::set>& partitions) const { std::vector> partition_filters(partitions.begin(), partitions.end()); - auto scan_filter = std::make_shared(/*predicate=*/nullptr, partition_filters, - /*bucket_filter=*/std::nullopt); + auto scan_filter = + std::make_shared(/*predicate=*/nullptr, partition_filters, + /*bucket_filter=*/std::nullopt, /*vector_search=*/nullptr); PAIMON_ASSIGN_OR_RAISE( auto scan, AppendOnlyFileStoreScan::Create( snapshot_manager_, schema_manager_, manifest_list_, manifest_file_, diff --git a/src/paimon/core/operation/key_value_file_store_scan_test.cpp b/src/paimon/core/operation/key_value_file_store_scan_test.cpp index 40c6de69..96ea0e46 100644 --- a/src/paimon/core/operation/key_value_file_store_scan_test.cpp +++ b/src/paimon/core/operation/key_value_file_store_scan_test.cpp @@ -132,9 +132,10 @@ TEST_F(KeyValueFileStoreScanTest, TestMaxSequenceNumber) { std::string table_path = paimon::test::GetDataDir() + "orc/pk_table_with_dv_cardinality.db/pk_table_with_dv_cardinality"; std::vector> partition_filters = {{{"f1", "10"}}}; - auto scan_filter = std::make_shared(/*predicate=*/nullptr, - /*partition_filters=*/partition_filters, - /*bucket_filter=*/0); + auto scan_filter = + std::make_shared(/*predicate=*/nullptr, + /*partition_filters=*/partition_filters, + /*bucket_filter=*/0, /*vector_search=*/nullptr); ASSERT_OK_AND_ASSIGN(std::unique_ptr scan, CreateFileStoreScan(table_path, scan_filter, /*table_schema_id=*/0, /*snapshot_id=*/2)); @@ -155,9 +156,10 @@ TEST_F(KeyValueFileStoreScanTest, TestMaxSequenceNumber) { "orc/pk_table_with_dv_cardinality.db/" "pk_table_with_dv_cardinality"; std::vector> partition_filters = {{{"f1", "10"}}}; - auto scan_filter = std::make_shared(/*predicate=*/nullptr, - /*partition_filters=*/partition_filters, - /*bucket_filter=*/1); + auto scan_filter = + std::make_shared(/*predicate=*/nullptr, + /*partition_filters=*/partition_filters, + /*bucket_filter=*/1, /*vector_search=*/nullptr); ASSERT_OK_AND_ASSIGN(std::unique_ptr scan, CreateFileStoreScan(table_path, scan_filter, /*table_schema_id=*/0, /*snapshot_id=*/4)); @@ -172,9 +174,10 @@ TEST_F(KeyValueFileStoreScanTest, TestMaxSequenceNumber) { paimon::test::GetDataDir() + "orc/pk_table_with_mor.db/pk_table_with_mor"; std::vector> partition_filters = { {{"p0", "1"}, {"p1", "0"}}}; - auto scan_filter = std::make_shared(/*predicate=*/nullptr, - /*partition_filters=*/partition_filters, - /*bucket_filter=*/0); + auto scan_filter = + std::make_shared(/*predicate=*/nullptr, + /*partition_filters=*/partition_filters, + /*bucket_filter=*/0, /*vector_search=*/nullptr); ASSERT_OK_AND_ASSIGN(std::unique_ptr scan, CreateFileStoreScan(table_path, scan_filter, /*table_schema_id=*/0, /*snapshot_id=*/1)); @@ -189,9 +192,10 @@ TEST_F(KeyValueFileStoreScanTest, TestMaxSequenceNumber) { paimon::test::GetDataDir() + "orc/pk_table_with_mor.db/pk_table_with_mor"; std::vector> partition_filters = { {{"p0", "0"}, {"p1", "0"}}}; - auto scan_filter = std::make_shared(/*predicate=*/nullptr, - /*partition_filters=*/partition_filters, - /*bucket_filter=*/0); + auto scan_filter = + std::make_shared(/*predicate=*/nullptr, + /*partition_filters=*/partition_filters, + /*bucket_filter=*/0, /*vector_search=*/nullptr); ASSERT_OK_AND_ASSIGN(std::unique_ptr scan, CreateFileStoreScan(table_path, scan_filter, /*table_schema_id=*/0, /*snapshot_id=*/2)); @@ -205,9 +209,10 @@ TEST_F(KeyValueFileStoreScanTest, TestMaxSequenceNumber) { std::string table_path = paimon::test::GetDataDir() + "orc/pk_table_partial_update.db/pk_table_partial_update"; std::vector> partition_filters = {}; - auto scan_filter = std::make_shared(/*predicate=*/nullptr, - /*partition_filters=*/partition_filters, - /*bucket_filter=*/0); + auto scan_filter = + std::make_shared(/*predicate=*/nullptr, + /*partition_filters=*/partition_filters, + /*bucket_filter=*/0, /*vector_search=*/nullptr); ASSERT_OK_AND_ASSIGN(std::unique_ptr scan, CreateFileStoreScan(table_path, scan_filter, /*table_schema_id=*/0, /*snapshot_id=*/2)); @@ -235,7 +240,7 @@ TEST_F(KeyValueFileStoreScanTest, TestSplitAndSetKeyValueFilter) { PredicateBuilder::And({not_equal, equal, greater_than, less_than})); auto scan_filter = std::make_shared(/*predicate=*/predicate, /*partition_filters=*/partition_filters, - /*bucket_filter=*/0); + /*bucket_filter=*/0, /*vector_search=*/nullptr); ASSERT_OK_AND_ASSIGN(std::unique_ptr scan, CreateFileStoreScan(table_path, scan_filter, /*table_schema_id=*/0, /*snapshot_id=*/1)); diff --git a/src/paimon/core/operation/scan_context.cpp b/src/paimon/core/operation/scan_context.cpp index aaa2816c..e9e4d36f 100644 --- a/src/paimon/core/operation/scan_context.cpp +++ b/src/paimon/core/operation/scan_context.cpp @@ -54,6 +54,7 @@ class ScanContextBuilder::Impl { bucket_filter_ = std::nullopt; partition_filters_.clear(); predicates_.reset(); + vector_search_.reset(); global_index_result_.reset(); memory_pool_ = GetDefaultPool(); executor_ = CreateDefaultExecutor(); @@ -67,6 +68,7 @@ class ScanContextBuilder::Impl { std::optional bucket_filter_; std::vector> partition_filters_; std::shared_ptr predicates_; + std::shared_ptr vector_search_; std::shared_ptr global_index_result_; std::shared_ptr memory_pool_ = GetDefaultPool(); std::shared_ptr executor_ = CreateDefaultExecutor(); @@ -104,6 +106,12 @@ ScanContextBuilder& ScanContextBuilder::SetPredicate(const std::shared_ptr& vector_search) { + impl_->vector_search_ = vector_search; + return *this; +} + ScanContextBuilder& ScanContextBuilder::SetGlobalIndexResult( const std::shared_ptr& global_index_result) { impl_->global_index_result_ = global_index_result; @@ -141,7 +149,7 @@ Result> ScanContextBuilder::Finish() { auto ctx = std::make_unique( impl_->path_, impl_->is_streaming_mode_, impl_->limit_, std::make_shared(impl_->predicates_, impl_->partition_filters_, - impl_->bucket_filter_), + impl_->bucket_filter_, impl_->vector_search_), impl_->global_index_result_, impl_->memory_pool_, impl_->executor_, impl_->options_); impl_->Reset(); return ctx; diff --git a/src/paimon/core/operation/scan_context_test.cpp b/src/paimon/core/operation/scan_context_test.cpp index 3f7c52e4..98296099 100644 --- a/src/paimon/core/operation/scan_context_test.cpp +++ b/src/paimon/core/operation/scan_context_test.cpp @@ -35,6 +35,7 @@ TEST(ScanContextTest, TestSimple) { ASSERT_TRUE(ctx->GetScanFilters()); ASSERT_FALSE(ctx->GetScanFilters()->GetBucketFilter()); ASSERT_FALSE(ctx->GetScanFilters()->GetPredicate()); + ASSERT_FALSE(ctx->GetScanFilters()->GetVectorSearch()); ASSERT_TRUE(ctx->GetScanFilters()->GetPartitionFilters().empty()); ASSERT_FALSE(ctx->GetGlobalIndexResult()); } @@ -47,6 +48,9 @@ TEST(ScanContextTest, TestSetFilter) { auto predicate = PredicateBuilder::IsNull(/*field_index=*/2, /*field_name=*/"f2", FieldType::INT); builder.SetPredicate(predicate); + std::vector query = {1.0, 2.0}; + VectorSearch::PreFilter pre_filter = [](int64_t id) -> bool { return id % 2; }; + builder.SetVectorSearch(std::make_shared("f0", 10, query, pre_filter, nullptr)); std::vector row_ranges = {Range(1, 2), Range(4, 5)}; auto global_index_result = BitmapGlobalIndexResult::FromRanges(row_ranges); builder.SetGlobalIndexResult(global_index_result); @@ -60,6 +64,9 @@ TEST(ScanContextTest, TestSetFilter) { ASSERT_TRUE(ctx->GetScanFilters()); ASSERT_EQ(10, ctx->GetScanFilters()->GetBucketFilter()); ASSERT_EQ(*predicate, *(ctx->GetScanFilters()->GetPredicate())); + auto result_vector_search = ctx->GetScanFilters()->GetVectorSearch(); + ASSERT_TRUE(result_vector_search); + ASSERT_EQ(query, result_vector_search->query); ASSERT_EQ(partition_filters, ctx->GetScanFilters()->GetPartitionFilters()); ASSERT_EQ("{1,2,4,5}", ctx->GetGlobalIndexResult()->ToString()); std::map expected_options = {{"key", "value"}}; diff --git a/src/paimon/core/table/source/data_evolution_batch_scan.cpp b/src/paimon/core/table/source/data_evolution_batch_scan.cpp index 8e1377f6..edd20db8 100644 --- a/src/paimon/core/table/source/data_evolution_batch_scan.cpp +++ b/src/paimon/core/table/source/data_evolution_batch_scan.cpp @@ -26,13 +26,15 @@ namespace paimon { DataEvolutionBatchScan::DataEvolutionBatchScan( const std::string& table_path, const std::shared_ptr& snapshot_reader, std::unique_ptr&& batch_scan, - const std::shared_ptr& global_index_result, const CoreOptions& core_options, + const std::shared_ptr& global_index_result, + const std::shared_ptr& vector_search, const CoreOptions& core_options, const std::shared_ptr& pool, const std::shared_ptr& executor) : AbstractTableScan(core_options, snapshot_reader), pool_(pool), table_path_(table_path), batch_scan_(std::move(batch_scan)), global_index_result_(global_index_result), + vector_search_(vector_search), executor_(executor) {} Result> DataEvolutionBatchScan::CreatePlan() { @@ -54,12 +56,13 @@ Result> DataEvolutionBatchScan::CreatePlan() { batch_scan_->WithRowRanges(row_ranges.value()); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data_plan, batch_scan_->CreatePlan()); std::map id_to_score; - if (auto topk_result = - std::dynamic_pointer_cast(final_global_index_result)) { - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr topk_iter, - topk_result->CreateTopKIterator()); - while (topk_iter->HasNext()) { - auto [id, score] = topk_iter->NextWithScore(); + if (auto vector_search_result = + std::dynamic_pointer_cast(final_global_index_result)) { + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr vector_search_iter, + vector_search_result->CreateVectorSearchIterator()); + while (vector_search_iter->HasNext()) { + auto [id, score] = vector_search_iter->NextWithScore(); id_to_score[id] = score; } } @@ -108,7 +111,7 @@ Result> DataEvolutionBatchScan::WrapToIndexedSplits( Result>> DataEvolutionBatchScan::EvalGlobalIndex() const { auto predicate = batch_scan_->GetNonPartitionPredicate(); - if (!predicate) { + if (!predicate && !vector_search_) { return std::optional>(); } if (!core_options_.GlobalIndexEnabled()) { @@ -136,8 +139,9 @@ Result>> DataEvolutionBatchScan std::vector non_indexed_row_ranges = Range(0, next_row_id.value() - 1).Exclude(indexed_row_ranges); - PAIMON_ASSIGN_OR_RAISE(std::optional> index_result, - index_scan_impl->ParallelScan(indexed_row_ranges, predicate, executor_)); + PAIMON_ASSIGN_OR_RAISE( + std::optional> index_result, + index_scan_impl->ParallelScan(indexed_row_ranges, predicate, vector_search_, executor_)); if (!index_result) { return std::optional>(); } diff --git a/src/paimon/core/table/source/data_evolution_batch_scan.h b/src/paimon/core/table/source/data_evolution_batch_scan.h index 8367209e..450a9e3c 100644 --- a/src/paimon/core/table/source/data_evolution_batch_scan.h +++ b/src/paimon/core/table/source/data_evolution_batch_scan.h @@ -32,6 +32,7 @@ class DataEvolutionBatchScan : public AbstractTableScan { const std::shared_ptr& snapshot_reader, std::unique_ptr&& batch_scan, const std::shared_ptr& global_index_result, + const std::shared_ptr& vector_search, const CoreOptions& core_options, const std::shared_ptr& pool, const std::shared_ptr& executor); @@ -48,6 +49,7 @@ class DataEvolutionBatchScan : public AbstractTableScan { std::string table_path_; std::unique_ptr batch_scan_; std::shared_ptr global_index_result_; + std::shared_ptr vector_search_; std::shared_ptr executor_; }; diff --git a/src/paimon/core/table/source/table_scan.cpp b/src/paimon/core/table/source/table_scan.cpp index 62af3056..4c5a47d0 100644 --- a/src/paimon/core/table/source/table_scan.cpp +++ b/src/paimon/core/table/source/table_scan.cpp @@ -234,7 +234,8 @@ Result> TableScan::Create(std::unique_ptr( context->GetPath(), snapshot_reader, std::move(batch_scan), context->GetGlobalIndexResult(), - core_options, context->GetMemoryPool(), context->GetExecutor()); + context->GetScanFilters()->GetVectorSearch(), core_options, context->GetMemoryPool(), + context->GetExecutor()); } } // namespace paimon diff --git a/src/paimon/global_index/lumina/lumina_global_index.cpp b/src/paimon/global_index/lumina/lumina_global_index.cpp index 4c95e561..af886588 100644 --- a/src/paimon/global_index/lumina/lumina_global_index.cpp +++ b/src/paimon/global_index/lumina/lumina_global_index.cpp @@ -28,7 +28,7 @@ #include "lumina/core/Types.h" #include "paimon/common/utils/options_utils.h" #include "paimon/common/utils/string_utils.h" -#include "paimon/global_index/bitmap_topk_global_index_result.h" +#include "paimon/global_index/bitmap_vector_search_global_index_result.h" #include "paimon/global_index/lumina/lumina_file_reader.h" #include "paimon/global_index/lumina/lumina_file_writer.h" #include "paimon/global_index/lumina/lumina_utils.h" @@ -281,29 +281,29 @@ LuminaIndexReader::LuminaIndexReader( searcher_(std::move(searcher)), searcher_with_filter_(std::move(searcher_with_filter)) {} -Result> LuminaIndexReader::VisitTopK( - int32_t k, const std::vector& query, TopKPreFilter filter, - const std::shared_ptr& predicate) { - if (predicate) { - return Status::NotImplemented("lumina index not support predicate in VisitTopK"); +Result> LuminaIndexReader::VisitVectorSearch( + const std::shared_ptr& vector_search) { + if (vector_search->predicate) { + return Status::NotImplemented("lumina index not support predicate in VisitVectorSearch"); } auto search_options = search_options_; - search_options.Set(::lumina::core::kTopK, k); + search_options.Set(::lumina::core::kTopK, vector_search->limit); - ::lumina::api::Query lumina_query(query.data(), query.size()); + ::lumina::api::Query lumina_query(vector_search->query.data(), vector_search->query.size()); ::lumina::api::LuminaSearcher::SearchResult search_result; - if (!filter) { + if (!vector_search->pre_filter) { PAIMON_ASSIGN_OR_RAISE_FROM_LUMINA(search_result, searcher_->Search(lumina_query, search_options, *pool_)); } else { search_options.Set(::lumina::core::kSearchThreadSafeFilter, true); - auto lumina_filter = [filter](::lumina::core::VectorId id) -> bool { return filter(id); }; + auto lumina_filter = [filter = vector_search->pre_filter]( + ::lumina::core::VectorId id) -> bool { return filter(id); }; PAIMON_ASSIGN_OR_RAISE_FROM_LUMINA( search_result, searcher_with_filter_->SearchWithFilter(lumina_query, lumina_filter, search_options, *pool_)); } - // prepare BitmapTopKGlobalIndexResult + // prepare BitmapVectorSearchGlobalIndexResult std::map id_to_score; for (const auto& [id, score] : search_result.topk) { id_to_score[id] = score; @@ -316,7 +316,8 @@ Result> LuminaIndexReader::VisitTopK( bitmap.Add(id); scores.push_back(score); } - return std::make_shared(std::move(bitmap), std::move(scores)); + return std::make_shared(std::move(bitmap), + std::move(scores)); } } // namespace paimon::lumina diff --git a/src/paimon/global_index/lumina/lumina_global_index.h b/src/paimon/global_index/lumina/lumina_global_index.h index 4350a7d0..3c93557d 100644 --- a/src/paimon/global_index/lumina/lumina_global_index.h +++ b/src/paimon/global_index/lumina/lumina_global_index.h @@ -89,9 +89,12 @@ class LuminaIndexReader : public GlobalIndexReader { std::unique_ptr<::lumina::extensions::SearchWithFilterExtension>&& searcher_with_filter, const std::shared_ptr& pool); - Result> VisitTopK( - int32_t k, const std::vector& query, TopKPreFilter filter, - const std::shared_ptr& predicate) override; + ~LuminaIndexReader() override { + [[maybe_unused]] auto status = searcher_->Close(); + } + + Result> VisitVectorSearch( + const std::shared_ptr& vector_search) override; Result> VisitIsNotNull() override { return BitmapGlobalIndexResult::FromRanges({Range(0, range_end_)}); diff --git a/src/paimon/global_index/lumina/lumina_global_index_test.cpp b/src/paimon/global_index/lumina/lumina_global_index_test.cpp index 7f1e5056..095abfc2 100644 --- a/src/paimon/global_index/lumina/lumina_global_index_test.cpp +++ b/src/paimon/global_index/lumina/lumina_global_index_test.cpp @@ -28,7 +28,7 @@ #include "paimon/core/global_index/global_index_file_manager.h" #include "paimon/core/index/index_path_factory.h" #include "paimon/fs/local/local_file_system.h" -#include "paimon/global_index/bitmap_topk_global_index_result.h" +#include "paimon/global_index/bitmap_vector_search_global_index_result.h" #include "paimon/global_index/global_index_result.h" #include "paimon/predicate/predicate_builder.h" #include "paimon/testing/utils/testharness.h" @@ -93,10 +93,10 @@ class LuminaGlobalIndexTest : public ::testing::Test { return result_metas[0]; } - void CheckResult(const std::shared_ptr& result, + void CheckResult(const std::shared_ptr& result, const std::vector& expected_ids, const std::vector& expected_scores) const { - auto typed_result = std::dynamic_pointer_cast(result); + auto typed_result = std::dynamic_pointer_cast(result); ASSERT_TRUE(typed_result); ASSERT_OK_AND_ASSIGN(const RoaringBitmap64* bitmap, typed_result->GetBitmap()); ASSERT_TRUE(bitmap); @@ -188,17 +188,19 @@ TEST_F(LuminaGlobalIndexTest, TestSimple) { CreateGlobalIndexReader(test_root, data_type_, options_, meta)); { // recall all data - ASSERT_OK_AND_ASSIGN(auto topk_result, - reader->VisitTopK(/*k=*/4, query_, /*filter=*/nullptr, - /*predicate*/ nullptr)); - CheckResult(topk_result, {3l, 1l, 2l, 0l}, {0.01f, 2.01f, 2.21f, 4.21f}); + ASSERT_OK_AND_ASSIGN(auto vector_search_result, + reader->VisitVectorSearch(std::make_shared( + /*field_name=*/"f0", /*limit=*/4, query_, /*filter=*/nullptr, + /*predicate=*/nullptr))); + CheckResult(vector_search_result, {3l, 1l, 2l, 0l}, {0.01f, 2.01f, 2.21f, 4.21f}); } { - // small topk - ASSERT_OK_AND_ASSIGN(auto topk_result, - reader->VisitTopK(/*k=*/3, query_, /*filter=*/nullptr, - /*predicate*/ nullptr)); - CheckResult(topk_result, {3l, 1l, 2l}, {0.01f, 2.01f, 2.21f}); + // small limit + ASSERT_OK_AND_ASSIGN(auto vector_search_result, + reader->VisitVectorSearch(std::make_shared( + /*field_name=*/"f0", /*limit=*/3, query_, /*filter=*/nullptr, + /*predicate=*/nullptr))); + CheckResult(vector_search_result, {3l, 1l, 2l}, {0.01f, 2.01f, 2.21f}); } { // visit equal will return all rows @@ -217,22 +219,27 @@ TEST_F(LuminaGlobalIndexTest, TestWithFilter) { ASSERT_OK_AND_ASSIGN(auto reader, CreateGlobalIndexReader(test_root, data_type_, options_, meta)); { - ASSERT_OK_AND_ASSIGN(auto topk_result, - reader->VisitTopK(/*k=*/2, query_, /*filter=*/nullptr, - /*predicate*/ nullptr)); - CheckResult(topk_result, {3l, 1l}, {0.01f, 2.01f}); + ASSERT_OK_AND_ASSIGN(auto vector_search_result, + reader->VisitVectorSearch(std::make_shared( + /*field_name=*/"f0", /*limit=*/2, query_, /*filter=*/nullptr, + /*predicate=*/nullptr))); + CheckResult(vector_search_result, {3l, 1l}, {0.01f, 2.01f}); } { auto filter = [](int64_t id) -> bool { return id < 3; }; - ASSERT_OK_AND_ASSIGN(auto topk_result, reader->VisitTopK(/*k=*/2, query_, filter, - /*predicate*/ nullptr)); - CheckResult(topk_result, {1l, 2l}, {2.01f, 2.21f}); + ASSERT_OK_AND_ASSIGN(auto vector_search_result, + reader->VisitVectorSearch(std::make_shared( + /*field_name=*/"f0", /*limit=*/2, query_, filter, + /*predicate=*/nullptr))); + CheckResult(vector_search_result, {1l, 2l}, {2.01f, 2.21f}); } { auto filter = [](int64_t id) -> bool { return id < 3; }; - ASSERT_OK_AND_ASSIGN(auto topk_result, reader->VisitTopK(/*k=*/4, query_, filter, - /*predicate*/ nullptr)); - CheckResult(topk_result, {1l, 2l, 0l}, {2.01f, 2.21f, 4.21f}); + ASSERT_OK_AND_ASSIGN(auto vector_search_result, + reader->VisitVectorSearch(std::make_shared( + /*field_name=*/"f0", /*limit=*/4, query_, filter, + /*predicate=*/nullptr))); + CheckResult(vector_search_result, {1l, 2l, 0l}, {2.01f, 2.21f, 4.21f}); } } @@ -374,11 +381,12 @@ TEST_F(LuminaGlobalIndexTest, TestInvalidInputs) { { ASSERT_OK_AND_ASSIGN(auto reader, CreateGlobalIndexReader(index_root, data_type_, options_, meta)); - ASSERT_NOK_WITH_MSG( - reader->VisitTopK(/*k=*/2, query_, /*filter=*/nullptr, - PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f01", - FieldType::BIGINT, Literal(5l))), - "lumina index not support predicate in VisitTopK"); + ASSERT_NOK_WITH_MSG(reader->VisitVectorSearch(std::make_shared( + "f1", + /*limit=*/2, query_, /*filter=*/nullptr, + PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f01", + FieldType::BIGINT, Literal(5l)))), + "lumina index not support predicate in VisitVectorSearch"); } } } @@ -397,20 +405,26 @@ TEST_F(LuminaGlobalIndexTest, TestHighCardinalityAndMultiThreadSearch) { CreateGlobalIndexReader(test_root, data_type_, options_, meta)); auto search_with_filter = [&]() { - int32_t k = paimon::test::RandomNumber(0, 99); + int32_t limit = paimon::test::RandomNumber(0, 99); auto filter = [](int64_t id) -> bool { return id % 2; }; - ASSERT_OK_AND_ASSIGN(auto topk_result, reader->VisitTopK(k, query_, filter, - /*predicate*/ nullptr)); - auto typed_result = std::dynamic_pointer_cast(topk_result); + ASSERT_OK_AND_ASSIGN( + auto vector_search_result, + reader->VisitVectorSearch(std::make_shared("f0", limit, query_, filter, + /*predicate=*/nullptr))); + auto typed_result = + std::dynamic_pointer_cast(vector_search_result); ASSERT_TRUE(typed_result); - ASSERT_EQ(typed_result->bitmap_.Cardinality(), k); + ASSERT_EQ(typed_result->bitmap_.Cardinality(), limit); }; auto search = [&]() { int32_t k = paimon::test::RandomNumber(0, 99); - ASSERT_OK_AND_ASSIGN(auto topk_result, reader->VisitTopK(k, query_, /*filter=*/nullptr, - /*predicate*/ nullptr)); - auto typed_result = std::dynamic_pointer_cast(topk_result); + ASSERT_OK_AND_ASSIGN(auto vector_search_result, + reader->VisitVectorSearch( + std::make_shared("f0", k, query_, /*filter=*/nullptr, + /*predicate=*/nullptr))); + auto typed_result = + std::dynamic_pointer_cast(vector_search_result); ASSERT_TRUE(typed_result); ASSERT_EQ(typed_result->bitmap_.Cardinality(), k); }; diff --git a/test/inte/global_index_test.cpp b/test/inte/global_index_test.cpp index ff99cba4..338d6978 100644 --- a/test/inte/global_index_test.cpp +++ b/test/inte/global_index_test.cpp @@ -25,7 +25,7 @@ #include "paimon/defs.h" #include "paimon/fs/file_system.h" #include "paimon/global_index/bitmap_global_index_result.h" -#include "paimon/global_index/bitmap_topk_global_index_result.h" +#include "paimon/global_index/bitmap_vector_search_global_index_result.h" #include "paimon/global_index/global_index_scan.h" #include "paimon/global_index/global_index_write_task.h" #include "paimon/predicate/literal.h" @@ -141,11 +141,14 @@ class GlobalIndexTest : public ::testing::Test, public ::testing::WithParamInter Result> ScanGlobalIndexAndData( const std::string& table_path, const std::shared_ptr& predicate, + const std::shared_ptr& vector_search = nullptr, const std::map& options = {}, const std::shared_ptr& index_result = nullptr) const { ScanContextBuilder scan_context_builder(table_path); - scan_context_builder.SetPredicate(predicate).SetOptions(options).SetGlobalIndexResult( - index_result); + scan_context_builder.SetPredicate(predicate) + .SetVectorSearch(vector_search) + .SetOptions(options) + .SetGlobalIndexResult(index_result); PAIMON_ASSIGN_OR_RAISE(auto scan_context, scan_context_builder.Finish()); PAIMON_ASSIGN_OR_RAISE(auto table_scan, TableScan::Create(std::move(scan_context))); PAIMON_ASSIGN_OR_RAISE(auto result_plan, table_scan->CreatePlan()); @@ -153,8 +156,8 @@ class GlobalIndexTest : public ::testing::Test, public ::testing::WithParamInter } Result> ScanDataWithIndexResult( - const std::string& table_path, const std::shared_ptr& predicate, - const std::vector& row_ranges, const std::map& id_to_score) const { + const std::string& table_path, const std::vector& row_ranges, + const std::map& id_to_score) const { std::shared_ptr index_result; if (id_to_score.empty()) { index_result = BitmapGlobalIndexResult::FromRanges(row_ranges); @@ -167,10 +170,11 @@ class GlobalIndexTest : public ::testing::Test, public ::testing::WithParamInter for (auto iter = bitmap.Begin(); iter != bitmap.End(); ++iter) { scores.push_back(id_to_score.at(*iter)); } - index_result = - std::make_shared(std::move(bitmap), std::move(scores)); + index_result = std::make_shared(std::move(bitmap), + std::move(scores)); } - return ScanGlobalIndexAndData(table_path, predicate, /*options=*/{}, index_result); + return ScanGlobalIndexAndData(table_path, /*predicate=*/nullptr, /*vector_search=*/nullptr, + /*options=*/{}, index_result); } Status ReadData(const std::string& table_path, const std::vector& read_schema, @@ -194,7 +198,8 @@ class GlobalIndexTest : public ::testing::Test, public ::testing::WithParamInter return Status::OK(); } auto expected_chunk_array = std::make_shared(expected_array); - if (!expected_chunk_array->ApproxEquals(*read_result)) { + if (!expected_chunk_array->ApproxEquals(*read_result, + arrow::EqualOptions::Defaults().atol(1E-2))) { std::cout << "result=" << read_result->ToString() << std::endl << "expected=" << expected_chunk_array->ToString() << std::endl; return Status::Invalid("expected array and result array not equal"); @@ -435,7 +440,8 @@ TEST_P(GlobalIndexTest, TestScanIndex) { ASSERT_OK_AND_ASSIGN(auto evaluator, scanner_impl->CreateIndexEvaluator()); { // test with non predicate - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(nullptr)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(/*predicate=*/nullptr, /*vector_search=*/nullptr)); ASSERT_FALSE(index_result); } { @@ -443,7 +449,8 @@ TEST_P(GlobalIndexTest, TestScanIndex) { auto predicate = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, Literal(FieldType::STRING, "Alice", 5)); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{0,7}"); } { @@ -451,35 +458,40 @@ TEST_P(GlobalIndexTest, TestScanIndex) { auto predicate = PredicateBuilder::NotEqual(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, Literal(FieldType::STRING, "Alice", 5)); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{1,2,3,4,5,6}"); } { // test equal predicate for f1 auto predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, Literal(20)); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{4,6,7}"); } { // test equal predicate for f2 auto predicate = PredicateBuilder::Equal(/*field_index=*/2, /*field_name=*/"f2", FieldType::INT, Literal(1)); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{0,1,4,5}"); } { // test is null predicate auto predicate = PredicateBuilder::IsNull(/*field_index=*/2, /*field_name=*/"f2", FieldType::INT); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{7}"); } { // test is not null predicate auto predicate = PredicateBuilder::IsNotNull(/*field_index=*/2, /*field_name=*/"f2", FieldType::INT); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{0,1,2,3,4,5,6}"); } { @@ -488,7 +500,8 @@ TEST_P(GlobalIndexTest, TestScanIndex) { /*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, {Literal(FieldType::STRING, "Alice", 5), Literal(FieldType::STRING, "Bob", 3), Literal(FieldType::STRING, "Lucy", 4)}); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{0,1,4,5,7}"); } { @@ -497,7 +510,8 @@ TEST_P(GlobalIndexTest, TestScanIndex) { /*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, {Literal(FieldType::STRING, "Alice", 5), Literal(FieldType::STRING, "Bob", 3), Literal(FieldType::STRING, "Lucy", 4)}); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{2,3,6}"); } { @@ -508,7 +522,8 @@ TEST_P(GlobalIndexTest, TestScanIndex) { auto f1_predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, Literal(20)); ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({f0_predicate, f1_predicate})); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{7}"); } { @@ -519,14 +534,16 @@ TEST_P(GlobalIndexTest, TestScanIndex) { auto f1_predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, Literal(20)); ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::Or({f0_predicate, f1_predicate})); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{0,4,6,7}"); } { // test non-result auto predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, Literal(30)); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{}"); } { @@ -541,42 +558,48 @@ TEST_P(GlobalIndexTest, TestScanIndex) { ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({f1_predicate, f2_predicate, f0_predicate})); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{}"); } { // test greater than predicate which bitmap index is not support, will return all range auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, Literal(10)); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{0,1,2,3,4,5,6,7}"); } { // test greater or equal predicate which bitmap index is not support, will return all range auto predicate = PredicateBuilder::GreaterOrEqual(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, Literal(10)); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{0,1,2,3,4,5,6,7}"); } { // test less than predicate which bitmap index is not support, will return all range auto predicate = PredicateBuilder::LessThan(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, Literal(10)); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{0,1,2,3,4,5,6,7}"); } { // test less or equal predicate which bitmap index is not support, will return all range auto predicate = PredicateBuilder::LessOrEqual(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, Literal(10)); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{0,1,2,3,4,5,6,7}"); } { // test a predicate for field with no index auto f3_predicate = PredicateBuilder::Equal(/*field_index=*/3, /*field_name=*/"f3", FieldType::DOUBLE, Literal(1.2)); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(f3_predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(f3_predicate, /*vector_search=*/nullptr)); ASSERT_FALSE(index_result); } } @@ -618,7 +641,8 @@ TEST_P(GlobalIndexTest, TestScanIndexWithSpecificSnapshot) { auto f1_predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, Literal(20)); ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({f0_predicate, f1_predicate})); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{0,7}"); } { @@ -629,7 +653,8 @@ TEST_P(GlobalIndexTest, TestScanIndexWithSpecificSnapshot) { auto f1_predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, Literal(20)); ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::Or({f0_predicate, f1_predicate})); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_FALSE(index_result); } } @@ -661,7 +686,8 @@ TEST_P(GlobalIndexTest, TestScanIndexWithSpecificSnapshotWithNoIndex) { auto predicate = PredicateBuilder::NotEqual(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, Literal(FieldType::STRING, "Alice", 5)); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_FALSE(index_result); } @@ -702,7 +728,8 @@ TEST_P(GlobalIndexTest, TestScanIndexWithRange) { auto predicate = PredicateBuilder::NotEqual(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, Literal(FieldType::STRING, "Alice", 5)); - ASSERT_OK_AND_ASSIGN(auto evaluator_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto evaluator_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(evaluator_result.value()->ToString(), "{1,2,3,4,5,6}"); } { @@ -719,7 +746,8 @@ TEST_P(GlobalIndexTest, TestScanIndexWithRange) { auto predicate = PredicateBuilder::NotEqual(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, Literal(FieldType::STRING, "Alice", 5)); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_FALSE(index_result); } } @@ -760,7 +788,8 @@ TEST_P(GlobalIndexTest, TestScanIndexWithPartition) { auto predicate = PredicateBuilder::Equal(/*field_index=*/2, /*field_name=*/"f2", FieldType::INT, Literal(1)); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_FALSE(index_result); } { @@ -768,7 +797,8 @@ TEST_P(GlobalIndexTest, TestScanIndexWithPartition) { auto predicate = PredicateBuilder::NotEqual(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, Literal(FieldType::STRING, "Bob", 3)); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{0,2,3}"); } { @@ -776,7 +806,8 @@ TEST_P(GlobalIndexTest, TestScanIndexWithPartition) { auto predicate = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, Literal(FieldType::STRING, "Alice", 5)); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_EQ(index_result.value()->ToString(), "{0}"); } }; @@ -813,7 +844,8 @@ TEST_P(GlobalIndexTest, TestScanUnregisteredIndex) { PredicateBuilder::NotEqual(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, Literal(FieldType::STRING, "Bob", 3)); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_FALSE(index_result); } @@ -914,9 +946,8 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithPartition) { write_data_and_index(src_array2, {{"f2", "20"}}, Range(4, 8)); auto scan_and_check_result = [&](const std::map& partition, - const Range& expected_range, - GlobalIndexReader::TopKPreFilter filter, int32_t k, - const std::string& bitmap_result, + const Range& expected_range, VectorSearch::PreFilter filter, + int32_t limit, const std::string& bitmap_result, const std::string& lumina_result, const std::vector& read_row_ranges, const std::shared_ptr& expected_array, @@ -946,7 +977,8 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithPartition) { Literal(FieldType::STRING, "Paul", 4)); ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::Or({predicate1, predicate2})); - ASSERT_OK_AND_ASSIGN(auto index_result, evaluator->Evaluate(predicate)); + ASSERT_OK_AND_ASSIGN(auto index_result, + evaluator->Evaluate(predicate, /*vector_search=*/nullptr)); ASSERT_TRUE(index_result); ASSERT_EQ(index_result.value()->ToString(), bitmap_result); @@ -954,15 +986,26 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithPartition) { ASSERT_OK_AND_ASSIGN(auto lumina_reader, range_scanner->CreateReader("f1", "lumina")); std::vector query = {1.0f, 1.0f, 1.0f, 1.1f}; - ASSERT_OK_AND_ASSIGN(auto topk_result, lumina_reader->VisitTopK(k, query, filter, - /*predicate*/ nullptr)); - ASSERT_EQ(topk_result->ToString(), lumina_result); + auto vector_search = std::make_shared("f1", limit, query, filter, + /*predicate=*/nullptr); + ASSERT_OK_AND_ASSIGN(auto vector_search_result, + lumina_reader->VisitVectorSearch(vector_search)); + ASSERT_EQ(vector_search_result->ToString(), lumina_result); + + // check evaluate predicate and vector search + auto vector_search_without_filter = vector_search->ReplacePreFilter(nullptr); + ASSERT_OK_AND_ASSIGN(auto compound_index_result, + evaluator->Evaluate(predicate, vector_search_without_filter)); + ASSERT_TRUE(compound_index_result); + ASSERT_EQ(compound_index_result.value()->ToString(), lumina_result); // check read array std::vector read_field_names = schema->field_names(); read_field_names.push_back("_INDEX_SCORE"); - ASSERT_OK_AND_ASSIGN(auto result_with_offset, topk_result->AddOffset(expected_range.from)); + ASSERT_OK_AND_ASSIGN(auto result_with_offset, + compound_index_result.value()->AddOffset(expected_range.from)); ASSERT_OK_AND_ASSIGN(auto plan, ScanGlobalIndexAndData(table_path, /*predicate=*/nullptr, + /*vector_search=*/nullptr, /*options=*/{}, result_with_offset)); ASSERT_OK(ReadData(table_path, read_field_names, expected_array, /*predicate=*/nullptr, plan)); @@ -983,7 +1026,7 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithPartition) { [0, "Alice", [0.0, 0.0, 0.0, 0.0], 10, 11.1, 4.21] ])") .ValueOrDie(); - scan_and_check_result({{"f2", "10"}}, Range(0, 3), filter, /*k=*/2, "{0}", + scan_and_check_result({{"f2", "10"}}, Range(0, 3), filter, /*limit=*/2, "{0}", "row ids: {0}, scores: {4.21}", {Range(0, 0)}, expected_array, id_to_score1); } @@ -995,7 +1038,7 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithPartition) { [0, "Paul", [10.0, 10.0, 10.0, 10.0], 20, 19.1, 322.21] ])") .ValueOrDie(); - scan_and_check_result({{"f2", "20"}}, Range(4, 8), filter, /*k=*/1, "{3,4}", + scan_and_check_result({{"f2", "20"}}, Range(4, 8), filter, /*limit=*/1, "{3,4}", "row ids: {4}, scores: {322.21}", {Range(4, 4)}, expected_array, id_to_score2); } @@ -1066,8 +1109,8 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) { // check read array std::vector read_field_names = schema->field_names(); read_field_names.push_back("_INDEX_SCORE"); - ASSERT_OK_AND_ASSIGN(auto plan, ScanDataWithIndexResult(table_path, /*predicate=*/nullptr, - read_row_ranges, id_to_score)); + ASSERT_OK_AND_ASSIGN(auto plan, + ScanDataWithIndexResult(table_path, read_row_ranges, id_to_score)); ASSERT_OK(ReadData(table_path, read_field_names, expected_array, /*predicate=*/nullptr, plan)); }; @@ -1236,12 +1279,212 @@ TEST_P(GlobalIndexTest, TestDataEvolutionBatchScan) { PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, Literal(FieldType::STRING, "Alice", 5)); ASSERT_OK_AND_ASSIGN( - auto plan, - ScanGlobalIndexAndData(table_path, predicate, {{"global-index.enabled", "false"}})); + auto plan, ScanGlobalIndexAndData(table_path, predicate, /*vector_search=*/nullptr, + {{"global-index.enabled", "false"}})); ASSERT_OK(ReadData(table_path, write_cols, expected_all_array, predicate, plan)); } } +TEST_P(GlobalIndexTest, TestDataEvolutionBatchScanWithVectorSearch) { + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::list(arrow::float32())), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + std::map lumina_options = { + {"lumina.dimension", "4"}, + {"lumina.indextype", "bruteforce"}, + {"lumina.distance.metric", "l2"}, + {"lumina.encoding.type", "encoding.rawf32"}, + {"lumina.search.threadcount", "10"}}; + auto schema = arrow::schema(fields); + std::map options = {{Options::MANIFEST_FORMAT, "orc"}, + {Options::FILE_FORMAT, GetParam()}, + {Options::FILE_SYSTEM, "local"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}}; + CreateTable(/*partition_keys=*/{}, schema, options); + + std::string table_path = PathUtil::JoinPath(dir_->Str(), "foo.db/bar"); + std::vector write_cols = schema->field_names(); + + auto src_array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ +["Alice", [0.0, 0.0, 0.0, 0.0], 10, 11.1], +["Bob", [0.0, 1.0, 0.0, 1.0], 10, 12.1], +["Emily", [1.0, 0.0, 1.0, 0.0], 10, 13.1], +["Tony", [1.0, 1.0, 1.0, 1.0], 10, 14.1], +["Lucy", [10.0, 10.0, 10.0, 10.0], 20, 15.1], +["Bob", [10.0, 11.0, 10.0, 11.0], 20, 16.1], +["Tony", [11.0, 10.0, 11.0, 10.0], 20, 17.1], +["Alice", [11.0, 11.0, 11.0, 11.0], 20, 18.1], +["Paul", [10.0, 10.0, 10.0, 10.0], 20, 19.1] + ])") + .ValueOrDie()); + ASSERT_OK_AND_ASSIGN(auto commit_msgs, WriteArray(table_path, write_cols, src_array)); + ASSERT_OK(Commit(table_path, commit_msgs)); + + auto result_fields = fields; + result_fields.insert(result_fields.begin(), SpecialFields::ValueKind().ArrowField()); + { + // read when no index is built + auto predicate = + PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, + Literal(FieldType::STRING, "Alice", 5)); + auto vector_search = std::make_shared( + "f1", /*limit=*/1, std::vector({1.0f, 1.0f, 1.0f, 1.1f}), /*filter=*/nullptr, + /*predicate=*/nullptr); + ASSERT_OK_AND_ASSIGN(auto plan, ScanGlobalIndexAndData(table_path, predicate, vector_search, + lumina_options)); + + auto expected_array = + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(result_fields), R"([ +[0, "Alice", [0.0, 0.0, 0.0, 0.0], 10, 11.1], +[0, "Bob", [0.0, 1.0, 0.0, 1.0], 10, 12.1], +[0, "Emily", [1.0, 0.0, 1.0, 0.0], 10, 13.1], +[0, "Tony", [1.0, 1.0, 1.0, 1.0], 10, 14.1], +[0, "Lucy", [10.0, 10.0, 10.0, 10.0], 20, 15.1], +[0, "Bob", [10.0, 11.0, 10.0, 11.0], 20, 16.1], +[0, "Tony", [11.0, 10.0, 11.0, 10.0], 20, 17.1], +[0, "Alice", [11.0, 11.0, 11.0, 11.0], 20, 18.1], +[0, "Paul", [10.0, 10.0, 10.0, 10.0], 20, 19.1] + ])") + .ValueOrDie(); + ASSERT_OK(ReadData(table_path, write_cols, expected_array, predicate, plan)); + } + + // write and commit bitmap global index + ASSERT_OK(WriteIndex(table_path, /*partition_filters=*/{}, "f0", "bitmap", /*options=*/{}, + Range(0, 8))); + + auto read_cols = write_cols; + read_cols.push_back("_INDEX_SCORE"); + result_fields.insert(result_fields.end(), SpecialFields::IndexScore().ArrowField()); + { + // read when only bitmap index is built + auto predicate = + PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, + Literal(FieldType::STRING, "Alice", 5)); + auto vector_search = std::make_shared( + "f1", /*limit=*/1, std::vector({1.0f, 1.0f, 1.0f, 1.1f}), /*filter=*/nullptr, + /*predicate=*/nullptr); + ASSERT_OK_AND_ASSIGN(auto plan, ScanGlobalIndexAndData(table_path, predicate, vector_search, + lumina_options)); + + auto expected_array = + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(result_fields), R"([ +[0, "Alice", [0.0, 0.0, 0.0, 0.0], 10, 11.1, null], +[0, "Alice", [11.0, 11.0, 11.0, 11.0], 20, 18.1, null] + ])") + .ValueOrDie(); + ASSERT_OK(ReadData(table_path, read_cols, expected_array, predicate, plan)); + } + + // write and commit lumina global index + ASSERT_OK(WriteIndex(table_path, /*partition_filters=*/{}, "f1", "lumina", + /*options=*/lumina_options, Range(0, 8))); + + // scan and read with global index + { + auto predicate = + PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, + Literal(FieldType::STRING, "Alice", 5)); + auto vector_search = std::make_shared( + "f1", /*limit=*/1, std::vector({1.0f, 1.0f, 1.0f, 1.1f}), /*filter=*/nullptr, + /*predicate=*/nullptr); + ASSERT_OK_AND_ASSIGN(auto plan, ScanGlobalIndexAndData(table_path, predicate, vector_search, + lumina_options)); + + auto expected_array = + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(result_fields), R"([ +[0, "Alice", [0.0, 0.0, 0.0, 0.0], 10, 11.1, 4.21] + ])") + .ValueOrDie(); + ASSERT_OK(ReadData(table_path, read_cols, expected_array, predicate, plan)); + } + { + auto predicate = + PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, + Literal(FieldType::STRING, "Alice", 5)); + auto vector_search = std::make_shared( + "f1", /*limit=*/3, std::vector({1.0f, 1.0f, 1.0f, 1.1f}), /*filter=*/nullptr, + /*predicate=*/nullptr); + ASSERT_OK_AND_ASSIGN(auto plan, ScanGlobalIndexAndData(table_path, predicate, vector_search, + lumina_options)); + + auto expected_array = + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(result_fields), R"([ +[0, "Alice", [0.0, 0.0, 0.0, 0.0], 10, 11.1, 4.21], +[0, "Alice", [11.0, 11.0, 11.0, 11.0], 20, 18.1, 398.01] + ])") + .ValueOrDie(); + ASSERT_OK(ReadData(table_path, read_cols, expected_array, predicate, plan)); + } + { + auto predicate = + PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, + Literal(FieldType::STRING, "Bob", 3)); + auto vector_search = std::make_shared( + "f1", /*limit=*/3, std::vector({1.0f, 1.0f, 1.0f, 1.1f}), /*filter=*/nullptr, + /*predicate=*/nullptr); + ASSERT_OK_AND_ASSIGN(auto plan, ScanGlobalIndexAndData(table_path, predicate, vector_search, + lumina_options)); + + auto expected_array = + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(result_fields), R"([ +[0, "Bob", [0.0, 1.0, 0.0, 1.0], 10, 12.1, 2.01], +[0, "Bob", [10.0, 11.0, 10.0, 11.0], 20, 16.1, 360.01] + ])") + .ValueOrDie(); + ASSERT_OK(ReadData(table_path, read_cols, expected_array, predicate, plan)); + } + { + // test only has vector search with pre_filter + auto vector_search = std::make_shared( + "f1", /*limit=*/3, std::vector({1.0f, 1.0f, 1.0f, 1.1f}), + /*filter=*/[](int64_t row_id) { return row_id == 1 || row_id == 5; }, + /*predicate=*/nullptr); + ASSERT_OK_AND_ASSIGN(auto plan, ScanGlobalIndexAndData(table_path, /*predicate=*/nullptr, + vector_search, lumina_options)); + + auto expected_array = + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(result_fields), R"([ +[0, "Bob", [0.0, 1.0, 0.0, 1.0], 10, 12.1, 2.01], +[0, "Bob", [10.0, 11.0, 10.0, 11.0], 20, 16.1, 360.01] + ])") + .ValueOrDie(); + ASSERT_OK(ReadData(table_path, read_cols, expected_array, /*predicate=*/nullptr, plan)); + } + { + // test only has vector search with no pre_filter + auto vector_search = std::make_shared( + "f1", /*limit=*/2, std::vector({1.0f, 1.0f, 1.0f, 1.1f}), + /*filter=*/nullptr, + /*predicate=*/nullptr); + ASSERT_OK_AND_ASSIGN(auto plan, ScanGlobalIndexAndData(table_path, /*predicate=*/nullptr, + vector_search, lumina_options)); + + auto expected_array = + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(result_fields), R"([ +[0, "Bob", [0.0, 1.0, 0.0, 1.0], 10, 12.1, 2.01], +[0, "Tony", [1.0, 1.0, 1.0, 1.0], 10, 14.1, 0.01] + ])") + .ValueOrDie(); + ASSERT_OK(ReadData(table_path, read_cols, expected_array, /*predicate=*/nullptr, plan)); + } + { + // test invalid vector search + auto predicate = + PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, + Literal(FieldType::STRING, "Bob", 3)); + auto vector_search = std::make_shared( + "f1", /*limit=*/3, std::vector({1.0f, 1.0f, 1.0f, 1.1f}), + /*filter=*/[](int64_t row_id) { return true; }, + /*predicate=*/nullptr); + ASSERT_NOK_WITH_MSG( + ScanGlobalIndexAndData(table_path, predicate, vector_search, lumina_options), + "Predicate result and pre_filter in VectorSearch conflict"); + } +} + TEST_P(GlobalIndexTest, TestDataEvolutionBatchScanWithOnlyOnePartitionHasIndex) { CreateTable(/*partition_keys=*/{"f1"}); std::string table_path = PathUtil::JoinPath(dir_->Str(), "foo.db/bar"); @@ -1738,9 +1981,11 @@ TEST_P(GlobalIndexTest, TestScanIndexWithTwoIndexes) { ASSERT_OK_AND_ASSIGN(index_readers, range_scanner->CreateReaders("f1")); ASSERT_EQ(index_readers.size(), 1); std::vector query = {11.0f, 11.0f, 11.0f, 11.0f}; - ASSERT_OK_AND_ASSIGN(auto topk_result, index_readers[0]->VisitTopK(1, query, /*filter=*/nullptr, - /*predicate*/ nullptr)); - ASSERT_EQ(topk_result->ToString(), "row ids: {7}, scores: {0.00}"); + ASSERT_OK_AND_ASSIGN(auto vector_search_result, + index_readers[0]->VisitVectorSearch( + std::make_shared("f1", 1, query, /*filter=*/nullptr, + /*predicate*/ nullptr))); + ASSERT_EQ(vector_search_result->ToString(), "row ids: {7}, scores: {0.00}"); // query f2 ASSERT_OK_AND_ASSIGN(index_readers, range_scanner->CreateReaders("f2")); @@ -1760,7 +2005,7 @@ TEST_P(GlobalIndexTest, TestIOException) { auto src_array = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ ["Alice", [0.0, 0.0, 0.0, 0.0], 10, 11.1], ["Bob", [0.0, 1.0, 0.0, 1.0], 10, 12.1], -["Emily", [1.0, 0.0, 1.0, 0.0], 10, 13.1], +["Alice", [1.0, 0.0, 1.0, 0.0], 10, 13.1], ["Tony", [1.0, 1.0, 1.0, 1.0], 10, 14.1] ])") .ValueOrDie(); @@ -1804,59 +2049,40 @@ TEST_P(GlobalIndexTest, TestIOException) { } ASSERT_TRUE(write_run_complete); - // read for bitmap + // read for bitmap and lumina bool read_run_complete = false; for (size_t i = 0; i < 2000; i += paimon::test::RandomNumber(20, 30)) { ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); io_hook->Reset(i, IOHook::Mode::RETURN_ERROR); + + auto result_fields = fields; + result_fields.insert(result_fields.begin(), SpecialFields::ValueKind().ArrowField()); + result_fields.insert(result_fields.end(), SpecialFields::IndexScore().ArrowField()); + auto read_cols = write_cols; + read_cols.push_back("_INDEX_SCORE"); + auto predicate = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, Literal(FieldType::STRING, "Alice", 5)); - auto result_fields = fields; - result_fields.insert(result_fields.begin(), SpecialFields::ValueKind().ArrowField()); + auto vector_search = std::make_shared( + "f1", /*limit=*/1, std::vector({1.0f, 1.0f, 1.0f, 1.1f}), /*filter=*/nullptr, + /*predicate*/ nullptr); auto expected_array = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(result_fields), R"([ -[0, "Alice", [0.0, 0.0, 0.0, 0.0], 10, 11.1] +[0, "Alice", [1.0, 0.0, 1.0, 0.0], 10, 13.1, 2.21] ])") .ValueOrDie(); - auto plan_result = ScanGlobalIndexAndData(table_path, predicate); - CHECK_HOOK_STATUS(plan_result.status(), i); + auto plan_result = + ScanGlobalIndexAndData(table_path, predicate, vector_search, lumina_options); + CHECK_HOOK_STATUS_WITHOUT_MESSAGE_CHECK(plan_result.status()); auto plan = std::move(plan_result).value(); - auto read_status = ReadData(table_path, write_cols, expected_array, predicate, plan); + auto read_status = ReadData(table_path, read_cols, expected_array, predicate, plan); CHECK_HOOK_STATUS(read_status, i); read_run_complete = true; break; } ASSERT_TRUE(read_run_complete); - - // read for lumina - read_run_complete = false; - for (size_t i = 0; i < 2000; i += paimon::test::RandomNumber(20, 30)) { - ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); - io_hook->Reset(i, IOHook::Mode::RETURN_ERROR); - auto global_index_scan_result = - GlobalIndexScan::Create(table_path, /*snapshot_id=*/std::nullopt, - /*partitions=*/std::nullopt, lumina_options, - /*file_system=*/nullptr, pool_); - CHECK_HOOK_STATUS(global_index_scan_result.status(), i); - auto global_index_scan = std::move(global_index_scan_result).value(); - auto range_scanner_result = global_index_scan->CreateRangeScan(Range(0, 3)); - CHECK_HOOK_STATUS(range_scanner_result.status(), i); - auto range_scanner = std::move(range_scanner_result).value(); - auto lumina_reader_result = range_scanner->CreateReader("f1", "lumina"); - CHECK_HOOK_STATUS_WITHOUT_MESSAGE_CHECK(lumina_reader_result.status()); - auto lumina_reader = std::move(lumina_reader_result).value(); - - std::vector query = {1.0f, 1.0f, 1.0f, 1.1f}; - auto topk_result = lumina_reader->VisitTopK(1, query, /*filter=*/nullptr, - /*predicate*/ nullptr); - CHECK_HOOK_STATUS_WITHOUT_MESSAGE_CHECK(topk_result.status()); - ASSERT_EQ(topk_result.value()->ToString(), "row ids: {3}, scores: {0.01}"); - read_run_complete = true; - break; - } - ASSERT_TRUE(read_run_complete); } std::vector GetTestValuesForGlobalIndexTest() {