Skip to content

Commit 37bc5ff

Browse files
committed
wip
1 parent e5d5b81 commit 37bc5ff

File tree

5 files changed

+189
-119
lines changed

5 files changed

+189
-119
lines changed

include/sparrow_ipc/serialize.hpp

Lines changed: 74 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,59 +7,111 @@
77
#include <sparrow/record_batch.hpp>
88

99
#include "Message_generated.h"
10+
#include "sparrow_ipc/config/config.hpp"
11+
#include "sparrow_ipc/utils.hpp"
1012

1113
namespace sparrow_ipc
1214
{
1315
template <std::ranges::input_range R>
1416
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
15-
std::vector<uint8_t> serialize(const R& record_batches);
17+
[[nodiscard]] std::vector<uint8_t> serialize(const R& record_batches);
1618

17-
std::vector<uint8_t> serialize_schema_message(const ArrowSchema& arrow_schema);
19+
[[nodiscard]] SPARROW_IPC_API std::vector<uint8_t>
20+
serialize_schema_message(const sparrow::record_batch& record_batch);
1821

19-
std::vector<uint8_t> serialize_record_batch(const sparrow::record_batch& record_batch);
22+
[[nodiscard]] SPARROW_IPC_API std::vector<uint8_t>
23+
serialize_record_batch(const sparrow::record_batch& record_batch);
2024

2125
template <std::ranges::input_range R>
2226
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
23-
std::vector<uint8_t> serialize_record_batches(const R& record_batches);
24-
25-
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>>>
26-
create_metadata(flatbuffers::FlatBufferBuilder& builder, const ArrowSchema& arrow_schema);
27-
28-
::flatbuffers::Offset<org::apache::arrow::flatbuf::Field>
27+
[[nodiscard]] std::vector<uint8_t> serialize_record_batches(const R& record_batches)
28+
{
29+
std::vector<uint8_t> output;
30+
for (const auto& record_batch : record_batches)
31+
{
32+
const auto rb_serialized = serialize_record_batch(record_batch);
33+
output.insert(
34+
output.end(),
35+
std::make_move_iterator(rb_serialized.begin()),
36+
std::make_move_iterator(rb_serialized.end())
37+
);
38+
}
39+
return output;
40+
}
41+
42+
[[nodiscard]] SPARROW_IPC_API
43+
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>>>
44+
create_metadata(flatbuffers::FlatBufferBuilder& builder, const ArrowSchema& arrow_schema);
45+
46+
[[nodiscard]] SPARROW_IPC_API ::flatbuffers::Offset<org::apache::arrow::flatbuf::Field>
2947
create_field(flatbuffers::FlatBufferBuilder& builder, const ArrowSchema& arrow_schema);
3048

31-
::flatbuffers::Offset<::flatbuffers::Vector<::flatbuffers::Offset<org::apache::arrow::flatbuf::Field>>>
49+
[[nodiscard]] SPARROW_IPC_API ::flatbuffers::Offset<
50+
::flatbuffers::Vector<::flatbuffers::Offset<org::apache::arrow::flatbuf::Field>>>
51+
create_children(flatbuffers::FlatBufferBuilder& builder, sparrow::record_batch::column_range columns);
52+
53+
[[nodiscard]] SPARROW_IPC_API ::flatbuffers::Offset<
54+
::flatbuffers::Vector<::flatbuffers::Offset<org::apache::arrow::flatbuf::Field>>>
3255
create_children(flatbuffers::FlatBufferBuilder& builder, const ArrowSchema& arrow_schema);
3356

34-
flatbuffers::FlatBufferBuilder get_schema_message_builder(const ArrowSchema& arrow_schema);
35-
std::vector<uint8_t> serialize_schema_message(const ArrowSchema& arrow_schema);
57+
[[nodiscard]] SPARROW_IPC_API flatbuffers::FlatBufferBuilder
58+
get_schema_message_builder(const sparrow::record_batch& record_batch);
59+
60+
[[nodiscard]] SPARROW_IPC_API std::vector<uint8_t>
61+
serialize_schema_message(const sparrow::record_batch& record_batch);
3662

37-
void fill_fieldnodes(
63+
SPARROW_IPC_API void fill_fieldnodes(
3864
const sparrow::arrow_proxy& arrow_proxy,
3965
std::vector<org::apache::arrow::flatbuf::FieldNode>& nodes
4066
);
4167

42-
std::vector<org::apache::arrow::flatbuf::FieldNode>
68+
[[nodiscard]] SPARROW_IPC_API std::vector<org::apache::arrow::flatbuf::FieldNode>
4369
create_fieldnodes(const sparrow::record_batch& record_batch);
4470

45-
void fill_buffers(
71+
SPARROW_IPC_API void fill_buffers(
4672
const sparrow::arrow_proxy& arrow_proxy,
4773
std::vector<org::apache::arrow::flatbuf::Buffer>& flatbuf_buffers,
4874
int64_t& offset
4975
);
5076

51-
std::vector<org::apache::arrow::flatbuf::Buffer> get_buffers(const sparrow::record_batch& record_batch);
77+
[[nodiscard]] SPARROW_IPC_API std::vector<org::apache::arrow::flatbuf::Buffer>
78+
get_buffers(const sparrow::record_batch& record_batch);
5279

53-
void fill_body(const sparrow::arrow_proxy& arrow_proxy, std::vector<uint8_t>& body);
54-
std::vector<uint8_t> generate_body(const sparrow::record_batch& record_batch);
55-
int64_t calculate_body_size(const sparrow::arrow_proxy& arrow_proxy);
56-
int64_t calculate_body_size(const sparrow::record_batch& record_batch);
80+
SPARROW_IPC_API void fill_body(const sparrow::arrow_proxy& arrow_proxy, std::vector<uint8_t>& body);
81+
[[nodiscard]] SPARROW_IPC_API std::vector<uint8_t> generate_body(const sparrow::record_batch& record_batch);
82+
[[nodiscard]] SPARROW_IPC_API int64_t calculate_body_size(const sparrow::arrow_proxy& arrow_proxy);
83+
[[nodiscard]] SPARROW_IPC_API int64_t calculate_body_size(const sparrow::record_batch& record_batch);
5784

58-
flatbuffers::FlatBufferBuilder get_record_batch_message_builder(
85+
[[nodiscard]] SPARROW_IPC_API flatbuffers::FlatBufferBuilder get_record_batch_message_builder(
5986
const sparrow::record_batch& record_batch,
6087
const std::vector<org::apache::arrow::flatbuf::FieldNode>& nodes,
6188
const std::vector<org::apache::arrow::flatbuf::Buffer>& buffers
6289
);
6390

64-
std::vector<uint8_t> serialize_record_batch(const sparrow::record_batch& record_batch);
91+
[[nodiscard]] SPARROW_IPC_API std::vector<uint8_t>
92+
serialize_record_batch(const sparrow::record_batch& record_batch);
93+
94+
template <std::ranges::input_range R>
95+
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
96+
std::vector<uint8_t> serialize(const R& record_batches)
97+
{
98+
if (record_batches.empty())
99+
{
100+
return {};
101+
}
102+
if (utils::check_record_batches_consistency(record_batches))
103+
{
104+
throw std::invalid_argument(
105+
"All record batches must have the same schema to be serialized together."
106+
);
107+
}
108+
std::vector<uint8_t> serialized_schema = serialize_schema_message(record_batches[0]);
109+
std::vector<uint8_t> serialized_record_batches = serialize_record_batches(record_batches);
110+
serialized_schema.insert(
111+
serialized_schema.end(),
112+
std::make_move_iterator(serialized_record_batches.begin()),
113+
std::make_move_iterator(serialized_record_batches.end())
114+
);
115+
return serialized_schema;
116+
}
65117
}

include/sparrow_ipc/utils.hpp

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,35 @@ namespace sparrow_ipc::utils
2222

2323
template <std::ranges::input_range R>
2424
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
25-
SPARROW_IPC_API bool check_record_batches_consistency(const R& record_batches);
25+
bool check_record_batches_consistency(const R& record_batches)
26+
{
27+
if (record_batches.empty())
28+
{
29+
return true;
30+
}
31+
const sparrow::record_batch& first_rb = record_batches[0];
32+
for (const sparrow::record_batch& rb : record_batches)
33+
{
34+
if (rb.nb_columns() != first_rb.nb_columns())
35+
{
36+
return false;
37+
}
38+
if (rb.nb_rows() != first_rb.nb_rows())
39+
{
40+
return false;
41+
}
42+
for (size_t col_idx = 0; col_idx < rb.nb_columns(); ++col_idx)
43+
{
44+
const sparrow::array& arr = rb.get_column(col_idx);
45+
const sparrow::array& first_arr = first_rb.get_column(col_idx);
46+
if (arr.data_type() != first_arr.data_type())
47+
{
48+
return false;
49+
}
50+
}
51+
}
52+
return true;
53+
}
2654

27-
size_t calculate_output_serialized_size(const sparrow::record_batch& record_batch);
55+
// size_t calculate_output_serialized_size(const sparrow::record_batch& record_batch);
2856
}

src/serialize.cpp

Lines changed: 31 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -56,25 +56,38 @@ namespace sparrow_ipc
5656
{
5757
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> children_vec;
5858
children_vec.reserve(arrow_schema.n_children);
59-
for (int i = 0; i < arrow_schema.n_children; ++i)
59+
for (size_t i = 0; i < arrow_schema.n_children; ++i)
6060
{
6161
if (arrow_schema.children[i] == nullptr)
6262
{
63-
throw std::invalid_argument("ArrowSchema has null child at index " + std::to_string(i));
63+
throw std::invalid_argument("ArrowSchema has null child pointer");
6464
}
65-
flatbuffers::Offset<org::apache::arrow::flatbuf::Field> field = create_field(
66-
builder,
67-
*(arrow_schema.children[i])
68-
);
65+
const auto& child = *arrow_schema.children[i];
66+
flatbuffers::Offset<org::apache::arrow::flatbuf::Field> field = create_field(builder, child);
6967
children_vec.emplace_back(field);
7068
}
7169
return children_vec.empty() ? 0 : builder.CreateVector(children_vec);
7270
}
7371

74-
flatbuffers::FlatBufferBuilder get_schema_message_builder(const ArrowSchema& arrow_schema)
72+
::flatbuffers::Offset<::flatbuffers::Vector<::flatbuffers::Offset<org::apache::arrow::flatbuf::Field>>>
73+
create_children(flatbuffers::FlatBufferBuilder& builder, sparrow::record_batch::column_range columns)
74+
{
75+
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> children_vec;
76+
children_vec.reserve(columns.size());
77+
for (const auto& column : columns)
78+
{
79+
const auto& arrow_schema = sparrow::detail::array_access::get_arrow_proxy(column).schema();
80+
flatbuffers::Offset<org::apache::arrow::flatbuf::Field> field = create_field(builder, arrow_schema);
81+
children_vec.emplace_back(field);
82+
}
83+
return children_vec.empty() ? 0 : builder.CreateVector(children_vec);
84+
}
85+
86+
flatbuffers::FlatBufferBuilder get_schema_message_builder(const sparrow::record_batch& record_batch)
7587
{
7688
flatbuffers::FlatBufferBuilder schema_builder;
77-
const auto fields_vec = create_children(schema_builder, arrow_schema);
89+
record_batch.columns();
90+
const auto fields_vec = create_children(schema_builder, record_batch.columns());
7891
const auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(
7992
schema_builder,
8093
org::apache::arrow::flatbuf::Endianness::Little, // TODO: make configurable
@@ -92,12 +105,11 @@ namespace sparrow_ipc
92105
return schema_builder;
93106
}
94107

95-
std::vector<uint8_t> serialize_schema_message(const ArrowSchema& arrow_schema)
108+
std::vector<uint8_t> serialize_schema_message(const sparrow::record_batch& record_batch)
96109
{
97110
std::vector<uint8_t> schema_buffer;
98-
99111
schema_buffer.insert(schema_buffer.end(), continuation.begin(), continuation.end());
100-
flatbuffers::FlatBufferBuilder schema_builder = get_schema_message_builder(arrow_schema);
112+
flatbuffers::FlatBufferBuilder schema_builder = get_schema_message_builder(record_batch);
101113
const flatbuffers::uoffset_t schema_len = schema_builder.GetSize();
102114
schema_buffer.reserve(schema_buffer.size() + sizeof(uint32_t) + schema_len);
103115
// Write the 4-byte length prefix after the continuation bytes
@@ -162,8 +174,7 @@ namespace sparrow_ipc
162174
}
163175
for (const auto& child : arrow_proxy.children())
164176
{
165-
const auto& child_arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(child);
166-
fill_buffers(child_arrow_proxy, flatbuf_buffers, offset);
177+
fill_buffers(child, flatbuf_buffers, offset);
167178
}
168179
}
169180

@@ -190,8 +201,7 @@ namespace sparrow_ipc
190201
}
191202
for (const auto& child : arrow_proxy.children())
192203
{
193-
const auto& child_arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(child);
194-
fill_body(child_arrow_proxy, body);
204+
fill_body(child, body);
195205
}
196206
}
197207

