Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake_modules/IcebergThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ function(resolve_arrow_dependency)
ON
CACHE BOOL "" FORCE)
set(ARROW_FILESYSTEM
OFF
ON
CACHE BOOL "" FORCE)
set(ARROW_SIMD_LEVEL
"NONE"
Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h
DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg)

if(ICEBERG_BUILD_BUNDLE)
set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc avro/demo_avro.cc)
set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/arrow_fs_file_io.cc
avro/demo_avro.cc)

# Libraries to link with exported libiceberg_bundle.{so,a}.
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS)
Expand Down
61 changes: 61 additions & 0 deletions src/iceberg/arrow/arrow_error_transform_internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <arrow/result.h>
#include <arrow/status.h>

#include "iceberg/error.h"
#include "iceberg/expected.h"

namespace iceberg::arrow::internal {

inline ErrorKind ToErrorKind(const ::arrow::Status& status) {
switch (status.code()) {
case ::arrow::StatusCode::IOError:
return ErrorKind::kIOError;
default:
return ErrorKind::kUnknownError;
}
}

#define ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL(result_name, lhs, rexpr, error_transform) \
auto&& result_name = (rexpr); \
if (!result_name.ok()) { \
return unexpected<Error>{{.kind = error_transform(result_name.status()), \
.message = result_name.status().ToString()}}; \
} \
lhs = std::move(result_name).ValueOrDie();

#define ICEBERG_ARROW_ASSIGN_OR_RETURN(lhs, rexpr) \
ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL( \
ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, \
internal::ToErrorKind)

#define ICEBERG_ARROW_RETURN_NOT_OK(expr) \
do { \
auto&& _status = (expr); \
if (!_status.ok()) { \
return unexpected<Error>{ \
{.kind = internal::ToErrorKind(_status), .message = _status.ToString()}}; \
} \
} while (0)

} // namespace iceberg::arrow::internal
71 changes: 71 additions & 0 deletions src/iceberg/arrow/arrow_fs_file_io.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/arrow/arrow_fs_file_io.h"

#include <arrow/filesystem/localfs.h>

#include "iceberg/arrow/arrow_error_transform_internal.h"

namespace iceberg::arrow {

/// \brief Read the content of the file at the given location.
expected<std::string, Error> ArrowFileSystemFileIO::ReadFile(
const std::string& file_location, std::optional<size_t> length) {
::arrow::fs::FileInfo file_info(file_location);
if (length.has_value()) {
file_info.set_size(length.value());
}
std::string content;
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenInputFile(file_info));
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file_size, file->GetSize());

content.resize(file_size);
size_t remain = file_size;
size_t offset = 0;
while (remain > 0) {
size_t read_length = std::min(remain, static_cast<size_t>(1024 * 1024));
ICEBERG_ARROW_ASSIGN_OR_RETURN(
auto read_bytes,
file->Read(read_length, reinterpret_cast<uint8_t*>(&content[offset])));
remain -= read_bytes;
offset += read_bytes;
}

return content;
}

/// \brief Write the given content to the file at the given location.
expected<void, Error> ArrowFileSystemFileIO::WriteFile(const std::string& file_location,
std::string_view content) {
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(file_location));
ICEBERG_ARROW_RETURN_NOT_OK(file->Write(content.data(), content.size()));
ICEBERG_ARROW_RETURN_NOT_OK(file->Flush());
ICEBERG_ARROW_RETURN_NOT_OK(file->Close());
return {};
}

/// \brief Delete a file at the given location.
expected<void, Error> ArrowFileSystemFileIO::DeleteFile(
const std::string& file_location) {
ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(file_location));
return {};
}

} // namespace iceberg::arrow
54 changes: 54 additions & 0 deletions src/iceberg/arrow/arrow_fs_file_io.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <memory>

#include <arrow/filesystem/filesystem.h>

#include "iceberg/file_io.h"
#include "iceberg/iceberg_bundle_export.h"

namespace iceberg::arrow {

/// \brief A concrete implementation of FileIO for Arrow file system.
class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
public:
explicit ArrowFileSystemFileIO(std::shared_ptr<::arrow::fs::FileSystem> arrow_fs)
: arrow_fs_(std::move(arrow_fs)) {}

~ArrowFileSystemFileIO() override = default;

/// \brief Read the content of the file at the given location.
expected<std::string, Error> ReadFile(const std::string& file_location,
std::optional<size_t> length) override;

/// \brief Write the given content to the file at the given location.
expected<void, Error> WriteFile(const std::string& file_location,
std::string_view content) override;

/// \brief Delete a file at the given location.
expected<void, Error> DeleteFile(const std::string& file_location) override;

private:
std::shared_ptr<::arrow::fs::FileSystem> arrow_fs_;
};

} // namespace iceberg::arrow
1 change: 1 addition & 0 deletions src/iceberg/demo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "iceberg/avro.h" // include to export symbols
#include "iceberg/catalog.h"
#include "iceberg/file_io.h"
#include "iceberg/location_provider.h"
#include "iceberg/table.h"
#include "iceberg/transaction.h"
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ enum class ErrorKind {
kCommitStateUnknown,
kInvalidSchema,
kInvalidArgument,
kIOError,
kNotImplemented,
kUnknownError,
};

