Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ cmake-build-release/

# intellij files
.idea

# vscode files
.vscode
3 changes: 2 additions & 1 deletion src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ if(ICEBERG_BUILD_BUNDLE)
parquet/parquet_data_util.cc
parquet/parquet_reader.cc
parquet/parquet_register.cc
parquet/parquet_schema_util.cc)
parquet/parquet_schema_util.cc
parquet/parquet_writer.cc)

# Libraries to link with exported libiceberg_bundle.{so,a}.
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS)
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/arrow/arrow_error_transform_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ inline ErrorKind ToErrorKind(const ::arrow::Status& status) {
switch (status.code()) {
case ::arrow::StatusCode::IOError:
return ErrorKind::kIOError;
case ::arrow::StatusCode::NotImplemented:
return ErrorKind::kNotImplemented;
default:
return ErrorKind::kUnknownError;
}
Expand Down
28 changes: 28 additions & 0 deletions src/iceberg/constants.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <string_view>

namespace iceberg {

/// \brief Metadata key used by Apache Arrow C++ to carry Parquet field IDs
/// on Arrow schema fields.
///
/// Declared `inline` so that every translation unit including this header
/// shares a single definition (C++17 inline variable) instead of getting a
/// per-TU internal-linkage copy.
inline constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";

}  // namespace iceberg
7 changes: 5 additions & 2 deletions src/iceberg/parquet/parquet_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ class ParquetReader::Impl {
arrow_reader_properties.set_arrow_extensions_enabled(true);

// Open the Parquet file reader
ICEBERG_ASSIGN_OR_RAISE(auto input_stream, OpenInputStream(options));
ICEBERG_ASSIGN_OR_RAISE(input_stream_, OpenInputStream(options));
auto file_reader =
::parquet::ParquetFileReader::Open(std::move(input_stream), reader_properties);
::parquet::ParquetFileReader::Open(input_stream_, reader_properties);
ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileReader::Make(
pool_, std::move(file_reader), arrow_reader_properties, &reader_));

Expand Down Expand Up @@ -169,6 +169,7 @@ class ParquetReader::Impl {
}

reader_.reset();
ICEBERG_ARROW_RETURN_NOT_OK(input_stream_->Close());
return {};
}

