diff --git a/cmake_modules/IcebergThirdpartyToolchain.cmake b/cmake_modules/IcebergThirdpartyToolchain.cmake index e361f1e2f..783d99c4b 100644 --- a/cmake_modules/IcebergThirdpartyToolchain.cmake +++ b/cmake_modules/IcebergThirdpartyToolchain.cmake @@ -70,7 +70,7 @@ function(resolve_arrow_dependency) ON CACHE BOOL "" FORCE) set(ARROW_FILESYSTEM - OFF + ON CACHE BOOL "" FORCE) set(ARROW_SIMD_LEVEL "NONE" diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index ac33be2c1..fec895240 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -60,7 +60,8 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg) if(ICEBERG_BUILD_BUNDLE) - set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc avro/demo_avro.cc) + set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/arrow_fs_file_io.cc + avro/demo_avro.cc) # Libraries to link with exported libiceberg_bundle.{so,a}. set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS) diff --git a/src/iceberg/arrow/arrow_error_transform_internal.h b/src/iceberg/arrow/arrow_error_transform_internal.h new file mode 100644 index 000000000..588bab549 --- /dev/null +++ b/src/iceberg/arrow/arrow_error_transform_internal.h @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <utility>
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include "iceberg/error.h"
+#include "iceberg/expected.h"
+
+namespace iceberg::arrow {
+
+/// \brief Map an Arrow status to the closest Iceberg ErrorKind.
+///
+/// \param status The Arrow status to classify; only its code is inspected.
+/// \return kIOError for IOError, kNotImplemented for NotImplemented,
+/// kInvalidArgument for Invalid, and kUnknownError for every other code.
+inline ErrorKind ToErrorKind(const ::arrow::Status& status) {
+  switch (status.code()) {
+    case ::arrow::StatusCode::IOError:
+      return ErrorKind::kIOError;
+    case ::arrow::StatusCode::NotImplemented:
+      return ErrorKind::kNotImplemented;
+    case ::arrow::StatusCode::Invalid:
+      return ErrorKind::kInvalidArgument;
+    default:
+      return ErrorKind::kUnknownError;
+  }
+}
+
+// Evaluates `rexpr` (an ::arrow::Result). On failure returns an iceberg error whose
+// kind comes from `error_transform` and whose message is the Arrow status text;
+// otherwise moves the value into `lhs`.
+// Expands to several statements because `lhs` may be a declaration, so it must not be
+// used as the unbraced body of an if/else/loop.
+// Names are fully qualified so the expansion does not depend on the caller's namespace.
+#define ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL(result_name, lhs, rexpr, error_transform) \
+  auto&& result_name = (rexpr); \
+  if (!result_name.ok()) { \
+    return ::iceberg::unexpected<::iceberg::Error>{ \
+        {.kind = error_transform(result_name.status()), \
+         .message = result_name.status().ToString()}}; \
+  } \
+  lhs = std::move(result_name).ValueOrDie();
+
+// Convenience wrapper: unique temporary name per expansion, ToErrorKind as transform.
+#define ICEBERG_ARROW_ASSIGN_OR_RETURN(lhs, rexpr) \
+  ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL( \
+      ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, \
+      ::iceberg::arrow::ToErrorKind)
+
+// Evaluates `expr` (an ::arrow::Status) and returns an iceberg error if it is not OK.
+// The temporary has a macro-specific name so it cannot capture a caller variable
+// that appears inside `expr`.
+#define ICEBERG_ARROW_RETURN_NOT_OK(expr) \
+  do { \
+    auto&& _iceberg_arrow_status = (expr); \
+    if (!_iceberg_arrow_status.ok()) { \
+      return ::iceberg::unexpected<::iceberg::Error>{ \
+          {.kind = ::iceberg::arrow::ToErrorKind(_iceberg_arrow_status), \
+           .message = _iceberg_arrow_status.ToString()}}; \
+    } \
+  } while (0)
+
+}  // namespace iceberg::arrow
diff --git a/src/iceberg/arrow/arrow_fs_file_io.cc b/src/iceberg/arrow/arrow_fs_file_io.cc
new file mode 100644
index 000000000..270ecb78e
--- /dev/null
+++ b/src/iceberg/arrow/arrow_fs_file_io.cc
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/arrow/arrow_fs_file_io.h"
+
+#include <algorithm>
+
+#include <arrow/filesystem/localfs.h>
+
+#include "iceberg/arrow/arrow_error_transform_internal.h"
+
+namespace iceberg::arrow {
+
+namespace {
+
+/// Upper bound, in bytes, for a single Read() call while loading a file.
+constexpr size_t kReadChunkSize = 1024 * 1024;
+
+}  // namespace
+
+/// \brief Read the content of the file at the given location.
+///
+/// The whole file is loaded into memory, at most kReadChunkSize bytes per Read() call.
+///
+/// \param file_location The location of the file to read.
+/// \param length Optional size hint; when set it is recorded on the FileInfo handed
+/// to the Arrow file system before the file is opened.
+/// \return The file content, or an error if opening or reading fails or the stream
+/// ends before the size reported by GetSize() has been read.
+expected<std::string, Error> ArrowFileSystemFileIO::ReadFile(
+    const std::string& file_location, std::optional<size_t> length) {
+  ::arrow::fs::FileInfo file_info(file_location);
+  if (length.has_value()) {
+    file_info.set_size(length.value());
+  }
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenInputFile(file_info));
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file_size, file->GetSize());
+
+  std::string content;
+  content.resize(static_cast<size_t>(file_size));
+  size_t remain = content.size();
+  size_t offset = 0;
+  while (remain > 0) {
+    const size_t read_length = std::min(remain, kReadChunkSize);
+    // Read() takes a void* destination, so the string buffer is passed without a cast.
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto read_bytes,
+                                   file->Read(read_length, content.data() + offset));
+    if (read_bytes <= 0) {
+      // The stream ended before `file_size` bytes arrived (e.g. the file shrank after
+      // GetSize()). Fail instead of looping forever on zero-length reads.
+      return unexpected<Error>{
+          {.kind = ErrorKind::kIOError,
+           .message = "Unexpected end of file while reading " + file_location}};
+    }
+    remain -= static_cast<size_t>(read_bytes);
+    offset += static_cast<size_t>(read_bytes);
+  }
+
+  return content;
+}
+
+/// \brief Write the given content to the file at the given location.
+///
+/// \param file_location The location of the file to write.
+/// \param content The bytes to write.
+/// \return Nothing on success, or the first error reported while opening, writing,
+/// flushing or closing the output stream.
+expected<void, Error> ArrowFileSystemFileIO::WriteFile(const std::string& file_location,
+                                                       std::string_view content) {
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(file_location));
+  ICEBERG_ARROW_RETURN_NOT_OK(file->Write(content.data(), content.size()));
+  ICEBERG_ARROW_RETURN_NOT_OK(file->Flush());
+  ICEBERG_ARROW_RETURN_NOT_OK(file->Close());
+  return {};
+}
+
+/// \brief Delete a file at the given location.
+expected<void, Error> ArrowFileSystemFileIO::DeleteFile(
+    const std::string& file_location) {
+  ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(file_location));
+  return {};
+}
+
+}  // namespace iceberg::arrow
diff --git a/src/iceberg/arrow/arrow_fs_file_io.h b/src/iceberg/arrow/arrow_fs_file_io.h
new file mode 100644
index 000000000..e79e75fea
--- /dev/null
+++ b/src/iceberg/arrow/arrow_fs_file_io.h
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include <arrow/filesystem/filesystem.h>
+
+#include "iceberg/file_io.h"
+#include "iceberg/iceberg_bundle_export.h"
+
+namespace iceberg::arrow {
+
+/// \brief A concrete implementation of FileIO for Arrow file system.
+///
+/// All operations are forwarded to the Arrow file system passed to the constructor.
+class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
+ public:
+  /// \brief Create a FileIO backed by the given Arrow file system.
+  ///
+  /// \param arrow_fs The Arrow file system used for reads, writes and deletes.
+  explicit ArrowFileSystemFileIO(std::shared_ptr<::arrow::fs::FileSystem> arrow_fs)
+      : arrow_fs_(std::move(arrow_fs)) {}
+
+  ~ArrowFileSystemFileIO() override = default;
+
+  /// \brief Read the content of the file at the given location.
+  expected<std::string, Error> ReadFile(const std::string& file_location,
+                                        std::optional<size_t> length) override;
+
+  /// \brief Write the given content to the file at the given location.
+  expected<void, Error> WriteFile(const std::string& file_location,
+                                  std::string_view content) override;
+
+  /// \brief Delete a file at the given location.
+  expected<void, Error> DeleteFile(const std::string& file_location) override;
+
+ private:
+  /// File system that backs every operation of this FileIO.
+  std::shared_ptr<::arrow::fs::FileSystem> arrow_fs_;
+};
+
+}  // namespace iceberg::arrow
diff --git a/src/iceberg/demo.cc b/src/iceberg/demo.cc
index a9c0a031c..aa1835b22 100644
--- a/src/iceberg/demo.cc
+++ b/src/iceberg/demo.cc
@@ -21,6 +21,7 @@
 
 #include "iceberg/avro.h"  // include to export symbols
 #include "iceberg/catalog.h"
