Skip to content

Commit ca52ecf

Browse files
authored
Serialize/Deserialize primitive_array (#4)
* First impl * Add comment * Handle metadata * Use function to handle flatbuffers format * Fix headers with sparrow 1.0.0 * Add macro to functions and fix windows * Add comment * Code review: Remove incorrect comment Template tests * Use data_type_to_format in get_flatbuffer_type
1 parent 0fe5fda commit ca52ecf

File tree

6 files changed

+476
-26
lines changed

6 files changed

+476
-26
lines changed

CMakeLists.txt

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,12 @@ set(SPARROW_IPC_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/include)
2929
set(SPARROW_IPC_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src)
3030

3131
set(SPARROW_IPC_HEADERS
32-
# config
3332
${SPARROW_IPC_INCLUDE_DIR}/config/config.hpp
34-
${SPARROW_IPC_INCLUDE_DIR}/sparrow-ipc.hpp
33+
${SPARROW_IPC_INCLUDE_DIR}/serialize.hpp
3534
)
3635

3736
set(SPARROW_IPC_SRC
38-
${SPARROW_IPC_SOURCE_DIR}/sparrow-ipc.cpp
37+
${SPARROW_IPC_SOURCE_DIR}/serialize.cpp
3938
)
4039

4140
set(SCHEMA_DIR ${CMAKE_BINARY_DIR}/format)
@@ -102,6 +101,13 @@ find_package(sparrow CONFIG REQUIRED)
102101

103102
add_library(sparrow-ipc ${SPARROW_IPC_LIBRARY_TYPE} ${SPARROW_IPC_SRC} ${SPARROW_IPC_HEADERS})
104103
target_compile_definitions(sparrow-ipc PUBLIC ${SPARROW_IPC_COMPILE_DEFINITIONS})
104+
105+
if(UNIX)
106+
target_compile_options(sparrow-ipc PRIVATE "-fvisibility=hidden")
107+
else()
108+
target_compile_definitions(sparrow-ipc PRIVATE SPARROW_IPC_EXPORTS)
109+
endif()
110+
105111
target_include_directories(sparrow-ipc PUBLIC ${SPARROW_IPC_INCLUDE_DIR} PRIVATE ${SPARROW_IPC_SOURCE_DIR} )
106112
target_link_libraries(sparrow-ipc PRIVATE flatbuffers_interface)
107113
target_link_libraries(sparrow-ipc PUBLIC flatbuffers::flatbuffers sparrow::sparrow)

include/serialize.hpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#pragma once
2+
3+
#include <vector>
4+
#include "sparrow.hpp"
5+
6+
#include "config/config.hpp"
7+
8+
// TODO: split the serialize/deserialize functions into two separate files, or just rename the current one?
9+
/**
 * @brief Serializes a primitive array into a byte buffer holding a Schema message
 *        followed by a RecordBatch message.
 *
 * Layout produced by the implementation in src/serialize.cpp:
 *   [4-byte metadata length][Schema message flatbuffer]
 *   [4-byte metadata length][RecordBatch message flatbuffer, zero-padded to a multiple of 8 bytes]
 *   [validity bitmap][values]
 *
 * The template is only declared here; it is defined and explicitly instantiated in
 * src/serialize.cpp (for int, float and double).
 *
 * @tparam T Element type of the array.
 * @param arr Array to serialize. It is not modified; the implementation works on a copy.
 * @return The serialized bytes.
 * @throws std::runtime_error if the array's data type is not supported for serialization.
 */
template <typename T>
10+
SPARROW_IPC_API std::vector<uint8_t> serialize_primitive_array(const sparrow::primitive_array<T>& arr);
11+
12+
/**
 * @brief Rebuilds a primitive array from a byte buffer laid out as a Schema message
 *        followed by a RecordBatch message (the layout written by serialize_primitive_array).
 *
 * The field name and custom metadata stored in the Schema message are restored on the
 * returned array; the validity bitmap and values are copied out of the buffer.
 *
 * The template is only declared here; it is defined and explicitly instantiated in
 * src/serialize.cpp (for int, float and double).
 *
 * @tparam T Element type of the array to rebuild.
 * @param buffer Serialized bytes.
 * @return The reconstructed array.
 * @throws std::runtime_error if the buffer does not have the expected structure, e.g. the
 *         first message is not a Schema, the schema does not contain exactly one field, or
 *         the second message is not a RecordBatch.
 */
template <typename T>
13+
SPARROW_IPC_API sparrow::primitive_array<T> deserialize_primitive_array(const std::vector<uint8_t>& buffer);

include/sparrow-ipc.hpp

Lines changed: 0 additions & 3 deletions
This file was deleted.

src/serialize.cpp

Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
#include <cstdint>
2+
#include <cstring>
3+
#include <optional>
4+
#include <stdexcept>
5+
#include <string>
6+
#include <string_view>
7+
8+
#include "Message_generated.h"
9+
#include "Schema_generated.h"
10+
11+
#include "serialize.hpp"
12+
13+
namespace
14+
{
15+
// Rounds n up to the nearest multiple of 8; a value that is already a multiple of 8 is
// returned unchanged. Used to compute the padded length of IPC message metadata.
int64_t align_to_8(int64_t n)
{
    // Clearing the three low bits of (n + 7) yields the next multiple of 8 at or above n.
    const int64_t low_bits_cleared_mask = ~static_cast<int64_t>(7);
    const int64_t bumped = n + 7;
    return bumped & low_bits_cleared_mask;
}
20+
21+
// TODO Complete this with all possible formats?
22+
std::pair<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>>
23+
get_flatbuffer_type(flatbuffers::FlatBufferBuilder& builder, const char* format_str)
24+
{
25+
if (format_str == sparrow::data_type_to_format(sparrow::data_type::INT32))
26+
{
27+
auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 32, true);
28+
return {org::apache::arrow::flatbuf::Type::Int, int_type.Union()};
29+
}
30+
else if (format_str == sparrow::data_type_to_format(sparrow::data_type::FLOAT))
31+
{
32+
auto fp_type = org::apache::arrow::flatbuf::CreateFloatingPoint(
33+
builder, org::apache::arrow::flatbuf::Precision::SINGLE);
34+
return {org::apache::arrow::flatbuf::Type::FloatingPoint, fp_type.Union()};
35+
}
36+
else if (format_str == sparrow::data_type_to_format(sparrow::data_type::DOUBLE))
37+
{
38+
auto fp_type = org::apache::arrow::flatbuf::CreateFloatingPoint(
39+
builder, org::apache::arrow::flatbuf::Precision::DOUBLE);
40+
return {org::apache::arrow::flatbuf::Type::FloatingPoint, fp_type.Union()};
41+
}
42+
else
43+
{
44+
throw std::runtime_error("Unsupported data type for serialization");
45+
}
46+
}
47+
}
48+
49+
template <typename T>
50+
std::vector<uint8_t> serialize_primitive_array(const sparrow::primitive_array<T>& arr)
51+
{
52+
// This function serializes a sparrow::primitive_array into a byte vector that is compliant
53+
// with the Apache Arrow IPC Streaming Format. It constructs a stream containing two messages:
54+
// 1. A Schema message: Describes the data's metadata (field name, type, nullability).
55+
// 2. A RecordBatch message: Contains the actual array data (null count, length, and raw buffers).
56+
// This two-part structure makes the data self-describing and readable by other Arrow-native tools.
57+
// The implementation adheres to the specification by correctly handling:
58+
// - Message order (Schema first, then RecordBatch).
59+
// - The encapsulated message format (4-byte metadata length prefix).
60+
// - 8-byte padding and alignment for the message body.
61+
// - Correctly populating the Flatbuffer-defined metadata for both messages.
62+
63+
// Create a mutable copy of the input array to allow moving its internal structures
64+
sparrow::primitive_array<T> mutable_arr = arr;
65+
auto [arrow_arr, arrow_schema] = sparrow::extract_arrow_structures(std::move(mutable_arr));
66+
67+
// This will be the final buffer holding the complete IPC stream.
68+
std::vector<uint8_t> final_buffer;
69+
70+
// I - Serialize the Schema message
71+
// An Arrow IPC stream must start with a Schema message
72+
{
73+
// Create a new builder for the Schema message's metadata
74+
flatbuffers::FlatBufferBuilder schema_builder;
75+
76+
// Create the Field metadata, which describes a single column (or array)
77+
flatbuffers::Offset<flatbuffers::String> fb_name_offset = 0;
78+
if (arrow_schema.name)
79+
{
80+
fb_name_offset = schema_builder.CreateString(arrow_schema.name);
81+
}
82+
83+
// Determine the Flatbuffer type information from the C schema's format string
84+
auto [type_enum, type_offset] = get_flatbuffer_type(schema_builder, arrow_schema.format);
85+
86+
// Handle metadata
87+
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>>>
88+
fb_metadata_offset = 0;
89+
90+
if (arr.metadata())
91+
{
92+
sparrow::key_value_view metadata_view = *(arr.metadata());
93+
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>> kv_offsets;
94+
95+
auto mv_it = metadata_view.cbegin();
96+
for (auto i = 0; i < metadata_view.size(); ++i, ++mv_it)
97+
{
98+
auto key_offset = schema_builder.CreateString(std::string((*mv_it).first));
99+
auto value_offset = schema_builder.CreateString(std::string((*mv_it).second));
100+
kv_offsets.push_back(
101+
org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset));
102+
}
103+
fb_metadata_offset = schema_builder.CreateVector(kv_offsets);
104+
}
105+
106+
// Build the Field object
107+
auto fb_field = org::apache::arrow::flatbuf::CreateField(
108+
schema_builder,
109+
fb_name_offset,
110+
(arrow_schema.flags & static_cast<int64_t>(sparrow::ArrowFlag::NULLABLE)) != 0,
111+
type_enum,
112+
type_offset,
113+
0, // dictionary
114+
0, // children
115+
fb_metadata_offset);
116+
117+
// A Schema contains a vector of fields. For this primitive array, there is only one
118+
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> fields_vec = {fb_field};
119+
auto fb_fields = schema_builder.CreateVector(fields_vec);
120+
121+
// Build the Schema object from the vector of fields
122+
auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields);
123+
124+
// Wrap the Schema in a top-level Message, which is the standard IPC envelope
125+
auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage(
126+
schema_builder,
127+
org::apache::arrow::flatbuf::MetadataVersion::V5,
128+
org::apache::arrow::flatbuf::MessageHeader::Schema,
129+
schema_offset.Union(),
130+
0
131+
);
132+
schema_builder.Finish(schema_message_offset);
133+
134+
// Assemble the Schema message bytes
135+
uint32_t schema_len = schema_builder.GetSize(); // Get the size of the serialized metadata
136+
final_buffer.resize(sizeof(uint32_t) + schema_len); // Resize the buffer to hold the message
137+
// Copy the metadata into the buffer, after the 4-byte length prefix
138+
memcpy(final_buffer.data() + sizeof(uint32_t), schema_builder.GetBufferPointer(), schema_len);
139+
// Write the 4-byte metadata length at the beginning of the message
140+
*(reinterpret_cast<uint32_t*>(final_buffer.data())) = schema_len;
141+
}
142+
143+
// II - Serialize the RecordBatch message
144+
// After the Schema, we send the RecordBatch containing the actual data
145+
{
146+
// Create a new builder for the RecordBatch message's metadata
147+
flatbuffers::FlatBufferBuilder batch_builder;
148+
149+
// arrow_arr.buffers[0] is the validity bitmap
150+
// arrow_arr.buffers[1] is the data buffer
151+
const uint8_t* validity_bitmap = reinterpret_cast<const uint8_t*>(arrow_arr.buffers[0]);
152+
const uint8_t* data_buffer = reinterpret_cast<const uint8_t*>(arrow_arr.buffers[1]);
153+
154+
// Calculate the size of the validity and data buffers
155+
int64_t validity_size = (arrow_arr.length + 7) / 8;
156+
int64_t data_size = arrow_arr.length * sizeof(T);
157+
int64_t body_len = validity_size + data_size; // The total size of the message body
158+
159+
// Create Flatbuffer descriptions for the data buffers
160+
org::apache::arrow::flatbuf::Buffer validity_buffer_struct(0, validity_size);
161+
org::apache::arrow::flatbuf::Buffer data_buffer_struct(validity_size, data_size);
162+
// Create the FieldNode, which describes the layout of the array data
163+
org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count);
164+
165+
// A RecordBatch contains a vector of nodes and a vector of buffers
166+
auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1);
167+
std::vector<org::apache::arrow::flatbuf::Buffer> buffers_vec = {validity_buffer_struct, data_buffer_struct};
168+
auto fb_buffers_vector = batch_builder.CreateVectorOfStructs(buffers_vec);
169+
170+
// Build the RecordBatch metadata object
171+
auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(batch_builder, arrow_arr.length, fb_nodes_vector, fb_buffers_vector);
172+
173+
// Wrap the RecordBatch in a top-level Message
174+
auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage(
175+
batch_builder,
176+
org::apache::arrow::flatbuf::MetadataVersion::V5,
177+
org::apache::arrow::flatbuf::MessageHeader::RecordBatch,
178+
record_batch_offset.Union(),
179+
body_len
180+
);
181+
batch_builder.Finish(batch_message_offset);
182+
183+
// III - Append the RecordBatch message to the final buffer
184+
uint32_t batch_meta_len = batch_builder.GetSize(); // Get the size of the batch metadata
185+
int64_t aligned_batch_meta_len = align_to_8(batch_meta_len); // Calculate the padded length
186+
187+
size_t current_size = final_buffer.size(); // Get the current size (which is the end of the Schema message)
188+
// Resize the buffer to append the new message
189+
final_buffer.resize(current_size + sizeof(uint32_t) + aligned_batch_meta_len + body_len);
190+
uint8_t* dst = final_buffer.data() + current_size; // Get a pointer to where the new message will start
191+
192+
// Write the 4-byte metadata length for the RecordBatch message
193+
*(reinterpret_cast<uint32_t*>(dst)) = batch_meta_len;
194+
dst += sizeof(uint32_t);
195+
// Copy the RecordBatch metadata into the buffer
196+
memcpy(dst, batch_builder.GetBufferPointer(), batch_meta_len);
197+
// Add padding to align the body to an 8-byte boundary
198+
memset(dst + batch_meta_len, 0, aligned_batch_meta_len - batch_meta_len);
199+
dst += aligned_batch_meta_len;
200+
// Copy the actual data buffers (the message body) into the buffer
201+
if (validity_bitmap)
202+
{
203+
memcpy(dst, validity_bitmap, validity_size);
204+
}
205+
else
206+
{
207+
// If validity_bitmap is null, it means there are no nulls
208+
memset(dst, 0xFF, validity_size);
209+
}
210+
dst += validity_size;
211+
if (data_buffer)
212+
{
213+
memcpy(dst, data_buffer, data_size);
214+
}
215+
}
216+
217+
// Release the memory managed by the C structures
218+
arrow_arr.release(&arrow_arr);
219+
arrow_schema.release(&arrow_schema);
220+
221+
// Return the final buffer containing the complete IPC stream
222+
return final_buffer;
223+
}
224+
225+
template <typename T>
226+
sparrow::primitive_array<T> deserialize_primitive_array(const std::vector<uint8_t>& buffer) {
227+
const uint8_t* buf_ptr = buffer.data();
228+
size_t current_offset = 0;
229+
230+
// I - Deserialize the Schema message
231+
uint32_t schema_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));
232+
current_offset += sizeof(uint32_t);
233+
auto schema_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset);
234+
if (schema_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::Schema)
235+
{
236+
throw std::runtime_error("Expected Schema message at the start of the buffer.");
237+
}
238+
auto flatbuffer_schema = static_cast<const org::apache::arrow::flatbuf::Schema*>(schema_message->header());
239+
auto fields = flatbuffer_schema->fields();
240+
if (fields->size() != 1)
241+
{
242+
throw std::runtime_error("Expected schema with exactly one field for primitive_array.");
243+
}
244+
bool is_nullable = fields->Get(0)->nullable();
245+
current_offset += schema_meta_len;
246+
247+
// II - Deserialize the RecordBatch message
248+
uint32_t batch_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));
249+
current_offset += sizeof(uint32_t);
250+
auto batch_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset);
251+
if (batch_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::RecordBatch)
252+
{
253+
throw std::runtime_error("Expected RecordBatch message, but got a different type.");
254+
}
255+
auto record_batch = static_cast<const org::apache::arrow::flatbuf::RecordBatch*>(batch_message->header());
256+
current_offset += align_to_8(batch_meta_len);
257+
const uint8_t* body_ptr = buf_ptr + current_offset;
258+
259+
// Extract metadata from the RecordBatch
260+
auto buffers_meta = record_batch->buffers();
261+
auto nodes_meta = record_batch->nodes();
262+
auto node_meta = nodes_meta->Get(0);
263+
264+
// The body contains the validity bitmap and the data buffer concatenated
265+
// We need to copy this data into memory owned by the new ArrowArray
266+
int64_t validity_len = buffers_meta->Get(0)->length();
267+
int64_t data_len = buffers_meta->Get(1)->length();
268+
269+
uint8_t* validity_buffer_copy = new uint8_t[validity_len];
270+
memcpy(validity_buffer_copy, body_ptr + buffers_meta->Get(0)->offset(), validity_len);
271+
272+
uint8_t* data_buffer_copy = new uint8_t[data_len];
273+
memcpy(data_buffer_copy, body_ptr + buffers_meta->Get(1)->offset(), data_len);
274+
275+
// Get name
276+
std::optional<std::string_view> name;
277+
const flatbuffers::String* fb_name_flatbuffer = fields->Get(0)->name();
278+
if (fb_name_flatbuffer)
279+
{
280+
name = std::string_view(fb_name_flatbuffer->c_str(), fb_name_flatbuffer->size());
281+
}
282+
283+
// Handle metadata
284+
std::optional<std::vector<sparrow::metadata_pair>> metadata;
285+
auto fb_metadata = fields->Get(0)->custom_metadata();
286+
if (fb_metadata && !fb_metadata->empty())
287+
{
288+
metadata = std::vector<sparrow::metadata_pair>();
289+
metadata->reserve(fb_metadata->size());
290+
for (const auto& kv : *fb_metadata)
291+
{
292+
metadata->emplace_back(kv->key()->c_str(), kv->value()->c_str());
293+
}
294+
}
295+
296+
auto data = sparrow::u8_buffer<T>(reinterpret_cast<T*>(data_buffer_copy), node_meta->length());
297+
auto bitmap = sparrow::validity_bitmap(validity_buffer_copy, node_meta->length());
298+
299+
return sparrow::primitive_array<T>(std::move(data), node_meta->length(), std::move(bitmap), name, metadata);
300+
}
301+
302+
// Explicit template instantiations.
// serialize_primitive_array and deserialize_primitive_array are only declared in
// serialize.hpp and defined in this file, so the element types listed here (int, float,
// double) are the ones for which a definition is emitted by this translation unit.
// NOTE(review): get_flatbuffer_type only accepts the INT32, FLOAT and DOUBLE formats;
// presumably `int` maps to INT32 on the supported platforms — confirm.
303+
template SPARROW_IPC_API std::vector<uint8_t> serialize_primitive_array<int>(const sparrow::primitive_array<int>& arr);
304+
template SPARROW_IPC_API sparrow::primitive_array<int> deserialize_primitive_array<int>(const std::vector<uint8_t>& buffer);
305+
template SPARROW_IPC_API std::vector<uint8_t> serialize_primitive_array<float>(const sparrow::primitive_array<float>& arr);
306+
template SPARROW_IPC_API sparrow::primitive_array<float> deserialize_primitive_array<float>(const std::vector<uint8_t>& buffer);
307+
template SPARROW_IPC_API std::vector<uint8_t> serialize_primitive_array<double>(const sparrow::primitive_array<double>& arr);
308+
template SPARROW_IPC_API sparrow::primitive_array<double> deserialize_primitive_array<double>(const std::vector<uint8_t>& buffer);

src/sparrow-ipc.cpp

Lines changed: 0 additions & 7 deletions
This file was deleted.

0 commit comments

Comments
 (0)