Skip to content

Commit 32403c9

Browse files
committed
wip
1 parent dd6b9cd commit 32403c9

File tree

8 files changed

+361
-11
lines changed

8 files changed

+361
-11
lines changed

include/sparrow_ipc/serialize.hpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,21 @@
1010
#include "sparrow_ipc/serialize_utils.hpp"
1111
#include "sparrow_ipc/utils.hpp"
1212

13+
namespace sparrow_ipc
14+
{
15+
/**
16+
* @brief Sizes reported by serialize_record_batch() for one written record batch.
17+
*
18+
* Carries the metadata length and body length of a serialized record batch;
19+
* the file serializer copies both into the Arrow IPC file footer block list.
20+
*/
21+
struct serialized_record_batch_info
22+
{
23+
int32_t metadata_length; ///< Metadata length: 4-byte size prefix + FlatBuffer message padded to 8-byte alignment
24+
int64_t body_length; ///< Bytes added to the stream while writing the record batch body (data buffers)
25+
};
26+
}
27+
1328
namespace sparrow_ipc
1429
{
1530
/**

include/sparrow_ipc/serializer.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include "sparrow_ipc/any_output_stream.hpp"
77
#include "sparrow_ipc/compression.hpp"
8+
#include "sparrow_ipc/serialize.hpp"
89
#include "sparrow_ipc/serialize_utils.hpp"
910

1011
namespace sparrow_ipc

include/sparrow_ipc/stream_file_serializer.hpp

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,29 @@
1616

1717
namespace sparrow_ipc
1818
{
19+
/**
20+
* @brief Represents a block entry in the Arrow IPC file footer.
21+
*
22+
* Each block records where one record batch starts in the file and how large its two parts are.
23+
*/
24+
struct record_batch_block
25+
{
26+
int64_t offset; ///< Offset from the start of the file to the record batch message (stream size before it is written)
27+
int32_t metadata_length; ///< Metadata length as reported by serialize_record_batch (size prefix + padded FlatBuffer message)
28+
int64_t body_length; ///< Length of the record batch body (data buffers)
29+
};
30+
1931
/**
2032
* @brief Writes the Arrow IPC file footer.
2133
*
2234
* @param record_batch A record batch containing the schema for the footer
35+
* @param record_batch_blocks One entry per written record batch (offset, metadata length, body length), emitted as the footer's record batch blocks
2336
* @param stream The output stream to write the footer to
2437
* @return The size of the footer in bytes (the caller writes it after the footer as a little-endian int32)
2538
*/
2639
SPARROW_IPC_API size_t write_footer(
2740
const sparrow::record_batch& record_batch,
41+
const std::vector<record_batch_block>& record_batch_blocks,
2842
any_output_stream& stream
2943
);
3044

@@ -184,7 +198,14 @@ namespace sparrow_ipc
184198
{
185199
throw std::invalid_argument("Record batch schema does not match file serializer schema");
186200
}
187-
serialize_record_batch(rb, m_stream, m_compression, compressed_buffers_cache);
201+
202+
// Offset is from the start of the file to the record batch message
203+
const int64_t offset = static_cast<int64_t>(m_stream.size());
204+
205+
// Serialize and get block info
206+
const auto info = serialize_record_batch(rb, m_stream, m_compression, compressed_buffers_cache);
207+
208+
m_record_batch_blocks.push_back({offset, info.metadata_length, info.body_length});
188209
}
189210
}
190211

@@ -280,6 +301,7 @@ namespace sparrow_ipc
280301
any_output_stream m_stream;
281302
bool m_ended{false};
282303
std::optional<CompressionType> m_compression;
304+
std::vector<record_batch_block> m_record_batch_blocks;
283305
};
284306

