Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1209,4 +1209,20 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
return table_metadata;
}

/// \brief Deserialize a JSON string into a `nlohmann::json` object.
///
/// \param json_string The JSON string to deserialize.
/// \return The parsed JSON value, or JsonParseError if parsing fails.
Result<nlohmann::json> FromJsonString(const std::string& json_string) {
  try {
    return nlohmann::json::parse(json_string);
  } catch (const nlohmann::json::parse_error& e) {
    // parse() is documented to throw only parse_error; its what() message
    // includes the byte offset and reason, giving a precise diagnostic.
    return JsonParseError("Failed to parse JSON string: {}", e.what());
  } catch (const std::exception& e) {
    // Defensive catch-all kept deliberately (see review discussion) in case
    // the library implementation ever throws something else.
    // TODO(#87): switch to the non-throwing parse overload + is_discarded().
    return JsonParseError("Failed to parse JSON string: {}", e.what());
  }
}

/// \brief Serialize a `nlohmann::json` object into its compact string form.
///
/// \param json The JSON value to serialize.
/// \return The serialized string, or JsonParseError if serialization throws
/// (dump() can throw, e.g. on invalid UTF-8 in string values — see nlohmann
/// docs for type_error.316).
Result<std::string> ToJsonString(const nlohmann::json& json) {
  try {
    std::string serialized = json.dump();
    return serialized;
  } catch (const std::exception& e) {
    return JsonParseError("Failed to serialize to JSON string: {}", e.what());
  }
}

} // namespace iceberg
12 changes: 12 additions & 0 deletions src/iceberg/json_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,16 @@ nlohmann::json ToJson(const TableMetadata& table_metadata);
/// \return A `TableMetadata` object or an error if the conversion fails.
Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::json& json);

/// \brief Deserialize a JSON string into a `nlohmann::json` object.
///
/// \param json_string The JSON string to deserialize.
/// \return A `nlohmann::json` object or an error if the deserialization fails.
Result<nlohmann::json> FromJsonString(const std::string& json_string);

/// \brief Serialize a `nlohmann::json` object into a JSON string.
///
/// \param json The `nlohmann::json` object to serialize.
/// \return A JSON string or an error if the serialization fails.
Result<std::string> ToJsonString(const nlohmann::json& json);

} // namespace iceberg
103 changes: 103 additions & 0 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,17 @@
#include <ranges>
#include <string>

#include <nlohmann/json.hpp>

#include "iceberg/file_io.h"
#include "iceberg/json_internal.h"
#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/sort_order.h"
#include "iceberg/util/macros.h"

