Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ set(ICEBERG_SOURCES
arrow_c_data_internal.cc
demo.cc
expression/expression.cc
file_reader.cc
json_internal.cc
partition_field.cc
partition_spec.cc
Expand Down
11 changes: 11 additions & 0 deletions src/iceberg/avro/demo_avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,15 @@ std::string DemoAvro::print() const {
return actual.str();
}

Result<Reader::Data> DemoAvroReader::Next() { return std::monostate(); }

Reader::DataLayout DemoAvroReader::data_layout() const {
return Reader::DataLayout::kStructLike;
}

ICEBERG_REGISTER_READER_FACTORY(
Avro, [](const ReaderOptions& options) -> Result<std::unique_ptr<Reader>> {
return std::make_unique<DemoAvroReader>();
});

} // namespace iceberg::avro
9 changes: 9 additions & 0 deletions src/iceberg/avro/demo_avro.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <string>

#include "iceberg/avro.h"
#include "iceberg/file_reader.h"
#include "iceberg/iceberg_bundle_export.h"

namespace iceberg::avro {
Expand All @@ -33,4 +34,12 @@ class ICEBERG_BUNDLE_EXPORT DemoAvro : public Avro {
std::string print() const override;
};

class ICEBERG_BUNDLE_EXPORT DemoAvroReader : public Reader {
public:
DemoAvroReader() = default;
~DemoAvroReader() override = default;
Result<Data> Next() override;
DataLayout data_layout() const override;
};

} // namespace iceberg::avro
56 changes: 56 additions & 0 deletions src/iceberg/file_format.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/file_format.h
/// File format used by Iceberg.

#include <string_view>

#include "iceberg/iceberg_export.h"

namespace iceberg {

/// \brief File format type
enum class ICEBERG_EXPORT FileFormatType {
kParquet,
kAvro,
kOrc,
kPuffin,
kMetadata,
};

/// \brief Convert a FileFormatType to a string
ICEBERG_EXPORT inline std::string_view ToString(FileFormatType format_type) {
switch (format_type) {
case FileFormatType::kParquet:
return "parquet";
case FileFormatType::kAvro:
return "avro";
case FileFormatType::kOrc:
return "orc";
case FileFormatType::kPuffin:
return "puffin";
case FileFormatType::kMetadata:
return "metadata";
}
}

} // namespace iceberg
69 changes: 69 additions & 0 deletions src/iceberg/file_reader.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/file_reader.h"

#include <unordered_map>

#include "iceberg/expected.h"
#include "iceberg/util/formatter.h"

namespace iceberg {

namespace {

ReaderFactory GetNotImplementedFactory(FileFormatType format_type) {
return [format_type](const ReaderOptions& options) -> Result<std::unique_ptr<Reader>> {
return NotImplemented("Missing reader factory for file format: {}", format_type);
};
}

} // namespace

ReaderFactory& ReaderFactoryRegistry::GetFactory(FileFormatType format_type) {
static std::unordered_map<FileFormatType, ReaderFactory> factories = {
{FileFormatType::kAvro, GetNotImplementedFactory(FileFormatType::kAvro)},
{FileFormatType::kParquet, GetNotImplementedFactory(FileFormatType::kParquet)},
{FileFormatType::kOrc, GetNotImplementedFactory(FileFormatType::kOrc)},
{FileFormatType::kPuffin, GetNotImplementedFactory(FileFormatType::kPuffin)},
{FileFormatType::kMetadata, GetNotImplementedFactory(FileFormatType::kMetadata)},
};
return factories.at(format_type);
}

ReaderFactoryRegistry::ReaderFactoryRegistry(FileFormatType format_type,
ReaderFactory factory) {
GetFactory(format_type) = std::move(factory);
}

Result<std::unique_ptr<Reader>> ReaderFactoryRegistry::Create(
FileFormatType format_type, const ReaderOptions& options) {
return GetFactory(format_type)(options);
}

StructLikeReader::StructLikeReader(std::unique_ptr<Reader> reader)
: reader_(std::move(reader)) {}

Result<Reader::Data> StructLikeReader::Next() { return NotImplemented(""); }

BatchReader::BatchReader(std::unique_ptr<Reader> reader) : reader_(std::move(reader)) {}

Result<Reader::Data> BatchReader::Next() { return NotImplemented(""); }

} // namespace iceberg
142 changes: 142 additions & 0 deletions src/iceberg/file_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/file_reader.h
/// Reader interface for file formats like Parquet, Avro and ORC.

#include <functional>
#include <memory>
#include <optional>
#include <variant>

#include "iceberg/arrow_c_data.h"
#include "iceberg/file_format.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"

