Skip to content

Commit b391f75

Browse files
author
xiao.dong
committed
feat: support decompress gzip metadata
1 parent cb44bdc commit b391f75

File tree

3 files changed

+137
-7
lines changed

3 files changed

+137
-7
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,16 @@ set(ICEBERG_SHARED_BUILD_INTERFACE_LIBS)
4949
set(ICEBERG_STATIC_INSTALL_INTERFACE_LIBS)
5050
set(ICEBERG_SHARED_INSTALL_INTERFACE_LIBS)
5151

52-
list(APPEND ICEBERG_STATIC_BUILD_INTERFACE_LIBS nanoarrow::nanoarrow
53-
nlohmann_json::nlohmann_json)
54-
list(APPEND ICEBERG_SHARED_BUILD_INTERFACE_LIBS nanoarrow::nanoarrow
55-
nlohmann_json::nlohmann_json)
52+
# Append the libraries for the static and shared build-interface lists.
# ZLIB::ZLIB is added next to nanoarrow and nlohmann_json; table_metadata.cc
# includes <zlib.h> to inflate gzip-compressed metadata files.
# NOTE(review): the ICEBERG_STATIC_INSTALL_INTERFACE_LIBS list that follows has
# no zlib entry -- confirm that installed consumers of the static library still
# resolve zlib.
list(APPEND
53+
ICEBERG_STATIC_BUILD_INTERFACE_LIBS
54+
nanoarrow::nanoarrow
55+
nlohmann_json::nlohmann_json
56+
ZLIB::ZLIB)
57+
list(APPEND
58+
ICEBERG_SHARED_BUILD_INTERFACE_LIBS
59+
nanoarrow::nanoarrow
60+
nlohmann_json::nlohmann_json
61+
ZLIB::ZLIB)
5662
list(APPEND ICEBERG_STATIC_INSTALL_INTERFACE_LIBS "Iceberg::nanoarrow"
5763
"Iceberg::nlohmann_json")
5864
list(APPEND ICEBERG_SHARED_INSTALL_INTERFACE_LIBS "Iceberg::nanoarrow"

src/iceberg/table_metadata.cc

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@
1919

2020
#include "iceberg/table_metadata.h"
2121

22+
#include <zlib.h>
23+
2224
#include <format>
23-
#include <ranges>
2425
#include <string>
2526

2627
#include <nlohmann/json.hpp>
@@ -153,14 +154,70 @@ Result<MetadataFileCodecType> TableMetadataUtil::CodecFromFileName(
153154
return MetadataFileCodecType::kNone;
154155
}
155156

157+
class GZipDecompressor {
158+
public:
159+
GZipDecompressor() : initialized_(false) {}
160+
161+
~GZipDecompressor() {
162+
if (initialized_) {
163+
inflateEnd(&stream_);
164+
}
165+
}
166+
167+
Status Init() {
168+
int ret = inflateInit2(&stream_, 15 + 32);
169+
if (ret != Z_OK) {
170+
return IOError("inflateInit2 failed, result:{}", ret);
171+
}
172+
initialized_ = true;
173+
return {};
174+
}
175+
176+
Result<std::string> Decompress(const std::string& compressed_data) {
177+
if (compressed_data.empty()) {
178+
return {};
179+
}
180+
if (!initialized_) {
181+
ICEBERG_RETURN_UNEXPECTED(Init());
182+
}
183+
stream_.avail_in = static_cast<uInt>(compressed_data.size());
184+
stream_.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(compressed_data.data()));
185+
186+
// TODO(xiao.dong) magic buffer 16k, can we get a estimated size from compressed data?
187+
std::vector<char> outBuffer(32 * 1024);
188+
std::string result;
189+
int ret = 0;
190+
do {
191+
outBuffer.resize(outBuffer.size());
192+
stream_.avail_out = static_cast<uInt>(outBuffer.size());
193+
stream_.next_out = reinterpret_cast<Bytef*>(outBuffer.data());
194+
ret = inflate(&stream_, Z_NO_FLUSH);
195+
if (ret != Z_OK && ret != Z_STREAM_END) {
196+
return IOError("inflate failed, result:{}", ret);
197+
}
198+
result.append(outBuffer.data(), outBuffer.size() - stream_.avail_out);
199+
} while (ret != Z_STREAM_END);
200+
return result;
201+
}
202+
203+
private:
204+
bool initialized_ = false;
205+
z_stream stream_;
206+
};
207+
156208
/// Reads the metadata file at `location` through `io` and parses it into a
/// TableMetadata.
///
/// The codec is derived from the file name. A gzip file is inflated with
/// GZipDecompressor before the JSON is parsed; any other file is parsed as
/// read. `length` is forwarded unchanged to FileIO::ReadFile.
///
/// Errors from codec detection, reading, decompression and JSON parsing are
/// propagated to the caller.
Result<std::unique_ptr<TableMetadata>> TableMetadataUtil::Read(
    FileIO& io, const std::string& location, std::optional<size_t> length) {
  ICEBERG_ASSIGN_OR_RAISE(auto codec_type, CodecFromFileName(location));
  ICEBERG_ASSIGN_OR_RAISE(auto content, io.ReadFile(location, length));

  if (codec_type == MetadataFileCodecType::kGzip) {
    // A stack object is enough, and Decompress() opens the stream on first
    // use, so no explicit Init() call or heap allocation is needed.
    GZipDecompressor decompressor;
    // Bind the inflated text directly instead of copying it over `content`.
    ICEBERG_ASSIGN_OR_RAISE(auto inflated, decompressor.Decompress(content));
    ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(inflated));
    return TableMetadataFromJson(json);
  }

  ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(content));
  return TableMetadataFromJson(json);
}

