Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ set(ICEBERG_SOURCES
expression/expression.cc
expression/literal.cc
file_reader.cc
file_writer.cc
json_internal.cc
manifest_entry.cc
manifest_list.cc
Expand Down Expand Up @@ -107,6 +108,7 @@ if(ICEBERG_BUILD_BUNDLE)
arrow/arrow_fs_file_io.cc
avro/avro_data_util.cc
avro/avro_reader.cc
avro/avro_writer.cc
avro/avro_schema_util.cc
avro/avro_register.cc
avro/avro_stream_internal.cc
Expand Down
10 changes: 3 additions & 7 deletions src/iceberg/avro/avro_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <avro/Generic.hh>
#include <avro/GenericDatum.hh>

#include "iceberg/arrow/arrow_error_transform_internal.h"
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/avro/avro_data_util_internal.h"
#include "iceberg/avro/avro_schema_util_internal.h"
Expand All @@ -51,13 +52,8 @@ Result<std::unique_ptr<AvroInputStream>> CreateInputStream(const ReaderOptions&
}

auto io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
auto result = io->fs()->OpenInputFile(file_info);
if (!result.ok()) {
return IOError("Failed to open file {} for {}", options.path,
result.status().message());
}

return std::make_unique<AvroInputStream>(result.MoveValueUnsafe(), buffer_size);
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, io->fs()->OpenInputFile(file_info));
return std::make_unique<AvroInputStream>(file, buffer_size);
}

} // namespace
Expand Down
142 changes: 142 additions & 0 deletions src/iceberg/avro/avro_writer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/avro/avro_writer.h"

#include <memory>

#include <arrow/array/builder_base.h>
#include <arrow/c/bridge.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <avro/DataFile.hh>
#include <avro/GenericDatum.hh>

#include "iceberg/arrow/arrow_error_transform_internal.h"
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/avro/avro_stream_internal.h"
#include "iceberg/schema.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"

namespace iceberg::avro {

namespace {

Result<std::unique_ptr<AvroOutputStream>> CreateOutputStream(const WriterOptions& options,
int64_t buffer_size) {
auto io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, io->fs()->OpenOutputStream(options.path));
return std::make_unique<AvroOutputStream>(output, buffer_size);
}

} // namespace

// A stateful context to keep track of the writing progress.
struct WriteContext {};

class AvroWriter::Impl {
public:
Status Open(const WriterOptions& options) {
write_schema_ = options.schema;

::avro::NodePtr root;
ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, &root));

avro_schema_ = std::make_shared<::avro::ValidSchema>(root);

// Open the output stream and adapt to the avro interface.
constexpr int64_t kDefaultBufferSize = 1024 * 1024;
ICEBERG_ASSIGN_OR_RAISE(auto output_stream,
CreateOutputStream(options, kDefaultBufferSize));

writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
std::move(output_stream), *avro_schema_);
return {};
}

Status Write(ArrowArray /*data*/) {
// TODO(xiao.dong) convert data and write to avro
// total_bytes_+= written_bytes;
return {};
}

Status Close() {
if (writer_ != nullptr) {
writer_->close();
writer_.reset();
}
return {};
}

bool Closed() const { return writer_ == nullptr; }

int64_t length() { return total_bytes_; }

private:
int64_t total_bytes_ = 0;
// The schema to write.
std::shared_ptr<::iceberg::Schema> write_schema_;
// The avro schema to write.
std::shared_ptr<::avro::ValidSchema> avro_schema_;
// The avro writer to write the data into a datum.
std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_;
};

AvroWriter::~AvroWriter() = default;

Status AvroWriter::Write(ArrowArray data) { return impl_->Write(data); }

Status AvroWriter::Open(const WriterOptions& options) {
impl_ = std::make_unique<Impl>();
return impl_->Open(options);
}

Status AvroWriter::Close() {
if (!impl_->Closed()) {
return impl_->Close();
}
return {};
}

std::optional<Metrics> AvroWriter::metrics() {
if (impl_->Closed()) {
// TODO(xiao.dong) implement metrics
return {};
}
return std::nullopt;
}

std::optional<int64_t> AvroWriter::length() {
if (impl_->Closed()) {
return impl_->length();
}
return std::nullopt;
}

std::vector<int64_t> AvroWriter::split_offsets() { return {}; }

void AvroWriter::Register() {
static WriterFactoryRegistry avro_writer_register(
FileFormatType::kAvro,
[]() -> Result<std::unique_ptr<Writer>> { return std::make_unique<AvroWriter>(); });
}

} // namespace iceberg::avro
54 changes: 54 additions & 0 deletions src/iceberg/avro/avro_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include "iceberg/file_writer.h"
#include "iceberg/iceberg_bundle_export.h"

