Skip to content

Commit 619fb19

Browse files
authored
feat: support serialize/deserialize for GlobalIndexResult in distributed global index search (#15)
1 parent b43d1b5 commit 619fb19

File tree

3 files changed

+169
-0
lines changed

3 files changed

+169
-0
lines changed

include/paimon/global_index/global_index_result.h

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
#include <string>
2121
#include <utility>
2222

23+
#include "paimon/memory/bytes.h"
24+
#include "paimon/memory/memory_pool.h"
2325
#include "paimon/result.h"
2426
#include "paimon/visibility.h"
2527

@@ -62,6 +64,38 @@ class PAIMON_EXPORT GlobalIndexResult : public std::enable_shared_from_this<Glob
6264
const std::shared_ptr<GlobalIndexResult>& other);
6365

6466
virtual std::string ToString() const = 0;
67+
68+
/// Serializes a GlobalIndexResult object into a byte array.
69+
///
70+
/// @note This method only supports the following concrete implementations:
71+
/// - BitmapTopKGlobalIndexResult
72+
/// - BitmapGlobalIndexResult
73+
///
74+
/// @param global_index_result The GlobalIndexResult instance to serialize (must not be null).
75+
/// @param pool Memory pool used to allocate the output byte buffer.
76+
/// @return A Result containing a unique pointer to the serialized Bytes on success,
77+
/// or an error status on failure.
78+
static Result<PAIMON_UNIQUE_PTR<Bytes>> Serialize(
79+
const std::shared_ptr<GlobalIndexResult>& global_index_result,
80+
const std::shared_ptr<MemoryPool>& pool);
81+
82+
/// Deserializes a GlobalIndexResult object from a raw byte buffer.
83+
///
84+
/// @note The concrete type of the deserialized object is determined by metadata
85+
/// embedded in the buffer. Currently, only the following types are supported:
86+
/// - BitmapTopKGlobalIndexResult
87+
/// - BitmapGlobalIndexResult
88+
///
89+
/// @param buffer Pointer to the serialized byte data (must not be null).
90+
/// @param length Size of the buffer in bytes.
91+
/// @param pool Memory pool used to allocate internal objects during deserialization.
92+
/// @return A Result containing a shared pointer to the reconstructed GlobalIndexResult
93+
/// on success, or an error status on failure.
94+
static Result<std::shared_ptr<GlobalIndexResult>> Deserialize(
95+
const char* buffer, size_t length, const std::shared_ptr<MemoryPool>& pool);
96+
97+
private:
98+
static constexpr int32_t VERSION = 1;
6599
};
66100

67101
/// Represents the result of a Top-K query against a global index.

src/paimon/common/global_index/global_index_result.cpp

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,30 @@
1616

1717
#include "paimon/global_index/global_index_result.h"
1818

