|
| 1 | +# Serialization and Deserialization {#serialization} |
| 2 | + |
| 3 | +This page describes how to serialize and deserialize record batches using `sparrow-ipc`. |
| 4 | + |
| 5 | +## Overview |
| 6 | + |
| 7 | +`sparrow-ipc` provides two main approaches for both serialization and deserialization: |
| 8 | + |
| 9 | +- **Function API**: Simple one-shot operations for serializing/deserializing complete data |
| 10 | +- **Class API**: Streaming-oriented classes (`serializer` and `deserializer`) for incremental operations |
| 11 | + |
| 12 | +## Serialization |
| 13 | + |
| 14 | +### Serialize record batches to a memory stream |
| 15 | + |
| 16 | +The simplest way to serialize record batches is to use the `serializer` class with a `memory_output_stream`: |
| 17 | + |
| 18 | +```cpp |
| 19 | +#include <vector> |
| 20 | +#include <sparrow_ipc/memory_output_stream.hpp> |
| 21 | +#include <sparrow_ipc/serializer.hpp> |
| 22 | +#include <sparrow/record_batch.hpp> |
| 23 | + |
| 24 | +namespace sp = sparrow; |
| 25 | +namespace sp_ipc = sparrow_ipc; |
| 26 | + |
| 27 | +std::vector<uint8_t> serialize_batches_to_stream(const std::vector<sp::record_batch>& batches) |
| 28 | +{ |
| 29 | + std::vector<uint8_t> stream_data; |
| 30 | + sp_ipc::memory_output_stream stream(stream_data); |
| 31 | + sp_ipc::serializer serializer(stream); |
| 32 | + |
| 33 | + // Serialize all batches using the streaming operator |
| 34 | + serializer << batches << sp_ipc::end_stream; |
| 35 | + |
| 36 | + return stream_data; |
| 37 | +} |
| 38 | +``` |
| 39 | + |
| 40 | +### Serialize individual record batches |
| 41 | + |
| 42 | +You can also serialize record batches one at a time: |
| 43 | + |
| 44 | +```cpp |
| 45 | +#include <vector> |
| 46 | +#include <sparrow_ipc/memory_output_stream.hpp> |
| 47 | +#include <sparrow_ipc/serializer.hpp> |
| 48 | +#include <sparrow/record_batch.hpp> |
| 49 | + |
| 50 | +namespace sp = sparrow; |
| 51 | +namespace sp_ipc = sparrow_ipc; |
| 52 | + |
| 53 | +std::vector<uint8_t> serialize_batches_individually(const std::vector<sp::record_batch>& batches) |
| 54 | +{ |
| 55 | + std::vector<uint8_t> stream_data; |
| 56 | + sp_ipc::memory_output_stream stream(stream_data); |
| 57 | + sp_ipc::serializer serializer(stream); |
| 58 | + |
| 59 | + // Serialize batches one by one |
| 60 | + for (const auto& batch : batches) |
| 61 | + { |
| 62 | + serializer << batch; |
| 63 | + } |
| 64 | + |
| 65 | + // Don't forget to end the stream |
| 66 | + serializer << sp_ipc::end_stream; |
| 67 | + |
| 68 | + return stream_data; |
| 69 | +} |
| 70 | +``` |
| 71 | + |
| 72 | +### Pipe a source of record batches to a stream |
| 73 | + |
| 74 | +For streaming scenarios where batches are generated on the fly, pull each batch from the source and serialize it as soon as it is available: |
| 75 | + |
| 76 | +```cpp |
| 77 | +#include <optional> |
| 78 | +#include <vector> |
| 79 | +#include <sparrow_ipc/memory_output_stream.hpp> |
| 80 | +#include <sparrow_ipc/serializer.hpp> |
| 81 | +#include <sparrow/record_batch.hpp> |
| 82 | + |
| 83 | +namespace sp = sparrow; |
| 84 | +namespace sp_ipc = sparrow_ipc; |
| 85 | + |
| 86 | +class record_batch_source |
| 87 | +{ |
| 88 | +public: |
| 89 | + std::optional<sp::record_batch> next(); |
| 90 | +}; |
| 91 | + |
| 92 | +std::vector<uint8_t> stream_from_source(record_batch_source& source) |
| 93 | +{ |
| 94 | + std::vector<uint8_t> stream_data; |
| 95 | + sp_ipc::memory_output_stream stream(stream_data); |
| 96 | + sp_ipc::serializer serializer(stream); |
| 97 | + |
| 98 | + std::optional<sp::record_batch> batch; |
| 99 | + while ((batch = source.next())) |
| 100 | + { |
| 101 | + serializer << *batch; |
| 102 | + } |
| 103 | + serializer << sp_ipc::end_stream; |
| 104 | + |
| 105 | + return stream_data; |
| 106 | +} |
| 107 | +``` |
| 108 | + |
| 109 | +## Deserialization |
| 110 | + |
| 111 | +### Using the function API |
| 112 | + |
| 113 | +The simplest way to deserialize a complete Arrow IPC stream is to use `deserialize_stream`: |
| 114 | + |
| 115 | +```cpp |
| 116 | +#include <vector> |
| 117 | +#include <sparrow_ipc/deserialize.hpp> |
| 118 | +#include <sparrow/record_batch.hpp> |
| 119 | + |
| 120 | +namespace sp = sparrow; |
| 121 | +namespace sp_ipc = sparrow_ipc; |
| 122 | + |
| 123 | +std::vector<sp::record_batch> deserialize_stream_example(const std::vector<uint8_t>& stream_data) |
| 124 | +{ |
| 125 | + // Deserialize the entire stream at once |
| 126 | + auto batches = sp_ipc::deserialize_stream(stream_data); |
| 127 | + return batches; |
| 128 | +} |
| 129 | +``` |
| 130 | + |
| 131 | +### Using the deserializer class |
| 132 | + |
| 133 | +The `deserializer` class provides more control over deserialization and is useful when you want to: |
| 134 | +- Accumulate batches into an existing container |
| 135 | +- Deserialize data incrementally as it arrives |
| 136 | +- Process multiple streams into a single container |
| 137 | + |
| 138 | +#### Basic usage |
| 139 | + |
| 140 | +```cpp |
| 141 | +#include <iostream> |
| 142 | +#include <span> |
| 143 | +#include <vector> |
| 144 | +#include <sparrow_ipc/deserializer.hpp> |
| 145 | +#include <sparrow/record_batch.hpp> |
| 146 | + |
| 147 | +namespace sp = sparrow; |
| 148 | +namespace sp_ipc = sparrow_ipc; |
| 149 | + |
| 150 | +void deserializer_basic_example(const std::vector<uint8_t>& stream_data) |
| 151 | +{ |
| 152 | + // Create a container to hold the deserialized batches |
| 153 | + std::vector<sp::record_batch> batches; |
| 154 | + |
| 155 | + // Create a deserializer that will append to our container |
| 156 | + sp_ipc::deserializer deser(batches); |
| 157 | + |
| 158 | + // Deserialize the stream data |
| 159 | + deser.deserialize(std::span<const uint8_t>(stream_data)); |
| 160 | + |
| 161 | + // Process the accumulated batches |
| 162 | + for (const auto& batch : batches) |
| 163 | + { |
| 164 | + std::cout << "Batch with " << batch.nb_rows() << " rows and " |
| 165 | + << batch.nb_columns() << " columns\n"; |
| 166 | + } |
| 167 | +} |
| 168 | +``` |
| 169 | + |
| 170 | +#### Incremental deserialization |
| 171 | + |
| 172 | +The `deserializer` class is particularly useful for streaming scenarios where data arrives in chunks: |
| 173 | + |
| 174 | +```cpp |
| 175 | +#include <iostream> |
| 176 | +#include <span> |
| 177 | +#include <vector> |
| 178 | +#include <sparrow_ipc/deserializer.hpp> |
| 179 | +#include <sparrow/record_batch.hpp> |
| 180 | + |
| 181 | +namespace sp = sparrow; |
| 182 | +namespace sp_ipc = sparrow_ipc; |
| 183 | + |
| 184 | +void deserializer_incremental_example(const std::vector<std::vector<uint8_t>>& stream_chunks) |
| 185 | +{ |
| 186 | + // Container to accumulate all deserialized batches |
| 187 | + std::vector<sp::record_batch> batches; |
| 188 | + |
| 189 | + // Create a deserializer |
| 190 | + sp_ipc::deserializer deser(batches); |
| 191 | + |
| 192 | + // Deserialize chunks as they arrive using the streaming operator |
| 193 | + for (const auto& chunk : stream_chunks) |
| 194 | + { |
| 195 | + deser << std::span<const uint8_t>(chunk); |
| 196 | + std::cout << "After chunk: " << batches.size() << " batches accumulated\n"; |
| 197 | + } |
| 198 | + |
| 199 | + // All batches are now available in the container |
| 200 | + std::cout << "Total batches deserialized: " << batches.size() << "\n"; |
| 201 | +} |
| 202 | +``` |
| 203 | + |
| 204 | +#### Chaining deserializations |
| 205 | + |
| 206 | +The streaming operator (`<<`) can be chained to deserialize several chunks in a single expression: |
| 207 | + |
| 208 | +```cpp |
| 209 | +#include <span> |
| 210 | +#include <vector> |
| 211 | +#include <sparrow_ipc/deserializer.hpp> |
| 212 | +#include <sparrow/record_batch.hpp> |
| 213 | + |
| 214 | +namespace sp = sparrow; |
| 215 | +namespace sp_ipc = sparrow_ipc; |
| 216 | + |
| 217 | +void deserializer_chaining_example( |
| 218 | + const std::vector<uint8_t>& chunk1, |
| 219 | + const std::vector<uint8_t>& chunk2, |
| 220 | + const std::vector<uint8_t>& chunk3) |
| 221 | +{ |
| 222 | + std::vector<sp::record_batch> batches; |
| 223 | + sp_ipc::deserializer deser(batches); |
| 224 | + |
| 225 | + // Chain multiple deserializations in a single expression |
| 226 | + deser << std::span<const uint8_t>(chunk1) |
| 227 | + << std::span<const uint8_t>(chunk2) |
| 228 | + << std::span<const uint8_t>(chunk3); |
| 229 | +} |
| 230 | +``` |
| 231 | + |
| 232 | +### Using different container types |
| 233 | + |
| 234 | +The `deserializer` class works with any container that satisfies `std::ranges::input_range` and supports `insert` at the end: |
| 235 | + |
| 236 | +```cpp |
| 237 | +#include <deque> |
| 238 | +#include <list> |
| 239 | +#include <span> |
| 240 | +#include <vector> |
| 241 | +#include <sparrow_ipc/deserializer.hpp> |
| 242 | +#include <sparrow/record_batch.hpp> |
| 243 | + |
| 244 | +namespace sp = sparrow; |
| 245 | +namespace sp_ipc = sparrow_ipc; |
| 246 | + |
| 247 | +void different_containers_example(const std::vector<uint8_t>& stream_data) |
| 248 | +{ |
| 249 | + // Using std::deque |
| 250 | + std::deque<sp::record_batch> deque_batches; |
| 251 | + sp_ipc::deserializer deser_deque(deque_batches); |
| 252 | + deser_deque.deserialize(std::span<const uint8_t>(stream_data)); |
| 253 | + |
| 254 | + // Using std::list |
| 255 | + std::list<sp::record_batch> list_batches; |
| 256 | + sp_ipc::deserializer deser_list(list_batches); |
| 257 | + deser_list.deserialize(std::span<const uint8_t>(stream_data)); |
| 258 | +} |
| 259 | +``` |
| 260 | + |
| 261 | +## Round-trip example |
| 262 | + |
| 263 | +Here's a complete example that serializes a record batch to memory and then deserializes it, first with the function API and then with the `deserializer` class: |
| 264 | + |
| 265 | +```cpp |
| 266 | +#include <cassert> |
| 267 | +#include <span> |
| 268 | +#include <vector> |
| 269 | +#include <sparrow_ipc/deserialize.hpp> |
| 270 | +#include <sparrow_ipc/deserializer.hpp> |
| 271 | +#include <sparrow_ipc/memory_output_stream.hpp> |
| 272 | +#include <sparrow_ipc/serializer.hpp> |
| 273 | +#include <sparrow/record_batch.hpp> |
| 274 | + |
| 275 | +namespace sp = sparrow; |
| 276 | +namespace sp_ipc = sparrow_ipc; |
| 277 | + |
| 278 | +void round_trip_example() |
| 279 | +{ |
| 280 | + // Create sample data |
| 281 | + auto int_array = sp::primitive_array<int32_t>({1, 2, 3, 4, 5}); |
| 282 | + auto string_array = sp::string_array( |
| 283 | + std::vector<std::string>{"hello", "world", "test", "data", "example"} |
| 284 | + ); |
| 285 | + |
| 286 | + sp::record_batch original_batch( |
| 287 | + {{"id", sp::array(std::move(int_array))}, |
| 288 | + {"name", sp::array(std::move(string_array))}} |
| 289 | + ); |
| 290 | + |
| 291 | + // Serialize |
| 292 | + std::vector<uint8_t> stream_data; |
| 293 | + sp_ipc::memory_output_stream stream(stream_data); |
| 294 | + sp_ipc::serializer serializer(stream); |
| 295 | + serializer << original_batch << sp_ipc::end_stream; |
| 296 | + |
| 297 | + // Deserialize using function API |
| 298 | + auto deserialized_batches = sp_ipc::deserialize_stream(stream_data); |
| 299 | + |
| 300 | + assert(deserialized_batches.size() == 1); |
| 301 | + assert(deserialized_batches[0].nb_rows() == original_batch.nb_rows()); |
| 302 | + assert(deserialized_batches[0].nb_columns() == original_batch.nb_columns()); |
| 303 | + |
| 304 | + // Or using deserializer class |
| 305 | + std::vector<sp::record_batch> batches; |
| 306 | + sp_ipc::deserializer deser(batches); |
| 307 | + deser << std::span<const uint8_t>(stream_data); |
| 308 | + |
| 309 | + assert(batches.size() == 1); |
| 310 | +} |
| 311 | +``` |
0 commit comments