Skip to content

Commit 1841ef1

Browse files
authored
Serialize/Deserialize null_array (#10)
* Add null_array serialization * Move things around and renaming * Renaming * Remove commented code * Refactor serialization into chunk functions * Make serialize_record_batch_message generic * Add deserialize_schema_message * Add deserialize_record_batch_message * Move includes around * Move defintions to serialize_null_array.cpp * Do a const pass * Use range loop * Return final buffer instead of arg Add/move comments * Get metadata directly from arrow schema
1 parent 19ee9cc commit 1841ef1

12 files changed

+533
-354
lines changed

CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,14 @@ set(SPARROW_IPC_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src)
5050
set(SPARROW_IPC_HEADERS
5151
${SPARROW_IPC_INCLUDE_DIR}/config/config.hpp
5252
${SPARROW_IPC_INCLUDE_DIR}/serialize.hpp
53+
${SPARROW_IPC_INCLUDE_DIR}/serialize_primitive_array.hpp
54+
${SPARROW_IPC_INCLUDE_DIR}/serialize_null_array.hpp
5355
${SPARROW_IPC_INCLUDE_DIR}/utils.hpp
5456
)
5557

5658
set(SPARROW_IPC_SRC
59+
${SPARROW_IPC_SOURCE_DIR}/serialize.cpp
60+
${SPARROW_IPC_SOURCE_DIR}/serialize_null_array.cpp
5761
${SPARROW_IPC_SOURCE_DIR}/utils.cpp
5862
)
5963

include/serialize.hpp

Lines changed: 6 additions & 255 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
#pragma once
22

33
#include <cstdint>
4-
#include <cstring>
54
#include <optional>
6-
#include <stdexcept>
75
#include <string>
86
#include <vector>
97

@@ -12,263 +10,16 @@
1210
#include "Message_generated.h"
1311
#include "Schema_generated.h"
1412

15-
#include "utils.hpp"
13+
#include "config/config.hpp"
1614

1715
namespace sparrow_ipc
1816
{
19-
//TODO split serialize/deserialize fcts in two different files or just rename the current one?
20-
template <typename T>
21-
std::vector<uint8_t> serialize_primitive_array(sparrow::primitive_array<T>& arr);
22-
23-
template <typename T>
24-
sparrow::primitive_array<T> deserialize_primitive_array(const std::vector<uint8_t>& buffer);
25-
26-
template <typename T>
27-
std::vector<uint8_t> serialize_primitive_array(sparrow::primitive_array<T>& arr)
17+
namespace details
2818
{
29-
// This function serializes a sparrow::primitive_array into a byte vector that is compliant
30-
// with the Apache Arrow IPC Streaming Format. It constructs a stream containing two messages:
31-
// 1. A Schema message: Describes the data's metadata (field name, type, nullability).
32-
// 2. A RecordBatch message: Contains the actual array data (null count, length, and raw buffers).
33-
// This two-part structure makes the data self-describing and readable by other Arrow-native tools.
34-
// The implementation adheres to the specification by correctly handling:
35-
// - Message order (Schema first, then RecordBatch).
36-
// - The encapsulated message format (4-byte metadata length prefix).
37-
// - 8-byte padding and alignment for the message body.
38-
// - Correctly populating the Flatbuffer-defined metadata for both messages.
39-
40-
// Get arrow structures
41-
auto [arrow_arr_ptr, arrow_schema_ptr] = sparrow::get_arrow_structures(arr);
42-
auto& arrow_arr = *arrow_arr_ptr;
43-
auto& arrow_schema = *arrow_schema_ptr;
44-
45-
// This will be the final buffer holding the complete IPC stream.
46-
std::vector<uint8_t> final_buffer;
47-
48-
// I - Serialize the Schema message
49-
// An Arrow IPC stream must start with a Schema message
50-
{
51-
// Create a new builder for the Schema message's metadata
52-
flatbuffers::FlatBufferBuilder schema_builder;
53-
54-
// Create the Field metadata, which describes a single column (or array)
55-
flatbuffers::Offset<flatbuffers::String> fb_name_offset = 0;
56-
if (arrow_schema.name)
57-
{
58-
fb_name_offset = schema_builder.CreateString(arrow_schema.name);
59-
}
60-
61-
// Determine the Flatbuffer type information from the C schema's format string
62-
auto [type_enum, type_offset] = utils::get_flatbuffer_type(schema_builder, arrow_schema.format);
63-
64-
// Handle metadata
65-
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>>>
66-
fb_metadata_offset = 0;
67-
68-
if (arr.metadata())
69-
{
70-
sparrow::key_value_view metadata_view = *(arr.metadata());
71-
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>> kv_offsets;
72-
73-
auto mv_it = metadata_view.cbegin();
74-
for (auto i = 0; i < metadata_view.size(); ++i, ++mv_it)
75-
{
76-
auto key_offset = schema_builder.CreateString(std::string((*mv_it).first));
77-
auto value_offset = schema_builder.CreateString(std::string((*mv_it).second));
78-
kv_offsets.push_back(
79-
org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset));
80-
}
81-
fb_metadata_offset = schema_builder.CreateVector(kv_offsets);
82-
}
83-
84-
// Build the Field object
85-
auto fb_field = org::apache::arrow::flatbuf::CreateField(
86-
schema_builder,
87-
fb_name_offset,
88-
(arrow_schema.flags & static_cast<int64_t>(sparrow::ArrowFlag::NULLABLE)) != 0,
89-
type_enum,
90-
type_offset,
91-
0, // dictionary
92-
0, // children
93-
fb_metadata_offset);
94-
95-
// A Schema contains a vector of fields. For this primitive array, there is only one
96-
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> fields_vec = {fb_field};
97-
auto fb_fields = schema_builder.CreateVector(fields_vec);
98-
99-
// Build the Schema object from the vector of fields
100-
auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields);
101-
102-
// Wrap the Schema in a top-level Message, which is the standard IPC envelope
103-
auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage(
104-
schema_builder,
105-
org::apache::arrow::flatbuf::MetadataVersion::V5,
106-
org::apache::arrow::flatbuf::MessageHeader::Schema,
107-
schema_offset.Union(),
108-
0
109-
);
110-
schema_builder.Finish(schema_message_offset);
111-
112-
// Assemble the Schema message bytes
113-
uint32_t schema_len = schema_builder.GetSize(); // Get the size of the serialized metadata
114-
final_buffer.resize(sizeof(uint32_t) + schema_len); // Resize the buffer to hold the message
115-
// Copy the metadata into the buffer, after the 4-byte length prefix
116-
memcpy(final_buffer.data() + sizeof(uint32_t), schema_builder.GetBufferPointer(), schema_len);
117-
// Write the 4-byte metadata length at the beginning of the message
118-
*(reinterpret_cast<uint32_t*>(final_buffer.data())) = schema_len;
119-
}
120-
121-
// II - Serialize the RecordBatch message
122-
// After the Schema, we send the RecordBatch containing the actual data
123-
{
124-
// Create a new builder for the RecordBatch message's metadata
125-
flatbuffers::FlatBufferBuilder batch_builder;
126-
127-
// arrow_arr.buffers[0] is the validity bitmap
128-
// arrow_arr.buffers[1] is the data buffer
129-
const uint8_t* validity_bitmap = reinterpret_cast<const uint8_t*>(arrow_arr.buffers[0]);
130-
const uint8_t* data_buffer = reinterpret_cast<const uint8_t*>(arrow_arr.buffers[1]);
131-
132-
// Calculate the size of the validity and data buffers
133-
int64_t validity_size = (arrow_arr.length + 7) / 8;
134-
int64_t data_size = arrow_arr.length * sizeof(T);
135-
int64_t body_len = validity_size + data_size; // The total size of the message body
136-
137-
// Create Flatbuffer descriptions for the data buffers
138-
org::apache::arrow::flatbuf::Buffer validity_buffer_struct(0, validity_size);
139-
org::apache::arrow::flatbuf::Buffer data_buffer_struct(validity_size, data_size);
140-
// Create the FieldNode, which describes the layout of the array data
141-
org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count);
142-
143-
// A RecordBatch contains a vector of nodes and a vector of buffers
144-
auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1);
145-
std::vector<org::apache::arrow::flatbuf::Buffer> buffers_vec = {validity_buffer_struct, data_buffer_struct};
146-
auto fb_buffers_vector = batch_builder.CreateVectorOfStructs(buffers_vec);
147-
148-
// Build the RecordBatch metadata object
149-
auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(batch_builder, arrow_arr.length, fb_nodes_vector, fb_buffers_vector);
150-
151-
// Wrap the RecordBatch in a top-level Message
152-
auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage(
153-
batch_builder,
154-
org::apache::arrow::flatbuf::MetadataVersion::V5,
155-
org::apache::arrow::flatbuf::MessageHeader::RecordBatch,
156-
record_batch_offset.Union(),
157-
body_len
158-
);
159-
batch_builder.Finish(batch_message_offset);
160-
161-
// III - Append the RecordBatch message to the final buffer
162-
uint32_t batch_meta_len = batch_builder.GetSize(); // Get the size of the batch metadata
163-
int64_t aligned_batch_meta_len = utils::align_to_8(batch_meta_len); // Calculate the padded length
164-
165-
size_t current_size = final_buffer.size(); // Get the current size (which is the end of the Schema message)
166-
// Resize the buffer to append the new message
167-
final_buffer.resize(current_size + sizeof(uint32_t) + aligned_batch_meta_len + body_len);
168-
uint8_t* dst = final_buffer.data() + current_size; // Get a pointer to where the new message will start
169-
170-
// Write the 4-byte metadata length for the RecordBatch message
171-
*(reinterpret_cast<uint32_t*>(dst)) = batch_meta_len;
172-
dst += sizeof(uint32_t);
173-
// Copy the RecordBatch metadata into the buffer
174-
memcpy(dst, batch_builder.GetBufferPointer(), batch_meta_len);
175-
// Add padding to align the body to an 8-byte boundary
176-
memset(dst + batch_meta_len, 0, aligned_batch_meta_len - batch_meta_len);
177-
dst += aligned_batch_meta_len;
178-
// Copy the actual data buffers (the message body) into the buffer
179-
if (validity_bitmap)
180-
{
181-
memcpy(dst, validity_bitmap, validity_size);
182-
}
183-
else
184-
{
185-
// If validity_bitmap is null, it means there are no nulls
186-
memset(dst, 0xFF, validity_size);
187-
}
188-
dst += validity_size;
189-
if (data_buffer)
190-
{
191-
memcpy(dst, data_buffer, data_size);
192-
}
193-
}
194-
195-
// Return the final buffer containing the complete IPC stream
196-
return final_buffer;
197-
}
198-
199-
template <typename T>
200-
sparrow::primitive_array<T> deserialize_primitive_array(const std::vector<uint8_t>& buffer) {
201-
const uint8_t* buf_ptr = buffer.data();
202-
size_t current_offset = 0;
203-
204-
// I - Deserialize the Schema message
205-
uint32_t schema_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));
206-
current_offset += sizeof(uint32_t);
207-
auto schema_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset);
208-
if (schema_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::Schema)
209-
{
210-
throw std::runtime_error("Expected Schema message at the start of the buffer.");
211-
}
212-
auto flatbuffer_schema = static_cast<const org::apache::arrow::flatbuf::Schema*>(schema_message->header());
213-
auto fields = flatbuffer_schema->fields();
214-
if (fields->size() != 1)
215-
{
216-
throw std::runtime_error("Expected schema with exactly one field for primitive_array.");
217-
}
218-
current_offset += schema_meta_len;
219-
220-
// II - Deserialize the RecordBatch message
221-
uint32_t batch_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));
222-
current_offset += sizeof(uint32_t);
223-
auto batch_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset);
224-
if (batch_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::RecordBatch)
225-
{
226-
throw std::runtime_error("Expected RecordBatch message, but got a different type.");
227-
}
228-
auto record_batch = static_cast<const org::apache::arrow::flatbuf::RecordBatch*>(batch_message->header());
229-
current_offset += utils::align_to_8(batch_meta_len);
230-
const uint8_t* body_ptr = buf_ptr + current_offset;
231-
232-
// Extract metadata from the RecordBatch
233-
auto buffers_meta = record_batch->buffers();
234-
auto nodes_meta = record_batch->nodes();
235-
auto node_meta = nodes_meta->Get(0);
236-
237-
// The body contains the validity bitmap and the data buffer concatenated
238-
// We need to copy this data into memory owned by the new ArrowArray
239-
int64_t validity_len = buffers_meta->Get(0)->length();
240-
int64_t data_len = buffers_meta->Get(1)->length();
241-
242-
uint8_t* validity_buffer_copy = new uint8_t[validity_len];
243-
memcpy(validity_buffer_copy, body_ptr + buffers_meta->Get(0)->offset(), validity_len);
244-
245-
uint8_t* data_buffer_copy = new uint8_t[data_len];
246-
memcpy(data_buffer_copy, body_ptr + buffers_meta->Get(1)->offset(), data_len);
247-
248-
// Get name
249-
std::optional<std::string> name;
250-
const flatbuffers::String* fb_name_flatbuffer = fields->Get(0)->name();
251-
if (fb_name_flatbuffer)
252-
{
253-
name = std::string(fb_name_flatbuffer->c_str(), fb_name_flatbuffer->size());
254-
}
255-
256-
// Handle metadata
257-
std::optional<std::vector<sparrow::metadata_pair>> metadata;
258-
auto fb_metadata = fields->Get(0)->custom_metadata();
259-
if (fb_metadata && !fb_metadata->empty())
260-
{
261-
metadata = std::vector<sparrow::metadata_pair>();
262-
metadata->reserve(fb_metadata->size());
263-
for (const auto& kv : *fb_metadata)
264-
{
265-
metadata->emplace_back(kv->key()->c_str(), kv->value()->c_str());
266-
}
267-
}
268-
269-
auto data = sparrow::u8_buffer<T>(reinterpret_cast<T*>(data_buffer_copy), node_meta->length());
270-
auto bitmap = sparrow::validity_bitmap(validity_buffer_copy, node_meta->length());
19+
SPARROW_IPC_API std::vector<uint8_t> serialize_schema_message(const ArrowSchema& arrow_schema);
20+
SPARROW_IPC_API void serialize_record_batch_message(const ArrowArray& arrow_arr, const std::vector<int64_t>& buffers_sizes, std::vector<uint8_t>& final_buffer);
27121

