Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake_modules/IcebergThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ function(resolve_arrow_dependency)
ON
CACHE BOOL "" FORCE)
set(ARROW_FILESYSTEM
OFF
ON
CACHE BOOL "" FORCE)
set(ARROW_SIMD_LEVEL
"NONE"
Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h
DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg)

if(ICEBERG_BUILD_BUNDLE)
set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc avro/demo_avro.cc)
set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/io/arrow_fs_file_io.cc
avro/demo_avro.cc)

# Libraries to link with exported libiceberg_bundle.{so,a}.
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS)
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@
# specific language governing permissions and limitations
# under the License.

add_subdirectory(io)

iceberg_install_all_headers(iceberg/arrow)
61 changes: 61 additions & 0 deletions src/iceberg/arrow/arrow_error_transform_internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <arrow/result.h>
#include <arrow/status.h>

#include "iceberg/error.h"
#include "iceberg/expected.h"

namespace iceberg::arrow::internal {

/// \brief Translate an Arrow status into the corresponding Iceberg error kind.
///
/// Only `::arrow::StatusCode::IOError` has a dedicated mapping; every other
/// status code is reported as `ErrorKind::kUnknownError`.
///
/// \param status The Arrow status to classify.
/// \return `ErrorKind::kIOError` for I/O failures, `ErrorKind::kUnknownError` otherwise.
inline ErrorKind ToErrorKind(const ::arrow::Status& status) {
  const bool is_io_error = status.code() == ::arrow::StatusCode::IOError;
  return is_io_error ? ErrorKind::kIOError : ErrorKind::kUnknownError;
}

// Implementation helper for ICEBERG_INTERNAL_ASSIGN_OR_RETURN.
//
// Evaluates `rexpr` once and binds it to a local named `result_name`. When
// `result_name.ok()` is false, the enclosing function returns an
// `unexpected<Error>` whose kind is `error_transform(result_name.status())` and
// whose message is the status rendered with `ToString()`. Otherwise the held
// value is moved out and assigned to `lhs` (which may itself be a declaration
// such as `auto file`).
//
// NOTE: this expands to several statements and is not wrapped in
// `do { ... } while (0)`, so it must not be used as the unbraced body of an
// `if`/`for`/`while`.
#define ICEBERG_INTERNAL_ASSIGN_OR_RETURN_IMPL(result_name, lhs, rexpr, error_transform) \
auto&& result_name = (rexpr); \
if (!result_name.ok()) { \
return unexpected<Error>{{.kind = error_transform(result_name.status()), \
.message = result_name.status().ToString()}}; \
} \
lhs = std::move(result_name).ValueOrDie();

// Assigns the value produced by `rexpr` to `lhs`, or makes the enclosing function
// return an `unexpected<Error>` when `rexpr` is not ok. The error kind is derived
// with `internal::ToErrorKind`.
//
// The name of the temporary is built from `_error_or_value` and `__COUNTER__` via
// Arrow's ARROW_ASSIGN_OR_RAISE_NAME, presumably so that each expansion gets a
// distinct identifier — confirm against Arrow's definition of that macro.
//
// NOTE(review): `internal::ToErrorKind` is not fully qualified, so the call site
// must be in a scope where `internal` resolves to `iceberg::arrow::internal`.
#define ICEBERG_INTERNAL_ASSIGN_OR_RETURN(lhs, rexpr) \
ICEBERG_INTERNAL_ASSIGN_OR_RETURN_IMPL( \
ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, \
internal::ToErrorKind)

// Evaluates `expr` once and, when the resulting status is not ok, makes the
// enclosing function return an `unexpected<Error>` whose kind comes from
// `internal::ToErrorKind` and whose message is the status rendered with
// `ToString()`. When the status is ok, execution simply continues.
//
// The body is wrapped in `do { ... } while (0)` so the macro behaves like a
// single statement. The local is always named `_status`, so `expr` must not
// refer to an outer variable of that name.
//
// NOTE(review): `internal::ToErrorKind` is not fully qualified, so the call site
// must be in a scope where `internal` resolves to `iceberg::arrow::internal`.
#define ICEBERG_INTERNAL_RETURN_NOT_OK(expr) \
do { \
auto&& _status = (expr); \
if (!_status.ok()) { \
return unexpected<Error>{ \
{.kind = internal::ToErrorKind(_status), .message = _status.ToString()}}; \
} \
} while (0)

} // namespace iceberg::arrow::internal
18 changes: 18 additions & 0 deletions src/iceberg/arrow/io/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

iceberg_install_all_headers(iceberg/arrow/io)
72 changes: 72 additions & 0 deletions src/iceberg/arrow/io/arrow_fs_file_io.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/arrow/io/arrow_fs_file_io.h"

#include <filesystem>

#include <arrow/filesystem/localfs.h>