namespace iceberg {

/// \brief Base reader class to read data from different file formats.
class ICEBERG_EXPORT Reader {
public:
virtual ~Reader() = default;

/// \brief Read next data from file.
///
/// \return std::monostate if the reader has no more data, otherwise `ArrowArray` or
/// `StructLike` depending on the data layout by the reader implementation.
using Data =
std::variant<std::monostate, ArrowArray, std::reference_wrapper<const StructLike>>;
virtual Result<Data> Next() = 0;

enum class DataLayout { kArrowArray, kStructLike };

/// \brief Get the data layout returned by `Next()` of the reader.
virtual DataLayout data_layout() const = 0;
};

/// \brief Wrapper of `Reader` to always return `StructLike`.
///
/// If the data layout of the wrapped reader is `ArrowArray`, the data will be converted
/// to `StructLike`; otherwise, the data will be returned as is without any cost.
class ICEBERG_EXPORT StructLikeReader : public Reader {
public:
explicit StructLikeReader(std::unique_ptr<Reader> reader);

/// \brief Always read data into `StructLike` or monostate if no more data.
Result<Data> Next() final;

DataLayout data_layout() const final { return DataLayout::kStructLike; }

private:
std::unique_ptr<Reader> reader_;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m a bit confused about why keeping a Reader object here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because this is just an adapter. If the wrapped reader directly returns kStructLike, it does nothing. Otherwise it will try to aggregate them into an ArrowArray.

};

/// \brief Wrapper of `Reader` to always return `ArrowArray`.
///
/// If the data layout of the wrapped reader is `StructLike`, the data will be converted
/// to `ArrowArray`; otherwise, the data will be returned as is without any cost.
class ICEBERG_EXPORT BatchReader : public Reader {
public:
explicit BatchReader(std::unique_ptr<Reader> reader);

/// \brief Always read data into `ArrowArray` or monostate if no more data.
Result<Data> Next() final;

DataLayout data_layout() const final { return DataLayout::kArrowArray; }

private:
std::unique_ptr<Reader> reader_;
};

/// \brief A split of the file to read.
struct ICEBERG_EXPORT Split {
/// \brief The offset of the split.
size_t offset;
/// \brief The length of the split.
size_t length;
};

/// \brief Options for creating a reader.
struct ICEBERG_EXPORT ReaderOptions {
/// \brief The path to the file to read.
std::string path;
/// \brief The total length of the file.
std::optional<size_t> length;
/// \brief The split to read.
std::optional<Split> split;
/// \brief The batch size to read. Only applies to implementations that support
/// batching.
int64_t batch_size;
Comment on lines +106 to +108
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the unit bytes or rows?

(Should this also be optional?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, presumably bytes?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Why does Split use size_t but this uses int64_t?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was referring to number of rows in a batch (a.k.a. ArrowArray). It seems that we can use std::optional<size_t> and define a default batch size if non provided.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, ok. int64_t might be appropriate as that's then consistent with C Data Interface. Clarifying the unit might be helpful.

/// \brief FileIO instance to open the file. Reader implementations should down cast it
/// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses
/// `ArrowFileSystemFileIO` as the default implementation.
std::shared_ptr<class FileIO> io;
/// \brief The projection schema to read from the file.
std::shared_ptr<class Schema> projection;
/// \brief The filter to apply to the data. Reader implementations may ignore this if
/// the file format does not support filtering.
std::shared_ptr<class Expression> filter;
};

/// \brief Factory function to create a reader of a specific file format.
using ReaderFactory =
std::function<Result<std::unique_ptr<Reader>>(const ReaderOptions&)>;

/// \brief Registry of reader factories for different file formats.
struct ICEBERG_EXPORT ReaderFactoryRegistry {
/// \brief Register a factory function for a specific file format.
ReaderFactoryRegistry(FileFormatType format_type, ReaderFactory factory);

/// \brief Get the factory function for a specific file format.
static ReaderFactory& GetFactory(FileFormatType format_type);

/// \brief Create a reader for a specific file format.
static Result<std::unique_ptr<Reader>> Create(FileFormatType format_type,
const ReaderOptions& options);
};

/// \brief Macro to register a reader factory for a specific file format.
#define ICEBERG_REGISTER_READER_FACTORY(format_type, reader_factory) \
static ::iceberg::ReaderFactoryRegistry register_reader_factory_##format_type( \
::iceberg::FileFormatType::k##format_type, reader_factory);

} // namespace iceberg
50 changes: 50 additions & 0 deletions src/iceberg/manifest_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/manifest_reader.h
/// Data reader interface for manifest files.

#include <memory>
#include <span>

#include "iceberg/file_reader.h"

namespace iceberg {

/// \brief Read manifest entries from a manifest file.
class ICEBERG_EXPORT ManifestReader {
public:
virtual Result<std::span<std::unique_ptr<class ManifestEntry>>> Entries() const = 0;

private:
std::unique_ptr<StructLikeReader> reader_;
};

/// \brief Read manifest files from a manifest list file.
class ICEBERG_EXPORT ManifestListReader {
public:
virtual Result<std::span<std::unique_ptr<class ManifestFile>>> Files() const = 0;

private:
std::unique_ptr<StructLikeReader> reader_;
};

} // namespace iceberg
2 changes: 1 addition & 1 deletion src/iceberg/util/formatter.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ struct std::formatter<Derived> : std::formatter<std::string_view> {
/// \brief std::formatter specialization for any type that has a ToString function
template <typename T>
requires requires(const T& t) {
{ ToString(t) } -> std::convertible_to<std::string>;
{ ToString(t) } -> std::convertible_to<std::string_view>;
}
struct std::formatter<T> : std::formatter<std::string_view> {
template <class FormatContext>
Expand Down
Loading
Loading