Skip to content

Commit 8c4e318

Browse files
committed
Reorganize code
1 parent 38cc796 commit 8c4e318

File tree

6 files changed

+103
-103
lines changed

6 files changed

+103
-103
lines changed

include/sparrow_ipc/flatbuffer_utils.hpp

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,48 @@ namespace sparrow_ipc
207207
[[nodiscard]] std::vector<org::apache::arrow::flatbuf::Buffer>
208208
get_buffers(const sparrow::record_batch& record_batch);
209209

210+
/**
211+
* @brief Generates the compressed message body and buffer metadata for a record batch.
212+
*
213+
* This function traverses the record batch, compresses each buffer using the specified
214+
* compression algorithm, and constructs the message body. Each compressed buffer
215+
* is prefixed by its 8-byte uncompressed size. Padding is added after each
216+
* compressed buffer to ensure 8-byte alignment.
217+
*
218+
* @param record_batch The record batch to serialize.
219+
* @param compression_type The compression algorithm to use (e.g., LZ4_FRAME, ZSTD).
220+
* @return A vector of FlatBuffer Buffer objects describing the offset and
221+
* size of each buffer within the compressed body.
222+
*/
223+
[[nodiscard]] SPARROW_IPC_API std::vector<org::apache::arrow::flatbuf::Buffer>
224+
generate_compressed_buffers(const sparrow::record_batch& record_batch, const CompressionType compression_type);
225+
226+
/**
227+
* @brief Calculates the total size of the body section for an Arrow array.
228+
*
229+
* This function recursively computes the total size needed for all buffers
230+
* in an Arrow array structure, including buffers from child arrays. Each
231+
* buffer size is aligned to 8-byte boundaries as required by the Arrow format.
232+
*
233+
* @param arrow_proxy The Arrow array proxy containing buffers and child arrays
234+
* @param compression Optional compression type; when set, each buffer's size is measured after compressing it
235+
* @return int64_t The total aligned size in bytes of all buffers in the array hierarchy
236+
*/
237+
[[nodiscard]] SPARROW_IPC_API int64_t calculate_body_size(const sparrow::arrow_proxy& arrow_proxy, std::optional<CompressionType> compression = std::nullopt);
238+
239+
/**
240+
* @brief Calculates the total body size of a record batch by summing the body sizes of all its columns.
241+
*
242+
* This function iterates through all columns in the given record batch and accumulates
243+
* the body size of each column's underlying Arrow array proxy. The body size represents
244+
* the total memory required for the serialized data content of the record batch.
245+
*
246+
* @param record_batch The sparrow record batch containing columns to calculate size for
247+
* @param compression Optional compression type, forwarded to the per-column body size calculation
248+
* @return int64_t The total body size in bytes of all columns in the record batch
249+
*/
250+
[[nodiscard]] SPARROW_IPC_API int64_t calculate_body_size(const sparrow::record_batch& record_batch, std::optional<CompressionType> compression = std::nullopt);
251+
210252
/**
211253
* @brief Creates a FlatBuffer message containing a serialized Apache Arrow RecordBatch.
212254
*

include/sparrow_ipc/serialize_utils.hpp

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -119,22 +119,6 @@ namespace sparrow_ipc
119119
return total_size;
120120
}
121121

122-
/**
123-
* @brief Generates the compressed message body and buffer metadata for a record batch.
124-
*
125-
* This function traverses the record batch, compresses each buffer using the specified
126-
* compression algorithm, and constructs the message body. For each compressed buffer,
127-
* it is prefixed by its 8-byte uncompressed size. Padding is added after each
128-
* compressed buffer to ensure 8-byte alignment.
129-
*
130-
* @param record_batch The record batch to serialize.
131-
* @param compression_type The compression algorithm to use (e.g., LZ4_FRAME, ZSTD).
132-
* @return A vector of FlatBuffer Buffer objects describing the offset and
133-
* size of each buffer within the compressed body.
134-
*/
135-
[[nodiscard]] SPARROW_IPC_API std::vector<org::apache::arrow::flatbuf::Buffer>
136-
generate_compressed_buffers(const sparrow::record_batch& record_batch, const CompressionType compression_type);
137-
138122
/**
139123
* @brief Fills the body vector with serialized data from an arrow proxy and its children.
140124
*
@@ -166,31 +150,5 @@ namespace sparrow_ipc
166150
*/
167151
SPARROW_IPC_API void generate_body(const sparrow::record_batch& record_batch, any_output_stream& stream, std::optional<CompressionType> compression = std::nullopt);
168152

