Skip to content

Commit 11e1da0

Browse files
committed
fix more review comments
Signed-off-by: Junwang Zhao <[email protected]>
1 parent b590aac commit 11e1da0

File tree

9 files changed

+52
-82
lines changed

9 files changed

+52
-82
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h
5757
DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg)
5858

5959
if(ICEBERG_BUILD_BUNDLE)
60-
set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/io/arrow_fs_file_io.cc
60+
set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/arrow_fs_file_io.cc
6161
avro/demo_avro.cc)
6262

6363
# Libraries to link with exported libiceberg_bundle.{so,a}.

src/iceberg/arrow/CMakeLists.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,4 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
add_subdirectory(io)
19-
2018
iceberg_install_all_headers(iceberg/arrow)

src/iceberg/arrow/arrow_error_transform_internal.h

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,20 +36,20 @@ inline ErrorKind ToErrorKind(const ::arrow::Status& status) {
3636
}
3737
}
3838

39-
#define ICEBERG_INTERNAL_ASSIGN_OR_RETURN_IMPL(result_name, lhs, rexpr, error_transform) \
40-
auto&& result_name = (rexpr); \
41-
if (!result_name.ok()) { \
42-
return unexpected<Error>{{.kind = error_transform(result_name.status()), \
43-
.message = result_name.status().ToString()}}; \
44-
} \
39+
#define ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL(result_name, lhs, rexpr, error_transform) \
40+
auto&& result_name = (rexpr); \
41+
if (!result_name.ok()) { \
42+
return unexpected<Error>{{.kind = error_transform(result_name.status()), \
43+
.message = result_name.status().ToString()}}; \
44+
} \
4545
lhs = std::move(result_name).ValueOrDie();
4646

47-
#define ICEBERG_INTERNAL_ASSIGN_OR_RETURN(lhs, rexpr) \
48-
ICEBERG_INTERNAL_ASSIGN_OR_RETURN_IMPL( \
47+
#define ICEBERG_ARROW_ASSIGN_OR_RETURN(lhs, rexpr) \
48+
ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL( \
4949
ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, \
5050
internal::ToErrorKind)
5151

52-
#define ICEBERG_INTERNAL_RETURN_NOT_OK(expr) \
52+
#define ICEBERG_ARROW_RETURN_NOT_OK(expr) \
5353
do { \
5454
auto&& _status = (expr); \
5555
if (!_status.ok()) { \

src/iceberg/arrow/io/arrow_fs_file_io.cc renamed to src/iceberg/arrow/arrow_fs_file_io.cc

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,56 +17,55 @@
1717
* under the License.
1818
*/
1919

20-
#include "iceberg/arrow/io/arrow_fs_file_io.h"
21-
22-
#include <filesystem>
20+
#include "iceberg/arrow/arrow_fs_file_io.h"
2321

2422
#include <arrow/filesystem/localfs.h>
2523

2624
#include "iceberg/arrow/arrow_error_transform_internal.h"
2725

28-
namespace iceberg::arrow::io {
26+
namespace iceberg::arrow {
2927

3028
/// \brief Read the content of the file at the given location.
3129
expected<std::string, Error> ArrowFileSystemFileIO::ReadFile(
3230
const std::string& file_location, std::optional<size_t> length) {
33-
// We don't support reading a file with a specific length.
31+
::arrow::fs::FileInfo file_info(file_location);
3432
if (length.has_value()) {
35-
return unexpected(Error(ErrorKind::kInvalidArgument, "Length is not supported"));
33+
file_info.set_size(length.value());
3634
}
3735
std::string content;
38-
ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenInputFile(file_location));
39-
ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file_size, file->GetSize());
36+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenInputFile(file_info));
37+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file_size, file->GetSize());
4038

4139
content.resize(file_size);
42-
ICEBERG_INTERNAL_ASSIGN_OR_RETURN(
43-
auto read_length,
44-
file->ReadAt(0, file_size, reinterpret_cast<uint8_t*>(&content[0])));
40+
size_t remain = file_size;
41+
size_t offset = 0;
42+
while (remain > 0) {
43+
size_t read_length = std::min(remain, static_cast<size_t>(1024 * 1024));
44+
ICEBERG_ARROW_ASSIGN_OR_RETURN(
45+
auto read_bytes,
46+
file->Read(read_length, reinterpret_cast<uint8_t*>(&content[offset])));
47+
remain -= read_bytes;
48+
offset += read_bytes;
49+
}
4550

4651
return content;
4752
}
4853

4954
/// \brief Write the given content to the file at the given location.
5055
expected<void, Error> ArrowFileSystemFileIO::WriteFile(const std::string& file_location,
51-
std::string_view content,
52-
bool overwrite) {
53-
auto exists = arrow_fs_->OpenInputFile(file_location).ok();
54-
if (!overwrite && exists) {
55-
return unexpected(Error(ErrorKind::kAlreadyExists, "File already exists"));
56-
}
57-
ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file,
58-
arrow_fs_->OpenOutputStream(file_location));
59-
ICEBERG_INTERNAL_RETURN_NOT_OK(file->Write(content.data(), content.size()));
60-
ICEBERG_INTERNAL_RETURN_NOT_OK(file->Flush());
61-
ICEBERG_INTERNAL_RETURN_NOT_OK(file->Close());
56+
std::string_view content) {
57+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(file_location));
58+
ICEBERG_ARROW_RETURN_NOT_OK(file->Write(content.data(), content.size()));
59+
ICEBERG_ARROW_RETURN_NOT_OK(file->Flush());
60+
ICEBERG_ARROW_RETURN_NOT_OK(file->Close());
6261
return {};
6362
}
6463

