Skip to content

Commit 219ba51

Browse files
committed
test: add parquet reader test
1 parent f2d0abd commit 219ba51

File tree

3 files changed

+177
-11
lines changed

3 files changed

+177
-11
lines changed

src/iceberg/parquet/parquet_reader.cc

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
#include "iceberg/parquet/parquet_reader.h"
2121

22-
#include <memory>
22+
#include <numeric>
2323

2424
#include <arrow/c/bridge.h>
2525
#include <arrow/memory_pool.h>
@@ -207,18 +207,21 @@ class ParquetReader::Impl {
207207
break;
208208
}
209209
}
210-
if (row_group_indices.empty()) {
211-
// None of the row groups are selected, return an empty record batch reader
212-
context_->record_batch_reader_ = std::make_unique<EmptyRecordBatchReader>();
213-
return {};
214-
}
210+
} else {
211+
row_group_indices.resize(reader_->parquet_reader()->metadata()->num_row_groups());
212+
std::iota(row_group_indices.begin(), row_group_indices.end(), 0); // NOLINT
215213
}
216214

217215
// Create the record batch reader
218-
auto column_indices = SelectedColumnIndices(projection_);
219-
ICEBERG_ARROW_ASSIGN_OR_RETURN(
220-
context_->record_batch_reader_,
221-
reader_->GetRecordBatchReader(row_group_indices, column_indices));
216+
if (row_group_indices.empty()) {
217+
// None of the row groups are selected, return an empty record batch reader
218+
context_->record_batch_reader_ = std::make_unique<EmptyRecordBatchReader>();
219+
} else {
220+
auto column_indices = SelectedColumnIndices(projection_);
221+
ICEBERG_ARROW_ASSIGN_OR_RETURN(
222+
context_->record_batch_reader_,
223+
reader_->GetRecordBatchReader(row_group_indices, column_indices));
224+
}
222225

223226
return {};
224227
}

test/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,5 +121,6 @@ if(ICEBERG_BUILD_BUNDLE)
121121
USE_BUNDLE
122122
SOURCES
123123
parquet_data_test.cc
124-
parquet_schema_test.cc)
124+
parquet_schema_test.cc
125+
parquet_test.cc)
125126
endif()

