Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ set(SPARROW_IPC_HEADERS
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserializer.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/encapsulated_message.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/flatbuffer_utils.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/magic_values.hpp
Expand Down
48 changes: 48 additions & 0 deletions include/sparrow_ipc/deserializer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#pragma once

#include <concepts>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <numeric>
#include <ranges>
#include <span>

#include <sparrow/record_batch.hpp>

#include "deserialize.hpp"
#include "sparrow_ipc/deserialize.hpp"

namespace sparrow_ipc
{
    /**
     * @brief Appends deserialized record batches to a caller-provided container.
     *
     * The deserializer keeps a non-owning pointer to the container it was
     * constructed with; the container must therefore outlive the deserializer.
     *
     * @tparam R Range type whose value type is `sparrow::record_batch`. In addition
     *           to the declared constraints, `deserialize` calls
     *           `insert(iterator, first, last)` on the container, so `R` must
     *           provide that member for `deserialize` / `operator<<` to compile.
     */
    template <std::ranges::input_range R>
        requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
    class deserializer
    {
    public:

        /**
         * @brief Binds the deserializer to the destination container.
         *
         * @param data Container receiving the record batches. Only its address is
         *             stored; no copy is made.
         */
        deserializer(R& data)
            : m_data(&data)
        {
        }

        /**
         * @brief Deserializes a byte buffer and appends the resulting batches.
         *
         * The buffer is handed to `sparrow_ipc::deserialize_stream`; every record
         * batch it returns is moved to the end of the bound container.
         *
         * @param data Bytes to deserialize.
         */
        void deserialize(std::span<const uint8_t> data)
        {
            auto batches = sparrow_ipc::deserialize_stream(data);

            // Move the freshly decoded batches rather than copying them.
            auto first = std::make_move_iterator(std::begin(batches));
            auto last = std::make_move_iterator(std::end(batches));
            m_data->insert(std::end(*m_data), first, last);
        }

        /**
         * @brief Stream-style shorthand for `deserialize`.
         *
         * @param data Bytes to deserialize.
         * @return Reference to this deserializer, so calls can be chained.
         */
        deserializer& operator<<(std::span<const uint8_t> data)
        {
            this->deserialize(data);
            return *this;
        }

    private:

        // Non-owning pointer to the destination container supplied at construction.
        R* m_data;
    };
}
26 changes: 19 additions & 7 deletions src/deserialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,13 @@ namespace sparrow_ipc
const size_t length = static_cast<size_t>(record_batch.length());
size_t buffer_index = 0;

const size_t num_fields = schema.fields() == nullptr ? 0 : static_cast<size_t>(schema.fields()->size());
std::vector<sparrow::array> arrays;
arrays.reserve(schema.fields()->size());
if (num_fields == 0)
{
return arrays;
}
arrays.reserve(num_fields);
size_t field_idx = 0;
for (const auto field : *(schema.fields()))
{
Expand Down Expand Up @@ -215,18 +220,24 @@ namespace sparrow_ipc
case org::apache::arrow::flatbuf::MessageHeader::Schema:
{
schema = message->header_as_Schema();
const size_t size = static_cast<size_t>(schema->fields()->size());
const size_t size = schema->fields() == nullptr
? 0
: static_cast<size_t>(schema->fields()->size());
field_names.reserve(size);
fields_nullable.reserve(size);
fields_metadata.reserve(size);

if (schema->fields() == nullptr)
{
break;
}
for (const auto field : *(schema->fields()))
{
if(field != nullptr && field->name() != nullptr)
if (field != nullptr && field->name() != nullptr)
{
field_names.emplace_back(field->name()->str());
field_names.emplace_back(field->name()->str());
}
else {
else
{
field_names.emplace_back("_unnamed_");
}
fields_nullable.push_back(field->nullable());
Expand Down Expand Up @@ -257,7 +268,8 @@ namespace sparrow_ipc
encapsulated_message,
fields_metadata
);
auto names_copy = field_names; // TODO: Remove when issue with the to_vector of record_batch is fixed
auto names_copy = field_names; // TODO: Remove when issue with the to_vector of
// record_batch is fixed
sparrow::record_batch sp_record_batch(std::move(names_copy), std::move(arrays));
record_batches.emplace_back(std::move(sp_record_batch));
}
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ set(SPARROW_IPC_TESTS_SRC
test_chunk_memory_output_stream.cpp
test_chunk_memory_serializer.cpp
test_de_serialization_with_files.cpp
test_deserializer.cpp
$<$<NOT:$<BOOL:${SPARROW_IPC_BUILD_SHARED}>>:test_flatbuffer_utils.cpp>
test_memory_output_streams.cpp
test_serialize_utils.cpp
Expand Down
Loading
Loading