Skip to content

Commit 8f6e3d4

Browse files
committed
feat: support returning index scores in read process
1 parent 4b0123e commit 8f6e3d4

37 files changed

+1188
-320
lines changed

include/paimon/global_index/global_index_scan.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,15 @@ class PAIMON_EXPORT GlobalIndexScan {
6363
virtual Result<std::shared_ptr<RowRangeGlobalIndexScanner>> CreateRangeScan(
6464
const Range& range) = 0;
6565

66-
/// Returns the set of row ID ranges covered by this global index.
66+
/// Returns row ID ranges covered by this global index (sorted and non-overlapping
67+
/// ranges).
6768
///
6869
/// Each `Range` represents a contiguous segment of row IDs for which global index
6970
/// data exists. This allows the query engine to parallelize scanning and be aware
7071
/// of ranges that are not covered by any global index.
7172
///
72-
/// @return A `Result` containing a set of non-overlapping `Range` objects.
73-
virtual Result<std::set<Range>> GetRowRangeList() = 0;
73+
/// @return A `Result` containing sorted and non-overlapping `Range` objects.
74+
virtual Result<std::vector<Range>> GetRowRangeList() = 0;
7475
};
7576

7677
} // namespace paimon

include/paimon/read_context.h

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
#include "paimon/predicate/predicate.h"
2727
#include "paimon/result.h"
2828
#include "paimon/type_fwd.h"
29-
#include "paimon/utils/range.h"
3029
#include "paimon/visibility.h"
3130

