Skip to content

Commit a14c2b7

Browse files
authored
feat: add table metadata reader and writer (#85)
1 parent 60bff25 commit a14c2b7

File tree

8 files changed

+450
-13
lines changed

8 files changed

+450
-13
lines changed

src/iceberg/json_internal.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1209,4 +1209,20 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
12091209
return table_metadata;
12101210
}
12111211

1212+
/// \brief Parse a JSON document held in a string.
///
/// Any `std::exception` thrown while parsing is caught here and reported
/// through the returned `Result` rather than propagating to the caller.
///
/// \param json_string Text to parse.
/// \return The parsed `nlohmann::json` value, or the error built by
/// `JsonParseError` carrying the exception's message.
Result<nlohmann::json> FromJsonString(const std::string& json_string) {
  try {
    nlohmann::json parsed = nlohmann::json::parse(json_string);
    return parsed;
  } catch (const std::exception& parse_failure) {
    return JsonParseError("Failed to parse JSON string: {}", parse_failure.what());
  }
}
1219+
1220+
/// \brief Render a `nlohmann::json` value as text via `dump()`.
///
/// Any `std::exception` thrown by `dump()` is caught here and reported through
/// the returned `Result` rather than propagating to the caller.
///
/// \param json Value to serialize.
/// \return The serialized text, or the error built by `JsonParseError` carrying
/// the exception's message.
Result<std::string> ToJsonString(const nlohmann::json& json) {
  try {
    std::string serialized = json.dump();
    return serialized;
  } catch (const std::exception& dump_failure) {
    return JsonParseError("Failed to serialize to JSON string: {}",
                          dump_failure.what());
  }
}
1227+
12121228
} // namespace iceberg

src/iceberg/json_internal.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,4 +244,16 @@ nlohmann::json ToJson(const TableMetadata& table_metadata);
244244
/// \return A `TableMetadata` object or an error if the conversion fails.
245245
Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::json& json);
246246

247+
/// \brief Deserialize a JSON string into a `nlohmann::json` object.
248+
///
249+
/// \param json_string The JSON string to deserialize.
250+
/// \return A `nlohmann::json` object or an error if the deserialization fails.
251+
Result<nlohmann::json> FromJsonString(const std::string& json_string);
252+
253+
/// \brief Serialize a `nlohmann::json` object into a JSON string.
254+
///
255+
/// \param json The `nlohmann::json` object to serialize.
256+
/// \return A JSON string or an error if the serialization fails.
257+
Result<std::string> ToJsonString(const nlohmann::json& json);
258+
247259
} // namespace iceberg

src/iceberg/table_metadata.cc

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,17 @@
2323
#include <ranges>
2424
#include <string>
2525

