@@ -311,6 +311,34 @@ TEST_P(GlobalIndexTest, TestWriteIndex) {
311311 /* partition=*/ BinaryRow::EmptyRow (), /* bucket=*/ 0 , /* total_buckets=*/ std::nullopt ,
312312 expected_data_increment, CompactIncrement ({}, {}, {}));
313313 ASSERT_TRUE (expected_commit_message->TEST_Equal (*index_commit_msg_impl));
314+
315+ {
316+ // test invalid write task with a non-registered index type
317+ ASSERT_NOK_WITH_MSG (
318+ GlobalIndexWriteTask::WriteIndex (
319+ table_path, " f0" , " invalid" ,
320+ std::make_shared<IndexedSplitImpl>(split, std::vector<Range>({Range (0 , 7 )})),
321+ /* options=*/ {}, pool_),
322+ " Unknown index type invalid, may not registered" );
323+ }
324+ {
325+ // test invalid range mismatch
326+ ASSERT_NOK_WITH_MSG (
327+ GlobalIndexWriteTask::WriteIndex (
328+ table_path, " f0" , " bitmap" ,
329+ std::make_shared<IndexedSplitImpl>(split, std::vector<Range>({Range (0 , 8 )})),
330+ /* options=*/ {}, pool_),
331+ " specified range length 9 mismatch indexed range length 8" );
332+ }
333+ {
334+ // test invalid multiple ranges
335+ ASSERT_NOK_WITH_MSG (GlobalIndexWriteTask::WriteIndex (
336+ table_path, " f0" , " bitmap" ,
337+ std::make_shared<IndexedSplitImpl>(
338+ split, std::vector<Range>({Range (0 , 6 ), Range (7 , 7 )})),
339+ /* options=*/ {}, pool_),
340+ " GlobalIndexWriteTask only supports a single contiguous range." );
341+ }
314342}
315343
316344TEST_P (GlobalIndexTest, TestWriteIndexWithPartition) {
@@ -641,6 +669,13 @@ TEST_P(GlobalIndexTest, TestScanIndexWithRange) {
641669 index_reader->VisitEqual (Literal (FieldType::STRING, " Alice" , 5 )));
642670 ASSERT_EQ (index_result->ToString (), " {0,7}" );
643671
672+ {
673+ // test non-existent index type
674+ ASSERT_OK_AND_ASSIGN (auto non_exist_index_reader,
675+ range_scanner->CreateReader (" f0" , " non-exist" ));
676+ ASSERT_FALSE (non_exist_index_reader);
677+ }
678+
644679 // test evaluator
645680 ASSERT_OK_AND_ASSIGN (auto evaluator, scanner_impl->CreateIndexEvaluator ());
646681 auto predicate =
@@ -952,6 +987,15 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithPartition) {
952987 ASSERT_NOK_WITH_MSG (global_index_scan->CreateRangeScan (Range (0 , 8 )),
953988 " input range contain multiple partitions, fail to create range scan" );
954989 }
990+ {
991+ // test invalid partition input
992+ ASSERT_NOK_WITH_MSG (
993+ GlobalIndexScan::Create (
994+ table_path, /* snapshot_id=*/ std::nullopt ,
995+ /* partitions=*/ std::vector<std::map<std::string, std::string>>(), lumina_options,
996+ /* file_system=*/ nullptr , pool_),
997+ " invalid input partition, supposed to be null or at least one partition" );
998+ }
955999}
9561000
9571001TEST_P (GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) {
@@ -1675,13 +1719,125 @@ TEST_P(GlobalIndexTest, TestScanIndexWithTwoIndexes) {
16751719 std::vector<float > query = {11 .0f , 11 .0f , 11 .0f , 11 .0f };
16761720 ASSERT_OK_AND_ASSIGN (auto topk_result, index_readers[0 ]->VisitTopK (1 , query, /* filter=*/ nullptr ,
16771721 /* predicate*/ nullptr ));
1678- ASSERT_EQ (topk_result->ToString (), " row ids: {7}, scores: {0}" );
1722+ ASSERT_EQ (topk_result->ToString (), " row ids: {7}, scores: {0.00 }" );
16791723
16801724 // query f2
16811725 ASSERT_OK_AND_ASSIGN (index_readers, range_scanner->CreateReaders (" f2" ));
16821726 ASSERT_EQ (index_readers.size (), 0 );
16831727}
16841728
// Exercises the global index write and read paths while the IOHook instance
// (IOHook::GetInstance()) is armed in IOHook::Mode::RETURN_ERROR.
// The test has three retry loops (index write, bitmap read, lumina read).
// Each loop arms the hook with Reset(i, ...) for an increasing i (random step
// from RandomNumber(20, 30), bounded by i < 2000), runs its workload, and only
// sets its completion flag and `break`s after every step of that iteration has
// been passed through a CHECK_HOOK_STATUS* macro.
// NOTE(review): the CHECK_HOOK_STATUS* macros are defined elsewhere; they
// presumably `continue` to the next i when the injected error surfaced --
// confirm against their definition.
1729+ TEST_P (GlobalIndexTest, TestIOException) {
    // Nothing is tested when the test parameter selects the lance format.
1730+ if (GetParam () == " lance" ) {
1731+ return ;
1732+ }
    // Schema: f0 utf8, f1 list<float32>, f2 int32, f3 float64.
1733+ arrow::FieldVector fields = {
1734+ arrow::field (" f0" , arrow::utf8 ()), arrow::field (" f1" , arrow::list (arrow::float32 ())),
1735+ arrow::field (" f2" , arrow::int32 ()), arrow::field (" f3" , arrow::float64 ())};
1736+
1737+ auto schema = arrow::schema (fields);
    // All schema columns are written.
1738+ std::vector<std::string> write_cols = schema->field_names ();
    // Four input rows built from JSON; ValueOrDie() aborts if parsing fails.
1739+ auto src_array = arrow::ipc::internal::json::ArrayFromJSON (arrow::struct_ (fields), R"( [
1740+ ["Alice", [0.0, 0.0, 0.0, 0.0], 10, 11.1],
1741+ ["Bob", [0.0, 1.0, 0.0, 1.0], 10, 12.1],
1742+ ["Emily", [1.0, 0.0, 1.0, 0.0], 10, 13.1],
1743+ ["Tony", [1.0, 1.0, 1.0, 1.0], 10, 14.1]
1744+ ])" )
1745+ .ValueOrDie ();
1746+
    // Table options; the data file format is taken from the test parameter.
1747+ std::map<std::string, std::string> options = {{Options::MANIFEST_FORMAT, " orc" },
1748+ {Options::FILE_FORMAT, GetParam ()},
1749+ {Options::FILE_SYSTEM, " local" },
1750+ {Options::ROW_TRACKING_ENABLED, " true" },
1751+ {Options::DATA_EVOLUTION_ENABLED, " true" }};
    // Options handed to the lumina index write below and to
    // GlobalIndexScan::Create in the lumina read loop.
1752+ std::map<std::string, std::string> lumina_options = {
1753+ {" lumina.dimension" , " 4" },
1754+ {" lumina.indextype" , " bruteforce" },
1755+ {" lumina.distance.metric" , " l2" },
1756+ {" lumina.encoding.type" , " encoding.rawf32" },
1757+ {" lumina.search.threadcount" , " 10" }};
1758+ std::string table_path;
1759+ bool write_run_complete = false ;
1760+ auto io_hook = IOHook::GetInstance ();
    // Loop 1: write both indexes with the hook armed.
1761+ for (size_t i = 0 ; i < 2000 ; i += paimon::test::RandomNumber (20 , 30 )) {
    // The guard's callback clears the hook.
    // NOTE(review): ScopeGuard presumably runs it when the iteration scope
    // exits -- confirm against its definition.
1762+ ScopeGuard guard ([&io_hook]() { io_hook->Clear (); });
    // A fresh directory and table are created on every iteration.
1763+ dir_ = UniqueTestDirectory::Create (" local" );
1764+ // create table and write data
1765+ CreateTable (/* partition_keys=*/ {}, schema, options);
1766+ table_path = PathUtil::JoinPath (dir_->Str (), " foo.db/bar" );
1767+ ASSERT_OK_AND_ASSIGN (auto commit_msgs, WriteArray (table_path, write_cols, src_array));
1768+ ASSERT_OK (Commit (table_path, commit_msgs));
1769+
    // The hook is armed only here, after table creation and the data commit,
    // so those steps run before Reset() is called in this iteration.
1770+ io_hook->Reset (i, IOHook::Mode::RETURN_ERROR);
1771+ // write bitmap index
1772+ auto bitmap_index_write_status =
1773+ WriteIndex (table_path, /* partition_filters=*/ {}, " f0" , " bitmap" ,
1774+ /* options=*/ {}, Range (0 , 3 ));
1775+ CHECK_HOOK_STATUS (bitmap_index_write_status, i);
1776+ // write lumina index
1777+ auto lumina_index_write_status =
1778+ WriteIndex (table_path, /* partition_filters=*/ {}, " f1" , " lumina" ,
1779+ /* options=*/ lumina_options, Range (0 , 3 ));
1780+ CHECK_HOOK_STATUS_WITHOUT_MESSAGE_CHECK (lumina_index_write_status);
1781+ write_run_complete = true ;
1782+ break ;
1783+ }
    // Fails if no iteration reached the end of the write sequence.
1784+ ASSERT_TRUE (write_run_complete);
1785+
    // Loop 2: plan and read through the index with predicate f0 == "Alice".
    // table_path still refers to the table left by the last write iteration.
1786+ // read for bitmap
1787+ bool read_run_complete = false ;
1788+ for (size_t i = 0 ; i < 2000 ; i += paimon::test::RandomNumber (20 , 30 )) {
1789+ ScopeGuard guard ([&io_hook]() { io_hook->Clear (); });
1790+ io_hook->Reset (i, IOHook::Mode::RETURN_ERROR);
1791+ auto predicate =
1792+ PredicateBuilder::Equal (/* field_index=*/ 0 , /* field_name=*/ " f0" , FieldType::STRING,
1793+ Literal (FieldType::STRING, " Alice" , 5 ));
    // The expected result has the value-kind field prepended to the schema
    // fields, followed by the single matching row.
1794+ auto result_fields = fields;
1795+ result_fields.insert (result_fields.begin (), SpecialFields::ValueKind ().ArrowField ());
1796+ auto expected_array =
1797+ arrow::ipc::internal::json::ArrayFromJSON (arrow::struct_ (result_fields), R"( [
1798+ [0, "Alice", [0.0, 0.0, 0.0, 0.0], 10, 11.1]
1799+ ])" )
1800+ .ValueOrDie ();
1801+
1802+ auto plan_result = ScanGlobalIndexAndData (table_path, predicate);
1803+ CHECK_HOOK_STATUS (plan_result.status (), i);
1804+ auto plan = std::move (plan_result).value ();
1805+ auto read_status = ReadData (table_path, write_cols, expected_array, predicate, plan);
1806+ CHECK_HOOK_STATUS (read_status, i);
1807+ read_run_complete = true ;
1808+ break ;
1809+ }
1810+ ASSERT_TRUE (read_run_complete);
1811+
    // Loop 3: open the lumina index on f1 and run a top-1 query.
    // The completion flag is reused, so it is reset first.