169-
/**
170-
* @brief Calculates the total size of the body section for an Arrow array.
171-
*
172-
* This function recursively computes the total size needed for all buffers
173-
* in an Arrow array structure, including buffers from child arrays. Each
174-
* buffer size is aligned to 8-byte boundaries as required by the Arrow format.
175-
*
176-
* @param arrow_proxy The Arrow array proxy containing buffers and child arrays
177-
* @param compression The compression type to use when serializing
178-
* @return int64_t The total aligned size in bytes of all buffers in the array hierarchy
179-
*/
180-
[[nodiscard]] SPARROW_IPC_API int64_t calculate_body_size(const sparrow::arrow_proxy& arrow_proxy, std::optional<CompressionType> compression = std::nullopt);
181-
182-
/**
183-
* @brief Calculates the total body size of a record batch by summing the body sizes of all its columns.
184-
*
185-
* This function iterates through all columns in the given record batch and accumulates
186-
* the body size of each column's underlying Arrow array proxy. The body size represents
187-
* the total memory required for the serialized data content of the record batch.
188-
*
189-
* @param record_batch The sparrow record batch containing columns to calculate size for
190-
* @param compression The compression type to use when serializing
191-
* @return int64_t The total body size in bytes of all columns in the record batch
192-
*/
193-
[[nodiscard]] SPARROW_IPC_API int64_t calculate_body_size(const sparrow::record_batch& record_batch, std::optional<CompressionType> compression = std::nullopt);
194-
195153
SPARROW_IPC_API std::vector<sparrow::data_type> get_column_dtypes(const sparrow::record_batch& rb);
196154
}

src/flatbuffer_utils.cpp

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
#include "compression_impl.hpp"
44
#include "sparrow_ipc/flatbuffer_utils.hpp"
5-
#include "sparrow_ipc/serialize_utils.hpp"
65
#include "sparrow_ipc/utils.hpp"
76

87
namespace sparrow_ipc
@@ -563,6 +562,65 @@ namespace sparrow_ipc
563562
return buffers;
564563
}
565564

565+
// Builds the list of FlatBuffer Buffer descriptors (offset, length) for the
// compressed body of a record batch.
// Each buffer is compressed here only to learn its compressed size: the
// compressed bytes themselves are local to the loop and are not returned.
// NOTE(review): only the buffers of each top-level column proxy are visited;
// children are not traversed here, whereas calculate_body_size recurses into
// children — confirm nested arrays are covered.
std::vector<org::apache::arrow::flatbuf::Buffer>
566+
generate_compressed_buffers(const sparrow::record_batch& record_batch, const CompressionType compression_type)
567+
{
568+
std::vector<org::apache::arrow::flatbuf::Buffer> compressed_buffers;
569+
// Running start position of the next buffer; advanced by each rounded size below.
int64_t current_offset = 0;
570+
571+
for (const auto& column : record_batch.columns())
572+
{
573+
const auto& arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(column);
574+
for (const auto& buffer : arrow_proxy.buffers())
575+
{
576+
std::vector<uint8_t> compressed_buffer_with_header = compress(compression_type, std::span<const uint8_t>(buffer.data(), buffer.size()));
577+
// Size of the compressed result after rounding through utils::align_to_8.
const size_t aligned_chunk_size = utils::align_to_8(compressed_buffer_with_header.size());
578+
// The recorded length is the rounded size, so it includes any padding.
compressed_buffers.emplace_back(current_offset, aligned_chunk_size);
579+
current_offset += aligned_chunk_size;
580+
}
581+
}
582+
return compressed_buffers;
583+
}
584+
585+
// Returns the sum of the utils::align_to_8-rounded sizes of every buffer in
// arrow_proxy and, recursively, in all of its children.
// With a compression type set, each buffer is actually compressed and the size
// of the compressed result is used; otherwise the raw buffer size is used.
int64_t calculate_body_size(const sparrow::arrow_proxy& arrow_proxy, std::optional<CompressionType> compression)
586+
{
587+
int64_t total_size = 0;
588+
if (compression.has_value())
589+
{
590+
for (const auto& buffer : arrow_proxy.buffers())
591+
{
592+
// The compressed bytes are a temporary: only their size is kept.
total_size += utils::align_to_8(compress(compression.value(), std::span<const uint8_t>(buffer.data(), buffer.size())).size());
593+
}
594+
}
595+
else
596+
{
597+
for (const auto& buffer : arrow_proxy.buffers())
598+
{
599+
total_size += utils::align_to_8(buffer.size());
600+
}
601+
}
602+
603+
// Child arrays contribute their own buffers, computed with the same compression setting.
for (const auto& child : arrow_proxy.children())
604+
{
605+
total_size += calculate_body_size(child, compression);
606+
}
607+
return total_size;
608+
}
609+
610+
// Returns the total body size of a record batch: the sum, over all columns, of
// the arrow_proxy overload of calculate_body_size with the same compression setting.
// NOTE(review): columns() is called twice to form the iterator pair; this relies on
// both calls referring to the same underlying range — confirm it does not return by value.
int64_t calculate_body_size(const sparrow::record_batch& record_batch, std::optional<CompressionType> compression)
611+
{
612+
return std::accumulate(
613+
record_batch.columns().begin(),
614+
record_batch.columns().end(),
615+
// int64_t initial value so the accumulation is carried out in int64_t.
int64_t{0},
616+
[&](int64_t acc, const sparrow::array& arr)
617+
{
618+
const auto& arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(arr);
619+
return acc + calculate_body_size(arrow_proxy, compression);
620+
}
621+
);
622+
}
623+
566624
flatbuffers::FlatBufferBuilder get_record_batch_message_builder(const sparrow::record_batch& record_batch, std::optional<CompressionType> compression)
567625
{
568626
flatbuffers::FlatBufferBuilder record_batch_builder;

src/serialize_utils.cpp

Lines changed: 0 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -34,45 +34,6 @@ namespace sparrow_ipc
3434
});
3535
}
3636

