Skip to content

Commit 2bfa9da

Browse files
committed
Handle metadata
1 parent cf1564d commit 2bfa9da

File tree

3 files changed

+127
-215
lines changed

3 files changed

+127
-215
lines changed

CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ set(SPARROW_IPC_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/include)
2929
set(SPARROW_IPC_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src)
3030

3131
set(SPARROW_IPC_HEADERS
32-
# config
33-
#TODO add header and split serialize/deserialize
32+
#TODO split serialize/deserialize fcts in two different files or just rename the current one?
3433
${SPARROW_IPC_INCLUDE_DIR}/config/config.hpp
34+
${SPARROW_IPC_INCLUDE_DIR}/serialize.hpp
3535
${SPARROW_IPC_INCLUDE_DIR}/sparrow-ipc.hpp
3636
)
3737

src/serialize.cpp

Lines changed: 62 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
#include <cstring>
33
#include <vector>
44

5-
65
#include "Message_generated.h"
76
#include "Schema_generated.h"
87

@@ -31,36 +30,30 @@ std::vector<uint8_t> serialize_primitive_array(const sparrow::primitive_array<T>
3130
// - 8-byte padding and alignment for the message body.
3231
// - Correctly populating the Flatbuffer-defined metadata for both messages.
3332

34-
// Create a mutable copy of the input array to allow moving its internal structures.
33+
// Create a mutable copy of the input array to allow moving its internal structures
3534
sparrow::primitive_array<T> mutable_arr = arr;
36-
37-
// Use the public API to extract the Arrow C data interface structures.
38-
// This gives us access to the raw layout needed for serialization.
39-
// TODO use get instead and avoid release?
40-
// Was using extract_arrow_structures(std::move(mutable_arr)) and releasing after
4135
auto [arrow_arr, arrow_schema] = sparrow::extract_arrow_structures(std::move(mutable_arr));
4236

4337
// This will be the final buffer holding the complete IPC stream.
4438
std::vector<uint8_t> final_buffer;
4539

46-
// --- Part 1: Serialize the Schema message ---
47-
// An Arrow IPC stream must start with a Schema message.
40+
// I - Serialize the Schema message
41+
// An Arrow IPC stream must start with a Schema message
4842
{
49-
// Create a new builder for the Schema message's metadata.
43+
// Create a new builder for the Schema message's metadata
5044
flatbuffers::FlatBufferBuilder schema_builder;
5145

52-
// Create the Field metadata, which describes a single column (or array).
46+
// Create the Field metadata, which describes a single column (or array)
5347
flatbuffers::Offset<flatbuffers::String> fb_name_offset = 0;
5448
if (arrow_schema.name)
5549
{
5650
fb_name_offset = schema_builder.CreateString(arrow_schema.name);
5751
}
5852

59-
// Determine the Flatbuffer type information from the C schema's format string.
53+
// Determine the Flatbuffer type information from the C schema's format string
6054
org::apache::arrow::flatbuf::Type type_enum = org::apache::arrow::flatbuf::Type::NONE;
6155
flatbuffers::Offset<void> type_offset;
62-
// TODO check to be null terminated, check tests "c" and then smthng else
63-
// TODO not sure about these values...
56+
// TODO not sure about the way we should handle this, maybe use some utility fct from sparrow or define one to handle all possible formats?
6457
if (strcmp(arrow_schema.format, "i") == 0)
6558
{
6659
type_enum = org::apache::arrow::flatbuf::Type::Int;
@@ -84,24 +77,23 @@ std::vector<uint8_t> serialize_primitive_array(const sparrow::primitive_array<T>
8477
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>>>
8578
fb_metadata_offset = 0;
8679

87-
//if (arrow_schema.metadata)
88-
if (arr.metadata()) // Use arr.metadata() directly
80+
if (arr.metadata())
8981
{
9082
sparrow::key_value_view metadata_view = *(arr.metadata());
9183
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>> kv_offsets;
9284

9385
auto mv_it = metadata_view.cbegin();
9486
for (auto i = 0; i < metadata_view.size(); ++i, ++mv_it)
9587
{
96-
auto key_offset = schema_builder.CreateString(std::string((*mv_it).first)); // Convert string_view to string
97-
auto value_offset = schema_builder.CreateString(std::string((*mv_it).second)); // Convert string_view to string
88+
auto key_offset = schema_builder.CreateString(std::string((*mv_it).first));
89+
auto value_offset = schema_builder.CreateString(std::string((*mv_it).second));
9890
kv_offsets.push_back(
9991
org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset));
10092
}
10193
fb_metadata_offset = schema_builder.CreateVector(kv_offsets);
10294
}
10395

104-
// Build the Field object.
96+
// Build the Field object
10597
auto fb_field = org::apache::arrow::flatbuf::CreateField(
10698
schema_builder,
10799
fb_name_offset,
@@ -112,14 +104,14 @@ std::vector<uint8_t> serialize_primitive_array(const sparrow::primitive_array<T>
112104
0, // children
113105
fb_metadata_offset);
114106

115-
// A Schema contains a vector of fields. For this primitive array, there is only one.
107+
// A Schema contains a vector of fields. For this primitive array, there is only one
116108
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> fields_vec = {fb_field};
117109
auto fb_fields = schema_builder.CreateVector(fields_vec);
118110

119-
// Build the Schema object from the vector of fields.
111+
// Build the Schema object from the vector of fields
120112
auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields);
121113

122-
// Wrap the Schema in a top-level Message, which is the standard IPC envelope.
114+
// Wrap the Schema in a top-level Message, which is the standard IPC envelope
123115
auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage(
124116
schema_builder,
125117
org::apache::arrow::flatbuf::MetadataVersion::V5,
@@ -129,47 +121,46 @@ std::vector<uint8_t> serialize_primitive_array(const sparrow::primitive_array<T>
129121
);
130122
schema_builder.Finish(schema_message_offset);
131123

132-
// --- Assemble the Schema message bytes ---
133-
uint32_t schema_len = schema_builder.GetSize(); // Get the size of the serialized metadata.
134-
final_buffer.resize(sizeof(uint32_t) + schema_len); // Resize the buffer to hold the message.
135-
// Copy the metadata into the buffer, after the 4-byte length prefix.
124+
// Assemble the Schema message bytes
125+
uint32_t schema_len = schema_builder.GetSize(); // Get the size of the serialized metadata
126+
final_buffer.resize(sizeof(uint32_t) + schema_len); // Resize the buffer to hold the message
127+
// Copy the metadata into the buffer, after the 4-byte length prefix
136128
memcpy(final_buffer.data() + sizeof(uint32_t), schema_builder.GetBufferPointer(), schema_len);
137-
// Write the 4-byte metadata length at the beginning of the message.
129+
// Write the 4-byte metadata length at the beginning of the message
138130
*(reinterpret_cast<uint32_t*>(final_buffer.data())) = schema_len;
139131
}
140132

141-
// --- Part 2: Serialize the RecordBatch message ---
142-
// After the Schema, we send the RecordBatch containing the actual data.
133+
// II - Serialize the RecordBatch message
134+
// After the Schema, we send the RecordBatch containing the actual data
143135
{
144-
// Create a new builder for the RecordBatch message's metadata.
136+
// Create a new builder for the RecordBatch message's metadata
145137
flatbuffers::FlatBufferBuilder batch_builder;
146138

147-
// Now use arrow_arr for serialization.
148139
// arrow_arr.buffers[0] is the validity bitmap
149140
// arrow_arr.buffers[1] is the data buffer
150141
const uint8_t* validity_bitmap = reinterpret_cast<const uint8_t*>(arrow_arr.buffers[0]);
151142
const uint8_t* data_buffer = reinterpret_cast<const uint8_t*>(arrow_arr.buffers[1]);
152143

153-
// Calculate the size of the validity and data buffers.
144+
// Calculate the size of the validity and data buffers
154145
int64_t validity_size = (arrow_arr.length + 7) / 8;
155146
int64_t data_size = arrow_arr.length * sizeof(T);
156-
int64_t body_len = validity_size + data_size; // The total size of the message body.
147+
int64_t body_len = validity_size + data_size; // The total size of the message body
157148

158-
// Create Flatbuffer descriptions for the data buffers.
149+
// Create Flatbuffer descriptions for the data buffers
159150
org::apache::arrow::flatbuf::Buffer validity_buffer_struct(0, validity_size);
160151
org::apache::arrow::flatbuf::Buffer data_buffer_struct(validity_size, data_size);
161-
// Create the FieldNode, which describes the layout of the array data.
152+
// Create the FieldNode, which describes the layout of the array data
162153
org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count);
163154

164-
// A RecordBatch contains a vector of nodes and a vector of buffers.
155+
// A RecordBatch contains a vector of nodes and a vector of buffers
165156
auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1);
166157
std::vector<org::apache::arrow::flatbuf::Buffer> buffers_vec = {validity_buffer_struct, data_buffer_struct};
167158
auto fb_buffers_vector = batch_builder.CreateVectorOfStructs(buffers_vec);
168159

169-
// Build the RecordBatch metadata object.
160+
// Build the RecordBatch metadata object
170161
auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(batch_builder, arrow_arr.length, fb_nodes_vector, fb_buffers_vector);
171162

172-
// Wrap the RecordBatch in a top-level Message.
163+
// Wrap the RecordBatch in a top-level Message
173164
auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage(
174165
batch_builder,
175166
org::apache::arrow::flatbuf::MetadataVersion::V5,
@@ -179,32 +170,32 @@ std::vector<uint8_t> serialize_primitive_array(const sparrow::primitive_array<T>
179170
);
180171
batch_builder.Finish(batch_message_offset);
181172

182-
// --- Part 3: Append the RecordBatch message to the final buffer ---
183-
uint32_t batch_meta_len = batch_builder.GetSize(); // Get the size of the batch metadata.
184-
int64_t aligned_batch_meta_len = align_to_8(batch_meta_len); // Calculate the padded length.
173+
// III - Append the RecordBatch message to the final buffer
174+
uint32_t batch_meta_len = batch_builder.GetSize(); // Get the size of the batch metadata
175+
int64_t aligned_batch_meta_len = align_to_8(batch_meta_len); // Calculate the padded length
185176

186-
size_t current_size = final_buffer.size(); // Get the current size (which is the end of the Schema message).
187-
// Resize the buffer to append the new message.
177+
size_t current_size = final_buffer.size(); // Get the current size (which is the end of the Schema message)
178+
// Resize the buffer to append the new message
188179
final_buffer.resize(current_size + sizeof(uint32_t) + aligned_batch_meta_len + body_len);
189-
uint8_t* dst = final_buffer.data() + current_size; // Get a pointer to where the new message will start.
180+
uint8_t* dst = final_buffer.data() + current_size; // Get a pointer to where the new message will start
190181

191-
// Write the 4-byte metadata length for the RecordBatch message.
182+
// Write the 4-byte metadata length for the RecordBatch message
192183
*(reinterpret_cast<uint32_t*>(dst)) = batch_meta_len;
193184
dst += sizeof(uint32_t);
194-
// Copy the RecordBatch metadata into the buffer.
185+
// Copy the RecordBatch metadata into the buffer
195186
memcpy(dst, batch_builder.GetBufferPointer(), batch_meta_len);
196-
// Add padding to align the body to an 8-byte boundary.
187+
// Add padding to align the body to an 8-byte boundary
197188
memset(dst + batch_meta_len, 0, aligned_batch_meta_len - batch_meta_len);
198189
dst += aligned_batch_meta_len;
199-
// Copy the actual data buffers (the message body) into the buffer.
190+
// Copy the actual data buffers (the message body) into the buffer
200191
if (validity_bitmap)
201192
{
202193
memcpy(dst, validity_bitmap, validity_size);
203194
}
204195
else
205196
{
206-
// If validity_bitmap is null, it means there are no nulls.
207-
// The Arrow spec recommends writing a bitmap with all bits set to 1.
197+
// If validity_bitmap is null, it means there are no nulls
198+
// The Arrow spec recommends writing a bitmap with all bits set to 1
208199
memset(dst, 0xFF, validity_size);
209200
}
210201
dst += validity_size;
@@ -214,11 +205,11 @@ std::vector<uint8_t> serialize_primitive_array(const sparrow::primitive_array<T>
214205
}
215206
}
216207

217-
// Release the memory managed by the C structures, as we have copied the data.
208+
// Release the memory managed by the C structures
218209
arrow_arr.release(&arrow_arr);
219210
arrow_schema.release(&arrow_schema);
220211

221-
// Return the final buffer containing the complete IPC stream.
212+
// Return the final buffer containing the complete IPC stream
222213
return final_buffer;
223214
}
224215

@@ -227,26 +218,29 @@ sparrow::primitive_array<T> deserialize_primitive_array(const std::vector<uint8_
227218
const uint8_t* buf_ptr = buffer.data();
228219
size_t current_offset = 0;
229220

230-
// --- Part 1: Deserialize the Schema message ---
221+
// I - Deserialize the Schema message
231222
uint32_t schema_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));
232223
current_offset += sizeof(uint32_t);
233224
auto schema_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset);
234-
if (schema_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::Schema) {
225+
if (schema_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::Schema)
226+
{
235227
throw std::runtime_error("Expected Schema message at the start of the buffer.");
236228
}
237229
auto flatbuffer_schema = static_cast<const org::apache::arrow::flatbuf::Schema*>(schema_message->header());
238230
auto fields = flatbuffer_schema->fields();
239-
if (fields->size() != 1) {
231+
if (fields->size() != 1)
232+
{
240233
throw std::runtime_error("Expected schema with exactly one field for primitive_array.");
241234
}
242235
bool is_nullable = fields->Get(0)->nullable();
243236
current_offset += schema_meta_len;
244237

245-
// --- Part 2: Deserialize the RecordBatch message ---
238+
// II - Deserialize the RecordBatch message
246239
uint32_t batch_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));
247240
current_offset += sizeof(uint32_t);
248241
auto batch_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset);
249-
if (batch_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::RecordBatch) {
242+
if (batch_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::RecordBatch)
243+
{
250244
throw std::runtime_error("Expected RecordBatch message, but got a different type.");
251245
}
252246
auto record_batch = static_cast<const org::apache::arrow::flatbuf::RecordBatch*>(batch_message->header());
@@ -258,8 +252,8 @@ sparrow::primitive_array<T> deserialize_primitive_array(const std::vector<uint8_
258252
auto nodes_meta = record_batch->nodes();
259253
auto node_meta = nodes_meta->Get(0);
260254

261-
// The body contains the validity bitmap and the data buffer concatenated.
262-
// We need to copy this data into memory owned by the new ArrowArray.
255+
// The body contains the validity bitmap and the data buffer concatenated
256+
// We need to copy this data into memory owned by the new ArrowArray
263257
int64_t validity_len = buffers_meta->Get(0)->length();
264258
int64_t data_len = buffers_meta->Get(1)->length();
265259

@@ -269,105 +263,30 @@ sparrow::primitive_array<T> deserialize_primitive_array(const std::vector<uint8_
269263
uint8_t* data_buffer_copy = new uint8_t[data_len];
270264
memcpy(data_buffer_copy, body_ptr + buffers_meta->Get(1)->offset(), data_len);
271265

272-
// Heap-allocate the ArrowArray and its buffer array so they live as long as the sparrow array
273-
auto arr = new ArrowArray();
274-
arr->length = node_meta->length();
275-
arr->null_count = node_meta->null_count();
276-
arr->offset = 0;
277-
arr->n_buffers = 2;
278-
arr->n_children = 0;
279-
arr->children = nullptr;
280-
arr->dictionary = nullptr;
281-
282-
auto buffers_ptr_array = new const void*[2];
283-
buffers_ptr_array[0] = validity_buffer_copy;
284-
buffers_ptr_array[1] = data_buffer_copy;
285-
arr->buffers = buffers_ptr_array;
286-
287-
// The release callback is responsible for freeing all memory we just allocated
288-
arr->release = [](ArrowArray* arr) {
289-
if (!arr || !arr->release) return;
290-
// Free the buffer data
291-
delete[] reinterpret_cast<const uint8_t*>(arr->buffers[0]);
292-
delete[] reinterpret_cast<const uint8_t*>(arr->buffers[1]);
293-
// Free the buffer pointer array
294-
delete[] arr->buffers;
295-
// Free the array struct itself
296-
delete arr;
297-
arr->release = nullptr;
298-
};
299-
300-
// Heap-allocate the ArrowSchema
301-
auto schema = new ArrowSchema();
302-
// Infer the format string from the template type T
303-
const char* format_str = nullptr;
304-
if constexpr (std::is_same_v<T, int>) {
305-
format_str = "i";
306-
} else if constexpr (std::is_same_v<T, float>) {
307-
format_str = "f";
308-
} else if constexpr (std::is_same_v<T, double>) {
309-
format_str = "g";
310-
} else {
311-
format_str = ""; // Unknown format
312-
}
313-
schema->format = strdup(format_str);
314-
315-
// Get name from Flatbuffer schema and duplicate it
266+
// Get name
267+
std::optional<std::string_view> name;
316268
const flatbuffers::String* fb_name_flatbuffer = fields->Get(0)->name();
317269
if (fb_name_flatbuffer)
318270
{
319-
schema->name = strdup(fb_name_flatbuffer->c_str());
320-
}
321-
else
322-
{
323-
schema->name = nullptr; // Or strdup("") if an empty string is preferred for null
271+
name = std::string_view(fb_name_flatbuffer->c_str(), fb_name_flatbuffer->size());
324272
}
325273

326274
// Handle metadata
275+
std::optional<std::vector<sparrow::metadata_pair>> metadata;
327276
auto fb_metadata = fields->Get(0)->custom_metadata();
328277
if (fb_metadata && fb_metadata->size() > 0)
329278
{
330-
std::vector<sparrow::metadata_pair> kvs;
279+
metadata = std::vector<sparrow::metadata_pair>();
331280
for (const auto& kv : *fb_metadata)
332281
{
333-
kvs.emplace_back(kv->key()->c_str(), kv->value()->c_str());
282+
metadata->emplace_back(kv->key()->c_str(), kv->value()->c_str());
334283
}
335-
for (auto i : kvs)
336-
std::cout << "===========> kv first: " << i.first << " sec: " << i.second << std::endl;
337-
std::string metadata_str = sparrow::get_metadata_from_key_values(kvs);
338-
// char* metadata_buf = (char*)malloc(metadata_str.size());
339-
// if (!metadata_buf)
340-
// {
341-
// throw std::runtime_error("Failed to allocate memory for metadata");
342-
// }
343-
// memcpy(metadata_buf, metadata_str.data(), metadata_str.size());
344-
// schema->metadata = metadata_buf;
345-
schema->metadata = metadata_str.data();
346-
}
347-
else
348-
{
349-
schema->metadata = nullptr;
350284
}
351285

352-
schema->flags = is_nullable ? static_cast<int64_t>(sparrow::ArrowFlag::NULLABLE) : 0;
353-
schema->n_children = 0;
354-
schema->children = nullptr;
355-
schema->dictionary = nullptr;
356-
schema->release = [](ArrowSchema* schema_ptr) {
357-
if (!schema_ptr || !schema_ptr->release) return;
358-
free((void*)schema_ptr->format);
359-
free((void*)schema_ptr->name);
360-
if (schema_ptr->metadata)
361-
{
362-
free((void*)schema_ptr->metadata);
363-
}
364-
// If metadata was duplicated, free it here too
365-
delete schema_ptr;
366-
schema_ptr->release = nullptr;
367-
};
286+
auto data = sparrow::u8_buffer<T>(reinterpret_cast<T*>(data_buffer_copy), node_meta->length());
287+
auto bitmap = sparrow::validity_bitmap(validity_buffer_copy, node_meta->length());
368288

369-
// The sparrow::arrow_proxy will take ownership of the heap-allocated structs
370-
return sparrow::primitive_array<T>(sparrow::arrow_proxy(arr, schema));
289+
return sparrow::primitive_array<T>(std::move(data), node_meta->length(), std::move(bitmap), name, metadata);
371290
}
372291

373292
// Explicit template instantiation

0 commit comments

Comments
 (0)