19+
#include "fmt/format.h"
20+
#include "paimon/common/io/memory_segment_output_stream.h"
21+
#include "paimon/common/memory/memory_segment_utils.h"
1922
#include "paimon/global_index/bitmap_global_index_result.h"
23+
#include "paimon/global_index/bitmap_topk_global_index_result.h"
24+
#include "paimon/io/byte_array_input_stream.h"
25+
#include "paimon/io/data_input_stream.h"
26+
#include "paimon/memory/bytes.h"
27+
#include "paimon/memory/memory_pool.h"
2028
namespace paimon {
29+
namespace {
30+
void WriteBitmapAndScores(const RoaringBitmap64* bitmap, const std::vector<float>& scores,
31+
MemorySegmentOutputStream* out, MemoryPool* pool) {
32+
std::shared_ptr<Bytes> bitmap_bytes = bitmap->Serialize(pool);
33+
out->WriteValue<int32_t>(bitmap_bytes->size());
34+
out->WriteBytes(bitmap_bytes);
35+
36+
out->WriteValue<int32_t>(scores.size());
37+
for (auto score : scores) {
38+
out->WriteValue<float>(score);
39+
}
40+
}
41+
42+
} // namespace
2143
Result<std::shared_ptr<GlobalIndexResult>> GlobalIndexResult::And(
2244
const std::shared_ptr<GlobalIndexResult>& other) {
2345
auto supplier = [other, result = shared_from_this()]() -> Result<RoaringBitmap64> {
@@ -57,4 +79,59 @@ Result<std::shared_ptr<GlobalIndexResult>> GlobalIndexResult::Or(
5779
};
5880
return std::make_shared<BitmapGlobalIndexResult>(supplier);
5981
}
82+
83+
/// Encodes a GlobalIndexResult as: int32 VERSION, then the bitmap and score sections
/// emitted by WriteBitmapAndScores().
///
/// A BitmapGlobalIndexResult is written with an empty score list; a
/// BitmapTopKGlobalIndexResult is written with its scores. Any other dynamic type
/// (including a null pointer, for which both casts fail) is rejected.
///
/// @param global_index_result Result to serialize.
/// @param pool Pool used for the output stream, the bitmap bytes and the returned buffer.
/// @return The serialized bytes, or an error status if the type is unsupported or the
///         bitmap cannot be obtained.
Result<PAIMON_UNIQUE_PTR<Bytes>> GlobalIndexResult::Serialize(
    const std::shared_ptr<GlobalIndexResult>& global_index_result,
    const std::shared_ptr<MemoryPool>& pool) {
    MemoryPool* raw_pool = pool.get();
    MemorySegmentOutputStream stream(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool);
    stream.WriteValue<int32_t>(VERSION);

    // Plain bitmap results carry no scores.
    const auto plain_result =
        std::dynamic_pointer_cast<BitmapGlobalIndexResult>(global_index_result);
    if (plain_result) {
        PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap64* row_ids, plain_result->GetBitmap());
        WriteBitmapAndScores(row_ids, {}, &stream, raw_pool);
        return MemorySegmentUtils::CopyToBytes(stream.Segments(), 0, stream.CurrentSize(),
                                               raw_pool);
    }

    // Top-K results carry a score list alongside the bitmap.
    const auto topk_result =
        std::dynamic_pointer_cast<BitmapTopKGlobalIndexResult>(global_index_result);
    if (!topk_result) {
        return Status::Invalid(
            "invalid GlobalIndexResult, must be BitmapGlobalIndexResult or "
            "BitmapTopkGlobalIndexResult");
    }
    PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap64* topk_row_ids, topk_result->GetBitmap());
    const auto& topk_scores = topk_result->GetScores();
    WriteBitmapAndScores(topk_row_ids, topk_scores, &stream, raw_pool);
    return MemorySegmentUtils::CopyToBytes(stream.Segments(), 0, stream.CurrentSize(), raw_pool);
}
104+
105+
/// Reconstructs a GlobalIndexResult from the layout produced by Serialize().
///
/// Fields are read in this order:
///   1. int32 format version, which must equal VERSION;
///   2. int32 byte length of the serialized bitmap, followed by those bytes;
///   3. int32 score count, followed by that many float scores.
///
/// A score count of zero yields a BitmapGlobalIndexResult. Otherwise the count must equal
/// the bitmap cardinality and a BitmapTopKGlobalIndexResult is returned.
///
/// The length fields come from the (potentially untrusted) buffer, so each one is validated
/// against `length` before it is used to size an allocation.
///
/// @param buffer Serialized data; a null pointer is rejected with Status::Invalid.
/// @param length Number of readable bytes at `buffer`.
/// @param pool   Pool used for the temporary bitmap byte buffer.
/// @return The reconstructed result, or an error status if the buffer is null, the version
///         is unsupported, a length field is out of range, the score count does not match
///         the bitmap cardinality, or any read / bitmap decode fails.
Result<std::shared_ptr<GlobalIndexResult>> GlobalIndexResult::Deserialize(
    const char* buffer, size_t length, const std::shared_ptr<MemoryPool>& pool) {
    if (buffer == nullptr) {
        return Status::Invalid("buffer is null for GlobalIndexResult");
    }
    auto input_stream = std::make_shared<ByteArrayInputStream>(buffer, length);
    DataInputStream in(input_stream);

    PAIMON_ASSIGN_OR_RAISE(int32_t version, in.ReadValue<int32_t>());
    if (version != VERSION) {
        // Format exactly once: feeding an already formatted message back in as a format
        // string is at best redundant and breaks if the text ever contains braces.
        return Status::Invalid(fmt::format("invalid version {} for GlobalIndexResult", version));
    }

    PAIMON_ASSIGN_OR_RAISE(int32_t bitmap_bytes_len, in.ReadValue<int32_t>());
    // A negative length would convert to a huge allocation size, and a length larger than
    // the whole input can never be satisfied; reject both before allocating.
    if (bitmap_bytes_len < 0 || static_cast<size_t>(bitmap_bytes_len) > length) {
        return Status::Invalid(
            fmt::format("invalid bitmap length {} for GlobalIndexResult", bitmap_bytes_len));
    }
    auto bitmap_bytes = Bytes::AllocateBytes(bitmap_bytes_len, pool.get());
    PAIMON_RETURN_NOT_OK(in.ReadBytes(bitmap_bytes.get()));
    RoaringBitmap64 bitmap;
    PAIMON_RETURN_NOT_OK(bitmap.Deserialize(bitmap_bytes->data(), bitmap_bytes->size()));

    PAIMON_ASSIGN_OR_RAISE(int32_t score_len, in.ReadValue<int32_t>());
    if (score_len == 0) {
        // No scores: hand the bitmap over to the supplier instead of copying it in.
        return std::make_shared<BitmapGlobalIndexResult>(
            [bitmap = std::move(bitmap)]() -> Result<RoaringBitmap64> { return bitmap; });
    }
    // Compare in a single unsigned domain so a negative count cannot slip through an
    // implicit signed/unsigned conversion.
    if (score_len < 0 ||
        static_cast<uint64_t>(score_len) != static_cast<uint64_t>(bitmap.Cardinality())) {
        return Status::Invalid("row id count mismatches score count");
    }
    // Every score occupies sizeof(float) bytes, so the count is bounded by the input size;
    // check this before reserving memory for it.
    if (static_cast<size_t>(score_len) > length / sizeof(float)) {
        return Status::Invalid(
            fmt::format("invalid score count {} for GlobalIndexResult", score_len));
    }
    std::vector<float> scores;
    scores.reserve(static_cast<size_t>(score_len));
    for (int32_t i = 0; i < score_len; ++i) {
        PAIMON_ASSIGN_OR_RAISE(float score, in.ReadValue<float>());
        scores.push_back(score);
    }
    return std::make_shared<BitmapTopKGlobalIndexResult>(std::move(bitmap), std::move(scores));
}
136+
60137
} // namespace paimon

