Skip to content

Commit ad790f3

Browse files
committed
feat: add file_io and local fs impl
add reader and writer interfaces Signed-off-by: Junwang Zhao <[email protected]>
1 parent faf9cc8 commit ad790f3

File tree

12 files changed

+907
-0
lines changed

12 files changed

+907
-0
lines changed

src/iceberg/file_io.h

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <memory>
23+
#include <string>
24+
25+
#include "iceberg/iceberg_export.h"
26+
#include "iceberg/reader.h"
27+
#include "iceberg/writer.h"
28+
29+
namespace iceberg {
30+
31+
/// \brief An interface used to read input files using Reader and AsyncReader
32+
class ICEBERG_EXPORT InputFile {
33+
public:
34+
explicit InputFile(std::string location) : location_(std::move(location)) {}
35+
36+
virtual ~InputFile() = default;
37+
38+
/// \brief Checks whether the file exists.
39+
virtual bool exists() const = 0;
40+
/// \brief Returns the total length of the file, in bytes.
41+
virtual int64_t getLength() const = 0;
42+
43+
/// \brief Get a Reader instance to read bytes from the file.
44+
virtual std::unique_ptr<Reader> newReader() = 0;
45+
46+
/// \brief Get the file location
47+
const std::string& location() const { return location_; }
48+
49+
protected:
50+
std::string location_;
51+
};
52+
53+
/// \brief An interface used to write output files using Writer and AsyncWriter
54+
class ICEBERG_EXPORT OutputFile {
55+
public:
56+
explicit OutputFile(std::string location) : location_(std::move(location)) {}
57+
58+
virtual ~OutputFile() = default;
59+
60+
/// \brief Create the file.
61+
///
62+
/// If the file exists, or an error will be thrown.
63+
virtual void create() = 0;
64+
65+
/// \brief Get a Writer instance to write bytes to the file.
66+
virtual std::unique_ptr<Writer> newWriter() = 0;
67+
68+
/// \brief Get the file location
69+
const std::string& location() const { return location_; }
70+
71+
/// \brief Return an InputFile for the location of this OutputFile.
72+
virtual std::shared_ptr<InputFile> toInputFile() const = 0;
73+
74+
protected:
75+
std::string location_;
76+
};
77+
78+
/// \brief Pluggable module for reading, writing, and deleting files.
79+
///
80+
/// Both table metadata files and data files can be written and read by this module.
81+
class ICEBERG_EXPORT FileIO {
82+
public:
83+
explicit FileIO(std::string name) : name_(std::move(name)) {}
84+
85+
virtual ~FileIO() = default;
86+
87+
/// \brief Get an InputFile
88+
///
89+
/// Get a InputFile instance to read bytes from the file at the given location.
90+
virtual std::shared_ptr<InputFile> newInputFile(const std::string& location) = 0;
91+
92+
/// \brief Get an OutputFile
93+
///
94+
/// Get a OutputFile instance to write bytes to the file at the given location.
95+
virtual std::shared_ptr<OutputFile> newOutputFile(const std::string& location) = 0;
96+
97+
/// \brief Delete file
98+
///
99+
/// Delete the file at the given location.
100+
virtual void DeleteFile(const std::string& location) = 0;
101+
void DeleteFile(const InputFile& ifile) { return DeleteFile(ifile.location()); }
102+
void DeleteFile(const OutputFile& ofile) { return DeleteFile(ofile.location()); }
103+
104+
const std::string& name() const { return name_; }
105+
106+
protected:
107+
std::string name_;
108+
};
109+
110+
} // namespace iceberg

src/iceberg/io/CMakeLists.txt

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Nothing in this directory is configured unless ICEBERG_IO is enabled.
if(NOT ICEBERG_IO)
  return()
endif()

set(ICEBERG_IO_SOURCES fs_file_io.cc fs_file_reader.cc fs_file_writer.cc)
set(ICEBERG_IO_INCLUDES "${ICEBERG_INCLUDES}")

# Libraries to link with exported libiceberg_io.{so,a}.
#
# Each variant prefers the core iceberg target of the same kind (static or
# shared) and falls back to the other kind when that target does not exist.
set(ICEBERG_IO_STATIC_BUILD_INTERFACE_LIBS
    "$<IF:$<TARGET_EXISTS:iceberg_static>,iceberg_static,iceberg_shared>")