272-
return sparrow::primitive_array<T>(std::move(data), node_meta->length(), std::move(bitmap), name, metadata);
22+
SPARROW_IPC_API void deserialize_schema_message(const uint8_t* buf_ptr, size_t& current_offset, std::optional<std::string>& name, std::optional<std::vector<sparrow::metadata_pair>>& metadata);
23+
SPARROW_IPC_API const org::apache::arrow::flatbuf::RecordBatch* deserialize_record_batch_message(const uint8_t* buf_ptr, size_t& current_offset);
27324
}
27425
}

include/serialize_null_array.hpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#pragma once
2+
3+
#include "config/config.hpp"
4+
#include "serialize.hpp"
5+
6+
namespace sparrow_ipc
7+
{
8+
// TODO Use `arr` as const after fixing the issue upstream in sparrow::get_arrow_structures
9+
SPARROW_IPC_API std::vector<uint8_t> serialize_null_array(sparrow::null_array& arr);
10+
SPARROW_IPC_API sparrow::null_array deserialize_null_array(const std::vector<uint8_t>& buffer);
11+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
#pragma once
2+
3+
#include <cstring>
4+
5+
#include "serialize.hpp"
6+
#include "utils.hpp"
7+
8+
namespace sparrow_ipc
9+
{
10+
// TODO Use `arr` as const after fixing the issue upstream in sparrow::get_arrow_structures
11+
template <typename T>
12+
std::vector<uint8_t> serialize_primitive_array(sparrow::primitive_array<T>& arr);
13+
14+
template <typename T>
15+
sparrow::primitive_array<T> deserialize_primitive_array(const std::vector<uint8_t>& buffer);
16+
17+
template <typename T>
18+
std::vector<uint8_t> serialize_primitive_array(sparrow::primitive_array<T>& arr)
19+
{
20+
// This function serializes a sparrow::primitive_array into a byte vector that is compliant
21+
// with the Apache Arrow IPC Streaming Format. It constructs a stream containing two messages:
22+
// 1. A Schema message: Describes the data's metadata (field name, type, nullability).
23+
// 2. A RecordBatch message: Contains the actual array data (null count, length, and raw buffers).
24+
// This two-part structure makes the data self-describing and readable by other Arrow-native tools.
25+
// The implementation adheres to the specification by correctly handling:
26+
// - Message order (Schema first, then RecordBatch).
27+
// - The encapsulated message format (4-byte metadata length prefix).
28+
// - 8-byte padding and alignment for the message body.
29+
// - Correctly populating the Flatbuffer-defined metadata for both messages.
30+
31+
// Get arrow structures
32+
const auto [arrow_arr_ptr, arrow_schema_ptr] = sparrow::get_arrow_structures(arr);
33+
const auto& arrow_arr = *arrow_arr_ptr;
34+
const auto& arrow_schema = *arrow_schema_ptr;
35+
36+
// I - Serialize the Schema message
37+
auto final_buffer = details::serialize_schema_message(arrow_schema);
38+
39+
// II - Serialize the RecordBatch message
40+
// After the Schema, we send the RecordBatch containing the actual data
41+
42+
// Calculate the size of the validity and data buffers
43+
const int64_t validity_size = (arrow_arr.length + 7) / 8;
44+
const int64_t data_size = arrow_arr.length * sizeof(T);
45+
const std::vector<int64_t> buffers_sizes = {validity_size, data_size};
46+
details::serialize_record_batch_message(arrow_arr, buffers_sizes, final_buffer);
47+
48+
// Return the final buffer containing the complete IPC stream
49+
return final_buffer;
50+
}
51+
52+
template <typename T>
53+
sparrow::primitive_array<T> deserialize_primitive_array(const std::vector<uint8_t>& buffer) {
54+
const uint8_t* buf_ptr = buffer.data();
55+
size_t current_offset = 0;
56+
57+
// I - Deserialize the Schema message
58+
std::optional<std::string> name;
59+
std::optional<std::vector<sparrow::metadata_pair>> metadata;
60+
details::deserialize_schema_message(buf_ptr, current_offset, name, metadata);
61+
62+
// II - Deserialize the RecordBatch message
63+
const uint32_t batch_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));
64+
const auto* record_batch = details::deserialize_record_batch_message(buf_ptr, current_offset);
65+
66+
current_offset += utils::align_to_8(batch_meta_len);
67+
const uint8_t* body_ptr = buf_ptr + current_offset;
68+
69+
// Extract metadata from the RecordBatch
70+
const auto buffers_meta = record_batch->buffers();
71+
const auto nodes_meta = record_batch->nodes();
72+
const auto node_meta = nodes_meta->Get(0);
73+
74+
// The body contains the validity bitmap and the data buffer concatenated
75+
// We need to copy this data into memory owned by the new ArrowArray
76+
const int64_t validity_len = buffers_meta->Get(0)->length();
77+
const int64_t data_len = buffers_meta->Get(1)->length();
78+
79+
uint8_t* validity_buffer_copy = new uint8_t[validity_len];
80+
memcpy(validity_buffer_copy, body_ptr + buffers_meta->Get(0)->offset(), validity_len);
81+
82+
uint8_t* data_buffer_copy = new uint8_t[data_len];
83+
memcpy(data_buffer_copy, body_ptr + buffers_meta->Get(1)->offset(), data_len);
84+
85+
86+
auto data = sparrow::u8_buffer<T>(reinterpret_cast<T*>(data_buffer_copy), node_meta->length());
87+
auto bitmap = sparrow::validity_bitmap(validity_buffer_copy, node_meta->length());
88+
89+
return sparrow::primitive_array<T>(std::move(data), node_meta->length(), std::move(bitmap), name, metadata);
90+
}
91+
}

0 commit comments

Comments
 (0)