Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/sparrow_ipc/flatbuffer_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ namespace sparrow_ipc
{
std::vector<org::apache::arrow::flatbuf::Buffer> buffers;
int64_t offset = 0;
for (const auto& column : record_batch.columns())
for (size_t i = 0; i < record_batch.nb_columns(); ++i)
{
const auto& column = record_batch.get_column(i);
const auto& arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(column);
fill_buffers_func(arrow_proxy, buffers, offset);
}
Expand Down
31 changes: 15 additions & 16 deletions src/flatbuffer_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,13 +468,13 @@ namespace sparrow_ipc
::flatbuffers::Offset<::flatbuffers::Vector<::flatbuffers::Offset<org::apache::arrow::flatbuf::Field>>>
create_children(flatbuffers::FlatBufferBuilder& builder, const sparrow::record_batch& record_batch)
{
const auto& columns = record_batch.columns();
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> children_vec;
children_vec.reserve(columns.size());
children_vec.reserve(record_batch.nb_columns());
const auto names = record_batch.names();
for (size_t i = 0; i < columns.size(); ++i)
for (size_t i = 0; i < record_batch.nb_columns(); ++i)
{
const auto& arrow_schema = sparrow::detail::array_access::get_arrow_proxy(columns[i]).schema();
const auto& column = record_batch.get_column(i);
const auto& arrow_schema = sparrow::detail::array_access::get_arrow_proxy(column).schema();
flatbuffers::Offset<org::apache::arrow::flatbuf::Field> field = create_field(
builder,
arrow_schema,
Expand Down Expand Up @@ -523,9 +523,10 @@ namespace sparrow_ipc
create_fieldnodes(const sparrow::record_batch& record_batch)
{
std::vector<org::apache::arrow::flatbuf::FieldNode> nodes;
nodes.reserve(record_batch.columns().size());
for (const auto& column : record_batch.columns())
nodes.reserve(record_batch.nb_columns());
for (size_t i = 0; i < record_batch.nb_columns(); ++i)
{
const auto& column = record_batch.get_column(i);
fill_fieldnodes(sparrow::detail::array_access::get_arrow_proxy(column), nodes);
}
return nodes;
Expand Down Expand Up @@ -608,16 +609,14 @@ namespace sparrow_ipc
std::optional<CompressionType> compression,
std::optional<std::reference_wrapper<CompressionCache>> cache)
{
return std::accumulate(
record_batch.columns().begin(),
record_batch.columns().end(),
int64_t{0},
[&](int64_t acc, const sparrow::array& arr)
{
const auto& arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(arr);
return acc + calculate_body_size(arrow_proxy, compression, cache);
}
);
int64_t acc = 0;
for (size_t i = 0; i < record_batch.nb_columns(); ++i)
{
const auto& arr = record_batch.get_column(i);
const auto& arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(arr);
acc += calculate_body_size(arrow_proxy, compression, cache);
}
return acc;
}

flatbuffers::FlatBufferBuilder get_record_batch_message_builder(const sparrow::record_batch& record_batch,
Expand Down
2 changes: 1 addition & 1 deletion src/metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ namespace sparrow_ipc
);
return sparrow_metadata;
}
}
}
18 changes: 8 additions & 10 deletions src/serialize_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ namespace sparrow_ipc
std::optional<CompressionType> compression,
std::optional<std::reference_wrapper<CompressionCache>> cache)
{
std::for_each(record_batch.columns().begin(), record_batch.columns().end(), [&](const auto& column) {
for (size_t i = 0; i < record_batch.nb_columns(); ++i)
{
const auto& column = record_batch.get_column(i);
const auto& arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(column);
fill_body(arrow_proxy, stream, compression, cache);
});
}
}

std::size_t calculate_schema_message_size(const sparrow::record_batch& record_batch)
Expand Down Expand Up @@ -85,14 +87,10 @@ namespace sparrow_ipc
{
std::vector<sparrow::data_type> dtypes;
dtypes.reserve(rb.nb_columns());
std::ranges::transform(
rb.columns(),
std::back_inserter(dtypes),
[](const auto& col)
{
return col.data_type();
}
);
for (size_t i = 0; i < rb.nb_columns(); ++i)
{
dtypes.push_back(rb.get_column(i).data_type());
}
return dtypes;
}
}
4 changes: 2 additions & 2 deletions src/serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ namespace sparrow_ipc
{
std::vector<sparrow::data_type> dtypes;
dtypes.reserve(rb.nb_columns());
for (const auto& col : rb.columns())
for (size_t i = 0; i < rb.nb_columns(); ++i)
{
dtypes.push_back(col.data_type());
dtypes.push_back(rb.get_column(i).data_type());
}
return dtypes;
}
Expand Down
Loading