set(ICEBERG_IO_SHARED_BUILD_INTERFACE_LIBS
    "$<IF:$<TARGET_EXISTS:iceberg_shared>,iceberg_shared,iceberg_static>")
set(ICEBERG_IO_STATIC_INSTALL_INTERFACE_LIBS
    "$<IF:$<TARGET_EXISTS:Iceberg::iceberg_static>,Iceberg::iceberg_static,Iceberg::iceberg_shared>"
)
set(ICEBERG_IO_SHARED_INSTALL_INTERFACE_LIBS
    "$<IF:$<TARGET_EXISTS:Iceberg::iceberg_shared>,Iceberg::iceberg_shared,Iceberg::iceberg_static>"
)

add_iceberg_lib(iceberg_io
                SOURCES
                ${ICEBERG_IO_SOURCES}
                PRIVATE_INCLUDES
                ${ICEBERG_IO_INCLUDES}
                SHARED_LINK_LIBS
                ${ICEBERG_IO_SHARED_BUILD_INTERFACE_LIBS}
                STATIC_LINK_LIBS
                ${ICEBERG_IO_STATIC_BUILD_INTERFACE_LIBS}
                STATIC_INSTALL_INTERFACE_LIBS
                ${ICEBERG_IO_STATIC_INSTALL_INTERFACE_LIBS}
                SHARED_INSTALL_INTERFACE_LIBS
                ${ICEBERG_IO_SHARED_INSTALL_INTERFACE_LIBS})

iceberg_install_all_headers(iceberg/io)

# The export header is taken from the build tree, so install it explicitly.
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_io_export.h
        DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg/io)

src/iceberg/io/fs_file_io.cc

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/io/fs_file_io.h"

#include <fcntl.h>
#include <sys/stat.h>

#include <cassert>
#include <cstdio>
#include <filesystem>
#include <format>
#include <fstream>
#include <system_error>

#include "iceberg/exception.h"
#include "iceberg/io/fs_file_reader.h"
#include "iceberg/io/fs_file_writer.h"
33+
34+
namespace iceberg::io {
35+
36+
// Reports whether the stored location refers to an existing file system entry.
bool FsInputFile::exists() const {
  const std::string& path = location();
  return std::filesystem::exists(path);
}
37+
38+
// Returns the size of the file at location_, in bytes.
//
// The size is queried through std::filesystem::file_size rather than stat(),
// whose st_size width is platform dependent, so the result is not truncated
// for large files and the call matches the std::filesystem use in exists().
//
// Throws IcebergError if the size cannot be determined (for example because
// the file does not exist or is inaccessible).
int64_t FsInputFile::getLength() const {
  std::error_code ec;
  const auto size_in_bytes = std::filesystem::file_size(location_, ec);
  if (ec) {
    throw IcebergError(std::format(
        "Failed to get file length. File does not exist or is inaccessible: {}",
        location_));
  }
  return static_cast<int64_t>(size_in_bytes);
}
47+
48+
std::unique_ptr<Reader> FsInputFile::newReader() {
49+
return std::make_unique<FsFileReader>(location_);
50+
}
51+
52+
// Creates a new, empty file at location_.
//
// The file is opened in exclusive-create mode (the "x" flag of fopen, C11 /
// C++17), so the existence check and the creation happen in one step. An
// existing file is never truncated, regardless of whether it is readable by
// the current process, and there is no window between "check" and "create".
//
// Throws IcebergError if the file already exists or cannot be created.
void FsOutputFile::create() {
  std::FILE* created = std::fopen(location_.c_str(), "wbx");
  if (created != nullptr) {
    std::fclose(created);
    return;
  }

  // Creation failed: pick the error message by checking whether something is
  // already present at the location. The error_code overload never throws.
  std::error_code ec;
  if (std::filesystem::exists(location_, ec)) {
    throw IcebergError(std::format("File already exists: {}", location_));
  }
  throw IcebergError(std::format("Failed to create file: {}", location_));
}
69+
70+
std::unique_ptr<Writer> FsOutputFile::newWriter() {
71+
return std::make_unique<FsFileWriter>(location_);
72+
}
73+
74+
// Returns an FsInputFile for `location`.
//
// Throws IcebergError when fileExists() reports that the location is absent.
std::shared_ptr<InputFile> FsFileIO::newInputFile(const std::string& location) {
  // Hand out a handle only for a location that passes the existence probe.
  if (fileExists(location)) {
    return std::make_shared<FsInputFile>(location);
  }
  throw IcebergError(std::format("InputFile does not exist: {}", location));
}
83+
84+
std::shared_ptr<OutputFile> FsFileIO::newOutputFile(const std::string& location) {
85+
return std::make_shared<FsOutputFile>(location);
86+
}
87+
88+
// Deletes the file at `location`.
//
// The outcome is taken directly from std::filesystem::remove instead of
// probing the file first by opening it for reading: a file that exists but is
// not readable can still be deleted, and there is no gap between an existence
// check and the removal.
//
// Throws IcebergError if nothing exists at `location` or if the removal fails.
void FsFileIO::DeleteFile(const std::string& location) {
  std::error_code ec;
  const bool removed = std::filesystem::remove(location, ec);
  if (ec) {
    throw IcebergError(
        std::format("Failed to delete file: {}, error code: {}", location, ec.message()));
  }
  // remove() returns false without setting an error when the path was absent.
  if (!removed) {
    throw IcebergError(std::format("File does not exist: {}", location));
  }
}
99+
100+
// Reports whether something exists at `location`.
//
// Uses std::filesystem::exists, the same check as FsInputFile::exists, rather
// than opening the file for reading, so an existing file without read
// permission is still reported as present. The error_code overload is used so
// that a failed status query yields `false` instead of an exception.
bool FsFileIO::fileExists(const std::string& location) {
  std::error_code ec;
  return std::filesystem::exists(location, ec);
}
104+
105+
} // namespace iceberg::io