+#include "iceberg/file_io.h"
 #include "iceberg/location_provider.h"
 #include "iceberg/table.h"
 #include "iceberg/transaction.h"
diff --git a/src/iceberg/error.h b/src/iceberg/error.h
index 60f07f331..a4b74a97c 100644
--- a/src/iceberg/error.h
+++ b/src/iceberg/error.h
@@ -34,6 +34,9 @@ enum class ErrorKind {
   kCommitStateUnknown,
   kInvalidSchema,
   kInvalidArgument,
+  kIOError,
+  kNotImplemented,
+  kUnknownError,
   kNotSupported,
 };
 
diff --git a/src/iceberg/file_io.h b/src/iceberg/file_io.h
new file mode 100644
index 000000000..03922f654
--- /dev/null
+++ b/src/iceberg/file_io.h
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <optional>
+#include <string>
+#include <string_view>
+
+#include "iceberg/error.h"
+#include "iceberg/expected.h"
+#include "iceberg/iceberg_export.h"
+
+namespace iceberg {
+
+/// \brief Pluggable module for reading, writing, and deleting files.
+///
+/// This module only handles metadata files, not data files. The metadata files
+/// are typically small and are used to store schema, partition information,
+/// and other metadata about the table.
+///
+/// Note that these functions are not atomic. For example, if a write fails,
+/// the file may be partially written. Implementations should be careful to
+/// avoid corrupting metadata files.
+class ICEBERG_EXPORT FileIO {
+ public:
+  FileIO() = default;
+  virtual ~FileIO() = default;
+
+  /// \brief Read the content of the file at the given location.
+  ///
+  /// \param file_location The location of the file to read.
+  /// \param length The number of bytes to read. Some object stores need the length
+  /// to be specified, e.g. S3 `GetObject` has a Range parameter.
+  /// \return The content of the file if the read succeeded, an error code if the read
+  /// failed.
+  virtual expected<std::string, Error> ReadFile(const std::string& file_location,
+                                                std::optional<size_t> length) {
+    // We provide a default implementation to avoid Windows linker error LNK2019.
+    return unexpected<Error>{
+        {.kind = ErrorKind::kNotImplemented, .message = "ReadFile not implemented"}};
+  }
+
+  /// \brief Write the given content to the file at the given location.
+  ///
+  /// \param file_location The location of the file to write.
+  /// \param content The content to write to the file.
+  /// \note This interface takes no overwrite flag; the behavior when the file
+  /// already exists is not specified here.
+  /// \return void if the write succeeded, an error code if the write failed.
+  virtual expected<void, Error> WriteFile(const std::string& file_location,
+                                          std::string_view content) {
+    return unexpected<Error>{
+        {.kind = ErrorKind::kNotImplemented, .message = "WriteFile not implemented"}};
+  }
+
+  /// \brief Delete a file at the given location.
+  ///
+  /// \param file_location The location of the file to delete.
+  /// \return void if the delete succeeded, an error code if the delete failed.
+  virtual expected<void, Error> DeleteFile(const std::string& file_location) {
+    return unexpected<Error>{
+        {.kind = ErrorKind::kNotImplemented, .message = "DeleteFile not implemented"}};
+  }
+};
+
+}  // namespace iceberg
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 9849d4378..96e319445 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -47,8 +47,8 @@ if(ICEBERG_BUILD_BUNDLE)
   add_test(NAME avro_test COMMAND avro_test)
 
   add_executable(arrow_test)