285307
/**

integration_tests/test_integration_tools.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ TEST_SUITE("Integration Tools Tests")
173173

174174
TEST_CASE("json_to_stream and validate - Round-trip with validation")
175175
{
176-
const std::filesystem::path json_file = tests_resources_files_path / "generated_binary.json";
176+
const std::filesystem::path json_file = tests_resources_files_path / "generated_primitive.json";
177177

178178
if (!std::filesystem::exists(json_file))
179179
{
@@ -190,11 +190,17 @@ TEST_SUITE("Integration Tools Tests")
190190
json_file,
191191
std::span<const uint8_t>(arrow_file_data)
192192
);
193+
// write to a temp file
194+
std::filesystem::path temp_file = std::filesystem::temp_directory_path() / "temp.arrow_file";
195+
std::ofstream out(temp_file, std::ios::binary);
196+
out.write(reinterpret_cast<const char*>(arrow_file_data.data()), arrow_file_data.size());
197+
out.close();
198+
193199
CHECK(matches);
194200

195201
// Also verify by deserializing
196-
auto batches = sparrow_ipc::deserialize_file(std::span<const uint8_t>(arrow_file_data));
197-
CHECK_GT(batches.size(), 0);
202+
const auto batches = sparrow_ipc::deserialize_file(std::span<const uint8_t>(arrow_file_data));
203+
CHECK_EQ(batches.size(), 2);
198204
}
199205

200206
TEST_CASE("compare_record_batch - Identical batches")

src/serialize.cpp

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "sparrow_ipc/serialize.hpp"
44
#include "sparrow_ipc/flatbuffer_utils.hpp"
5+
#include "sparrow_ipc/utils.hpp"
56

67
namespace sparrow_ipc
78
{
@@ -23,11 +24,33 @@ namespace sparrow_ipc
2324
common_serialize(get_schema_message_builder(record_batch), stream);
2425
}
2526

26-
void serialize_record_batch(const sparrow::record_batch& record_batch, any_output_stream& stream,
27+
serialized_record_batch_info serialize_record_batch(const sparrow::record_batch& record_batch, any_output_stream& stream,
2728
std::optional<CompressionType> compression,
2829
std::optional<std::reference_wrapper<CompressionCache>> cache)
2930
{
30-
common_serialize(get_record_batch_message_builder(record_batch, compression, cache), stream);
31+
// Build the record batch FlatBuffer message and note its size before writing it
32+
flatbuffers::FlatBufferBuilder builder = get_record_batch_message_builder(record_batch, compression, cache);
33+
const flatbuffers::uoffset_t flatbuffer_size = builder.GetSize();
34+
35+
// Metadata length reported to the file footer: 4-byte size prefix plus the FlatBuffer size
36+
// rounded up to 8-byte alignment; the continuation bytes are intentionally left out.
37+
// NOTE(review): confirm against the Arrow IPC spec whether Block.metaDataLength should count them.
38+
const int32_t metadata_length = static_cast<int32_t>(
39+
sizeof(uint32_t) + utils::align_to_8(static_cast<size_t>(flatbuffer_size))
40+
);
41+
42+
// Write the metadata message to the stream
43+
common_serialize(builder, stream);
44+
45+
// Remember the stream size before the body so the body length can be derived afterwards
46+
const size_t body_start = stream.size();
47+
48+
// Write the body (data buffers)
3149
generate_body(record_batch, stream, compression, cache);
50+
51+
// Body length = how much the stream grew while the body was written
52+
const int64_t body_length = static_cast<int64_t>(stream.size() - body_start);
53+
54+
return {metadata_length, body_length};
3255
}
3356
}

src/stream_file_serializer.cpp

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ namespace sparrow_ipc
4747
// Write end-of-stream marker
4848
m_stream.write(end_of_stream);
4949

50-
// Write footer using the first record batch for schema
51-
const size_t footer_size = write_footer(m_first_record_batch.value(), m_stream);
50+
// Write footer using the first record batch for schema and the tracked blocks
51+
const size_t footer_size = write_footer(m_first_record_batch.value(), m_record_batch_blocks, m_stream);
5252

5353
// Write footer size (int32, little-endian)
5454
const int32_t footer_size_i32 = static_cast<int32_t>(footer_size);
@@ -64,6 +64,7 @@ namespace sparrow_ipc
6464

6565
size_t write_footer(
6666
const sparrow::record_batch& record_batch,
67+
const std::vector<record_batch_block>& record_batch_blocks,
6768
any_output_stream& stream
6869
)
6970
{
@@ -83,9 +84,14 @@ namespace sparrow_ipc
8384
std::vector<org::apache::arrow::flatbuf::Block>{}
8485
);
8586

86-
// Create record batches vector
87-
std::vector<org::apache::arrow::flatbuf::Block> record_batch_blocks;
88-
auto record_batches_fb = footer_builder.CreateVectorOfStructs(record_batch_blocks);
87+
// Create record batches vector from tracked blocks
88+
std::vector<org::apache::arrow::flatbuf::Block> fb_blocks;
89+
fb_blocks.reserve(record_batch_blocks.size());
90+
for (const auto& block : record_batch_blocks)
91+
{
92+
fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length);
93+
}
94+
auto record_batches_fb = footer_builder.CreateVectorOfStructs(fb_blocks);
8995

9096
// Create footer
9197
auto footer = org::apache::arrow::flatbuf::CreateFooter(

tests/test_serialize_utils.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "sparrow_ipc/flatbuffer_utils.hpp"
88
#include "sparrow_ipc/magic_values.hpp"
99
#include "sparrow_ipc/memory_output_stream.hpp"
10+
#include "sparrow_ipc/serialize.hpp"
1011
#include "sparrow_ipc/serialize_utils.hpp"
1112
#include "sparrow_ipc/utils.hpp"
1213
#include "sparrow_ipc_tests_helpers.hpp"

0 commit comments

Comments
 (0)