src/iceberg/io/fs_file_io.h

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include "iceberg/file_io.h"
23+
#include "iceberg/io/iceberg_io_export.h"
24+
25+
namespace iceberg::io {
26+
27+
/// \brief InputFile implementation declared for the file system FileIO.
class ICEBERG_IO_EXPORT FsInputFile : public InputFile {
 public:
  /// \brief Construct an FsInputFile that refers to `location`.
  explicit FsInputFile(std::string location) : InputFile{std::move(location)} {}

  ~FsInputFile() override = default;

  /// \brief Checks whether the file exists.
  bool exists() const override;

  /// \brief Returns the total length of the file, in bytes.
  int64_t getLength() const override;

  /// \brief Get a Reader instance to read bytes from the file.
  std::unique_ptr<Reader> newReader() override;
};
41+
42+
/// \brief OutputFile implementation declared for the file system FileIO.
class ICEBERG_IO_EXPORT FsOutputFile : public OutputFile {
 public:
  /// \brief Construct an FsOutputFile that refers to `location`.
  explicit FsOutputFile(std::string location) : OutputFile{std::move(location)} {}

  ~FsOutputFile() override = default;

  /// \brief Create the file.
  ///
  /// If the file already exists, an exception is thrown.
  void create() override;

  /// \brief Get a Writer instance to write bytes to the file.
  ///
  /// Returns a unique pointer to a `FsFileWriter`.
  std::unique_ptr<Writer> newWriter() override;

  /// \brief Return an InputFile for the location of this OutputFile.
  ///
  /// Creates an FsInputFile for reading the file pointed by this OutputFile's location.
  std::shared_ptr<InputFile> toInputFile() const override {
    std::shared_ptr<InputFile> input = std::make_shared<FsInputFile>(location_);
    return input;
  }
};
64+
65+
/// \brief A concrete implementation of FileIO for file system.
66+
class ICEBERG_IO_EXPORT FsFileIO : public FileIO {
67+
public:
68+
explicit FsFileIO(const std::string& name) : FileIO(name) {}
69+
70+
~FsFileIO() override = default;
71+
72+
/// \brief Get an InputFile
73+
///
74+
/// Returns a shared pointer to an FsInputFile instance, representing the file at
75+
/// `location`.
76+
std::shared_ptr<InputFile> newInputFile(const std::string& location) override;
77+
78+
/// \brief Get an OutputFile
79+
///
80+
/// Returns a shared pointer to an FsOutputFile instance, representing the file at
81+
/// `location`.
82+
std::shared_ptr<OutputFile> newOutputFile(const std::string& location) override;
83+
84+
/// \brief Delete file
85+
///
86+
/// Deletes the file at the given location using file system operations.
87+
void DeleteFile(const std::string& location) override;
88+
89+
private:
90+
/// \brief Check if a file exists
91+
bool fileExists(const std::string& location);
92+
};
93+
94+
} // namespace iceberg::io

0 commit comments

Comments
 (0)