-  target_sources(arrow_test PRIVATE arrow_test.cc)
-  target_link_libraries(arrow_test PRIVATE iceberg_bundle_static Arrow::arrow_static
-                                           GTest::gtest_main GTest::gmock)
+  target_sources(arrow_test PRIVATE arrow_test.cc arrow_fs_file_io_test.cc)
+  target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main
+                                           GTest::gmock)
   add_test(NAME arrow_test COMMAND arrow_test)
 endif()
diff --git a/test/arrow_fs_file_io_test.cc b/test/arrow_fs_file_io_test.cc
new file mode 100644
index 000000000..193c13df3
--- /dev/null
+++ b/test/arrow_fs_file_io_test.cc
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/arrow/arrow_fs_file_io.h"
+
+#include <filesystem>
+#include <memory>
+#include <optional>
+#include <system_error>
+
+#include <arrow/filesystem/localfs.h>
+#include <gtest/gtest.h>
+
+#include "matchers.h"
+
+namespace iceberg {
+
+/// Fixture that exercises ArrowFileSystemFileIO against the local file system using a
+/// single scratch file in the system temp directory.
+class LocalFileIOTest : public testing::Test {
+ protected:
+  void SetUp() override {
+    local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
+    file_io_ = std::make_shared<arrow::ArrowFileSystemFileIO>(local_fs_);
+    // Start from a clean slate so a file left by an earlier run cannot change results.
+    RemoveScratchFile();
+  }
+
+  void TearDown() override { RemoveScratchFile(); }
+
+  /// Best-effort removal; a missing file is not an error here.
+  void RemoveScratchFile() {
+    std::error_code ignored;
+    std::filesystem::remove(temp_filepath_, ignored);
+  }
+
+  std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
+  std::shared_ptr<FileIO> file_io_;
+  std::filesystem::path temp_filepath_ =
+      std::filesystem::temp_directory_path() / "123.txt";
+};
+
+TEST_F(LocalFileIOTest, ReadWriteFile) {
+  // Reading before anything was written must fail with an I/O error.
+  auto read_res = file_io_->ReadFile(temp_filepath_.string(), std::nullopt);
+  EXPECT_THAT(read_res, IsError(ErrorKind::kIOError));
+  EXPECT_THAT(read_res, HasErrorMessage("Failed to open local file"));
+
+  auto write_res = file_io_->WriteFile(temp_filepath_.string(), "hello world");
+  EXPECT_THAT(write_res, IsOk());
+
+  read_res = file_io_->ReadFile(temp_filepath_.string(), std::nullopt);
+  EXPECT_THAT(read_res, IsOk());
+  EXPECT_THAT(read_res, HasValue(::testing::Eq("hello world")));
+}
+
+TEST_F(LocalFileIOTest, DeleteFile) {
+  // Create the file here so this test does not rely on another test having run first.
+  auto write_res = file_io_->WriteFile(temp_filepath_.string(), "hello world");
+  ASSERT_THAT(write_res, IsOk());
+
+  auto del_res = file_io_->DeleteFile(temp_filepath_.string());
+  EXPECT_THAT(del_res, IsOk());
+
+  // Deleting again must fail because the file no longer exists.
+  del_res = file_io_->DeleteFile(temp_filepath_.string());
+  EXPECT_THAT(del_res, IsError(ErrorKind::kIOError));
+  EXPECT_THAT(del_res, HasErrorMessage("Cannot delete file"));
+}
+
+}  // namespace iceberg