Skip to content

Commit 6ff5bc7

Browse files
authored
Add serialize (#22)
Add serialize
1 parent 60a6a51 commit 6ff5bc7

13 files changed

+1393
-127
lines changed

CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,14 +106,18 @@ set(SPARROW_IPC_HEADERS
106106
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_fixedsizebinary_array.hpp
107107
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_primitive_array.hpp
108108
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_utils.hpp
109+
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
109110
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize.hpp
110111
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/encapsulated_message.hpp
111112
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/magic_values.hpp
112113
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/metadata.hpp
114+
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/serialize_utils.hpp
115+
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/serialize.hpp
113116
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/utils.hpp
114117
)
115118

116119
set(SPARROW_IPC_SRC
120+
${SPARROW_IPC_SOURCE_DIR}/serialize_utils.cpp
117121
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_array.cpp
118122
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_array/private_data.cpp
119123
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema.cpp

cmake/external_dependencies.cmake

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ function(find_package_or_fetch)
4040
FetchContent_MakeAvailable(${arg_PACKAGE_NAME})
4141
message(STATUS "\t✅ Fetched ${arg_PACKAGE_NAME}")
4242
else()
43-
message(STATUS "📦 ${actual_pkg_name} found here: ${actual_pkg_name}_DIR")
43+
message(STATUS "📦 ${actual_pkg_name} found here: ${${actual_pkg_name}_DIR}")
4444
endif()
4545
endif()
4646
endfunction()
@@ -52,7 +52,7 @@ endif()
5252
find_package_or_fetch(
5353
PACKAGE_NAME sparrow
5454
GIT_REPOSITORY https://github.com/man-group/sparrow.git
55-
TAG 1.1.1
55+
TAG 1.1.2
5656
)
5757
unset(CREATE_JSON_READER_TARGET)
5858

include/sparrow_ipc/magic_values.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,22 @@
22

33
#include <algorithm>
44
#include <array>
5+
#include <cstdint>
56
#include <istream>
67

78
namespace sparrow_ipc
89
{
9-
1010
/**
1111
* Continuation value defined in the Arrow IPC specification:
1212
* https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
1313
*/
14-
constexpr std::array<uint8_t, 4> continuation = {0xFF, 0xFF, 0xFF, 0xFF};
14+
inline constexpr std::array<std::uint8_t, 4> continuation = {0xFF, 0xFF, 0xFF, 0xFF};
1515

1616
/**
1717
* End-of-stream marker defined in the Arrow IPC specification:
1818
* https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
1919
*/
20-
constexpr std::array<uint8_t, 8> end_of_stream = {0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00};
20+
inline constexpr std::array<std::uint8_t, 8> end_of_stream = {0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00};
2121

2222
template <std::ranges::input_range R>
2323
[[nodiscard]] bool is_continuation(const R& buf)
@@ -30,4 +30,4 @@ namespace sparrow_ipc
3030
{
3131
return std::ranges::equal(buf, end_of_stream);
3232
}
33-
}
33+
}

include/sparrow_ipc/serialize.hpp

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
#pragma once
2+
3+
#include <ostream>
4+
#include <ranges>
5+
#include <vector>
6+
7+
#include <sparrow/record_batch.hpp>
8+
9+
#include "Message_generated.h"
10+
#include "sparrow_ipc/config/config.hpp"
11+
#include "sparrow_ipc/magic_values.hpp"
12+
#include "sparrow_ipc/serialize_utils.hpp"
13+
#include "sparrow_ipc/utils.hpp"
14+
15+
namespace sparrow_ipc
16+
{
17+
/**
18+
* @brief Serializes a collection of record batches into a binary format.
19+
*
20+
* This function takes a collection of record batches and serializes them into a single
21+
* binary representation following the Arrow IPC format. The serialization includes:
22+
* - Schema message (derived from the first record batch)
23+
* - All record batch data
24+
* - End-of-stream marker
25+
*
26+
* @tparam R Container type that holds record batches (must support empty(), operator[], begin(), end())
27+
* @param record_batches Collection of record batches to serialize. All batches must have identical
28+
* schemas.
29+
*
30+
* @return std::vector<uint8_t> Binary serialized data containing schema, record batches, and
31+
* end-of-stream marker. Returns empty vector if input collection is empty.
32+
*
33+
* @throws std::invalid_argument If record batches have inconsistent schemas or if the collection
34+
* contains batches that cannot be serialized together.
35+
*
36+
* @pre All record batches in the collection must have the same schema
37+
* @pre The container R must not be empty when consistency checking is required
38+
*/
39+
template <std::ranges::input_range R>
40+
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
41+
std::vector<uint8_t> serialize(const R& record_batches)
42+
{
43+
if (record_batches.empty())
44+
{
45+
return {};
46+
}
47+
if (!utils::check_record_batches_consistency(record_batches))
48+
{
49+
throw std::invalid_argument(
50+
"All record batches must have the same schema to be serialized together."
51+
);
52+
}
53+
std::vector<uint8_t> serialized_schema = serialize_schema_message(record_batches[0]);
54+
std::vector<uint8_t> serialized_record_batches = serialize_record_batches_without_schema_message(record_batches);
55+
serialized_schema.insert(
56+
serialized_schema.end(),
57+
std::make_move_iterator(serialized_record_batches.begin()),
58+
std::make_move_iterator(serialized_record_batches.end())
59+
);
60+
// End of stream message
61+
serialized_schema.insert(serialized_schema.end(), end_of_stream.begin(), end_of_stream.end());
62+
return serialized_schema;
63+
}
64+
}

0 commit comments

Comments
 (0)