Skip to content

Commit 0289a62

Browse files
committed
wip
1 parent 37bc5ff commit 0289a62

File tree

5 files changed

+53
-47
lines changed

5 files changed

+53
-47
lines changed

include/sparrow_ipc/serialize.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
#include "Message_generated.h"
1010
#include "sparrow_ipc/config/config.hpp"
11+
#include "sparrow_ipc/magic_values.hpp"
1112
#include "sparrow_ipc/utils.hpp"
1213

1314
namespace sparrow_ipc
@@ -99,7 +100,7 @@ namespace sparrow_ipc
99100
{
100101
return {};
101102
}
102-
if (utils::check_record_batches_consistency(record_batches))
103+
if (!utils::check_record_batches_consistency(record_batches))
103104
{
104105
throw std::invalid_argument(
105106
"All record batches must have the same schema to be serialized together."
@@ -112,6 +113,8 @@ namespace sparrow_ipc
112113
std::make_move_iterator(serialized_record_batches.begin()),
113114
std::make_move_iterator(serialized_record_batches.end())
114115
);
116+
// End of stream message
117+
serialized_schema.insert(serialized_schema.end(), end_of_stream.begin(), end_of_stream.end());
115118
return serialized_schema;
116119
}
117120
}

include/sparrow_ipc/utils.hpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,22 @@ namespace sparrow_ipc::utils
2929
return true;
3030
}
3131
const sparrow::record_batch& first_rb = record_batches[0];
32+
const size_t first_rb_nb_rows = first_rb.nb_rows();
33+
const size_t first_rb_nb_columns = first_rb.nb_columns();
3234
for (const sparrow::record_batch& rb : record_batches)
3335
{
34-
if (rb.nb_columns() != first_rb.nb_columns())
35-
{
36-
return false;
37-
}
38-
if (rb.nb_rows() != first_rb.nb_rows())
36+
const auto rb_nb_columns = rb.nb_columns();
37+
if (rb_nb_columns != first_rb_nb_columns)
3938
{
4039
return false;
4140
}
4241
for (size_t col_idx = 0; col_idx < rb.nb_columns(); ++col_idx)
4342
{
4443
const sparrow::array& arr = rb.get_column(col_idx);
4544
const sparrow::array& first_arr = first_rb.get_column(col_idx);
46-
if (arr.data_type() != first_arr.data_type())
45+
const auto arr_data_type = arr.data_type();
46+
const auto first_arr_data_type = first_arr.data_type();
47+
if (arr_data_type != first_arr_data_type)
4748
{
4849
return false;
4950
}

src/encapsulated_message.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ namespace sparrow_ipc
9999
std::pair<encapsulated_message, std::span<const uint8_t>>
100100
extract_encapsulated_message(std::span<const uint8_t> data)
101101
{
102-
if (!data.size() || data.size() < 8)
102+
if (data.size() < 8)
103103
{
104104
throw std::invalid_argument("Buffer is too small to contain a valid message.");
105105
}

src/serialize.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,9 @@ namespace sparrow_ipc
3333
flatbuffers::Offset<flatbuffers::String> fb_name_offset = (arrow_schema.name == nullptr)
3434
? 0
3535
: builder.CreateString(arrow_schema.name);
36-
3736
const auto [type_enum, type_offset] = utils::get_flatbuffer_type(builder, arrow_schema.format);
3837
auto fb_metadata_offset = create_metadata(builder, arrow_schema);
3938
const auto children = create_children(builder, arrow_schema);
40-
4139
const auto fb_field = org::apache::arrow::flatbuf::CreateField(
4240
builder,
4341
fb_name_offset,
@@ -251,6 +249,7 @@ namespace sparrow_ipc
251249
)
252250
{
253251
flatbuffers::FlatBufferBuilder record_batch_builder;
252+
254253
auto nodes_offset = record_batch_builder.CreateVectorOfStructs(nodes);
255254
auto buffers_offset = record_batch_builder.CreateVectorOfStructs(buffers);
256255
const auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(

tests/test_deserialization_with_files.cpp

Lines changed: 40 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <nlohmann/json.hpp>
88

99
#include <sparrow/record_batch.hpp>
10+
#include <sparrow/utils/format.hpp>
1011

1112
#include "sparrow/json_reader/json_parser.hpp"
1213

@@ -22,9 +23,9 @@ const std::filesystem::path tests_resources_files_path = arrow_testing_data_dir
2223

2324
const std::vector<std::filesystem::path> files_paths_to_test = {
2425
tests_resources_files_path / "generated_primitive",
25-
// tests_resources_files_path / "generated_primitive_large_offsets",
26+
tests_resources_files_path / "generated_primitive_large_offsets",
2627
tests_resources_files_path / "generated_primitive_zerolength",
27-
tests_resources_files_path / "generated_primitive_no_batches"
28+
// tests_resources_files_path / "generated_primitive_no_batches"
2829
};
2930

3031
size_t get_number_of_batches(const std::filesystem::path& json_path)
@@ -48,6 +49,33 @@ nlohmann::json load_json_file(const std::filesystem::path& json_path)
4849
return nlohmann::json::parse(json_file);
4950
}
5051

52+
void compare_record_batches(
53+
const std::vector<sparrow::record_batch>& record_batches_1,
54+
const std::vector<sparrow::record_batch>& record_batches_2
55+
)
56+
{
57+
REQUIRE_EQ(record_batches_1.size(), record_batches_2.size());
58+
for (size_t i = 0; i < record_batches_1.size(); ++i)
59+
{
60+
for (size_t y = 0; y < record_batches_1[i].nb_columns(); y++)
61+
{
62+
const auto& column_1 = record_batches_1[i].get_column(y);
63+
const auto& column_2 = record_batches_2[i].get_column(y);
64+
REQUIRE_EQ(column_1.size(), column_2.size());
65+
for (size_t z = 0; z < column_1.size(); z++)
66+
{
67+
const auto col_name = column_1.name().value_or("NA");
68+
INFO("Comparing batch " << i << ", column " << y << " named :" << col_name << " , row " << z);
69+
REQUIRE_EQ(column_1.data_type(), column_2.data_type());
70+
CHECK_EQ(column_1.name(), column_2.name());
71+
const auto& column_1_value = column_1[z];
72+
const auto& column_2_value = column_2[z];
73+
CHECK_EQ(column_1_value, column_2_value);
74+
}
75+
}
76+
}
77+
}
78+
5179
TEST_SUITE("Integration tests")
5280
{
5381
TEST_CASE("Compare stream deserialization with JSON deserialization")
@@ -90,29 +118,7 @@ TEST_SUITE("Integration tests")
90118
const auto record_batches_from_stream = sparrow_ipc::deserialize_stream(
91119
std::span<const uint8_t>(stream_data)
92120
);
93-
94-
// Compare record batches
95-
REQUIRE_EQ(record_batches_from_stream.size(), record_batches_from_json.size());
96-
for (size_t i = 0; i < record_batches_from_stream.size(); ++i)
97-
{
98-
for (size_t y = 0; y < record_batches_from_stream[i].nb_columns(); y++)
99-
{
100-
const auto& column_stream = record_batches_from_stream[i].get_column(y);
101-
const auto& column_json = record_batches_from_json[i].get_column(y);
102-
REQUIRE_EQ(column_stream.size(), column_json.size());
103-
for (size_t z = 0; z < column_json.size(); z++)
104-
{
105-
const auto col_name = column_stream.name().value_or("NA");
106-
INFO(
107-
"Comparing batch " << i << ", column " << y << " named :" << col_name
108-
<< " , row " << z
109-
);
110-
const auto& column_stream_value = column_stream[z];
111-
const auto& column_json_value = column_json[z];
112-
CHECK_EQ(column_stream_value, column_json_value);
113-
}
114-
}
115-
}
121+
compare_record_batches(record_batches_from_json, record_batches_from_stream);
116122
}
117123
}
118124
}
@@ -131,9 +137,7 @@ TEST_SUITE("Integration tests")
131137
CHECK(json_data != nullptr);
132138

133139
const size_t num_batches = get_number_of_batches(json_path);
134-
135140
std::vector<sparrow::record_batch> record_batches_from_json;
136-
137141
for (size_t batch_idx = 0; batch_idx < num_batches; ++batch_idx)
138142
{
139143
INFO("Processing batch " << batch_idx << " of " << num_batches);
@@ -153,17 +157,16 @@ TEST_SUITE("Integration tests")
153157
);
154158
stream_file.close();
155159

156-
// Serialize the record batches from JSON
157-
const auto serialized_data = sparrow_ipc::serialize(record_batches_from_json);
160+
// Process the stream file
161+
const auto record_batches_from_stream = sparrow_ipc::deserialize_stream(
162+
std::span<const uint8_t>(stream_data)
163+
);
158164

159-
// Compare the serialized data with the original stream data
160-
// CHECK_EQ(serialized_data, stream_data);
161-
// REQUIRE_EQ(serialized_data.size(), stream_data.size());
162-
for (size_t i = 0; i < std::min(serialized_data.size(), stream_data.size()); ++i)
163-
{
164-
INFO("Comparing byte " << i << " of " << serialized_data.size());
165-
CHECK_EQ(serialized_data[i], stream_data[i]);
166-
}
165+
const auto serialized_data = sparrow_ipc::serialize(record_batches_from_json);
166+
const auto deserialized_serialized_data = sparrow_ipc::deserialize_stream(
167+
std::span<const uint8_t>(serialized_data)
168+
);
169+
compare_record_batches(record_batches_from_stream, deserialized_serialized_data);
167170
}
168171
}
169172
}

0 commit comments

Comments
 (0)