Skip to content

Commit 1f8ed57

Browse files
committed
FileIO only deal with metadata files
1 parent 2e6d8d9 commit 1f8ed57

21 files changed

+301
-855
lines changed

cmake_modules/IcebergThirdpartyToolchain.cmake

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ function(resolve_arrow_dependency)
7070
ON
7171
CACHE BOOL "" FORCE)
7272
set(ARROW_FILESYSTEM
73-
OFF
73+
ON
7474
CACHE BOOL "" FORCE)
7575
set(ARROW_SIMD_LEVEL
7676
"NONE"

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h
5757
DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg)
5858

5959
if(ICEBERG_BUILD_BUNDLE)
60-
set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc avro/demo_avro.cc)
60+
set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/io/local_file_io.cc
61+
avro/demo_avro.cc)
6162

6263
# Libraries to link with exported libiceberg_bundle.{so,a}.
6364
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS)

src/iceberg/arrow/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,6 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18+
add_subdirectory(io)
19+
1820
iceberg_install_all_headers(iceberg/arrow)
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <arrow/result.h>
23+
#include <arrow/status.h>
24+
25+
#include "iceberg/error.h"
26+
#include "iceberg/expected.h"
27+
28+
namespace iceberg::arrow::internal {
29+
30+
inline ErrorKind ToErrorKind(const ::arrow::Status& status) {
31+
switch (status.code()) {
32+
case ::arrow::StatusCode::IOError:
33+
return ErrorKind::kIOError;
34+
default:
35+
return ErrorKind::kUnknownError;
36+
}
37+
}
38+
39+
#define ICEBERG_INTERNAL_ASSIGN_OR_RETURN_IMPL(result_name, lhs, rexpr, error_transform) \
40+
auto&& result_name = (rexpr); \
41+
if (!result_name.ok()) { \
42+
return unexpected( \
43+
Error(error_transform(result_name.status()), result_name.status().ToString())); \
44+
} \
45+
lhs = std::move(result_name).ValueOrDie();
46+
47+
#define ICEBERG_INTERNAL_ASSIGN_OR_RETURN(lhs, rexpr) \
48+
ICEBERG_INTERNAL_ASSIGN_OR_RETURN_IMPL( \
49+
ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, \
50+
internal::ToErrorKind)
51+
52+
#define ICEBERG_INTERNAL_RETURN_NOT_OK(expr) \
53+
do { \
54+
auto&& _status = (expr); \
55+
if (!_status.ok()) { \
56+
return unexpected(Error(internal::ToErrorKind(_status), _status.ToString())); \
57+
} \
58+
} while (0)
59+
60+
} // namespace iceberg::arrow::internal

