diff --git a/.github/workflows/cpp-linter.yml b/.github/workflows/cpp-linter.yml index 1100c6525..8a3d4cca5 100644 --- a/.github/workflows/cpp-linter.yml +++ b/.github/workflows/cpp-linter.yml @@ -39,7 +39,7 @@ jobs: mkdir build && cd build cmake .. -DCMAKE_EXPORT_COMPILE_COMMANDS=ON cmake --build . - - uses: cpp-linter/cpp-linter-action@d7155ea6699028b6b09b0457e26b3c5d73f0ed46 + - uses: cpp-linter/cpp-linter-action@f91c446a32ae3eb9f98fef8c9ed4c7cb613a4f8a id: linter continue-on-error: true env: diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index b509c2579..dac3ddd23 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -48,6 +48,7 @@ set(ICEBERG_SOURCES type.cc manifest_reader.cc manifest_reader_internal.cc + manifest_writer.cc arrow_c_data_guard_internal.cc util/murmurhash3_internal.cc util/timepoint.cc diff --git a/src/iceberg/manifest_adapter.h b/src/iceberg/manifest_adapter.h new file mode 100644 index 000000000..2ffe51be3 --- /dev/null +++ b/src/iceberg/manifest_adapter.h @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/metadata_adapter.h +/// Base class of adapter for v1v2v3v4 metadata. + +#include "iceberg/arrow_c_data.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +// \brief Base class to append manifest metadata to Arrow array. +class ICEBERG_EXPORT ManifestAdapter { + public: + ManifestAdapter() = default; + virtual ~ManifestAdapter() = default; + + virtual Status StartAppending() = 0; + virtual Result FinishAppending() = 0; + int64_t size() const { return size_; } + + protected: + ArrowArray array_; + int64_t size_ = 0; +}; + +// \brief Implemented by different versions with different schemas to +// append a list of `ManifestEntry`s to an `ArrowArray`. +class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter { + public: + ManifestEntryAdapter() = default; + ~ManifestEntryAdapter() override = default; + + virtual Status Append(const ManifestEntry& entry) = 0; +}; + +// \brief Implemented by different versions with different schemas to +// append a list of `ManifestFile`s to an `ArrowArray`. +class ICEBERG_EXPORT ManifestFileAdapter : public ManifestAdapter { + public: + ManifestFileAdapter() = default; + ~ManifestFileAdapter() override = default; + + virtual Status Append(const ManifestFile& file) = 0; +}; + +} // namespace iceberg diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc new file mode 100644 index 000000000..27fd3f767 --- /dev/null +++ b/src/iceberg/manifest_writer.cc @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest_writer.h" + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" +#include "iceberg/schema.h" +#include "iceberg/util/macros.h" +#include "iceberg/v1_metadata.h" +#include "iceberg/v2_metadata.h" +#include "iceberg/v3_metadata.h" + +namespace iceberg { + +Status ManifestWriter::Add(const ManifestEntry& entry) { + if (adapter_->size() >= kBatchSize) { + ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); + ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); + ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending()); + } + return adapter_->Append(entry); +} + +Status ManifestWriter::AddAll(const std::vector& entries) { + for (const auto& entry : entries) { + ICEBERG_RETURN_UNEXPECTED(Add(entry)); + } + return {}; +} + +Status ManifestWriter::Close() { + if (adapter_->size() > 0) { + ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); + ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); + } + return {}; +} + +Result> OpenFileWriter(std::string_view location, + std::shared_ptr schema, + std::shared_ptr file_io) { + ICEBERG_ASSIGN_OR_RAISE( + auto writer, + WriterFactoryRegistry::Open(FileFormatType::kAvro, {.path = std::string(location), + .schema = std::move(schema), + .io = std::move(file_io)})); + return writer; +} + +Result> ManifestWriter::MakeV1Writer( + std::optional snapshot_id, std::string_view manifest_location, + std::shared_ptr file_io, std::shared_ptr partition_schema) { + // TODO(xiao.dong) parse v1 schema + auto manifest_entry_schema = + ManifestEntry::TypeFromPartitionType(std::move(partition_schema)); + auto fields_span = manifest_entry_schema->fields(); + std::vector fields(fields_span.begin(), fields_span.end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE(auto writer, + OpenFileWriter(manifest_location, schema, std::move(file_io))); + auto adapter = std::make_unique(snapshot_id, std::move(schema)); + return std::make_unique(std::move(writer), std::move(adapter)); +} + +Result> ManifestWriter::MakeV2Writer( + std::optional snapshot_id, std::string_view manifest_location, + std::shared_ptr file_io, std::shared_ptr partition_schema) { + // TODO(xiao.dong) parse v2 schema + auto manifest_entry_schema = + ManifestEntry::TypeFromPartitionType(std::move(partition_schema)); + auto fields_span = manifest_entry_schema->fields(); + std::vector fields(fields_span.begin(), fields_span.end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE(auto writer, + OpenFileWriter(manifest_location, schema, std::move(file_io))); + auto adapter = std::make_unique(snapshot_id, std::move(schema)); + return std::make_unique(std::move(writer), std::move(adapter)); +} + +Result> ManifestWriter::MakeV3Writer( + std::optional snapshot_id, std::optional first_row_id, + std::string_view manifest_location, std::shared_ptr file_io, + std::shared_ptr partition_schema) { + // TODO(xiao.dong) parse v3 schema + auto manifest_entry_schema = + ManifestEntry::TypeFromPartitionType(std::move(partition_schema)); + auto fields_span = manifest_entry_schema->fields(); + std::vector fields(fields_span.begin(), fields_span.end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE(auto writer, + OpenFileWriter(manifest_location, schema, std::move(file_io))); + auto adapter = std::make_unique(snapshot_id, first_row_id, + std::move(schema)); + return std::make_unique(std::move(writer), std::move(adapter)); +} + +Status ManifestListWriter::Add(const ManifestFile& file) { + if (adapter_->size() >= kBatchSize) { + ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); + ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); + ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending()); + } + return adapter_->Append(file); +} + +Status ManifestListWriter::AddAll(const std::vector& files) { + for (const auto& file : files) { + ICEBERG_RETURN_UNEXPECTED(Add(file)); + } + return {}; +} + +Status ManifestListWriter::Close() { + if (adapter_->size() > 0) { + ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); + ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); + } + return {}; +} + +Result> ManifestListWriter::MakeV1Writer( + int64_t snapshot_id, std::optional parent_snapshot_id, + std::string_view manifest_list_location, std::shared_ptr file_io) { + // TODO(xiao.dong) parse v1 schema + std::vector fields(ManifestFile::Type().fields().begin(), + ManifestFile::Type().fields().end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io))); + auto adapter = std::make_unique(snapshot_id, parent_snapshot_id, + std::move(schema)); + return std::make_unique(std::move(writer), std::move(adapter)); +} + +Result> ManifestListWriter::MakeV2Writer( + int64_t snapshot_id, std::optional parent_snapshot_id, + int64_t sequence_number, std::string_view manifest_list_location, + std::shared_ptr file_io) { + // TODO(xiao.dong) parse v2 schema + std::vector fields(ManifestFile::Type().fields().begin(), + ManifestFile::Type().fields().end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io))); + auto adapter = std::make_unique( + snapshot_id, parent_snapshot_id, sequence_number, std::move(schema)); + return std::make_unique(std::move(writer), std::move(adapter)); +} + +Result> ManifestListWriter::MakeV3Writer( + int64_t snapshot_id, std::optional parent_snapshot_id, + int64_t sequence_number, std::optional first_row_id, + std::string_view manifest_list_location, std::shared_ptr file_io) { + // TODO(xiao.dong) parse v3 schema + std::vector fields(ManifestFile::Type().fields().begin(), + ManifestFile::Type().fields().end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io))); + auto adapter = std::make_unique( + snapshot_id, parent_snapshot_id, sequence_number, first_row_id, std::move(schema)); + return std::make_unique(std::move(writer), std::move(adapter)); +} + +} // namespace iceberg diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h index 3c5091b3d..c9ec30702 100644 --- a/src/iceberg/manifest_writer.h +++ b/src/iceberg/manifest_writer.h @@ -20,13 +20,14 @@ #pragma once /// \file iceberg/manifest_writer.h -/// Data writer interface for manifest files. +/// Data writer interface for manifest files and manifest list files. #include #include #include "iceberg/file_writer.h" #include "iceberg/iceberg_export.h" +#include "iceberg/manifest_adapter.h" #include "iceberg/type_fwd.h" namespace iceberg { @@ -34,31 +35,121 @@ namespace iceberg { /// \brief Write manifest entries to a manifest file. class ICEBERG_EXPORT ManifestWriter { public: + ManifestWriter(std::unique_ptr writer, + std::unique_ptr adapter) + : writer_(std::move(writer)), adapter_(std::move(adapter)) {} + virtual ~ManifestWriter() = default; - virtual Status WriteManifestEntries( - const std::vector& entries) const = 0; + + /// \brief Write manifest entry to file. + /// \param entry Manifest entry to write. + /// \return Status::OK() if entry was written successfully + Status Add(const ManifestEntry& entry); + + /// \brief Write manifest entries to file. + /// \param entries Manifest entries to write. + /// \return Status::OK() if all entries were written successfully + Status AddAll(const std::vector& entries); + + /// \brief Close writer and flush to storage. + Status Close(); /// \brief Creates a writer for a manifest file. + /// \param snapshot_id ID of the snapshot. /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. /// \return A Result containing the writer or an error. - static Result> MakeWriter( + static Result> MakeV1Writer( + std::optional snapshot_id, std::string_view manifest_location, + std::shared_ptr file_io, std::shared_ptr partition_schema); + + /// \brief Creates a writer for a manifest file. + /// \param snapshot_id ID of the snapshot. + /// \param manifest_location Path to the manifest file. + /// \param file_io File IO implementation to use. + /// \return A Result containing the writer or an error. + static Result> MakeV2Writer( + std::optional snapshot_id, std::string_view manifest_location, + std::shared_ptr file_io, std::shared_ptr partition_schema); + + /// \brief Creates a writer for a manifest file. + /// \param snapshot_id ID of the snapshot. + /// \param first_row_id First row ID of the snapshot. + /// \param manifest_location Path to the manifest file. + /// \param file_io File IO implementation to use. + /// \return A Result containing the writer or an error. + static Result> MakeV3Writer( + std::optional snapshot_id, std::optional first_row_id, std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_schema); + + private: + static constexpr int64_t kBatchSize = 1024; + std::unique_ptr writer_; + std::unique_ptr adapter_; }; /// \brief Write manifest files to a manifest list file. class ICEBERG_EXPORT ManifestListWriter { public: + ManifestListWriter(std::unique_ptr writer, + std::unique_ptr adapter) + : writer_(std::move(writer)), adapter_(std::move(adapter)) {} + virtual ~ManifestListWriter() = default; - virtual Status WriteManifestFiles(const std::vector& files) const = 0; + + /// \brief Write manifest file to manifest list file. + /// \param file Manifest file to write. + /// \return Status::OK() if file was written successfully + Status Add(const ManifestFile& file); + + /// \brief Write manifest file list to manifest list file. + /// \param files Manifest file list to write. + /// \return Status::OK() if all files were written successfully + Status AddAll(const std::vector& files); + + /// \brief Close writer and flush to storage. + Status Close(); + + /// \brief Creates a writer for the v1 manifest list. + /// \param snapshot_id ID of the snapshot. + /// \param parent_snapshot_id ID of the parent snapshot. + /// \param manifest_list_location Path to the manifest list file. + /// \param file_io File IO implementation to use. + /// \return A Result containing the writer or an error. + static Result> MakeV1Writer( + int64_t snapshot_id, std::optional parent_snapshot_id, + std::string_view manifest_list_location, std::shared_ptr file_io); /// \brief Creates a writer for the manifest list. + /// \param snapshot_id ID of the snapshot. + /// \param parent_snapshot_id ID of the parent snapshot. + /// \param sequence_number Sequence number of the snapshot. /// \param manifest_list_location Path to the manifest list file. /// \param file_io File IO implementation to use. /// \return A Result containing the writer or an error. - static Result> MakeWriter( + static Result> MakeV2Writer( + int64_t snapshot_id, std::optional parent_snapshot_id, + int64_t sequence_number, std::string_view manifest_list_location, + std::shared_ptr file_io); + + /// \brief Creates a writer for the manifest list. + /// \param snapshot_id ID of the snapshot. + /// \param parent_snapshot_id ID of the parent snapshot. + /// \param sequence_number Sequence number of the snapshot. + /// \param first_row_id First row ID of the snapshot. + /// \param manifest_list_location Path to the manifest list file. + /// \param file_io File IO implementation to use. + /// \return A Result containing the writer or an error. + static Result> MakeV3Writer( + int64_t snapshot_id, std::optional parent_snapshot_id, + int64_t sequence_number, std::optional first_row_id, std::string_view manifest_list_location, std::shared_ptr file_io); + + private: + static constexpr int64_t kBatchSize = 1024; + std::unique_ptr writer_; + std::unique_ptr adapter_; }; } // namespace iceberg diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h new file mode 100644 index 000000000..7e91da7b3 --- /dev/null +++ b/src/iceberg/v1_metadata.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/v1_metadata.h + +#include + +#include "iceberg/manifest_adapter.h" + +namespace iceberg { + +/// \brief Adapter to convert V1 ManifestEntry to `ArrowArray`. +class ManifestEntryAdapterV1 : public ManifestEntryAdapter { + public: + ManifestEntryAdapterV1(std::optional snapshot_id, + std::shared_ptr schema) { + // TODO(xiao.dong): init v1 schema + } + Status StartAppending() override { return {}; } + Status Append(const ManifestEntry& entry) override { return {}; } + Result FinishAppending() override { return {}; } + + private: + std::shared_ptr manifest_schema_; + ArrowSchema schema_; // converted from manifest_schema_ +}; + +/// \brief Adapter to convert V1 ManifestFile to `ArrowArray`. +class ManifestFileAdapterV1 : public ManifestFileAdapter { + public: + ManifestFileAdapterV1(int64_t snapshot_id, std::optional parent_snapshot_id, + std::shared_ptr schema) { + // TODO(xiao.dong): init v1 schema + } + Status StartAppending() override { return {}; } + Status Append(const ManifestFile& file) override { return {}; } + Result FinishAppending() override { return {}; } + + private: + std::shared_ptr manifest_list_schema_; + ArrowSchema schema_; // converted from manifest_list_schema_ +}; + +} // namespace iceberg diff --git a/src/iceberg/v2_metadata.h b/src/iceberg/v2_metadata.h new file mode 100644 index 000000000..d6ff6aa3a --- /dev/null +++ b/src/iceberg/v2_metadata.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/v2_metadata.h + +#include + +#include "iceberg/manifest_adapter.h" + +namespace iceberg { + +/// \brief Adapter to convert V2 ManifestEntry to `ArrowArray`. +class ManifestEntryAdapterV2 : public ManifestEntryAdapter { + public: + ManifestEntryAdapterV2(std::optional snapshot_id, + std::shared_ptr schema) { + // TODO(xiao.dong): init v2 schema + } + Status StartAppending() override { return {}; } + Status Append(const ManifestEntry& entry) override { return {}; } + Result FinishAppending() override { return {}; } + + private: + std::shared_ptr manifest_schema_; + ArrowSchema schema_; // converted from manifest_schema_ +}; + +/// \brief Adapter to convert V2 ManifestFile to `ArrowArray`. +class ManifestFileAdapterV2 : public ManifestFileAdapter { + public: + ManifestFileAdapterV2(int64_t snapshot_id, std::optional parent_snapshot_id, + int64_t sequence_number, std::shared_ptr schema) { + // TODO(xiao.dong): init v2 schema + } + Status StartAppending() override { return {}; } + Status Append(const ManifestFile& file) override { return {}; } + Result FinishAppending() override { return {}; } + + private: + std::shared_ptr manifest_list_schema_; + ArrowSchema schema_; // converted from manifest_list_schema_ +}; + +} // namespace iceberg diff --git a/src/iceberg/v3_metadata.h b/src/iceberg/v3_metadata.h new file mode 100644 index 000000000..e7bcc3552 --- /dev/null +++ b/src/iceberg/v3_metadata.h @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/v3_metadata.h + +#include + +#include "iceberg/manifest_adapter.h" + +namespace iceberg { + +/// \brief Adapter to convert V3ManifestEntry to `ArrowArray`. +class ManifestEntryAdapterV3 : public ManifestEntryAdapter { + public: + ManifestEntryAdapterV3(std::optional snapshot_id, + std::optional first_row_id, + std::shared_ptr schema) { + // TODO(xiao.dong): init v3 schema + } + Status StartAppending() override { return {}; } + Status Append(const ManifestEntry& entry) override { return {}; } + Result FinishAppending() override { return {}; } + + private: + std::shared_ptr manifest_schema_; + ArrowSchema schema_; // converted from manifest_schema_ +}; + +/// \brief Adapter to convert V3 ManifestFile to `ArrowArray`. +class ManifestFileAdapterV3 : public ManifestFileAdapter { + public: + ManifestFileAdapterV3(int64_t snapshot_id, std::optional parent_snapshot_id, + int64_t sequence_number, std::optional first_row_id, + std::shared_ptr schema) { + // TODO(xiao.dong): init v3 schema + } + Status StartAppending() override { return {}; } + Status Append(const ManifestFile& file) override { return {}; } + Result FinishAppending() override { return {}; } + + private: + std::shared_ptr manifest_list_schema_; + ArrowSchema schema_; // converted from manifest_list_schema_ +}; + +} // namespace iceberg