@@ -94,23 +94,14 @@ class FileScanTaskTest : public TempFileTestBase {
9494 // Helper method to verify the content of the next batch from an ArrowArrayStream.
9595 void VerifyStreamNextBatch (struct ArrowArrayStream * stream,
9696 std::string_view expected_json) {
97- ASSERT_NE (stream-> get_schema , nullptr ) << " Stream has been released or is invalid. " ;
97+ auto record_batch_reader = :: arrow::ImportRecordBatchReader (stream). ValueOrDie () ;
9898
99- ArrowSchema c_schema;
100- ASSERT_EQ (stream->get_schema (stream, &c_schema), 0 );
101- auto import_schema_result = ::arrow::ImportSchema (&c_schema);
102- ASSERT_TRUE (import_schema_result.ok ()) << import_schema_result.status ().message ();
103- auto arrow_schema = import_schema_result.ValueOrDie ();
104-
105- ArrowArray c_array;
106- ASSERT_EQ (stream->get_next (stream, &c_array), 0 )
107- << " get_next failed. Error: " << stream->get_last_error (stream);
108- ASSERT_NE (c_array.release , nullptr ) << " Stream is exhausted but expected more data." ;
109-
110- auto import_batch_result = ::arrow::ImportRecordBatch (&c_array, arrow_schema);
111- ASSERT_TRUE (import_batch_result.ok ()) << import_batch_result.status ().message ();
112- auto actual_batch = import_batch_result.ValueOrDie ();
99+ auto result = record_batch_reader->Next ();
100+ ASSERT_TRUE (result.ok ()) << result.status ().message ();
101+ auto actual_batch = result.ValueOrDie ();
102+ ASSERT_NE (actual_batch, nullptr ) << " Stream is exhausted but expected more data." ;
113103
104+ auto arrow_schema = actual_batch->schema ();
114105 auto struct_type = ::arrow::struct_ (arrow_schema->fields ());
115106 auto expected_array =
116107 ::arrow::json::ArrayFromJSONString (struct_type, expected_json).ValueOrDie();
@@ -125,10 +116,10 @@ class FileScanTaskTest : public TempFileTestBase {
125116
126117 // Helper method to verify that an ArrowArrayStream is exhausted.
127118 void VerifyStreamExhausted (struct ArrowArrayStream * stream) {
128- ASSERT_NE (stream-> get_next , nullptr ) << " Stream has been released or is invalid. " ;
129- ArrowArray c_array ;
130- ASSERT_EQ (stream-> get_next (stream, &c_array), 0 );
131- ASSERT_EQ (c_array. release , nullptr ) << " Stream was not exhausted as expected." ;
119+ auto record_batch_reader = :: arrow::ImportRecordBatchReader (stream). ValueOrDie () ;
120+ auto result = record_batch_reader-> Next () ;
121+ ASSERT_TRUE (result. ok ()) << result. status (). message ( );
122+ ASSERT_EQ (result. ValueOrDie () , nullptr ) << " Reader was not exhausted as expected." ;
132123 }
133124
134125 std::shared_ptr<FileIO> file_io_;
@@ -146,18 +137,12 @@ TEST_F(FileScanTaskTest, ReadFullSchema) {
146137
147138 FileScanTask task (data_file);
148139
149- auto stream_result = task.ToArrow (projected_schema, nullptr , file_io_ );
140+ auto stream_result = task.ToArrow (file_io_, projected_schema, nullptr );
150141 ASSERT_THAT (stream_result, IsOk ());
151142 auto stream = std::move (stream_result.value ());
152143
153144 ASSERT_NO_FATAL_FAILURE (
154145 VerifyStreamNextBatch (&stream, R"( [[1, "Foo"], [2, "Bar"], [3, "Baz"]])" ));
155- ASSERT_NO_FATAL_FAILURE (VerifyStreamExhausted (&stream));
156-
157- ASSERT_NE (stream.release , nullptr );
158- stream.release (&stream);
159- ASSERT_EQ (stream.release , nullptr );
160- ASSERT_EQ (stream.private_data , nullptr );
161146}
162147
163148TEST_F (FileScanTaskTest, ReadProjectedAndReorderedSchema) {
@@ -171,15 +156,12 @@ TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) {
171156
172157 FileScanTask task (data_file);
173158
174- auto stream_result = task.ToArrow (projected_schema, nullptr , file_io_ );
159+ auto stream_result = task.ToArrow (file_io_, projected_schema, nullptr );
175160 ASSERT_THAT (stream_result, IsOk ());
176161 auto stream = std::move (stream_result.value ());
177162
178163 ASSERT_NO_FATAL_FAILURE (
179164 VerifyStreamNextBatch (&stream, R"( [["Foo", null], ["Bar", null], ["Baz", null]])" ));
180- ASSERT_NO_FATAL_FAILURE (VerifyStreamExhausted (&stream));
181-
182- stream.release (&stream);
183165}
184166
185167TEST_F (FileScanTaskTest, ReadEmptyFile) {
@@ -193,14 +175,12 @@ TEST_F(FileScanTaskTest, ReadEmptyFile) {
193175
194176 FileScanTask task (data_file);
195177
196- auto stream_result = task.ToArrow (projected_schema, nullptr , file_io_ );
178+ auto stream_result = task.ToArrow (file_io_, projected_schema, nullptr );
197179 ASSERT_THAT (stream_result, IsOk ());
198180 auto stream = std::move (stream_result.value ());
199181
200182 // The stream should be immediately exhausted
201183 ASSERT_NO_FATAL_FAILURE (VerifyStreamExhausted (&stream));
202-
203- stream.release (&stream);
204184}
205185
206186} // namespace iceberg
0 commit comments