Skip to content

Commit d54d079

Browse files
authored
feat: add file_io and local impl by adapting arrow::filesystem (#30)
This PR adds a file IO interface and an Arrow local filesystem implementation. FileIO is a pluggable interface for reading, writing, and deleting metadata files, not data files. --------- Signed-off-by: Junwang Zhao <[email protected]>
1 parent 2912352 commit d54d079

File tree

10 files changed

+343
-5
lines changed

10 files changed

+343
-5
lines changed

cmake_modules/IcebergThirdpartyToolchain.cmake

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ function(resolve_arrow_dependency)
7070
ON
7171
CACHE BOOL "" FORCE)
7272
set(ARROW_FILESYSTEM
73-
OFF
73+
ON
7474
CACHE BOOL "" FORCE)
7575
set(ARROW_SIMD_LEVEL
7676
"NONE"

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h
6060
DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg)
6161

6262
if(ICEBERG_BUILD_BUNDLE)
63-
set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc avro/demo_avro.cc)
63+
set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/arrow_fs_file_io.cc
64+
avro/demo_avro.cc)
6465

6566
# Libraries to link with exported libiceberg_bundle.{so,a}.
6667
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS)
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <arrow/result.h>
23+
#include <arrow/status.h>
24+
25+
#include "iceberg/error.h"
26+
#include "iceberg/expected.h"
27+
28+
namespace iceberg::arrow {
29+
30+
/// \brief Translate an Arrow status into an iceberg ErrorKind.
///
/// Only `::arrow::StatusCode::IOError` has a dedicated mapping
/// (`ErrorKind::kIOError`); every other status code is reported as
/// `ErrorKind::kUnknownError`.
///
/// \param status The Arrow status to classify.
/// \return The ErrorKind corresponding to the status code.
inline ErrorKind ToErrorKind(const ::arrow::Status& status) {
  const bool is_io_error = status.code() == ::arrow::StatusCode::IOError;
  return is_io_error ? ErrorKind::kIOError : ErrorKind::kUnknownError;
}
38+
39+
/// \brief Implementation helper: evaluate `rexpr` once into `result_name`; if it is
/// not ok, return an `unexpected<Error>` from the enclosing function whose kind is
/// `error_transform(status)` and whose message is the status string; otherwise
/// move the contained value into `lhs`.
///
/// Note: the expansion is several statements (it is not wrapped in a do/while), so
/// it must be used inside a braced scope, and `lhs` may itself be a declaration.
#define ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL(result_name, lhs, rexpr, error_transform) \
40+
auto&& result_name = (rexpr); \
41+
if (!result_name.ok()) { \
42+
return unexpected<Error>{{.kind = error_transform(result_name.status()), \
43+
.message = result_name.status().ToString()}}; \
44+
} \
45+
lhs = std::move(result_name).ValueOrDie();
46+
47+
/// \brief Assign the value produced by `rexpr` to `lhs`, or return an
/// `unexpected<Error>` (classified with ToErrorKind) from the enclosing function.
///
/// The temporary holding the result gets its name from
/// ARROW_ASSIGN_OR_RAISE_NAME combined with __COUNTER__ -- presumably so that each
/// use in the same scope gets a distinct identifier; confirm against Arrow's macro.
#define ICEBERG_ARROW_ASSIGN_OR_RETURN(lhs, rexpr) \
48+
ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL( \
49+
ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, ToErrorKind)
50+
51+
/// \brief Evaluate `expr` once; if the resulting status is not ok, return an
/// `unexpected<Error>` from the enclosing function, with the kind chosen by
/// ToErrorKind and the message taken from the status string.
///
/// Wrapped in do { ... } while (0) so the expansion acts as a single statement.
#define ICEBERG_ARROW_RETURN_NOT_OK(expr) \
52+
do { \
53+
auto&& _status = (expr); \
54+
if (!_status.ok()) { \
55+
return unexpected<Error>{ \
56+
{.kind = ToErrorKind(_status), .message = _status.ToString()}}; \
57+
} \
58+
} while (0)
59+
60+
} // namespace iceberg::arrow
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/arrow/arrow_fs_file_io.h"
21+
22+
#include <arrow/filesystem/localfs.h>
23+
24+
#include "iceberg/arrow/arrow_error_transform_internal.h"
25+
26+
namespace iceberg::arrow {
27+
28+
/// \brief Read the content of the file at the given location.
29+
expected<std::string, Error> ArrowFileSystemFileIO::ReadFile(
30+
const std::string& file_location, std::optional<size_t> length) {
31+
::arrow::fs::FileInfo file_info(file_location);
32+
if (length.has_value()) {
33+
file_info.set_size(length.value());
34+
}
35+
std::string content;
36+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenInputFile(file_info));
37+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file_size, file->GetSize());
38+
39+
content.resize(file_size);
40+
size_t remain = file_size;
41+
size_t offset = 0;
42+
while (remain > 0) {
43+
size_t read_length = std::min(remain, static_cast<size_t>(1024 * 1024));
44+
ICEBERG_ARROW_ASSIGN_OR_RETURN(
45+
auto read_bytes,
46+
file->Read(read_length, reinterpret_cast<uint8_t*>(&content[offset])));
47+
remain -= read_bytes;
48+
offset += read_bytes;
49+
}
50+
51+
return content;
52+
}
53+
54+
/// \brief Write the given content to the file at the given location.
55+
expected<void, Error> ArrowFileSystemFileIO::WriteFile(const std::string& file_location,
56+
std::string_view content) {
57+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(file_location));
58+
ICEBERG_ARROW_RETURN_NOT_OK(file->Write(content.data(), content.size()));
59+
ICEBERG_ARROW_RETURN_NOT_OK(file->Flush());
60+
ICEBERG_ARROW_RETURN_NOT_OK(file->Close());
61+
return {};
62+
}
63+
64+
/// \brief Delete a file at the given location.
65+
expected<void, Error> ArrowFileSystemFileIO::DeleteFile(
66+
const std::string& file_location) {
67+
ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(file_location));
68+
return {};
69+
}
70+
71+
} // namespace iceberg::arrow
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <memory>
23+
24+
#include <arrow/filesystem/filesystem.h>
25+
26+
#include "iceberg/file_io.h"
27+
#include "iceberg/iceberg_bundle_export.h"
28+
29+
namespace iceberg::arrow {
30+
31+
/// \brief A concrete implementation of FileIO for Arrow file system.
32+
class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
33+
public:
34+
explicit ArrowFileSystemFileIO(std::shared_ptr<::arrow::fs::FileSystem> arrow_fs)
35+
: arrow_fs_(std::move(arrow_fs)) {}
36+
37+
~ArrowFileSystemFileIO() override = default;
38+
39+
/// \brief Read the content of the file at the given location.
40+
expected<std::string, Error> ReadFile(const std::string& file_location,
41+
std::optional<size_t> length) override;
42+
43+
/// \brief Write the given content to the file at the given location.
44+
expected<void, Error> WriteFile(const std::string& file_location,
45+
std::string_view content) override;
46+
47+
/// \brief Delete a file at the given location.
48+
expected<void, Error> DeleteFile(const std::string& file_location) override;
49+
50+
private:
51+
std::shared_ptr<::arrow::fs::FileSystem> arrow_fs_;
52+
};
53+
54+
} // namespace iceberg::arrow

