Skip to content

Commit cdf8a26

Browse files
committed
Add null_array serialization
1 parent 478d903 commit cdf8a26

File tree

4 files changed

+299
-1
lines changed

4 files changed

+299
-1
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ set(SPARROW_IPC_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src)
4444
set(SPARROW_IPC_HEADERS
4545
${SPARROW_IPC_INCLUDE_DIR}/config/config.hpp
4646
${SPARROW_IPC_INCLUDE_DIR}/serialize.hpp
47+
${SPARROW_IPC_INCLUDE_DIR}/serialize_null_array.hpp
4748
${SPARROW_IPC_INCLUDE_DIR}/utils.hpp
4849
)
4950

include/serialize_null_array.hpp

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
#pragma once
2+
3+
// TODO check needs of all these below
4+
#include <cstdint>
5+
#include <cstring>
6+
#include <optional>
7+
#include <stdexcept>
8+
#include <string>
9+
#include <string_view>
10+
#include <vector>
11+
12+
#include "sparrow.hpp"
13+
14+
// TODO check needs of these two
15+
#include "Message_generated.h"
16+
#include "Schema_generated.h"
17+
18+
#include "utils.hpp"
19+
20+
namespace sparrow_ipc
21+
{
22+
// TODO move to cpp if not templated
23+
// TODO ask to add comments and review them thouroughly
24+
25+
// This function serializes a sparrow::null_array into a byte vector compliant
26+
// with the Apache Arrow IPC Streaming Format. It mirrors the structure of
27+
// serialize_primitive_array but is optimized for null_array's properties.
28+
// A null_array is represented by metadata only (Schema, RecordBatch) and has no data buffers,
29+
// making its message body zero-length.
30+
std::vector<uint8_t> serialize_null_array(sparrow::null_array& arr)
31+
{
32+
// Use the Arrow C Data Interface to get a generic description of the array.
33+
// For a null_array, the ArrowArray struct will report n_buffers = 0.
34+
auto [arrow_arr_ptr, arrow_schema_ptr] = sparrow::get_arrow_structures(arr);
35+
auto& arrow_arr = *arrow_arr_ptr;
36+
auto& arrow_schema = *arrow_schema_ptr;
37+
38+
std::vector<uint8_t> final_buffer;
39+
40+
// I - Serialize the Schema message
41+
// This part is almost identical to how a primitive_array's schema is serialized.
42+
{
43+
flatbuffers::FlatBufferBuilder schema_builder;
44+
45+
flatbuffers::Offset<flatbuffers::String> fb_name_offset = 0;
46+
if (arrow_schema.name)
47+
{
48+
fb_name_offset = schema_builder.CreateString(arrow_schema.name);
49+
}
50+
51+
// For null_array, the format string is "n".
52+
auto [type_enum, type_offset] = utils::get_flatbuffer_type(schema_builder, arrow_schema.format);
53+
54+
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>>>
55+
fb_metadata_offset = 0;
56+
57+
if (arr.metadata())
58+
{
59+
sparrow::key_value_view metadata_view = *(arr.metadata());
60+
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>> kv_offsets;
61+
// kv_offsets.reserve(metadata_view.size());
62+
// for (const auto& pair : metadata_view)
63+
// {
64+
// auto key_offset = schema_builder.CreateString(std::string(pair.first));
65+
// auto value_offset = schema_builder.CreateString(std::string(pair.second));
66+
// kv_offsets.push_back(
67+
// org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset));
68+
// }
69+
auto mv_it = metadata_view.cbegin();
70+
for (auto i = 0; i < metadata_view.size(); ++i, ++mv_it)
71+
{
72+
auto key_offset = schema_builder.CreateString(std::string((*mv_it).first));
73+
auto value_offset = schema_builder.CreateString(std::string((*mv_it).second));
74+
kv_offsets.push_back(
75+
org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset));
76+
}
77+
fb_metadata_offset = schema_builder.CreateVector(kv_offsets);
78+
}
79+
80+
auto fb_field = org::apache::arrow::flatbuf::CreateField(
81+
schema_builder,
82+
fb_name_offset,
83+
(arrow_schema.flags & static_cast<int64_t>(sparrow::ArrowFlag::NULLABLE)) != 0,
84+
type_enum,
85+
type_offset,
86+
0, // dictionary
87+
0, // children
88+
fb_metadata_offset);
89+
90+
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> fields_vec = {fb_field};
91+
auto fb_fields = schema_builder.CreateVector(fields_vec);
92+
93+
auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields);
94+
95+
auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage(
96+
schema_builder,
97+
org::apache::arrow::flatbuf::MetadataVersion::V5,
98+
org::apache::arrow::flatbuf::MessageHeader::Schema,
99+
schema_offset.Union(),
100+
0 // bodyLength
101+
);
102+
schema_builder.Finish(schema_message_offset);
103+
104+
uint32_t schema_len = schema_builder.GetSize();
105+
final_buffer.resize(sizeof(uint32_t) + schema_len);
106+
memcpy(final_buffer.data() + sizeof(uint32_t), schema_builder.GetBufferPointer(), schema_len);
107+
*(reinterpret_cast<uint32_t*>(final_buffer.data())) = schema_len;
108+
}
109+
110+
// II - Serialize the RecordBatch message
111+
{
112+
flatbuffers::FlatBufferBuilder batch_builder;
113+
114+
// The FieldNode describes the layout (length and null count).
115+
// For a null_array, length and null_count are always equal.
116+
org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count);
117+
auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1);
118+
119+
// A null_array has no buffers. The ArrowArray struct reports n_buffers = 0,
120+
// so we create an empty vector of buffers for the Flatbuffers message.
121+
auto fb_buffers_vector = batch_builder.CreateVectorOfStructs<org::apache::arrow::flatbuf::Buffer>({});
122+
123+
auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(batch_builder, arrow_arr.length, fb_nodes_vector, fb_buffers_vector);
124+
125+
// The bodyLength is 0 because there are no data buffers.
126+
auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage(
127+
batch_builder,
128+
org::apache::arrow::flatbuf::MetadataVersion::V5,
129+
org::apache::arrow::flatbuf::MessageHeader::RecordBatch,
130+
record_batch_offset.Union(),
131+
0 // bodyLength
132+
);
133+
batch_builder.Finish(batch_message_offset);
134+
135+
uint32_t batch_meta_len = batch_builder.GetSize();
136+
int64_t aligned_batch_meta_len = utils::align_to_8(batch_meta_len);
137+
138+
size_t current_size = final_buffer.size();
139+
// Resize for the RecordBatch metadata. There is no body to append.
140+
final_buffer.resize(current_size + sizeof(uint32_t) + aligned_batch_meta_len);
141+
uint8_t* dst = final_buffer.data() + current_size;
142+
143+
*(reinterpret_cast<uint32_t*>(dst)) = batch_meta_len;
144+
dst += sizeof(uint32_t);
145+
memcpy(dst, batch_builder.GetBufferPointer(), batch_meta_len);
146+
memset(dst + batch_meta_len, 0, aligned_batch_meta_len - batch_meta_len);
147+
}
148+
149+
return final_buffer;
150+
}
151+
152+
// This function deserializes a byte vector into a sparrow::null_array.
153+
// It reads the Schema and RecordBatch messages to extract the array's length,
154+
// name, and metadata, then constructs a null_array.
155+
sparrow::null_array deserialize_null_array(const std::vector<uint8_t>& buffer)
156+
{
157+
const uint8_t* buf_ptr = buffer.data();
158+
size_t current_offset = 0;
159+
160+
// I - Deserialize the Schema message
161+
uint32_t schema_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));
162+
current_offset += sizeof(uint32_t);
163+
auto schema_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset);
164+
if (schema_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::Schema)
165+
{
166+
throw std::runtime_error("Expected Schema message at the start of the buffer.");
167+
}
168+
auto flatbuffer_schema = static_cast<const org::apache::arrow::flatbuf::Schema*>(schema_message->header());
169+
auto fields = flatbuffer_schema->fields();
170+
if (fields->size() != 1)
171+
{
172+
throw std::runtime_error("Expected schema with exactly one field for null_array.");
173+
}
174+
auto field = fields->Get(0);
175+
if (field->type_type() != org::apache::arrow::flatbuf::Type::Null)
176+
{
177+
throw std::runtime_error("Expected Null type in schema.");
178+
}
179+
180+
std::optional<std::string_view> name;
181+
if (auto fb_name = field->name())
182+
{
183+
name = std::string_view(fb_name->c_str(), fb_name->size());
184+
}
185+
186+
// std::optional<sparrow::key_value_metadata> metadata;
187+
std::optional<std::vector<sparrow::metadata_pair>> metadata;
188+
if (auto fb_metadata = field->custom_metadata())
189+
{
190+
if (fb_metadata->size() > 0)
191+
{
192+
metadata = std::vector<sparrow::metadata_pair>();
193+
metadata->reserve(fb_metadata->size());
194+
// sparrow::metadata_map map;
195+
for (const auto& kv : *fb_metadata)
196+
{
197+
// map.emplace(kv->key()->str(), kv->value()->str());
198+
metadata->emplace_back(kv->key()->str(), kv->value()->str());
199+
}
200+
// metadata = sparrow::key_value_metadata(map);
201+
}
202+
}
203+
204+
current_offset += schema_meta_len;
205+
206+
// II - Deserialize the RecordBatch message
207+
uint32_t batch_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));
208+
current_offset += sizeof(uint32_t);
209+
auto batch_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset);
210+
if (batch_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::RecordBatch)
211+
{
212+
throw std::runtime_error("Expected RecordBatch message, but got a different type.");
213+
}
214+
auto record_batch = static_cast<const org::apache::arrow::flatbuf::RecordBatch*>(batch_message->header());
215+
216+
// The body is empty, so we don't need to read any further.
217+
// Construct the null_array from the deserialized metadata.
218+
return sparrow::null_array(record_batch->length(), name, metadata);
219+
}
220+
}