26+
#include <nlohmann/json.hpp>
27+
28+
#include "iceberg/file_io.h"
29+
#include "iceberg/json_internal.h"
2630
#include "iceberg/partition_spec.h"
2731
#include "iceberg/result.h"
2832
#include "iceberg/schema.h"
33+
#include "iceberg/snapshot.h"
2934
#include "iceberg/sort_order.h"
35+
#include "iceberg/util/macros.h"
36+
3037
namespace iceberg {
3138

3239
std::string ToString(const SnapshotLogEntry& entry) {
@@ -69,4 +76,100 @@ Result<std::shared_ptr<SortOrder>> TableMetadata::SortOrder() const {
6976
return *iter;
7077
}
7178

79+
namespace {
80+
81+
/// \brief Deep, element-wise comparison of two vectors of shared pointers.
///
/// Elements are compared through the pointee's `operator!=`, not by pointer
/// identity. Null entries are never dereferenced: two nulls at the same index
/// compare equal, and a null never equals a non-null entry.
///
/// \param lhs First vector.
/// \param rhs Second vector.
/// \return true if both vectors have the same length and every pair of
/// elements at the same index compares equal.
template <typename T>
bool SharedPtrVectorEquals(const std::vector<std::shared_ptr<T>>& lhs,
                           const std::vector<std::shared_ptr<T>>& rhs) {
  if (lhs.size() != rhs.size()) {
    return false;
  }
  for (size_t i = 0; i < lhs.size(); ++i) {
    const bool lhs_is_null = (lhs[i] == nullptr);
    const bool rhs_is_null = (rhs[i] == nullptr);
    if (lhs_is_null || rhs_is_null) {
      // Dereferencing a null shared_ptr is undefined behaviour, so decide on
      // nullness alone.
      if (lhs_is_null != rhs_is_null) {
        return false;
      }
      continue;
    }
    if (*lhs[i] != *rhs[i]) {
      return false;
    }
  }
  return true;
}
94+
95+
bool SnapshotRefEquals(
96+
const std::unordered_map<std::string, std::shared_ptr<SnapshotRef>>& lhs,
97+
const std::unordered_map<std::string, std::shared_ptr<SnapshotRef>>& rhs) {
98+
if (lhs.size() != rhs.size()) {
99+
return false;
100+
}
101+
for (const auto& [key, value] : lhs) {
102+
auto iter = rhs.find(key);
103+
if (iter == rhs.end()) {
104+
return false;
105+
}
106+
if (*iter->second != *value) {
107+
return false;
108+
}
109+
}
110+
return true;
111+
}
112+
113+
} // namespace
114+
115+
/// \brief Field-by-field equality of two `TableMetadata` objects.
///
/// Scalar and value-type members are compared with `==`; members that hold
/// shared pointers are compared by pointee through `SharedPtrVectorEquals` and
/// `SnapshotRefEquals`. Comparison stops at the first group that differs.
///
/// NOTE(review): no partition-spec collection is compared here even though the
/// type exposes a `PartitionSpec()` accessor — confirm whether that is
/// intentional.
bool operator==(const TableMetadata& lhs, const TableMetadata& rhs) {
  // Table identity and top-level counters.
  const bool same_header = lhs.format_version == rhs.format_version &&
                           lhs.table_uuid == rhs.table_uuid &&
                           lhs.location == rhs.location &&
                           lhs.last_sequence_number == rhs.last_sequence_number &&
                           lhs.last_updated_ms == rhs.last_updated_ms &&
                           lhs.last_column_id == rhs.last_column_id &&
                           lhs.current_schema_id == rhs.current_schema_id;
  if (!same_header) {
    return false;
  }

  if (!SharedPtrVectorEquals(lhs.schemas, rhs.schemas)) {
    return false;
  }

  // Partitioning ids, table properties and the current snapshot pointer.
  const bool same_partitioning_and_props =
      lhs.default_spec_id == rhs.default_spec_id &&
      lhs.last_partition_id == rhs.last_partition_id &&
      lhs.properties == rhs.properties &&
      lhs.current_snapshot_id == rhs.current_snapshot_id;
  if (!same_partitioning_and_props) {
    return false;
  }

  if (!SharedPtrVectorEquals(lhs.snapshots, rhs.snapshots)) {
    return false;
  }

  const bool same_logs =
      lhs.snapshot_log == rhs.snapshot_log && lhs.metadata_log == rhs.metadata_log;
  if (!same_logs) {
    return false;
  }

  if (!SharedPtrVectorEquals(lhs.sort_orders, rhs.sort_orders)) {
    return false;
  }
  if (!(lhs.default_sort_order_id == rhs.default_sort_order_id)) {
    return false;
  }

  if (!SnapshotRefEquals(lhs.refs, rhs.refs)) {
    return false;
  }
  if (!SharedPtrVectorEquals(lhs.statistics, rhs.statistics)) {
    return false;
  }
  if (!SharedPtrVectorEquals(lhs.partition_statistics, rhs.partition_statistics)) {
    return false;
  }

  return lhs.next_row_id == rhs.next_row_id;
}
136+
137+
/// \brief Infer the compression codec of a metadata file from its name.
///
/// Two gzip naming layouts are recognised:
///  - names ending in ".metadata.json.gz" (kept for backward compatibility);
///  - names where ".gz" immediately precedes the last ".metadata.json".
/// Any other name containing ".metadata.json" is treated as uncompressed.
///
/// \param file_name The name (or full location) of the metadata file.
/// \return The detected codec, or an InvalidArgument error when the name does
/// not contain ".metadata.json".
Result<MetadataFileCodecType> TableMetadataUtil::CodecFromFileName(
    std::string_view file_name) {
  constexpr std::string_view kMetadataSuffix = ".metadata.json";
  constexpr std::string_view kLegacyGzipSuffix = ".metadata.json.gz";
  constexpr std::string_view kGzipExtension = ".gz";

  // rfind() locates the last occurrence of the whole substring.
  // find_last_of() would instead match the last *character* belonging to the
  // set of characters in the argument, truncating at the wrong position.
  const auto suffix_pos = file_name.rfind(kMetadataSuffix);
  if (suffix_pos == std::string_view::npos) {
    return InvalidArgument("{} is not a valid metadata file", file_name);
  }

  // We have to be backward-compatible with .metadata.json.gz files
  if (file_name.ends_with(kLegacyGzipSuffix)) {
    return MetadataFileCodecType::kGzip;
  }

  const std::string_view file_name_without_suffix = file_name.substr(0, suffix_pos);
  if (file_name_without_suffix.ends_with(kGzipExtension)) {
    return MetadataFileCodecType::kGzip;
  }
  return MetadataFileCodecType::kNone;
}
155+
156+
/// \brief Load table metadata from a JSON metadata file.
///
/// The codec is derived from `location` first; gzip-compressed files are
/// rejected with NotImplemented before any I/O is attempted. Otherwise the file
/// is read through `io`, parsed as JSON and converted with
/// `TableMetadataFromJson`.
///
/// \param io File IO used to read the file.
/// \param location Location of the metadata file.
/// \param length Optional length, forwarded unchanged to `FileIO::ReadFile`.
/// \return The decoded metadata, or the first error raised along the way.
Result<std::unique_ptr<TableMetadata>> TableMetadataUtil::Read(
    FileIO& io, const std::string& location, std::optional<size_t> length) {
  ICEBERG_ASSIGN_OR_RAISE(auto codec, CodecFromFileName(location));
  const bool is_gzip = (codec == MetadataFileCodecType::kGzip);
  if (is_gzip) {
    return NotImplemented("Reading gzip-compressed metadata files is not supported yet");
  }

  ICEBERG_ASSIGN_OR_RAISE(auto raw_text, io.ReadFile(location, length));
  ICEBERG_ASSIGN_OR_RAISE(auto parsed, FromJsonString(raw_text));
  return TableMetadataFromJson(parsed);
}
167+
168+
/// \brief Serialize table metadata to JSON text and write it to a file.
///
/// \param io File IO used to write the file.
/// \param location Destination of the metadata file.
/// \param metadata Metadata to serialize.
/// \return The status of `FileIO::WriteFile`, or the serialization error if
/// the JSON text could not be produced.
Status TableMetadataUtil::Write(FileIO& io, const std::string& location,
                                const TableMetadata& metadata) {
  const nlohmann::json document = ToJson(metadata);
  ICEBERG_ASSIGN_OR_RAISE(auto serialized, ToJsonString(document));
  return io.WriteFile(location, serialized);
}
174+
72175
} // namespace iceberg

src/iceberg/table_metadata.h

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include <memory>
2626
#include <string>
27+
#include <string_view>
2728
#include <unordered_map>
2829
#include <vector>
2930

@@ -132,6 +133,12 @@ struct ICEBERG_EXPORT TableMetadata {
132133
Result<std::shared_ptr<PartitionSpec>> PartitionSpec() const;
133134
/// \brief Get the current sort order, return NotFoundError if not found
134135
Result<std::shared_ptr<SortOrder>> SortOrder() const;
136+
137+
friend bool operator==(const TableMetadata& lhs, const TableMetadata& rhs);
138+
139+
/// \brief Inequality, defined as the negation of `operator==`.
friend bool operator!=(const TableMetadata& lhs, const TableMetadata& rhs) {
  const bool equal = (lhs == rhs);
  return !equal;
}
135142
};
136143

137144
/// \brief Returns a string representation of a SnapshotLogEntry
@@ -140,4 +147,37 @@ ICEBERG_EXPORT std::string ToString(const SnapshotLogEntry& entry);
140147
/// \brief Returns a string representation of a MetadataLogEntry
141148
ICEBERG_EXPORT std::string ToString(const MetadataLogEntry& entry);
142149

150+
/// \brief The codec type of the table metadata file.
///
/// No export macro is attached: an enumeration emits no linker symbols, and an
/// attribute placed before the `enum` keyword is ignored (with a warning) by
/// mainstream compilers.
enum class MetadataFileCodecType {
  /// The metadata file is stored uncompressed.
  kNone,
  /// The metadata file is gzip-compressed.
  kGzip,
};
155+
156+
/// \brief Utility class for table metadata
157+
struct ICEBERG_EXPORT TableMetadataUtil {
158+
/// \brief Get the codec type from the table metadata file name.
159+
///
160+
/// \param file_name The name of the table metadata file.
161+
/// \return The codec type of the table metadata file.
162+
static Result<MetadataFileCodecType> CodecFromFileName(std::string_view file_name);
163+
164+
/// \brief Read the table metadata file.
165+
///
166+
/// \param io The file IO to use to read the table metadata.
167+
/// \param location The location of the table metadata file.
168+
/// \param length The optional length of the table metadata file.
169+
/// \return The table metadata.
170+
static Result<std::unique_ptr<TableMetadata>> Read(
171+
class FileIO& io, const std::string& location,
172+
std::optional<size_t> length = std::nullopt);
173+
174+
/// \brief Write the table metadata to a file.
175+
///
176+
/// \param io The file IO to use to write the table metadata.
177+
/// \param location The location of the table metadata file.
178+
/// \param metadata The table metadata to write.
179+
static Status Write(FileIO& io, const std::string& location,
180+
const TableMetadata& metadata);
181+
};
182+
143183
} // namespace iceberg