@@ -215,8 +225,7 @@ namespace sparrow_ipc
215225
}
216226
for (const auto& child : arrow_proxy.children())
217227
{
218-
const auto& child_arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(child);
219-
total_size += calculate_body_size(child_arrow_proxy);
228+
total_size += calculate_body_size(child);
220229
}
221230
return total_size;
222231
}
@@ -270,14 +279,11 @@ namespace sparrow_ipc
270279
{
271280
std::vector<org::apache::arrow::flatbuf::FieldNode> nodes = create_fieldnodes(record_batch);
272281
std::vector<org::apache::arrow::flatbuf::Buffer> flatbuf_buffers = get_buffers(record_batch);
273-
flatbuffers::FlatBufferBuilder record_batch_builder;
274-
::flatbuffers::Offset<org::apache::arrow::flatbuf::RecordBatch>
275-
record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatchDirect(
276-
record_batch_builder,
277-
static_cast<int64_t>(record_batch.nb_rows()),
278-
&nodes,
279-
&flatbuf_buffers
280-
);
282+
flatbuffers::FlatBufferBuilder record_batch_builder = get_record_batch_message_builder(
283+
record_batch,
284+
nodes,
285+
flatbuf_buffers
286+
);
281287
std::vector<uint8_t> output;
282288
output.insert(output.end(), continuation.begin(), continuation.end());
283289
const flatbuffers::uoffset_t record_batch_len = record_batch_builder.GetSize();
@@ -302,40 +308,4 @@ namespace sparrow_ipc
302308
return output;
303309
}
304310