37-
int64_t calculate_body_size(const sparrow::arrow_proxy& arrow_proxy, std::optional<CompressionType> compression)
38-
{
39-
int64_t total_size = 0;
40-
if (compression.has_value())
41-
{
42-
for (const auto& buffer : arrow_proxy.buffers())
43-
{
44-
total_size += utils::align_to_8(compress(compression.value(), std::span<const uint8_t>(buffer.data(), buffer.size())).size());
45-
}
46-
}
47-
else
48-
{
49-
for (const auto& buffer : arrow_proxy.buffers())
50-
{
51-
total_size += utils::align_to_8(buffer.size());
52-
}
53-
}
54-
55-
for (const auto& child : arrow_proxy.children())
56-
{
57-
total_size += calculate_body_size(child, compression);
58-
}
59-
return total_size;
60-
}
61-
62-
int64_t calculate_body_size(const sparrow::record_batch& record_batch, std::optional<CompressionType> compression)
63-
{
64-
return std::accumulate(
65-
record_batch.columns().begin(),
66-
record_batch.columns().end(),
67-
int64_t{0},
68-
[&](int64_t acc, const sparrow::array& arr)
69-
{
70-
const auto& arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(arr);
71-
return acc + calculate_body_size(arrow_proxy, compression);
72-
}
73-
);
74-
}
75-
7637
std::size_t calculate_schema_message_size(const sparrow::record_batch& record_batch)
7738
{
7839
// Build the schema message to get its exact size
@@ -108,26 +69,6 @@ namespace sparrow_ipc
10869
return metadata_size + actual_body_size;
10970
}
11071

111-
std::vector<org::apache::arrow::flatbuf::Buffer>
112-
generate_compressed_buffers(const sparrow::record_batch& record_batch, const CompressionType compression_type)
113-
{
114-
std::vector<org::apache::arrow::flatbuf::Buffer> compressed_buffers;
115-
int64_t current_offset = 0;
116-
117-
for (const auto& column : record_batch.columns())
118-
{
119-
const auto& arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(column);
120-
for (const auto& buffer : arrow_proxy.buffers())
121-
{
122-
std::vector<uint8_t> compressed_buffer_with_header = compress(compression_type, std::span<const uint8_t>(buffer.data(), buffer.size()));
123-
const size_t aligned_chunk_size = utils::align_to_8(compressed_buffer_with_header.size());
124-
compressed_buffers.emplace_back(current_offset, aligned_chunk_size);
125-
current_offset += aligned_chunk_size;
126-
}
127-
}
128-
return compressed_buffers;
129-
}
130-
13172
std::vector<sparrow::data_type> get_column_dtypes(const sparrow::record_batch& rb)
13273
{
13374
std::vector<sparrow::data_type> dtypes;

tests/test_flatbuffer_utils.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,4 +532,4 @@ namespace sparrow_ipc
532532
}
533533
}
534534
}
535-
}
535+
}

tests/test_serialize_utils.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <sparrow.hpp>
55

66
#include "sparrow_ipc/any_output_stream.hpp"
7+
#include "sparrow_ipc/flatbuffer_utils.hpp"
78
#include "sparrow_ipc/magic_values.hpp"
89
#include "sparrow_ipc/memory_output_stream.hpp"
910
#include "sparrow_ipc/serialize_utils.hpp"

0 commit comments

Comments
 (0)