test/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ if(ICEBERG_BUILD_BUNDLE)
6969
add_test(NAME avro_test COMMAND avro_test)
7070

7171
add_executable(arrow_test)
72-
target_sources(arrow_test PRIVATE arrow_test.cc arrow_fs_file_io_test.cc)
72+
target_sources(arrow_test PRIVATE arrow_test.cc arrow_fs_file_io_test.cc
73+
metadata_io_test.cc)
7374
target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main
7475
GTest::gmock)
7576
add_test(NAME arrow_test COMMAND arrow_test)

test/arrow_fs_file_io_test.cc

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,45 +19,48 @@
1919

2020
#include "iceberg/arrow/arrow_fs_file_io.h"
2121

22-
#include <filesystem>
23-
2422
#include <arrow/filesystem/localfs.h>
2523
#include <gtest/gtest.h>
2624

2725
#include "matchers.h"
26+
#include "temp_file_test_base.h"
2827

2928
namespace iceberg {
3029

31-
class LocalFileIOTest : public testing::Test {
30+
class LocalFileIOTest : public TempFileTestBase {
3231
protected:
3332
void SetUp() override {
34-
local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
35-
file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);
33+
TempFileTestBase::SetUp();
34+
file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
35+
std::make_shared<::arrow::fs::LocalFileSystem>());
36+
temp_filepath_ = CreateNewTempFilePath();
3637
}
3738

38-
std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
3939
std::shared_ptr<iceberg::FileIO> file_io_;
40-
std::filesystem::path tmpfile = std::filesystem::temp_directory_path() / "123.txt";
40+
std::string temp_filepath_;
4141
};
4242

