55
66namespace sparrow_ipc
77{
8- constexpr int64_t arrow_alignment = 8 ;
9-
10- // Aligns a value to the next multiple of 8, as required by the Arrow IPC format for message bodies.
11- int64_t align_to_8 ( int64_t n )
8+ void common_serialize (
9+ const flatbuffers::FlatBufferBuilder& builder,
10+ any_output_stream& stream
11+ )
1212 {
13- return (n + arrow_alignment - 1 ) & -arrow_alignment;
13+ stream.write (continuation);
14+ const flatbuffers::uoffset_t size = builder.GetSize ();
15+ const std::span<const uint8_t > size_span (reinterpret_cast <const uint8_t *>(&size), sizeof (uint32_t ));
16+ stream.write (size_span);
17+ stream.write (std::span (builder.GetBufferPointer (), size));
18+ stream.add_padding ();
1419 }
1520
1621 void serialize_schema_message (const sparrow::record_batch& record_batch, any_output_stream& stream)
@@ -22,233 +27,7 @@ namespace sparrow_ipc
2227 std::optional<CompressionType> compression,
2328 std::optional<std::reference_wrapper<CompressionCache>> cache)
2429 {
25- // Create a new builder for the Schema message's metadata
26- flatbuffers::FlatBufferBuilder schema_builder;
27-
28- // Create the Field metadata, which describes a single column (or array)
29- flatbuffers::Offset<flatbuffers::String> fb_name_offset = 0 ;
30- if (arrow_schema.name )
31- {
32- fb_name_offset = schema_builder.CreateString (arrow_schema.name );
33- }
34-
35- // Determine the Flatbuffer type information from the C schema's format string
36- auto [type_enum, type_offset] = get_flatbuffer_type (schema_builder, arrow_schema.format );
37-
38- // Handle metadata
39- flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>>>
40- fb_metadata_offset = 0 ;
41-
42- if (arr.metadata ())
43- {
44- sparrow::key_value_view metadata_view = *(arr.metadata ());
45- std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>> kv_offsets;
46-
47- auto mv_it = metadata_view.cbegin ();
48- for (auto i = 0 ; i < metadata_view.size (); ++i, ++mv_it)
49- {
50- auto key_offset = schema_builder.CreateString (std::string ((*mv_it).first ));
51- auto value_offset = schema_builder.CreateString (std::string ((*mv_it).second ));
52- kv_offsets.push_back (
53- org::apache::arrow::flatbuf::CreateKeyValue (schema_builder, key_offset, value_offset));
54- }
55- fb_metadata_offset = schema_builder.CreateVector (kv_offsets);
56- }
57-
58- // Build the Field object
59- auto fb_field = org::apache::arrow::flatbuf::CreateField (
60- schema_builder,
61- fb_name_offset,
62- (arrow_schema.flags & static_cast <int64_t >(sparrow::ArrowFlag::NULLABLE)) != 0 ,
63- type_enum,
64- type_offset,
65- 0 , // dictionary
66- 0 , // children
67- fb_metadata_offset);
68-
69- // A Schema contains a vector of fields. For this primitive array, there is only one
70- std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> fields_vec = {fb_field};
71- auto fb_fields = schema_builder.CreateVector (fields_vec);
72-
73- // Build the Schema object from the vector of fields
74- auto schema_offset = org::apache::arrow::flatbuf::CreateSchema (schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields);
75-
76- // Wrap the Schema in a top-level Message, which is the standard IPC envelope
77- auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage (
78- schema_builder,
79- org::apache::arrow::flatbuf::MetadataVersion::V5,
80- org::apache::arrow::flatbuf::MessageHeader::Schema,
81- schema_offset.Union (),
82- 0
83- );
84- schema_builder.Finish (schema_message_offset);
85-
86- // Assemble the Schema message bytes
87- uint32_t schema_len = schema_builder.GetSize (); // Get the size of the serialized metadata
88- final_buffer.resize (sizeof (uint32_t ) + schema_len); // Resize the buffer to hold the message
89- // Copy the metadata into the buffer, after the 4-byte length prefix
90- memcpy (final_buffer.data () + sizeof (uint32_t ), schema_builder.GetBufferPointer (), schema_len);
91- // Write the 4-byte metadata length at the beginning of the message
92- memcpy (final_buffer.data (), &schema_len, sizeof (schema_len));
93- }
94-
95- // II - Serialize the RecordBatch message
96- // After the Schema, we send the RecordBatch containing the actual data
97- {
98- // Create a new builder for the RecordBatch message's metadata
99- flatbuffers::FlatBufferBuilder batch_builder;
100-
101- // arrow_arr.buffers[0] is the validity bitmap
102- // arrow_arr.buffers[1] is the data buffer
103- const uint8_t * validity_bitmap = static_cast <const uint8_t *>(arrow_arr.buffers [0 ]);
104- const uint8_t * data_buffer = static_cast <const uint8_t *>(arrow_arr.buffers [1 ]);
105-
106- // Calculate the size of the validity and data buffers
107- int64_t validity_size = (arrow_arr.length + arrow_alignment - 1 ) / arrow_alignment;
108- int64_t data_size = arrow_arr.length * sizeof (T);
109- int64_t body_len = validity_size + data_size; // The total size of the message body
110-
111- // Create Flatbuffer descriptions for the data buffers
112- org::apache::arrow::flatbuf::Buffer validity_buffer_struct (0 , validity_size);
113- org::apache::arrow::flatbuf::Buffer data_buffer_struct (validity_size, data_size);
114- // Create the FieldNode, which describes the layout of the array data
115- org::apache::arrow::flatbuf::FieldNode field_node_struct (arrow_arr.length , arrow_arr.null_count );
116-
117- // A RecordBatch contains a vector of nodes and a vector of buffers
118- auto fb_nodes_vector = batch_builder.CreateVectorOfStructs (&field_node_struct, 1 );
119- std::vector<org::apache::arrow::flatbuf::Buffer> buffers_vec = {validity_buffer_struct, data_buffer_struct};
120- auto fb_buffers_vector = batch_builder.CreateVectorOfStructs (buffers_vec);
121-
122- // Build the RecordBatch metadata object
123- auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch (batch_builder, arrow_arr.length , fb_nodes_vector, fb_buffers_vector);
124-
125- // Wrap the RecordBatch in a top-level Message
126- auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage (
127- batch_builder,
128- org::apache::arrow::flatbuf::MetadataVersion::V5,
129- org::apache::arrow::flatbuf::MessageHeader::RecordBatch,
130- record_batch_offset.Union (),
131- body_len
132- );
133- batch_builder.Finish (batch_message_offset);
134-
135- // III - Append the RecordBatch message to the final buffer
136- uint32_t batch_meta_len = batch_builder.GetSize (); // Get the size of the batch metadata
137- int64_t aligned_batch_meta_len = align_to_8 (batch_meta_len); // Calculate the padded length
138-
139- size_t current_size = final_buffer.size (); // Get the current size (which is the end of the Schema message)
140- // Resize the buffer to append the new message
141- final_buffer.resize (current_size + sizeof (uint32_t ) + aligned_batch_meta_len + body_len);
142- uint8_t * dst = final_buffer.data () + current_size; // Get a pointer to where the new message will start
143-
144- // Write the 4-byte metadata length for the RecordBatch message
145- memcpy (dst, &batch_meta_len, sizeof (batch_meta_len));
146- dst += sizeof (uint32_t );
147- // Copy the RecordBatch metadata into the buffer
148- memcpy (dst, batch_builder.GetBufferPointer (), batch_meta_len);
149- // Add padding to align the body to an 8-byte boundary
150- memset (dst + batch_meta_len, 0 , aligned_batch_meta_len - batch_meta_len);
151- dst += aligned_batch_meta_len;
152- // Copy the actual data buffers (the message body) into the buffer
153- if (validity_bitmap)
154- {
155- memcpy (dst, validity_bitmap, validity_size);
156- }
157- else
158- {
159- // If validity_bitmap is null, it means there are no nulls
160- memset (dst, 0xFF , validity_size);
161- }
162- dst += validity_size;
163- if (data_buffer)
164- {
165- memcpy (dst, data_buffer, data_size);
166- }
30+ common_serialize (get_record_batch_message_builder (record_batch, compression, cache), stream);
31+ generate_body (record_batch, stream, compression, cache);
16732 }
168-
169- // Release the memory managed by the C structures
170- arrow_arr.release(&arrow_arr);
171- arrow_schema.release(&arrow_schema);
172-
173- // Return the final buffer containing the complete IPC stream
174- return final_buffer;
175- }
176-
177- template <typename T>
178- sparrow::primitive_array<T> deserialize_primitive_array (const std::vector<uint8_t >& buffer) {
179- const uint8_t * buf_ptr = buffer.data ();
180- size_t current_offset = 0 ;
181-
182- // I - Deserialize the Schema message
183- uint32_t schema_meta_len;
184- memcpy (&schema_meta_len, buf_ptr + current_offset, sizeof (schema_meta_len));
185- current_offset += sizeof (uint32_t );
186- auto schema_message = org::apache::arrow::flatbuf::GetMessage (buf_ptr + current_offset);
187- if (schema_message->header_type () != org::apache::arrow::flatbuf::MessageHeader::Schema)
188- {
189- throw std::runtime_error (" Expected Schema message at the start of the buffer." );
190- }
191- auto flatbuffer_schema = static_cast <const org::apache::arrow::flatbuf::Schema*>(schema_message->header ());
192- auto fields = flatbuffer_schema->fields ();
193- if (fields->size () != 1 )
194- {
195- throw std::runtime_error (" Expected schema with exactly one field for primitive_array." );
196- }
197- bool is_nullable = fields->Get (0 )->nullable ();
198- current_offset += schema_meta_len;
199-
200- // II - Deserialize the RecordBatch message
201- uint32_t batch_meta_len;
202- memcpy (&batch_meta_len, buf_ptr + current_offset, sizeof (batch_meta_len));
203- current_offset += sizeof (uint32_t );
204- auto batch_message = org::apache::arrow::flatbuf::GetMessage (buf_ptr + current_offset);
205- if (batch_message->header_type () != org::apache::arrow::flatbuf::MessageHeader::RecordBatch)
206- {
207- throw std::runtime_error (" Expected RecordBatch message, but got a different type." );
208- }
209- auto record_batch = static_cast <const org::apache::arrow::flatbuf::RecordBatch*>(batch_message->header ());
210- current_offset += align_to_8 (batch_meta_len);
211- const uint8_t * body_ptr = buf_ptr + current_offset;
212-
213- // Extract metadata from the RecordBatch
214- auto buffers_meta = record_batch->buffers ();
215- auto nodes_meta = record_batch->nodes ();
216- auto node_meta = nodes_meta->Get (0 );
217-
218- // The body contains the validity bitmap and the data buffer concatenated
219- // We need to copy this data into memory owned by the new ArrowArray
220- int64_t validity_len = buffers_meta->Get (0 )->length ();
221- int64_t data_len = buffers_meta->Get (1 )->length ();
222-
223- uint8_t * validity_buffer_copy = new uint8_t [validity_len];
224- memcpy (validity_buffer_copy, body_ptr + buffers_meta->Get (0 )->offset (), validity_len);
225-
226- uint8_t * data_buffer_copy = new uint8_t [data_len];
227- memcpy (data_buffer_copy, body_ptr + buffers_meta->Get (1 )->offset (), data_len);
228-
229- // Get name
230- std::optional<std::string_view> name;
231- const flatbuffers::String* fb_name_flatbuffer = fields->Get (0 )->name ();
232- if (fb_name_flatbuffer)
233- {
234- name = std::string_view (fb_name_flatbuffer->c_str (), fb_name_flatbuffer->size ());
235- }
236-
237- // Handle metadata
238- std::optional<std::vector<sparrow::metadata_pair>> metadata;
239- auto fb_metadata = fields->Get (0 )->custom_metadata ();
240- if (fb_metadata && !fb_metadata->empty ())
241- {
242- metadata = std::vector<sparrow::metadata_pair>();
243- metadata->reserve (fb_metadata->size ());
244- for (const auto & kv : *fb_metadata)
245- {
246- metadata->emplace_back (kv->key ()->c_str (), kv->value ()->c_str ());
247- }
248- }
249-
250- auto data = sparrow::u8_buffer<T>(reinterpret_cast <T*>(data_buffer_copy), node_meta->length ());
251- auto bitmap = sparrow::validity_bitmap (validity_buffer_copy, node_meta->length ());
252-
253- return sparrow::primitive_array<T>(std::move (data), node_meta->length (), std::move (bitmap), name, metadata);
25433}
0 commit comments