1812+ // read for lumina
1813+ read_run_complete = false ;
1814+ for (size_t i = 0 ; i < 2000 ; i += paimon::test::RandomNumber (20 , 30 )) {
1815+ ScopeGuard guard ([&io_hook]() { io_hook->Clear (); });
1816+ io_hook->Reset (i, IOHook::Mode::RETURN_ERROR);
1817+ auto global_index_scan_result =
1818+ GlobalIndexScan::Create (table_path, /* snapshot_id=*/ std::nullopt ,
1819+ /* partitions=*/ std::nullopt , lumina_options,
1820+ /* file_system=*/ nullptr , pool_);
1821+ CHECK_HOOK_STATUS (global_index_scan_result.status (), i);
1822+ auto global_index_scan = std::move (global_index_scan_result).value ();
1823+ auto range_scanner_result = global_index_scan->CreateRangeScan (Range (0 , 3 ));
1824+ CHECK_HOOK_STATUS (range_scanner_result.status (), i);
1825+ auto range_scanner = std::move (range_scanner_result).value ();
1826+ auto lumina_reader_result = range_scanner->CreateReader (" f1" , " lumina" );
1827+ CHECK_HOOK_STATUS_WITHOUT_MESSAGE_CHECK (lumina_reader_result.status ());
    // NOTE(review): lumina_reader is dereferenced below without a null check --
    // confirm CreateReader cannot return an empty reader on this path.
1828+ auto lumina_reader = std::move (lumina_reader_result).value ();
1829+
1830+ std::vector<float > query = {1 .0f , 1 .0f , 1 .0f , 1 .1f };
1831+ auto topk_result = lumina_reader->VisitTopK (1 , query, /* filter=*/ nullptr ,
1832+ /* predicate*/ nullptr );
1833+ CHECK_HOOK_STATUS_WITHOUT_MESSAGE_CHECK (topk_result.status ());
    // The result is compared through its string form: row id 3 with score 0.01.
1834+ ASSERT_EQ (topk_result.value ()->ToString (), " row ids: {3}, scores: {0.01}" );
1835+ read_run_complete = true ;
1836+ break ;
1837+ }
1838+ ASSERT_TRUE (read_run_complete);
1839+ }
1840+
16851841std::vector<std::string> GetTestValuesForGlobalIndexTest () {
16861842 std::vector<std::string> values;
16871843 values.emplace_back (" parquet" );
0 commit comments