4343
TEST_F(LocalFileIOTest, ReadWriteFile) {
44-
auto read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt);
44+
auto read_res = file_io_->ReadFile(temp_filepath_, std::nullopt);
4545
EXPECT_THAT(read_res, IsError(ErrorKind::kIOError));
4646
EXPECT_THAT(read_res, HasErrorMessage("Failed to open local file"));
4747

48-
auto write_res = file_io_->WriteFile(tmpfile.string(), "hello world");
48+
auto write_res = file_io_->WriteFile(temp_filepath_, "hello world");
4949
EXPECT_THAT(write_res, IsOk());
5050

51-
read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt);
51+
read_res = file_io_->ReadFile(temp_filepath_, std::nullopt);
5252
EXPECT_THAT(read_res, IsOk());
5353
EXPECT_THAT(read_res, HasValue(::testing::Eq("hello world")));
5454
}
5555

5656
TEST_F(LocalFileIOTest, DeleteFile) {
57-
auto del_res = file_io_->DeleteFile(tmpfile.string());
57+
auto write_res = file_io_->WriteFile(temp_filepath_, "hello world");
58+
EXPECT_THAT(write_res, IsOk());
59+
60+
auto del_res = file_io_->DeleteFile(temp_filepath_);
5861
EXPECT_THAT(del_res, IsOk());
5962

60-
del_res = file_io_->DeleteFile(tmpfile.string());
63+
del_res = file_io_->DeleteFile(temp_filepath_);
6164
EXPECT_THAT(del_res, IsError(ErrorKind::kIOError));
6265
EXPECT_THAT(del_res, HasErrorMessage("Cannot delete file"));
6366
}