namespace iceberg {

std::string ToString(const SnapshotLogEntry& entry) {
Expand Down Expand Up @@ -69,4 +76,100 @@ Result<std::shared_ptr<SortOrder>> TableMetadata::SortOrder() const {
return *iter;
}

namespace {

/// \brief Element-wise equality of two vectors of shared_ptr, comparing the
/// pointees rather than the pointer values.
///
/// Null entries are handled safely (review follow-up: the original
/// unconditionally dereferenced both sides): two null entries compare equal,
/// and a null entry never equals a non-null one.
template <typename T>
bool SharedPtrVectorEquals(const std::vector<std::shared_ptr<T>>& lhs,
                           const std::vector<std::shared_ptr<T>>& rhs) {
  if (lhs.size() != rhs.size()) {
    return false;
  }
  for (size_t i = 0; i < lhs.size(); ++i) {
    if (lhs[i] == rhs[i]) {
      continue;  // same pointer, including both null
    }
    if (lhs[i] == nullptr || rhs[i] == nullptr || *lhs[i] != *rhs[i]) {
      return false;
    }
  }
  return true;
}

/// \brief Key-wise equality of two snapshot-ref maps, comparing mapped
/// SnapshotRef pointees rather than pointer values.
///
/// Null mapped values are handled safely (same review concern as
/// SharedPtrVectorEquals): two nulls compare equal, null vs non-null differ.
bool SnapshotRefEquals(
    const std::unordered_map<std::string, std::shared_ptr<SnapshotRef>>& lhs,
    const std::unordered_map<std::string, std::shared_ptr<SnapshotRef>>& rhs) {
  if (lhs.size() != rhs.size()) {
    return false;
  }
  for (const auto& [name, ref] : lhs) {
    const auto it = rhs.find(name);
    if (it == rhs.end()) {
      return false;  // key present on one side only
    }
    const auto& other = it->second;
    if (ref == other) {
      continue;  // same pointer, including both null
    }
    if (ref == nullptr || other == nullptr || *other != *ref) {
      return false;
    }
  }
  return true;
}

} // namespace

/// \brief Deep equality for TableMetadata: scalar and standard-container
/// fields compare directly; shared_ptr collections compare element-wise by
/// pointee via the helpers above.
bool operator==(const TableMetadata& lhs, const TableMetadata& rhs) {
  if (lhs.format_version != rhs.format_version) return false;
  if (lhs.table_uuid != rhs.table_uuid) return false;
  if (lhs.location != rhs.location) return false;
  if (lhs.last_sequence_number != rhs.last_sequence_number) return false;
  if (lhs.last_updated_ms != rhs.last_updated_ms) return false;
  if (lhs.last_column_id != rhs.last_column_id) return false;
  if (lhs.current_schema_id != rhs.current_schema_id) return false;
  if (!SharedPtrVectorEquals(lhs.schemas, rhs.schemas)) return false;
  if (lhs.default_spec_id != rhs.default_spec_id) return false;
  if (lhs.last_partition_id != rhs.last_partition_id) return false;
  if (lhs.properties != rhs.properties) return false;
  if (lhs.current_snapshot_id != rhs.current_snapshot_id) return false;
  if (!SharedPtrVectorEquals(lhs.snapshots, rhs.snapshots)) return false;
  if (lhs.snapshot_log != rhs.snapshot_log) return false;
  if (lhs.metadata_log != rhs.metadata_log) return false;
  if (!SharedPtrVectorEquals(lhs.sort_orders, rhs.sort_orders)) return false;
  if (lhs.default_sort_order_id != rhs.default_sort_order_id) return false;
  if (!SnapshotRefEquals(lhs.refs, rhs.refs)) return false;
  if (!SharedPtrVectorEquals(lhs.statistics, rhs.statistics)) return false;
  if (!SharedPtrVectorEquals(lhs.partition_statistics, rhs.partition_statistics)) {
    return false;
  }
  return lhs.next_row_id == rhs.next_row_id;
}

/// \brief Infer the metadata-file codec from the file name.
///
/// \param file_name The name of the table metadata file.
/// \return kGzip for "...metadata.json.gz" (legacy) and "...gz.metadata.json"
/// names, kNone otherwise; InvalidArgument if the name lacks ".metadata.json".
Result<MetadataFileCodecType> TableMetadataUtil::CodecFromFileName(
    std::string_view file_name) {
  // BUG FIX: the previous code used find_last_of(".metadata.json"), which
  // searches for the last occurrence of ANY character in that set — not the
  // substring — so "foo.gz.metadata.json" was truncated at the final 'n' and
  // never detected as gzip. rfind locates the substring itself.
  const auto suffix_pos = file_name.rfind(".metadata.json");
  if (suffix_pos == std::string_view::npos) {
    return InvalidArgument("{} is not a valid metadata file", file_name);
  }

  // We have to be backward-compatible with .metadata.json.gz files
  if (file_name.ends_with(".metadata.json.gz")) {
    return MetadataFileCodecType::kGzip;
  }

  // Current naming puts the codec before the suffix: "...gz.metadata.json".
  std::string_view file_name_without_suffix = file_name.substr(0, suffix_pos);
  if (file_name_without_suffix.ends_with(".gz")) {
    return MetadataFileCodecType::kGzip;
  }
  return MetadataFileCodecType::kNone;
}

// Reads a table metadata file: codec detection, raw read, JSON parse, then
// conversion to TableMetadata. Each ICEBERG_ASSIGN_OR_RAISE early-returns the
// error from the corresponding step.
Result<std::unique_ptr<TableMetadata>> TableMetadataUtil::Read(
    FileIO& io, const std::string& location, std::optional<size_t> length) {
  // Determine the codec from the file name before touching the file.
  ICEBERG_ASSIGN_OR_RAISE(auto codec_type, CodecFromFileName(location));
  if (codec_type == MetadataFileCodecType::kGzip) {
    return NotImplemented("Reading gzip-compressed metadata files is not supported yet");
  }

  // Read the raw bytes (optionally only `length` bytes), parse them as JSON,
  // then build the TableMetadata object from the JSON document.
  ICEBERG_ASSIGN_OR_RAISE(auto content, io.ReadFile(location, length));
  ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(content));
  return TableMetadataFromJson(json);
}

// Serializes `metadata` to JSON text and writes it to `location`.
// NOTE(review): no compression is applied on write, regardless of the
// location's file-name suffix — confirm this is intended for .gz names.
Status TableMetadataUtil::Write(FileIO& io, const std::string& location,
                                const TableMetadata& metadata) {
  auto json = ToJson(metadata);
  ICEBERG_ASSIGN_OR_RAISE(auto json_string, ToJsonString(json));
  return io.WriteFile(location, json_string);
}

} // namespace iceberg
40 changes: 40 additions & 0 deletions src/iceberg/table_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>

