diff --git a/src/paimon/common/global_index/bitmap_topk_global_index_result.cpp b/src/paimon/common/global_index/bitmap_topk_global_index_result.cpp index 385c359a..7d70d6a9 100644 --- a/src/paimon/common/global_index/bitmap_topk_global_index_result.cpp +++ b/src/paimon/common/global_index/bitmap_topk_global_index_result.cpp @@ -143,7 +143,13 @@ const std::vector& BitmapTopKGlobalIndexResult::GetScores() const { } std::string BitmapTopKGlobalIndexResult::ToString() const { - return fmt::format("row ids: {}, scores: {{{}}}", bitmap_.ToString(), fmt::join(scores_, ",")); + std::vector formatted_scores; + formatted_scores.reserve(scores_.size()); + for (const auto& score : scores_) { + formatted_scores.push_back(fmt::format("{:.2f}", score)); + } + return fmt::format("row ids: {}, scores: {{{}}}", bitmap_.ToString(), + fmt::join(formatted_scores, ",")); } } // namespace paimon diff --git a/src/paimon/common/global_index/bitmap_topk_global_index_result_test.cpp b/src/paimon/common/global_index/bitmap_topk_global_index_result_test.cpp index b0cf7f04..771a62de 100644 --- a/src/paimon/common/global_index/bitmap_topk_global_index_result_test.cpp +++ b/src/paimon/common/global_index/bitmap_topk_global_index_result_test.cpp @@ -143,9 +143,10 @@ TEST_F(BitmapTopKGlobalIndexResultTest, TestAndBitmapResult) { ASSERT_OK_AND_ASSIGN(auto result, index_result1->And(index_result2)); ASSERT_EQ(result->ToString(), expected_str); }; - check_and_result({1, 2, 3}, {1.1f, 1.2f, 1.3f}, {1, 2, 7}, "row ids: {1,2}, scores: {1.1,1.2}"); + check_and_result({1, 2, 3}, {1.1f, 1.2f, 1.3f}, {1, 2, 7}, + "row ids: {1,2}, scores: {1.10,1.20}"); check_and_result({1, 2, 3}, {100.1f, 100.2f, 100.3f}, {1, 2, 3}, - "row ids: {1,2,3}, scores: {100.1,100.2,100.3}"); + "row ids: {1,2,3}, scores: {100.10,100.20,100.30}"); check_and_result({1, 2, 3}, {1.1f, 1.2f, 1.3f}, {100, 200, 300}, "row ids: {}, scores: {}"); check_and_result({1, 2, 3}, {1.1f, 1.2f, 1.3f}, {}, "row ids: {}, scores: {}"); 
check_and_result({}, {}, {}, "row ids: {}, scores: {}"); @@ -177,14 +178,14 @@ TEST_F(BitmapTopKGlobalIndexResultTest, TestOr) { ASSERT_EQ(result->ToString(), expected_str); }; check_or_result({1, 2, 3}, {1.1f, 1.2f, 1.3f}, {100, 200, 300}, {100.1f, 200.1f, 300.1f}, - "row ids: {1,2,3,100,200,300}, scores: {1.1,1.2,1.3,100.1,200.1,300.1}"); + "row ids: {1,2,3,100,200,300}, scores: {1.10,1.20,1.30,100.10,200.10,300.10}"); check_or_result({1, 2, 3}, {1.1f, 1.2f, 1.3f}, {}, {}, - "row ids: {1,2,3}, scores: {1.1,1.2,1.3}"); + "row ids: {1,2,3}, scores: {1.10,1.20,1.30}"); check_or_result({}, {}, {}, {}, "row ids: {}, scores: {}"); check_or_result( {1, 2, 3, RoaringBitmap64::MAX_VALUE}, {1.1f, 1.2f, 1.3f, 1.4f}, {RoaringBitmap32::MAX_VALUE}, {0.12f}, - "row ids: {1,2,3,2147483647,9223372036854775807}, scores: {1.1,1.2,1.3,0.12,1.4}"); + "row ids: {1,2,3,2147483647,9223372036854775807}, scores: {1.10,1.20,1.30,0.12,1.40}"); } TEST_F(BitmapTopKGlobalIndexResultTest, TestOrBitmapResult) { @@ -243,7 +244,7 @@ TEST_F(BitmapTopKGlobalIndexResultTest, TestAddOffset) { auto index_result = std::make_shared( RoaringBitmap64::From(ids), std::move(scores)); ASSERT_OK_AND_ASSIGN(auto result_with_offset, index_result->AddOffset(10)); - ASSERT_EQ(result_with_offset->ToString(), "row ids: {11,12,13}, scores: {1.1,1.2,1.3}"); + ASSERT_EQ(result_with_offset->ToString(), "row ids: {11,12,13}, scores: {1.10,1.20,1.30}"); } { std::vector ids = {}; diff --git a/src/paimon/core/global_index/row_range_global_index_scanner_impl.cpp b/src/paimon/core/global_index/row_range_global_index_scanner_impl.cpp index c9d925c3..b254ddc3 100644 --- a/src/paimon/core/global_index/row_range_global_index_scanner_impl.cpp +++ b/src/paimon/core/global_index/row_range_global_index_scanner_impl.cpp @@ -22,6 +22,8 @@ #include #include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "paimon/common/utils/scope_guard.h" #include "paimon/core/global_index/global_index_evaluator_impl.h" #include 
"paimon/global_index/global_indexer.h" #include "paimon/global_index/global_indexer_factory.h" @@ -106,9 +108,11 @@ Result> RowRangeGlobalIndexScannerImpl::Creat // TODO(xinyu.lxy): c_arrow_schema may contains additional associated fields. auto arrow_field = DataField::ConvertDataFieldToArrowField(field); auto arrow_schema = arrow::schema({arrow_field}); + ArrowSchema c_arrow_schema; PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*arrow_schema, &c_arrow_schema)); auto index_io_metas = ToGlobalIndexIOMetas(entries); + ScopeGuard guard([&]() { ArrowSchemaRelease(&c_arrow_schema); }); return indexer->CreateReader(&c_arrow_schema, index_file_manager_, index_io_metas, pool_); } diff --git a/src/paimon/global_index/lumina/lumina_global_index_test.cpp b/src/paimon/global_index/lumina/lumina_global_index_test.cpp index d8c5fc78..7f1e5056 100644 --- a/src/paimon/global_index/lumina/lumina_global_index_test.cpp +++ b/src/paimon/global_index/lumina/lumina_global_index_test.cpp @@ -200,6 +200,11 @@ TEST_F(LuminaGlobalIndexTest, TestSimple) { /*predicate*/ nullptr)); CheckResult(topk_result, {3l, 1l, 2l}, {0.01f, 2.01f, 2.21f}); } + { + // VisitIsNull will return all rows + ASSERT_OK_AND_ASSIGN(auto is_null_result, reader->VisitIsNull()); + ASSERT_EQ(is_null_result->ToString(), "{0,1,2,3}"); + } } TEST_F(LuminaGlobalIndexTest, TestWithFilter) { diff --git a/src/paimon/testing/utils/io_exception_helper.h b/src/paimon/testing/utils/io_exception_helper.h index e1a3e7d7..f41b084b 100644 --- a/src/paimon/testing/utils/io_exception_helper.h +++ b/src/paimon/testing/utils/io_exception_helper.h @@ -51,4 +51,12 @@ namespace paimon::test { } \ } \ } + +#define CHECK_HOOK_STATUS_WITHOUT_MESSAGE_CHECK(status) \ + { \ + auto __s = (status); \ + if (!__s.ok()) { \ + continue; \ + } \ + } } // namespace paimon::test diff --git a/test/inte/global_index_test.cpp b/test/inte/global_index_test.cpp index b81bd993..5fd7dcd2 100644 --- a/test/inte/global_index_test.cpp +++
b/test/inte/global_index_test.cpp @@ -311,6 +311,34 @@ TEST_P(GlobalIndexTest, TestWriteIndex) { /*partition=*/BinaryRow::EmptyRow(), /*bucket=*/0, /*total_buckets=*/std::nullopt, expected_data_increment, CompactIncrement({}, {}, {})); ASSERT_TRUE(expected_commit_message->TEST_Equal(*index_commit_msg_impl)); + + { + // test invalid write task with non-registered index type + ASSERT_NOK_WITH_MSG( + GlobalIndexWriteTask::WriteIndex( + table_path, "f0", "invalid", + std::make_shared(split, std::vector({Range(0, 7)})), + /*options=*/{}, pool_), + "Unknown index type invalid, may not registered"); + } + { + // test invalid range mismatch + ASSERT_NOK_WITH_MSG( + GlobalIndexWriteTask::WriteIndex( + table_path, "f0", "bitmap", + std::make_shared(split, std::vector({Range(0, 8)})), + /*options=*/{}, pool_), + "specified range length 9 mismatch indexed range length 8"); + } + { + // test invalid multiple ranges + ASSERT_NOK_WITH_MSG(GlobalIndexWriteTask::WriteIndex( + table_path, "f0", "bitmap", + std::make_shared( + split, std::vector({Range(0, 6), Range(7, 7)})), + /*options=*/{}, pool_), + "GlobalIndexWriteTask only supports a single contiguous range."); + } } TEST_P(GlobalIndexTest, TestWriteIndexWithPartition) { @@ -641,6 +669,13 @@ TEST_P(GlobalIndexTest, TestScanIndexWithRange) { index_reader->VisitEqual(Literal(FieldType::STRING, "Alice", 5))); ASSERT_EQ(index_result->ToString(), "{0,7}"); + { + // test non-existent index type + ASSERT_OK_AND_ASSIGN(auto non_exist_index_reader, + range_scanner->CreateReader("f0", "non-exist")); + ASSERT_FALSE(non_exist_index_reader); + } + // test evaluator ASSERT_OK_AND_ASSIGN(auto evaluator, scanner_impl->CreateIndexEvaluator()); auto predicate = @@ -952,6 +987,15 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithPartition) { ASSERT_NOK_WITH_MSG(global_index_scan->CreateRangeScan(Range(0, 8)), "input range contain multiple partitions, fail to create range scan"); } + { + // test invalid partition input +
ASSERT_NOK_WITH_MSG( + GlobalIndexScan::Create( + table_path, /*snapshot_id=*/std::nullopt, + /*partitions=*/std::vector>(), lumina_options, + /*file_system=*/nullptr, pool_), + "invalid input partition, supposed to be null or at least one partition"); + } } TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) { @@ -1675,13 +1719,125 @@ TEST_P(GlobalIndexTest, TestScanIndexWithTwoIndexes) { std::vector query = {11.0f, 11.0f, 11.0f, 11.0f}; ASSERT_OK_AND_ASSIGN(auto topk_result, index_readers[0]->VisitTopK(1, query, /*filter=*/nullptr, /*predicate*/ nullptr)); - ASSERT_EQ(topk_result->ToString(), "row ids: {7}, scores: {0}"); + ASSERT_EQ(topk_result->ToString(), "row ids: {7}, scores: {0.00}"); // query f2 ASSERT_OK_AND_ASSIGN(index_readers, range_scanner->CreateReaders("f2")); ASSERT_EQ(index_readers.size(), 0); } +TEST_P(GlobalIndexTest, TestIOException) { + if (GetParam() == "lance") { + return; + } + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::list(arrow::float32())), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + + auto schema = arrow::schema(fields); + std::vector write_cols = schema->field_names(); + auto src_array = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ +["Alice", [0.0, 0.0, 0.0, 0.0], 10, 11.1], +["Bob", [0.0, 1.0, 0.0, 1.0], 10, 12.1], +["Emily", [1.0, 0.0, 1.0, 0.0], 10, 13.1], +["Tony", [1.0, 1.0, 1.0, 1.0], 10, 14.1] + ])") + .ValueOrDie(); + + std::map options = {{Options::MANIFEST_FORMAT, "orc"}, + {Options::FILE_FORMAT, GetParam()}, + {Options::FILE_SYSTEM, "local"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}}; + std::map lumina_options = { + {"lumina.dimension", "4"}, + {"lumina.indextype", "bruteforce"}, + {"lumina.distance.metric", "l2"}, + {"lumina.encoding.type", "encoding.rawf32"}, + {"lumina.search.threadcount", "10"}}; + std::string table_path; + bool write_run_complete = 
false; + auto io_hook = IOHook::GetInstance(); + for (size_t i = 0; i < 2000; i += paimon::test::RandomNumber(20, 30)) { + ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); + dir_ = UniqueTestDirectory::Create("local"); + // create table and write data + CreateTable(/*partition_keys=*/{}, schema, options); + table_path = PathUtil::JoinPath(dir_->Str(), "foo.db/bar"); + ASSERT_OK_AND_ASSIGN(auto commit_msgs, WriteArray(table_path, write_cols, src_array)); + ASSERT_OK(Commit(table_path, commit_msgs)); + + io_hook->Reset(i, IOHook::Mode::RETURN_ERROR); + // write bitmap index + auto bitmap_index_write_status = + WriteIndex(table_path, /*partition_filters=*/{}, "f0", "bitmap", + /*options=*/{}, Range(0, 3)); + CHECK_HOOK_STATUS(bitmap_index_write_status, i); + // write lumina index + auto lumina_index_write_status = + WriteIndex(table_path, /*partition_filters=*/{}, "f1", "lumina", + /*options=*/lumina_options, Range(0, 3)); + CHECK_HOOK_STATUS_WITHOUT_MESSAGE_CHECK(lumina_index_write_status); + write_run_complete = true; + break; + } + ASSERT_TRUE(write_run_complete); + + // read for bitmap + bool read_run_complete = false; + for (size_t i = 0; i < 2000; i += paimon::test::RandomNumber(20, 30)) { + ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); + io_hook->Reset(i, IOHook::Mode::RETURN_ERROR); + auto predicate = + PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, + Literal(FieldType::STRING, "Alice", 5)); + auto result_fields = fields; + result_fields.insert(result_fields.begin(), SpecialFields::ValueKind().ArrowField()); + auto expected_array = + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(result_fields), R"([ +[0, "Alice", [0.0, 0.0, 0.0, 0.0], 10, 11.1] + ])") + .ValueOrDie(); + + auto plan_result = ScanGlobalIndexAndData(table_path, predicate); + CHECK_HOOK_STATUS(plan_result.status(), i); + auto plan = std::move(plan_result).value(); + auto read_status = ReadData(table_path, write_cols, expected_array, 
predicate, plan); + CHECK_HOOK_STATUS(read_status, i); + read_run_complete = true; + break; + } + ASSERT_TRUE(read_run_complete); + + // read for lumina + read_run_complete = false; + for (size_t i = 0; i < 2000; i += paimon::test::RandomNumber(20, 30)) { + ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); + io_hook->Reset(i, IOHook::Mode::RETURN_ERROR); + auto global_index_scan_result = + GlobalIndexScan::Create(table_path, /*snapshot_id=*/std::nullopt, + /*partitions=*/std::nullopt, lumina_options, + /*file_system=*/nullptr, pool_); + CHECK_HOOK_STATUS(global_index_scan_result.status(), i); + auto global_index_scan = std::move(global_index_scan_result).value(); + auto range_scanner_result = global_index_scan->CreateRangeScan(Range(0, 3)); + CHECK_HOOK_STATUS(range_scanner_result.status(), i); + auto range_scanner = std::move(range_scanner_result).value(); + auto lumina_reader_result = range_scanner->CreateReader("f1", "lumina"); + CHECK_HOOK_STATUS_WITHOUT_MESSAGE_CHECK(lumina_reader_result.status()); + auto lumina_reader = std::move(lumina_reader_result).value(); + + std::vector query = {1.0f, 1.0f, 1.0f, 1.1f}; + auto topk_result = lumina_reader->VisitTopK(1, query, /*filter=*/nullptr, + /*predicate*/ nullptr); + CHECK_HOOK_STATUS_WITHOUT_MESSAGE_CHECK(topk_result.status()); + ASSERT_EQ(topk_result.value()->ToString(), "row ids: {3}, scores: {0.01}"); + read_run_complete = true; + break; + } + ASSERT_TRUE(read_run_complete); +} + std::vector GetTestValuesForGlobalIndexTest() { std::vector values; values.emplace_back("parquet");