Skip to content

Commit 391fb38

Browse files
committed
wip
1 parent 1a1684f commit 391fb38

File tree

2 files changed

+122
-5
lines changed

2 files changed

+122
-5
lines changed

src/serialize.cpp

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,19 @@ namespace sparrow_ipc
3030
flatbuffers::FlatBufferBuilder builder = get_record_batch_message_builder(record_batch, compression);
3131
const flatbuffers::uoffset_t flatbuffer_size = builder.GetSize();
3232

33-
// Calculate metadata length: flatbuffer size + padding to 8-byte alignment
34-
// Note: The metadata_length in the Arrow file footer includes the size prefix (4 bytes)
35-
// but not the continuation bytes
33+
// Calculate metadata length for the Block in the footer
34+
// According to Arrow spec, metadata_length must be a multiple of 8.
35+
// The encapsulated message format is:
36+
// - continuation (4 bytes)
37+
// - size prefix (4 bytes)
38+
// - flatbuffer metadata (flatbuffer_size bytes)
39+
// - padding to 8-byte boundary
40+
//
41+
// Arrow's WriteMessage returns metadata_length = align_to_8(8 + flatbuffer_size)
42+
// which INCLUDES the continuation bytes.
43+
const size_t prefix_size = continuation.size() + sizeof(uint32_t); // 8 bytes
3644
const int32_t metadata_length = static_cast<int32_t>(
37-
sizeof(uint32_t) + utils::align_to_8(static_cast<size_t>(flatbuffer_size))
45+
utils::align_to_8(prefix_size + flatbuffer_size)
3846
);
3947

4048
// Write metadata
@@ -46,7 +54,7 @@ namespace sparrow_ipc
4654
// Write body
4755
generate_body(record_batch, stream, compression);
4856

49-
// Calculate body length
57+
// Calculate body length (should already be 8-aligned since generate_body pads each buffer)
5058
const int64_t body_length = static_cast<int64_t>(stream.size() - body_start);
5159

5260
return {metadata_length, body_length};

tests/test_stream_file_serializer.cpp

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,4 +537,113 @@ TEST_SUITE("Stream file serializer tests")
537537
CHECK_EQ(footer->schema()->fields()->Get(1)->type_type(), org::apache::arrow::flatbuf::Type::FloatingPoint);
538538
CHECK_EQ(footer->schema()->fields()->Get(2)->type_type(), org::apache::arrow::flatbuf::Type::Utf8);
539539
}
540+
541+
TEST_CASE("Footer block alignment - all fields must be multiples of 8")
542+
{
543+
// Arrow IPC format requires that offset, metaDataLength, and bodyLength
544+
// in FileBlock are all multiples of 8. This test verifies the alignment.
545+
546+
std::vector<std::string> names = {"a", "b", "c", "d"};
547+
548+
// Create arrays with the same number of rows (7 elements each)
549+
std::vector<int32_t> int_data = {1, 2, 3, 4, 5, 6, 7};
550+
sparrow::primitive_array<int32_t> int_array(std::move(int_data));
551+
552+
std::vector<float> float_data = {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f, 7.0f};
553+
sparrow::primitive_array<float> float_array(std::move(float_data));
554+
555+
std::vector<bool> bool_data = {true, false, true, false, true, false, true};
556+
sparrow::primitive_array<bool> bool_array(std::move(bool_data));
557+
558+
std::vector<double> double_data = {1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7};
559+
sparrow::primitive_array<double> double_array(std::move(double_data));
560+
561+
std::vector<sparrow::array> arrays;
562+
arrays.emplace_back(std::move(int_array));
563+
arrays.emplace_back(std::move(float_array));
564+
arrays.emplace_back(std::move(bool_array));
565+
arrays.emplace_back(std::move(double_array));
566+
sparrow::record_batch batch(names, std::move(arrays));
567+
568+
std::vector<uint8_t> file_data;
569+
sparrow_ipc::memory_output_stream mem_stream(file_data);
570+
571+
{
572+
sparrow_ipc::stream_file_serializer serializer(mem_stream);
573+
serializer << batch << sparrow_ipc::end_file;
574+
}
575+
576+
const auto* footer = get_footer_from_file_data(file_data);
577+
REQUIRE(footer != nullptr);
578+
REQUIRE(footer->recordBatches() != nullptr);
579+
REQUIRE_EQ(footer->recordBatches()->size(), 1);
580+
581+
const auto& block = *footer->recordBatches()->Get(0);
582+
583+
// All three fields must be multiples of 8 per Arrow spec
584+
// (see Apache Arrow reader.cc CheckAligned function)
585+
CHECK_EQ(block.offset() % 8, 0);
586+
CHECK_EQ(block.metaDataLength() % 8, 0);
587+
CHECK_EQ(block.bodyLength() % 8, 0);
588+
589+
// metaDataLength should include continuation (4) + size (4) + flatbuffer + padding
590+
// so it should be at least 8
591+
CHECK_GE(block.metaDataLength(), 8);
592+
}
593+
594+
TEST_CASE("Footer block alignment with multiple batches")
595+
{
596+
std::vector<std::string> names = {"x", "y"};
597+
598+
std::vector<uint8_t> file_data;
599+
sparrow_ipc::memory_output_stream mem_stream(file_data);
600+
601+
{
602+
sparrow_ipc::stream_file_serializer serializer(mem_stream);
603+
604+
// Create multiple batches with different sizes to test alignment edge cases
605+
for (int batch_idx = 0; batch_idx < 5; ++batch_idx)
606+
{
607+
const size_t num_rows = 3 + batch_idx * 2; // 3, 5, 7, 9, 11 rows
608+
609+
std::vector<int32_t> int_data(num_rows);
610+
std::vector<float> float_data(num_rows);
611+
for (size_t i = 0; i < num_rows; ++i)
612+
{
613+
int_data[i] = static_cast<int32_t>(batch_idx * 100 + i);
614+
float_data[i] = static_cast<float>(i) * 0.1f;
615+
}
616+
617+
sparrow::primitive_array<int32_t> int_array(std::move(int_data));
618+
sparrow::primitive_array<float> float_array(std::move(float_data));
619+
620+
std::vector<sparrow::array> arrays;
621+
arrays.emplace_back(std::move(int_array));
622+
arrays.emplace_back(std::move(float_array));
623+
624+
auto names_copy = names;
625+
sparrow::record_batch batch(std::move(names_copy), std::move(arrays));
626+
627+
serializer << batch;
628+
}
629+
630+
serializer << sparrow_ipc::end_file;
631+
}
632+
633+
const auto* footer = get_footer_from_file_data(file_data);
634+
REQUIRE(footer != nullptr);
635+
REQUIRE(footer->recordBatches() != nullptr);
636+
REQUIRE_EQ(footer->recordBatches()->size(), 5);
637+
638+
// Check alignment for all blocks
639+
for (size_t i = 0; i < footer->recordBatches()->size(); ++i)
640+
{
641+
const auto& block = *footer->recordBatches()->Get(static_cast<uint32_t>(i));
642+
643+
// All three fields must be multiples of 8
644+
CHECK_MESSAGE(block.offset() % 8 == 0, "Block ", i, " offset not aligned");
645+
CHECK_MESSAGE(block.metaDataLength() % 8 == 0, "Block ", i, " metaDataLength not aligned");
646+
CHECK_MESSAGE(block.bodyLength() % 8 == 0, "Block ", i, " bodyLength not aligned");
647+
}
648+
}
540649
}

0 commit comments

Comments
 (0)