Skip to content

Commit 9ee2f5e

Browse files
committed
apacheGH-48844: [C++][IPC] Turn disabled bodyLength assertions into error checks
1 parent d54a205 commit 9ee2f5e

File tree

2 files changed

+21
-14
lines changed

2 files changed

+21
-14
lines changed

cpp/src/arrow/ipc/message.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,8 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
375375
decoder.next_required_size());
376376
}
377377

378+
// TODO(GH-48846): we should take a body_length just like ReadMessageAsync
379+
// and read metadata + body in one go.
378380
ARROW_ASSIGN_OR_RAISE(auto metadata, file->ReadAt(offset, metadata_length));
379381
if (metadata->size() < metadata_length) {
380382
return Status::Invalid("Expected to read ", metadata_length,

cpp/src/arrow/ipc/reader.cc

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,31 +1180,36 @@ Status CheckAligned(const FileBlock& block) {
11801180
return Status::OK();
11811181
}
11821182

1183+
template <typename MessagePtr>
1184+
Result<MessagePtr> CheckBodyLength(MessagePtr message, const FileBlock& block) {
1185+
if (message->body_length() != block.body_length) {
1186+
return Status::Invalid(
1187+
"Mismatching body length for IPC message "
1188+
"(Block.bodyLength: ",
1189+
block.body_length, " vs. Message.bodyLength: ", message->body_length(), ")");
1190+
}
1191+
// NOTE: we cannot check metadata length as easily as we would have to account
1192+
// for the additional IPC signalisation (such as optional continuation bytes).
1193+
return message;
1194+
}
1195+
11831196
Result<std::unique_ptr<Message>> ReadMessageFromBlock(
11841197
const FileBlock& block, io::RandomAccessFile* file,
11851198
const FieldsLoaderFunction& fields_loader) {
11861199
RETURN_NOT_OK(CheckAligned(block));
1187-
// TODO(wesm): this breaks integration tests, see ARROW-3256
1188-
// DCHECK_EQ((*out)->body_length(), block.body_length);
1189-
11901200
ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length,
11911201
file, fields_loader));
1192-
return message;
1202+
return CheckBodyLength(std::move(message), block);
11931203
}
11941204

11951205
Future<std::shared_ptr<Message>> ReadMessageFromBlockAsync(
11961206
const FileBlock& block, io::RandomAccessFile* file, const io::IOContext& io_context) {
1197-
if (!bit_util::IsMultipleOf8(block.offset) ||
1198-
!bit_util::IsMultipleOf8(block.metadata_length) ||
1199-
!bit_util::IsMultipleOf8(block.body_length)) {
1200-
return Status::Invalid("Unaligned block in IPC file");
1201-
}
1202-
1203-
// TODO(wesm): this breaks integration tests, see ARROW-3256
1204-
// DCHECK_EQ((*out)->body_length(), block.body_length);
1205-
1207+
RETURN_NOT_OK(CheckAligned(block));
12061208
return ReadMessageAsync(block.offset, block.metadata_length, block.body_length, file,
1207-
io_context);
1209+
io_context)
1210+
.Then([block](std::shared_ptr<Message> message) {
1211+
return CheckBodyLength(std::move(message), block);
1212+
});
12081213
}
12091214

12101215
class RecordBatchFileReaderImpl;

0 commit comments

Comments
 (0)