Skip to content

Commit 812f4b1

Browse files
committed
Add more tests
1 parent c3dbe23 commit 812f4b1

File tree

6 files changed

+149
-75
lines changed

6 files changed

+149
-75
lines changed

include/sparrow_ipc/chunk_memory_serializer.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ namespace sparrow_ipc
4343
* @param stream Reference to a chunked memory output stream that will receive the serialized chunks
4444
* @param compression Optional: The compression type to use for record batch bodies.
4545
*/
46-
// TODO add tests with compression
4746
chunk_serializer(chunked_memory_output_stream<std::vector<std::vector<uint8_t>>>& stream, std::optional<CompressionType> compression = std::nullopt);
4847

4948
/**

tests/include/sparrow_ipc_tests_helpers.hpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,15 @@ namespace sparrow_ipc
7171
sp::array(sp::string_array(std::vector<std::string>{"hello", "world", "test", "data", "batch"}))}}
7272
);
7373
}
74+
75+
// Helper function to create a compressible record batch for testing
76+
inline sp::record_batch create_compressible_test_record_batch()
77+
{
78+
std::vector<int32_t> int_data(1000, 12345);
79+
std::vector<std::string> string_data(1000, "hello world");
80+
return sp::record_batch(
81+
{{"int_col", sp::array(sp::primitive_array<int32_t>(int_data))},
82+
{"string_col", sp::array(sp::string_array(string_data))}}
83+
);
84+
}
7485
}

tests/test_chunk_memory_serializer.cpp

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,37 @@ namespace sparrow_ipc
1515
{
1616
TEST_CASE("construction with single record batch")
1717
{
18-
SUBCASE("Valid record batch")
18+
SUBCASE("Valid record batch, with and without compression")
1919
{
20-
auto rb = create_test_record_batch();
21-
std::vector<std::vector<uint8_t>> chunks;
22-
chunked_memory_output_stream stream(chunks);
20+
auto rb = create_compressible_test_record_batch();
21+
std::vector<std::vector<uint8_t>> chunks_compressed;
22+
chunked_memory_output_stream stream_compressed(chunks_compressed);
2323

24-
chunk_serializer serializer(stream);
25-
serializer << rb;
24+
chunk_serializer serializer_compressed(stream_compressed, CompressionType::LZ4_FRAME);
25+
serializer_compressed << rb;
2626

2727
// After construction with single record batch, should have schema + record batch
28-
CHECK_EQ(chunks.size(), 2);
29-
CHECK_GT(chunks[0].size(), 0); // Schema message
30-
CHECK_GT(chunks[1].size(), 0); // Record batch message
31-
CHECK_GT(stream.size(), 0);
28+
CHECK_EQ(chunks_compressed.size(), 2);
29+
CHECK_GT(chunks_compressed[0].size(), 0); // Schema message
30+
CHECK_GT(chunks_compressed[1].size(), 0); // Record batch message
31+
CHECK_GT(stream_compressed.size(), 0);
32+
33+
std::vector<std::vector<uint8_t>> chunks_uncompressed;
34+
chunked_memory_output_stream stream_uncompressed(chunks_uncompressed);
35+
36+
chunk_serializer serializer_uncompressed(stream_uncompressed);
37+
serializer_uncompressed << rb;
38+
39+
CHECK_EQ(chunks_uncompressed.size(), 2);
40+
CHECK_GT(chunks_uncompressed[0].size(), 0); // Schema message
41+
CHECK_GT(chunks_uncompressed[1].size(), 0); // Record batch message
42+
CHECK_GT(stream_uncompressed.size(), 0);
43+
44+
// Check that schema size is the same
45+
CHECK_EQ(chunks_compressed[0].size(), chunks_uncompressed[0].size());
46+
47+
// Check that compressed record batch is smaller
48+
CHECK_LT(chunks_compressed[1].size(), chunks_uncompressed[1].size());
3249
}
3350

3451
SUBCASE("Empty record batch")

tests/test_flatbuffer_utils.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -558,12 +558,22 @@ namespace sparrow_ipc
558558

559559
TEST_CASE("get_record_batch_message_builder")
560560
{
561-
SUBCASE("Valid record batch with field nodes and buffers")
561+
auto test_get_record_batch_message_builder = [](std::optional<CompressionType> compression)
562562
{
563563
auto record_batch = create_test_record_batch();
564-
auto builder = get_record_batch_message_builder(record_batch);
564+
auto builder = get_record_batch_message_builder(record_batch, compression);
565565
CHECK_GT(builder.GetSize(), 0);
566566
CHECK_NE(builder.GetBufferPointer(), nullptr);
567+
};
568+
569+
SUBCASE("Valid record batch with field nodes and buffers (Without compression)")
570+
{
571+
test_get_record_batch_message_builder(std::nullopt);
572+
}
573+
574+
SUBCASE("Valid record batch with field nodes and buffers (With compression)")
575+
{
576+
test_get_record_batch_message_builder(CompressionType::LZ4_FRAME);
567577
}
568578
}
569579
}

tests/test_serialize_utils.cpp

Lines changed: 86 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ namespace sparrow_ipc
4141
}
4242
}
4343

44-
// TODO after the used fcts are stable regarding compression, add tests for fcts having it as an additional argument
45-
// cf. fill_body example
4644
TEST_CASE("fill_body")
4745
{
4846
SUBCASE("Simple primitive array (uncompressed)")
@@ -88,33 +86,50 @@ namespace sparrow_ipc
8886

8987
TEST_CASE("generate_body")
9088
{
91-
SUBCASE("Record batch with multiple columns")
89+
auto record_batch = create_test_record_batch();
90+
SUBCASE("Record batch with multiple columns (uncompressed)")
9291
{
93-
auto record_batch = create_test_record_batch();
9492
std::vector<uint8_t> serialized;
9593
memory_output_stream stream(serialized);
9694
any_output_stream astream(stream);
97-
generate_body(record_batch, astream);
95+
generate_body(record_batch, astream, std::nullopt);
96+
CHECK_GT(serialized.size(), 0);
97+
CHECK_EQ(serialized.size() % 8, 0);
98+
}
99+
100+
SUBCASE("Record batch with multiple columns (compressed)")
101+
{
102+
std::vector<uint8_t> serialized;
103+
memory_output_stream stream(serialized);
104+
any_output_stream astream(stream);
105+
generate_body(record_batch, astream, CompressionType::LZ4_FRAME);
98106
CHECK_GT(serialized.size(), 0);
99107
CHECK_EQ(serialized.size() % 8, 0);
100108
}
101109
}
102110

103111
TEST_CASE("calculate_body_size")
104112
{
105-
SUBCASE("Single array")
106-
{
107-
auto array = sp::primitive_array<int32_t>({1, 2, 3, 4, 5});
108-
auto proxy = sp::detail::array_access::get_arrow_proxy(array);
113+
auto array = sp::primitive_array<int32_t>({1, 2, 3, 4, 5});
114+
auto proxy = sp::detail::array_access::get_arrow_proxy(array);
109115

116+
SUBCASE("Single array (uncompressed)")
117+
{
110118
auto size = calculate_body_size(proxy);
111119
CHECK_GT(size, 0);
112120
CHECK_EQ(size % 8, 0);
113121
}
114122

115-
SUBCASE("Record batch")
123+
SUBCASE("Single array (compressed)")
124+
{
125+
auto size = calculate_body_size(proxy, CompressionType::LZ4_FRAME);
126+
CHECK_GT(size, 0);
127+
CHECK_EQ(size % 8, 0);
128+
}
129+
130+
auto record_batch = create_test_record_batch();
131+
SUBCASE("Record batch (uncompressed)")
116132
{
117-
auto record_batch = create_test_record_batch();
118133
auto size = calculate_body_size(record_batch);
119134
CHECK_GT(size, 0);
120135
CHECK_EQ(size % 8, 0);
@@ -124,6 +139,18 @@ namespace sparrow_ipc
124139
generate_body(record_batch, astream);
125140
CHECK_EQ(size, static_cast<int64_t>(serialized.size()));
126141
}
142+
143+
SUBCASE("Record batch (compressed)")
144+
{
145+
auto size = calculate_body_size(record_batch, CompressionType::LZ4_FRAME);
146+
CHECK_GT(size, 0);
147+
CHECK_EQ(size % 8, 0);
148+
std::vector<uint8_t> serialized;
149+
memory_output_stream stream(serialized);
150+
any_output_stream astream(stream);
151+
generate_body(record_batch, astream, CompressionType::LZ4_FRAME);
152+
CHECK_EQ(size, static_cast<int64_t>(serialized.size()));
153+
}
127154
}
128155

129156
TEST_CASE("calculate_schema_message_size")
@@ -165,55 +192,59 @@ namespace sparrow_ipc
165192

166193
TEST_CASE("calculate_record_batch_message_size")
167194
{
168-
SUBCASE("Single column record batch")
195+
auto test_calculate_record_batch_message_size = [](const sp::record_batch& record_batch, std::optional<CompressionType> compression)
169196
{
170-
auto array = sp::primitive_array<int32_t>({1, 2, 3, 4, 5});
171-
auto record_batch = sp::record_batch({{"column1", sp::array(std::move(array))}});
172-
173-
auto estimated_size = calculate_record_batch_message_size(record_batch);
197+
auto estimated_size = calculate_record_batch_message_size(record_batch, compression);
174198
CHECK_GT(estimated_size, 0);
175199
CHECK_EQ(estimated_size % 8, 0);
176200

177201
std::vector<uint8_t> serialized;
178202
memory_output_stream stream(serialized);
179203
any_output_stream astream(stream);
180-
serialize_record_batch(record_batch, astream, std::nullopt);
204+
serialize_record_batch(record_batch, astream, compression);
181205

182206
CHECK_EQ(estimated_size, serialized.size());
207+
};
208+
209+
SUBCASE("Single column record batch")
210+
{
211+
auto array = sp::primitive_array<int32_t>({1, 2, 3, 4, 5});
212+
auto record_batch = sp::record_batch({{"column1", sp::array(std::move(array))}});
213+
test_calculate_record_batch_message_size(record_batch, std::nullopt);
214+
test_calculate_record_batch_message_size(record_batch, CompressionType::LZ4_FRAME);
183215
}
184216

185217
SUBCASE("Multi-column record batch")
186218
{
187219
auto record_batch = create_test_record_batch();
188-
189-
auto estimated_size = calculate_record_batch_message_size(record_batch);
190-
CHECK_GT(estimated_size, 0);
191-
CHECK_EQ(estimated_size % 8, 0);
192-
193-
// Verify by actual serialization
194-
std::vector<uint8_t> serialized;
195-
memory_output_stream stream(serialized);
196-
any_output_stream astream(stream);
197-
serialize_record_batch(record_batch, astream, std::nullopt);
198-
199-
CHECK_EQ(estimated_size, serialized.size());
220+
test_calculate_record_batch_message_size(record_batch, std::nullopt);
221+
test_calculate_record_batch_message_size(record_batch, CompressionType::LZ4_FRAME);
200222
}
201223
}
202224

203225
TEST_CASE("calculate_total_serialized_size")
204226
{
227+
auto test_calculate_total_serialized_size = [](const std::vector<sp::record_batch>& batches, std::optional<CompressionType> compression)
228+
{
229+
auto estimated_size = calculate_total_serialized_size(batches, compression);
230+
CHECK_GT(estimated_size, 0);
231+
232+
// Should be equal to schema size + sum of record batch sizes
233+
auto schema_size = calculate_schema_message_size(batches[0]);
234+
int64_t batches_size = 0;
235+
for(const auto& batch : batches)
236+
{
237+
batches_size += calculate_record_batch_message_size(batch, compression);
238+
}
239+
CHECK_EQ(estimated_size, schema_size + batches_size);
240+
};
241+
205242
SUBCASE("Single record batch")
206243
{
207244
auto record_batch = create_test_record_batch();
208245
std::vector<sp::record_batch> batches = {record_batch};
209-
210-
auto estimated_size = calculate_total_serialized_size(batches);
211-
CHECK_GT(estimated_size, 0);
212-
213-
// Should equal schema size + record batch size
214-
auto schema_size = calculate_schema_message_size(record_batch);
215-
auto batch_size = calculate_record_batch_message_size(record_batch);
216-
CHECK_EQ(estimated_size, schema_size + batch_size);
246+
test_calculate_total_serialized_size(batches, std::nullopt);
247+
test_calculate_total_serialized_size(batches, CompressionType::LZ4_FRAME);
217248
}
218249

219250
SUBCASE("Multiple record batches")
@@ -231,15 +262,8 @@ namespace sparrow_ipc
231262
);
232263

233264
std::vector<sp::record_batch> batches = {record_batch1, record_batch2};
234-
235-
auto estimated_size = calculate_total_serialized_size(batches);
236-
CHECK_GT(estimated_size, 0);
237-
238-
// Should equal schema size + sum of record batch sizes
239-
auto schema_size = calculate_schema_message_size(batches[0]);
240-
auto batch1_size = calculate_record_batch_message_size(batches[0]);
241-
auto batch2_size = calculate_record_batch_message_size(batches[1]);
242-
CHECK_EQ(estimated_size, schema_size + batch1_size + batch2_size);
265+
test_calculate_total_serialized_size(batches, std::nullopt);
266+
test_calculate_total_serialized_size(batches, CompressionType::LZ4_FRAME);
243267
}
244268

245269
SUBCASE("Empty collection")
@@ -262,19 +286,19 @@ namespace sparrow_ipc
262286
std::vector<sp::record_batch> batches = {record_batch1, record_batch2};
263287

264288
CHECK_THROWS_AS(auto size = calculate_total_serialized_size(batches), std::invalid_argument);
289+
CHECK_THROWS_AS(auto size = calculate_total_serialized_size(batches, CompressionType::LZ4_FRAME), std::invalid_argument);
265290
}
266291
}
267292

268293
TEST_CASE("serialize_record_batch")
269294
{
270-
SUBCASE("Valid record batch")
295+
auto test_serialize_record_batch = [](const sp::record_batch& record_batch_to_serialize, std::optional<CompressionType> compression)
271296
{
272-
auto record_batch = create_test_record_batch();
273297
std::vector<uint8_t> serialized;
274298
memory_output_stream stream(serialized);
275299
any_output_stream astream(stream);
276-
serialize_record_batch(record_batch, astream, std::nullopt);
277-
CHECK_GT(serialized.size(), 0);
300+
serialize_record_batch(record_batch_to_serialize, astream, compression);
301+
CHECK_GT(serialized.size(), 0);
278302

279303
// Check that it starts with continuation bytes
280304
CHECK_GE(serialized.size(), continuation.size());
@@ -300,17 +324,23 @@ namespace sparrow_ipc
300324
// Verify alignment
301325
CHECK_EQ(aligned_metadata_end % 8, 0);
302326
CHECK_LE(aligned_metadata_end, serialized.size());
327+
328+
return serialized.size();
329+
};
330+
331+
SUBCASE("Valid record batch")
332+
{
333+
auto record_batch = create_compressible_test_record_batch();
334+
auto compressed_size = test_serialize_record_batch(record_batch, CompressionType::LZ4_FRAME);
335+
auto uncompressed_size = test_serialize_record_batch(record_batch, std::nullopt);
336+
CHECK_LT(compressed_size, uncompressed_size);
303337
}
304338

305339
SUBCASE("Empty record batch")
306340
{
307341
auto empty_batch = sp::record_batch({});
308-
std::vector<uint8_t> serialized;
309-
memory_output_stream stream(serialized);
310-
any_output_stream astream(stream);
311-
serialize_record_batch(empty_batch, astream, std::nullopt);
312-
CHECK_GT(serialized.size(), 0);
313-
CHECK_GE(serialized.size(), continuation.size());
342+
test_serialize_record_batch(empty_batch, std::nullopt);
343+
test_serialize_record_batch(empty_batch, CompressionType::LZ4_FRAME);
314344
}
315345
}
316346
}

tests/test_serializer.cpp

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,22 @@ namespace sparrow_ipc
3535
{
3636
TEST_CASE_TEMPLATE("construction and write single record batch", StreamWrapper, memory_stream_wrapper, ostringstream_wrapper)
3737
{
38-
SUBCASE("Valid record batch")
38+
SUBCASE("Valid record batch, with and without compression")
3939
{
40-
auto rb = create_test_record_batch();
41-
StreamWrapper wrapper;
42-
serializer ser(wrapper.get_stream());
43-
ser.write(rb);
40+
auto rb = create_compressible_test_record_batch();
41+
StreamWrapper wrapper_compressed;
42+
serializer ser_compressed(wrapper_compressed.get_stream(), CompressionType::LZ4_FRAME);
43+
ser_compressed.write(rb);
4444

4545
// After writing first record batch, should have schema + record batch
46-
CHECK_GT(wrapper.size(), 0);
46+
CHECK_GT(wrapper_compressed.size(), 0);
47+
48+
StreamWrapper wrapper_uncompressed;
49+
serializer ser_uncompressed(wrapper_uncompressed.get_stream());
50+
ser_uncompressed.write(rb);
51+
CHECK_GT(wrapper_uncompressed.size(), 0);
52+
53+
CHECK_LT(wrapper_compressed.size(), wrapper_uncompressed.size());
4754
}
4855

4956
SUBCASE("Empty record batch")

0 commit comments

Comments
 (0)