6564
/// \brief Delete a file at the given location.
6665
expected<void, Error> ArrowFileSystemFileIO::DeleteFile(
6766
const std::string& file_location) {
68-
ICEBERG_INTERNAL_RETURN_NOT_OK(arrow_fs_->DeleteFile(file_location));
67+
ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(file_location));
6968
return {};
7069
}
7170

72-
} // namespace iceberg::arrow::io
71+
} // namespace iceberg::arrow

src/iceberg/arrow/io/arrow_fs_file_io.h renamed to src/iceberg/arrow/arrow_fs_file_io.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
#include "iceberg/file_io.h"
2727
#include "iceberg/iceberg_bundle_export.h"
2828

29-
namespace iceberg::arrow::io {
29+
namespace iceberg::arrow {
3030

3131
/// \brief A concrete implementation of FileIO for Arrow file system.
3232
class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
@@ -42,7 +42,7 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
4242

4343
/// \brief Write the given content to the file at the given location.
4444
expected<void, Error> WriteFile(const std::string& file_location,
45-
std::string_view content, bool overwrite) override;
45+
std::string_view content) override;
4646

4747
/// \brief Delete a file at the given location.
4848
expected<void, Error> DeleteFile(const std::string& file_location) override;
@@ -51,4 +51,4 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
5151
std::shared_ptr<::arrow::fs::FileSystem> arrow_fs_;
5252
};
5353

54-
} // namespace iceberg::arrow::io
54+
} // namespace iceberg::arrow

src/iceberg/arrow/io/CMakeLists.txt

Lines changed: 0 additions & 18 deletions
This file was deleted.

