
Commit 27ac1ae

test: add parquet reader test
1 parent f2d0abd

File tree

3 files changed: +221 -11 lines changed

src/iceberg/parquet/parquet_reader.cc

Lines changed: 13 additions & 10 deletions

```diff
@@ -19,7 +19,7 @@
 
 #include "iceberg/parquet/parquet_reader.h"
 
-#include <memory>
+#include <numeric>
 
 #include <arrow/c/bridge.h>
 #include <arrow/memory_pool.h>
@@ -207,18 +207,21 @@ class ParquetReader::Impl {
           break;
         }
       }
-      if (row_group_indices.empty()) {
-        // None of the row groups are selected, return an empty record batch reader
-        context_->record_batch_reader_ = std::make_unique<EmptyRecordBatchReader>();
-        return {};
-      }
+    } else {
+      row_group_indices.resize(reader_->parquet_reader()->metadata()->num_row_groups());
+      std::iota(row_group_indices.begin(), row_group_indices.end(), 0);  // NOLINT
     }
 
     // Create the record batch reader
-    auto column_indices = SelectedColumnIndices(projection_);
-    ICEBERG_ARROW_ASSIGN_OR_RETURN(
-        context_->record_batch_reader_,
-        reader_->GetRecordBatchReader(row_group_indices, column_indices));
+    if (row_group_indices.empty()) {
+      // None of the row groups are selected, return an empty record batch reader
+      context_->record_batch_reader_ = std::make_unique<EmptyRecordBatchReader>();
+    } else {
+      auto column_indices = SelectedColumnIndices(projection_);
+      ICEBERG_ARROW_ASSIGN_OR_RETURN(
+          context_->record_batch_reader_,
+          reader_->GetRecordBatchReader(row_group_indices, column_indices));
+    }
 
     return {};
   }
```
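Read straight through, the interleaved deletions and additions obscure the net effect. Previously the empty-row-group check lived inside the split branch and returned early, so a reader opened without a split appears to have handed an empty index list to `GetRecordBatchReader`. After this change the no-split path selects every row group explicitly with `std::iota` (hence `<memory>` giving way to `<numeric>` in the includes), and the `EmptyRecordBatchReader` fallback fires only when a split genuinely matches no row group. A consolidated sketch of the resulting flow, using only names from the diff and eliding the split-matching loop this commit leaves untouched:

```cpp
// Consolidated view of the revised selection logic (names from the diff;
// the split-matching loop body is elided because this commit does not change it).
std::vector<int> row_group_indices;
if (split_.has_value()) {
  // ... keep only the row groups that fall inside the split (unchanged loop) ...
} else {
  // No split: select every row group in the file explicitly.
  row_group_indices.resize(reader_->parquet_reader()->metadata()->num_row_groups());
  std::iota(row_group_indices.begin(), row_group_indices.end(), 0);
}

if (row_group_indices.empty()) {
  // Only reachable when a split selected nothing.
  context_->record_batch_reader_ = std::make_unique<EmptyRecordBatchReader>();
} else {
  auto column_indices = SelectedColumnIndices(projection_);
  ICEBERG_ARROW_ASSIGN_OR_RETURN(
      context_->record_batch_reader_,
      reader_->GetRecordBatchReader(row_group_indices, column_indices));
}
return {};
```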

test/CMakeLists.txt

Lines changed: 2 additions & 1 deletion

```diff
@@ -121,5 +121,6 @@ if(ICEBERG_BUILD_BUNDLE)
          USE_BUNDLE
          SOURCES
          parquet_data_test.cc
-         parquet_schema_test.cc)
+         parquet_schema_test.cc
+         parquet_test.cc)
 endif()
```

test/parquet_test.cc

Lines changed: 206 additions & 0 deletions (new file)

```cpp
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include <arrow/array.h>
#include <arrow/c/bridge.h>
#include <arrow/json/from_string.h>
#include <arrow/record_batch.h>
#include <arrow/table.h>
#include <arrow/util/key_value_metadata.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include <parquet/metadata.h>

#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/parquet/parquet_reader.h"
#include "iceberg/schema.h"
#include "iceberg/type.h"
#include "iceberg/util/checked_cast.h"
#include "matchers.h"
#include "temp_file_test_base.h"

namespace iceberg::parquet {

class ParquetReaderTest : public TempFileTestBase {
 protected:
  static void SetUpTestSuite() { ParquetReader::Register(); }

  void SetUp() override {
    TempFileTestBase::SetUp();
    file_io_ = arrow::ArrowFileSystemFileIO::MakeLocalFileIO();
    temp_parquet_file_ = CreateNewTempFilePathWithSuffix(".parquet");
  }

  void CreateSimpleParquetFile() {
    const std::string kParquetFieldIdKey = "PARQUET:field_id";
    auto arrow_schema = ::arrow::schema(
        {::arrow::field("id", ::arrow::int32(), /*nullable=*/false,
                        ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"1"})),
         ::arrow::field("name", ::arrow::utf8(), /*nullable=*/true,
                        ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"2"}))});
    auto table = ::arrow::Table::FromRecordBatches(
                     arrow_schema, {::arrow::RecordBatch::FromStructArray(
                                        ::arrow::json::ArrayFromJSONString(
                                            ::arrow::struct_(arrow_schema->fields()),
                                            R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])")
                                            .ValueOrDie())
                                        .ValueOrDie()})
                     .ValueOrDie();

    auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
    auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie();

    ASSERT_TRUE(::parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(),
                                             outfile, /*chunk_size=*/2)
                    .ok());
  }

  void VerifyNextBatch(Reader& reader, std::string_view expected_json) {
    // Boilerplate to get Arrow schema
    auto schema_result = reader.Schema();
    ASSERT_THAT(schema_result, IsOk());
    auto arrow_c_schema = std::move(schema_result.value());
    auto import_schema_result = ::arrow::ImportType(&arrow_c_schema);
    auto arrow_schema = import_schema_result.ValueOrDie();

    // Boilerplate to get Arrow array
    auto data = reader.Next();
    ASSERT_THAT(data, IsOk()) << "Reader.Next() failed: " << data.error().message;
    ASSERT_TRUE(data.value().has_value()) << "Reader.Next() returned no data";
    auto arrow_c_array = data.value().value();
    auto data_result = ::arrow::ImportArray(&arrow_c_array, arrow_schema);
    auto arrow_array = data_result.ValueOrDie();

    // Verify data
    auto expected_array =
        ::arrow::json::ArrayFromJSONString(arrow_schema, expected_json).ValueOrDie();
    ASSERT_TRUE(arrow_array->Equals(*expected_array));
  }

  void VerifyExhausted(Reader& reader) {
    auto data = reader.Next();
    ASSERT_THAT(data, IsOk());
    ASSERT_FALSE(data.value().has_value());
  }

  std::shared_ptr<FileIO> file_io_;
  std::string temp_parquet_file_;
};

TEST_F(ParquetReaderTest, ReadTwoFields) {
  CreateSimpleParquetFile();

  auto schema = std::make_shared<Schema>(
      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
                               SchemaField::MakeOptional(2, "name", string())});

  auto reader_result = ReaderFactoryRegistry::Open(
      FileFormatType::kParquet,
      {.path = temp_parquet_file_, .io = file_io_, .projection = schema});
  ASSERT_THAT(reader_result, IsOk())
      << "Failed to create reader: " << reader_result.error().message;
  auto reader = std::move(reader_result.value());

  ASSERT_NO_FATAL_FAILURE(
      VerifyNextBatch(*reader, R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])"));
  ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
}

TEST_F(ParquetReaderTest, ReadReorderedFieldsWithNulls) {
  CreateSimpleParquetFile();

  auto schema = std::make_shared<Schema>(
      std::vector<SchemaField>{SchemaField::MakeOptional(2, "name", string()),
                               SchemaField::MakeRequired(1, "id", int32()),
                               SchemaField::MakeOptional(3, "score", float64())});

  auto reader_result = ReaderFactoryRegistry::Open(
      FileFormatType::kParquet,
      {.path = temp_parquet_file_, .io = file_io_, .projection = schema});
  ASSERT_THAT(reader_result, IsOk());
  auto reader = std::move(reader_result.value());

  ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(
      *reader, R"([["Foo", 1, null], ["Bar", 2, null], ["Baz", 3, null]])"));
  ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
}

TEST_F(ParquetReaderTest, ReadWithBatchSize) {
  CreateSimpleParquetFile();

  auto schema = std::make_shared<Schema>(
      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});

  auto reader_result =
      ReaderFactoryRegistry::Open(FileFormatType::kParquet, {.path = temp_parquet_file_,
                                                             .batch_size = 2,
                                                             .io = file_io_,
                                                             .projection = schema});
  ASSERT_THAT(reader_result, IsOk());
  auto reader = std::move(reader_result.value());

  ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, R"([[1], [2]])"));
  ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, R"([[3]])"));
  ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
}

TEST_F(ParquetReaderTest, ReadSplit) {
  CreateSimpleParquetFile();

  // Read split offsets
  auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
  auto input_stream = io.fs()->OpenInputFile(temp_parquet_file_).ValueOrDie();
  auto metadata = ::parquet::ReadMetaData(input_stream);
  std::vector<size_t> split_offsets;
  for (int i = 0; i < metadata->num_row_groups(); ++i) {
    split_offsets.push_back(static_cast<size_t>(metadata->RowGroup(i)->file_offset()));
  }
  ASSERT_EQ(split_offsets.size(), 2);

  auto schema = std::make_shared<Schema>(
      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});

  std::vector<Split> splits = {
      {.offset = 0, .length = std::numeric_limits<size_t>::max()},
      {.offset = split_offsets[0], .length = split_offsets[1] - split_offsets[0]},
      {.offset = split_offsets[1], .length = 1},
      {.offset = split_offsets[1] + 1, .length = std::numeric_limits<size_t>::max()},
      {.offset = 0, .length = split_offsets[0]},
  };
  std::vector<std::string> expected_json = {
      R"([[1], [2], [3]])", R"([[1], [2]])", R"([[3]])", "", "",
  };

  for (size_t i = 0; i < splits.size(); ++i) {
    auto reader_result =
        ReaderFactoryRegistry::Open(FileFormatType::kParquet, {.path = temp_parquet_file_,
                                                               .split = splits[i],
                                                               .batch_size = 100,
                                                               .io = file_io_,
                                                               .projection = schema});
    ASSERT_THAT(reader_result, IsOk());
    auto reader = std::move(reader_result.value());
    if (!expected_json[i].empty()) {
      ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_json[i]));
    }
    ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
  }
}

}  // namespace iceberg::parquet
```
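The `ReadSplit` expectations pin down the split semantics: writing with `chunk_size=2` produces two row groups (rows 1 and 2, then row 3), and a row group is read by the split whose byte range contains the row group's starting `file_offset`, with an exclusive upper bound. That is why `{split_offsets[1], 1}` yields only `[[3]]`, while both `{split_offsets[1] + 1, max}` and `{0, split_offsets[0]}` come back empty. A minimal sketch of that selection rule; `SelectsRowGroup` is a hypothetical helper inferred from the test cases, not a function from this commit:

```cpp
#include <cstddef>
#include <limits>

// Hypothetical helper: a row group belongs to a split exactly when its
// starting file offset lies in the half-open range [offset, offset + length).
// length may be SIZE_MAX ("to end of file"), so guard the addition against overflow.
bool SelectsRowGroup(std::size_t offset, std::size_t length, std::size_t rg_file_offset) {
  std::size_t end = (length > std::numeric_limits<std::size_t>::max() - offset)
                        ? std::numeric_limits<std::size_t>::max()
                        : offset + length;
  return rg_file_offset >= offset && rg_file_offset < end;
}
```

Checked against the test: `split_offsets[1]` falls inside `[split_offsets[1], split_offsets[1] + 1)`, so the third split selects the second row group, while `{0, split_offsets[0]}` selects nothing because the end bound is exclusive.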
