|
1 | 1 | #pragma once |
2 | 2 |
|
3 | | -#include <cstdint> |
4 | | -#include <cstring> |
5 | | -#include <optional> |
6 | | -#include <stdexcept> |
7 | | -#include <string> |
8 | | -#include <string_view> |
9 | 3 | #include <vector> |
10 | | - |
11 | 4 | #include "sparrow.hpp" |
12 | 5 |
|
13 | | -#include "config/config.hpp" |
14 | | - |
15 | | -#include "Message_generated.h" |
16 | | -#include "Schema_generated.h" |
17 | | - |
18 | | -namespace |
19 | | -{ |
20 | | - // Aligns a value to the next multiple of 8, as required by the Arrow IPC format for message bodies. |
21 | | - int64_t align_to_8(int64_t n) |
22 | | - { |
23 | | - return (n + 7) & -8; |
24 | | - } |
25 | | - |
26 | | - std::pair<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> |
27 | | - get_flatbuffer_type(flatbuffers::FlatBufferBuilder& builder, const char* format_str) |
28 | | - { |
29 | | - if (strcmp(format_str, "i") == 0) |
30 | | - { |
31 | | - auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 32, true); |
32 | | - return {org::apache::arrow::flatbuf::Type::Int, int_type.Union()}; |
33 | | - } |
34 | | - else if (strcmp(format_str, "f") == 0) |
35 | | - { |
36 | | - auto fp_type = org::apache::arrow::flatbuf::CreateFloatingPoint( |
37 | | - builder, org::apache::arrow::flatbuf::Precision::SINGLE); |
38 | | - return {org::apache::arrow::flatbuf::Type::FloatingPoint, fp_type.Union()}; |
39 | | - } |
40 | | - else if (strcmp(format_str, "g") == 0) |
41 | | - { |
42 | | - auto fp_type = org::apache::arrow::flatbuf::CreateFloatingPoint( |
43 | | - builder, org::apache::arrow::flatbuf::Precision::DOUBLE); |
44 | | - return {org::apache::arrow::flatbuf::Type::FloatingPoint, fp_type.Union()}; |
45 | | - } |
46 | | - else |
47 | | - { |
48 | | - throw std::runtime_error("Unsupported data type for serialization"); |
49 | | - } |
50 | | - } |
51 | | -} |
52 | | - |
53 | 6 | //TODO split serialize/deserialize fcts in two different files or just rename the current one? |
54 | 7 | template <typename T> |
55 | | -SPARROW_IPC_API std::vector<uint8_t> serialize_primitive_array(const sparrow::primitive_array<T>& arr) |
56 | | -{ |
57 | | - // This function serializes a sparrow::primitive_array into a byte vector that is compliant |
58 | | - // with the Apache Arrow IPC Streaming Format. It constructs a stream containing two messages: |
59 | | - // 1. A Schema message: Describes the data's metadata (field name, type, nullability). |
60 | | - // 2. A RecordBatch message: Contains the actual array data (null count, length, and raw buffers). |
61 | | - // This two-part structure makes the data self-describing and readable by other Arrow-native tools. |
62 | | - // The implementation adheres to the specification by correctly handling: |
63 | | - // - Message order (Schema first, then RecordBatch). |
64 | | - // - The encapsulated message format (4-byte metadata length prefix). |
65 | | - // - 8-byte padding and alignment for the message body. |
66 | | - // - Correctly populating the Flatbuffer-defined metadata for both messages. |
67 | | - |
68 | | - // Create a mutable copy of the input array to allow moving its internal structures |
69 | | - sparrow::primitive_array<T> mutable_arr = arr; |
70 | | - auto [arrow_arr, arrow_schema] = sparrow::extract_arrow_structures(std::move(mutable_arr)); |
71 | | - |
72 | | - // This will be the final buffer holding the complete IPC stream. |
73 | | - std::vector<uint8_t> final_buffer; |
74 | | - |
75 | | - // I - Serialize the Schema message |
76 | | - // An Arrow IPC stream must start with a Schema message |
77 | | - { |
78 | | - // Create a new builder for the Schema message's metadata |
79 | | - flatbuffers::FlatBufferBuilder schema_builder; |
80 | | - |
81 | | - // Create the Field metadata, which describes a single column (or array) |
82 | | - flatbuffers::Offset<flatbuffers::String> fb_name_offset = 0; |
83 | | - if (arrow_schema.name) |
84 | | - { |
85 | | - fb_name_offset = schema_builder.CreateString(arrow_schema.name); |
86 | | - } |
87 | | - |
88 | | - // Determine the Flatbuffer type information from the C schema's format string |
89 | | - auto [type_enum, type_offset] = get_flatbuffer_type(schema_builder, arrow_schema.format); |
90 | | - |
91 | | - // Handle metadata |
92 | | - flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>>> |
93 | | - fb_metadata_offset = 0; |
94 | | - |
95 | | - if (arr.metadata()) |
96 | | - { |
97 | | - sparrow::key_value_view metadata_view = *(arr.metadata()); |
98 | | - std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>> kv_offsets; |
99 | | - |
100 | | - auto mv_it = metadata_view.cbegin(); |
101 | | - for (auto i = 0; i < metadata_view.size(); ++i, ++mv_it) |
102 | | - { |
103 | | - auto key_offset = schema_builder.CreateString(std::string((*mv_it).first)); |
104 | | - auto value_offset = schema_builder.CreateString(std::string((*mv_it).second)); |
105 | | - kv_offsets.push_back( |
106 | | - org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset)); |
107 | | - } |
108 | | - fb_metadata_offset = schema_builder.CreateVector(kv_offsets); |
109 | | - } |
110 | | - |
111 | | - // Build the Field object |
112 | | - auto fb_field = org::apache::arrow::flatbuf::CreateField( |
113 | | - schema_builder, |
114 | | - fb_name_offset, |
115 | | - (arrow_schema.flags & static_cast<int64_t>(sparrow::ArrowFlag::NULLABLE)) != 0, |
116 | | - type_enum, |
117 | | - type_offset, |
118 | | - 0, // dictionary |
119 | | - 0, // children |
120 | | - fb_metadata_offset); |
121 | | - |
122 | | - // A Schema contains a vector of fields. For this primitive array, there is only one |
123 | | - std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> fields_vec = {fb_field}; |
124 | | - auto fb_fields = schema_builder.CreateVector(fields_vec); |
125 | | - |
126 | | - // Build the Schema object from the vector of fields |
127 | | - auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields); |
128 | | - |
129 | | - // Wrap the Schema in a top-level Message, which is the standard IPC envelope |
130 | | - auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage( |
131 | | - schema_builder, |
132 | | - org::apache::arrow::flatbuf::MetadataVersion::V5, |
133 | | - org::apache::arrow::flatbuf::MessageHeader::Schema, |
134 | | - schema_offset.Union(), |
135 | | - 0 |
136 | | - ); |
137 | | - schema_builder.Finish(schema_message_offset); |
138 | | - |
139 | | - // Assemble the Schema message bytes |
140 | | - uint32_t schema_len = schema_builder.GetSize(); // Get the size of the serialized metadata |
141 | | - final_buffer.resize(sizeof(uint32_t) + schema_len); // Resize the buffer to hold the message |
142 | | - // Copy the metadata into the buffer, after the 4-byte length prefix |
143 | | - memcpy(final_buffer.data() + sizeof(uint32_t), schema_builder.GetBufferPointer(), schema_len); |
144 | | - // Write the 4-byte metadata length at the beginning of the message |
145 | | - *(reinterpret_cast<uint32_t*>(final_buffer.data())) = schema_len; |
146 | | - } |
147 | | - |
148 | | - // II - Serialize the RecordBatch message |
149 | | - // After the Schema, we send the RecordBatch containing the actual data |
150 | | - { |
151 | | - // Create a new builder for the RecordBatch message's metadata |
152 | | - flatbuffers::FlatBufferBuilder batch_builder; |
153 | | - |
154 | | - // arrow_arr.buffers[0] is the validity bitmap |
155 | | - // arrow_arr.buffers[1] is the data buffer |
156 | | - const uint8_t* validity_bitmap = reinterpret_cast<const uint8_t*>(arrow_arr.buffers[0]); |
157 | | - const uint8_t* data_buffer = reinterpret_cast<const uint8_t*>(arrow_arr.buffers[1]); |
158 | | - |
159 | | - // Calculate the size of the validity and data buffers |
160 | | - int64_t validity_size = (arrow_arr.length + 7) / 8; |
161 | | - int64_t data_size = arrow_arr.length * sizeof(T); |
162 | | - int64_t body_len = validity_size + data_size; // The total size of the message body |
163 | | - |
164 | | - // Create Flatbuffer descriptions for the data buffers |
165 | | - org::apache::arrow::flatbuf::Buffer validity_buffer_struct(0, validity_size); |
166 | | - org::apache::arrow::flatbuf::Buffer data_buffer_struct(validity_size, data_size); |
167 | | - // Create the FieldNode, which describes the layout of the array data |
168 | | - org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count); |
169 | | - |
170 | | - // A RecordBatch contains a vector of nodes and a vector of buffers |
171 | | - auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1); |
172 | | - std::vector<org::apache::arrow::flatbuf::Buffer> buffers_vec = {validity_buffer_struct, data_buffer_struct}; |
173 | | - auto fb_buffers_vector = batch_builder.CreateVectorOfStructs(buffers_vec); |
174 | | - |
175 | | - // Build the RecordBatch metadata object |
176 | | - auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(batch_builder, arrow_arr.length, fb_nodes_vector, fb_buffers_vector); |
177 | | - |
178 | | - // Wrap the RecordBatch in a top-level Message |
179 | | - auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage( |
180 | | - batch_builder, |
181 | | - org::apache::arrow::flatbuf::MetadataVersion::V5, |
182 | | - org::apache::arrow::flatbuf::MessageHeader::RecordBatch, |
183 | | - record_batch_offset.Union(), |
184 | | - body_len |
185 | | - ); |
186 | | - batch_builder.Finish(batch_message_offset); |
187 | | - |
188 | | - // III - Append the RecordBatch message to the final buffer |
189 | | - uint32_t batch_meta_len = batch_builder.GetSize(); // Get the size of the batch metadata |
190 | | - int64_t aligned_batch_meta_len = align_to_8(batch_meta_len); // Calculate the padded length |
191 | | - |
192 | | - size_t current_size = final_buffer.size(); // Get the current size (which is the end of the Schema message) |
193 | | - // Resize the buffer to append the new message |
194 | | - final_buffer.resize(current_size + sizeof(uint32_t) + aligned_batch_meta_len + body_len); |
195 | | - uint8_t* dst = final_buffer.data() + current_size; // Get a pointer to where the new message will start |
196 | | - |
197 | | - // Write the 4-byte metadata length for the RecordBatch message |
198 | | - *(reinterpret_cast<uint32_t*>(dst)) = batch_meta_len; |
199 | | - dst += sizeof(uint32_t); |
200 | | - // Copy the RecordBatch metadata into the buffer |
201 | | - memcpy(dst, batch_builder.GetBufferPointer(), batch_meta_len); |
202 | | - // Add padding to align the body to an 8-byte boundary |
203 | | - memset(dst + batch_meta_len, 0, aligned_batch_meta_len - batch_meta_len); |
204 | | - dst += aligned_batch_meta_len; |
205 | | - // Copy the actual data buffers (the message body) into the buffer |
206 | | - if (validity_bitmap) |
207 | | - { |
208 | | - memcpy(dst, validity_bitmap, validity_size); |
209 | | - } |
210 | | - else |
211 | | - { |
212 | | - // If validity_bitmap is null, it means there are no nulls |
213 | | - // The Arrow spec recommends writing a bitmap with all bits set to 1 |
214 | | - memset(dst, 0xFF, validity_size); |
215 | | - } |
216 | | - dst += validity_size; |
217 | | - if (data_buffer) |
218 | | - { |
219 | | - memcpy(dst, data_buffer, data_size); |
220 | | - } |
221 | | - } |
222 | | - |
223 | | - // Release the memory managed by the C structures |
224 | | - arrow_arr.release(&arrow_arr); |
225 | | - arrow_schema.release(&arrow_schema); |
226 | | - |
227 | | - // Return the final buffer containing the complete IPC stream |
228 | | - return final_buffer; |
229 | | -} |
| 8 | +std::vector<uint8_t> serialize_primitive_array(const sparrow::primitive_array<T>& arr); |
230 | 9 |
|
231 | 10 | template <typename T> |
232 | | -SPARROW_IPC_API sparrow::primitive_array<T> deserialize_primitive_array(const std::vector<uint8_t>& buffer) |
233 | | -{ |
234 | | - const uint8_t* buf_ptr = buffer.data(); |
235 | | - size_t current_offset = 0; |
236 | | - |
237 | | - // I - Deserialize the Schema message |
238 | | - uint32_t schema_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset)); |
239 | | - current_offset += sizeof(uint32_t); |
240 | | - auto schema_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset); |
241 | | - if (schema_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::Schema) |
242 | | - { |
243 | | - throw std::runtime_error("Expected Schema message at the start of the buffer."); |
244 | | - } |
245 | | - auto flatbuffer_schema = static_cast<const org::apache::arrow::flatbuf::Schema*>(schema_message->header()); |
246 | | - auto fields = flatbuffer_schema->fields(); |
247 | | - if (fields->size() != 1) |
248 | | - { |
249 | | - throw std::runtime_error("Expected schema with exactly one field for primitive_array."); |
250 | | - } |
251 | | - bool is_nullable = fields->Get(0)->nullable(); |
252 | | - current_offset += schema_meta_len; |
253 | | - |
254 | | - // II - Deserialize the RecordBatch message |
255 | | - uint32_t batch_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset)); |
256 | | - current_offset += sizeof(uint32_t); |
257 | | - auto batch_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset); |
258 | | - if (batch_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::RecordBatch) |
259 | | - { |
260 | | - throw std::runtime_error("Expected RecordBatch message, but got a different type."); |
261 | | - } |
262 | | - auto record_batch = static_cast<const org::apache::arrow::flatbuf::RecordBatch*>(batch_message->header()); |
263 | | - current_offset += align_to_8(batch_meta_len); |
264 | | - const uint8_t* body_ptr = buf_ptr + current_offset; |
265 | | - |
266 | | - // Extract metadata from the RecordBatch |
267 | | - auto buffers_meta = record_batch->buffers(); |
268 | | - auto nodes_meta = record_batch->nodes(); |
269 | | - auto node_meta = nodes_meta->Get(0); |
270 | | - |
271 | | - // The body contains the validity bitmap and the data buffer concatenated |
272 | | - // We need to copy this data into memory owned by the new ArrowArray |
273 | | - int64_t validity_len = buffers_meta->Get(0)->length(); |
274 | | - int64_t data_len = buffers_meta->Get(1)->length(); |
275 | | - |
276 | | - uint8_t* validity_buffer_copy = new uint8_t[validity_len]; |
277 | | - memcpy(validity_buffer_copy, body_ptr + buffers_meta->Get(0)->offset(), validity_len); |
278 | | - |
279 | | - uint8_t* data_buffer_copy = new uint8_t[data_len]; |
280 | | - memcpy(data_buffer_copy, body_ptr + buffers_meta->Get(1)->offset(), data_len); |
281 | | - |
282 | | - // Get name |
283 | | - std::optional<std::string_view> name; |
284 | | - const flatbuffers::String* fb_name_flatbuffer = fields->Get(0)->name(); |
285 | | - if (fb_name_flatbuffer) |
286 | | - { |
287 | | - name = std::string_view(fb_name_flatbuffer->c_str(), fb_name_flatbuffer->size()); |
288 | | - } |
289 | | - |
290 | | - // Handle metadata |
291 | | - std::optional<std::vector<sparrow::metadata_pair>> metadata; |
292 | | - auto fb_metadata = fields->Get(0)->custom_metadata(); |
293 | | - if (fb_metadata && !fb_metadata->empty()) |
294 | | - { |
295 | | - metadata = std::vector<sparrow::metadata_pair>(); |
296 | | - metadata->reserve(fb_metadata->size()); |
297 | | - for (const auto& kv : *fb_metadata) |
298 | | - { |
299 | | - metadata->emplace_back(kv->key()->c_str(), kv->value()->c_str()); |
300 | | - } |
301 | | - } |
302 | | - |
303 | | - auto data = sparrow::u8_buffer<T>(reinterpret_cast<T*>(data_buffer_copy), node_meta->length()); |
304 | | - auto bitmap = sparrow::validity_bitmap(validity_buffer_copy, node_meta->length()); |
305 | | - |
306 | | - return sparrow::primitive_array<T>(std::move(data), node_meta->length(), std::move(bitmap), name, metadata); |
307 | | -} |
308 | | - |
309 | | -// Explicit template instantiation |
310 | | -template SPARROW_IPC_API std::vector<uint8_t> serialize_primitive_array<int>(const sparrow::primitive_array<int>& arr); |
311 | | -template SPARROW_IPC_API sparrow::primitive_array<int> deserialize_primitive_array<int>(const std::vector<uint8_t>& buffer); |
312 | | -template SPARROW_IPC_API std::vector<uint8_t> serialize_primitive_array<float>(const sparrow::primitive_array<float>& arr); |
313 | | -template SPARROW_IPC_API sparrow::primitive_array<float> deserialize_primitive_array<float>(const std::vector<uint8_t>& buffer); |
314 | | -template SPARROW_IPC_API std::vector<uint8_t> serialize_primitive_array<double>(const sparrow::primitive_array<double>& arr); |
315 | | -template SPARROW_IPC_API sparrow::primitive_array<double> deserialize_primitive_array<double>(const std::vector<uint8_t>& buffer); |
| 11 | +sparrow::primitive_array<T> deserialize_primitive_array(const std::vector<uint8_t>& buffer); |
0 commit comments