Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ set(SPARROW_IPC_HEADERS
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserializer.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/encapsulated_message.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/flatbuffer_utils.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/magic_values.hpp
Expand Down
48 changes: 48 additions & 0 deletions include/sparrow_ipc/deserializer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#pragma once

#include <concepts>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <numeric>
#include <ranges>
#include <span>

#include <sparrow/record_batch.hpp>

#include "deserialize.hpp"
#include "sparrow_ipc/deserialize.hpp"

namespace sparrow_ipc
{
    /**
     * @brief Appends deserialized record batches to a caller-owned container.
     *
     * The deserializer stores only the address of the destination container; it
     * never owns or copies it. The container must therefore stay alive for as
     * long as the deserializer is used.
     *
     * @tparam R Destination range whose value type is sparrow::record_batch.
     *           Beyond the input_range constraint, deserialize() calls
     *           `insert(position, first, last)` on it, so R must provide that
     *           member (as the standard sequence containers do).
     */
    template <std::ranges::input_range R>
        requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
    class deserializer
    {
    public:

        /**
         * @brief Binds the deserializer to its destination container.
         *
         * @param data Container that receives the deserialized record batches.
         *             Only its address is kept.
         */
        deserializer(R& data)
            : m_data(&data)
        {
        }

        /**
         * @brief Deserializes @p data and appends the resulting record batches.
         *
         * The bytes are handed to sparrow_ipc::deserialize_stream; every batch it
         * returns is moved to the end of the bound container, after the elements
         * already stored there.
         *
         * @param data Serialized bytes to decode.
         */
        void deserialize(std::span<const std::uint8_t> data)
        {
            auto& container = *m_data;
            auto deserialized_batches = sparrow_ipc::deserialize_stream(data);
            // Move rather than copy: the temporary batch list is discarded right after.
            container.insert(
                std::end(container),
                std::make_move_iterator(std::begin(deserialized_batches)),
                std::make_move_iterator(std::end(deserialized_batches))
            );
        }

        /**
         * @brief Stream-style shorthand for deserialize().
         *
         * @param data Serialized bytes to decode.
         * @return Reference to this deserializer, so calls can be chained.
         */
        deserializer& operator<<(std::span<const std::uint8_t> data)
        {
            deserialize(data);
            return *this;
        }

    private:

        // Non-owning pointer to the destination container supplied at construction.
        R* m_data;
    };
}
26 changes: 19 additions & 7 deletions src/deserialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,13 @@ namespace sparrow_ipc
const size_t length = static_cast<size_t>(record_batch.length());
size_t buffer_index = 0;

const size_t num_fields = schema.fields() == nullptr ? 0 : static_cast<size_t>(schema.fields()->size());
std::vector<sparrow::array> arrays;
arrays.reserve(schema.fields()->size());
if (num_fields == 0)
{
return arrays;
}
arrays.reserve(num_fields);
size_t field_idx = 0;
for (const auto field : *(schema.fields()))
{
Expand Down Expand Up @@ -215,18 +220,24 @@ namespace sparrow_ipc
case org::apache::arrow::flatbuf::MessageHeader::Schema:
{
schema = message->header_as_Schema();
const size_t size = static_cast<size_t>(schema->fields()->size());
const size_t size = schema->fields() == nullptr
? 0
: static_cast<size_t>(schema->fields()->size());
field_names.reserve(size);
fields_nullable.reserve(size);
fields_metadata.reserve(size);

if (schema->fields() == nullptr)
{
break;
}
for (const auto field : *(schema->fields()))
{
if(field != nullptr && field->name() != nullptr)
if (field != nullptr && field->name() != nullptr)
{
field_names.emplace_back(field->name()->str());
field_names.emplace_back(field->name()->str());
}
else {
else
{
field_names.emplace_back("_unnamed_");
}
fields_nullable.push_back(field->nullable());
Expand Down Expand Up @@ -257,7 +268,8 @@ namespace sparrow_ipc
encapsulated_message,
fields_metadata
);
auto names_copy = field_names; // TODO: Remove when issue with the to_vector of record_batch is fixed
auto names_copy = field_names; // TODO: Remove when issue with the to_vector of
// record_batch is fixed
sparrow::record_batch sp_record_batch(std::move(names_copy), std::move(arrays));
record_batches.emplace_back(std::move(sp_record_batch));
}
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ set(SPARROW_IPC_TESTS_SRC
test_chunk_memory_output_stream.cpp
test_chunk_memory_serializer.cpp
test_de_serialization_with_files.cpp
test_deserializer.cpp
$<$<NOT:$<BOOL:${SPARROW_IPC_BUILD_SHARED}>>:test_flatbuffer_utils.cpp>
test_memory_output_streams.cpp
test_serialize_utils.cpp
Expand Down
Loading
Loading