Skip to content

Commit 66a701e

Browse files
authored
feat(scan): support create index readers with field name during scan process (#26)
1 parent 803e9b3 commit 66a701e

File tree

4 files changed

+101
-0
lines changed

4 files changed

+101
-0
lines changed

include/paimon/global_index/row_range_global_index_scanner.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,18 @@ class PAIMON_EXPORT RowRangeGlobalIndexScanner {
5252
/// format).
5353
virtual Result<std::shared_ptr<GlobalIndexReader>> CreateReader(
5454
const std::string& field_name, const std::string& index_type) const = 0;
55+
56+
/// Creates several `GlobalIndexReader`s for a specific field within this range.
57+
///
58+
/// @param field_name Name of the indexed column.
59+
/// @return A `Result` that is:
60+
/// - Successful with several readers if the indexes exist and load correctly;
61+
/// - Successful with an empty vector if no index was built for the given field;
62+
/// - Error returns when loading fails (e.g., file corruption, I/O error, unsupported
63+
/// format) or the predicate method was incorrectly invoked (e.g., VisitTopK was invoked
64+
/// incorrectly).
65+
virtual Result<std::vector<std::shared_ptr<GlobalIndexReader>>> CreateReaders(
66+
const std::string& field_name) const = 0;
5567
};
5668

5769
} // namespace paimon

src/paimon/core/global_index/row_range_global_index_scanner_impl.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,20 @@ Result<std::shared_ptr<GlobalIndexReader>> RowRangeGlobalIndexScannerImpl::Creat
6464
return CreateReader(field, index_type, entries);
6565
}
6666