Expand Down Expand Up @@ -236,6 +237,8 @@ class ParquetReader::Impl {
std::shared_ptr<::iceberg::Schema> read_schema_;
// The projection result to apply to the read schema.
SchemaProjection projection_;
// The input stream to read Parquet file.
std::shared_ptr<::arrow::io::RandomAccessFile> input_stream_;
// Parquet file reader to create RecordBatchReader.
std::unique_ptr<::parquet::arrow::FileReader> reader_;
// The context to keep track of the reading progress.
Expand Down
2 changes: 0 additions & 2 deletions src/iceberg/parquet/parquet_register.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

namespace iceberg::parquet {

void RegisterWriter() {}

void RegisterAll() {
RegisterReader();
RegisterWriter();
Expand Down
3 changes: 1 addition & 2 deletions src/iceberg/parquet/parquet_schema_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <parquet/arrow/schema.h>
#include <parquet/schema.h>

#include "iceberg/constants.h"
#include "iceberg/metadata_columns.h"
#include "iceberg/parquet/parquet_schema_util_internal.h"
#include "iceberg/result.h"
Expand All @@ -38,8 +39,6 @@ namespace iceberg::parquet {

namespace {

constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";

std::optional<int32_t> FieldIdFromMetadata(
const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
if (!metadata) {
Expand Down
167 changes: 167 additions & 0 deletions src/iceberg/parquet/parquet_writer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/parquet/parquet_writer.h"

#include <memory>

#include <arrow/c/bridge.h>
#include <arrow/record_batch.h>
#include <arrow/util/key_value_metadata.h>
#include <parquet/arrow/schema.h>
#include <parquet/arrow/writer.h>
#include <parquet/file_writer.h>
#include <parquet/properties.h>

#include "iceberg/arrow/arrow_error_transform_internal.h"
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/schema_internal.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"

namespace iceberg::parquet {

namespace {

// Opens an Arrow output stream for the Parquet file at options.path using
// the Arrow filesystem held by the writer's FileIO.
// NOTE(review): assumes options.io is an ArrowFileSystemFileIO —
// checked_pointer_cast enforces the downcast; confirm behavior for other
// FileIO implementations.
Result<std::shared_ptr<::arrow::io::OutputStream>> OpenOutputStream(
    const WriterOptions& options) {
  auto io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, io->fs()->OpenOutputStream(options.path));
  return output;
}

} // namespace

class ParquetWriter::Impl {
 public:
  /// \brief Open the writer: convert the Iceberg schema to an Arrow schema,
  /// derive the physical Parquet schema, open the output stream, and create
  /// the underlying Parquet file writer.
  Status Open(const WriterOptions& options) {
    auto writer_properties =
        ::parquet::WriterProperties::Builder().memory_pool(pool_)->build();
    auto arrow_writer_properties = ::parquet::default_arrow_writer_properties();

    // Convert the Iceberg schema to an Arrow schema via the C data interface.
    ArrowSchema c_schema;
    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*options.schema, &c_schema));
    ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema_, ::arrow::ImportSchema(&c_schema));

    // Derive the physical Parquet schema node tree from the Arrow schema.
    std::shared_ptr<::parquet::SchemaDescriptor> schema_descriptor;
    ICEBERG_ARROW_RETURN_NOT_OK(
        ::parquet::arrow::ToParquetSchema(arrow_schema_.get(), *writer_properties,
                                          *arrow_writer_properties, &schema_descriptor));
    auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>(
        schema_descriptor->schema_root());

    // Keep a handle to the stream so Close() can query its final position.
    ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options));
    auto file_writer = ::parquet::ParquetFileWriter::Open(
        output_stream_, std::move(schema_node), std::move(writer_properties));
    ICEBERG_ARROW_RETURN_NOT_OK(
        ::parquet::arrow::FileWriter::Make(pool_, std::move(file_writer), arrow_schema_,
                                           std::move(arrow_writer_properties), &writer_));

    return {};
  }

  /// \brief Import one ArrowArray as a record batch and write it to the file.
  Status Write(ArrowArray array) {
    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto batch,
                                   ::arrow::ImportRecordBatch(&array, arrow_schema_));

    ICEBERG_ARROW_RETURN_NOT_OK(writer_->WriteRecordBatch(*batch));

    return {};
  }

  // Close the writer and release resources. Safe to call more than once.
  Status Close() {
    if (writer_ == nullptr) {
      return {};  // Already closed
    }

    ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close());
    // Collect row-group start offsets before dropping the writer; they are
    // read from the file metadata, which is finalized by Close() above.
    auto& metadata = writer_->metadata();
    split_offsets_.reserve(metadata->num_row_groups());
    for (int i = 0; i < metadata->num_row_groups(); ++i) {
      split_offsets_.push_back(metadata->RowGroup(i)->file_offset());
    }
    writer_.reset();

    // The stream position after closing the writer is the total file length.
    ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, output_stream_->Tell());
    ICEBERG_ARROW_RETURN_NOT_OK(output_stream_->Close());
    return {};
  }

  /// \brief True once Close() has completed (or Open() never succeeded).
  bool Closed() const { return writer_ == nullptr; }

  /// \brief Total bytes written; meaningful only after a successful Close().
  int64_t length() const { return total_bytes_; }

  /// \brief Row-group start offsets; populated by Close().
  std::vector<int64_t> split_offsets() const { return split_offsets_; }

 private:
  // TODO(gangwu): make memory pool configurable
  ::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
  // Arrow schema converted from the Iceberg schema to write.
  std::shared_ptr<::arrow::Schema> arrow_schema_;
  // The output stream to write the Parquet file to.
  std::shared_ptr<::arrow::io::OutputStream> output_stream_;
  // Parquet file writer to write ArrowArray.
  std::unique_ptr<::parquet::arrow::FileWriter> writer_;
  // Total length of the written Parquet file. Zero-initialized so that
  // length() is well-defined even before a successful Close().
  int64_t total_bytes_{0};
  // Row group start offsets in the Parquet file.
  std::vector<int64_t> split_offsets_;
};

// Out-of-line destructor so std::unique_ptr<Impl> can destroy the Impl type,
// which is incomplete in the header (pimpl idiom).
ParquetWriter::~ParquetWriter() = default;

// Opens the writer with the given options. A fresh Impl is created on every
// call, discarding any previous writer state.
Status ParquetWriter::Open(const WriterOptions& options) {
  impl_ = std::make_unique<Impl>();
  return impl_->Open(options);
}

// Writes one ArrowArray to the file. NOTE(review): impl_ is dereferenced
// unchecked — callers must have called Open() successfully first.
Status ParquetWriter::Write(ArrowArray array) { return impl_->Write(array); }

// Closes the writer, finalizing file metadata and closing the output stream.
Status ParquetWriter::Close() { return impl_->Close(); }

// Metrics of the written file; only available after the writer is closed.
std::optional<Metrics> ParquetWriter::metrics() {
  if (!impl_->Closed()) {
    return std::nullopt;
  }
  // NOTE(review): `return {};` value-initializes std::optional<Metrics> to an
  // EMPTY optional, so this function currently returns nullopt even after
  // Close(). Metrics collection appears unimplemented — confirm intent.
  return {};
}

// Total length in bytes of the written file; nullopt until the writer has
// been closed (the length is only known after Close()).
std::optional<int64_t> ParquetWriter::length() {
  if (impl_->Closed()) {
    return impl_->length();
  }
  return std::nullopt;
}