src/iceberg/file_io.h

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ namespace iceberg {
3434
/// This module only handle metadata files, not data files. The metadata files
3535
/// are typically small and are used to store schema, partition information,
3636
/// and other metadata about the table.
37+
///
38+
/// Note that these functions are not atomic. For example, if a write fails,
39+
/// the file may be partially written. Implementations should be careful to
40+
/// avoid corrupting metadata files.
3741
class ICEBERG_EXPORT FileIO {
3842
public:
3943
FileIO() = default;
@@ -48,10 +52,7 @@ class ICEBERG_EXPORT FileIO {
4852
/// failed.
4953
virtual expected<std::string, Error> ReadFile(const std::string& file_location,
5054
std::optional<size_t> length) {
51-
// The following line is to avoid Windows linker error LNK2019.
52-
// If this function is defined as pure virtual function, the `expected<std::string,
53-
// Error>` and `unexpected<Error>` will not be instantiated and exported in
54-
// libiceberg.
55+
// We provide a default implementation to avoid Windows linker error LNK2019.
5556
return unexpected<Error>{
5657
{.kind = ErrorKind::kNotImplemented, .message = "ReadFile not implemented"}};
5758
}
@@ -64,10 +65,7 @@ class ICEBERG_EXPORT FileIO {
6465
/// file exists.
6566
/// \return void if the write succeeded, an error code if the write failed.
6667
virtual expected<void, Error> WriteFile(const std::string& file_location,
67-
std::string_view content, bool overwrite) {
68-
// The following line is to avoid Windows linker error LNK2019.
69-
// If this function is defined as pure virtual function, the `expected<void, Error>`
70-
// will not be instantiated and exported in libiceberg.
68+
std::string_view content) {
7169
return unexpected<Error>{
7270
{.kind = ErrorKind::kNotImplemented, .message = "WriteFile not implemented"}};
7371
}
@@ -76,7 +74,10 @@ class ICEBERG_EXPORT FileIO {
7674
///
7775
/// \param file_location The location of the file to delete.
7876
/// \return void if the delete succeeded, an error code if the delete failed.
79-
virtual expected<void, Error> DeleteFile(const std::string& file_location) = 0;
77+
virtual expected<void, Error> DeleteFile(const std::string& file_location) {
78+
return unexpected<Error>{
79+
{.kind = ErrorKind::kNotImplemented, .message = "DeleteFile not implemented"}};
80+
}
8081
};
8182

8283
} // namespace iceberg

test/CMakeLists.txt

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,8 @@ if(ICEBERG_BUILD_BUNDLE)
4141
add_test(NAME avro_test COMMAND avro_test)
4242

4343
add_executable(arrow_test)
44-
target_sources(arrow_test PRIVATE arrow_test.cc)
44+
target_sources(arrow_test PRIVATE arrow_test.cc arrow_fs_file_io_test.cc)
4545
target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main
4646
GTest::gmock)
4747
add_test(NAME arrow_test COMMAND arrow_test)
48-
49-
add_executable(arrow_fs_file_io_test)
50-
target_sources(arrow_fs_file_io_test PRIVATE arrow_fs_file_io_test.cc)
51-
target_link_libraries(arrow_fs_file_io_test PRIVATE iceberg_bundle_static
52-
GTest::gtest_main GTest::gmock)
53-
add_test(NAME arrow_fs_file_io_test COMMAND arrow_fs_file_io_test)
5448
endif()

test/arrow_fs_file_io_test.cc

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* under the License.
1818
*/
1919

20-
#include "iceberg/arrow/io/arrow_fs_file_io.h"
20+
#include "iceberg/arrow/arrow_fs_file_io.h"
2121

2222
#include <filesystem>
2323

@@ -32,7 +32,7 @@ class LocalFileIOTest : public testing::Test {
3232
protected:
3333
void SetUp() override {
3434
local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
35-
file_io_ = std::make_shared<iceberg::arrow::io::ArrowFileSystemFileIO>(local_fs_);
35+
file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);
3636
}
3737

3838
std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
@@ -41,20 +41,16 @@ class LocalFileIOTest : public testing::Test {
4141
};
4242

4343
TEST_F(LocalFileIOTest, ReadWriteFile) {
44-
auto read_res = file_io_->ReadFile(tmpfile.string(), 1024);
45-
EXPECT_THAT(read_res, IsError(ErrorKind::kInvalidArgument));
46-
EXPECT_THAT(read_res, HasErrorMessage("Length is not supported"));
47-
48-
read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt);
44+
auto read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt);
4945
EXPECT_THAT(read_res, IsError(ErrorKind::kIOError));
5046
EXPECT_THAT(read_res, HasErrorMessage("No such file or directory"));
5147

52-
auto write_res = file_io_->WriteFile(tmpfile.string(), "hello world", false);
48+
auto write_res = file_io_->WriteFile(tmpfile.string(), "hello world");
5349
EXPECT_THAT(write_res, IsOk());
5450

5551
read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt);
5652
EXPECT_THAT(read_res, IsOk());
57-
EXPECT_EQ(read_res.value(), "hello world");
53+
EXPECT_THAT(read_res, HasValue(::testing::Eq("hello world")));
5854
}
5955

6056
TEST_F(LocalFileIOTest, DeleteFile) {

0 commit comments

Comments
 (0)