Skip to content

Commit 7ebc88c

Browse files
koupitrou
andauthored
GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder (#36344)
### Rationale for this change Because they (pull-based and push-based) must have the same behavior. ### What changes are included in this PR? This PR extracts reusable codes to StreamDecoderInternal from StreamDecoderImpl. External API isn't changed for RecordBatchStreamReader and StreamDecoder. This PR adds some external API to implement this: * arrow::Status::ToStringWithoutContextLines(): This is only for testing. We can get stable result of ASSERT_RAISES_WITH_MESSAGE() with/without -DARROW_EXTRA_ERROR_CONTEXT=ON by this. We can extract this and related changes to separated PR if we want. * arrow::ipc::Listener::OnRecordBatchWithMetadataDecoded(): Because RecordBatchStreamReader wants not only RecordBatch but also custom metadata. OnRecordBatchWithMetadataDecoded() receives RecordBatchWithMetadata. OnRecordBatchDecoded() still exists and it's used by default for backward compatibility. * arrow::ipc::CollectListener::metadatas(), arrow::ipc::CollectListener::num_record_batches(), arrow::ipc::CollectListener::PopRecordBatch(), arrow::ipc::CollectListener::PopRecordBatchWithMetadat(): If we add these APIs, we can use CollectListner in RecordBatchStreamReader. We can create an internal listener only for RecordBatchStreamReader if don't want to extend CollectListener. ### Are these changes tested? Yes. ### Are there any user-facing changes? Yes. **This PR includes breaking changes to public APIs.** `arrow::ipc::CollectListener::record_batches()` returns `const std::vector<std::shared_ptr<RecordBatch>>&` instead of `std::vector<std::shared_ptr<RecordBatch>>`. * Closes: #26153 Lead-authored-by: Sutou Kouhei <[email protected]> Co-authored-by: Sutou Kouhei <[email protected]> Co-authored-by: Antoine Pitrou <[email protected]> Signed-off-by: Sutou Kouhei <[email protected]>
1 parent a2ccd46 commit 7ebc88c

File tree

6 files changed

+262
-247
lines changed

6 files changed

+262
-247
lines changed

cpp/src/arrow/ipc/read_write_test.cc

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2082,29 +2082,28 @@ TEST(TestRecordBatchStreamReader, NotEnoughDictionaries) {
20822082
// error
20832083
ASSERT_OK_AND_ASSIGN(auto buffer, out->Finish());
20842084

2085-
auto AssertFailsWith = [](std::shared_ptr<Buffer> stream, const std::string& ex_error) {
2085+
auto Read = [](std::shared_ptr<Buffer> stream) -> Status {
20862086
io::BufferReader reader(stream);
2087-
ASSERT_OK_AND_ASSIGN(auto ipc_reader, RecordBatchStreamReader::Open(&reader));
2087+
ARROW_ASSIGN_OR_RAISE(auto ipc_reader, RecordBatchStreamReader::Open(&reader));
20882088
std::shared_ptr<RecordBatch> batch;
2089-
Status s = ipc_reader->ReadNext(&batch);
2090-
ASSERT_TRUE(s.IsInvalid());
2091-
ASSERT_EQ(ex_error, s.message().substr(0, ex_error.size()));
2089+
return ipc_reader->ReadNext(&batch);
20922090
};
20932091

20942092
// Stream terminates before reading all dictionaries
20952093
std::shared_ptr<Buffer> truncated_stream;
20962094
SpliceMessages(buffer, {0, 1}, &truncated_stream);
2097-
std::string ex_message =
2098-
("IPC stream ended without reading the expected number (3)"
2099-
" of dictionaries");
2100-
AssertFailsWith(truncated_stream, ex_message);
2095+
ASSERT_RAISES_WITH_MESSAGE(Invalid,
2096+
"Invalid: IPC stream ended without "
2097+
"reading the expected number (3) of dictionaries",
2098+
Read(truncated_stream));
21012099

21022100
// One of the dictionaries is missing, then we see a record batch
21032101
SpliceMessages(buffer, {0, 1, 2, 4}, &truncated_stream);
2104-
ex_message =
2105-
("IPC stream did not have the expected number (3) of dictionaries "
2106-
"at the start of the stream");
2107-
AssertFailsWith(truncated_stream, ex_message);
2102+
ASSERT_RAISES_WITH_MESSAGE(Invalid,
2103+
"Invalid: IPC stream did not have "
2104+
"the expected number (3) of dictionaries "
2105+
"at the start of the stream",
2106+
Read(truncated_stream));
21082107
}
21092108

21102109
TEST(TestRecordBatchStreamReader, MalformedInput) {

0 commit comments

Comments
 (0)