tests/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ find_package(doctest CONFIG REQUIRED)
44

55
set(test_target "test_sparrow_ipc_lib")
66

7-
add_executable(${test_target} main.cpp test_primitive_array.cpp test_utils.cpp)
7+
add_executable(${test_target} main.cpp test_utils.cpp test_primitive_array.cpp test_serialize_null_array.cpp)
88
target_link_libraries(${test_target}
99
PRIVATE
1010
sparrow-ipc
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
#include "doctest/doctest.h"
2+
#include "sparrow.hpp"
3+
4+
#include "serialize_null_array.hpp"
5+
6+
namespace sparrow_ipc
7+
{
8+
namespace sp = sparrow;
9+
// TODO have a generic compare_metadata in tests/helpers.hpp, cpp
10+
// taking pa and na
11+
void compare_metadata(sp::null_array& na1, sp::null_array& na2)
12+
{
13+
if (!na1.metadata().has_value())
14+
{
15+
CHECK(!na2.metadata().has_value());
16+
return;
17+
}
18+
19+
CHECK(na2.metadata().has_value());
20+
sp::key_value_view kvs1_view = *(na1.metadata());
21+
sp::key_value_view kvs2_view = *(na2.metadata());
22+
23+
CHECK_EQ(kvs1_view.size(), kvs2_view.size());
24+
auto kvs1_it = kvs1_view.cbegin();
25+
auto kvs2_it = kvs2_view.cbegin();
26+
for (auto i = 0; i < kvs1_view.size(); ++i)
27+
{
28+
CHECK_EQ(*kvs1_it, *kvs2_it);
29+
++kvs1_it;
30+
++kvs2_it;
31+
}
32+
}
33+
34+
TEST_CASE("Serialize and deserialize null_array")
35+
{
36+
const std::size_t size = 10;
37+
const std::string_view name = "my_null_array";
38+
39+
const std::vector<sp::metadata_pair> metadata_vec = {{"key1", "value1"}, {"key2", "value2"}};
40+
const std::optional<std::vector<sp::metadata_pair>> metadata = metadata_vec;
41+
42+
sp::null_array arr(size, name, metadata);
43+
44+
auto buffer = serialize_null_array(arr);
45+
auto deserialized_arr = deserialize_null_array(buffer);
46+
47+
CHECK_EQ(deserialized_arr.size(), arr.size());
48+
REQUIRE(deserialized_arr.name().has_value());
49+
CHECK_EQ(deserialized_arr.name().value(), arr.name().value());
50+
51+
REQUIRE(deserialized_arr.metadata().has_value());
52+
const auto& deserialized_metadata = deserialized_arr.metadata().value();
53+
REQUIRE_EQ(deserialized_metadata.size(), arr.metadata().value().size());
54+
55+
compare_metadata(arr, deserialized_arr);
56+
57+
// Check the deserialized object is a null_array
58+
const auto& arrow_proxy = sp::detail::array_access::get_arrow_proxy(deserialized_arr);
59+
CHECK_EQ(arrow_proxy.format(), "n");
60+
CHECK_EQ(arrow_proxy.n_children(), 0);
61+
CHECK_EQ(arrow_proxy.flags(), std::unordered_set<sp::ArrowFlag>{sp::ArrowFlag::NULLABLE});
62+
CHECK_EQ(arrow_proxy.name(), name);
63+
CHECK_EQ(arrow_proxy.dictionary(), nullptr);
64+
CHECK_EQ(arrow_proxy.buffers().size(), 0);
65+
}
66+
67+
TEST_CASE("Serialize and deserialize null_array with no name and no metadata")
68+
{
69+
const std::size_t size = 100;
70+
sp::null_array arr(size);
71+
auto buffer = serialize_null_array(arr);
72+
auto deserialized_arr = deserialize_null_array(buffer);
73+
CHECK_EQ(deserialized_arr.size(), arr.size());
74+
CHECK_FALSE(deserialized_arr.name().has_value());
75+
CHECK_FALSE(deserialized_arr.metadata().has_value());
76+
}
77+
}

0 commit comments

Comments
 (0)