305-
template <std::ranges::input_range R>
306-
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
307-
std::vector<uint8_t> serialize_record_batches(const R& record_batches)
308-
{
309-
std::vector<uint8_t> output;
310-
for (const auto& record_batch : record_batches)
311-
{
312-
const auto rb_serialized = serialize_record_batch(record_batch);
313-
output.insert(
314-
output.end(),
315-
std::make_move_iterator(rb_serialized.begin()),
316-
std::make_move_iterator(rb_serialized.end())
317-
);
318-
}
319-
return output;
320-
}
321-
322-
template <std::ranges::input_range R>
323-
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
324-
std::vector<uint8_t> serialize(const R& record_batches)
325-
{
326-
if (check_record_batches_consistency(record_batches))
327-
{
328-
throw std::invalid_argument(
329-
"All record batches must have the same schema to be serialized together."
330-
);
331-
}
332-
std::vector<uint8_t> serialized_schema = serialize_schema_message(record_batches[0].schema());
333-
std::vector<uint8_t> serialized_record_batches = serialize_record_batches(record_batches);
334-
serialized_schema.insert(
335-
serialized_schema.end(),
336-
std::make_move_iterator(serialized_record_batches.begin()),
337-
std::make_move_iterator(serialized_record_batches.end())
338-
);
339-
return serialized_schema;
340-
}
341311
}