test/io/CMakeLists.txt renamed to src/iceberg/arrow/io/CMakeLists.txt

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,4 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
if(ICEBERG_IO)
19-
add_executable(fs_file_io_test)
20-
target_sources(fs_file_io_test PRIVATE fs_file_io_test.cc)
21-
target_link_libraries(fs_file_io_test PRIVATE iceberg_io_static GTest::gtest_main)
22-
target_include_directories(fs_file_io_test PRIVATE "${ICEBERG_INCLUDES}")
23-
add_test(NAME fs_file_io_test COMMAND fs_file_io_test)
24-
endif()
18+
iceberg_install_all_headers(iceberg/arrow/io)
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/arrow/io/local_file_io.h"
21+
22+
#include <filesystem>
23+
24+
#include <arrow/filesystem/localfs.h>
25+
26+
#include "iceberg/arrow/arrow_error_transform_internal.h"
27+
28+
namespace iceberg::arrow::io {
29+
30+
/// \brief Read the content of the file at the given location.
31+
expected<std::string, Error> LocalFileIO::ReadFile(const std::string& file_location,
32+
std::optional<size_t> length) {
33+
// We don't support reading a file with a specific length.
34+
if (length.has_value()) {
35+
return unexpected(Error(ErrorKind::kInvalidArgument, "Length is not supported"));
36+
}
37+
std::string content;
38+
ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file, local_fs_->OpenInputFile(file_location));
39+
ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file_size, file->GetSize());
40+
41+
content.resize(file_size);
42+
ICEBERG_INTERNAL_ASSIGN_OR_RETURN(
43+
auto read_length,
44+
file->ReadAt(0, file_size, reinterpret_cast<uint8_t*>(&content[0])));
45+
46+
return content;
47+
}
48+
49+
/// \brief Write the given content to the file at the given location.
50+
expected<void, Error> LocalFileIO::WriteFile(const std::string& file_location,
51+
std::string_view content, bool overwrite) {
52+
if (!overwrite && FileExists(file_location)) {
53+
return unexpected(Error(ErrorKind::kAlreadyExists, ""));
54+
}
55+
ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file,
56+
local_fs_->OpenOutputStream(file_location));
57+
ICEBERG_INTERNAL_RETURN_NOT_OK(file->Write(content.data(), content.size()));
58+
return {};
59+
}
60+
61+
/// \brief Delete a file at the given location.
62+
expected<void, Error> LocalFileIO::DeleteFile(const std::string& file_location) {
63+
if (!FileExists(file_location)) {
64+
return unexpected(Error(ErrorKind::kNoSuchFile,
65+
std::format("File {} does not exist", file_location)));
66+
}
67+
ICEBERG_INTERNAL_RETURN_NOT_OK(local_fs_->DeleteFile(file_location));
68+
return {};
69+
}
70+
71+
bool LocalFileIO::FileExists(const std::string& location) {
72+
// ::arrow::fs::LocalFileSystem does not have a exists method.
73+
return std::filesystem::exists(location);
74+
}
75+
76+
} // namespace iceberg::arrow::io
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <memory>
23+
24+
#include <arrow/filesystem/localfs.h>
25+
26+
#include "iceberg/file_io.h"
27+
#include "iceberg/iceberg_bundle_export.h"
28+
29+
namespace iceberg::arrow::io {
30+
31+
/// \brief A concrete implementation of FileIO for file system.
32+
class ICEBERG_BUNDLE_EXPORT LocalFileIO : public FileIO {
33+
public:
34+
explicit LocalFileIO(std::shared_ptr<::arrow::fs::LocalFileSystem>& local_fs)
35+
: local_fs_(local_fs) {}
36+
37+
~LocalFileIO() override = default;
38+
39+
/// \brief Read the content of the file at the given location.
40+
expected<std::string, Error> ReadFile(const std::string& file_location,
41+
std::optional<size_t> length) override;
42+
43+
/// \brief Write the given content to the file at the given location.
44+
expected<void, Error> WriteFile(const std::string& file_location,
45+
std::string_view content, bool overwrite) override;
46+
47+
/// \brief Delete a file at the given location.
48+
expected<void, Error> DeleteFile(const std::string& file_location) override;
49+
50+
private:
51+
/// \brief Check if a file exists
52+
bool FileExists(const std::string& location);
53+
54+
std::shared_ptr<::arrow::fs::LocalFileSystem>& local_fs_;
55+
};
56+
57+
} // namespace iceberg::arrow::io

src/iceberg/error.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,22 @@ namespace iceberg {
3030
enum class ErrorKind {
3131
kNoSuchNamespace,
3232
kAlreadyExists,
33+
kNoSuchFile,
3334
kNoSuchTable,
3435
kCommitStateUnknown,
3536
kInvalidSchema,
3637
kInvalidArgument,
38+
kIOError,
39+
kUnknownError,
3740
};
3841

3942
/// \brief Error with a kind and a message.
4043
struct ICEBERG_EXPORT [[nodiscard]] Error {
4144
ErrorKind kind;
4245
std::string message;
46+
47+
explicit Error(ErrorKind kind, std::string message)
48+
: kind(kind), message(std::move(message)) {};
4349
};
4450

4551
} // namespace iceberg

src/iceberg/file_io.h

Lines changed: 31 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -19,92 +19,51 @@
1919

2020
#pragma once
2121

22-
#include <memory>
22+
#include <optional>
2323
#include <string>
24+
#include <string_view>
2425

26+
#include "iceberg/error.h"
27+
#include "iceberg/expected.h"
2528
#include "iceberg/iceberg_export.h"
26-
#include "iceberg/reader.h"
27-
#include "iceberg/writer.h"
2829

