Skip to content

Commit 15d2e46

Browse files
committed
wip
1 parent 32403c9 commit 15d2e46

File tree

2 files changed

+122
-5
lines changed

2 files changed

+122
-5
lines changed

src/serialize.cpp

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,19 @@ namespace sparrow_ipc
3232
flatbuffers::FlatBufferBuilder builder = get_record_batch_message_builder(record_batch, compression, cache);
3333
const flatbuffers::uoffset_t flatbuffer_size = builder.GetSize();
3434

35-
// Calculate metadata length: flatbuffer size + padding to 8-byte alignment
36-
// Note: The metadata_length in the Arrow file footer includes the size prefix (4 bytes)
37-
// but not the continuation bytes
35+
// Calculate metadata length for the Block in the footer
36+
// According to Arrow spec, metadata_length must be a multiple of 8.
37+
// The encapsulated message format is:
38+
// - continuation (4 bytes)
39+
// - size prefix (4 bytes)
40+
// - flatbuffer metadata (flatbuffer_size bytes)
41+
// - padding to 8-byte boundary
42+
//
43+
// Arrow's WriteMessage returns metadata_length = align_to_8(8 + flatbuffer_size)
44+
// which INCLUDES the continuation bytes.
45+
const size_t prefix_size = continuation.size() + sizeof(uint32_t); // 8 bytes
3846
const int32_t metadata_length = static_cast<int32_t>(
39-
sizeof(uint32_t) + utils::align_to_8(static_cast<size_t>(flatbuffer_size))
47+
utils::align_to_8(prefix_size + flatbuffer_size)
4048
);
4149

4250
// Write metadata
@@ -48,7 +56,7 @@ namespace sparrow_ipc
4856
// Write body
4957
generate_body(record_batch, stream, compression, cache);
5058

51-
// Calculate body length
59+
// Calculate body length (should already be 8-aligned since generate_body pads each buffer)
5260
const int64_t body_length = static_cast<int64_t>(stream.size() - body_start);
5361

5462
return {metadata_length, body_length};

tests/test_stream_file_serializer.cpp

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,4 +545,113 @@ TEST_SUITE("Stream file serializer tests")
545545
CHECK_EQ(footer->schema()->fields()->Get(1)->type_type(), org::apache::arrow::flatbuf::Type::FloatingPoint);
546546
CHECK_EQ(footer->schema()->fields()->Get(2)->type_type(), org::apache::arrow::flatbuf::Type::Utf8);
547547
}
548+
549+
TEST_CASE("Footer block alignment - all fields must be multiples of 8")
550+
{
551+
// Arrow IPC format requires that offset, metaDataLength, and bodyLength
552+
// in FileBlock are all multiples of 8. This test verifies the alignment.
553+
554+
std::vector<std::string> names = {"a", "b", "c", "d"};
555+
556+
// Create arrays with the same number of rows (7 elements each)
557+
std::vector<int32_t> int_data = {1, 2, 3, 4, 5, 6, 7};
558+
sparrow::primitive_array<int32_t> int_array(std::move(int_data));
559+
560+
std::vector<float> float_data = {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f, 7.0f};
561+
sparrow::primitive_array<float> float_array(std::move(float_data));
562+
563+
std::vector<bool> bool_data = {true, false, true, false, true, false, true};
564+
sparrow::primitive_array<bool> bool_array(std::move(bool_data));
565+
566+
std::vector<double> double_data = {1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7};
567+
sparrow::primitive_array<double> double_array(std::move(double_data));
568+
569+
std::vector<sparrow::array> arrays;
570+
arrays.emplace_back(std::move(int_array));
571+
arrays.emplace_back(std::move(float_array));
572+
arrays.emplace_back(std::move(bool_array));
573+
arrays.emplace_back(std::move(double_array));
574+
sparrow::record_batch batch(names, std::move(arrays));
575+
576+
std::vector<uint8_t> file_data;
577+
sparrow_ipc::memory_output_stream mem_stream(file_data);
578+
579+
{
580+
sparrow_ipc::stream_file_serializer serializer(mem_stream);
581+
serializer << batch << sparrow_ipc::end_file;
582+
}
583+
584+
const auto* footer = get_footer_from_file_data(file_data);
585+
REQUIRE(footer != nullptr);
586+
REQUIRE(footer->recordBatches() != nullptr);
587+
REQUIRE_EQ(footer->recordBatches()->size(), 1);
588+
589+
const auto& block = *footer->recordBatches()->Get(0);
590+
591+
// All three fields must be multiples of 8 per Arrow spec
592+
// (see Apache Arrow reader.cc CheckAligned function)
593+
CHECK_EQ(block.offset() % 8, 0);
594+
CHECK_EQ(block.metaDataLength() % 8, 0);
595+
CHECK_EQ(block.bodyLength() % 8, 0);
596+
597+
// metaDataLength should include continuation (4) + size (4) + flatbuffer + padding
598+
// so it should be at least 8
599+
CHECK_GE(block.metaDataLength(), 8);
600+
}
601+
602+
TEST_CASE("Footer block alignment with multiple batches")
603+
{
604+
std::vector<std::string> names = {"x", "y"};
605+
606+
std::vector<uint8_t> file_data;
607+
sparrow_ipc::memory_output_stream mem_stream(file_data);
608+
609+
{
610+
sparrow_ipc::stream_file_serializer serializer(mem_stream);
611+
612+
// Create multiple batches with different sizes to test alignment edge cases
613+
for (int batch_idx = 0; batch_idx < 5; ++batch_idx)
614+
{
615+
const size_t num_rows = 3 + batch_idx * 2; // 3, 5, 7, 9, 11 rows
616+
617+
std::vector<int32_t> int_data(num_rows);
618+
std::vector<float> float_data(num_rows);
619+
for (size_t i = 0; i < num_rows; ++i)
620+
{
621+
int_data[i] = static_cast<int32_t>(batch_idx * 100 + i);
622+
float_data[i] = static_cast<float>(i) * 0.1f;
623+
}
624+
625+
sparrow::primitive_array<int32_t> int_array(std::move(int_data));
626+
sparrow::primitive_array<float> float_array(std::move(float_data));
627+
628+
std::vector<sparrow::array> arrays;
629+
arrays.emplace_back(std::move(int_array));
630+
arrays.emplace_back(std::move(float_array));
631+
632+
auto names_copy = names;
633+
sparrow::record_batch batch(std::move(names_copy), std::move(arrays));
634+
635+
serializer << batch;
636+
}
637+
638+
serializer << sparrow_ipc::end_file;
639+
}
640+
641+
const auto* footer = get_footer_from_file_data(file_data);
642+
REQUIRE(footer != nullptr);
643+
REQUIRE(footer->recordBatches() != nullptr);
644+
REQUIRE_EQ(footer->recordBatches()->size(), 5);
645+
646+
// Check alignment for all blocks
647+
for (size_t i = 0; i < footer->recordBatches()->size(); ++i)
648+
{
649+
const auto& block = *footer->recordBatches()->Get(static_cast<uint32_t>(i));
650+
651+
// All three fields must be multiples of 8
652+
CHECK_MESSAGE(block.offset() % 8 == 0, "Block ", i, " offset not aligned");
653+
CHECK_MESSAGE(block.metaDataLength() % 8 == 0, "Block ", i, " metaDataLength not aligned");
654+
CHECK_MESSAGE(block.bodyLength() % 8 == 0, "Block ", i, " bodyLength not aligned");
655+
}
656+
}
548657
}

0 commit comments

Comments
 (0)