Skip to content

Commit f2af01e

Browse files
committed
Simplify get_record_batch_message_builder
1 parent 46291cb commit f2af01e

File tree

3 files changed

+10
-24
lines changed

3 files changed

+10
-24
lines changed

include/sparrow_ipc/flatbuffer_utils.hpp

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -214,12 +214,6 @@ namespace sparrow_ipc
214214
*
215215
* @param record_batch The source record batch containing the data to be serialized
216216
* @param compression Optional: The compression algorithm to be used for the message body
217-
* @param body_size Optional: An override for the total size of the message body
218-
* If not provided, the size is calculated from the uncompressed buffers
219-
* This is required when using compression
220-
* @param compressed_buffers Optional: A pointer to a vector of buffer metadata.
221-
* If provided, this metadata is used instead of generating it from the
222-
* uncompressed record batch. This is required when using compression.
223217
* @return A FlatBufferBuilder containing the complete serialized message ready for
224218
* transmission or storage. The builder is finished and ready to be accessed
225219
* via GetBufferPointer() and GetSize().
@@ -228,5 +222,5 @@ namespace sparrow_ipc
228222
* @note Variadic buffer counts is not currently implemented (set to 0)
229223
*/
230224
[[nodiscard]] flatbuffers::FlatBufferBuilder
231-
get_record_batch_message_builder(const sparrow::record_batch& record_batch, std::optional<org::apache::arrow::flatbuf::CompressionType> compression = std::nullopt, std::optional<std::int64_t> body_size = std::nullopt, const std::vector<org::apache::arrow::flatbuf::Buffer>* compressed_buffers = nullptr);
225+
get_record_batch_message_builder(const sparrow::record_batch& record_batch, std::optional<org::apache::arrow::flatbuf::CompressionType> compression = std::nullopt);
232226
}

src/flatbuffer_utils.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -562,18 +562,20 @@ namespace sparrow_ipc
562562
return buffers;
563563
}
564564

565-
flatbuffers::FlatBufferBuilder get_record_batch_message_builder(const sparrow::record_batch& record_batch, std::optional<org::apache::arrow::flatbuf::CompressionType> compression, std::optional<std::int64_t> body_size_override, const std::vector<org::apache::arrow::flatbuf::Buffer>* compressed_buffers)
565+
flatbuffers::FlatBufferBuilder get_record_batch_message_builder(const sparrow::record_batch& record_batch, std::optional<org::apache::arrow::flatbuf::CompressionType> compression)
566566
{
567-
const std::vector<org::apache::arrow::flatbuf::FieldNode> nodes = create_fieldnodes(record_batch);
568-
const std::vector<org::apache::arrow::flatbuf::Buffer>& buffers = compressed_buffers ? *compressed_buffers : get_buffers(record_batch);
569567
flatbuffers::FlatBufferBuilder record_batch_builder;
570-
auto nodes_offset = record_batch_builder.CreateVectorOfStructs(nodes);
571-
auto buffers_offset = record_batch_builder.CreateVectorOfStructs(buffers);
572568
flatbuffers::Offset<org::apache::arrow::flatbuf::BodyCompression> compression_offset = 0;
569+
std::optional<std::vector<org::apache::arrow::flatbuf::Buffer>> compressed_buffers;
573570
if (compression)
574571
{
572+
compressed_buffers = generate_compressed_buffers(record_batch, compression.value());
575573
compression_offset = org::apache::arrow::flatbuf::CreateBodyCompression(record_batch_builder, compression.value(), org::apache::arrow::flatbuf::BodyCompressionMethod::BUFFER);
576574
}
575+
const auto& buffers = compressed_buffers ? *compressed_buffers : get_buffers(record_batch);
576+
const std::vector<org::apache::arrow::flatbuf::FieldNode> nodes = create_fieldnodes(record_batch);
577+
auto nodes_offset = record_batch_builder.CreateVectorOfStructs(nodes);
578+
auto buffers_offset = record_batch_builder.CreateVectorOfStructs(buffers);
577579
const auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(
578580
record_batch_builder,
579581
static_cast<int64_t>(record_batch.nb_rows()),
@@ -583,7 +585,7 @@ namespace sparrow_ipc
583585
0 // TODO :variadic buffer Counts
584586
);
585587

586-
const int64_t body_size = body_size_override.value_or(calculate_body_size(record_batch, compression));
588+
const int64_t body_size = calculate_body_size(record_batch, compression);
587589
const auto record_batch_message_offset = org::apache::arrow::flatbuf::CreateMessage(
588590
record_batch_builder,
589591
org::apache::arrow::flatbuf::MetadataVersion::V5,

src/serialize.cpp

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,7 @@ namespace sparrow_ipc
2525

2626
void serialize_record_batch(const sparrow::record_batch& record_batch, any_output_stream& stream, std::optional<org::apache::arrow::flatbuf::CompressionType> compression)
2727
{
28-
if (compression.has_value())
29-
{
30-
// TODO Handle this inside get_record_batch_message_builder
31-
auto compressed_buffers = generate_compressed_buffers(record_batch, compression.value());
32-
auto body_size_override = calculate_body_size(record_batch, compression);
33-
common_serialize(get_record_batch_message_builder(record_batch, compression, body_size_override, &compressed_buffers), stream);
34-
}
35-
else
36-
{
37-
common_serialize(get_record_batch_message_builder(record_batch, compression, std::nullopt, nullptr), stream);
38-
}
28+
common_serialize(get_record_batch_message_builder(record_batch, compression), stream);
3929
generate_body(record_batch, stream, compression);
4030
}
4131
}

0 commit comments

Comments
 (0)