Skip to content

Commit 0e85c12

Browse files
brunal (Bruno Cauet) and pitrou (Antoine Pitrou)
authored
GH-46407: [C++] Fix IPC serialization of sliced list arrays (#46408)
### Rationale for this change Arrow C++ slices arrays by bumping the top-level `offset` value. However, Arrow Rust slices list arrays by slicing the `value_offsets` buffer. When receiving a Rust Arrow Array in C++ (via the C data interface), its IPC serialization fails to notice that the `value_offsets` buffer needed to be updated, but it still updates the `values` buffer. This leads to a corrupt array on deserialization, with an `value_offsets` buffer that points past the end of the values array. This PR fixes the IPC serialization by also looking at value_offset(0) to determine whether the `value_offsets` buffer needs reconstructing, instead of only looking at offset(). This works because value_offset(int) is the offets buffer, shifted by the top-level offset. We still need to check for offset(), to account for array starting with an empty list (multiple zeroes at the start of the offsets buffer). ### What changes are included in this PR? The fix and nothing else ### Are these changes tested? Yes ### Are there any user-facing changes? No (well, unless they are affected by the bug) **This PR contains a "Critical Fix".** (the changes fix (b) a bug that caused incorrect or invalid data to be produced) : valid operations on valid data produce invalid data. * GitHub Issue: #46407 Lead-authored-by: Bruno Cauet <[email protected]> Co-authored-by: Bruno Cauet <[email protected]> Co-authored-by: Antoine Pitrou <[email protected]> Signed-off-by: Antoine Pitrou <[email protected]>
1 parent 94e3b3e commit 0e85c12

File tree

5 files changed

+76
-21
lines changed

5 files changed

+76
-21
lines changed

cpp/src/arrow/ipc/feather_test.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,24 @@ TEST_P(TestFeather, SliceBooleanRoundTrip) {
319319
CheckSlices(batch);
320320
}
321321

322+
TEST_P(TestFeather, SliceListRoundTrip) {
323+
if (GetParam().version == kFeatherV1Version) {
324+
GTEST_SKIP() << "Feather V1 does not support list types";
325+
}
326+
std::shared_ptr<RecordBatch> batch;
327+
ASSERT_OK(ipc::test::MakeListRecordBatchSized(600, &batch));
328+
CheckSlices(batch);
329+
}
330+
331+
TEST_P(TestFeather, SliceListViewRoundTrip) {
332+
if (GetParam().version == kFeatherV1Version) {
333+
GTEST_SKIP() << "Feather V1 does not support list view types";
334+
}
335+
std::shared_ptr<RecordBatch> batch;
336+
ASSERT_OK(ipc::test::MakeListViewRecordBatchSized(600, &batch));
337+
CheckSlices(batch);
338+
}
339+
322340
INSTANTIATE_TEST_SUITE_P(
323341
FeatherTests, TestFeather,
324342
::testing::Values(TestParam(kFeatherV1Version), TestParam(kFeatherV2Version),

cpp/src/arrow/ipc/read_write_test.cc

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,29 @@ TEST_F(TestIpcRoundTrip, SpecificMetadataVersion) {
579579
TestMetadataVersion(MetadataVersion::V5);
580580
}
581581

582+
TEST_F(TestIpcRoundTrip, ListWithSlicedValues) {
583+
// This tests serialization of a sliced ListArray that got sliced "the Rust
584+
// way": by slicing the value_offsets buffer, but keeping top-level offset at
585+
// 0.
586+
auto child_data = ArrayFromJSON(int32(), "[1, 2, 3, 4, 5]")->data();
587+
588+
// Offsets buffer [2, 5]
589+
TypedBufferBuilder<int32_t> offsets_builder;
590+
ASSERT_OK(offsets_builder.Reserve(2));
591+
ASSERT_OK(offsets_builder.Append(2));
592+
ASSERT_OK(offsets_builder.Append(5));
593+
ASSERT_OK_AND_ASSIGN(auto offsets_buffer, offsets_builder.Finish());
594+
595+
auto list_data = ArrayData::Make(list(int32()),
596+
/*num_rows=*/1,
597+
/*buffers=*/{nullptr, offsets_buffer});
598+
list_data->child_data = {child_data};
599+
std::shared_ptr<Array> list_array = MakeArray(list_data);
600+
ASSERT_OK(list_array->ValidateFull());
601+
602+
CheckRoundtrip(list_array);
603+
}
604+
582605
TEST(TestReadMessage, CorruptedSmallInput) {
583606
std::string data = "abc";
584607
auto reader = io::BufferReader::FromString(data);

cpp/src/arrow/ipc/test_common.cc

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ Status MakeNullRecordBatch(std::shared_ptr<RecordBatch>* out) {
421421
return Status::OK();
422422
}
423423

424-
Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
424+
Status MakeListRecordBatchSized(const int length, std::shared_ptr<RecordBatch>* out) {
425425
// Make the schema
426426
auto f0 = field("f0", list(int32()));
427427
auto f1 = field("f1", list(list(int32())));
@@ -431,7 +431,6 @@ Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
431431
// Example data
432432

433433
MemoryPool* pool = default_memory_pool();
434-
const int length = 200;
435434
std::shared_ptr<Array> leaf_values, list_array, list_list_array, large_list_array;
436435
const bool include_nulls = true;
437436
RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &leaf_values));
@@ -446,7 +445,11 @@ Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
446445
return Status::OK();
447446
}
448447

449-
Status MakeListViewRecordBatch(std::shared_ptr<RecordBatch>* out) {
448+
Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
449+
return MakeListRecordBatchSized(200, out);
450+
}
451+
452+
Status MakeListViewRecordBatchSized(const int length, std::shared_ptr<RecordBatch>* out) {
450453
// Make the schema
451454
auto f0 = field("f0", list_view(int32()));
452455
auto f1 = field("f1", list_view(list_view(int32())));
@@ -456,7 +459,6 @@ Status MakeListViewRecordBatch(std::shared_ptr<RecordBatch>* out) {
456459
// Example data
457460

458461
MemoryPool* pool = default_memory_pool();
459-
const int length = 200;
460462
std::shared_ptr<Array> leaf_values, list_array, list_list_array, large_list_array;
461463
const bool include_nulls = true;
462464
RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &leaf_values));
@@ -471,6 +473,10 @@ Status MakeListViewRecordBatch(std::shared_ptr<RecordBatch>* out) {
471473
return Status::OK();
472474
}
473475

476+
Status MakeListViewRecordBatch(std::shared_ptr<RecordBatch>* out) {
477+
return MakeListViewRecordBatchSized(200, out);
478+
}
479+
474480
Status MakeFixedSizeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
475481
// Make the schema
476482
auto f0 = field("f0", fixed_size_list(int32(), 1));

cpp/src/arrow/ipc/test_common.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,15 @@ Status MakeStringTypesRecordBatchWithNulls(std::shared_ptr<RecordBatch>* out);
104104
ARROW_TESTING_EXPORT
105105
Status MakeNullRecordBatch(std::shared_ptr<RecordBatch>* out);
106106

107+
ARROW_TESTING_EXPORT
108+
Status MakeListRecordBatchSized(int length, std::shared_ptr<RecordBatch>* out);
109+
107110
ARROW_TESTING_EXPORT
108111
Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out);
109112

113+
ARROW_TESTING_EXPORT
114+
Status MakeListViewRecordBatchSized(int length, std::shared_ptr<RecordBatch>* out);
115+
110116
ARROW_TESTING_EXPORT
111117
Status MakeListViewRecordBatch(std::shared_ptr<RecordBatch>* out);
112118

cpp/src/arrow/ipc/writer.cc

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -324,34 +324,36 @@ class RecordBatchSerializer {
324324
// Share slicing logic between ListArray, BinaryArray and LargeBinaryArray
325325
using offset_type = typename ArrayType::offset_type;
326326

327-
auto offsets = array.value_offsets();
327+
if (array.length() == 0) {
328+
*value_offsets = array.value_offsets();
329+
return Status::OK();
330+
}
328331

329332
int64_t required_bytes = sizeof(offset_type) * (array.length() + 1);
330-
if (array.offset() != 0) {
331-
// If we have a non-zero offset, then the value offsets do not start at
332-
// zero. We must a) create a new offsets array with shifted offsets and
333-
// b) slice the values array accordingly
334-
333+
if (array.value_offset(0) > 0) {
334+
// If the offset of the first value is non-zero, then we must create a new
335+
// offsets buffer with shifted offsets.
335336
ARROW_ASSIGN_OR_RAISE(auto shifted_offsets,
336337
AllocateBuffer(required_bytes, options_.memory_pool));
337338

338339
auto dest_offsets = shifted_offsets->mutable_span_as<offset_type>();
339-
const offset_type start_offset = array.value_offset(0);
340+
const offset_type* source_offsets = array.raw_value_offsets();
341+
const offset_type start_offset = source_offsets[0];
340342

341-
for (int i = 0; i < array.length(); ++i) {
342-
dest_offsets[i] = array.value_offset(i) - start_offset;
343+
for (int i = 0; i <= array.length(); ++i) {
344+
dest_offsets[i] = source_offsets[i] - start_offset;
343345
}
344-
// Final offset
345-
dest_offsets[array.length()] = array.value_offset(array.length()) - start_offset;
346-
offsets = std::move(shifted_offsets);
346+
*value_offsets = std::move(shifted_offsets);
347347
} else {
348-
// ARROW-6046: Slice offsets to used extent, in case we have a truncated
349-
// slice
350-
if (offsets != nullptr && offsets->size() > required_bytes) {
351-
offsets = SliceBuffer(offsets, 0, required_bytes);
348+
// ARROW-6046: if we have a truncated slice with unused leading or
349+
// trailing data, then we slice it.
350+
if (array.offset() > 0 || array.value_offsets()->size() > required_bytes) {
351+
*value_offsets = SliceBuffer(
352+
array.value_offsets(), array.offset() * sizeof(offset_type), required_bytes);
353+
} else {
354+
*value_offsets = array.value_offsets();
352355
}
353356
}
354-
*value_offsets = std::move(offsets);
355357
return Status::OK();
356358
}
357359

0 commit comments

Comments
 (0)