src/utils.cpp

Lines changed: 3 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -445,39 +445,8 @@ namespace sparrow_ipc
445445
}
446446
}
447447

448-
template <std::ranges::input_range R>
449-
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
450-
bool check_record_batches_consistency(const R& record_batches)
451-
{
452-
if (record_batches.empty())
453-
{
454-
return true;
455-
}
456-
const sparrow::record_batch& first_rb = record_batches[0];
457-
for (const sparrow::record_batch& rb : record_batches)
458-
{
459-
if (rb.nb_columns() != first_rb.nb_columns())
460-
{
461-
return false;
462-
}
463-
if (rb.nb_rows() != first_rb.nb_rows())
464-
{
465-
return false;
466-
}
467-
for (size_t col_idx = 0; col_idx < rb.nb_columns(); ++col_idx)
468-
{
469-
const sparrow::array& arr = rb.get_column(col_idx);
470-
const sparrow::array& first_arr = first_rb.get_column(col_idx);
471-
if (arr.data_type() != first_arr.data_type())
472-
{
473-
return false;
474-
}
475-
}
476-
}
477-
}
478-
479-
size_t calculate_output_serialized_size(const sparrow::record_batch& record_batch)
480-
{
481-
}
448+
// size_t calculate_output_serialized_size(const sparrow::record_batch& record_batch)
449+
// {
450+
// }
482451
}
483452
}

0 commit comments

Comments
 (0)