67+
Result<std::vector<std::shared_ptr<GlobalIndexReader>>>
68+
RowRangeGlobalIndexScannerImpl::CreateReaders(const std::string& field_name) const {
69+
PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema_->GetField(field_name));
70+
return CreateReaders(field);
71+
}
72+
6773
Result<std::vector<std::shared_ptr<GlobalIndexReader>>>
6874
RowRangeGlobalIndexScannerImpl::CreateReaders(int32_t field_id) const {
6975
PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema_->GetField(field_id));
76+
return CreateReaders(field);
77+
}
78+
79+
Result<std::vector<std::shared_ptr<GlobalIndexReader>>>
80+
RowRangeGlobalIndexScannerImpl::CreateReaders(const DataField& field) const {
7081
auto field_iter = grouped_entries_.find(field.Id());
7182
if (field_iter == grouped_entries_.end()) {
7283
return std::vector<std::shared_ptr<GlobalIndexReader>>();

src/paimon/core/global_index/row_range_global_index_scanner_impl.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,13 @@ class RowRangeGlobalIndexScannerImpl
4747
Result<std::shared_ptr<GlobalIndexReader>> CreateReader(
4848
const std::string& field_name, const std::string& index_type) const override;
4949

50+
Result<std::vector<std::shared_ptr<GlobalIndexReader>>> CreateReaders(
51+
const std::string& field_name) const override;
52+
5053
private:
5154
Result<std::vector<std::shared_ptr<GlobalIndexReader>>> CreateReaders(int32_t field_id) const;
55+
Result<std::vector<std::shared_ptr<GlobalIndexReader>>> CreateReaders(
56+
const DataField& field) const;
5257

5358
Result<std::shared_ptr<GlobalIndexReader>> CreateReader(
5459
const DataField& field, const std::string& index_type,

test/inte/global_index_test.cpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1601,6 +1601,79 @@ TEST_P(GlobalIndexTest, TestDataEvolutionBatchScanWithPartitionWithTwoFields) {
16011601
}
16021602
}
16031603

1604+
TEST_P(GlobalIndexTest, TestScanIndexWithTwoIndexes) {
1605+
arrow::FieldVector fields = {
1606+
arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::list(arrow::float32())),
1607+
arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())};
1608+
std::map<std::string, std::string> lumina_options = {
1609+
{"lumina.dimension", "4"},
1610+
{"lumina.indextype", "bruteforce"},
1611+
{"lumina.distance.metric", "l2"},
1612+
{"lumina.encoding.type", "encoding.rawf32"},
1613+
{"lumina.search.threadcount", "10"}};
1614+
auto schema = arrow::schema(fields);
1615+
std::map<std::string, std::string> options = {{Options::MANIFEST_FORMAT, "orc"},
1616+
{Options::FILE_FORMAT, GetParam()},
1617+
{Options::FILE_SYSTEM, "local"},
1618+
{Options::ROW_TRACKING_ENABLED, "true"},
1619+
{Options::DATA_EVOLUTION_ENABLED, "true"}};
1620+
CreateTable(/*partition_keys=*/{}, schema, options);
1621+
1622+
std::string table_path = PathUtil::JoinPath(dir_->Str(), "foo.db/bar");
1623+
std::vector<std::string> write_cols = schema->field_names();
1624+
1625+
auto src_array = std::dynamic_pointer_cast<arrow::StructArray>(
1626+
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
1627+
["Alice", [0.0, 0.0, 0.0, 0.0], 10, 11.1],
1628+
["Bob", [0.0, 1.0, 0.0, 1.0], 10, 12.1],
1629+
["Emily", [1.0, 0.0, 1.0, 0.0], 10, 13.1],
1630+
["Tony", [1.0, 1.0, 1.0, 1.0], 10, 14.1],
1631+
["Lucy", [10.0, 10.0, 10.0, 10.0], 20, 15.1],
1632+
["Bob", [10.0, 11.0, 10.0, 11.0], 20, 16.1],
1633+
["Tony", [11.0, 10.0, 11.0, 10.0], 20, 17.1],
1634+
["Alice", [11.0, 11.0, 11.0, 11.0], 20, 18.1],
1635+
["Paul", [10.0, 10.0, 10.0, 10.0], 20, 19.1]
1636+
])")
1637+
.ValueOrDie());
1638+
ASSERT_OK_AND_ASSIGN(auto commit_msgs, WriteArray(table_path, write_cols, src_array));
1639+
ASSERT_OK(Commit(table_path, commit_msgs));
1640+
1641+
// write and commit bitmap global index
1642+
ASSERT_OK(WriteIndex(table_path, /*partition_filters=*/{}, "f0", "bitmap",
1643+
/*options=*/{}, Range(0, 8)));
1644+
1645+
// write and commit lumina global index
1646+
ASSERT_OK(WriteIndex(table_path, /*partition_filters=*/{}, "f1", "lumina",
1647+
/*options=*/lumina_options, Range(0, 8)));
1648+
1649+
ASSERT_OK_AND_ASSIGN(
1650+
auto global_index_scan,
1651+
GlobalIndexScan::Create(table_path, /*snapshot_id=*/std::nullopt,
1652+
/*partitions=*/std::nullopt, /*options=*/lumina_options,
1653+
/*file_system=*/nullptr, pool_));
1654+
ASSERT_OK_AND_ASSIGN(std::vector<Range> ranges, global_index_scan->GetRowRangeList());
1655+
ASSERT_EQ(ranges, std::vector<Range>({Range(0, 8)}));
1656+
ASSERT_OK_AND_ASSIGN(auto range_scanner, global_index_scan->CreateRangeScan(Range(0, 8)));
1657+
// query f0
1658+
ASSERT_OK_AND_ASSIGN(auto index_readers, range_scanner->CreateReaders("f0"));
1659+
ASSERT_EQ(index_readers.size(), 1);
1660+
ASSERT_OK_AND_ASSIGN(auto index_result,
1661+
index_readers[0]->VisitEqual(Literal(FieldType::STRING, "Alice", 5)));
1662+
ASSERT_EQ(index_result->ToString(), "{0,7}");
1663+
1664+
// query f1
1665+
ASSERT_OK_AND_ASSIGN(index_readers, range_scanner->CreateReaders("f1"));
1666+
ASSERT_EQ(index_readers.size(), 1);
1667+
std::vector<float> query = {11.0f, 11.0f, 11.0f, 11.0f};
1668+
ASSERT_OK_AND_ASSIGN(auto topk_result, index_readers[0]->VisitTopK(1, query, /*filter=*/nullptr,
1669+
/*predicate*/ nullptr));
1670+
ASSERT_EQ(topk_result->ToString(), "row ids: {7}, scores: {0}");
1671+
1672+
// query f2
1673+
ASSERT_OK_AND_ASSIGN(index_readers, range_scanner->CreateReaders("f2"));
1674+
ASSERT_EQ(index_readers.size(), 0);
1675+
}
1676+
16041677
std::vector<std::string> GetTestValuesForGlobalIndexTest() {
16051678
std::vector<std::string> values = {"parquet"};
16061679
#ifdef PAIMON_ENABLE_ORC

0 commit comments

Comments
 (0)