src/iceberg/demo.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include "iceberg/avro.h" // include to export symbols
2323
#include "iceberg/catalog.h"
24+
#include "iceberg/file_io.h"
2425
#include "iceberg/location_provider.h"
2526
#include "iceberg/table.h"
2627
#include "iceberg/transaction.h"

src/iceberg/error.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ enum class ErrorKind {
3434
kCommitStateUnknown,
3535
kInvalidSchema,
3636
kInvalidArgument,
37+
kIOError,
38+
kNotImplemented,
39+
kUnknownError,
3740
kNotSupported,
3841
};
3942

src/iceberg/file_io.h

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <optional>
23+
#include <string>
24+
#include <string_view>
25+
26+
#include "iceberg/error.h"
27+
#include "iceberg/expected.h"
28+
#include "iceberg/iceberg_export.h"
29+
30+
namespace iceberg {
31+
32+
/// \brief Pluggable module for reading, writing, and deleting files.
33+
///
34+
/// This module only handle metadata files, not data files. The metadata files
35+
/// are typically small and are used to store schema, partition information,
36+
/// and other metadata about the table.
37+
///
38+
/// Note that these functions are not atomic. For example, if a write fails,
39+
/// the file may be partially written. Implementations should be careful to
40+
/// avoid corrupting metadata files.
41+
class ICEBERG_EXPORT FileIO {
42+
public:
43+
FileIO() = default;
44+
virtual ~FileIO() = default;
45+
46+
/// \brief Read the content of the file at the given location.
47+
///
48+
/// \param file_location The location of the file to read.
49+
/// \param length The number of bytes to read. Some object storage need to specify
50+
/// the length to read, e.g. S3 `GetObject` has a Range parameter.
51+
/// \return The content of the file if the read succeeded, an error code if the read
52+
/// failed.
53+
virtual expected<std::string, Error> ReadFile(const std::string& file_location,
54+
std::optional<size_t> length) {
55+
// We provide a default implementation to avoid Windows linker error LNK2019.
56+
return unexpected<Error>{
57+
{.kind = ErrorKind::kNotImplemented, .message = "ReadFile not implemented"}};
58+
}
59+
60+
/// \brief Write the given content to the file at the given location.
61+
///
62+
/// \param file_location The location of the file to write.
63+
/// \param content The content to write to the file.
64+
/// \param overwrite If true, overwrite the file if it exists. If false, fail if the
65+
/// file exists.
66+
/// \return void if the write succeeded, an error code if the write failed.
67+
virtual expected<void, Error> WriteFile(const std::string& file_location,
68+
std::string_view content) {
69+
return unexpected<Error>{
70+
{.kind = ErrorKind::kNotImplemented, .message = "WriteFile not implemented"}};
71+
}
72+
73+
/// \brief Delete a file at the given location.
74+
///
75+
/// \param file_location The location of the file to delete.
76+
/// \return void if the delete succeeded, an error code if the delete failed.
77+
virtual expected<void, Error> DeleteFile(const std::string& file_location) {
78+
return unexpected<Error>{
79+
{.kind = ErrorKind::kNotImplemented, .message = "DeleteFile not implemented"}};
80+
}
81+
};
82+
83+
} // namespace iceberg

