Commit a5ebaf9

completed with test

1 parent b8435b0 commit a5ebaf9

4 files changed: +218 -30 lines changed
src/iceberg/table_scan.cc

Lines changed: 1 addition & 27 deletions
@@ -22,8 +22,6 @@
 #include <cstring>
 #include <vector>
 
-#include <iceberg/result.h>
-
 #include "iceberg/arrow_c_data.h"
 #include "iceberg/file_reader.h"
 #include "iceberg/manifest_entry.h"

@@ -48,7 +46,7 @@ struct ReaderStreamPrivateData {
 
   ~ReaderStreamPrivateData() {
     if (reader) {
-      reader->Close();
+      std::ignore = reader->Close();
     }
   }
 };
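The destructor change above is worth a note: switching to `std::ignore = reader->Close();` suggests `Close()` returns a status/result type marked `[[nodiscard]]`, and a destructor has nowhere to propagate an error, so the return value is discarded explicitly rather than silently. A minimal sketch of the pattern, with an illustrative `Status` type standing in for the project's actual return type:

#include <tuple>  // for std::ignore

// Illustrative stand-in for the library's nodiscard status type.
struct [[nodiscard]] Status {
  bool ok = true;
};

struct Reader {
  Status Close() { return {}; }
};

struct ReaderHolder {
  Reader* reader = nullptr;

  ~ReaderHolder() {
    if (reader) {
      // A destructor cannot report failure, so the nodiscard return
      // value is discarded explicitly instead of raising a warning.
      std::ignore = reader->Close();
    }
  }
};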
@@ -303,28 +301,4 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
   return tasks;
 }
 
-Result<std::vector<ArrowArrayStream>> DataTableScan::ToArrow() const {
-  Result<std::vector<std::shared_ptr<FileScanTask>>> tasks_result = PlanFiles();
-  if (!tasks_result.has_value()) {
-    return InvalidArgument("Failed to plan files: {}", tasks_result.error().message);
-  }
-  auto tasks = tasks_result.value();
-  if (tasks.empty()) {
-    // TODO(Li Feiyang): return a empty arrow stream
-    return NotImplemented("No files to scan");
-  }
-
-  std::vector<ArrowArrayStream> arrow_streams;
-  for (const auto& task : tasks_result.value()) {
-    Result<ArrowArrayStream> arrow_stream_result =
-        task->ToArrow(context_.projected_schema, context_.filter, file_io_);
-    if (!arrow_stream_result.has_value()) {
-      return InvalidArgument("Failed to get arrow stream: {}",
-                             arrow_stream_result.error().message);
-    }
-    arrow_streams.push_back(arrow_stream_result.value());
-  }
-  return std::move(arrow_streams);
-}
-
 }  // namespace iceberg
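With the convenience wrapper removed, callers that want the old behavior can compose the two remaining public pieces themselves: `PlanFiles()` plus a per-task `FileScanTask::ToArrow()`. A sketch of such a caller-side helper, adapted from the deleted body; the `OpenStreams` name is hypothetical and not part of the library's API:

// Hypothetical free function reproducing the removed convenience method.
Result<std::vector<ArrowArrayStream>> OpenStreams(
    const DataTableScan& scan, const std::shared_ptr<Schema>& projected_schema,
    const std::shared_ptr<Expression>& filter, const std::shared_ptr<FileIO>& io) {
  auto tasks_result = scan.PlanFiles();
  if (!tasks_result.has_value()) {
    return InvalidArgument("Failed to plan files: {}", tasks_result.error().message);
  }

  std::vector<ArrowArrayStream> streams;
  for (const auto& task : tasks_result.value()) {
    auto stream_result = task->ToArrow(projected_schema, filter, io);
    if (!stream_result.has_value()) {
      return InvalidArgument("Failed to get arrow stream: {}",
                             stream_result.error().message);
    }
    // Move instead of copy: ArrowArrayStream is a C struct that owns
    // resources through its release callback, so ownership should not
    // be duplicated (the deleted code pushed it by value).
    streams.push_back(std::move(stream_result.value()));
  }
  return streams;
}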

src/iceberg/table_scan.h

Lines changed: 8 additions & 3 deletions
@@ -55,8 +55,15 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask {
   int32_t files_count() const override;
   int64_t estimated_row_count() const override;
 
+  /**
+   * \brief Returns a C-ABI compatible ArrowArrayStream to read the data for this task.
+   *
+   * \param projected_schema The projected schema for reading the data.
+   * \param filter Optional filter expression to apply during reading.
+   * \param io The FileIO instance for accessing the file data.
+   * \return A Result containing an ArrowArrayStream, or an error on failure.
+   */
   Result<ArrowArrayStream> ToArrow(const std::shared_ptr<Schema>& projected_schema,
-
                                    const std::shared_ptr<Expression>& filter,
                                    const std::shared_ptr<FileIO>& io) const;
 

@@ -189,8 +196,6 @@ class ICEBERG_EXPORT DataTableScan : public TableScan {
   /// \brief Plans the scan tasks by resolving manifests and data files.
   /// \return A Result containing scan tasks or an error.
   Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles() const override;
-
-  Result<std::vector<ArrowArrayStream>> ToArrow() const;
 };
 
 }  // namespace iceberg

test/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
@@ -124,4 +124,7 @@ if(ICEBERG_BUILD_BUNDLE)
                    parquet_data_test.cc
                    parquet_schema_test.cc
                    parquet_test.cc)
+
+  add_iceberg_test(scan_test USE_BUNDLE SOURCES file_scan_task_test.cc)
+
 endif()
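Assuming `add_iceberg_test` follows the pattern of the surrounding targets and registers the binary with CTest, the new suite should be runnable from the build directory with `ctest -R scan_test`.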

test/file_scan_task_test.cc

Lines changed: 206 additions & 0 deletions
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+#include <arrow/record_batch.h>
+#include <arrow/table.h>
+#include <arrow/util/key_value_metadata.h>
+#include <parquet/arrow/reader.h>
+#include <parquet/arrow/writer.h>
+#include <parquet/metadata.h>
+
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/file_format.h"
+#include "iceberg/manifest_entry.h"
+#include "iceberg/parquet/parquet_register.h"
+#include "iceberg/schema.h"
+#include "iceberg/table_scan.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "matchers.h"
+#include "temp_file_test_base.h"
+
+namespace iceberg {
+
+class FileScanTaskTest : public TempFileTestBase {
+ protected:
+  static void SetUpTestSuite() { parquet::RegisterAll(); }
+
+  void SetUp() override {
+    TempFileTestBase::SetUp();
+    file_io_ = arrow::ArrowFileSystemFileIO::MakeLocalFileIO();
+    temp_parquet_file_ = CreateNewTempFilePathWithSuffix(".parquet");
+    CreateSimpleParquetFile();
+  }
+
+  // Helper method to create a Parquet file with sample data.
+  void CreateSimpleParquetFile(int64_t chunk_size = 1024) {
+    const std::string kParquetFieldIdKey = "PARQUET:field_id";
+    auto arrow_schema = ::arrow::schema(
+        {::arrow::field("id", ::arrow::int32(), /*nullable=*/false,
+                        ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"1"})),
+         ::arrow::field("name", ::arrow::utf8(), /*nullable=*/true,
+                        ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"2"}))});
+    auto table = ::arrow::Table::FromRecordBatches(
+                     arrow_schema, {::arrow::RecordBatch::FromStructArray(
+                                        ::arrow::json::ArrayFromJSONString(
+                                            ::arrow::struct_(arrow_schema->fields()),
+                                            R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])")
+                                            .ValueOrDie())
+                                        .ValueOrDie()})
+                     .ValueOrDie();
+
+    auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
+    auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie();
+
+    ASSERT_TRUE(::parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(),
+                                             outfile, chunk_size)
+                    .ok());
+  }
+
+  // Helper to create a valid but empty Parquet file.
+  void CreateEmptyParquetFile() {
+    const std::string kParquetFieldIdKey = "PARQUET:field_id";
+    auto arrow_schema = ::arrow::schema(
+        {::arrow::field("id", ::arrow::int32(), /*nullable=*/false,
+                        ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"1"}))});
+    auto empty_table = ::arrow::Table::FromRecordBatches(arrow_schema, {}).ValueOrDie();
+
+    auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
+    auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie();
+    ASSERT_TRUE(::parquet::arrow::WriteTable(*empty_table, ::arrow::default_memory_pool(),
+                                             outfile, 1024)
+                    .ok());
+  }
+
+  // Helper method to verify the content of the next batch from an ArrowArrayStream.
+  void VerifyStreamNextBatch(struct ArrowArrayStream* stream,
+                             std::string_view expected_json) {
+    ASSERT_NE(stream->get_schema, nullptr) << "Stream has been released or is invalid.";
+
+    ArrowSchema c_schema;
+    ASSERT_EQ(stream->get_schema(stream, &c_schema), 0);
+    auto import_schema_result = ::arrow::ImportSchema(&c_schema);
+    ASSERT_TRUE(import_schema_result.ok()) << import_schema_result.status().message();
+    auto arrow_schema = import_schema_result.ValueOrDie();
+
+    ArrowArray c_array;
+    ASSERT_EQ(stream->get_next(stream, &c_array), 0)
+        << "get_next failed. Error: " << stream->get_last_error(stream);
+    ASSERT_NE(c_array.release, nullptr) << "Stream is exhausted but expected more data.";
+
+    auto import_batch_result = ::arrow::ImportRecordBatch(&c_array, arrow_schema);
+    ASSERT_TRUE(import_batch_result.ok()) << import_batch_result.status().message();
+    auto actual_batch = import_batch_result.ValueOrDie();
+
+    auto struct_type = ::arrow::struct_(arrow_schema->fields());
+    auto expected_array =
+        ::arrow::json::ArrayFromJSONString(struct_type, expected_json).ValueOrDie();
+    auto expected_batch =
+        ::arrow::RecordBatch::FromStructArray(expected_array).ValueOrDie();
+
+    ASSERT_TRUE(actual_batch->Equals(*expected_batch))
+        << "Actual batch:\n"
+        << actual_batch->ToString() << "\nExpected batch:\n"
+        << expected_batch->ToString();
+  }
+
+  // Helper method to verify that an ArrowArrayStream is exhausted.
+  void VerifyStreamExhausted(struct ArrowArrayStream* stream) {
+    ASSERT_NE(stream->get_next, nullptr) << "Stream has been released or is invalid.";
+    ArrowArray c_array;
+    ASSERT_EQ(stream->get_next(stream, &c_array), 0);
+    ASSERT_EQ(c_array.release, nullptr) << "Stream was not exhausted as expected.";
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+  std::string temp_parquet_file_;
+};
+
+TEST_F(FileScanTaskTest, ReadFullSchema) {
+  auto data_file = std::make_shared<DataFile>();
+  data_file->file_path = temp_parquet_file_;
+  data_file->file_format = FileFormatType::kParquet;
+
+  auto projected_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(2, "name", string())});
+
+  FileScanTask task(data_file);
+
+  auto stream_result = task.ToArrow(projected_schema, nullptr, file_io_);
+  ASSERT_THAT(stream_result, IsOk());
+  auto stream = std::move(stream_result.value());
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyStreamNextBatch(&stream, R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])"));
+  ASSERT_NO_FATAL_FAILURE(VerifyStreamExhausted(&stream));
+
+  ASSERT_NE(stream.release, nullptr);
+  stream.release(&stream);
+  ASSERT_EQ(stream.release, nullptr);
+  ASSERT_EQ(stream.private_data, nullptr);
+}
+
+TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) {
+  auto data_file = std::make_shared<DataFile>();
+  data_file->file_path = temp_parquet_file_;
+  data_file->file_format = FileFormatType::kParquet;
+
+  auto projected_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeOptional(2, "name", string()),
+                               SchemaField::MakeOptional(3, "score", float64())});
+
+  FileScanTask task(data_file);
+
+  auto stream_result = task.ToArrow(projected_schema, nullptr, file_io_);
+  ASSERT_THAT(stream_result, IsOk());
+  auto stream = std::move(stream_result.value());
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyStreamNextBatch(&stream, R"([["Foo", null], ["Bar", null], ["Baz", null]])"));
+  ASSERT_NO_FATAL_FAILURE(VerifyStreamExhausted(&stream));
+
+  stream.release(&stream);
+}
+
+TEST_F(FileScanTaskTest, ReadEmptyFile) {
+  CreateEmptyParquetFile();
+  auto data_file = std::make_shared<DataFile>();
+  data_file->file_path = temp_parquet_file_;
+  data_file->file_format = FileFormatType::kParquet;
+
+  auto projected_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
+
+  FileScanTask task(data_file);
+
+  auto stream_result = task.ToArrow(projected_schema, nullptr, file_io_);
+  ASSERT_THAT(stream_result, IsOk());
+  auto stream = std::move(stream_result.value());
+
+  // The stream should be immediately exhausted
+  ASSERT_NO_FATAL_FAILURE(VerifyStreamExhausted(&stream));
+
+  stream.release(&stream);
+}
+
+}  // namespace iceberg
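The two `VerifyStream*` helpers above encode the Arrow C stream interface contract the tests rely on: `get_next` returning zero with `array.release == nullptr` means end-of-stream rather than an error, every produced batch must be released by the consumer, and the stream itself must be released exactly once. A condensed consumer loop showing the same protocol (error handling elided; `DrainStream` is an illustrative name, not part of the library):

#include <arrow/c/abi.h>  // ArrowArray, ArrowArrayStream

// Minimal consumer of an ArrowArrayStream per the Arrow C data interface.
void DrainStream(ArrowArrayStream* stream) {
  while (true) {
    ArrowArray batch;
    if (stream->get_next(stream, &batch) != 0) {
      break;  // non-zero return code signals an error
    }
    if (batch.release == nullptr) {
      break;  // an already-released ArrowArray signals end-of-stream
    }
    // ... consume the batch here ...
    batch.release(&batch);  // the consumer owns every batch it receives
  }
  stream->release(stream);  // release the stream itself exactly once
}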