src/paimon/common/global_index/global_index_result_test.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
#include <utility>
2020

2121
#include "gtest/gtest.h"
22+
#include "paimon/global_index/bitmap_global_index_result.h"
23+
#include "paimon/global_index/bitmap_topk_global_index_result.h"
2224
#include "paimon/testing/utils/testharness.h"
2325

2426
namespace paimon::test {
@@ -75,4 +77,60 @@ TEST_F(GlobalIndexResultTest, TestSimple) {
7577
ASSERT_OK_AND_ASSIGN(auto or_result, result1->Or(result2));
7678
ASSERT_EQ(or_result->ToString(), "{1,3,4,5,100,200}");
7779
}
80+
81+
// Round-trips a score-less result: golden bytes -> Deserialize -> Serialize -> same bytes.
TEST_F(GlobalIndexResultTest, TestSerializeAndDeserializeSimple) {
    auto pool = GetDefaultPool();
    // Golden encoding: version (4 bytes), bitmap length 69 (4 bytes), 69 bitmap bytes,
    // then a zero score count (4 bytes).
    const std::vector<uint8_t> golden_bytes = {
        0,  0, 0, 1,  0, 0, 0, 69, 1,   0,   0, 0,  0, 0,   0, 0,  0,   0,   0, 0, 59,
        48, 3, 0, 5,  0, 0, 5, 0,  255, 127, 0, 0,  0, 128, 2, 0,  245, 133, 0, 0, 37,
        0,  0, 0, 47, 0, 0, 0, 49, 0,   0,   0, 55, 0, 0,   0, 2,  0,   1,   0, 4, 0,
        10, 0, 0, 0,  255, 255, 1, 0, 0, 0, 2, 0, 255, 224, 0, 0, 0, 0};
    const char* golden_data = reinterpret_cast<const char*>(golden_bytes.data());
    ASSERT_OK_AND_ASSIGN(
        std::shared_ptr<GlobalIndexResult> deserialized,
        GlobalIndexResult::Deserialize(golden_data, golden_bytes.size(), pool));
    // A zero score count must produce the plain bitmap flavour.
    auto as_bitmap_result = std::dynamic_pointer_cast<BitmapGlobalIndexResult>(deserialized);
    ASSERT_TRUE(as_bitmap_result);

    auto row_ids = RoaringBitmap64::From(
        {1l, 2l, 3l, 4l, 5l, 10l, 2247483647l, 2147483647l, 2147483648l, 2147483649l, 2147483650l});
    auto reference = std::make_shared<BitmapGlobalIndexResult>(
        [row_ids]() -> Result<RoaringBitmap64> { return row_ids; });
    ASSERT_EQ(reference->ToString(), as_bitmap_result->ToString());

    // Serializing the deserialized object must reproduce the golden bytes exactly.
    ASSERT_OK_AND_ASSIGN(auto reserialized, GlobalIndexResult::Serialize(deserialized, pool));
    const std::vector<uint8_t> actual_bytes(reserialized->data(),
                                            reserialized->data() + reserialized->size());
    ASSERT_EQ(golden_bytes, actual_bytes);
}
103+
104+
// Round-trips a Top-K result: golden bytes -> Deserialize -> Serialize -> same bytes.
TEST_F(GlobalIndexResultTest, TestSerializeAndDeserializeWithScore) {
    auto pool = GetDefaultPool();
    // Golden encoding: version (4 bytes), bitmap length 64 (4 bytes), 64 bitmap bytes,
    // score count 6 (4 bytes), then six 4-byte float scores.
    const std::vector<uint8_t> golden_bytes = {
        0,   0,   0,   1,   0,  0,   0,  64, 1,  0,  0,   0,   0,   0,   0,   0, 0,  0, 0,   0,
        58,  48,  0,   0,   4,  0,   0,  0,  0,  0,  0,   0,   255, 127, 0,   0, 0,  128, 2, 0,
        245, 133, 0,   0,   40, 0,   0,  0,  42, 0,  0,   0,   44,  0,   0,   0, 50, 0, 0,   0,
        10,  0,   255, 255, 1,  0,   3,  0,  5,  0,  255, 224, 0,   0,   0,   6, 63, 129, 71, 174,
        191, 168, 245, 195, 64, 135, 92, 41, 66, 74, 245, 195, 194, 200, 128, 0, 64, 6, 102, 102};
    const char* golden_data = reinterpret_cast<const char*>(golden_bytes.data());
    ASSERT_OK_AND_ASSIGN(
        std::shared_ptr<GlobalIndexResult> deserialized,
        GlobalIndexResult::Deserialize(golden_data, golden_bytes.size(), pool));
    // A non-zero score count must produce the Top-K flavour.
    auto as_topk_result = std::dynamic_pointer_cast<BitmapTopKGlobalIndexResult>(deserialized);
    ASSERT_TRUE(as_topk_result);

    auto row_ids = RoaringBitmap64::From(
        {10l, 2147483647l, 2147483649l, 2147483651l, 2147483653l, 2247483647l});
    std::vector<float> row_scores = {1.01f, -1.32f, 4.23f, 50.74f, -100.25f, 2.10f};
    auto reference =
        std::make_shared<BitmapTopKGlobalIndexResult>(std::move(row_ids), std::move(row_scores));
    ASSERT_EQ(reference->ToString(), as_topk_result->ToString());

    // Serializing the deserialized object must reproduce the golden bytes exactly.
    ASSERT_OK_AND_ASSIGN(auto reserialized, GlobalIndexResult::Serialize(deserialized, pool));
    const std::vector<uint8_t> actual_bytes(reserialized->data(),
                                            reserialized->data() + reserialized->size());
    ASSERT_EQ(golden_bytes, actual_bytes);
}
128+
129+
// Serialize must reject result types other than the two bitmap-backed implementations.
TEST_F(GlobalIndexResultTest, TestInvalidSerialize) {
    auto pool = GetDefaultPool();
    const std::vector<int64_t> fake_row_ids({1, 3, 5, 100});
    auto unsupported_result = std::make_shared<FakeGlobalIndexResult>(fake_row_ids);
    ASSERT_NOK_WITH_MSG(GlobalIndexResult::Serialize(unsupported_result, pool),
                        "invalid GlobalIndexResult, must be BitmapGlobalIndexResult or "
                        "BitmapTopkGlobalIndexResult");
}
78136
} // namespace paimon::test

0 commit comments

Comments
 (0)