diff --git a/include/paimon/global_index/global_index_scan.h b/include/paimon/global_index/global_index_scan.h index ff2c976fc..7aeaec970 100644 --- a/include/paimon/global_index/global_index_scan.h +++ b/include/paimon/global_index/global_index_scan.h @@ -97,6 +97,22 @@ class PAIMON_EXPORT GlobalIndexScan { const std::string& field_name, const std::optional& row_range_index) const = 0; + /// Creates a `GlobalIndexReader` for a specific field with a specific index type. + /// @param field_name Name of the indexed column. + /// @param index_type Name of index type. + /// @param row_range_index Optional row range that limits the scan to a sub-range of row ids. + /// If not provided, the entire row range is considered. + /// @return A `Result` that is: + /// - Successful with a reader(with global row id) if the index exists and loads + /// correctly; + /// - Successful with nullptr if no index was built for the given field with the given + /// index type; + /// - Error returns when loading fails (e.g., file corruption, I/O error, + /// unsupported format). + virtual Result> CreateReader( + const std::string& field_name, const std::string& index_type, + const std::optional& row_range_index) const = 0; + /// Creates several `GlobalIndexReader`s for a specific field (looked up by id), /// @param field_id Field id of the indexed column. /// @param row_range_index Optional row range that limits the scan to a sub-range of row ids. diff --git a/src/paimon/common/executor/default_executor_test.cpp b/src/paimon/common/executor/default_executor_test.cpp index 65d923dc9..638ac06b5 100644 --- a/src/paimon/common/executor/default_executor_test.cpp +++ b/src/paimon/common/executor/default_executor_test.cpp @@ -122,4 +122,61 @@ TEST(DefaultExecutorTest, TestAddTaskAfterShutdownNowIgnored) { ASSERT_EQ(executed_count.load(), 0); } +TEST(DefaultExecutorTest, TestAddTaskFromMultipleThreads) { + auto executor = CreateDefaultExecutor(/*thread_count=*/4); + + constexpr int32_t kSubmitterCount = 8; + constexpr int32_t kTaskCountPerSubmitter = 64; + constexpr int32_t kTotalTaskCount = kSubmitterCount * kTaskCountPerSubmitter; + + std::vector> executed_slots(kTotalTaskCount); + for (auto& executed_slot : executed_slots) { + executed_slot.store(0); + } + std::vector> task_promises(kTotalTaskCount); + std::vector> task_futures; + task_futures.reserve(kTotalTaskCount); + for (auto& task_promise : task_promises) { + task_futures.push_back(task_promise.get_future()); + } + std::atomic ready_submitter_count = 0; + std::atomic executed_count = 0; + std::promise start_signal; + std::shared_future start_future = start_signal.get_future().share(); + std::vector submitters; + submitters.reserve(kSubmitterCount); + + for (int32_t submitter_index = 0; submitter_index < kSubmitterCount; ++submitter_index) { + submitters.emplace_back([&, submitter_index]() { + ++ready_submitter_count; + start_future.wait(); + for (int32_t task_index = 0; task_index < kTaskCountPerSubmitter; ++task_index) { + const int32_t slot_index = submitter_index * kTaskCountPerSubmitter + task_index; + executor->Add([&, slot_index]() { + ++executed_slots[slot_index]; + ++executed_count; + task_promises[slot_index].set_value(); + }); + } + }); + } + + for (int32_t retry = 0; retry < 100 && ready_submitter_count.load() < kSubmitterCount; + ++retry) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + const int32_t ready_submitter_count_before_start = ready_submitter_count.load(); + start_signal.set_value(); + for (auto& submitter : submitters) { + submitter.join(); + } + ASSERT_EQ(kSubmitterCount, ready_submitter_count_before_start); + Wait(task_futures); + + ASSERT_EQ(kTotalTaskCount, executed_count.load()); + for (const auto& executed_slot : executed_slots) { + ASSERT_EQ(1, executed_slot.load()); + } +} + } // namespace paimon::test diff --git a/src/paimon/common/global_index/union_global_index_reader.cpp b/src/paimon/common/global_index/union_global_index_reader.cpp index d5133e945..0192ef167 100644 --- a/src/paimon/common/global_index/union_global_index_reader.cpp +++ b/src/paimon/common/global_index/union_global_index_reader.cpp @@ -182,6 +182,15 @@ Result> UnionGlobalIndexReader::Union(ReaderA return merged_result; } +bool UnionGlobalIndexReader::IsThreadSafe() const { + for (const auto& reader : readers_) { + if (!reader->IsThreadSafe()) { + return false; + } + } + return true; +} + template std::vector UnionGlobalIndexReader::ExecuteAllReaders( const std::function&)>& action) { @@ -199,7 +208,7 @@ std::vector UnionGlobalIndexReader::ExecuteAllReaders( futures.reserve(readers_.size()); for (const auto& reader : readers_) { futures.push_back( - Via(executor_.get(), [&action, &reader]() -> R { return action(reader); })); + Via(executor_.get(), [&action, reader]() -> R { return action(reader); })); } return CollectAll(futures); } diff --git a/src/paimon/common/global_index/union_global_index_reader.h b/src/paimon/common/global_index/union_global_index_reader.h index 1f253ca47..d64c3a2a8 100644 --- a/src/paimon/common/global_index/union_global_index_reader.h +++ b/src/paimon/common/global_index/union_global_index_reader.h @@ -58,9 +58,7 @@ class UnionGlobalIndexReader : public GlobalIndexReader { Result> VisitFullTextSearch( const std::shared_ptr& full_text_search) override; - bool IsThreadSafe() const override { - return false; - } + bool IsThreadSafe() const override; std::string GetIndexType() const override { return "union"; diff --git a/src/paimon/common/global_index/union_global_index_reader_test.cpp b/src/paimon/common/global_index/union_global_index_reader_test.cpp index 98ecfdc76..0f6794701 100644 --- a/src/paimon/common/global_index/union_global_index_reader_test.cpp +++ b/src/paimon/common/global_index/union_global_index_reader_test.cpp @@ -61,6 +61,10 @@ class FakeReader : public GlobalIndexReader { has_scored_result_ = true; } + void SetThreadSafe(bool thread_safe) { + thread_safe_ = thread_safe; + } + /// Counts how many times any Visit* method was invoked. Useful to assert all readers /// are exercised by UnionGlobalIndexReader. int InvocationCount() const { @@ -147,7 +151,7 @@ class FakeReader : public GlobalIndexReader { } bool IsThreadSafe() const override { - return true; + return thread_safe_; } std::string GetIndexType() const override { @@ -176,6 +180,7 @@ class FakeReader : public GlobalIndexReader { std::vector scored_row_ids_; std::vector scored_scores_; bool has_scored_result_ = false; + bool thread_safe_ = true; std::atomic invocation_count_{0}; }; @@ -515,12 +520,24 @@ TEST_F(UnionGlobalIndexReaderTest, TestVisitVectorSearchErrorPropagation) { ASSERT_NOK_WITH_MSG(union_reader.VisitVectorSearch(nullptr), "vector search failure"); } -TEST_F(UnionGlobalIndexReaderTest, TestIsThreadSafeAlwaysFalse) { - auto reader = std::make_shared(); - std::vector> readers = {reader}; +TEST_F(UnionGlobalIndexReaderTest, TestIsThreadSafeReturnsTrueWhenAllReadersAreSafe) { + auto reader1 = std::make_shared(); + auto reader2 = std::make_shared(); + + std::vector> readers = {reader1, reader2}; + UnionGlobalIndexReader union_reader(std::move(readers), nullptr); + + ASSERT_TRUE(union_reader.IsThreadSafe()); +} + +TEST_F(UnionGlobalIndexReaderTest, TestIsThreadSafeReturnsFalseWhenAnyReaderIsNotSafe) { + auto reader1 = std::make_shared(); + auto reader2 = std::make_shared(); + reader2->SetThreadSafe(false); + + std::vector> readers = {reader1, reader2}; UnionGlobalIndexReader union_reader(std::move(readers), nullptr); - // UnionGlobalIndexReader is not thread-safe regardless of inner readers ASSERT_FALSE(union_reader.IsThreadSafe()); } diff --git a/src/paimon/core/global_index/global_index_scan_impl.cpp b/src/paimon/core/global_index/global_index_scan_impl.cpp index de7ce1b83..6c3ad9c6e 100644 --- a/src/paimon/core/global_index/global_index_scan_impl.cpp +++ b/src/paimon/core/global_index/global_index_scan_impl.cpp @@ -128,27 +128,48 @@ Result> GlobalIndexScanImpl::GetOrCreateIn Result>> GlobalIndexScanImpl::CreateReaders( int32_t field_id, const std::optional& row_range_index) const { PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema_->GetField(field_id)); - return CreateReaders(field, row_range_index); + return CreateReaders(field, /*index_type=*/std::nullopt, row_range_index); } Result>> GlobalIndexScanImpl::CreateReaders( const std::string& field_name, const std::optional& row_range_index) const { PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema_->GetField(field_name)); - return CreateReaders(field, row_range_index); + return CreateReaders(field, /*index_type=*/std::nullopt, row_range_index); +} + +Result> GlobalIndexScanImpl::CreateReader( + const std::string& field_name, const std::string& index_type, + const std::optional& row_range_index) const { + PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema_->GetField(field_name)); + PAIMON_ASSIGN_OR_RAISE( + std::vector> readers, + CreateReaders(field, std::optional(index_type), row_range_index)); + if (readers.empty()) { + return std::shared_ptr(); + } + if (readers.size() != 1) { + return Status::Invalid( + fmt::format("invalid global index reader size, expected 1, actual {}", readers.size())); + } + return readers[0]; } Result>> GlobalIndexScanImpl::CreateReaders( - const DataField& field, const std::optional& row_range_index) const { + const DataField& field, const std::optional& index_type, + const std::optional& row_range_index) const { auto field_iter = index_metas_.find(field.Id()); if (field_iter == index_metas_.end()) { return std::vector>(); } const auto& index_type_to_metas = field_iter->second; std::vector> readers; - readers.reserve(index_type_to_metas.size()); - for (const auto& [index_type, range_to_metas] : index_type_to_metas) { + readers.reserve(index_type ? 1 : index_type_to_metas.size()); + for (const auto& [current_index_type, range_to_metas] : index_type_to_metas) { + if (index_type && current_index_type != index_type.value()) { + continue; + } PAIMON_ASSIGN_OR_RAISE(std::unique_ptr indexer, - GlobalIndexerFactory::Get(index_type, options_.ToMap())); + GlobalIndexerFactory::Get(current_index_type, options_.ToMap())); if (!indexer) { continue; } diff --git a/src/paimon/core/global_index/global_index_scan_impl.h b/src/paimon/core/global_index/global_index_scan_impl.h index 8988d32f4..55030b367 100644 --- a/src/paimon/core/global_index/global_index_scan_impl.h +++ b/src/paimon/core/global_index/global_index_scan_impl.h @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -47,6 +48,10 @@ class GlobalIndexScanImpl : public GlobalIndexScan { const std::string& field_name, const std::optional& row_range_index) const override; + Result> CreateReader( + const std::string& field_name, const std::string& index_type, + const std::optional& row_range_index) const override; + Result>> CreateReaders( int32_t field_id, const std::optional& row_range_index) const override; @@ -65,7 +70,8 @@ class GlobalIndexScanImpl : public GlobalIndexScan { Result> GetOrCreateIndexEvaluator(); Result>> CreateReaders( - const DataField& field, const std::optional& row_range_index) const; + const DataField& field, const std::optional& index_type, + const std::optional& row_range_index) const; std::vector ToGlobalIndexIOMetas( const std::vector>& metas) const; diff --git a/src/paimon/core/global_index/global_index_write_task.cpp b/src/paimon/core/global_index/global_index_write_task.cpp index 5ee425f86..a16b48983 100644 --- a/src/paimon/core/global_index/global_index_write_task.cpp +++ b/src/paimon/core/global_index/global_index_write_task.cpp @@ -20,6 +20,7 @@ #include "paimon/common/table/special_fields.h" #include "paimon/common/types/data_field.h" #include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/scope_guard.h" #include "paimon/core/core_options.h" #include "paimon/core/global_index/global_index_file_manager.h" #include "paimon/core/io/data_increment.h" @@ -204,16 +205,21 @@ Result> GlobalIndexWriteTask::WriteIndex( std::shared_ptr index_file_manager, CreateGlobalIndexFileManager(table_path, table_schema, core_options, pool)); + // create batch reader + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr batch_reader, + CreateBatchReader(table_path, field_name, indexed_split, core_options, pool)); + // create global index writer PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema->GetField(field_name)); PAIMON_ASSIGN_OR_RAISE( std::shared_ptr global_index_writer, CreateGlobalIndexWriter(index_type, field, index_file_manager, core_options, pool)); - // create batch reader - PAIMON_ASSIGN_OR_RAISE( - std::unique_ptr batch_reader, - CreateBatchReader(table_path, field_name, indexed_split, core_options, pool)); + ScopeGuard guard([&]() { + global_index_writer.reset(); + batch_reader.reset(); + }); // read from data split and write to index writer PAIMON_ASSIGN_OR_RAISE( diff --git a/src/paimon/global_index/lumina/lumina_global_index.cpp b/src/paimon/global_index/lumina/lumina_global_index.cpp index 041ed53e8..be4bdb92e 100644 --- a/src/paimon/global_index/lumina/lumina_global_index.cpp +++ b/src/paimon/global_index/lumina/lumina_global_index.cpp @@ -177,8 +177,12 @@ Result> LuminaGlobalIndex::CreateReader( class LuminaDataset : public ::lumina::api::Dataset { public: LuminaDataset(int64_t element_count, uint32_t dimension, - const std::vector>& array_vec) - : element_count_(element_count), dimension_(dimension), array_vec_(array_vec) {} + const std::vector>& array_vec, + const std::vector& start_ids) + : element_count_(element_count), + dimension_(dimension), + array_vec_(array_vec), + start_ids_(start_ids) {} uint32_t Dim() const noexcept override { return dimension_; @@ -200,8 +204,8 @@ class LuminaDataset : public ::lumina::api::Dataset { vector_buffer.resize(value_array_length); memcpy(vector_buffer.data(), value_ptr, sizeof(float) * value_array_length); id_buffer.resize(element_count); - std::iota(id_buffer.begin(), id_buffer.end(), id_); - id_ += element_count; + std::iota(id_buffer.begin(), id_buffer.end(), + static_cast<::lumina::core::vector_id_t>(start_ids_[cursor_])); // release the array when copy to vector_buffer value_array.reset(); @@ -213,8 +217,8 @@ class LuminaDataset : public ::lumina::api::Dataset { int64_t element_count_; uint32_t dimension_; std::vector> array_vec_; + std::vector start_ids_; size_t cursor_ = 0; - ::lumina::core::vector_id_t id_ = 0; }; LuminaIndexWriter::LuminaIndexWriter(const std::string& field_name, @@ -254,35 +258,62 @@ Status LuminaIndexWriter::AddBatch(::ArrowArray* arrow_array, auto list_field_array = std::dynamic_pointer_cast(field_array); CHECK_NOT_NULL(list_field_array, "invalid input array in LuminaIndexWriter, field array must be list array"); - auto value_array = std::dynamic_pointer_cast(list_field_array->values()); - CHECK_NOT_NULL( - value_array, - "invalid input array in LuminaIndexWriter, field value array must be float array"); - if (value_array->null_count() != 0) { - return Status::Invalid("field value array in LuminaIndexWriter is invalid, must not null"); - } - if (value_array->length() != field_length * dimension_) { - return Status::Invalid(fmt::format( - "invalid input array in LuminaIndexWriter, length of field array [{}] multiplied " - "dimension [{}] must match length of field value array [{}]", - field_length, dimension_, value_array->length())); + + // Split into contiguous non-null segments, skipping null rows in the list field. + int64_t segment_start = -1; + for (int64_t i = 0; i <= field_length; i++) { + bool is_null = (i < field_length) && list_field_array->IsNull(i); + bool is_end = (i == field_length); + + if (!is_null && !is_end && segment_start == -1) { + segment_start = i; + } + + if ((is_null || is_end) && segment_start != -1) { + int64_t segment_len = i - segment_start; + // Use value_offset to precisely locate the float range for this segment + auto value_start_offset = list_field_array->value_offset(segment_start); + auto value_end_offset = list_field_array->value_offset(segment_start + segment_len); + int64_t value_length = value_end_offset - value_start_offset; + auto sliced_values = std::dynamic_pointer_cast( + list_field_array->values()->Slice(value_start_offset, value_length)); + CHECK_NOT_NULL(sliced_values, + "invalid sliced value array in LuminaIndexWriter, must be float array"); + if (sliced_values->null_count() != 0) { + return Status::Invalid( + "field value array in LuminaIndexWriter is invalid, must not null"); + } + if (sliced_values->length() != segment_len * static_cast(dimension_)) { + return Status::Invalid(fmt::format( + "invalid input array in LuminaIndexWriter, length of field array [{}] " + "multiplied dimension [{}] must match length of field value array [{}]", + segment_len, dimension_, sliced_values->length())); + } + array_vec_.push_back(std::move(sliced_values)); + array_start_ids_.push_back(count_ + segment_start); + indexed_count_ += segment_len; + segment_start = -1; + } } + count_ += array->length(); - array_vec_.push_back(std::move(value_array)); return Status::OK(); } Result> LuminaIndexWriter::Finish() { + if (indexed_count_ == 0) { + return std::vector(); + } ::lumina::core::MemoryResourceConfig memory_resource(pool_.get()); PAIMON_ASSIGN_OR_RAISE_FROM_LUMINA( ::lumina::api::LuminaBuilder builder, ::lumina::api::LuminaBuilder::Create(builder_options_, memory_resource)); // pretrain - LuminaDataset dataset1(count_, dimension_, array_vec_); + LuminaDataset dataset1(indexed_count_, dimension_, array_vec_, array_start_ids_); PAIMON_RETURN_NOT_OK_FROM_LUMINA(builder.PretrainFrom(dataset1)); // insert data - LuminaDataset dataset2(count_, dimension_, array_vec_); + LuminaDataset dataset2(indexed_count_, dimension_, array_vec_, array_start_ids_); std::vector>().swap(array_vec_); PAIMON_RETURN_NOT_OK_FROM_LUMINA(builder.InsertFrom(dataset2)); diff --git a/src/paimon/global_index/lumina/lumina_global_index.h b/src/paimon/global_index/lumina/lumina_global_index.h index 0c5a11ab5..cbd6c7d9d 100644 --- a/src/paimon/global_index/lumina/lumina_global_index.h +++ b/src/paimon/global_index/lumina/lumina_global_index.h @@ -90,6 +90,7 @@ class LuminaIndexWriter : public GlobalIndexWriter { private: int64_t count_ = 0; + int64_t indexed_count_ = 0; std::shared_ptr pool_; std::string field_name_; std::shared_ptr arrow_type_; @@ -99,6 +100,7 @@ class LuminaIndexWriter : public GlobalIndexWriter { ::lumina::api::IOOptions io_options_; std::map lumina_options_; std::vector> array_vec_; + std::vector array_start_ids_; }; class LuminaIndexReader : public GlobalIndexReader { diff --git a/src/paimon/global_index/lumina/lumina_global_index_test.cpp b/src/paimon/global_index/lumina/lumina_global_index_test.cpp index d87fb707b..e5aeca71a 100644 --- a/src/paimon/global_index/lumina/lumina_global_index_test.cpp +++ b/src/paimon/global_index/lumina/lumina_global_index_test.cpp @@ -158,7 +158,7 @@ class LuminaGlobalIndexTest : public ::testing::Test { return struct_array; } - private: + protected: std::shared_ptr pool_ = GetDefaultPool(); std::shared_ptr fs_ = std::make_shared(); std::map options_ = {{"lumina.index.dimension", "4"}, @@ -470,4 +470,201 @@ TEST_F(LuminaGlobalIndexTest, TestHighCardinalityAndMultiThreadSearch) { } } +TEST_F(LuminaGlobalIndexTest, TestWriteWithNullRows) { + auto test_root_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + + // Array with null at row 1 (middle): rows 0,2,3 are valid, row 1 is null + // This should split into two segments: [0,0] and [2,3] + std::shared_ptr array_with_null = + arrow::ipc::internal::json::ArrayFromJSON(data_type_, + R"([ + [[0.0, 0.0, 0.0, 0.0]], + [null], + [[1.0, 0.0, 1.0, 0.0]], + [[1.0, 1.0, 1.0, 1.0]] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN( + auto meta, WriteGlobalIndex(test_root, data_type_, options_, array_with_null, Range(0, 3))); + ASSERT_OK_AND_ASSIGN(auto reader, + CreateGlobalIndexReader(test_root, data_type_, options_, meta)); + { + // Search should return ids 0, 2, 3 (skipping null row 1) + ASSERT_OK_AND_ASSIGN( + auto scored_result, + reader->VisitVectorSearch(std::make_shared( + /*field_name=*/"f0", /*limit=*/4, query_, /*filter=*/nullptr, + /*predicate=*/nullptr, /*distance_type=*/std::nullopt, /*options=*/options_))); + // Only 3 vectors indexed (row 1 is null), so limit=4 returns 3 + CheckResult(scored_result, {3l, 2l, 0l}, {0.01f, 2.21f, 4.21f}); + } +} + +TEST_F(LuminaGlobalIndexTest, TestWriteWithMultipleNullSegments) { + auto test_root_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + + // Nulls at rows 0, 2, 5: valid rows are 1, 3, 4 + // Splits into segments: [1,1], [3,4] + std::shared_ptr array_with_nulls = + arrow::ipc::internal::json::ArrayFromJSON(data_type_, + R"([ + [null], + [[0.0, 1.0, 0.0, 1.0]], + [null], + [[1.0, 0.0, 1.0, 0.0]], + [[1.0, 1.0, 1.0, 1.0]], + [null] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN(auto meta, WriteGlobalIndex(test_root, data_type_, options_, + array_with_nulls, Range(0, 5))); + ASSERT_OK_AND_ASSIGN(auto reader, + CreateGlobalIndexReader(test_root, data_type_, options_, meta)); + { + ASSERT_OK_AND_ASSIGN( + auto scored_result, + reader->VisitVectorSearch(std::make_shared( + /*field_name=*/"f0", /*limit=*/4, query_, /*filter=*/nullptr, + /*predicate=*/nullptr, /*distance_type=*/std::nullopt, /*options=*/options_))); + // Only 3 vectors indexed at ids 1, 3, 4 + CheckResult(scored_result, {4l, 1l, 3l}, {0.01f, 2.01f, 2.21f}); + } +} + +TEST_F(LuminaGlobalIndexTest, TestWriteWithAllNullRows) { + auto test_root_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + + // All rows are null — no vectors to index + std::shared_ptr all_null_array = + arrow::ipc::internal::json::ArrayFromJSON(data_type_, + R"([ + [null], + [null], + [null] + ])") + .ValueOrDie(); + + auto global_index = std::make_shared(options_); + auto path_factory = std::make_shared(test_root); + auto file_writer = std::make_shared(fs_, path_factory); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr global_writer, + global_index->CreateWriter("f0", CreateArrowSchema(data_type_).get(), file_writer, pool_)); + + ArrowArray c_array; + ASSERT_TRUE(arrow::ExportArray(*all_null_array, &c_array).ok()); + std::vector row_ids = {0, 1, 2}; + ASSERT_OK(global_writer->AddBatch(&c_array, std::move(row_ids))); + // Finish with zero indexed vectors — returns empty metas + ASSERT_OK_AND_ASSIGN(auto result_metas, global_writer->Finish()); + ASSERT_TRUE(result_metas.empty()); +} + +TEST_F(LuminaGlobalIndexTest, TestWriteWithNullAndFilter) { + auto test_root_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + + // Null at row 2: valid rows are 0, 1, 3 + std::shared_ptr array_with_null = + arrow::ipc::internal::json::ArrayFromJSON(data_type_, + R"([ + [[0.0, 0.0, 0.0, 0.0]], + [[0.0, 1.0, 0.0, 1.0]], + [null], + [[1.0, 1.0, 1.0, 1.0]] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN( + auto meta, WriteGlobalIndex(test_root, data_type_, options_, array_with_null, Range(0, 3))); + ASSERT_OK_AND_ASSIGN(auto reader, + CreateGlobalIndexReader(test_root, data_type_, options_, meta)); + { + // Filter: only allow ids < 3 (filters out id=3), so only ids 0, 1 remain + auto filter = [](int64_t id) -> bool { return id < 3; }; + ASSERT_OK_AND_ASSIGN( + auto scored_result, + reader->VisitVectorSearch(std::make_shared( + /*field_name=*/"f0", /*limit=*/4, query_, filter, + /*predicate=*/nullptr, /*distance_type=*/std::nullopt, /*options=*/options_))); + CheckResult(scored_result, {1l, 0l}, {2.01f, 4.21f}); + } +} + +TEST_F(LuminaGlobalIndexTest, TestWriteWithNullAcrossMultipleBatches) { + auto test_root_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + + // Batch 1: rows 0-2, null at row 1 → indexed ids: {0, 2} + std::shared_ptr batch1 = arrow::ipc::internal::json::ArrayFromJSON(data_type_, + R"([ + [[0.0, 0.0, 0.0, 0.0]], + [null], + [[1.0, 0.0, 1.0, 0.0]] + ])") + .ValueOrDie(); + + // Batch 2: rows 3-5, null at row 3 → indexed ids: {4, 5} + std::shared_ptr batch2 = arrow::ipc::internal::json::ArrayFromJSON(data_type_, + R"([ + [null], + [[1.0, 1.0, 1.0, 1.0]], + [[0.0, 1.0, 0.0, 1.0]] + ])") + .ValueOrDie(); + + auto global_index = std::make_shared(options_); + auto path_factory = std::make_shared(test_root); + auto file_writer = std::make_shared(fs_, path_factory); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr global_writer, + global_index->CreateWriter("f0", CreateArrowSchema(data_type_).get(), file_writer, pool_)); + + // AddBatch 1: row_ids {0, 1, 2} + { + ArrowArray c_array; + ASSERT_TRUE(arrow::ExportArray(*batch1, &c_array).ok()); + std::vector row_ids = {0, 1, 2}; + ASSERT_OK(global_writer->AddBatch(&c_array, std::move(row_ids))); + } + // AddBatch 2: row_ids {3, 4, 5} + { + ArrowArray c_array; + ASSERT_TRUE(arrow::ExportArray(*batch2, &c_array).ok()); + std::vector row_ids = {3, 4, 5}; + ASSERT_OK(global_writer->AddBatch(&c_array, std::move(row_ids))); + } + + ASSERT_OK_AND_ASSIGN(auto result_metas, global_writer->Finish()); + ASSERT_EQ(result_metas.size(), 1); + + ASSERT_OK_AND_ASSIGN(auto reader, + CreateGlobalIndexReader(test_root, data_type_, options_, result_metas[0])); + { + // Search all: should return ids {0, 2, 4, 5}, never {1, 3} + ASSERT_OK_AND_ASSIGN( + auto scored_result, + reader->VisitVectorSearch(std::make_shared( + /*field_name=*/"f0", /*limit=*/10, query_, /*filter=*/nullptr, + /*predicate=*/nullptr, /*distance_type=*/std::nullopt, /*options=*/options_))); + // id 0: [0,0,0,0] → L2 dist to [1,1,1,1.1] = 4.21 + // id 2: [1,0,1,0] → L2 dist = 2.21 + // id 4: [1,1,1,1] → L2 dist = 0.01 + // id 5: [0,1,0,1] → L2 dist = 2.01 + CheckResult(scored_result, {4l, 5l, 2l, 0l}, {0.01f, 2.01f, 2.21f, 4.21f}); + } +} + } // namespace paimon::lumina::test diff --git a/test/inte/global_index_test.cpp b/test/inte/global_index_test.cpp index 0dc7b9ab2..650e9e26e 100644 --- a/test/inte/global_index_test.cpp +++ b/test/inte/global_index_test.cpp @@ -292,6 +292,41 @@ TEST_P(GlobalIndexTest, TestWriteLuminaIndex) { ASSERT_TRUE(expected_commit_message->TEST_Equal(*index_commit_msg_impl)); } +TEST_P(GlobalIndexTest, TestWriteLuminaIndexWithMismatchedDimension) { + arrow::FieldVector fields = {arrow::field("f0", arrow::utf8()), + arrow::field("f1", arrow::list(arrow::float32()))}; + auto schema = arrow::schema(fields); + std::map lumina_options = {{"lumina.index.dimension", "3"}, + {"lumina.index.type", "bruteforce"}, + {"lumina.distance.metric", "l2"}, + {"lumina.encoding.type", "rawf32"}, + {"lumina.search.parallel_number", "10"}}; + + std::map options = { + {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, file_format_}, + {Options::FILE_SYSTEM, "local"}, {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, {Options::READ_BATCH_SIZE, "1"}}; + + CreateTable(/*partition_keys=*/{}, schema, options); + std::string table_path = PathUtil::JoinPath(dir_->Str(), "foo.db/bar"); + + std::vector write_cols = schema->field_names(); + auto src_array = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ + ["a", [0.0, 0.0, 0.0]], + ["b", [0.0, 0.0, 0.0, 0.0]] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN(auto commit_msgs, WriteArray(table_path, write_cols, src_array)); + ASSERT_OK(Commit(table_path, commit_msgs)); + + ASSERT_NOK_WITH_MSG( + WriteIndex(table_path, /*partition_filters=*/{}, "f1", "lumina", + /*options=*/lumina_options, Range(0, 1)), + "invalid input array in LuminaIndexWriter, length of field array [1] multiplied " + "dimension [3] must match length of field value array [4]"); +} + TEST_P(GlobalIndexTest, TestWriteIndex) { CreateTable(); std::string table_path = PathUtil::JoinPath(dir_->Str(), "foo.db/bar"); @@ -989,8 +1024,10 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) { ["Bob", [0.0, 1.0, 0.0, 1.0], 10, 12.1], ["Emily", [1.0, 0.0, 1.0, 0.0], 10, 13.1], ["Tony", [1.0, 1.0, 1.0, 1.0], 10, 14.1], +["NullGuy1", null, 10, 20.0], ["Lucy", [10.0, 10.0, 10.0, 10.0], 20, 15.1], ["Bob", [10.0, 11.0, 10.0, 11.0], 20, 16.1], +["NullGuy2", null, 20, 21.0], ["Tony", [11.0, 10.0, 11.0, 10.0], 20, 17.1], ["Alice", [11.0, 11.0, 11.0, 11.0], 20, 18.1], ["Paul", [10.0, 10.0, 10.0, 10.0], 20, 19.1] @@ -1002,7 +1039,7 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) { // write and commit lumina index ASSERT_OK(WriteIndex(table_path, /*partition_filters=*/{}, "f1", "lumina", - /*options=*/lumina_options, Range(0, 8))); + /*options=*/lumina_options, Range(0, 10))); auto scan_and_check_result = [&](const std::vector& read_row_ranges, const std::shared_ptr& expected_array, @@ -1020,8 +1057,8 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) { result_fields.insert(result_fields.begin(), SpecialFields::ValueKind().ArrowField()); result_fields.push_back(SpecialFields::IndexScore().ArrowField()); std::map id_to_score = {{0, 4.21f}, {1, 2.01f}, {2, 2.21f}, - {3, 0.01f}, {4, 322.21f}, {5, 360.01f}, - {6, 360.21f}, {7, 398.01}, {8, 322.21f}}; + {3, 0.01f}, {5, 322.21f}, {6, 360.01f}, + {8, 360.21f}, {9, 398.01f}, {10, 322.21f}}; { auto expected_array = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(result_fields), R"([ @@ -1036,7 +1073,8 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) { [0, "Paul", [10.0, 10.0, 10.0, 10.0], 20, 19.1, 322.21] ])") .ValueOrDie(); - scan_and_check_result({Range(0, 8)}, expected_array, id_to_score); + scan_and_check_result({Range(0, 3), Range(5, 6), Range(8, 10)}, expected_array, + id_to_score); } { auto expected_array = @@ -1047,7 +1085,7 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) { [0, "Paul", [10.0, 10.0, 10.0, 10.0], 20, 19.1, 322.21] ])") .ValueOrDie(); - scan_and_check_result({Range(2, 3), Range(7, 8)}, expected_array, id_to_score); + scan_and_check_result({Range(2, 3), Range(9, 10)}, expected_array, id_to_score); } { auto expected_array = @@ -1055,7 +1093,7 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) { [0, "Bob", [10.0, 11.0, 10.0, 11.0], 20, 16.1, 360.01] ])") .ValueOrDie(); - scan_and_check_result({Range(5, 5)}, expected_array, id_to_score); + scan_and_check_result({Range(6, 6)}, expected_array, id_to_score); } { auto expected_array = @@ -1066,7 +1104,31 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) { [0, "Paul", [10.0, 10.0, 10.0, 10.0], 20, 19.1, null] ])") .ValueOrDie(); - scan_and_check_result({Range(2, 3), Range(7, 8)}, expected_array, /*id_to_score=*/{}); + scan_and_check_result({Range(2, 3), Range(9, 10)}, expected_array, /*id_to_score=*/{}); + } + { + // Verify null rows (id 4 and 7) are never recalled by vector search + ASSERT_OK_AND_ASSIGN( + std::shared_ptr global_index_scan, + GlobalIndexScan::Create(table_path, /*snapshot_id=*/std::nullopt, + /*partitions=*/std::nullopt, lumina_options, fs_, + /*executor=*/nullptr, pool_)); + ASSERT_OK_AND_ASSIGN(auto lumina_readers, + global_index_scan->CreateReaders("f1", std::nullopt)); + ASSERT_EQ(lumina_readers.size(), 1u); + std::vector query = {1.0f, 1.0f, 1.0f, 1.1f}; + auto vector_search = std::make_shared( + "f1", /*limit=*/20, query, /*filter=*/nullptr, + /*predicate=*/nullptr, /*distance_type=*/std::nullopt, /*options=*/lumina_options); + ASSERT_OK_AND_ASSIGN(auto scored_result, + lumina_readers[0]->VisitVectorSearch(vector_search)); + auto typed_result = std::dynamic_pointer_cast(scored_result); + ASSERT_TRUE(typed_result); + // Should recall 9 vectors (11 total rows - 2 null rows) + ASSERT_EQ(typed_result->bitmap_.Cardinality(), 9u); + // Null row ids 4 and 7 must not be in the result + ASSERT_FALSE(typed_result->bitmap_.Contains(4)); + ASSERT_FALSE(typed_result->bitmap_.Contains(7)); } } #endif @@ -2631,6 +2693,33 @@ TEST_P(GlobalIndexTest, TestBTreeAndBitmapCoexist) { ASSERT_OK_AND_ASSIGN(auto index_readers, global_index_scan->CreateReaders("f0", std::nullopt)); ASSERT_EQ(index_readers.size(), 2u); + ASSERT_OK_AND_ASSIGN(std::shared_ptr btree_reader, + global_index_scan->CreateReader("f0", "btree", std::nullopt)); + ASSERT_TRUE(btree_reader); + ASSERT_OK_AND_ASSIGN(std::shared_ptr btree_result, + btree_reader->VisitEqual(Literal(FieldType::STRING, "Bob", 3))); + ASSERT_TRUE(btree_result); + ASSERT_EQ(btree_result->ToString(), "{1,2}"); + ASSERT_OK_AND_ASSIGN(std::shared_ptr btree_less_than_result, + btree_reader->VisitLessThan(Literal(FieldType::STRING, "Emily", 5))); + ASSERT_TRUE(btree_less_than_result); + ASSERT_EQ(btree_less_than_result->ToString(), "{0,1,2}"); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr bitmap_reader, + global_index_scan->CreateReader("f0", "bitmap", std::nullopt)); + ASSERT_TRUE(bitmap_reader); + ASSERT_OK_AND_ASSIGN(std::shared_ptr bitmap_result, + bitmap_reader->VisitEqual(Literal(FieldType::STRING, "Bob", 3))); + ASSERT_TRUE(bitmap_result); + ASSERT_EQ(bitmap_result->ToString(), "{1,2}"); + ASSERT_OK_AND_ASSIGN(std::shared_ptr bitmap_less_than_result, + bitmap_reader->VisitLessThan(Literal(FieldType::STRING, "Emily", 5))); + ASSERT_FALSE(bitmap_less_than_result); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr missing_reader, + global_index_scan->CreateReader("f0", "lucene", std::nullopt)); + ASSERT_FALSE(missing_reader); + // Each reader individually should return the same result for Equal("Bob") for (const auto& index_reader : index_readers) { ASSERT_OK_AND_ASSIGN(auto result,