diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 0f105a12e..4c9190be4 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -21,6 +21,7 @@ set(ICEBERG_SOURCES arrow_c_data_internal.cc demo.cc expression/expression.cc + file_reader.cc json_internal.cc partition_field.cc partition_spec.cc diff --git a/src/iceberg/avro/demo_avro.cc b/src/iceberg/avro/demo_avro.cc index 2ff772e99..1b7f7a2e4 100644 --- a/src/iceberg/avro/demo_avro.cc +++ b/src/iceberg/avro/demo_avro.cc @@ -49,4 +49,15 @@ std::string DemoAvro::print() const { return actual.str(); } +Result DemoAvroReader::Next() { return std::monostate(); } + +Reader::DataLayout DemoAvroReader::data_layout() const { + return Reader::DataLayout::kStructLike; +} + +ICEBERG_REGISTER_READER_FACTORY( + Avro, [](const ReaderOptions& options) -> Result> { + return std::make_unique(); + }); + } // namespace iceberg::avro diff --git a/src/iceberg/avro/demo_avro.h b/src/iceberg/avro/demo_avro.h index d143dccc9..ca7dbdf2f 100644 --- a/src/iceberg/avro/demo_avro.h +++ b/src/iceberg/avro/demo_avro.h @@ -22,6 +22,7 @@ #include #include "iceberg/avro.h" +#include "iceberg/file_reader.h" #include "iceberg/iceberg_bundle_export.h" namespace iceberg::avro { @@ -33,4 +34,12 @@ class ICEBERG_BUNDLE_EXPORT DemoAvro : public Avro { std::string print() const override; }; +class ICEBERG_BUNDLE_EXPORT DemoAvroReader : public Reader { + public: + DemoAvroReader() = default; + ~DemoAvroReader() override = default; + Result Next() override; + DataLayout data_layout() const override; +}; + } // namespace iceberg::avro diff --git a/src/iceberg/file_format.h b/src/iceberg/file_format.h new file mode 100644 index 000000000..883782c7c --- /dev/null +++ b/src/iceberg/file_format.h @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/file_format.h +/// File format used by Iceberg. + +#include + +#include "iceberg/iceberg_export.h" + +namespace iceberg { + +/// \brief File format type +enum class ICEBERG_EXPORT FileFormatType { + kParquet, + kAvro, + kOrc, + kPuffin, +}; + +/// \brief Convert a FileFormatType to a string +ICEBERG_EXPORT inline std::string_view ToString(FileFormatType format_type) { + switch (format_type) { + case FileFormatType::kParquet: + return "parquet"; + case FileFormatType::kAvro: + return "avro"; + case FileFormatType::kOrc: + return "orc"; + case FileFormatType::kPuffin: + return "puffin"; + } +} + +} // namespace iceberg diff --git a/src/iceberg/file_reader.cc b/src/iceberg/file_reader.cc new file mode 100644 index 000000000..bf8035a91 --- /dev/null +++ b/src/iceberg/file_reader.cc @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/file_reader.h" + +#include + +#include "iceberg/expected.h" +#include "iceberg/util/formatter.h" + +namespace iceberg { + +namespace { + +ReaderFactory GetNotImplementedFactory(FileFormatType format_type) { + return [format_type](const ReaderOptions& options) -> Result> { + return NotImplemented("Missing reader factory for file format: {}", format_type); + }; +} + +} // namespace + +ReaderFactory& ReaderFactoryRegistry::GetFactory(FileFormatType format_type) { + static std::unordered_map factories = { + {FileFormatType::kAvro, GetNotImplementedFactory(FileFormatType::kAvro)}, + {FileFormatType::kParquet, GetNotImplementedFactory(FileFormatType::kParquet)}, + {FileFormatType::kOrc, GetNotImplementedFactory(FileFormatType::kOrc)}, + {FileFormatType::kPuffin, GetNotImplementedFactory(FileFormatType::kPuffin)}, + }; + return factories.at(format_type); +} + +ReaderFactoryRegistry::ReaderFactoryRegistry(FileFormatType format_type, + ReaderFactory factory) { + GetFactory(format_type) = std::move(factory); +} + +Result> ReaderFactoryRegistry::Create( + FileFormatType format_type, const ReaderOptions& options) { + return GetFactory(format_type)(options); +} + +StructLikeReader::StructLikeReader(std::unique_ptr reader) + : reader_(std::move(reader)) {} + +Result StructLikeReader::Next() { return NotImplemented(""); } + +BatchReader::BatchReader(std::unique_ptr reader) : reader_(std::move(reader)) {} + +Result BatchReader::Next() { return NotImplemented(""); } + +} // namespace iceberg diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h new file mode 100644 index 000000000..c3faab5b8 --- /dev/null +++ b/src/iceberg/file_reader.h @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/file_reader.h +/// Reader interface for file formats like Parquet, Avro and ORC. + +#include +#include +#include +#include + +#include "iceberg/arrow_c_data.h" +#include "iceberg/file_format.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Base reader class to read data from different file formats. +class ICEBERG_EXPORT Reader { + public: + virtual ~Reader() = default; + + /// \brief Read next data from file. + /// + /// \return std::monostate if the reader has no more data, otherwise `ArrowArray` or + /// `StructLike` depending on the data layout by the reader implementation. + using Data = + std::variant>; + virtual Result Next() = 0; + + enum class DataLayout { kArrowArray, kStructLike }; + + /// \brief Get the data layout returned by `Next()` of the reader. + virtual DataLayout data_layout() const = 0; +}; + +/// \brief Wrapper of `Reader` to always return `StructLike`. +/// +/// If the data layout of the wrapped reader is `ArrowArray`, the data will be converted +/// to `StructLike`; otherwise, the data will be returned as is without any cost. +class ICEBERG_EXPORT StructLikeReader : public Reader { + public: + explicit StructLikeReader(std::unique_ptr reader); + + /// \brief Always read data into `StructLike` or monostate if no more data. + Result Next() final; + + DataLayout data_layout() const final { return DataLayout::kStructLike; } + + private: + std::unique_ptr reader_; +}; + +/// \brief Wrapper of `Reader` to always return `ArrowArray`. +/// +/// If the data layout of the wrapped reader is `StructLike`, the data will be converted +/// to `ArrowArray`; otherwise, the data will be returned as is without any cost. +class ICEBERG_EXPORT BatchReader : public Reader { + public: + explicit BatchReader(std::unique_ptr reader); + + /// \brief Always read data into `ArrowArray` or monostate if no more data. + Result Next() final; + + DataLayout data_layout() const final { return DataLayout::kArrowArray; } + + private: + std::unique_ptr reader_; +}; + +/// \brief A split of the file to read. +struct ICEBERG_EXPORT Split { + /// \brief The offset of the split. + size_t offset; + /// \brief The length of the split. + size_t length; +}; + +/// \brief Options for creating a reader. +struct ICEBERG_EXPORT ReaderOptions { + /// \brief The path to the file to read. + std::string path; + /// \brief The total length of the file. + std::optional length; + /// \brief The split to read. + std::optional split; + /// \brief The batch size to read. Only applies to implementations that support + /// batching. + int64_t batch_size; + /// \brief FileIO instance to open the file. Reader implementations should down cast it + /// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses + /// `ArrowFileSystemFileIO` as the default implementation. + std::shared_ptr io; + /// \brief The projection schema to read from the file. + std::shared_ptr projection; + /// \brief The filter to apply to the data. Reader implementations may ignore this if + /// the file format does not support filtering. + std::shared_ptr filter; +}; + +/// \brief Factory function to create a reader of a specific file format. +using ReaderFactory = + std::function>(const ReaderOptions&)>; + +/// \brief Registry of reader factories for different file formats. +struct ICEBERG_EXPORT ReaderFactoryRegistry { + /// \brief Register a factory function for a specific file format. + ReaderFactoryRegistry(FileFormatType format_type, ReaderFactory factory); + + /// \brief Get the factory function for a specific file format. + static ReaderFactory& GetFactory(FileFormatType format_type); + + /// \brief Create a reader for a specific file format. + static Result> Create(FileFormatType format_type, + const ReaderOptions& options); +}; + +/// \brief Macro to register a reader factory for a specific file format. +#define ICEBERG_REGISTER_READER_FACTORY(format_type, reader_factory) \ + static ::iceberg::ReaderFactoryRegistry register_reader_factory_##format_type( \ + ::iceberg::FileFormatType::k##format_type, reader_factory); + +} // namespace iceberg diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h new file mode 100644 index 000000000..a7350362d --- /dev/null +++ b/src/iceberg/manifest_reader.h @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/manifest_reader.h +/// Data reader interface for manifest files. + +#include +#include + +#include "iceberg/file_reader.h" + +namespace iceberg { + +/// \brief Read manifest entries from a manifest file. +class ICEBERG_EXPORT ManifestReader { + public: + virtual Result>> Entries() const = 0; + + private: + std::unique_ptr reader_; +}; + +/// \brief Read manifest files from a manifest list file. +class ICEBERG_EXPORT ManifestListReader { + public: + virtual Result>> Files() const = 0; + + private: + std::unique_ptr reader_; +}; + +} // namespace iceberg diff --git a/src/iceberg/util/formatter.h b/src/iceberg/util/formatter.h index be2e246cf..930172e41 100644 --- a/src/iceberg/util/formatter.h +++ b/src/iceberg/util/formatter.h @@ -44,7 +44,7 @@ struct std::formatter : std::formatter { /// \brief std::formatter specialization for any type that has a ToString function template requires requires(const T& t) { - { ToString(t) } -> std::convertible_to; + { ToString(t) } -> std::convertible_to; } struct std::formatter : std::formatter { template diff --git a/test/avro_test.cc b/test/avro_test.cc index 5bdcfca24..5ffcbc01a 100644 --- a/test/avro_test.cc +++ b/test/avro_test.cc @@ -19,6 +19,11 @@ #include #include +#include + +#include "matchers.h" + +namespace iceberg::avro { TEST(AVROTest, TestDemoAvro) { std::string expected = @@ -38,3 +43,17 @@ TEST(AVROTest, TestDemoAvro) { auto avro = iceberg::avro::DemoAvro(); EXPECT_EQ(avro.print(), expected); } + +TEST(AVROTest, TestDemoAvroReader) { + auto result = ReaderFactoryRegistry::Create(FileFormatType::kAvro, {}); + ASSERT_THAT(result, IsOk()); + + auto reader = std::move(result.value()); + ASSERT_EQ(reader->data_layout(), Reader::DataLayout::kStructLike); + + auto data = reader->Next(); + ASSERT_THAT(data, IsOk()); + ASSERT_TRUE(std::holds_alternative(data.value())); +} + +} // namespace iceberg::avro