3231
namespace paimon {
@@ -43,8 +42,8 @@ class PAIMON_EXPORT ReadContext {
4342
public:
4443
ReadContext(const std::string& path, const std::string& branch,
4544
const std::vector<std::string>& read_schema,
46-
const std::shared_ptr<Predicate>& predicate, const std::vector<Range>& row_ranges,
47-
bool enable_predicate_filter, bool enable_prefetch, uint32_t prefetch_batch_count,
45+
const std::shared_ptr<Predicate>& predicate, bool enable_predicate_filter,
46+
bool enable_prefetch, uint32_t prefetch_batch_count,
4847
uint32_t prefetch_max_parallel_num, bool enable_multi_thread_row_to_batch,
4948
uint32_t row_to_batch_thread_number, const std::optional<std::string>& table_schema,
5049
const std::shared_ptr<MemoryPool>& memory_pool,
@@ -77,10 +76,6 @@ class PAIMON_EXPORT ReadContext {
7776
return predicate_;
7877
}
7978

80-
const std::vector<Range>& GetRowRanges() const {
81-
return row_ranges_;
82-
}
83-
8479
bool EnablePredicateFilter() const {
8580
return enable_predicate_filter_;
8681
}
@@ -114,7 +109,6 @@ class PAIMON_EXPORT ReadContext {
114109
std::string branch_;
115110
std::vector<std::string> read_schema_;
116111
std::shared_ptr<Predicate> predicate_;
117-
std::vector<Range> row_ranges_;
118112
bool enable_predicate_filter_;
119113
bool enable_prefetch_;
120114
uint32_t prefetch_batch_count_;
@@ -273,18 +267,6 @@ class PAIMON_EXPORT ReadContextBuilder {
273267
ReadContextBuilder& WithFileSystemSchemeToIdentifierMap(
274268
const std::map<std::string, std::string>& fs_scheme_to_identifier_map);
275269

276-
/// Set specific row ranges to read for targeted data access.
277-
///
278-
/// This is primarily used in data evolution scenarios where only specific rows
279-
/// need to be read. File ranges that do not intersect with the specified row ranges
280-
/// will be filtered out, improving performance by avoiding unnecessary I/O.
281-
///
282-
/// @param row_ranges Vector of specific row ranges to read.
283-
/// @return Reference to this builder for method chaining.
284-
/// @note If not set, all rows in the selected files will be returned.
285-
/// @note This is commonly used in data evolution mode for selective reading.
286-
ReadContextBuilder& SetRowRanges(const std::vector<Range>& row_ranges);
287-
288270
/// Build and return a `ReadContext` instance with input validation.
289271
/// @return Result containing the constructed `ReadContext` or an error status.
290272
Result<std::unique_ptr<ReadContext>> Finish();

include/paimon/utils/range.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#pragma once
1818
#include <optional>
1919
#include <string>
20+
#include <vector>
2021

2122
#include "paimon/visibility.h"
2223

@@ -28,9 +29,23 @@ struct PAIMON_EXPORT Range {
2829
/// Returns the number of integers in the range [from, to].
2930
int64_t Count() const;
3031

32+
/// Computes the intersection of two ranges.
3133
static std::optional<Range> Intersection(const Range& left, const Range& right);
34+
35+
/// Checks whether two ranges have any overlap.
3236
static bool HasIntersection(const Range& left, const Range& right);
3337

38+
/// Sorts a list of ranges by `from`, then merges overlapping or adjacent ranges.
39+
/// @param ranges Input vector of ranges to merge.
40+
/// @param adjacent If true, also merges ranges that are adjacent (e.g., [1,3] and [4,5] →
41+
/// [1,5]).
42+
/// If false, only merges strictly overlapping ranges.
43+
/// @return A new vector of non-overlapping, sorted ranges.
44+
static std::vector<Range> SortAndMergeOverlap(const std::vector<Range>& ranges, bool adjacent);
45+
46+
/// Computes the set intersection of two collections of disjoint, sorted ranges.
47+
static std::vector<Range> And(const std::vector<Range>& left, const std::vector<Range>& right);
48+
3449
bool operator==(const Range& other) const;
3550
bool operator<(const Range& other) const;
3651

src/paimon/CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ set(PAIMON_COMMON_SRCS
4646
common/fs/file_system.cpp
4747
common/fs/resolving_file_system.cpp
4848
common/fs/file_system_factory.cpp
49-
common/global_index/bitmap_topk_global_index_result.cpp
49+
common/global_index/complete_index_score_batch_reader.cpp
50+
common/global_index/bitmap_topk_global_index_result.cpp
5051
common/global_index/bitmap_global_index_result.cpp
5152
common/global_index/global_index_result.cpp
5253
common/global_index/global_indexer_factory.cpp
@@ -326,6 +327,7 @@ if(PAIMON_BUILD_TESTS)
326327
common/file_index/bsi/bit_slice_index_roaring_bitmap_test.cpp
327328
common/file_index/bloomfilter/bloom_filter_file_index_test.cpp
328329
common/file_index/bloomfilter/fast_hash_test.cpp
330+
common/global_index/complete_index_score_batch_reader_test.cpp
329331
common/global_index/global_index_result_test.cpp
330332
common/global_index/global_indexer_factory_test.cpp
331333
common/global_index/bitmap_global_index_result_test.cpp

src/paimon/common/data/blob_utils.cpp

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,10 @@ Result<BlobUtils::SeparatedStructArrays> BlobUtils::SeparateBlobArray(
7474
}
7575

7676
SeparatedStructArrays result;
77-
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
78-
result.main_array,
79-
arrow::StructArray::Make(remaining_arrays, remaining_fields, struct_array->null_bitmap(),
80-
struct_array->null_count(), struct_array->offset()));
81-
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
82-
result.blob_array,
83-
arrow::StructArray::Make(blob_arrays, blob_fields, struct_array->null_bitmap(),
84-
struct_array->null_count(), struct_array->offset()));
77+
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(result.main_array,
78+
arrow::StructArray::Make(remaining_arrays, remaining_fields));
79+
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(result.blob_array,
80+
arrow::StructArray::Make(blob_arrays, blob_fields));
8581
return result;
8682
}
8783

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2024-present Alibaba Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "paimon/common/global_index/complete_index_score_batch_reader.h"
18+
19+
#include <cstddef>
20+
21+
#include "arrow/api.h"
22+
#include "arrow/array/array_base.h"
23+
#include "arrow/array/array_nested.h"
24+
#include "arrow/array/util.h"
25+
#include "arrow/c/abi.h"
26+
#include "arrow/c/bridge.h"
27+
#include "arrow/scalar.h"
28+
#include "paimon/common/reader/reader_utils.h"
29+
#include "paimon/common/table/special_fields.h"
30+
#include "paimon/common/types/row_kind.h"
31+
#include "paimon/common/utils/arrow/mem_utils.h"
32+
#include "paimon/common/utils/arrow/status_utils.h"
33+
#include "paimon/status.h"
34+
namespace paimon {
35+
/// Builds a reader that decorates `reader` with index scores.
///
/// @param reader Underlying reader; ownership is moved into this object.
/// @param scores Scores to write into the score column; copied into this object.
/// @param pool   Memory pool from which the Arrow pool used by this reader is obtained
///               (via `GetArrowPool`).
CompleteIndexScoreBatchReader::CompleteIndexScoreBatchReader(
    std::unique_ptr<BatchReader>&& reader,
    const std::vector<float>& scores,
    const std::shared_ptr<MemoryPool>& pool)
    : arrow_pool_(GetArrowPool(pool)),
      reader_(std::move(reader)),
      scores_(scores) {
}
39+
40+
/// Returns the next batch as a plain `ReadBatch`.
///
/// The batch is first produced by `NextBatchWithBitmap()` and then handed, together with its
/// bitmap, to `ReaderUtils::ApplyBitmapToReadBatch`. Any error from either step is propagated.
Result<BatchReader::ReadBatch> CompleteIndexScoreBatchReader::NextBatch() {
    PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap scored_batch, NextBatchWithBitmap());
    auto* pool = arrow_pool_.get();
    return ReaderUtils::ApplyBitmapToReadBatch(std::move(scored_batch), pool);
}
45+
46+
/// Resolves and caches the position of the index score column, together with the names of all
/// fields of `struct_type`.
///
/// The lookup is performed only while no valid index has been cached yet (the cached index is
/// still -1). `GetFieldIndex` reports -1 when the name cannot be resolved, in which case this
/// function runs again on the next call.
///
/// @param struct_type Struct type of the batch currently being processed.
void CompleteIndexScoreBatchReader::UpdateScoreFieldIndex(const arrow::StructType* struct_type) {
    if (index_score_field_idx_ != -1) {
        // Already resolved by an earlier batch.
        return;
    }
    index_score_field_idx_ = struct_type->GetFieldIndex(SpecialFields::IndexScore().Name());

    // Rebuild the name list from scratch: if a previous lookup failed, this function is entered
    // again and must not append a second copy of the field names.
    field_names_with_score_.clear();
    field_names_with_score_.reserve(struct_type->num_fields());
    for (const auto& field : struct_type->fields()) {
        field_names_with_score_.push_back(field->name());
    }
}
56+
/// Reads the next batch from the wrapped reader and replaces its index score column with
/// values taken from `scores_`.
///
/// EOF batches are returned untouched. When `scores_` is empty the batch is also returned
/// untouched (the score column is left as produced by the wrapped reader). Otherwise one score
/// is consumed, in order and continuing across batches, for every row position contained in
/// the batch's bitmap; all other rows receive a null score.
///
/// @return The batch with the rewritten score column and its original bitmap, or an error if
///         the batch is not a struct array, the score column cannot be located, the score
///         builder has an unexpected type, or there are fewer scores than selected rows.
Result<BatchReader::ReadBatchWithBitmap> CompleteIndexScoreBatchReader::NextBatchWithBitmap() {
    PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap batch_with_bitmap,
                           reader_->NextBatchWithBitmap());
    if (BatchReader::IsEofBatch(batch_with_bitmap)) {
        return batch_with_bitmap;
    }
    if (scores_.empty()) {
        // Indicates score field all null.
        return batch_with_bitmap;
    }

    auto& [batch, bitmap] = batch_with_bitmap;
    auto& [c_array, c_schema] = batch;
    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> arrow_array,
                                      arrow::ImportArray(c_array.get(), c_schema.get()));
    auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(arrow_array);
    if (!struct_array) {
        return Status::Invalid("cannot cast array to StructArray in CompleteIndexScoreBatchReader");
    }
    auto struct_type = struct_array->struct_type();
    UpdateScoreFieldIndex(struct_type);
    // GetFieldIndex yields -1 when the score column is absent (or its name is ambiguous);
    // indexing the child vector with it would be undefined behaviour.
    if (index_score_field_idx_ < 0 || index_score_field_idx_ >= struct_array->num_fields()) {
        return Status::Invalid(
            "cannot locate index score field in batch in CompleteIndexScoreBatchReader");
    }

    // prepare index score array
    std::unique_ptr<arrow::ArrayBuilder> index_score_builder;
    PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::MakeBuilder(
        arrow_pool_.get(), SpecialFields::IndexScore().Type(), &index_score_builder));
    auto* typed_builder = dynamic_cast<arrow::FloatBuilder*>(index_score_builder.get());
    if (typed_builder == nullptr) {
        // Checked at runtime (not via assert) so release builds never dereference null.
        return Status::Invalid(
            "index score builder is not a FloatBuilder in CompleteIndexScoreBatchReader");
    }
    const int64_t num_rows = struct_array->length();
    PAIMON_RETURN_NOT_OK_FROM_ARROW(typed_builder->Reserve(num_rows));
    // When every row is selected the per-row bitmap lookup can be skipped.
    const bool all_not_null = (num_rows == bitmap.Cardinality());
    for (int64_t i = 0; i < num_rows; i++) {
        if (all_not_null || bitmap.Contains(i)) {
            if (score_cursor_ >= scores_.size()) {
                return Status::Invalid(
                    "not enough index scores for selected rows in CompleteIndexScoreBatchReader");
            }
            PAIMON_RETURN_NOT_OK_FROM_ARROW(typed_builder->Append(scores_[score_cursor_++]));
        } else {
            PAIMON_RETURN_NOT_OK_FROM_ARROW(typed_builder->AppendNull());
        }
    }
    std::shared_ptr<arrow::Array> index_score_array;
    PAIMON_RETURN_NOT_OK_FROM_ARROW(typed_builder->Finish(&index_score_array));

    // update index score array to struct array
    arrow::ArrayVector array_vec = struct_array->fields();
    array_vec[index_score_field_idx_] = index_score_array;
    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::StructArray> array_with_score,
                                      arrow::StructArray::Make(array_vec, field_names_with_score_));
    // Hand the rebuilt array back through the same C data interface structs.
    PAIMON_RETURN_NOT_OK_FROM_ARROW(
        arrow::ExportArray(*array_with_score, c_array.get(), c_schema.get()));
    return batch_with_bitmap;
}
104+
} // namespace paimon
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2024-present Alibaba Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <cstdint>
20+
#include <memory>
21+
#include <string>
22+
#include <utility>
23+
#include <vector>
24+
25+
#include "arrow/api.h"
26+
#include "arrow/array/array_base.h"
27+
#include "paimon/reader/batch_reader.h"
28+
#include "paimon/result.h"
29+
30+
namespace paimon {
31+
class MemoryPool;
32+
class Metrics;
33+
/// A batch reader that enriches the output Arrow array with index score information.
34+
/// It assumes the input data already contains the `_INDEX_SCORE` column,
35+
/// and ensures this score is properly updated in the returned batches.
36+
///
37+
/// @pre The read schema must include the `_INDEX_SCORE` field.
38+
class CompleteIndexScoreBatchReader : public BatchReader {
39+
public:
40+
CompleteIndexScoreBatchReader(std::unique_ptr<BatchReader>&& reader,
41+
const std::vector<float>& scores,
42+
const std::shared_ptr<MemoryPool>& pool);
43+
44+
Result<ReadBatch> NextBatch() override;
45+
46+
Result<ReadBatchWithBitmap> NextBatchWithBitmap() override;
47+
48+
void Close() override {
49+
reader_->Close();
50+
}
51+
52+
std::shared_ptr<Metrics> GetReaderMetrics() const override {
53+
return reader_->GetReaderMetrics();
54+
}
55+
56+
private:
57+
void UpdateScoreFieldIndex(const arrow::StructType* struct_type);
58+
59+
private:
60+
size_t score_cursor_ = 0;
61+
int32_t index_score_field_idx_ = -1;
62+
std::vector<std::string> field_names_with_score_;
63+
std::unique_ptr<arrow::MemoryPool> arrow_pool_;
64+
std::unique_ptr<BatchReader> reader_;
65+
std::vector<float> scores_;
66+
};
67+
} // namespace paimon

0 commit comments

Comments
 (0)