namespace iceberg::avro {

/// \brief A writer ArrowArray to Avro files.
class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer {
public:
AvroWriter() = default;

~AvroWriter() override;

Status Open(const WriterOptions& options) final;

Status Close() final;

Status Write(ArrowArray data) final;

std::optional<Metrics> metrics() final;

std::optional<int64_t> length() final;

std::vector<int64_t> split_offsets() final;

/// \brief Register this Avro writer implementation.
static void Register();

private:
class Impl;
std::unique_ptr<Impl> impl_;
};

} // namespace iceberg::avro
62 changes: 62 additions & 0 deletions src/iceberg/file_writer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/file_writer.h"

#include <unordered_map>

#include "iceberg/result.h"
#include "iceberg/util/formatter.h"
#include "iceberg/util/macros.h"

namespace iceberg {

namespace {

WriterFactory GetNotImplementedFactory(FileFormatType format_type) {
return [format_type]() -> Result<std::unique_ptr<Writer>> {
return NotImplemented("Missing writer factory for file format: {}", format_type);
};
}

} // namespace

WriterFactory& WriterFactoryRegistry::GetFactory(FileFormatType format_type) {
static std::unordered_map<FileFormatType, WriterFactory> factories = {
{FileFormatType::kAvro, GetNotImplementedFactory(FileFormatType::kAvro)},
{FileFormatType::kParquet, GetNotImplementedFactory(FileFormatType::kParquet)},
{FileFormatType::kOrc, GetNotImplementedFactory(FileFormatType::kOrc)},
{FileFormatType::kPuffin, GetNotImplementedFactory(FileFormatType::kPuffin)},
};
return factories.at(format_type);
}

WriterFactoryRegistry::WriterFactoryRegistry(FileFormatType format_type,
WriterFactory factory) {
GetFactory(format_type) = std::move(factory);
}

Result<std::unique_ptr<Writer>> WriterFactoryRegistry::Open(
FileFormatType format_type, const WriterOptions& options) {
ICEBERG_ASSIGN_OR_RAISE(auto writer, GetFactory(format_type)());
ICEBERG_RETURN_UNEXPECTED(writer->Open(options));
return writer;
}

} // namespace iceberg
17 changes: 16 additions & 1 deletion src/iceberg/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "iceberg/arrow_c_data.h"
#include "iceberg/file_format.h"
#include "iceberg/metrics.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"

Expand All @@ -38,7 +39,7 @@ struct ICEBERG_EXPORT WriterOptions {
/// \brief The path to the file to write.
std::string path;
/// \brief The schema of the data to write.
ArrowSchema schema;
std::shared_ptr<Schema> schema;
/// \brief FileIO instance to open the file. Writer implementations should down cast it
/// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses
/// `ArrowFileSystemFileIO` as the default implementation.
Expand All @@ -65,6 +66,20 @@ class ICEBERG_EXPORT Writer {
///
/// \return Status of write results.
virtual Status Write(ArrowArray data) = 0;

/// \brief Get the file statistics.
/// Only valid after the file is closed.
virtual std::optional<Metrics> metrics() = 0;

/// \brief Get the file length.
/// Only valid after the file is closed.
virtual std::optional<int64_t> length() = 0;

/// \brief Returns a list of recommended split locations, if applicable, empty
/// otherwise. When available, this information is used for planning scan tasks whose
/// boundaries are determined by these offsets. The returned list must be sorted in
/// ascending order. Only valid after the file is closed.
virtual std::vector<int64_t> split_offsets() = 0;
};

/// \brief Factory function to create a writer of a specific file format.
Expand Down
43 changes: 43 additions & 0 deletions src/iceberg/metrics.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/metrics.h
/// Iceberg file format metrics

#include <unordered_map>

#include "iceberg/expression/literal.h"
#include "iceberg/iceberg_export.h"

namespace iceberg {

/// \brief Iceberg file format metrics
struct ICEBERG_EXPORT Metrics {
int64_t row_count = 0;
std::unordered_map<int64_t, int64_t> column_sizes;
std::unordered_map<int64_t, int64_t> value_counts;
std::unordered_map<int64_t, int64_t> null_value_counts;
std::unordered_map<int64_t, int64_t> nan_value_counts;
std::unordered_map<int64_t, Literal> lower_bounds;
std::unordered_map<int64_t, Literal> upper_bounds;
};

} // namespace iceberg
Loading