Skip to content

Commit 2a29297

Browse files
committed
GH-48844: [C++][IPC] Turn disabled bodyLength assertions into error checks
1 parent d54a205 commit 2a29297

File tree

2 files changed

+19
-14
lines changed

2 files changed

+19
-14
lines changed

cpp/src/arrow/ipc/message.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,8 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
375375
decoder.next_required_size());
376376
}
377377

378+
// TODO(GH-48846): we should take a body_length just like ReadMessageAsync
379+
// and read metadata + body in one go.
378380
ARROW_ASSIGN_OR_RAISE(auto metadata, file->ReadAt(offset, metadata_length));
379381
if (metadata->size() < metadata_length) {
380382
return Status::Invalid("Expected to read ", metadata_length,

cpp/src/arrow/ipc/reader.cc

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,31 +1180,34 @@ Status CheckAligned(const FileBlock& block) {
11801180
return Status::OK();
11811181
}
11821182

1183+
template <typename MessagePtr>
1184+
Result<MessagePtr> CheckBodyLength(MessagePtr message, int64_t body_length) {
1185+
if (message->body_length() != body_length) {
1186+
return Status::Invalid(
1187+
"Mismatching body length for IPC message "
1188+
"(Block.bodyLength: ",
1189+
body_length, " vs. Message.bodyLength: ", message->body_length(), ")");
1190+
}
1191+
return message;
1192+
}
1193+
11831194
Result<std::unique_ptr<Message>> ReadMessageFromBlock(
11841195
const FileBlock& block, io::RandomAccessFile* file,
11851196
const FieldsLoaderFunction& fields_loader) {
11861197
RETURN_NOT_OK(CheckAligned(block));
1187-
// TODO(wesm): this breaks integration tests, see ARROW-3256
1188-
// DCHECK_EQ((*out)->body_length(), block.body_length);
1189-
11901198
ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length,
11911199
file, fields_loader));
1192-
return message;
1200+
return CheckBodyLength(std::move(message), block.body_length);
11931201
}
11941202

11951203
Future<std::shared_ptr<Message>> ReadMessageFromBlockAsync(
11961204
const FileBlock& block, io::RandomAccessFile* file, const io::IOContext& io_context) {
1197-
if (!bit_util::IsMultipleOf8(block.offset) ||
1198-
!bit_util::IsMultipleOf8(block.metadata_length) ||
1199-
!bit_util::IsMultipleOf8(block.body_length)) {
1200-
return Status::Invalid("Unaligned block in IPC file");
1201-
}
1202-
1203-
// TODO(wesm): this breaks integration tests, see ARROW-3256
1204-
// DCHECK_EQ((*out)->body_length(), block.body_length);
1205-
1205+
RETURN_NOT_OK(CheckAligned(block));
12061206
return ReadMessageAsync(block.offset, block.metadata_length, block.body_length, file,
1207-
io_context);
1207+
io_context)
1208+
.Then([block](std::shared_ptr<Message> message) {
1209+
return CheckBodyLength(message, block.body_length);
1210+
});
12081211
}
12091212

12101213
class RecordBatchFileReaderImpl;

0 commit comments

Comments
 (0)