2930
namespace iceberg {
3031

31-
/// \brief An interface used to read input files using Reader and AsyncReader
32-
class ICEBERG_EXPORT InputFile {
33-
public:
34-
explicit InputFile(std::string location) : location_(std::move(location)) {}
35-
36-
virtual ~InputFile() = default;
37-
38-
/// \brief Checks whether the file exists.
39-
virtual bool exists() const = 0;
40-
/// \brief Returns the total length of the file, in bytes.
41-
virtual int64_t getLength() const = 0;
42-
43-
/// \brief Get a Reader instance to read bytes from the file.
44-
virtual std::unique_ptr<Reader> newReader() = 0;
45-
46-
/// \brief Get the file location
47-
const std::string& location() const { return location_; }
48-
49-
protected:
50-
std::string location_;
51-
};
52-
53-
/// \brief An interface used to write output files using Writer and AsyncWriter
54-
class ICEBERG_EXPORT OutputFile {
55-
public:
56-
explicit OutputFile(std::string location) : location_(std::move(location)) {}
57-
58-
virtual ~OutputFile() = default;
59-
60-
/// \brief Create the file.
61-
///
62-
/// If the file exists, or an error will be thrown.
63-
virtual void create() = 0;
64-
65-
/// \brief Get a Writer instance to write bytes to the file.
66-
virtual std::unique_ptr<Writer> newWriter() = 0;
67-
68-
/// \brief Get the file location
69-
const std::string& location() const { return location_; }
70-
71-
/// \brief Return an InputFile for the location of this OutputFile.
72-
virtual std::shared_ptr<InputFile> toInputFile() const = 0;
73-
74-
protected:
75-
std::string location_;
76-
};
77-
78-
/// \brief Pluggable module for reading, writing, and deleting files.
32+
/// \brief Pluggable module for reading, writing, and deleting metadata files.
7933
///
80-
/// Both table metadata files and data files can be written and read by this module.
34+
/// This module only handle metadata files, not data files. The metadata files
35+
/// are typically small and are used to store schema, partition information,
36+
/// and other metadata about the table.
8137
class ICEBERG_EXPORT FileIO {
8238
public:
83-
explicit FileIO(std::string name) : name_(std::move(name)) {}
84-
39+
FileIO() = default;
8540
virtual ~FileIO() = default;
8641

87-
/// \brief Get an InputFile
42+
/// \brief Read the content of the file at the given location.
8843
///
89-
/// Get a InputFile instance to read bytes from the file at the given location.
90-
virtual std::shared_ptr<InputFile> newInputFile(const std::string& location) = 0;
91-
92-
/// \brief Get an OutputFile
44+
/// \param file_location The location of the file to read.
45+
/// \param length The number of bytes to read. Some object storage need to specify
46+
/// the length to read, e.g. S3 `GetObject` has a Range parameter.
47+
/// \return The content of the file if the read succeeded, an error code if the read
48+
/// failed.
49+
virtual expected<std::string, Error> ReadFile(const std::string& file_location,
50+
std::optional<size_t> length) = 0;
51+
52+
/// \brief Write the given content to the file at the given location.
9353
///
94-
/// Get a OutputFile instance to write bytes to the file at the given location.
95-
virtual std::shared_ptr<OutputFile> newOutputFile(const std::string& location) = 0;
96-
97-
/// \brief Delete file
54+
/// \param file_location The location of the file to write.
55+
/// \param content The content to write to the file.
56+
/// \param overwrite If true, overwrite the file if it exists. If false, fail if the
57+
/// file exists.
58+
/// \return void if the write succeeded, an error code if the write failed.
59+
virtual expected<void, Error> WriteFile(const std::string& file_location,
60+
std::string_view content, bool overwrite) = 0;
61+
62+
/// \brief Delete a file at the given location.
9863
///
99-
/// Delete the file at the given location.
100-
virtual void DeleteFile(const std::string& location) = 0;
101-
void DeleteFile(const InputFile& ifile) { return DeleteFile(ifile.location()); }
102-
void DeleteFile(const OutputFile& ofile) { return DeleteFile(ofile.location()); }
103-
104-
const std::string& name() const { return name_; }
105-
106-
protected:
107-
std::string name_;
64+
/// \param file_location The location of the file to delete.
65+
/// \return void if the delete succeeded, an error code if the delete failed.
66+
virtual expected<void, Error> DeleteFile(const std::string& file_location) = 0;
10867
};
10968

11069
} // namespace iceberg

0 commit comments

Comments
 (0)