Skip to content

Commit 733e7c0

Browse files
committed
Make serialize_record_batch_message generic
1 parent c14fedd commit 733e7c0

File tree

4 files changed

+40
-103
lines changed

4 files changed

+40
-103
lines changed

include/serialize.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,6 @@ namespace sparrow_ipc
1212
namespace details
1313
{
1414
SPARROW_IPC_API void serialize_schema_message(const ArrowSchema& arrow_schema, const std::optional<sparrow::key_value_view>& metadata, std::vector<uint8_t>& final_buffer);
15-
SPARROW_IPC_API void serialize_record_batch_message(const ArrowArray& arrow_arr, std::vector<uint8_t>& final_buffer);
15+
SPARROW_IPC_API void serialize_record_batch_message(const ArrowArray& arrow_arr, const std::vector<int64_t>& buffers_sizes, std::vector<uint8_t>& final_buffer);
1616
}
1717
}

include/serialize_null_array.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ namespace sparrow_ipc
3838
details::serialize_schema_message(arrow_schema, arr.metadata(), final_buffer);
3939

4040
// II - Serialize the RecordBatch message
41-
details::serialize_record_batch_message(arrow_arr, final_buffer);
41+
details::serialize_record_batch_message(arrow_arr, {}, final_buffer);
4242

4343
// Return the final buffer containing the complete IPC stream
4444
return final_buffer;

include/serialize_primitive_array.hpp

Lines changed: 6 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -50,77 +50,12 @@ namespace sparrow_ipc
5050

5151
// II - Serialize the RecordBatch message
5252
// After the Schema, we send the RecordBatch containing the actual data
53-
{
54-
// Create a new builder for the RecordBatch message's metadata
55-
flatbuffers::FlatBufferBuilder batch_builder;
56-
57-
// arrow_arr.buffers[0] is the validity bitmap
58-
// arrow_arr.buffers[1] is the data buffer
59-
const uint8_t* validity_bitmap = reinterpret_cast<const uint8_t*>(arrow_arr.buffers[0]);
60-
const uint8_t* data_buffer = reinterpret_cast<const uint8_t*>(arrow_arr.buffers[1]);
61-
62-
// Calculate the size of the validity and data buffers
63-
int64_t validity_size = (arrow_arr.length + 7) / 8;
64-
int64_t data_size = arrow_arr.length * sizeof(T);
65-
int64_t body_len = validity_size + data_size; // The total size of the message body
66-
67-
// Create Flatbuffer descriptions for the data buffers
68-
org::apache::arrow::flatbuf::Buffer validity_buffer_struct(0, validity_size);
69-
org::apache::arrow::flatbuf::Buffer data_buffer_struct(validity_size, data_size);
70-
// Create the FieldNode, which describes the layout of the array data
71-
org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count);
72-
73-
// A RecordBatch contains a vector of nodes and a vector of buffers
74-
auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1);
75-
std::vector<org::apache::arrow::flatbuf::Buffer> buffers_vec = {validity_buffer_struct, data_buffer_struct};
76-
auto fb_buffers_vector = batch_builder.CreateVectorOfStructs(buffers_vec);
77-
78-
// Build the RecordBatch metadata object
79-
auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(batch_builder, arrow_arr.length, fb_nodes_vector, fb_buffers_vector);
80-
81-
// Wrap the RecordBatch in a top-level Message
82-
auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage(
83-
batch_builder,
84-
org::apache::arrow::flatbuf::MetadataVersion::V5,
85-
org::apache::arrow::flatbuf::MessageHeader::RecordBatch,
86-
record_batch_offset.Union(),
87-
body_len
88-
);
89-
batch_builder.Finish(batch_message_offset);
90-
91-
// III - Append the RecordBatch message to the final buffer
92-
uint32_t batch_meta_len = batch_builder.GetSize(); // Get the size of the batch metadata
93-
int64_t aligned_batch_meta_len = utils::align_to_8(batch_meta_len); // Calculate the padded length
94-
95-
size_t current_size = final_buffer.size(); // Get the current size (which is the end of the Schema message)
96-
// Resize the buffer to append the new message
97-
final_buffer.resize(current_size + sizeof(uint32_t) + aligned_batch_meta_len + body_len);
98-
uint8_t* dst = final_buffer.data() + current_size; // Get a pointer to where the new message will start
99-
100-
// Write the 4-byte metadata length for the RecordBatch message
101-
*(reinterpret_cast<uint32_t*>(dst)) = batch_meta_len;
102-
dst += sizeof(uint32_t);
103-
// Copy the RecordBatch metadata into the buffer
104-
memcpy(dst, batch_builder.GetBufferPointer(), batch_meta_len);
105-
// Add padding to align the body to an 8-byte boundary
106-
memset(dst + batch_meta_len, 0, aligned_batch_meta_len - batch_meta_len);
107-
dst += aligned_batch_meta_len;
108-
// Copy the actual data buffers (the message body) into the buffer
109-
if (validity_bitmap)
110-
{
111-
memcpy(dst, validity_bitmap, validity_size);
112-
}
113-
else
114-
{
115-
// If validity_bitmap is null, it means there are no nulls
116-
memset(dst, 0xFF, validity_size);
117-
}
118-
dst += validity_size;
119-
if (data_buffer)
120-
{
121-
memcpy(dst, data_buffer, data_size);
122-
}
123-
}
53+
54+
// Calculate the size of the validity and data buffers
55+
int64_t validity_size = (arrow_arr.length + 7) / 8;
56+
int64_t data_size = arrow_arr.length * sizeof(T);
57+
std::vector<int64_t> buffers_sizes = {validity_size, data_size};
58+
details::serialize_record_batch_message(arrow_arr, buffers_sizes, final_buffer);
12459