test/metadata_io_test.cc

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,15 @@
1818
*/
1919

2020
#include <arrow/filesystem/localfs.h>
21+
#include <arrow/io/compressed.h>
22+
#include <arrow/io/file.h>
23+
#include <arrow/util/compression.h>
2124
#include <gtest/gtest.h>
25+
#include <nlohmann/json.hpp>
2226

2327
#include "iceberg/arrow/arrow_fs_file_io.h"
2428
#include "iceberg/file_io.h"
29+
#include "iceberg/json_internal.h"
2530
#include "iceberg/schema.h"
2631
#include "iceberg/snapshot.h"
2732
#include "iceberg/table_metadata.h"
@@ -78,4 +83,66 @@ TEST_F(MetadataIOTest, ReadWriteMetadata) {
7883
EXPECT_EQ(*metadata_read, metadata);
7984
}
8085

86+
// Serializes a TableMetadata to JSON, writes it through Arrow's gzip
// CompressedOutputStream to a ".metadata.json.gz" file, and checks that
// TableMetadataUtil::Read returns metadata equal to the original.
TEST_F(MetadataIOTest, ReadWriteCompressedMetadata) {
  std::vector<SchemaField> schema_fields;
  schema_fields.emplace_back(/*field_id=*/1, "x", std::make_shared<LongType>(),
                             /*optional=*/false);
  auto schema = std::make_shared<Schema>(std::move(schema_fields), /*schema_id=*/1);

  TableMetadata metadata{.format_version = 1,
                         .table_uuid = "1234567890",
                         .location = "s3://bucket/path",
                         .last_sequence_number = 0,
                         .schemas = {schema},
                         .current_schema_id = 1,
                         .default_spec_id = 0,
                         .last_partition_id = 0,
                         .properties = {{"key", "value"}},
                         .current_snapshot_id = 3051729675574597004,
                         .snapshots = {std::make_shared<Snapshot>(Snapshot{
                             .snapshot_id = 3051729675574597004,
                             .sequence_number = 0,
                             .timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(),
                             .manifest_list = "s3://a/b/1.avro",
                             .summary = {{"operation", "append"}},
                         })},
                         .default_sort_order_id = 0,
                         .next_row_id = 0};

  auto json = ToJson(metadata);
  auto ret = ToJsonString(json);
  ASSERT_TRUE(ret.has_value());
  auto json_string = ret.value();

// Unwraps an arrow::Result into `var`. On failure the test is aborted and the
// Arrow status text is reported.
#define ARROW_ASSIGN_OR_THROW(var, expr)                    \
  {                                                         \
    auto result = expr;                                     \
    ASSERT_TRUE(result.ok()) << result.status().ToString(); \
    var = std::move(result.ValueUnsafe());                  \
  }

  auto file_path = CreateNewTempFilePathWithSuffix(".metadata.json.gz");
  std::shared_ptr<::arrow::io::FileOutputStream> out_stream;
  ARROW_ASSIGN_OR_THROW(out_stream, ::arrow::io::FileOutputStream::Open(file_path));

  std::shared_ptr<::arrow::util::Codec> gzip_codec;
  ARROW_ASSIGN_OR_THROW(gzip_codec,
                        ::arrow::util::Codec::Create(::arrow::Compression::GZIP));

  std::shared_ptr<::arrow::io::OutputStream> compressed_stream;
  ARROW_ASSIGN_OR_THROW(compressed_stream, ::arrow::io::CompressedOutputStream::Make(
                                               gzip_codec.get(), out_stream));
#undef ARROW_ASSIGN_OR_THROW

  ASSERT_TRUE(compressed_stream->Write(json_string).ok());
  ASSERT_TRUE(compressed_stream->Flush().ok());
  ASSERT_TRUE(compressed_stream->Close().ok());

  auto result = TableMetadataUtil::Read(*io_, file_path);
  // Fatal on purpose: result.value() below is only valid when Read succeeded.
  ASSERT_THAT(result, IsOk());

  auto metadata_read = std::move(result.value());
  EXPECT_EQ(*metadata_read, metadata);
}
147+
81148
} // namespace iceberg

0 commit comments

Comments
 (0)