/// \brief Error with a kind and a message.
Expand Down
83 changes: 83 additions & 0 deletions src/iceberg/file_io.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <optional>
#include <string>
#include <string_view>

#include "iceberg/error.h"
#include "iceberg/expected.h"
#include "iceberg/iceberg_export.h"

namespace iceberg {

/// \brief Pluggable module for reading, writing, and deleting files.
///
/// This module only handle metadata files, not data files. The metadata files
/// are typically small and are used to store schema, partition information,
/// and other metadata about the table.
///
/// Note that these functions are not atomic. For example, if a write fails,
/// the file may be partially written. Implementations should be careful to
/// avoid corrupting metadata files.
class ICEBERG_EXPORT FileIO {
public:
FileIO() = default;
virtual ~FileIO() = default;

/// \brief Read the content of the file at the given location.
///
/// \param file_location The location of the file to read.
/// \param length The number of bytes to read. Some object storage need to specify
/// the length to read, e.g. S3 `GetObject` has a Range parameter.
/// \return The content of the file if the read succeeded, an error code if the read
/// failed.
virtual expected<std::string, Error> ReadFile(const std::string& file_location,
std::optional<size_t> length) {
// We provide a default implementation to avoid Windows linker error LNK2019.
return unexpected<Error>{
{.kind = ErrorKind::kNotImplemented, .message = "ReadFile not implemented"}};
}

/// \brief Write the given content to the file at the given location.
///
/// \param file_location The location of the file to write.
/// \param content The content to write to the file.
/// \param overwrite If true, overwrite the file if it exists. If false, fail if the
/// file exists.
/// \return void if the write succeeded, an error code if the write failed.
virtual expected<void, Error> WriteFile(const std::string& file_location,
std::string_view content) {
return unexpected<Error>{
{.kind = ErrorKind::kNotImplemented, .message = "WriteFile not implemented"}};
}

/// \brief Delete a file at the given location.
///
/// \param file_location The location of the file to delete.
/// \return void if the delete succeeded, an error code if the delete failed.
virtual expected<void, Error> DeleteFile(const std::string& file_location) {
return unexpected<Error>{
{.kind = ErrorKind::kNotImplemented, .message = "DeleteFile not implemented"}};
}
};

} // namespace iceberg
6 changes: 3 additions & 3 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ if(ICEBERG_BUILD_BUNDLE)
add_test(NAME avro_test COMMAND avro_test)

add_executable(arrow_test)
target_sources(arrow_test PRIVATE arrow_test.cc)
target_link_libraries(arrow_test PRIVATE iceberg_bundle_static Arrow::arrow_static
GTest::gtest_main GTest::gmock)
target_sources(arrow_test PRIVATE arrow_test.cc arrow_fs_file_io_test.cc)
target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main
GTest::gmock)
add_test(NAME arrow_test COMMAND arrow_test)
endif()
65 changes: 65 additions & 0 deletions test/arrow_fs_file_io_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/arrow/arrow_fs_file_io.h"

#include <filesystem>

#include <arrow/filesystem/localfs.h>
#include <gtest/gtest.h>

#include "matchers.h"

namespace iceberg {

class LocalFileIOTest : public testing::Test {
protected:
void SetUp() override {
local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);
}

std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
std::shared_ptr<iceberg::FileIO> file_io_;
std::filesystem::path tmpfile = std::filesystem::temp_directory_path() / "123.txt";
};

TEST_F(LocalFileIOTest, ReadWriteFile) {
auto read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt);
EXPECT_THAT(read_res, IsError(ErrorKind::kIOError));
EXPECT_THAT(read_res, HasErrorMessage("Failed to open local file"));

auto write_res = file_io_->WriteFile(tmpfile.string(), "hello world");
EXPECT_THAT(write_res, IsOk());

read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt);
EXPECT_THAT(read_res, IsOk());
EXPECT_THAT(read_res, HasValue(::testing::Eq("hello world")));
}

TEST_F(LocalFileIOTest, DeleteFile) {
auto del_res = file_io_->DeleteFile(tmpfile.string());
EXPECT_THAT(del_res, IsOk());

del_res = file_io_->DeleteFile(tmpfile.string());
EXPECT_THAT(del_res, IsError(ErrorKind::kIOError));
EXPECT_THAT(del_res, HasErrorMessage("Cannot delete file"));
}

} // namespace iceberg
Loading