Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,13 @@ const std::vector<float>& BitmapTopKGlobalIndexResult::GetScores() const {
}

std::string BitmapTopKGlobalIndexResult::ToString() const {
return fmt::format("row ids: {}, scores: {{{}}}", bitmap_.ToString(), fmt::join(scores_, ","));
std::vector<std::string> formatted_scores;
formatted_scores.reserve(scores_.size());
for (const auto& score : scores_) {
formatted_scores.push_back(fmt::format("{:.2f}", score));
}
return fmt::format("row ids: {}, scores: {{{}}}", bitmap_.ToString(),
fmt::join(formatted_scores, ","));
}

} // namespace paimon
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,10 @@ TEST_F(BitmapTopKGlobalIndexResultTest, TestAndBitmapResult) {
ASSERT_OK_AND_ASSIGN(auto result, index_result1->And(index_result2));
ASSERT_EQ(result->ToString(), expected_str);
};
check_and_result({1, 2, 3}, {1.1f, 1.2f, 1.3f}, {1, 2, 7}, "row ids: {1,2}, scores: {1.1,1.2}");
check_and_result({1, 2, 3}, {1.1f, 1.2f, 1.3f}, {1, 2, 7},
"row ids: {1,2}, scores: {1.10,1.20}");
check_and_result({1, 2, 3}, {100.1f, 100.2f, 100.3f}, {1, 2, 3},
"row ids: {1,2,3}, scores: {100.1,100.2,100.3}");
"row ids: {1,2,3}, scores: {100.10,100.20,100.30}");
check_and_result({1, 2, 3}, {1.1f, 1.2f, 1.3f}, {100, 200, 300}, "row ids: {}, scores: {}");
check_and_result({1, 2, 3}, {1.1f, 1.2f, 1.3f}, {}, "row ids: {}, scores: {}");
check_and_result({}, {}, {}, "row ids: {}, scores: {}");
Expand Down Expand Up @@ -177,14 +178,14 @@ TEST_F(BitmapTopKGlobalIndexResultTest, TestOr) {
ASSERT_EQ(result->ToString(), expected_str);
};
check_or_result({1, 2, 3}, {1.1f, 1.2f, 1.3f}, {100, 200, 300}, {100.1f, 200.1f, 300.1f},
"row ids: {1,2,3,100,200,300}, scores: {1.1,1.2,1.3,100.1,200.1,300.1}");
"row ids: {1,2,3,100,200,300}, scores: {1.10,1.20,1.30,100.10,200.10,300.10}");
check_or_result({1, 2, 3}, {1.1f, 1.2f, 1.3f}, {}, {},
"row ids: {1,2,3}, scores: {1.1,1.2,1.3}");
"row ids: {1,2,3}, scores: {1.10,1.20,1.30}");
check_or_result({}, {}, {}, {}, "row ids: {}, scores: {}");
check_or_result(
{1, 2, 3, RoaringBitmap64::MAX_VALUE}, {1.1f, 1.2f, 1.3f, 1.4f},
{RoaringBitmap32::MAX_VALUE}, {0.12f},
"row ids: {1,2,3,2147483647,9223372036854775807}, scores: {1.1,1.2,1.3,0.12,1.4}");
"row ids: {1,2,3,2147483647,9223372036854775807}, scores: {1.10,1.20,1.30,0.12,1.40}");
}