#include "iceberg/arrow/arrow_error_transform_internal.h"

namespace iceberg::arrow::io {

/// \brief Read the whole content of the file at the given location.
///
/// \param file_location Location of the file, resolved by the wrapped Arrow file system.
/// \param length Must be empty; reading a caller-specified number of bytes is not
/// supported.
/// \return The bytes of the file on success. `ErrorKind::kInvalidArgument` if `length`
/// is set, or the translated Arrow error if opening, sizing or reading the file fails.
expected<std::string, Error> ArrowFileSystemFileIO::ReadFile(
    const std::string& file_location, std::optional<size_t> length) {
  // We don't support reading a file with a specific length.
  if (length.has_value()) {
    return unexpected(Error(ErrorKind::kInvalidArgument, "Length is not supported"));
  }
  ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenInputFile(file_location));
  ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file_size, file->GetSize());

  std::string content;
  content.resize(static_cast<size_t>(file_size));

  // ReadAt returns the number of bytes it actually delivered, which can be less
  // than requested (e.g. when end-of-file is reached early). Keep reading until
  // the buffer is full or no more bytes arrive instead of assuming a full read.
  int64_t offset = 0;
  while (offset < file_size) {
    ICEBERG_INTERNAL_ASSIGN_OR_RETURN(
        auto bytes_read,
        file->ReadAt(offset, file_size - offset,
                     reinterpret_cast<uint8_t*>(content.data()) + offset));
    if (bytes_read <= 0) {
      break;
    }
    offset += bytes_read;
  }

  // Drop any unfilled tail so the result never carries zero padding.
  content.resize(static_cast<size_t>(offset));
  return content;
}

/// \brief Write the given content to the file at the given location.
///
/// \param file_location Location of the file, resolved by the wrapped Arrow file system.
/// \param content The bytes to write.
/// \param overwrite If false, the write is refused when something already exists at
/// `file_location`.
/// \return void on success. `ErrorKind::kAlreadyExists` if `overwrite` is false and the
/// location is already occupied, or the translated Arrow error if probing, opening,
/// writing, flushing or closing fails.
expected<void, Error> ArrowFileSystemFileIO::WriteFile(const std::string& file_location,
                                                       std::string_view content,
                                                       bool overwrite) {
  // Only probe the file system when the caller asked not to overwrite. Use the
  // metadata query rather than opening the file for reading, so that an
  // unreadable-but-existing file is not mistaken for a missing one.
  if (!overwrite) {
    ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file_info,
                                      arrow_fs_->GetFileInfo(file_location));
    if (file_info.type() != ::arrow::fs::FileType::NotFound) {
      return unexpected(Error(ErrorKind::kAlreadyExists, "File already exists"));
    }
  }

  ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file,
                                    arrow_fs_->OpenOutputStream(file_location));
  ICEBERG_INTERNAL_RETURN_NOT_OK(file->Write(content.data(), content.size()));
  ICEBERG_INTERNAL_RETURN_NOT_OK(file->Flush());
  ICEBERG_INTERNAL_RETURN_NOT_OK(file->Close());
  return {};
}

/// \brief Delete the file at the given location.
///
/// \param file_location Location of the file, resolved by the wrapped Arrow file system.
/// \return void on success, or the translated Arrow error if the deletion fails.
expected<void, Error> ArrowFileSystemFileIO::DeleteFile(
    const std::string& file_location) {
  const ::arrow::Status delete_status = arrow_fs_->DeleteFile(file_location);
  ICEBERG_INTERNAL_RETURN_NOT_OK(delete_status);
  return {};
}

} // namespace iceberg::arrow::io
54 changes: 54 additions & 0 deletions src/iceberg/arrow/io/arrow_fs_file_io.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <memory>

#include <arrow/filesystem/filesystem.h>

#include "iceberg/file_io.h"
#include "iceberg/iceberg_bundle_export.h"

