Skip to content

Commit b80c0d2

Browse files
author
xiao.dong
committed
refactor MakeReader
1 parent 705c4ea commit b80c0d2

File tree

5 files changed

+51
-34
lines changed

5 files changed

+51
-34
lines changed

src/iceberg/manifest_reader.cc

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,40 @@
1919

2020
#include "iceberg/manifest_reader.h"
2121

22+
#include "iceberg/manifest_entry.h"
23+
#include "iceberg/manifest_list.h"
2224
#include "iceberg/manifest_reader_internal.h"
25+
#include "iceberg/schema.h"
26+
#include "iceberg/util/macros.h"
2327

2428
namespace iceberg {
2529

26-
std::shared_ptr<ManifestReader> ManifestReader::NewReader(
27-
std::unique_ptr<Reader> reader) {
28-
return std::make_shared<ManifestReaderImpl>(std::move(reader));
30+
Result<std::shared_ptr<ManifestReader>> ManifestReader::MakeReader(
31+
const std::string& manifest_location, std::shared_ptr<FileIO> file_io,
32+
std::shared_ptr<Schema> partition_schema) {
33+
auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema);
34+
auto fields_span = manifest_entry_schema->fields();
35+
std::vector<SchemaField> fields(fields_span.begin(), fields_span.end());
36+
auto schema = std::make_shared<Schema>(fields);
37+
ICEBERG_ASSIGN_OR_RAISE(
38+
auto reader,
39+
ReaderFactoryRegistry::Open(
40+
FileFormatType::kAvro,
41+
{.path = manifest_location, .io = std::move(file_io), .projection = schema}));
42+
return std::make_shared<ManifestReaderImpl>(std::move(reader), std::move(schema));
2943
}
3044

31-
std::shared_ptr<ManifestListReader> ManifestListReader::NewReader(
32-
std::unique_ptr<Reader> reader) {
33-
return std::make_shared<ManifestListReaderImpl>(std::move(reader));
45+
Result<std::shared_ptr<ManifestListReader>> ManifestListReader::MakeReader(
46+
const std::string& manifest_list_location, std::shared_ptr<FileIO> file_io) {
47+
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
48+
ManifestFile::Type().fields().end());
49+
auto schema = std::make_shared<Schema>(fields);
50+
ICEBERG_ASSIGN_OR_RAISE(
51+
auto reader,
52+
ReaderFactoryRegistry::Open(FileFormatType::kAvro, {.path = manifest_list_location,
53+
.io = std::move(file_io),
54+
.projection = schema}));
55+
return std::make_shared<ManifestListReaderImpl>(std::move(reader), std::move(schema));
3456
}
3557

3658
} // namespace iceberg

src/iceberg/manifest_reader.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,18 @@ class ICEBERG_EXPORT ManifestReader {
3636
public:
3737
virtual Result<std::vector<std::unique_ptr<ManifestEntry>>> Entries() const = 0;
3838

39-
static std::shared_ptr<ManifestReader> NewReader(std::unique_ptr<Reader> reader);
39+
static Result<std::shared_ptr<ManifestReader>> MakeReader(
40+
const std::string& manifest_location, std::shared_ptr<FileIO> file_io,
41+
std::shared_ptr<Schema> partition_schema);
4042
};
4143

4244
/// \brief Read manifest files from a manifest list file.
4345
class ICEBERG_EXPORT ManifestListReader {
4446
public:
4547
virtual Result<std::vector<std::unique_ptr<ManifestFile>>> Files() const = 0;
4648

47-
static std::shared_ptr<ManifestListReader> NewReader(std::unique_ptr<Reader> reader);
49+
static Result<std::shared_ptr<ManifestListReader>> MakeReader(
50+
const std::string& manifest_list_location, std::shared_ptr<FileIO> file_io);
4851
};
4952

5053
} // namespace iceberg

src/iceberg/manifest_reader_internal.cc

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@
2929
#include "iceberg/schema.h"
3030
#include "iceberg/schema_internal.h"
3131
#include "iceberg/type.h"
32+
#include "iceberg/util/macros.h"
3233