TEST_F(BitmapTopKGlobalIndexResultTest, TestOrBitmapResult) {
Expand Down Expand Up @@ -243,7 +244,7 @@ TEST_F(BitmapTopKGlobalIndexResultTest, TestAddOffset) {
auto index_result = std::make_shared<BitmapTopKGlobalIndexResult>(
RoaringBitmap64::From(ids), std::move(scores));
ASSERT_OK_AND_ASSIGN(auto result_with_offset, index_result->AddOffset(10));
ASSERT_EQ(result_with_offset->ToString(), "row ids: {11,12,13}, scores: {1.1,1.2,1.3}");
ASSERT_EQ(result_with_offset->ToString(), "row ids: {11,12,13}, scores: {1.10,1.20,1.30}");
}
{
std::vector<int64_t> ids = {};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <vector>

#include "arrow/c/bridge.h"
#include "arrow/c/helpers.h"
#include "paimon/common/utils/scope_guard.h"
#include "paimon/core/global_index/global_index_evaluator_impl.h"
#include "paimon/global_index/global_indexer.h"
#include "paimon/global_index/global_indexer_factory.h"
Expand Down Expand Up @@ -106,9 +108,11 @@ Result<std::shared_ptr<GlobalIndexReader>> RowRangeGlobalIndexScannerImpl::Creat
// TODO(xinyu.lxy): c_arrow_schema may contains additional associated fields.
auto arrow_field = DataField::ConvertDataFieldToArrowField(field);
auto arrow_schema = arrow::schema({arrow_field});

ArrowSchema c_arrow_schema;
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*arrow_schema, &c_arrow_schema));
auto index_io_metas = ToGlobalIndexIOMetas(entries);
ScopeGuard guard([&]() { ArrowSchemaRelease(&c_arrow_schema); });
return indexer->CreateReader(&c_arrow_schema, index_file_manager_, index_io_metas, pool_);
}

Expand Down
5 changes: 5 additions & 0 deletions src/paimon/global_index/lumina/lumina_global_index_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ TEST_F(LuminaGlobalIndexTest, TestSimple) {
/*predicate*/ nullptr));
CheckResult(topk_result, {3l, 1l, 2l}, {0.01f, 2.01f, 2.21f});
}
{
// visit equal will return all rows
ASSERT_OK_AND_ASSIGN(auto is_null_result, reader->VisitIsNull());
ASSERT_EQ(is_null_result->ToString(), "{0,1,2,3}");
}
}

TEST_F(LuminaGlobalIndexTest, TestWithFilter) {
Expand Down
8 changes: 8 additions & 0 deletions src/paimon/testing/utils/io_exception_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,12 @@ namespace paimon::test {
} \
} \
}

#define CHECK_HOOK_STATUS_WITHOUT_MESSAGE_CHECK(status) \
{ \
auto __s = (status); \
if (!__s.ok()) { \
continue; \
} \
}
} // namespace paimon::test
158 changes: 157 additions & 1 deletion test/inte/global_index_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,34 @@ TEST_P(GlobalIndexTest, TestWriteIndex) {
/*partition=*/BinaryRow::EmptyRow(), /*bucket=*/0, /*total_buckets=*/std::nullopt,
expected_data_increment, CompactIncrement({}, {}, {}));
ASSERT_TRUE(expected_commit_message->TEST_Equal(*index_commit_msg_impl));

{
// test invalid write task with none-registered index type
ASSERT_NOK_WITH_MSG(
GlobalIndexWriteTask::WriteIndex(
table_path, "f0", "invalid",
std::make_shared<IndexedSplitImpl>(split, std::vector<Range>({Range(0, 7)})),
/*options=*/{}, pool_),
"Unknown index type invalid, may not registered");
}
{
// test invalid range mismatch
ASSERT_NOK_WITH_MSG(
GlobalIndexWriteTask::WriteIndex(
table_path, "f0", "bitmap",
std::make_shared<IndexedSplitImpl>(split, std::vector<Range>({Range(0, 8)})),
/*options=*/{}, pool_),
"specified range length 9 mismatch indexed range length 8");
}
{
// test invalid multiple ranges
ASSERT_NOK_WITH_MSG(GlobalIndexWriteTask::WriteIndex(
table_path, "f0", "bitmap",
std::make_shared<IndexedSplitImpl>(
split, std::vector<Range>({Range(0, 6), Range(7, 7)})),
/*options=*/{}, pool_),
"GlobalIndexWriteTask only supports a single contiguous range.");
}
}