Expand Down Expand Up @@ -132,6 +133,12 @@ struct ICEBERG_EXPORT TableMetadata {
Result<std::shared_ptr<PartitionSpec>> PartitionSpec() const;
/// \brief Get the current sort order, return NotFoundError if not found
Result<std::shared_ptr<SortOrder>> SortOrder() const;

friend bool operator==(const TableMetadata& lhs, const TableMetadata& rhs);

friend bool operator!=(const TableMetadata& lhs, const TableMetadata& rhs) {
return !(lhs == rhs);
}
};

/// \brief Returns a string representation of a SnapshotLogEntry
Expand All @@ -140,4 +147,37 @@ ICEBERG_EXPORT std::string ToString(const SnapshotLogEntry& entry);
/// \brief Returns a string representation of a MetadataLogEntry
ICEBERG_EXPORT std::string ToString(const MetadataLogEntry& entry);

/// \brief The codec type of the table metadata file.
// NOTE(review): ICEBERG_EXPORT before `enum class` is unusual — export macros
// are normally placed on classes/functions, not enums; confirm this expands
// cleanly on all supported compilers (MSVC in particular).
ICEBERG_EXPORT enum class MetadataFileCodecType {
  kNone,
  kGzip,
};

/// \brief Utility class for table metadata; all members are static helpers.
struct ICEBERG_EXPORT TableMetadataUtil {
  /// \brief Get the codec type from the table metadata file name.
  ///
  /// \param file_name The name of the table metadata file.
  /// \return The codec type of the table metadata file, or an error if the
  /// name is not a valid metadata file name.
  static Result<MetadataFileCodecType> CodecFromFileName(std::string_view file_name);

  /// \brief Read the table metadata file.
  ///
  /// \param io The file IO to use to read the table metadata.
  /// \param location The location of the table metadata file.
  /// \param length The optional length of the table metadata file.
  /// \return The table metadata.
  // `class FileIO&` forward-declares FileIO in place, avoiding an extra
  // include in this header.
  static Result<std::unique_ptr<TableMetadata>> Read(
      class FileIO& io, const std::string& location,
      std::optional<size_t> length = std::nullopt);

  /// \brief Write the table metadata to a file.
  ///
  /// \param io The file IO to use to write the table metadata.
  /// \param location The location of the table metadata file.
  /// \param metadata The table metadata to write.
  static Status Write(FileIO& io, const std::string& location,
                      const TableMetadata& metadata);
};

} // namespace iceberg
3 changes: 2 additions & 1 deletion test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ if(ICEBERG_BUILD_BUNDLE)
add_test(NAME avro_test COMMAND avro_test)

# Diff artifact repaired: the old and new target_sources lines were both
# present; keep only the updated list including metadata_io_test.cc.
add_executable(arrow_test)
target_sources(arrow_test PRIVATE arrow_test.cc arrow_fs_file_io_test.cc
                                  metadata_io_test.cc)
target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main
                                         GTest::gmock)
add_test(NAME arrow_test COMMAND arrow_test)
Expand Down
27 changes: 15 additions & 12 deletions test/arrow_fs_file_io_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,45 +19,48 @@

#include "iceberg/arrow/arrow_fs_file_io.h"

#include <filesystem>

#include <arrow/filesystem/localfs.h>
#include <gtest/gtest.h>

#include "matchers.h"
#include "temp_file_test_base.h"

namespace iceberg {

class LocalFileIOTest : public testing::Test {
class LocalFileIOTest : public TempFileTestBase {
protected:
void SetUp() override {
local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);
TempFileTestBase::SetUp();
file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
std::make_shared<::arrow::fs::LocalFileSystem>());
temp_filepath_ = CreateNewTempFilePath();
}

std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
std::shared_ptr<iceberg::FileIO> file_io_;
std::filesystem::path tmpfile = std::filesystem::temp_directory_path() / "123.txt";
std::string temp_filepath_;
};