test/metadata_io_test.cc

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <arrow/filesystem/localfs.h>
21+
#include <gtest/gtest.h>
22+
23+
#include "iceberg/arrow/arrow_fs_file_io.h"
24+
#include "iceberg/file_io.h"
25+
#include "iceberg/schema.h"
26+
#include "iceberg/snapshot.h"
27+
#include "iceberg/table_metadata.h"
28+
#include "matchers.h"
29+
#include "temp_file_test_base.h"
30+
31+
namespace iceberg {
32+
33+
class MetadataIOTest : public TempFileTestBase {
34+
protected:
35+
void SetUp() override {
36+
TempFileTestBase::SetUp();
37+
io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
38+
std::make_shared<::arrow::fs::LocalFileSystem>());
39+
temp_filepath_ = CreateNewTempFilePathWithSuffix(".metadata.json");
40+
}
41+
42+
std::shared_ptr<iceberg::FileIO> io_;
43+
std::string temp_filepath_;
44+
};
45+
46+
// Round-trip: metadata written with TableMetadataUtil::Write must compare equal
// to what TableMetadataUtil::Read returns for the same file.
TEST_F(MetadataIOTest, ReadWriteMetadata) {
  std::vector<SchemaField> schema_fields;
  schema_fields.emplace_back(/*field_id=*/1, "x", std::make_shared<LongType>(),
                             /*optional=*/false);
  auto schema = std::make_shared<Schema>(std::move(schema_fields), /*schema_id=*/1);

  TableMetadata metadata{.format_version = 1,
                         .table_uuid = "1234567890",
                         .location = "s3://bucket/path",
                         .last_sequence_number = 0,
                         .schemas = {schema},
                         .current_schema_id = 1,
                         .default_spec_id = 0,
                         .last_partition_id = 0,
                         .properties = {{"key", "value"}},
                         .current_snapshot_id = 3051729675574597004,
                         .snapshots = {std::make_shared<Snapshot>(Snapshot{
                             .snapshot_id = 3051729675574597004,
                             .sequence_number = 0,
                             .timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(),
                             .manifest_list = "s3://a/b/1.avro",
                             .summary = {{"operation", "append"}},
                         })},
                         .default_sort_order_id = 0,
                         .next_row_id = 0};

  EXPECT_THAT(TableMetadataUtil::Write(*io_, temp_filepath_, metadata), IsOk());

  auto result = TableMetadataUtil::Read(*io_, temp_filepath_);
  // Fatal on failure: result.value() below must not be reached for an error.
  ASSERT_THAT(result, IsOk());

  auto metadata_read = std::move(result.value());
  // Guard the dereference below against a null pointer.
  ASSERT_NE(metadata_read, nullptr);
  EXPECT_EQ(*metadata_read, metadata);
}
80+
81+
} // namespace iceberg

0 commit comments

Comments
 (0)