TEST_P(GlobalIndexTest, TestWriteIndexWithPartition) {
Expand Down Expand Up @@ -641,6 +669,13 @@ TEST_P(GlobalIndexTest, TestScanIndexWithRange) {
index_reader->VisitEqual(Literal(FieldType::STRING, "Alice", 5)));
ASSERT_EQ(index_result->ToString(), "{0,7}");

{
// test non-exist index type
ASSERT_OK_AND_ASSIGN(auto non_exist_index_reader,
range_scanner->CreateReader("f0", "non-exist"));
ASSERT_FALSE(non_exist_index_reader);
}

// test evaluator
ASSERT_OK_AND_ASSIGN(auto evaluator, scanner_impl->CreateIndexEvaluator());
auto predicate =
Expand Down Expand Up @@ -952,6 +987,15 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithPartition) {
ASSERT_NOK_WITH_MSG(global_index_scan->CreateRangeScan(Range(0, 8)),
"input range contain multiple partitions, fail to create range scan");
}
{
// test invalid partition input
ASSERT_NOK_WITH_MSG(
GlobalIndexScan::Create(
table_path, /*snapshot_id=*/std::nullopt,
/*partitions=*/std::vector<std::map<std::string, std::string>>(), lumina_options,
/*file_system=*/nullptr, pool_),
"invalid input partition, supposed to be null or at least one partition");
}
}

TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) {
Expand Down Expand Up @@ -1675,13 +1719,125 @@ TEST_P(GlobalIndexTest, TestScanIndexWithTwoIndexes) {
std::vector<float> query = {11.0f, 11.0f, 11.0f, 11.0f};
ASSERT_OK_AND_ASSIGN(auto topk_result, index_readers[0]->VisitTopK(1, query, /*filter=*/nullptr,
/*predicate*/ nullptr));
ASSERT_EQ(topk_result->ToString(), "row ids: {7}, scores: {0}");
ASSERT_EQ(topk_result->ToString(), "row ids: {7}, scores: {0.00}");

// query f2
ASSERT_OK_AND_ASSIGN(index_readers, range_scanner->CreateReaders("f2"));
ASSERT_EQ(index_readers.size(), 0);
}

TEST_P(GlobalIndexTest, TestIOException) {
if (GetParam() == "lance") {
return;
}
arrow::FieldVector fields = {
arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::list(arrow::float32())),
arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())};

auto schema = arrow::schema(fields);
std::vector<std::string> write_cols = schema->field_names();
auto src_array = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
["Alice", [0.0, 0.0, 0.0, 0.0], 10, 11.1],
["Bob", [0.0, 1.0, 0.0, 1.0], 10, 12.1],
["Emily", [1.0, 0.0, 1.0, 0.0], 10, 13.1],
["Tony", [1.0, 1.0, 1.0, 1.0], 10, 14.1]
])")
.ValueOrDie();

std::map<std::string, std::string> options = {{Options::MANIFEST_FORMAT, "orc"},
{Options::FILE_FORMAT, GetParam()},
{Options::FILE_SYSTEM, "local"},
{Options::ROW_TRACKING_ENABLED, "true"},
{Options::DATA_EVOLUTION_ENABLED, "true"}};
std::map<std::string, std::string> lumina_options = {
{"lumina.dimension", "4"},
{"lumina.indextype", "bruteforce"},
{"lumina.distance.metric", "l2"},
{"lumina.encoding.type", "encoding.rawf32"},
{"lumina.search.threadcount", "10"}};
std::string table_path;
bool write_run_complete = false;
auto io_hook = IOHook::GetInstance();
for (size_t i = 0; i < 2000; i += paimon::test::RandomNumber(20, 30)) {
ScopeGuard guard([&io_hook]() { io_hook->Clear(); });
dir_ = UniqueTestDirectory::Create("local");
// create table and write data
CreateTable(/*partition_keys=*/{}, schema, options);
table_path = PathUtil::JoinPath(dir_->Str(), "foo.db/bar");
ASSERT_OK_AND_ASSIGN(auto commit_msgs, WriteArray(table_path, write_cols, src_array));
ASSERT_OK(Commit(table_path, commit_msgs));

io_hook->Reset(i, IOHook::Mode::RETURN_ERROR);
// write bitmap index
auto bitmap_index_write_status =
WriteIndex(table_path, /*partition_filters=*/{}, "f0", "bitmap",
/*options=*/{}, Range(0, 3));
CHECK_HOOK_STATUS(bitmap_index_write_status, i);
// write lumina index
auto lumina_index_write_status =
WriteIndex(table_path, /*partition_filters=*/{}, "f1", "lumina",
/*options=*/lumina_options, Range(0, 3));
CHECK_HOOK_STATUS_WITHOUT_MESSAGE_CHECK(lumina_index_write_status);
write_run_complete = true;
break;
}
ASSERT_TRUE(write_run_complete);