test/parquet_test.cc

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <arrow/array.h>
21+
#include <arrow/c/bridge.h>
22+
#include <arrow/json/from_string.h>
23+
#include <arrow/record_batch.h>
24+
#include <arrow/table.h>
25+
#include <arrow/util/key_value_metadata.h>
26+
#include <parquet/arrow/writer.h>
27+
28+
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
29+
#include "iceberg/parquet/parquet_reader.h"
30+
#include "iceberg/schema.h"
31+
#include "iceberg/type.h"
32+
#include "iceberg/util/checked_cast.h"
33+
#include "matchers.h"
34+
#include "temp_file_test_base.h"
35+
36+
namespace iceberg::parquet {
37+
38+
class ParquetReaderTest : public TempFileTestBase {
39+
protected:
40+
static void SetUpTestSuite() { ParquetReader::Register(); }
41+
42+
void SetUp() override {
43+
TempFileTestBase::SetUp();
44+
file_io_ = arrow::ArrowFileSystemFileIO::MakeLocalFileIO();
45+
temp_parquet_file_ = CreateNewTempFilePathWithSuffix(".parquet");
46+
}
47+
48+
void CreateSimpleParquetFile() {
49+
const std::string kParquetFieldIdKey = "PARQUET:field_id";
50+
auto arrow_schema = ::arrow::schema(
51+
{::arrow::field("id", ::arrow::int32(), /*nullable=*/false,
52+
::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"1"})),
53+
::arrow::field("name", ::arrow::utf8(), /*nullable=*/true,
54+
::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"2"}))});
55+
auto table = ::arrow::Table::FromRecordBatches(
56+
arrow_schema, {::arrow::RecordBatch::FromStructArray(
57+
::arrow::json::ArrayFromJSONString(
58+
::arrow::struct_(arrow_schema->fields()),
59+
R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])")
60+
.ValueOrDie())
61+
.ValueOrDie()})
62+
.ValueOrDie();
63+
64+
auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
65+
auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie();
66+
67+
ASSERT_TRUE(
68+
::parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(), outfile)
69+
.ok());
70+
ASSERT_TRUE(outfile->Close().ok());
71+
}
72+
73+
void VerifyNextBatch(Reader& reader, std::string_view expected_json) {
74+
// Boilerplate to get Arrow schema
75+
auto schema_result = reader.Schema();
76+
ASSERT_THAT(schema_result, IsOk());
77+
auto arrow_c_schema = std::move(schema_result.value());
78+
auto import_schema_result = ::arrow::ImportType(&arrow_c_schema);
79+
auto arrow_schema = import_schema_result.ValueOrDie();
80+
81+
// Boilerplate to get Arrow array
82+
auto data = reader.Next();
83+
ASSERT_THAT(data, IsOk()) << "Reader.Next() failed: " << data.error().message;
84+
ASSERT_TRUE(data.value().has_value()) << "Reader.Next() returned no data";
85+
auto arrow_c_array = data.value().value();
86+
auto data_result = ::arrow::ImportArray(&arrow_c_array, arrow_schema);
87+
auto arrow_array = data_result.ValueOrDie();
88+
89+
// Verify data
90+
auto expected_array =
91+
::arrow::json::ArrayFromJSONString(arrow_schema, expected_json).ValueOrDie();
92+
ASSERT_TRUE(arrow_array->Equals(*expected_array));
93+
}
94+
95+
void VerifyExhausted(Reader& reader) {
96+
auto data = reader.Next();
97+
ASSERT_THAT(data, IsOk());
98+
ASSERT_FALSE(data.value().has_value());
99+
}
100+
101+
std::shared_ptr<FileIO> file_io_;
102+
std::string temp_parquet_file_;
103+
};
104+
105+
TEST_F(ParquetReaderTest, ReadTwoFields) {
106+
CreateSimpleParquetFile();
107+
108+
auto schema = std::make_shared<Schema>(
109+
std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
110+
SchemaField::MakeOptional(2, "name", string())});
111+
112+
auto reader_result = ReaderFactoryRegistry::Open(
113+
FileFormatType::kParquet,
114+
{.path = temp_parquet_file_, .io = file_io_, .projection = schema});
115+
ASSERT_THAT(reader_result, IsOk())
116+
<< "Failed to create reader: " << reader_result.error().message;
117+
auto reader = std::move(reader_result.value());
118+
119+
ASSERT_NO_FATAL_FAILURE(
120+
VerifyNextBatch(*reader, R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])"));
121+
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
122+
}
123+
124+
TEST_F(ParquetReaderTest, ReadReorderedFieldsWithNulls) {
125+
CreateSimpleParquetFile();
126+
127+
auto schema = std::make_shared<Schema>(
128+
std::vector<SchemaField>{SchemaField::MakeOptional(2, "name", string()),
129+
SchemaField::MakeRequired(1, "id", int32()),
130+
SchemaField::MakeOptional(3, "score", float64())});
131+
132+
auto reader_result = ReaderFactoryRegistry::Open(
133+
FileFormatType::kParquet,
134+
{.path = temp_parquet_file_, .io = file_io_, .projection = schema});
135+
ASSERT_THAT(reader_result, IsOk());
136+
auto reader = std::move(reader_result.value());
137+
138+
ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(
139+
*reader, R"([["Foo", 1, null], ["Bar", 2, null], ["Baz", 3, null]])"));
140+
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
141+
}
142+
143+
TEST_F(ParquetReaderTest, ReadWithBatchSize) {
144+
CreateSimpleParquetFile();
145+
146+
auto schema = std::make_shared<Schema>(
147+
std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
148+
149+
auto reader_result =
150+
ReaderFactoryRegistry::Open(FileFormatType::kParquet, {.path = temp_parquet_file_,
151+
.batch_size = 2,
152+
.io = file_io_,
153+
.projection = schema});
154+
ASSERT_THAT(reader_result, IsOk());
155+
auto reader = std::move(reader_result.value());
156+
157+
ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, R"([[1], [2]])"));
158+
ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, R"([[3]])"));
159+
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
160+
}
161+
162+
} // namespace iceberg::parquet

0 commit comments

Comments
 (0)