77
88namespace sparrow_ipc
99{
10- void fill_body (const sparrow::arrow_proxy& arrow_proxy, any_output_stream& stream)
10+ void fill_body (const sparrow::arrow_proxy& arrow_proxy, any_output_stream& stream, std::optional<org::apache::arrow::flatbuf::CompressionType> compression )
1111 {
12- for (const auto & buffer : arrow_proxy.buffers ())
13- {
14- stream.write (buffer);
12+ std::for_each (arrow_proxy.buffers ().begin (), arrow_proxy.buffers ().end (), [&](const auto & buffer) {
13+ if (compression.has_value ())
14+ {
15+ auto compressed_buffer_with_header = compress (compression.value (), std::span<const uint8_t >(buffer.data (), buffer.size ()));
16+ stream.write (std::span (compressed_buffer_with_header.data (), compressed_buffer_with_header.size ()));
17+ }
18+ else
19+ {
20+ stream.write (buffer);
21+ }
1522 stream.add_padding ();
16- }
17- for ( const auto & child : arrow_proxy. children ())
18- {
19- fill_body (child, stream);
20- }
23+ });
24+
25+ std::for_each (arrow_proxy. children (). begin (), arrow_proxy. children (). end (), [&]( const auto & child) {
26+ fill_body (child, stream, compression );
27+ });
2128 }
2229
23- void generate_body (const sparrow::record_batch& record_batch, any_output_stream& stream)
30+ void generate_body (const sparrow::record_batch& record_batch, any_output_stream& stream, std::optional<org::apache::arrow::flatbuf::CompressionType> compression )
2431 {
25- for (const auto & column : record_batch.columns ())
26- {
32+ std::for_each (record_batch.columns ().begin (), record_batch.columns ().end (), [&](const auto & column) {
2733 const auto & arrow_proxy = sparrow::detail::array_access::get_arrow_proxy (column);
28- fill_body (arrow_proxy, stream);
29- }
34+ fill_body (arrow_proxy, stream, compression );
35+ });
3036 }
3137
32- int64_t calculate_body_size (const sparrow::arrow_proxy& arrow_proxy)
38+ int64_t calculate_body_size (const sparrow::arrow_proxy& arrow_proxy, std::optional<org::apache::arrow::flatbuf::CompressionType> compression )
3339 {
3440 int64_t total_size = 0 ;
35- for (const auto & buffer : arrow_proxy.buffers ())
41+ if (compression.has_value ())
42+ {
43+ for (const auto & buffer : arrow_proxy.buffers ())
44+ {
45+ total_size += utils::align_to_8 (compress (compression.value (), std::span<const uint8_t >(buffer.data (), buffer.size ())).size ());
46+ }
47+ }
48+ else
3649 {
37- total_size += utils::align_to_8 (buffer.size ());
50+ for (const auto & buffer : arrow_proxy.buffers ())
51+ {
52+ total_size += utils::align_to_8 (buffer.size ());
53+ }
3854 }
55+
3956 for (const auto & child : arrow_proxy.children ())
4057 {
41- total_size += calculate_body_size (child);
58+ total_size += calculate_body_size (child, compression );
4259 }
4360 return total_size;
4461 }
4562
46- int64_t calculate_body_size (const sparrow::record_batch& record_batch)
63+ int64_t calculate_body_size (const sparrow::record_batch& record_batch, std::optional<org::apache::arrow::flatbuf::CompressionType> compression )
4764 {
4865 return std::accumulate (
4966 record_batch.columns ().begin (),
5067 record_batch.columns ().end (),
5168 int64_t {0 },
52- [](int64_t acc, const sparrow::array& arr)
69+ [& ](int64_t acc, const sparrow::array& arr)
5370 {
5471 const auto & arrow_proxy = sparrow::detail::array_access::get_arrow_proxy (arr);
55- return acc + calculate_body_size (arrow_proxy);
72+ return acc + calculate_body_size (arrow_proxy, compression );
5673 }
5774 );
5875 }
@@ -78,18 +95,7 @@ namespace sparrow_ipc
7895 flatbuffers::FlatBufferBuilder record_batch_builder = get_record_batch_message_builder (record_batch, compression);
7996 const flatbuffers::uoffset_t record_batch_len = record_batch_builder.GetSize ();
8097
81- std::size_t actual_body_size = 0 ;
82- if (compression.has_value ())
83- {
84- // If compressed, the body size is the sum of compressed buffer sizes + original size prefixes + padding
85- auto [compressed_body, compressed_buffers] = generate_compressed_body_and_buffers (record_batch, compression.value ());
86- actual_body_size = compressed_body.size ();
87- }
88- else
89- {
90- // If not compressed, the body size is the sum of uncompressed buffer sizes with padding
91- actual_body_size = static_cast <std::size_t >(calculate_body_size (record_batch));
92- }
98+ const std::size_t actual_body_size = static_cast <std::size_t >(calculate_body_size (record_batch, compression));
9399
94100 // Calculate total size:
95101 // - Continuation bytes (4)
@@ -103,10 +109,9 @@ namespace sparrow_ipc
103109 return metadata_size + actual_body_size;
104110 }
105111
106- std::pair<std:: vector<uint8_t >, std::vector< org::apache::arrow::flatbuf::Buffer> >
107- generate_compressed_body_and_buffers (const sparrow::record_batch& record_batch, const org::apache::arrow::flatbuf::CompressionType compression_type)
112+ std::vector<org::apache::arrow::flatbuf::Buffer>
113+ generate_compressed_buffers (const sparrow::record_batch& record_batch, const org::apache::arrow::flatbuf::CompressionType compression_type)
108114 {
109- std::vector<uint8_t > compressed_body;
110115 std::vector<org::apache::arrow::flatbuf::Buffer> compressed_buffers;
111116 int64_t current_offset = 0 ;
112117
@@ -115,24 +120,13 @@ namespace sparrow_ipc
115120 const auto & arrow_proxy = sparrow::detail::array_access::get_arrow_proxy (column);
116121 for (const auto & buffer : arrow_proxy.buffers ())
117122 {
118- // Compress the buffer. The returned buffer already has the correct size header.
119123 std::vector<uint8_t > compressed_buffer_with_header = compress (compression_type, std::span<const uint8_t >(buffer.data (), buffer.size ()));
120-
121124 const size_t aligned_chunk_size = utils::align_to_8 (compressed_buffer_with_header.size ());
122- const size_t padding_needed = aligned_chunk_size - compressed_buffer_with_header.size ();
123-
124- // Write compressed data with header
125- compressed_body.insert (compressed_body.end (), compressed_buffer_with_header.begin (), compressed_buffer_with_header.end ());
126-
127- // Add padding
128- compressed_body.insert (compressed_body.end (), padding_needed, 0 );
129-
130- // Update compressed buffer metadata
131125 compressed_buffers.emplace_back (current_offset, aligned_chunk_size);
132126 current_offset += aligned_chunk_size;
133127 }
134128 }
135- return {compressed_body, compressed_buffers} ;
129+ return compressed_buffers;
136130 }
137131
138132 std::vector<sparrow::data_type> get_column_dtypes (const sparrow::record_batch& rb)
0 commit comments