TEST_F(LocalFileIOTest, ReadWriteFile) {
auto read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt);
auto read_res = file_io_->ReadFile(temp_filepath_, std::nullopt);
EXPECT_THAT(read_res, IsError(ErrorKind::kIOError));
EXPECT_THAT(read_res, HasErrorMessage("Failed to open local file"));

auto write_res = file_io_->WriteFile(tmpfile.string(), "hello world");
auto write_res = file_io_->WriteFile(temp_filepath_, "hello world");
EXPECT_THAT(write_res, IsOk());

read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt);
read_res = file_io_->ReadFile(temp_filepath_, std::nullopt);
EXPECT_THAT(read_res, IsOk());
EXPECT_THAT(read_res, HasValue(::testing::Eq("hello world")));
}

// Diff artifact repaired: only the post-change (temp_filepath_) statements
// are kept.
TEST_F(LocalFileIOTest, DeleteFile) {
  // Create the file first so the initial delete has something to remove.
  auto write_res = file_io_->WriteFile(temp_filepath_, "hello world");
  EXPECT_THAT(write_res, IsOk());

  auto del_res = file_io_->DeleteFile(temp_filepath_);
  EXPECT_THAT(del_res, IsOk());

  // Deleting an already-deleted file must surface an IO error.
  del_res = file_io_->DeleteFile(temp_filepath_);
  EXPECT_THAT(del_res, IsError(ErrorKind::kIOError));
  EXPECT_THAT(del_res, HasErrorMessage("Cannot delete file"));
}
Expand Down
81 changes: 81 additions & 0 deletions test/metadata_io_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <arrow/filesystem/localfs.h>
#include <gtest/gtest.h>

#include "iceberg/arrow/arrow_fs_file_io.h"
#include "iceberg/file_io.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
#include "matchers.h"
#include "temp_file_test_base.h"

namespace iceberg {

// Fixture for round-tripping TableMetadata through the local filesystem via
// the Arrow FileIO adapter.
class MetadataIOTest : public TempFileTestBase {
 protected:
  void SetUp() override {
    // Base SetUp first: it prepares the temp-directory machinery used below.
    TempFileTestBase::SetUp();
    io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
        std::make_shared<::arrow::fs::LocalFileSystem>());
    // Suffix matters: TableMetadataUtil::CodecFromFileName requires
    // ".metadata.json" in the file name.
    temp_filepath_ = CreateNewTempFilePathWithSuffix(".metadata.json");
  }

  std::shared_ptr<iceberg::FileIO> io_;  // FileIO used by read/write helpers
  std::string temp_filepath_;            // fresh metadata path per test case
};

// Round-trip test: write a hand-built TableMetadata to disk, read it back,
// and require deep equality (operator== compares pointees element-wise).
TEST_F(MetadataIOTest, ReadWriteMetadata) {
  // Minimal schema: one required long field "x" with field id 1.
  std::vector<SchemaField> schema_fields;
  schema_fields.emplace_back(/*field_id=*/1, "x", std::make_shared<LongType>(),
                             /*optional=*/false);
  auto schema = std::make_shared<Schema>(std::move(schema_fields), /*schema_id=*/1);

  // Designated initializers must follow the member declaration order of
  // TableMetadata; omitted members are value-initialized.
  TableMetadata metadata{.format_version = 1,
                         .table_uuid = "1234567890",
                         .location = "s3://bucket/path",
                         .last_sequence_number = 0,
                         .schemas = {schema},
                         .current_schema_id = 1,
                         .default_spec_id = 0,
                         .last_partition_id = 0,
                         .properties = {{"key", "value"}},
                         .current_snapshot_id = 3051729675574597004,
                         .snapshots = {std::make_shared<Snapshot>(Snapshot{
                             .snapshot_id = 3051729675574597004,
                             .sequence_number = 0,
                             .timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(),
                             .manifest_list = "s3://a/b/1.avro",
                             .summary = {{"operation", "append"}},
                         })},
                         .default_sort_order_id = 0,
                         .next_row_id = 0};

  EXPECT_THAT(TableMetadataUtil::Write(*io_, temp_filepath_, metadata), IsOk());

  auto result = TableMetadataUtil::Read(*io_, temp_filepath_);
  EXPECT_THAT(result, IsOk());

  // Read returns a unique_ptr; compare the pointee against the original.
  auto metadata_read = std::move(result.value());
  EXPECT_EQ(*metadata_read, metadata);
}

} // namespace iceberg
Loading
Loading