3334
namespace iceberg {
3435

3536
#define ARROW_RETURN_IF_NOT_OK(status, error) \
36-
if (status != NANOARROW_OK) { \
37-
return InvalidArrowData("NanoArrow error: {}", error.message); \
37+
if (status != NANOARROW_OK) [[unlikely]] { \
38+
return InvalidArrowData("Nanoarrow error: {}", error.message); \
3839
}
3940

4041
Result<std::vector<std::unique_ptr<ManifestFile>>> ParseManifestListEntry(
@@ -206,16 +207,8 @@ Result<std::vector<std::unique_ptr<ManifestEntry>>> ManifestReaderImpl::Entries(
206207

207208
Result<std::vector<std::unique_ptr<ManifestFile>>> ManifestListReaderImpl::Files() const {
208209
std::vector<std::unique_ptr<ManifestFile>> manifest_files;
209-
auto arrow_schema = reader_->Schema();
210-
if (!arrow_schema.has_value()) {
211-
return InvalidArgument("Get schema failed in reader:{}",
212-
arrow_schema.error().message);
213-
}
214-
internal::ArrowSchemaGuard schema_guard(&arrow_schema.value());
215-
auto schema = FromArrowSchema(arrow_schema.value(), std::nullopt);
216-
if (!schema.has_value()) {
217-
return InvalidArgument("Parse iceberg schema failed:{}", schema.error().message);
218-
}
210+
ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema());
211+
internal::ArrowSchemaGuard schema_guard(&arrow_schema);
219212
while (true) {
220213
auto result = reader_->Next();
221214
if (!result.has_value()) {
@@ -224,8 +217,8 @@ Result<std::vector<std::unique_ptr<ManifestFile>>> ManifestListReaderImpl::Files
224217
}
225218
if (result.value().has_value()) {
226219
internal::ArrowArrayGuard array_guard(&result.value().value());
227-
auto parse_result = ParseManifestListEntry(
228-
&arrow_schema.value(), &result.value().value(), *schema.value());
220+
auto parse_result =
221+
ParseManifestListEntry(&arrow_schema, &result.value().value(), *schema_);
229222
if (!parse_result.has_value()) {
230223
return InvalidArgument("Failed to parse manifest list entry:{}",
231224
parse_result.error().message);

src/iceberg/manifest_reader_internal.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,24 +29,28 @@ namespace iceberg {
2929
/// \brief Read manifest entries from a manifest file.
3030
class ManifestReaderImpl : public ManifestReader {
3131
public:
32-
explicit ManifestReaderImpl(std::unique_ptr<Reader> reader)
33-
: reader_(std::move(reader)) {}
32+
explicit ManifestReaderImpl(std::unique_ptr<Reader> reader,
33+
std::shared_ptr<Schema> schema)
34+
: schema_(std::move(schema)), reader_(std::move(reader)) {}
3435

3536
Result<std::vector<std::unique_ptr<ManifestEntry>>> Entries() const override;
3637

3738
private:
39+
std::shared_ptr<Schema> schema_;
3840
std::unique_ptr<Reader> reader_;
3941
};
4042

4143
/// \brief Read manifest files from a manifest list file.
4244
class ManifestListReaderImpl : public ManifestListReader {
4345
public:
44-
explicit ManifestListReaderImpl(std::unique_ptr<Reader> reader)
45-
: reader_(std::move(reader)) {}
46+
explicit ManifestListReaderImpl(std::unique_ptr<Reader> reader,
47+
std::shared_ptr<Schema> schema)
48+
: schema_(std::move(schema)), reader_(std::move(reader)) {}
4649

4750
Result<std::vector<std::unique_ptr<ManifestFile>>> Files() const override;
4851

4952
private:
53+
std::shared_ptr<Schema> schema_;
5054
std::unique_ptr<Reader> reader_;
5155
};
5256

test/manifest_list_reader_test.cc

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,11 @@ class ManifestListReaderTest : public TempFileTestBase {
4747
};
4848

4949
TEST_F(ManifestListReaderTest, BasicTest) {
50-
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
51-
ManifestFile::Type().fields().end());
52-
auto schema = std::make_shared<Schema>(fields);
5350
std::string path = GetResourcePath(
5451
"snap-7412193043800610213-1-2bccd69e-d642-4816-bba0-261cd9bd0d93.avro");
55-
auto reader_result = ReaderFactoryRegistry::Open(
56-
FileFormatType::kAvro, {.path = path, .io = file_io_, .projection = schema});
57-
ASSERT_THAT(reader_result, IsOk());
58-
auto reader = std::move(reader_result.value());
59-
auto manifest_reader = ManifestListReader::NewReader(std::move(reader));
52+
auto manifest_reader_result = ManifestListReader::MakeReader(path, file_io_);
53+
ASSERT_EQ(manifest_reader_result.has_value(), true);
54+
auto manifest_reader = manifest_reader_result.value();
6055
auto read_result = manifest_reader->Files();
6156
ASSERT_EQ(read_result.has_value(), true);
6257
ASSERT_EQ(read_result.value().size(), 4);

0 commit comments

Comments
 (0)