diff --git a/CMakeLists.txt b/CMakeLists.txt index 02d2f1e..a2a704b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -114,6 +114,7 @@ set(SPARROW_IPC_HEADERS ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserializer.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/encapsulated_message.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/flatbuffer_utils.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/magic_values.hpp diff --git a/include/sparrow_ipc/deserializer.hpp b/include/sparrow_ipc/deserializer.hpp new file mode 100644 index 0000000..28738b4 --- /dev/null +++ b/include/sparrow_ipc/deserializer.hpp @@ -0,0 +1,48 @@ +#pragma once + +#include +#include +#include +#include + +#include + +#include "deserialize.hpp" +#include "sparrow_ipc/deserialize.hpp" + +namespace sparrow_ipc +{ + template + requires std::same_as, sparrow::record_batch> + class deserializer + { + public: + + deserializer(R& data) + : m_data(&data) + { + } + + void deserialize(std::span data) + { + // Insert at the end of m_data container the deserialized record batches + auto& container = *m_data; + auto deserialized_batches = sparrow_ipc::deserialize_stream(data); + container.insert( + std::end(container), + std::make_move_iterator(std::begin(deserialized_batches)), + std::make_move_iterator(std::end(deserialized_batches)) + ); + } + + deserializer& operator<<(std::span data) + { + deserialize(data); + return *this; + } + + private: + + R* m_data; + }; +} diff --git a/src/deserialize.cpp b/src/deserialize.cpp index 5779ca9..62b76d5 100644 --- a/src/deserialize.cpp +++ b/src/deserialize.cpp @@ -52,8 +52,13 @@ namespace sparrow_ipc const size_t length = static_cast(record_batch.length()); size_t buffer_index = 0; + const size_t num_fields = schema.fields() == nullptr ? 0 : static_cast(schema.fields()->size()); std::vector arrays; - arrays.reserve(schema.fields()->size()); + if (num_fields == 0) + { + return arrays; + } + arrays.reserve(num_fields); size_t field_idx = 0; for (const auto field : *(schema.fields())) { @@ -215,18 +220,24 @@ namespace sparrow_ipc case org::apache::arrow::flatbuf::MessageHeader::Schema: { schema = message->header_as_Schema(); - const size_t size = static_cast(schema->fields()->size()); + const size_t size = schema->fields() == nullptr + ? 0 + : static_cast(schema->fields()->size()); field_names.reserve(size); fields_nullable.reserve(size); fields_metadata.reserve(size); - + if (schema->fields() == nullptr) + { + break; + } for (const auto field : *(schema->fields())) { - if(field != nullptr && field->name() != nullptr) + if (field != nullptr && field->name() != nullptr) { - field_names.emplace_back(field->name()->str()); + field_names.emplace_back(field->name()->str()); } - else { + else + { field_names.emplace_back("_unnamed_"); } fields_nullable.push_back(field->nullable()); @@ -257,7 +268,8 @@ namespace sparrow_ipc encapsulated_message, fields_metadata ); - auto names_copy = field_names; // TODO: Remove when issue with the to_vector of record_batch is fixed + auto names_copy = field_names; // TODO: Remove when issue with the to_vector of + // record_batch is fixed sparrow::record_batch sp_record_batch(std::move(names_copy), std::move(arrays)); record_batches.emplace_back(std::move(sp_record_batch)); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 11c2f9f..bff0af2 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -11,6 +11,7 @@ set(SPARROW_IPC_TESTS_SRC test_chunk_memory_output_stream.cpp test_chunk_memory_serializer.cpp test_de_serialization_with_files.cpp + test_deserializer.cpp $<$>:test_flatbuffer_utils.cpp> test_memory_output_streams.cpp test_serialize_utils.cpp diff --git a/tests/test_deserializer.cpp b/tests/test_deserializer.cpp new file mode 100644 index 0000000..b12176c --- /dev/null +++ b/tests/test_deserializer.cpp @@ -0,0 +1,496 @@ +#include +#include +#include +#include + +#include +#include + +#include "sparrow_ipc/deserializer.hpp" +#include "sparrow_ipc/memory_output_stream.hpp" +#include "sparrow_ipc/serializer.hpp" +#include "sparrow_ipc_tests_helpers.hpp" + +namespace sparrow_ipc +{ + namespace sp = sparrow; + + // Helper function to serialize record batches to a byte buffer + std::vector serialize_record_batches(const std::vector& batches) + { + std::vector buffer; + memory_output_stream stream(buffer); + serializer ser(stream); + ser << batches << end_stream; + return buffer; + } + + // Helper function to create multiple compatible record batches + std::vector create_test_record_batches(size_t count) + { + std::vector batches; + for (size_t i = 0; i < count; ++i) + { + auto int_array = sp::primitive_array({static_cast(i * 10), + static_cast(i * 10 + 1), + static_cast(i * 10 + 2)}); + auto string_array = sp::string_array( + std::vector{"batch_" + std::to_string(i) + "_a", + "batch_" + std::to_string(i) + "_b", + "batch_" + std::to_string(i) + "_c"} + ); + batches.push_back(sp::record_batch({{"int_col", sp::array(std::move(int_array))}, + {"string_col", sp::array(std::move(string_array))}})); + } + return batches; + } + + TEST_SUITE("deserializer") + { + TEST_CASE("construction with empty vector") + { + SUBCASE("Construct with empty vector reference") + { + std::vector batches; + deserializer deser(batches); + CHECK_EQ(batches.size(), 0); + } + } + + TEST_CASE("deserialize single record batch") + { + SUBCASE("Deserialize one batch into empty vector") + { + std::vector batches; + deserializer deser(batches); + + // Create and serialize a single record batch + auto original_batch = create_test_record_batch(); + auto serialized_data = serialize_record_batches({original_batch}); + + // Deserialize + deser.deserialize(std::span(serialized_data)); + + // Verify + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_columns(), original_batch.nb_columns()); + CHECK_EQ(batches[0].nb_rows(), original_batch.nb_rows()); + } + + SUBCASE("Deserialize batch with different data types") + { + std::vector batches; + deserializer deser(batches); + + auto int_array = sp::primitive_array({1, 2, 3}); + auto double_array = sp::primitive_array({1.5, 2.5, 3.5}); + auto float_array = sp::primitive_array({1.0f, 2.0f, 3.0f}); + + auto rb = sp::record_batch({{"int_col", sp::array(std::move(int_array))}, + {"double_col", sp::array(std::move(double_array))}, + {"float_col", sp::array(std::move(float_array))}}); + + auto serialized_data = serialize_record_batches({rb}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_columns(), 3); + CHECK_EQ(batches[0].nb_rows(), 3); + } + + SUBCASE("Deserialize empty record batch") + { + std::vector batches; + deserializer deser(batches); + + auto empty_batch = sp::record_batch({}); + auto serialized_data = serialize_record_batches({empty_batch}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_columns(), 0); + } + } + + TEST_CASE("deserialize multiple record batches") + { + SUBCASE("Deserialize multiple batches at once") + { + std::vector batches; + deserializer deser(batches); + + // Create multiple compatible batches + auto original_batches = create_test_record_batches(3); + auto serialized_data = serialize_record_batches(original_batches); + + // Deserialize + deser.deserialize(std::span(serialized_data)); + + // Verify + REQUIRE_EQ(batches.size(), 3); + for (size_t i = 0; i < batches.size(); ++i) + { + CHECK_EQ(batches[i].nb_columns(), original_batches[i].nb_columns()); + CHECK_EQ(batches[i].nb_rows(), original_batches[i].nb_rows()); + } + } + + SUBCASE("Deserialize large number of batches") + { + std::vector batches; + deserializer deser(batches); + + const size_t num_batches = 100; + auto original_batches = create_test_record_batches(num_batches); + auto serialized_data = serialize_record_batches(original_batches); + + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), num_batches); + } + } + + TEST_CASE("incremental deserialization") + { + SUBCASE("Deserialize in multiple calls") + { + std::vector batches; + deserializer deser(batches); + + // First deserialization + auto batch1 = create_test_record_batches(2); + auto serialized_data1 = serialize_record_batches(batch1); + deser.deserialize(std::span(serialized_data1)); + + CHECK_EQ(batches.size(), 2); + + // Second deserialization - should append to existing batches + auto batch2 = create_test_record_batches(3); + auto serialized_data2 = serialize_record_batches(batch2); + deser.deserialize(std::span(serialized_data2)); + + CHECK_EQ(batches.size(), 5); + } + + SUBCASE("Multiple incremental deserializations") + { + std::vector batches; + deserializer deser(batches); + + for (size_t i = 0; i < 5; ++i) + { + auto new_batches = create_test_record_batches(2); + auto serialized_data = serialize_record_batches(new_batches); + deser.deserialize(std::span(serialized_data)); + + CHECK_EQ(batches.size(), (i + 1) * 2); + } + } + + SUBCASE("Deserialize into non-empty vector") + { + // Start with existing batches + std::vector batches = {create_test_record_batch()}; + CHECK_EQ(batches.size(), 1); + + deserializer deser(batches); + + // Add more batches + auto new_batches = create_test_record_batches(2); + auto serialized_data = serialize_record_batches(new_batches); + deser.deserialize(std::span(serialized_data)); + + CHECK_EQ(batches.size(), 3); + } + } + + TEST_CASE("operator<< for deserialization") + { + SUBCASE("Single deserialization with <<") + { + std::vector batches; + deserializer deser(batches); + + auto original_batches = create_test_record_batches(1); + auto serialized_data = serialize_record_batches(original_batches); + + deser << std::span(serialized_data); + + REQUIRE_EQ(batches.size(), 1); + } + + SUBCASE("Chain multiple deserializations with <<") + { + std::vector batches; + deserializer deser(batches); + + auto batch1 = create_test_record_batches(1); + auto serialized_data1 = serialize_record_batches(batch1); + + auto batch2 = create_test_record_batches(2); + auto serialized_data2 = serialize_record_batches(batch2); + + auto batch3 = create_test_record_batches(1); + auto serialized_data3 = serialize_record_batches(batch3); + + deser << std::span(serialized_data1) + << std::span(serialized_data2) + << std::span(serialized_data3); + + CHECK_EQ(batches.size(), 4); + } + + SUBCASE("Mix deserialization methods") + { + std::vector batches; + deserializer deser(batches); + + auto batch1 = create_test_record_batches(1); + auto serialized_data1 = serialize_record_batches(batch1); + deser.deserialize(std::span(serialized_data1)); + + CHECK_EQ(batches.size(), 1); + + auto batch2 = create_test_record_batches(2); + auto serialized_data2 = serialize_record_batches(batch2); + deser << std::span(serialized_data2); + + CHECK_EQ(batches.size(), 3); + } + } + + TEST_CASE("deserialize with different container types") + { + SUBCASE("std::deque") + { + std::deque batches; + deserializer deser(batches); + + auto original_batches = create_test_record_batches(2); + auto serialized_data = serialize_record_batches(original_batches); + + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 2); + } + + SUBCASE("std::list") + { + std::list batches; + deserializer deser(batches); + + auto original_batches = create_test_record_batches(3); + auto serialized_data = serialize_record_batches(original_batches); + + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 3); + } + } + + TEST_CASE("round-trip serialization and deserialization") + { + SUBCASE("Single batch round-trip") + { + auto original_batch = create_test_record_batch(); + auto serialized_data = serialize_record_batches({original_batch}); + + std::vector deserialized_batches; + deserializer deser(deserialized_batches); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(deserialized_batches.size(), 1); + CHECK_EQ(deserialized_batches[0].nb_columns(), original_batch.nb_columns()); + CHECK_EQ(deserialized_batches[0].nb_rows(), original_batch.nb_rows()); + + // Verify column names match + for (size_t i = 0; i < original_batch.nb_columns(); ++i) + { + CHECK_EQ(deserialized_batches[0].names()[i], original_batch.names()[i]); + } + } + + SUBCASE("Multiple batches round-trip") + { + auto original_batches = create_test_record_batches(5); + auto serialized_data = serialize_record_batches(original_batches); + + std::vector deserialized_batches; + deserializer deser(deserialized_batches); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(deserialized_batches.size(), original_batches.size()); + + for (size_t i = 0; i < original_batches.size(); ++i) + { + CHECK_EQ(deserialized_batches[i].nb_columns(), original_batches[i].nb_columns()); + CHECK_EQ(deserialized_batches[i].nb_rows(), original_batches[i].nb_rows()); + } + } + + SUBCASE("Double round-trip") + { + // First round-trip + auto original_batches = create_test_record_batches(2); + auto serialized_data1 = serialize_record_batches(original_batches); + + std::vector deserialized_batches1; + deserializer deser1(deserialized_batches1); + deser1.deserialize(std::span(serialized_data1)); + + // Second round-trip + auto serialized_data2 = serialize_record_batches(deserialized_batches1); + + std::vector deserialized_batches2; + deserializer deser2(deserialized_batches2); + deser2.deserialize(std::span(serialized_data2)); + + // Verify both results match + REQUIRE_EQ(deserialized_batches2.size(), original_batches.size()); + for (size_t i = 0; i < original_batches.size(); ++i) + { + CHECK_EQ(deserialized_batches2[i].nb_columns(), original_batches[i].nb_columns()); + CHECK_EQ(deserialized_batches2[i].nb_rows(), original_batches[i].nb_rows()); + } + } + } + + TEST_CASE("deserialize with complex data types") + { + SUBCASE("Mixed primitive types") + { + std::vector batches; + deserializer deser(batches); + + auto int8_array = sp::primitive_array({1, 2, 3}); + auto int16_array = sp::primitive_array({100, 200, 300}); + auto int32_array = sp::primitive_array({1000, 2000, 3000}); + auto int64_array = sp::primitive_array({10000, 20000, 30000}); + + auto rb = sp::record_batch({{"int8_col", sp::array(std::move(int8_array))}, + {"int16_col", sp::array(std::move(int16_array))}, + {"int32_col", sp::array(std::move(int32_array))}, + {"int64_col", sp::array(std::move(int64_array))}}); + + auto serialized_data = serialize_record_batches({rb}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_columns(), 4); + } + + SUBCASE("String arrays") + { + std::vector batches; + deserializer deser(batches); + + auto string_array = sp::string_array( + std::vector{"hello", "world", "test", "data"} + ); + auto rb = sp::record_batch({{"string_col", sp::array(std::move(string_array))}}); + + auto serialized_data = serialize_record_batches({rb}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_rows(), 4); + } + } + + TEST_CASE("edge cases") + { + SUBCASE("Deserialize empty data") + { + std::vector batches; + deserializer deser(batches); + + // Create an empty batch + auto empty_batch = sp::record_batch({}); + auto serialized_data = serialize_record_batches({empty_batch}); + + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_columns(), 0); + } + + SUBCASE("Very large batch") + { + std::vector batches; + deserializer deser(batches); + + // Create a large array + std::vector large_data(10000); + std::iota(large_data.begin(), large_data.end(), 0); + auto large_array = sp::primitive_array(large_data); + auto rb = sp::record_batch({{"large_col", sp::array(std::move(large_array))}}); + + auto serialized_data = serialize_record_batches({rb}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_rows(), 10000); + } + + SUBCASE("Single row batches") + { + std::vector batches; + deserializer deser(batches); + + auto int_array = sp::primitive_array({42}); + auto string_array = sp::string_array(std::vector{"single"}); + auto rb = sp::record_batch({{"int_col", sp::array(std::move(int_array))}, + {"string_col", sp::array(std::move(string_array))}}); + + auto serialized_data = serialize_record_batches({rb}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_rows(), 1); + } + } + + TEST_CASE("workflow example") + { + SUBCASE("Typical streaming deserialization workflow") + { + std::vector batches; + deserializer deser(batches); + + // Simulate receiving data in chunks + for (size_t i = 0; i < 3; ++i) + { + auto chunk_batches = create_test_record_batches(2); + auto serialized_chunk = serialize_record_batches(chunk_batches); + deser << std::span(serialized_chunk); + } + + // Verify all batches accumulated + CHECK_EQ(batches.size(), 6); + + // Add one more batch using deserialize method + auto final_batch = create_test_record_batches(1); + auto serialized_final = serialize_record_batches(final_batch); + deser.deserialize(std::span(serialized_final)); + + CHECK_EQ(batches.size(), 7); + } + } + + TEST_CASE("deserializer with const container") + { + SUBCASE("Works with non-const reference") + { + std::vector batches; + deserializer deser(batches); + + auto original_batches = create_test_record_batches(1); + auto serialized_data = serialize_record_batches(original_batches); + + deser.deserialize(std::span(serialized_data)); + + CHECK_EQ(batches.size(), 1); + } + } + } +}