// Row-group start offsets of the written file; empty until the writer has
// been closed (offsets are collected from the finalized file metadata).
std::vector<int64_t> ParquetWriter::split_offsets() {
  if (impl_->Closed()) {
    return impl_->split_offsets();
  }
  return {};
}

// Registers the Parquet writer factory with the global writer registry.
// The function-local static guarantees thread-safe, exactly-once
// registration no matter how many times this function is called.
void RegisterWriter() {
  static WriterFactoryRegistry parquet_writer_register(
      FileFormatType::kParquet, []() -> Result<std::unique_ptr<Writer>> {
        return std::make_unique<ParquetWriter>();
      });
}

} // namespace iceberg::parquet
51 changes: 51 additions & 0 deletions src/iceberg/parquet/parquet_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include "iceberg/file_writer.h"
#include "iceberg/iceberg_bundle_export.h"

namespace iceberg::parquet {

/// \brief A writer that writes ArrowArray to Parquet files.
class ICEBERG_BUNDLE_EXPORT ParquetWriter : public Writer {
 public:
  ParquetWriter() = default;

  ~ParquetWriter() override;

  /// \brief Open the writer with the given options (schema, path, file IO).
  Status Open(const WriterOptions& options) final;

  /// \brief Close the writer, finalizing the file. Safe to call when
  /// already closed.
  Status Close() final;

  /// \brief Write one ArrowArray to the file; Open() must have succeeded.
  Status Write(ArrowArray array) final;

  /// \brief Metrics of the written file; nullopt until the writer is closed.
  std::optional<Metrics> metrics() final;

  /// \brief Total file length in bytes; nullopt until the writer is closed.
  std::optional<int64_t> length() final;

  /// \brief Row-group start offsets; empty until the writer is closed.
  std::vector<int64_t> split_offsets() final;

 private:
  class Impl;  // pimpl: hides Arrow/Parquet dependencies from this header
  std::unique_ptr<Impl> impl_;
};

} // namespace iceberg::parquet
7 changes: 4 additions & 3 deletions src/iceberg/schema_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <optional>
#include <string>

#include "iceberg/constants.h"
#include "iceberg/schema.h"
#include "iceberg/type.h"
#include "iceberg/util/macros.h"
Expand All @@ -45,7 +46,7 @@ ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view n
NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderInit(&metadata_buffer, nullptr));
if (field_id.has_value()) {
NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderAppend(
&metadata_buffer, ArrowCharView(std::string(kFieldIdKey).c_str()),
&metadata_buffer, ArrowCharView(std::string(kParquetFieldIdKey).c_str()),
ArrowCharView(std::to_string(field_id.value()).c_str())));
}

Expand Down Expand Up @@ -185,8 +186,8 @@ int32_t GetFieldId(const ArrowSchema& schema) {
return kUnknownFieldId;
}

ArrowStringView field_id_key{.data = kFieldIdKey.data(),
.size_bytes = kFieldIdKey.size()};
ArrowStringView field_id_key{.data = kParquetFieldIdKey.data(),
.size_bytes = kParquetFieldIdKey.size()};
ArrowStringView field_id_value;
if (ArrowMetadataGetValue(schema.metadata, field_id_key, &field_id_value) !=
NANOARROW_OK) {
Expand Down
5 changes: 0 additions & 5 deletions src/iceberg/schema_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@

namespace iceberg {

// Apache Arrow C++ uses "PARQUET:field_id" to store field IDs for Parquet.
// Here we follow a similar convention for Iceberg but we might also add
// "PARQUET:field_id" in the future once we implement a Parquet writer.
constexpr std::string_view kFieldIdKey = "ICEBERG:field_id";

/// \brief Convert an Iceberg schema to an Arrow schema.
///
/// \param[in] schema The Iceberg schema to convert.
Expand Down
12 changes: 12 additions & 0 deletions src/iceberg/type.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,4 +319,16 @@ std::shared_ptr<FixedType> fixed(int32_t length) {
return std::make_shared<FixedType>(length);
}

// Factory for MapType. The by-value parameters are moved into the
// constructor to avoid extra SchemaField copies, consistent with the
// list() and struct_() factories below.
std::shared_ptr<MapType> map(SchemaField key, SchemaField value) {
  return std::make_shared<MapType>(std::move(key), std::move(value));
}

// Factory for ListType; takes ownership of the element field.
std::shared_ptr<ListType> list(SchemaField element) {
  auto element_field = std::move(element);
  return std::make_shared<ListType>(std::move(element_field));
}

// Factory for StructType; takes ownership of the field list.
std::shared_ptr<StructType> struct_(std::vector<SchemaField> fields) {
  auto owned_fields = std::move(fields);
  return std::make_shared<StructType>(std::move(owned_fields));
}

} // namespace iceberg
Loading
Loading