Skip to content

Commit 6538849

Browse files
committed
Add serialize
1 parent b94eea7 commit 6538849

File tree

8 files changed

+551
-25
lines changed

8 files changed

+551
-25
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ set(SPARROW_IPC_HEADERS
9797
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_primitive_array.hpp
9898
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_utils.hpp
9999
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize.hpp
100+
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/serialize.hpp
100101
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/encapsulated_message.hpp
101102
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/magic_values.hpp
102103
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/metadata.hpp
@@ -111,6 +112,7 @@ set(SPARROW_IPC_SRC
111112
${SPARROW_IPC_SOURCE_DIR}/deserialize_fixedsizebinary_array.cpp
112113
${SPARROW_IPC_SOURCE_DIR}/deserialize_utils.cpp
113114
${SPARROW_IPC_SOURCE_DIR}/deserialize.cpp
115+
${SPARROW_IPC_SOURCE_DIR}/serialize.cpp
114116
${SPARROW_IPC_SOURCE_DIR}/encapsulated_message.cpp
115117
${SPARROW_IPC_SOURCE_DIR}/metadata.cpp
116118
${SPARROW_IPC_SOURCE_DIR}/utils.cpp

include/sparrow_ipc/magic_values.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
namespace sparrow_ipc
88
{
9-
109
/**
1110
* Continuation value defined in the Arrow IPC specification:
1211
* https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format

include/sparrow_ipc/serialize.hpp

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
#pragma once
2+
3+
#include <ostream>
4+
#include <ranges>
5+
#include <vector>
6+
7+
#include <sparrow/record_batch.hpp>
8+
9+
#include "Message_generated.h"
10+
#include "sparrow_ipc/config/config.hpp"
11+
#include "sparrow_ipc/magic_values.hpp"
12+
#include "sparrow_ipc/utils.hpp"
13+
14+
namespace sparrow_ipc
15+
{
16+
/**
 * Forward declaration of serialize(); the definition is at the end of this header.
 *
 * As defined there, the returned byte vector is the schema message of the first
 * record batch, followed by every serialized record batch, followed by the
 * end_of_stream marker. An empty range yields an empty vector.
 *
 * @param record_batches Range whose value type is sparrow::record_batch.
 * @return The serialized bytes.
 * @throws std::invalid_argument if utils::check_record_batches_consistency()
 *         returns false for the range.
 */
template <std::ranges::input_range R>
17+
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
18+
[[nodiscard]] std::vector<uint8_t> serialize(const R& record_batches);
19+
20+
// Declared ahead of the function templates in this header that call it
// (serialize() uses it on the first record batch). The same declaration is
// repeated further down in this header.
// NOTE(review): presumably returns the encapsulated Schema message bytes for
// the batch's schema — confirm against serialize.cpp.
[[nodiscard]] SPARROW_IPC_API std::vector<uint8_t>
21+
serialize_schema_message(const sparrow::record_batch& record_batch);
22+
23+
// Declared ahead of serialize_record_batches(), which calls it once per batch.
// The same declaration is repeated further down in this header.
// NOTE(review): presumably returns the encapsulated RecordBatch message
// (metadata + body) for one batch — confirm against serialize.cpp.
[[nodiscard]] SPARROW_IPC_API std::vector<uint8_t>
24+
serialize_record_batch(const sparrow::record_batch& record_batch);
25+
26+
template <std::ranges::input_range R>
27+
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
28+
[[nodiscard]] std::vector<uint8_t> serialize_record_batches(const R& record_batches)
29+
{
30+
std::vector<uint8_t> output;
31+
for (const auto& record_batch : record_batches)
32+
{
33+
const auto rb_serialized = serialize_record_batch(record_batch);
34+
output.insert(
35+
output.end(),
36+
std::make_move_iterator(rb_serialized.begin()),
37+
std::make_move_iterator(rb_serialized.end())
38+
);
39+
}
40+
return output;
41+
}
42+
43+
// Schema-side Flatbuffers helpers. Only the declarations are visible in this
// header; the semantic notes below are inferred from names and signatures and
// should be confirmed against serialize.cpp.

// Takes a FlatBufferBuilder and an ArrowSchema; returns an offset to a
// Flatbuffers vector of KeyValue offsets.
// NOTE(review): presumably the key/value metadata of `arrow_schema` — confirm.
[[nodiscard]] SPARROW_IPC_API
44+
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>>>
45+
create_metadata(flatbuffers::FlatBufferBuilder& builder, const ArrowSchema& arrow_schema);
46+
47+
// Takes a FlatBufferBuilder and an ArrowSchema; returns an offset to a single
// Flatbuffers Field.
[[nodiscard]] SPARROW_IPC_API ::flatbuffers::Offset<org::apache::arrow::flatbuf::Field>
48+
create_field(flatbuffers::FlatBufferBuilder& builder, const ArrowSchema& arrow_schema);
49+
50+
// Overload taking the columns of a record batch; returns an offset to a
// Flatbuffers vector of Field offsets.
// NOTE(review): presumably one Field per column — confirm.
[[nodiscard]] SPARROW_IPC_API ::flatbuffers::Offset<
51+
::flatbuffers::Vector<::flatbuffers::Offset<org::apache::arrow::flatbuf::Field>>>
52+
create_children(flatbuffers::FlatBufferBuilder& builder, sparrow::record_batch::column_range columns);
53+
54+
// Overload taking an ArrowSchema; same return type as the overload above.
// NOTE(review): presumably one Field per child of `arrow_schema` — confirm.
[[nodiscard]] SPARROW_IPC_API ::flatbuffers::Offset<
55+
::flatbuffers::Vector<::flatbuffers::Offset<org::apache::arrow::flatbuf::Field>>>
56+
create_children(flatbuffers::FlatBufferBuilder& builder, const ArrowSchema& arrow_schema);
57+
58+
// Returns a FlatBufferBuilder by value for the given record batch.
// NOTE(review): presumably already holding the finished Schema message — confirm.
[[nodiscard]] SPARROW_IPC_API flatbuffers::FlatBufferBuilder
59+
get_schema_message_builder(const sparrow::record_batch& record_batch);
60+
61+
// Redeclaration: serialize_schema_message() is already declared near the top
// of this header with the same signature.
[[nodiscard]] SPARROW_IPC_API std::vector<uint8_t>
62+
serialize_schema_message(const sparrow::record_batch& record_batch);
63+
64+
// Record-batch-side helpers. Only the declarations are visible in this header;
// the semantic notes below are inferred from names and signatures and should
// be confirmed against serialize.cpp.

// Receives an arrow_proxy and a mutable vector of FieldNode (output parameter).
// NOTE(review): presumably appends the node(s) describing `arrow_proxy` — confirm.
SPARROW_IPC_API void fill_fieldnodes(
65+
const sparrow::arrow_proxy& arrow_proxy,
66+
std::vector<org::apache::arrow::flatbuf::FieldNode>& nodes
67+
);
68+
69+
// Returns a vector of FieldNode for a record batch.
[[nodiscard]] SPARROW_IPC_API std::vector<org::apache::arrow::flatbuf::FieldNode>
70+
create_fieldnodes(const sparrow::record_batch& record_batch);
71+
72+
// Receives an arrow_proxy, a mutable vector of Flatbuffers Buffer descriptors
// and a mutable running `offset` (both are in/out parameters).
// NOTE(review): presumably `offset` is the position in the message body and is
// advanced for each buffer — confirm.
SPARROW_IPC_API void fill_buffers(
73+
const sparrow::arrow_proxy& arrow_proxy,
74+
std::vector<org::apache::arrow::flatbuf::Buffer>& flatbuf_buffers,
75+
int64_t& offset
76+
);
77+
78+
// Returns a vector of Flatbuffers Buffer descriptors for a record batch.
[[nodiscard]] SPARROW_IPC_API std::vector<org::apache::arrow::flatbuf::Buffer>
79+
get_buffers(const sparrow::record_batch& record_batch);
80+
81+
// Body helpers: fill_body() writes into the caller-provided byte vector,
// generate_body() returns a new byte vector for a whole record batch, and the
// two calculate_body_size() overloads return a size as int64_t.
// NOTE(review): whether sizes include alignment padding is not visible here — confirm.
SPARROW_IPC_API void fill_body(const sparrow::arrow_proxy& arrow_proxy, std::vector<uint8_t>& body);
82+
[[nodiscard]] SPARROW_IPC_API std::vector<uint8_t> generate_body(const sparrow::record_batch& record_batch);
83+
[[nodiscard]] SPARROW_IPC_API int64_t calculate_body_size(const sparrow::arrow_proxy& arrow_proxy);
84+
[[nodiscard]] SPARROW_IPC_API int64_t calculate_body_size(const sparrow::record_batch& record_batch);
85+
86+
// Returns a FlatBufferBuilder by value, given a record batch together with
// its FieldNode and Buffer descriptor vectors.
// NOTE(review): presumably already holding the finished RecordBatch message — confirm.
[[nodiscard]] SPARROW_IPC_API flatbuffers::FlatBufferBuilder get_record_batch_message_builder(
87+
const sparrow::record_batch& record_batch,
88+
const std::vector<org::apache::arrow::flatbuf::FieldNode>& nodes,
89+
const std::vector<org::apache::arrow::flatbuf::Buffer>& buffers
90+
);
91+
92+
// Redeclaration: serialize_record_batch() is already declared near the top of
// this header with the same signature.
[[nodiscard]] SPARROW_IPC_API std::vector<uint8_t>
93+
serialize_record_batch(const sparrow::record_batch& record_batch);
94+
95+
template <std::ranges::input_range R>
96+
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
97+
std::vector<uint8_t> serialize(const R& record_batches)
98+
{
99+
if (record_batches.empty())
100+
{
101+
return {};
102+
}
103+
if (!utils::check_record_batches_consistency(record_batches))
104+
{
105+
throw std::invalid_argument(
106+
"All record batches must have the same schema to be serialized together."
107+
);
108+
}
109+
std::vector<uint8_t> serialized_schema = serialize_schema_message(record_batches[0]);
110+
std::vector<uint8_t> serialized_record_batches = serialize_record_batches(record_batches);
111+
serialized_schema.insert(
112+
serialized_schema.end(),
113+
std::make_move_iterator(serialized_record_batches.begin()),
114+
std::make_move_iterator(serialized_record_batches.end())
115+
);
116+
// End of stream message
117+
serialized_schema.insert(serialized_schema.end(), end_of_stream.begin(), end_of_stream.end());
118+
return serialized_schema;
119+
}
120+
}

include/sparrow_ipc/utils.hpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
#include <string_view>
66
#include <utility>
77

8+
#include <sparrow/record_batch.hpp>
9+
810
#include "Schema_generated.h"
911
#include "sparrow_ipc/config/config.hpp"
1012

@@ -17,4 +19,39 @@ namespace sparrow_ipc::utils
1719
// This function maps a sparrow data type to the corresponding Flatbuffers type
1820
SPARROW_IPC_API std::pair<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>>
1921
get_flatbuffer_type(flatbuffers::FlatBufferBuilder& builder, std::string_view format_str);
22+
23+
template <std::ranges::input_range R>
24+
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
25+
bool check_record_batches_consistency(const R& record_batches)
26+
{
27+
if (record_batches.empty())
28+
{
29+
return true;
30+
}
31+
const sparrow::record_batch& first_rb = record_batches[0];
32+
const size_t first_rb_nb_rows = first_rb.nb_rows();
33+
const size_t first_rb_nb_columns = first_rb.nb_columns();
34+
for (const sparrow::record_batch& rb : record_batches)
35+
{
36+
const auto rb_nb_columns = rb.nb_columns();
37+
if (rb_nb_columns != first_rb_nb_columns)
38+
{
39+
return false;
40+
}
41+
for (size_t col_idx = 0; col_idx < rb.nb_columns(); ++col_idx)
42+
{
43+
const sparrow::array& arr = rb.get_column(col_idx);
44+
const sparrow::array& first_arr = first_rb.get_column(col_idx);
45+
const auto arr_data_type = arr.data_type();
46+
const auto first_arr_data_type = first_arr.data_type();
47+
if (arr_data_type != first_arr_data_type)
48+
{
49+
return false;
50+
}
51+
}
52+
}
53+
return true;
54+
}
55+
56+
// size_t calculate_output_serialized_size(const sparrow::record_batch& record_batch);
2057
}

src/encapsulated_message.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ namespace sparrow_ipc
9999
std::pair<encapsulated_message, std::span<const uint8_t>>
100100
extract_encapsulated_message(std::span<const uint8_t> data)
101101
{
102-
if (!data.size() || data.size() < 8)
102+
if (data.size() < 8)
103103
{
104104
throw std::invalid_argument("Buffer is too small to contain a valid message.");
105105
}

0 commit comments

Comments
 (0)