Skip to content

Commit f7dd61b

Browse files
authored
Handle different data types (#8)
* Handle different data types * Move utils fcts to utils files * Add namespaces * Add tests * Rename test.cpp * Comment possible offending line * Change way of testing * Factorize * Comment offending line * Use same comparion for other test cases * Add comment and use sp namespace * Move everything to serialize.hpp * Remove leftover * Use get_arrow_structures instead of extract_arrow_structures * Remove leftovers * Use string instead of string_view * Use data_type_to_format in get_flatbuffer_type test
1 parent 7a7d44d commit f7dd61b

File tree

10 files changed

+1039
-474
lines changed

10 files changed

+1039
-474
lines changed

CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,11 @@ set(SPARROW_IPC_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src)
4444
set(SPARROW_IPC_HEADERS
4545
${SPARROW_IPC_INCLUDE_DIR}/config/config.hpp
4646
${SPARROW_IPC_INCLUDE_DIR}/serialize.hpp
47+
${SPARROW_IPC_INCLUDE_DIR}/utils.hpp
4748
)
4849

4950
set(SPARROW_IPC_SRC
50-
${SPARROW_IPC_SOURCE_DIR}/serialize.cpp
51+
${SPARROW_IPC_SOURCE_DIR}/utils.cpp
5152
)
5253

5354
set(SCHEMA_DIR ${CMAKE_BINARY_DIR}/format)

include/serialize.hpp

Lines changed: 267 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,274 @@
11
#pragma once
22

3+
#include <cstdint>
4+
#include <cstring>
5+
#include <optional>
6+
#include <stdexcept>
7+
#include <string>
38
#include <vector>
9+
410
#include "sparrow.hpp"
511

6-
#include "config/config.hpp"
12+
#include "Message_generated.h"
13+
#include "Schema_generated.h"
14+
15+
#include "utils.hpp"
16+
17+
namespace sparrow_ipc
18+
{
19+
//TODO split serialize/deserialize fcts in two different files or just rename the current one?
20+
template <typename T>
21+
std::vector<uint8_t> serialize_primitive_array(sparrow::primitive_array<T>& arr);
22+
23+
template <typename T>
24+
sparrow::primitive_array<T> deserialize_primitive_array(const std::vector<uint8_t>& buffer);
25+
26+
template <typename T>
27+
std::vector<uint8_t> serialize_primitive_array(sparrow::primitive_array<T>& arr)
28+
{
29+
// This function serializes a sparrow::primitive_array into a byte vector that is compliant
30+
// with the Apache Arrow IPC Streaming Format. It constructs a stream containing two messages:
31+
// 1. A Schema message: Describes the data's metadata (field name, type, nullability).
32+
// 2. A RecordBatch message: Contains the actual array data (null count, length, and raw buffers).
33+
// This two-part structure makes the data self-describing and readable by other Arrow-native tools.
34+
// The implementation adheres to the specification by correctly handling:
35+
// - Message order (Schema first, then RecordBatch).
36+
// - The encapsulated message format (4-byte metadata length prefix).
37+
// - 8-byte padding and alignment for the message body.
38+
// - Correctly populating the Flatbuffer-defined metadata for both messages.
39+
40+
// Get arrow structures
41+
auto [arrow_arr_ptr, arrow_schema_ptr] = sparrow::get_arrow_structures(arr);
42+
auto& arrow_arr = *arrow_arr_ptr;
43+
auto& arrow_schema = *arrow_schema_ptr;
44+
45+
// This will be the final buffer holding the complete IPC stream.
46+
std::vector<uint8_t> final_buffer;
47+
48+
// I - Serialize the Schema message
49+
// An Arrow IPC stream must start with a Schema message
50+
{
51+
// Create a new builder for the Schema message's metadata
52+
flatbuffers::FlatBufferBuilder schema_builder;
53+
54+
// Create the Field metadata, which describes a single column (or array)
55+
flatbuffers::Offset<flatbuffers::String> fb_name_offset = 0;
56+
if (arrow_schema.name)
57+
{
58+
fb_name_offset = schema_builder.CreateString(arrow_schema.name);
59+
}
60+
61+
// Determine the Flatbuffer type information from the C schema's format string
62+
auto [type_enum, type_offset] = utils::get_flatbuffer_type(schema_builder, arrow_schema.format);
63+
64+
// Handle metadata
65+
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>>>
66+
fb_metadata_offset = 0;
67+
68+
if (arr.metadata())
69+
{
70+
sparrow::key_value_view metadata_view = *(arr.metadata());
71+
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>> kv_offsets;
72+
73+
auto mv_it = metadata_view.cbegin();
74+
for (auto i = 0; i < metadata_view.size(); ++i, ++mv_it)
75+
{
76+
auto key_offset = schema_builder.CreateString(std::string((*mv_it).first));
77+
auto value_offset = schema_builder.CreateString(std::string((*mv_it).second));
78+
kv_offsets.push_back(
79+
org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset));
80+
}
81+
fb_metadata_offset = schema_builder.CreateVector(kv_offsets);
82+
}
83+
84+
// Build the Field object
85+
auto fb_field = org::apache::arrow::flatbuf::CreateField(
86+
schema_builder,
87+
fb_name_offset,
88+
(arrow_schema.flags & static_cast<int64_t>(sparrow::ArrowFlag::NULLABLE)) != 0,
89+
type_enum,
90+
type_offset,
91+
0, // dictionary
92+
0, // children
93+
fb_metadata_offset);
94+
95+
// A Schema contains a vector of fields. For this primitive array, there is only one
96+
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> fields_vec = {fb_field};
97+
auto fb_fields = schema_builder.CreateVector(fields_vec);
98+
99+
// Build the Schema object from the vector of fields
100+
auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields);
101+
102+
// Wrap the Schema in a top-level Message, which is the standard IPC envelope
103+
auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage(
104+
schema_builder,
105+
org::apache::arrow::flatbuf::MetadataVersion::V5,
106+
org::apache::arrow::flatbuf::MessageHeader::Schema,
107+
schema_offset.Union(),
108+
0
109+
);
110+
schema_builder.Finish(schema_message_offset);
111+
112+
// Assemble the Schema message bytes
113+
uint32_t schema_len = schema_builder.GetSize(); // Get the size of the serialized metadata
114+
final_buffer.resize(sizeof(uint32_t) + schema_len); // Resize the buffer to hold the message
115+
// Copy the metadata into the buffer, after the 4-byte length prefix
116+
memcpy(final_buffer.data() + sizeof(uint32_t), schema_builder.GetBufferPointer(), schema_len);
117+
// Write the 4-byte metadata length at the beginning of the message
118+
*(reinterpret_cast<uint32_t*>(final_buffer.data())) = schema_len;
119+
}
120+
121+
// II - Serialize the RecordBatch message
122+
// After the Schema, we send the RecordBatch containing the actual data
123+
{
124+
// Create a new builder for the RecordBatch message's metadata
125+
flatbuffers::FlatBufferBuilder batch_builder;
126+
127+
// arrow_arr.buffers[0] is the validity bitmap
128+
// arrow_arr.buffers[1] is the data buffer
129+
const uint8_t* validity_bitmap = reinterpret_cast<const uint8_t*>(arrow_arr.buffers[0]);
130+
const uint8_t* data_buffer = reinterpret_cast<const uint8_t*>(arrow_arr.buffers[1]);
131+
132+
// Calculate the size of the validity and data buffers
133+
int64_t validity_size = (arrow_arr.length + 7) / 8;
134+
int64_t data_size = arrow_arr.length * sizeof(T);
135+
int64_t body_len = validity_size + data_size; // The total size of the message body
136+
137+
// Create Flatbuffer descriptions for the data buffers
138+
org::apache::arrow::flatbuf::Buffer validity_buffer_struct(0, validity_size);
139+
org::apache::arrow::flatbuf::Buffer data_buffer_struct(validity_size, data_size);
140+
// Create the FieldNode, which describes the layout of the array data
141+
org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count);
142+
143+
// A RecordBatch contains a vector of nodes and a vector of buffers
144+
auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1);
145+
std::vector<org::apache::arrow::flatbuf::Buffer> buffers_vec = {validity_buffer_struct, data_buffer_struct};
146+
auto fb_buffers_vector = batch_builder.CreateVectorOfStructs(buffers_vec);
147+
148+
// Build the RecordBatch metadata object
149+
auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(batch_builder, arrow_arr.length, fb_nodes_vector, fb_buffers_vector);
150+
151+
// Wrap the RecordBatch in a top-level Message
152+
auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage(
153+
batch_builder,
154+
org::apache::arrow::flatbuf::MetadataVersion::V5,
155+
org::apache::arrow::flatbuf::MessageHeader::RecordBatch,
156+
record_batch_offset.Union(),
157+
body_len
158+
);
159+
batch_builder.Finish(batch_message_offset);
160+
161+
// III - Append the RecordBatch message to the final buffer
162+
uint32_t batch_meta_len = batch_builder.GetSize(); // Get the size of the batch metadata
163+
int64_t aligned_batch_meta_len = utils::align_to_8(batch_meta_len); // Calculate the padded length
164+
165+
size_t current_size = final_buffer.size(); // Get the current size (which is the end of the Schema message)
166+
// Resize the buffer to append the new message
167+
final_buffer.resize(current_size + sizeof(uint32_t) + aligned_batch_meta_len + body_len);
168+
uint8_t* dst = final_buffer.data() + current_size; // Get a pointer to where the new message will start
169+
170+
// Write the 4-byte metadata length for the RecordBatch message
171+
*(reinterpret_cast<uint32_t*>(dst)) = batch_meta_len;
172+
dst += sizeof(uint32_t);
173+
// Copy the RecordBatch metadata into the buffer
174+
memcpy(dst, batch_builder.GetBufferPointer(), batch_meta_len);
175+
// Add padding to align the body to an 8-byte boundary
176+
memset(dst + batch_meta_len, 0, aligned_batch_meta_len - batch_meta_len);
177+
dst += aligned_batch_meta_len;
178+
// Copy the actual data buffers (the message body) into the buffer
179+
if (validity_bitmap)
180+
{
181+
memcpy(dst, validity_bitmap, validity_size);
182+
}
183+
else
184+
{
185+
// If validity_bitmap is null, it means there are no nulls
186+
memset(dst, 0xFF, validity_size);
187+
}
188+
dst += validity_size;
189+
if (data_buffer)
190+
{
191+
memcpy(dst, data_buffer, data_size);
192+
}
193+
}
194+
195+
// Return the final buffer containing the complete IPC stream
196+
return final_buffer;
197+
}
198+
199+
template <typename T>
200+
sparrow::primitive_array<T> deserialize_primitive_array(const std::vector<uint8_t>& buffer) {
201+
const uint8_t* buf_ptr = buffer.data();
202+
size_t current_offset = 0;
203+
204+
// I - Deserialize the Schema message
205+
uint32_t schema_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));
206+
current_offset += sizeof(uint32_t);
207+
auto schema_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset);
208+
if (schema_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::Schema)
209+
{
210+
throw std::runtime_error("Expected Schema message at the start of the buffer.");
211+
}
212+
auto flatbuffer_schema = static_cast<const org::apache::arrow::flatbuf::Schema*>(schema_message->header());
213+
auto fields = flatbuffer_schema->fields();
214+
if (fields->size() != 1)
215+
{
216+
throw std::runtime_error("Expected schema with exactly one field for primitive_array.");
217+
}
218+
current_offset += schema_meta_len;
219+
220+
// II - Deserialize the RecordBatch message
221+
uint32_t batch_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));
222+
current_offset += sizeof(uint32_t);
223+
auto batch_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset);
224+
if (batch_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::RecordBatch)
225+
{
226+
throw std::runtime_error("Expected RecordBatch message, but got a different type.");
227+
}
228+
auto record_batch = static_cast<const org::apache::arrow::flatbuf::RecordBatch*>(batch_message->header());
229+
current_offset += utils::align_to_8(batch_meta_len);
230+
const uint8_t* body_ptr = buf_ptr + current_offset;
231+
232+
// Extract metadata from the RecordBatch
233+
auto buffers_meta = record_batch->buffers();
234+
auto nodes_meta = record_batch->nodes();
235+
auto node_meta = nodes_meta->Get(0);
236+
237+
// The body contains the validity bitmap and the data buffer concatenated
238+
// We need to copy this data into memory owned by the new ArrowArray
239+
int64_t validity_len = buffers_meta->Get(0)->length();
240+
int64_t data_len = buffers_meta->Get(1)->length();
241+
242+
uint8_t* validity_buffer_copy = new uint8_t[validity_len];
243+
memcpy(validity_buffer_copy, body_ptr + buffers_meta->Get(0)->offset(), validity_len);
244+
245+
uint8_t* data_buffer_copy = new uint8_t[data_len];
246+
memcpy(data_buffer_copy, body_ptr + buffers_meta->Get(1)->offset(), data_len);
247+
248+
// Get name
249+
std::optional<std::string> name;
250+
const flatbuffers::String* fb_name_flatbuffer = fields->Get(0)->name();
251+
if (fb_name_flatbuffer)
252+
{
253+
name = std::string(fb_name_flatbuffer->c_str(), fb_name_flatbuffer->size());
254+
}
255+
256+
// Handle metadata
257+
std::optional<std::vector<sparrow::metadata_pair>> metadata;
258+
auto fb_metadata = fields->Get(0)->custom_metadata();
259+
if (fb_metadata && !fb_metadata->empty())
260+
{
261+
metadata = std::vector<sparrow::metadata_pair>();
262+
metadata->reserve(fb_metadata->size());
263+
for (const auto& kv : *fb_metadata)
264+
{
265+
metadata->emplace_back(kv->key()->c_str(), kv->value()->c_str());
266+
}
267+
}
7268