test/CMakeLists.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ if(ICEBERG_BUILD_BUNDLE)
4747
add_test(NAME avro_test COMMAND avro_test)
4848

4949
add_executable(arrow_test)
50-
target_sources(arrow_test PRIVATE arrow_test.cc)
51-
target_link_libraries(arrow_test PRIVATE iceberg_bundle_static Arrow::arrow_static
52-
GTest::gtest_main GTest::gmock)
50+
target_sources(arrow_test PRIVATE arrow_test.cc arrow_fs_file_io_test.cc)
51+
target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main
52+
GTest::gmock)
5353
add_test(NAME arrow_test COMMAND arrow_test)
5454
endif()

test/arrow_fs_file_io_test.cc

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/arrow/arrow_fs_file_io.h"
21+
22+
#include <filesystem>
23+
24+
#include <arrow/filesystem/localfs.h>
25+
#include <gtest/gtest.h>
26+
27+
#include "matchers.h"
28+
29+
namespace iceberg {
30+
31+
class LocalFileIOTest : public testing::Test {
32+
protected:
33+
void SetUp() override {
34+
local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
35+
file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);
36+
}
37+
38+
std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
39+
std::shared_ptr<iceberg::FileIO> file_io_;
40+
std::filesystem::path tmpfile = std::filesystem::temp_directory_path() / "123.txt";
41+
};
42+
43+
/// Reading a missing file must fail with kIOError; after writing, the same
/// content must be read back.
TEST_F(LocalFileIOTest, ReadWriteFile) {
  // Start from a clean slate: a file left behind by an earlier run would make the
  // "missing file" expectation below fail.
  std::filesystem::remove(tmpfile);

  auto read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt);
  EXPECT_THAT(read_res, IsError(ErrorKind::kIOError));
  EXPECT_THAT(read_res, HasErrorMessage("Failed to open local file"));

  auto write_res = file_io_->WriteFile(tmpfile.string(), "hello world");
  EXPECT_THAT(write_res, IsOk());

  read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt);
  EXPECT_THAT(read_res, IsOk());
  EXPECT_THAT(read_res, HasValue(::testing::Eq("hello world")));
}
55+
56+
/// Deleting an existing file succeeds; deleting it a second time fails with
/// kIOError.
TEST_F(LocalFileIOTest, DeleteFile) {
  // Create the file here so the test does not depend on another test having run
  // first and left it behind.
  auto write_res = file_io_->WriteFile(tmpfile.string(), "hello world");
  EXPECT_THAT(write_res, IsOk());

  auto del_res = file_io_->DeleteFile(tmpfile.string());
  EXPECT_THAT(del_res, IsOk());

  del_res = file_io_->DeleteFile(tmpfile.string());
  EXPECT_THAT(del_res, IsError(ErrorKind::kIOError));
  EXPECT_THAT(del_res, HasErrorMessage("Cannot delete file"));
}
64+
65+
} // namespace iceberg

0 commit comments

Comments
 (0)