12560
// Return the final buffer containing the complete IPC stream
12661
return final_buffer;

src/serialize.cpp

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -85,28 +85,25 @@ namespace sparrow_ipc
8585
*(reinterpret_cast<uint32_t*>(final_buffer.data())) = schema_len;
8686
}
8787

88-
void serialize_record_batch_message(const ArrowArray& arrow_arr, std::vector<uint8_t>& final_buffer)
88+
void serialize_record_batch_message(const ArrowArray& arrow_arr, const std::vector<int64_t>& buffers_sizes, std::vector<uint8_t>& final_buffer)
8989
{
9090
// Create a new builder for the RecordBatch message's metadata
9191
flatbuffers::FlatBufferBuilder batch_builder;
9292

93-
// arrow_arr.buffers[0] is the validity bitmap
94-
// arrow_arr.buffers[1] is the data buffer
95-
// const uint8_t* validity_bitmap = reinterpret_cast<const uint8_t*>(arrow_arr.buffers[0]);
96-
// const uint8_t* data_buffer = reinterpret_cast<const uint8_t*>(arrow_arr.buffers[1]);
97-
98-
// Calculate the size of the validity and data buffers
99-
// int64_t validity_size = (arrow_arr.length + 7) / 8;
100-
// int64_t data_size = arrow_arr.length * sizeof(T);
101-
// int64_t body_len = validity_size + data_size; // The total size of the message body
102-
int64_t body_len = 0; // NULL ARRAY
93+
std::vector<org::apache::arrow::flatbuf::Buffer> buffers_vec;
94+
int64_t current_offset = 0;
95+
int64_t body_len = 0; // The total size of the message body
96+
for (const auto& size : buffers_sizes)
97+
{
98+
buffers_vec.emplace_back(current_offset, size);
99+
current_offset += size;
100+
}
101+
body_len = current_offset;
103102

104103
// Create the FieldNode, which describes the layout of the array data
105104
org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count);
106105
// A RecordBatch contains a vector of nodes and a vector of buffers
107106
auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1);
108-
// std::vector<org::apache::arrow::flatbuf::Buffer> buffers_vec = {validity_buffer_struct, data_buffer_struct};
109-
std::vector<org::apache::arrow::flatbuf::Buffer> buffers_vec = {}; // NULL ARRAY
110107
auto fb_buffers_vector = batch_builder.CreateVectorOfStructs(buffers_vec);
111108

112109
// Build the RecordBatch metadata object
@@ -122,7 +119,7 @@ namespace sparrow_ipc
122119
);
123120
batch_builder.Finish(batch_message_offset);
124121

125-
// III - Append the RecordBatch message to the final buffer
122+
// Append the RecordBatch message to the final buffer
126123
uint32_t batch_meta_len = batch_builder.GetSize(); // Get the size of the batch metadata
127124
int64_t aligned_batch_meta_len = utils::align_to_8(batch_meta_len); // Calculate the padded length
128125

@@ -139,22 +136,27 @@ namespace sparrow_ipc
139136
// Add padding to align the body to an 8-byte boundary
140137
memset(dst + batch_meta_len, 0, aligned_batch_meta_len - batch_meta_len);
141138

142-
// dst += aligned_batch_meta_len;
143-
// // Copy the actual data buffers (the message body) into the buffer
144-
// if (validity_bitmap)
145-
// {
146-
// memcpy(dst, validity_bitmap, validity_size);
147-
// }
148-
// else
149-
// {
150-
// // If validity_bitmap is null, it means there are no nulls
151-
// memset(dst, 0xFF, validity_size);
152-
// }
153-
// dst += validity_size;
154-
// if (data_buffer)
155-
// {
156-
// memcpy(dst, data_buffer, data_size);
157-
// }
139+
dst += aligned_batch_meta_len;
140+
// Copy the actual data buffers (the message body) into the buffer
141+
for (size_t i = 0; i < buffers_sizes.size(); ++i)
142+
{
143+
// arrow_arr.buffers[0] is the validity bitmap
144+
// arrow_arr.buffers[1] is the actual data buffer
145+
const uint8_t* data_buffer = reinterpret_cast<const uint8_t*>(arrow_arr.buffers[i]);
146+
if (data_buffer)
147+
{
148+
memcpy(dst, data_buffer, buffers_sizes[i]);
149+
}
150+
else
151+
{
152+
// If validity_bitmap is null, it means there are no nulls
153+
if (i == 0)
154+
{
155+
memset(dst, 0xFF, buffers_sizes[i]);
156+
}
157+
}
158+
dst += buffers_sizes[i];
159+
}
158160
}
159161
} // namespace details
160162
} // namespace sparrow_ipc

0 commit comments

Comments
 (0)