@@ -1252,40 +1252,56 @@ struct FileGeneratorWriterHelper : public FileWriterHelper {
12521252 Status ReadBatches (const IpcReadOptions& options, RecordBatchVector* out_batches,
12531253 ReadStats* out_stats = nullptr ,
12541254 MetadataVector* out_metadata_list = nullptr ) override {
1255- std::shared_ptr<io::RandomAccessFile> buf_reader;
1256- if (kCoalesce ) {
1257- // Use a non-zero-copy enabled BufferReader so we can test paths properly
1258- buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
1259- } else {
1260- buf_reader = std::make_shared<io::BufferReader>(buffer_);
1261- }
1262- AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
1255+ // The generator doesn't track stats.
1256+ EXPECT_EQ (nullptr , out_stats);
12631257
1264- {
1265- auto fut = RecordBatchFileReader::OpenAsync (buf_reader, footer_offset_, options);
1266- // Do NOT assert OK since some tests check whether this fails properly
1267- EXPECT_FINISHES (fut);
1268- ARROW_ASSIGN_OR_RAISE (auto reader, fut.result ());
1269- EXPECT_EQ (num_batches_written_, reader->num_record_batches ());
1270- // Generator will keep reader alive internally
1271- ARROW_ASSIGN_OR_RAISE (generator, reader->GetRecordBatchGenerator (kCoalesce ));
1272- }
1258+ auto read_batches = [&](bool pre_buffer) -> Result<RecordBatchVector> {
1259+ std::shared_ptr<io::RandomAccessFile> buf_reader;
1260+ if (kCoalesce ) {
1261+ // Use a non-zero-copy enabled BufferReader so we can test paths properly
1262+ buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
1263+ } else {
1264+ buf_reader = std::make_shared<io::BufferReader>(buffer_);
1265+ }
1266+ AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
1267+
1268+ {
1269+ auto fut = RecordBatchFileReader::OpenAsync (buf_reader, footer_offset_, options);
1270+ // Do NOT assert OK since some tests check whether this fails properly
1271+ EXPECT_FINISHES (fut);
1272+ ARROW_ASSIGN_OR_RAISE (auto reader, fut.result ());
1273+ EXPECT_EQ (num_batches_written_, reader->num_record_batches ());
1274+ if (pre_buffer) {
1275+ RETURN_NOT_OK (reader->PreBufferMetadata (/* indices=*/ {}));
1276+ }
1277+ // Generator will keep reader alive internally
1278+ ARROW_ASSIGN_OR_RAISE (generator, reader->GetRecordBatchGenerator (kCoalesce ));
1279+ }
12731280
1274- // Generator is async-reentrant
1275- std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
1281+ // Generator is async-reentrant
1282+ std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
1283+ for (int i = 0 ; i < num_batches_written_; ++i) {
1284+ futures.push_back (generator ());
1285+ }
1286+ auto fut = generator ();
1287+ ARROW_ASSIGN_OR_RAISE (auto final_batch, fut.result ());
1288+ EXPECT_EQ (nullptr , final_batch);
1289+
1290+ RecordBatchVector batches;
1291+ for (auto & future : futures) {
1292+ ARROW_ASSIGN_OR_RAISE (auto batch, future.result ());
1293+ EXPECT_NE (nullptr , batch);
1294+ batches.push_back (batch);
1295+ }
1296+ return batches;
1297+ };
1298+
1299+ ARROW_ASSIGN_OR_RAISE (*out_batches, read_batches (/* pre_buffer=*/ false ));
1300+ // Also read with pre-buffered metadata, and check the results are equal
1301+ ARROW_ASSIGN_OR_RAISE (auto batches_pre_buffered, read_batches (/* pre_buffer=*/ true ));
12761302 for (int i = 0 ; i < num_batches_written_; ++i) {
1277- futures.push_back (generator ());
1278- }
1279- auto fut = generator ();
1280- EXPECT_FINISHES_OK_AND_EQ (nullptr , fut);
1281- for (auto & future : futures) {
1282- EXPECT_FINISHES_OK_AND_ASSIGN (auto batch, future);
1283- out_batches->push_back (batch);
1303+ AssertBatchesEqual (*batches_pre_buffered[i], *(*out_batches)[i], /* check_metadata=*/ true );
12841304 }
1285-
1286- // The generator doesn't track stats.
1287- EXPECT_EQ (nullptr , out_stats);
1288-
12891305 return Status::OK ();
12901306 }
12911307};
0 commit comments