namespace iceberg::arrow::io {

/// \brief A concrete implementation of FileIO backed by an Arrow file system.
///
/// Every operation is forwarded to the `::arrow::fs::FileSystem` passed to the
/// constructor; Arrow failures are reported to the caller as `Error` values.
class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
public:
/// \brief Wrap the given Arrow file system.
///
/// \param arrow_fs The file system used for all reads, writes and deletes. The
/// shared pointer is stored in this object.
explicit ArrowFileSystemFileIO(std::shared_ptr<::arrow::fs::FileSystem> arrow_fs)
: arrow_fs_(std::move(arrow_fs)) {}

~ArrowFileSystemFileIO() override = default;

/// \brief Read the content of the file at the given location.
///
/// \param file_location The location of the file to read.
/// \param length Must be empty; this implementation returns
/// `ErrorKind::kInvalidArgument` when a length is supplied.
/// \return The content of the file if the read succeeded, an error otherwise.
expected<std::string, Error> ReadFile(const std::string& file_location,
std::optional<size_t> length) override;

/// \brief Write the given content to the file at the given location.
///
/// \param file_location The location of the file to write.
/// \param content The content to write to the file.
/// \param overwrite If false and the file already exists, the write fails with
/// `ErrorKind::kAlreadyExists`.
/// \return void if the write succeeded, an error otherwise.
expected<void, Error> WriteFile(const std::string& file_location,
std::string_view content, bool overwrite) override;

/// \brief Delete a file at the given location.
///
/// \param file_location The location of the file to delete.
/// \return void if the delete succeeded, an error otherwise.
expected<void, Error> DeleteFile(const std::string& file_location) override;

private:
/// The Arrow file system that every operation is delegated to.
std::shared_ptr<::arrow::fs::FileSystem> arrow_fs_;
};

} // namespace iceberg::arrow::io
1 change: 1 addition & 0 deletions src/iceberg/demo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "iceberg/avro.h" // include to export symbols
#include "iceberg/catalog.h"
#include "iceberg/file_io.h"
#include "iceberg/location_provider.h"
#include "iceberg/table.h"
#include "iceberg/transaction.h"
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ enum class ErrorKind {
kCommitStateUnknown,
kInvalidSchema,
kInvalidArgument,
kIOError,
kNotImplemented,
kUnknownError,
};

/// \brief Error with a kind and a message.
Expand Down
81 changes: 81 additions & 0 deletions src/iceberg/file_io.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <optional>
#include <string>
#include <string_view>

#include "iceberg/error.h"
#include "iceberg/expected.h"
#include "iceberg/iceberg_export.h"

namespace iceberg {

/// \brief Pluggable module for reading, writing, and deleting files.
///
/// This module only handles metadata files, not data files. The metadata files
/// are typically small and are used to store schema, partition information,
/// and other metadata about the table.
///
/// `ReadFile` and `WriteFile` have default implementations that return
/// `ErrorKind::kNotImplemented`; `DeleteFile` is pure virtual and must be
/// provided by every concrete subclass.
class ICEBERG_EXPORT FileIO {
public:
FileIO() = default;
virtual ~FileIO() = default;

/// \brief Read the content of the file at the given location.
///
/// \param file_location The location of the file to read.
/// \param length The number of bytes to read. Some object stores need the length
/// to be specified, e.g. S3 `GetObject` has a Range parameter.
/// \return The content of the file if the read succeeded, an error if the read
/// failed. The default implementation always returns `ErrorKind::kNotImplemented`.
virtual expected<std::string, Error> ReadFile(const std::string& file_location,
std::optional<size_t> length) {
// This default body exists to avoid Windows linker error LNK2019.
// If this function were declared pure virtual, `unexpected<Error>` would
// not be instantiated and exported in libiceberg.
return unexpected<Error>{
{.kind = ErrorKind::kNotImplemented, .message = "ReadFile not implemented"}};
}

/// \brief Write the given content to the file at the given location.
///
/// \param file_location The location of the file to write.
/// \param content The content to write to the file.
/// \param overwrite If true, overwrite the file if it exists. If false, fail if the
/// file exists.
/// \return void if the write succeeded, an error if the write failed. The default
/// implementation always returns `ErrorKind::kNotImplemented`.
virtual expected<void, Error> WriteFile(const std::string& file_location,
std::string_view content, bool overwrite) {
// This default body exists to avoid Windows linker error LNK2019.
// If this function were declared pure virtual, `unexpected<Error>` would
// not be instantiated and exported in libiceberg.
return unexpected<Error>{
{.kind = ErrorKind::kNotImplemented, .message = "WriteFile not implemented"}};
}

/// \brief Delete a file at the given location.
///
/// Pure virtual: concrete subclasses must implement it.
///
/// \param file_location The location of the file to delete.
/// \return void if the delete succeeded, an error if the delete failed.
virtual expected<void, Error> DeleteFile(const std::string& file_location) = 0;
};

} // namespace iceberg
10 changes: 8 additions & 2 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ if(ICEBERG_BUILD_BUNDLE)

add_executable(arrow_test)
target_sources(arrow_test PRIVATE arrow_test.cc)
target_link_libraries(arrow_test PRIVATE iceberg_bundle_static Arrow::arrow_static
GTest::gtest_main GTest::gmock)
target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main
GTest::gmock)
add_test(NAME arrow_test COMMAND arrow_test)

add_executable(arrow_fs_file_io_test)
target_sources(arrow_fs_file_io_test PRIVATE arrow_fs_file_io_test.cc)
target_link_libraries(arrow_fs_file_io_test PRIVATE iceberg_bundle_static
GTest::gtest_main)
add_test(NAME arrow_fs_file_io_test COMMAND arrow_fs_file_io_test)
endif()
Loading
Loading