// read for bitmap
bool read_run_complete = false;
for (size_t i = 0; i < 2000; i += paimon::test::RandomNumber(20, 30)) {
ScopeGuard guard([&io_hook]() { io_hook->Clear(); });
io_hook->Reset(i, IOHook::Mode::RETURN_ERROR);
auto predicate =
PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
Literal(FieldType::STRING, "Alice", 5));
auto result_fields = fields;
result_fields.insert(result_fields.begin(), SpecialFields::ValueKind().ArrowField());
auto expected_array =
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(result_fields), R"([
[0, "Alice", [0.0, 0.0, 0.0, 0.0], 10, 11.1]
])")
.ValueOrDie();

auto plan_result = ScanGlobalIndexAndData(table_path, predicate);
CHECK_HOOK_STATUS(plan_result.status(), i);
auto plan = std::move(plan_result).value();
auto read_status = ReadData(table_path, write_cols, expected_array, predicate, plan);
CHECK_HOOK_STATUS(read_status, i);
read_run_complete = true;
break;
}
ASSERT_TRUE(read_run_complete);

// read for lumina
read_run_complete = false;
for (size_t i = 0; i < 2000; i += paimon::test::RandomNumber(20, 30)) {
ScopeGuard guard([&io_hook]() { io_hook->Clear(); });
io_hook->Reset(i, IOHook::Mode::RETURN_ERROR);
auto global_index_scan_result =
GlobalIndexScan::Create(table_path, /*snapshot_id=*/std::nullopt,
/*partitions=*/std::nullopt, lumina_options,
/*file_system=*/nullptr, pool_);
CHECK_HOOK_STATUS(global_index_scan_result.status(), i);
auto global_index_scan = std::move(global_index_scan_result).value();
auto range_scanner_result = global_index_scan->CreateRangeScan(Range(0, 3));
CHECK_HOOK_STATUS(range_scanner_result.status(), i);
auto range_scanner = std::move(range_scanner_result).value();
auto lumina_reader_result = range_scanner->CreateReader("f1", "lumina");
CHECK_HOOK_STATUS_WITHOUT_MESSAGE_CHECK(lumina_reader_result.status());
auto lumina_reader = std::move(lumina_reader_result).value();

std::vector<float> query = {1.0f, 1.0f, 1.0f, 1.1f};
auto topk_result = lumina_reader->VisitTopK(1, query, /*filter=*/nullptr,
/*predicate*/ nullptr);
CHECK_HOOK_STATUS_WITHOUT_MESSAGE_CHECK(topk_result.status());
ASSERT_EQ(topk_result.value()->ToString(), "row ids: {3}, scores: {0.01}");
read_run_complete = true;
break;
}
ASSERT_TRUE(read_run_complete);
}

std::vector<std::string> GetTestValuesForGlobalIndexTest() {
std::vector<std::string> values;
values.emplace_back("parquet");
Expand Down
Loading