8-
//TODO split serialize/deserialize fcts in two different files or just rename the current one?
9-
template <typename T>
10-
SPARROW_IPC_API std::vector<uint8_t> serialize_primitive_array(const sparrow::primitive_array<T>& arr);
269+
auto data = sparrow::u8_buffer<T>(reinterpret_cast<T*>(data_buffer_copy), node_meta->length());
270+
auto bitmap = sparrow::validity_bitmap(validity_buffer_copy, node_meta->length());
11271

12-
template <typename T>
13-
SPARROW_IPC_API sparrow::primitive_array<T> deserialize_primitive_array(const std::vector<uint8_t>& buffer);
272+
return sparrow::primitive_array<T>(std::move(data), node_meta->length(), std::move(bitmap), name, metadata);
273+
}
274+
}

include/utils.hpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#pragma once
2+
3+
#include <cstdint>
4+
#include <optional>
5+
#include <string_view>
6+
#include <utility>
7+
8+
#include "Schema_generated.h"
9+
10+
#include "config/config.hpp"
11+
12+
namespace sparrow_ipc
13+
{
14+
namespace utils
15+
{
16+
// Aligns a value to the next multiple of 8, as required by the Arrow IPC format for message bodies
17+
SPARROW_IPC_API int64_t align_to_8(int64_t n);
18+
19+
// Creates a Flatbuffers type from a format string
20+
// This function maps a sparrow data type to the corresponding Flatbuffers type
21+
SPARROW_IPC_API std::pair<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>>
22+
get_flatbuffer_type(flatbuffers::FlatBufferBuilder& builder, std::string_view format_str);
23+
}
24+
}

0 commit comments

Comments
 (0)