1+ #include " sparrow_ipc/serialize.hpp"
2+
3+ #include < cstdint>
14#include < optional>
25
3- #include " sparrow_ipc/serialize.hpp"
46#include " sparrow_ipc/flatbuffer_utils.hpp"
57#include " sparrow_ipc/utils.hpp"
68
79namespace sparrow_ipc
810{
9- void common_serialize (
10- const flatbuffers::FlatBufferBuilder& builder,
11- any_output_stream& stream
12- )
11+ void common_serialize (const flatbuffers::FlatBufferBuilder& builder, any_output_stream& stream)
1312 {
1413 stream.write (continuation);
1514 const flatbuffers::uoffset_t size = builder.GetSize ();
16- const std::span<const uint8_t > size_span (reinterpret_cast <const uint8_t *>(&size), sizeof (uint32_t ));
15+ const int32_t size_with_padding = utils::align_to_8 (static_cast <int32_t >(size));
16+ const std::span<const uint8_t > size_span (
17+ reinterpret_cast <const uint8_t *>(&size_with_padding),
18+ sizeof (int32_t )
19+ );
1720 stream.write (size_span);
1821 stream.write (std::span (builder.GetBufferPointer (), size));
1922 stream.add_padding ();
@@ -24,39 +27,27 @@ namespace sparrow_ipc
2427 common_serialize (get_schema_message_builder (record_batch), stream);
2528 }
2629
27- serialized_record_batch_info serialize_record_batch (const sparrow::record_batch& record_batch, any_output_stream& stream, std::optional<CompressionType> compression)
30+ serialized_record_batch_info serialize_record_batch (
31+ const sparrow::record_batch& record_batch,
32+ any_output_stream& stream,
33+ std::optional<CompressionType> compression
34+ )
2835 {
2936 // Build and serialize metadata
3037 flatbuffers::FlatBufferBuilder builder = get_record_batch_message_builder (record_batch, compression);
31- const flatbuffers::uoffset_t flatbuffer_size = builder.GetSize ();
32-
33- // Calculate metadata length for the Block in the footer
34- // According to Arrow spec, metadata_length must be a multiple of 8.
35- // The encapsulated message format is:
36- // - continuation (4 bytes)
37- // - size prefix (4 bytes)
38- // - flatbuffer metadata (flatbuffer_size bytes)
39- // - padding to 8-byte boundary
40- //
41- // Arrow's WriteMessage returns metadata_length = align_to_8(8 + flatbuffer_size)
42- // which INCLUDES the continuation bytes.
43- const size_t prefix_size = continuation.size () + sizeof (uint32_t ); // 8 bytes
44- const int32_t metadata_length = static_cast <int32_t >(
45- utils::align_to_8 (prefix_size + flatbuffer_size)
46- );
47-
38+
4839 // Write metadata
4940 common_serialize (builder, stream);
50-
41+
5142 // Track position before body to calculate body length
5243 const size_t body_start = stream.size ();
53-
44+
5445 // Write body
5546 generate_body (record_batch, stream, compression);
56-
57- // Calculate body length (should already be 8-aligned since generate_body pads each buffer)
58- const int64_t body_length = static_cast < int64_t >(stream. size () - body_start );
59-
60- return {metadata_length, body_length};
47+
48+ const auto body_length = static_cast < int64_t >(stream. size () - body_start);
49+ const flatbuffers:: uoffset_t flatbuffer_size = builder. GetSize ( );
50+ const auto metadata_length = static_cast < int32_t >( utils::align_to_8 (flatbuffer_size));
51+ return {. metadata_length = metadata_length, . body_length = body_length};
6152 }
6253}
0 commit comments