Skip to content

Commit 8aa7fd7

Browse files
authored
feat: add file reader interface (#88)
1 parent 1375104 commit 8aa7fd7

File tree

9 files changed

+354
-1
lines changed

9 files changed

+354
-1
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ set(ICEBERG_SOURCES
2121
arrow_c_data_internal.cc
2222
demo.cc
2323
expression/expression.cc
24+
file_reader.cc
2425
json_internal.cc
2526
partition_field.cc
2627
partition_spec.cc

src/iceberg/avro/demo_avro.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,15 @@ std::string DemoAvro::print() const {
4949
return actual.str();
5050
}
5151

52+
/// \brief Demo implementation: every call returns `std::monostate`, the value the
/// `Reader` interface uses to signal that there is no more data.
Result<Reader::Data> DemoAvroReader::Next() {
  return Reader::Data{std::monostate{}};
}
53+
54+
/// \brief Layout of the values returned by `Next()`; the demo reader always reports
/// the `StructLike` layout.
Reader::DataLayout DemoAvroReader::data_layout() const {
  return DataLayout::kStructLike;
}
57+
58+
// Install the demo reader as the factory for the Avro file format. The factory does
// not consult its options, so the parameter is left unnamed.
ICEBERG_REGISTER_READER_FACTORY(
    Avro, [](const ReaderOptions& /*options*/) -> Result<std::unique_ptr<Reader>> {
      return std::make_unique<DemoAvroReader>();
    });
62+
5263
} // namespace iceberg::avro

src/iceberg/avro/demo_avro.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <string>
2323

2424
#include "iceberg/avro.h"
25+
#include "iceberg/file_reader.h"
2526
#include "iceberg/iceberg_bundle_export.h"
2627

2728
namespace iceberg::avro {
@@ -33,4 +34,12 @@ class ICEBERG_BUNDLE_EXPORT DemoAvro : public Avro {
3334
std::string print() const override;
3435
};
3536

37+
/// \brief Demo implementation of the `Reader` interface.
///
/// The default constructor and the (virtual, via `Reader`) destructor are left to
/// the compiler; the class owns no resources of its own.
class ICEBERG_BUNDLE_EXPORT DemoAvroReader : public Reader {
 public:
  /// \brief Read the next piece of data; see `Reader::Next()`.
  Result<Data> Next() override;

  /// \brief Layout of the data returned by `Next()`; see `Reader::data_layout()`.
  DataLayout data_layout() const override;
};
44+
3645
} // namespace iceberg::avro

src/iceberg/file_format.h

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/file_format.h
23+
/// File format used by Iceberg.
24+
25+
#include <string_view>
26+
27+
#include "iceberg/iceberg_export.h"
28+
29+
namespace iceberg {
30+
31+
/// \brief File format type
32+
enum class ICEBERG_EXPORT FileFormatType {
  kParquet,
  kAvro,
  kOrc,
  kPuffin,
};

/// \brief Convert a FileFormatType to its lowercase string name.
///
/// \param format_type The file format to name.
/// \return "parquet", "avro", "orc" or "puffin" for the declared enumerators, and
/// "unknown" for any value outside the declared enumerators.
ICEBERG_EXPORT inline std::string_view ToString(FileFormatType format_type) {
  // Deliberately no `default:` label, so the compiler can still warn when an
  // enumerator is added without a matching case.
  switch (format_type) {
    case FileFormatType::kParquet:
      return "parquet";
    case FileFormatType::kAvro:
      return "avro";
    case FileFormatType::kOrc:
      return "orc";
    case FileFormatType::kPuffin:
      return "puffin";
  }
  // Only reached for a value that is not a declared enumerator (e.g. produced by a
  // cast). Returning here avoids flowing off the end of a non-void function, which
  // is undefined behavior.
  return "unknown";
}
52+
53+
} // namespace iceberg

src/iceberg/file_reader.cc

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/file_reader.h"
21+
22+
#include <unordered_map>
23+
24+
#include "iceberg/expected.h"
25+
#include "iceberg/util/formatter.h"
26+
27+
namespace iceberg {
28+
29+
namespace {
30+
31+
/// \brief Build a placeholder factory for `format_type`.
///
/// The returned factory ignores its options and always returns the result of
/// `NotImplemented(...)` with a message that names the file format.
ReaderFactory GetNotImplementedFactory(FileFormatType format_type) {
  ReaderFactory placeholder =
      [format_type](const ReaderOptions& /*options*/) -> Result<std::unique_ptr<Reader>> {
    return NotImplemented("Missing reader factory for file format: {}", format_type);
  };
  return placeholder;
}
36+
37+
} // namespace
38+
39+
/// \brief Look up the mutable factory slot for `format_type`.
///
/// The table is a function-local static that is seeded with a placeholder factory
/// for each of Avro, Parquet, ORC and Puffin. The lookup uses `at()`, so a format
/// type without an entry raises `std::out_of_range`.
ReaderFactory& ReaderFactoryRegistry::GetFactory(FileFormatType format_type) {
  using FactoryTable = std::unordered_map<FileFormatType, ReaderFactory>;
  static FactoryTable table = [] {
    FactoryTable initial;
    for (const FileFormatType type : {FileFormatType::kAvro, FileFormatType::kParquet,
                                      FileFormatType::kOrc, FileFormatType::kPuffin}) {
      initial.emplace(type, GetNotImplementedFactory(type));
    }
    return initial;
  }();
  return table.at(format_type);
}
48+
49+
/// \brief Store `factory` as the factory for `format_type`, replacing whatever
/// factory was stored for that format before.
ReaderFactoryRegistry::ReaderFactoryRegistry(FileFormatType format_type,
                                             ReaderFactory factory) {
  ReaderFactory& slot = GetFactory(format_type);
  slot = std::move(factory);
}
53+
54+
/// \brief Create a reader by invoking the factory currently stored for
/// `format_type` with `options`, and return whatever that factory returns.
Result<std::unique_ptr<Reader>> ReaderFactoryRegistry::Create(
    FileFormatType format_type, const ReaderOptions& options) {
  const ReaderFactory& factory = GetFactory(format_type);
  return factory(options);
}
58+
59+
StructLikeReader::StructLikeReader(std::unique_ptr<Reader> reader)
60+
: reader_(std::move(reader)) {}
61+
62+
Result<Reader::Data> StructLikeReader::Next() { return NotImplemented(""); }
63+
64+
BatchReader::BatchReader(std::unique_ptr<Reader> reader) : reader_(std::move(reader)) {}
65+
66+
Result<Reader::Data> BatchReader::Next() { return NotImplemented(""); }
67+
68+
} // namespace iceberg

src/iceberg/file_reader.h

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/file_reader.h
23+
/// Reader interface for file formats like Parquet, Avro and ORC.
24+
25+
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <variant>

#include "iceberg/arrow_c_data.h"
#include "iceberg/file_format.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
34+
35+
namespace iceberg {
36+
37+
/// \brief Base reader class to read data from different file formats.
38+
class ICEBERG_EXPORT Reader {
39+
public:
40+
virtual ~Reader() = default;
41+
42+
/// \brief Read next data from file.
43+
///
44+
/// \return std::monostate if the reader has no more data, otherwise `ArrowArray` or
45+
/// `StructLike` depending on the data layout by the reader implementation.
46+
using Data =
47+
std::variant<std::monostate, ArrowArray, std::reference_wrapper<const StructLike>>;
48+
virtual Result<Data> Next() = 0;
49+
50+
enum class DataLayout { kArrowArray, kStructLike };
51+
52+
/// \brief Get the data layout returned by `Next()` of the reader.
53+
virtual DataLayout data_layout() const = 0;
54+
};
55+
56+
/// \brief Wrapper of `Reader` to always return `StructLike`.
57+
///
58+
/// If the data layout of the wrapped reader is `ArrowArray`, the data will be converted
59+
/// to `StructLike`; otherwise, the data will be returned as is without any cost.
60+
class ICEBERG_EXPORT StructLikeReader : public Reader {
 public:
  /// \brief Wrap `reader`; the wrapper takes ownership of it.
  explicit StructLikeReader(std::unique_ptr<Reader> reader);

  /// \brief Always read data into `StructLike` or monostate if no more data.
  Result<Data> Next() final;

  /// \brief The reported layout is fixed to `kStructLike`.
  DataLayout data_layout() const final { return Reader::DataLayout::kStructLike; }

 private:
  /// The wrapped reader.
  std::unique_ptr<Reader> reader_;
};
72+
73+
/// \brief Wrapper of `Reader` to always return `ArrowArray`.
74+
///
75+
/// If the data layout of the wrapped reader is `StructLike`, the data will be converted
76+
/// to `ArrowArray`; otherwise, the data will be returned as is without any cost.
77+
class ICEBERG_EXPORT BatchReader : public Reader {
 public:
  /// \brief Wrap `reader`; the wrapper takes ownership of it.
  explicit BatchReader(std::unique_ptr<Reader> reader);

  /// \brief Always read data into `ArrowArray` or monostate if no more data.
  Result<Data> Next() final;

  /// \brief The reported layout is fixed to `kArrowArray`.
  DataLayout data_layout() const final { return Reader::DataLayout::kArrowArray; }

 private:
  /// The wrapped reader.
  std::unique_ptr<Reader> reader_;
};
89+
90+
/// \brief A split of the file to read.
91+
struct ICEBERG_EXPORT Split {
  // Both members carry default member initializers so that a default-initialized
  // `Split` (e.g. `Split s;`) holds zeros rather than indeterminate values. The
  // struct remains an aggregate, so `Split{offset, length}` keeps working.

  /// \brief The offset of the split.
  size_t offset = 0;
  /// \brief The length of the split.
  size_t length = 0;
};
97+
98+
/// \brief Options for creating a reader.
99+
struct ICEBERG_EXPORT ReaderOptions {
100+
/// \brief The path to the file to read.
101+
std::string path;
102+
/// \brief The total length of the file.
103+
std::optional<size_t> length;
104+
/// \brief The split to read.
105+
std::optional<Split> split;
106+
/// \brief The batch size to read. Only applies to implementations that support
107+
/// batching.
108+
int64_t batch_size;
109+
/// \brief FileIO instance to open the file. Reader implementations should down cast it
110+
/// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses
111+
/// `ArrowFileSystemFileIO` as the default implementation.
112+
std::shared_ptr<class FileIO> io;
113+
/// \brief The projection schema to read from the file.
114+
std::shared_ptr<class Schema> projection;
115+
/// \brief The filter to apply to the data. Reader implementations may ignore this if
116+
/// the file format does not support filtering.
117+
std::shared_ptr<class Expression> filter;
118+
};
119+
120+
/// \brief Factory function to create a reader of a specific file format.
121+
using ReaderFactory =
122+
std::function<Result<std::unique_ptr<Reader>>(const ReaderOptions&)>;
123+
124+
/// \brief Registry of reader factories for different file formats.
125+
struct ICEBERG_EXPORT ReaderFactoryRegistry {
126+
/// \brief Register a factory function for a specific file format.
127+
ReaderFactoryRegistry(FileFormatType format_type, ReaderFactory factory);
128+
129+
/// \brief Get the factory function for a specific file format.
130+
static ReaderFactory& GetFactory(FileFormatType format_type);
131+
132+
/// \brief Create a reader for a specific file format.
133+
static Result<std::unique_ptr<Reader>> Create(FileFormatType format_type,
134+
const ReaderOptions& options);
135+
};
136+
137+
/// \brief Macro to register a reader factory for a specific file format.
138+
// Usage: ICEBERG_REGISTER_READER_FACTORY(Avro, factory)
//   - `format_type` is a `FileFormatType` enumerator name without its `k` prefix.
//   - `reader_factory` is passed as the `ReaderFactory` constructor argument.
// Expands to a `static ::iceberg::ReaderFactoryRegistry` object named
// `register_reader_factory_<format_type>`. The expansion already ends with `;`.
#define ICEBERG_REGISTER_READER_FACTORY(format_type, reader_factory)             \
  static ::iceberg::ReaderFactoryRegistry register_reader_factory_##format_type( \
      ::iceberg::FileFormatType::k##format_type, reader_factory);
141+
142+
} // namespace iceberg

src/iceberg/manifest_reader.h

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/manifest_reader.h
23+
/// Data reader interface for manifest files.
24+
25+
#include <memory>
26+
#include <span>
27+
28+
#include "iceberg/file_reader.h"
29+
30+
namespace iceberg {
31+
32+
/// \brief Read manifest entries from a manifest file.
33+
class ICEBERG_EXPORT ManifestReader {
34+
public:
35+
virtual Result<std::span<std::unique_ptr<class ManifestEntry>>> Entries() const = 0;
36+
37+
private:
38+
std::unique_ptr<StructLikeReader> reader_;
39+
};
40+
41+
/// \brief Read manifest files from a manifest list file.
42+
class ICEBERG_EXPORT ManifestListReader {
43+
public:
44+
virtual Result<std::span<std::unique_ptr<class ManifestFile>>> Files() const = 0;
45+
46+
private:
47+
std::unique_ptr<StructLikeReader> reader_;
48+
};
49+
50+
} // namespace iceberg

src/iceberg/util/formatter.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ struct std::formatter<Derived> : std::formatter<std::string_view> {
4444
/// \brief std::formatter specialization for any type that has a ToString function
4545
template <typename T>
4646
requires requires(const T& t) {
47-
{ ToString(t) } -> std::convertible_to<std::string>;
47+
{ ToString(t) } -> std::convertible_to<std::string_view>;
4848
}
4949
struct std::formatter<T> : std::formatter<std::string_view> {
5050
template <class FormatContext>

0 commit comments

Comments
 (0)