11#pragma once
22
3+ #include < cstdint>
4+ #include < cstring>
5+ #include < optional>
6+ #include < stdexcept>
7+ #include < string>
8+ #include < string_view>
39#include < vector>
410
511#include " sparrow.hpp"
612
13+ #include " Message_generated.h"
14+ #include " Schema_generated.h"
15+
716#include " config/config.hpp"
17+ #include " serialize.hpp"
18+ #include " utils.hpp"
819
920namespace sparrow_ipc
1021{
1122 // TODO split serialize/deserialize fcts in two different files or just rename the current one?
1223 template <typename T>
13- SPARROW_IPC_API std::vector<uint8_t > serialize_primitive_array (const sparrow::primitive_array<T>& arr);
24+ std::vector<uint8_t > serialize_primitive_array (const sparrow::primitive_array<T>& arr);
25+
26+ template <typename T>
27+ sparrow::primitive_array<T> deserialize_primitive_array (const std::vector<uint8_t >& buffer);
28+
29+ template <typename T>
30+ std::vector<uint8_t > serialize_primitive_array (const sparrow::primitive_array<T>& arr)
31+ {
32+ // This function serializes a sparrow::primitive_array into a byte vector that is compliant
33+ // with the Apache Arrow IPC Streaming Format. It constructs a stream containing two messages:
34+ // 1. A Schema message: Describes the data's metadata (field name, type, nullability).
35+ // 2. A RecordBatch message: Contains the actual array data (null count, length, and raw buffers).
36+ // This two-part structure makes the data self-describing and readable by other Arrow-native tools.
37+ // The implementation adheres to the specification by correctly handling:
38+ // - Message order (Schema first, then RecordBatch).
39+ // - The encapsulated message format (4-byte metadata length prefix).
40+ // - 8-byte padding and alignment for the message body.
41+ // - Correctly populating the Flatbuffer-defined metadata for both messages.
42+
43+ // Create a mutable copy of the input array to allow moving its internal structures
44+ sparrow::primitive_array<T> mutable_arr = arr;
45+ auto [arrow_arr, arrow_schema] = sparrow::extract_arrow_structures (std::move (mutable_arr));
46+
47+ // This will be the final buffer holding the complete IPC stream.
48+ std::vector<uint8_t > final_buffer;
49+
50+ // I - Serialize the Schema message
51+ // An Arrow IPC stream must start with a Schema message
52+ {
53+ // Create a new builder for the Schema message's metadata
54+ flatbuffers::FlatBufferBuilder schema_builder;
55+
56+ // Create the Field metadata, which describes a single column (or array)
57+ flatbuffers::Offset<flatbuffers::String> fb_name_offset = 0 ;
58+ if (arrow_schema.name )
59+ {
60+ fb_name_offset = schema_builder.CreateString (arrow_schema.name );
61+ }
62+
63+ // Determine the Flatbuffer type information from the C schema's format string
64+ auto [type_enum, type_offset] = utils::get_flatbuffer_type (schema_builder, arrow_schema.format );
65+
66+ // Handle metadata
67+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>>>
68+ fb_metadata_offset = 0 ;
69+
70+ if (arr.metadata ())
71+ {
72+ sparrow::key_value_view metadata_view = *(arr.metadata ());
73+ std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>> kv_offsets;
74+
75+ auto mv_it = metadata_view.cbegin ();
76+ for (auto i = 0 ; i < metadata_view.size (); ++i, ++mv_it)
77+ {
78+ auto key_offset = schema_builder.CreateString (std::string ((*mv_it).first ));
79+ auto value_offset = schema_builder.CreateString (std::string ((*mv_it).second ));
80+ kv_offsets.push_back (
81+ org::apache::arrow::flatbuf::CreateKeyValue (schema_builder, key_offset, value_offset));
82+ }
83+ fb_metadata_offset = schema_builder.CreateVector (kv_offsets);
84+ }
85+
86+ // Build the Field object
87+ auto fb_field = org::apache::arrow::flatbuf::CreateField (
88+ schema_builder,
89+ fb_name_offset,
90+ (arrow_schema.flags & static_cast <int64_t >(sparrow::ArrowFlag::NULLABLE)) != 0 ,
91+ type_enum,
92+ type_offset,
93+ 0 , // dictionary
94+ 0 , // children
95+ fb_metadata_offset);
96+
97+ // A Schema contains a vector of fields. For this primitive array, there is only one
98+ std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> fields_vec = {fb_field};
99+ auto fb_fields = schema_builder.CreateVector (fields_vec);
100+
101+ // Build the Schema object from the vector of fields
102+ auto schema_offset = org::apache::arrow::flatbuf::CreateSchema (schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields);
103+
104+ // Wrap the Schema in a top-level Message, which is the standard IPC envelope
105+ auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage (
106+ schema_builder,
107+ org::apache::arrow::flatbuf::MetadataVersion::V5,
108+ org::apache::arrow::flatbuf::MessageHeader::Schema,
109+ schema_offset.Union (),
110+ 0
111+ );
112+ schema_builder.Finish (schema_message_offset);
113+
114+ // Assemble the Schema message bytes
115+ uint32_t schema_len = schema_builder.GetSize (); // Get the size of the serialized metadata
116+ final_buffer.resize (sizeof (uint32_t ) + schema_len); // Resize the buffer to hold the message
117+ // Copy the metadata into the buffer, after the 4-byte length prefix
118+ memcpy (final_buffer.data () + sizeof (uint32_t ), schema_builder.GetBufferPointer (), schema_len);
119+ // Write the 4-byte metadata length at the beginning of the message
120+ *(reinterpret_cast <uint32_t *>(final_buffer.data ())) = schema_len;
121+ }
122+
123+ // II - Serialize the RecordBatch message
124+ // After the Schema, we send the RecordBatch containing the actual data
125+ {
126+ // Create a new builder for the RecordBatch message's metadata
127+ flatbuffers::FlatBufferBuilder batch_builder;
128+
129+ // arrow_arr.buffers[0] is the validity bitmap
130+ // arrow_arr.buffers[1] is the data buffer
131+ const uint8_t * validity_bitmap = reinterpret_cast <const uint8_t *>(arrow_arr.buffers [0 ]);
132+ const uint8_t * data_buffer = reinterpret_cast <const uint8_t *>(arrow_arr.buffers [1 ]);
133+
134+ // Calculate the size of the validity and data buffers
135+ int64_t validity_size = (arrow_arr.length + 7 ) / 8 ;
136+ int64_t data_size = arrow_arr.length * sizeof (T);
137+ int64_t body_len = validity_size + data_size; // The total size of the message body
138+
139+ // Create Flatbuffer descriptions for the data buffers
140+ org::apache::arrow::flatbuf::Buffer validity_buffer_struct (0 , validity_size);
141+ org::apache::arrow::flatbuf::Buffer data_buffer_struct (validity_size, data_size);
142+ // Create the FieldNode, which describes the layout of the array data
143+ org::apache::arrow::flatbuf::FieldNode field_node_struct (arrow_arr.length , arrow_arr.null_count );
144+
145+ // A RecordBatch contains a vector of nodes and a vector of buffers
146+ auto fb_nodes_vector = batch_builder.CreateVectorOfStructs (&field_node_struct, 1 );
147+ std::vector<org::apache::arrow::flatbuf::Buffer> buffers_vec = {validity_buffer_struct, data_buffer_struct};
148+ auto fb_buffers_vector = batch_builder.CreateVectorOfStructs (buffers_vec);
149+
150+ // Build the RecordBatch metadata object
151+ auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch (batch_builder, arrow_arr.length , fb_nodes_vector, fb_buffers_vector);
152+
153+ // Wrap the RecordBatch in a top-level Message
154+ auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage (
155+ batch_builder,
156+ org::apache::arrow::flatbuf::MetadataVersion::V5,
157+ org::apache::arrow::flatbuf::MessageHeader::RecordBatch,
158+ record_batch_offset.Union (),
159+ body_len
160+ );
161+ batch_builder.Finish (batch_message_offset);
162+
163+ // III - Append the RecordBatch message to the final buffer
164+ uint32_t batch_meta_len = batch_builder.GetSize (); // Get the size of the batch metadata
165+ int64_t aligned_batch_meta_len = utils::align_to_8 (batch_meta_len); // Calculate the padded length
166+
167+ size_t current_size = final_buffer.size (); // Get the current size (which is the end of the Schema message)
168+ // Resize the buffer to append the new message
169+ final_buffer.resize (current_size + sizeof (uint32_t ) + aligned_batch_meta_len + body_len);
170+ uint8_t * dst = final_buffer.data () + current_size; // Get a pointer to where the new message will start
171+
172+ // Write the 4-byte metadata length for the RecordBatch message
173+ *(reinterpret_cast <uint32_t *>(dst)) = batch_meta_len;
174+ dst += sizeof (uint32_t );
175+ // Copy the RecordBatch metadata into the buffer
176+ memcpy (dst, batch_builder.GetBufferPointer (), batch_meta_len);
177+ // Add padding to align the body to an 8-byte boundary
178+ memset (dst + batch_meta_len, 0 , aligned_batch_meta_len - batch_meta_len);
179+ dst += aligned_batch_meta_len;
180+ // Copy the actual data buffers (the message body) into the buffer
181+ if (validity_bitmap)
182+ {
183+ memcpy (dst, validity_bitmap, validity_size);
184+ }
185+ else
186+ {
187+ // If validity_bitmap is null, it means there are no nulls
188+ memset (dst, 0xFF , validity_size);
189+ }
190+ dst += validity_size;
191+ if (data_buffer)
192+ {
193+ memcpy (dst, data_buffer, data_size);
194+ }
195+ }
196+
197+ // Release the memory managed by the C structures
198+ arrow_arr.release (&arrow_arr);
199+ arrow_schema.release (&arrow_schema);
200+
201+ // Return the final buffer containing the complete IPC stream
202+ return final_buffer;
203+ }
14204
15205 template <typename T>
16- SPARROW_IPC_API sparrow::primitive_array<T> deserialize_primitive_array (const std::vector<uint8_t >& buffer);
17- }
206+ sparrow::primitive_array<T> deserialize_primitive_array (const std::vector<uint8_t >& buffer) {
207+ const uint8_t * buf_ptr = buffer.data ();
208+ size_t current_offset = 0 ;
209+
210+ // I - Deserialize the Schema message
211+ uint32_t schema_meta_len = *(reinterpret_cast <const uint32_t *>(buf_ptr + current_offset));
212+ current_offset += sizeof (uint32_t );
213+ auto schema_message = org::apache::arrow::flatbuf::GetMessage (buf_ptr + current_offset);
214+ if (schema_message->header_type () != org::apache::arrow::flatbuf::MessageHeader::Schema)
215+ {
216+ throw std::runtime_error (" Expected Schema message at the start of the buffer." );
217+ }
218+ auto flatbuffer_schema = static_cast <const org::apache::arrow::flatbuf::Schema*>(schema_message->header ());
219+ auto fields = flatbuffer_schema->fields ();
220+ if (fields->size () != 1 )
221+ {
222+ throw std::runtime_error (" Expected schema with exactly one field for primitive_array." );
223+ }
224+ bool is_nullable = fields->Get (0 )->nullable ();
225+ current_offset += schema_meta_len;
226+
227+ // II - Deserialize the RecordBatch message
228+ uint32_t batch_meta_len = *(reinterpret_cast <const uint32_t *>(buf_ptr + current_offset));
229+ current_offset += sizeof (uint32_t );
230+ auto batch_message = org::apache::arrow::flatbuf::GetMessage (buf_ptr + current_offset);
231+ if (batch_message->header_type () != org::apache::arrow::flatbuf::MessageHeader::RecordBatch)
232+ {
233+ throw std::runtime_error (" Expected RecordBatch message, but got a different type." );
234+ }
235+ auto record_batch = static_cast <const org::apache::arrow::flatbuf::RecordBatch*>(batch_message->header ());
236+ current_offset += utils::align_to_8 (batch_meta_len);
237+ const uint8_t * body_ptr = buf_ptr + current_offset;
238+
239+ // Extract metadata from the RecordBatch
240+ auto buffers_meta = record_batch->buffers ();
241+ auto nodes_meta = record_batch->nodes ();
242+ auto node_meta = nodes_meta->Get (0 );
243+
244+ // The body contains the validity bitmap and the data buffer concatenated
245+ // We need to copy this data into memory owned by the new ArrowArray
246+ int64_t validity_len = buffers_meta->Get (0 )->length ();
247+ int64_t data_len = buffers_meta->Get (1 )->length ();
248+
249+ uint8_t * validity_buffer_copy = new uint8_t [validity_len];
250+ memcpy (validity_buffer_copy, body_ptr + buffers_meta->Get (0 )->offset (), validity_len);
251+
252+ uint8_t * data_buffer_copy = new uint8_t [data_len];
253+ memcpy (data_buffer_copy, body_ptr + buffers_meta->Get (1 )->offset (), data_len);
254+
255+ // Get name
256+ std::optional<std::string_view> name;
257+ const flatbuffers::String* fb_name_flatbuffer = fields->Get (0 )->name ();
258+ if (fb_name_flatbuffer)
259+ {
260+ name = std::string_view (fb_name_flatbuffer->c_str (), fb_name_flatbuffer->size ());
261+ }
262+
263+ // Handle metadata
264+ std::optional<std::vector<sparrow::metadata_pair>> metadata;
265+ auto fb_metadata = fields->Get (0 )->custom_metadata ();
266+ if (fb_metadata && !fb_metadata->empty ())
267+ {
268+ metadata = std::vector<sparrow::metadata_pair>();
269+ metadata->reserve (fb_metadata->size ());
270+ for (const auto & kv : *fb_metadata)
271+ {
272+ metadata->emplace_back (kv->key ()->c_str (), kv->value ()->c_str ());
273+ }
274+ }
275+
276+ auto data = sparrow::u8_buffer<T>(reinterpret_cast <T*>(data_buffer_copy), node_meta->length ());
277+ auto bitmap = sparrow::validity_bitmap (validity_buffer_copy, node_meta->length ());
278+
279+ return sparrow::primitive_array<T>(std::move (data), node_meta->length (), std::move (bitmap), name, metadata);
280+ }
281+ }
0 commit comments