Skip to content

Commit cd9ea76

Browse files
dongxiao1198xiao.dong
andauthored
feat: support decompress gzip metadata (#108)
support gzip compressed metadata json --------- Co-authored-by: xiao.dong <[email protected]>
1 parent 5aec271 commit cd9ea76

File tree

8 files changed

+319
-33
lines changed

8 files changed

+319
-33
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,17 +44,24 @@ set(ICEBERG_SOURCES
4444
type.cc
4545
util/murmurhash3_internal.cc
4646
util/timepoint.cc
47-
util/unreachable.cc)
47+
util/unreachable.cc
48+
util/gzip_internal.cc)
4849

4950
set(ICEBERG_STATIC_BUILD_INTERFACE_LIBS)
5051
set(ICEBERG_SHARED_BUILD_INTERFACE_LIBS)
5152
set(ICEBERG_STATIC_INSTALL_INTERFACE_LIBS)
5253
set(ICEBERG_SHARED_INSTALL_INTERFACE_LIBS)
5354

54-
list(APPEND ICEBERG_STATIC_BUILD_INTERFACE_LIBS nanoarrow::nanoarrow
55-
nlohmann_json::nlohmann_json)
56-
list(APPEND ICEBERG_SHARED_BUILD_INTERFACE_LIBS nanoarrow::nanoarrow
57-
nlohmann_json::nlohmann_json)
55+
list(APPEND
56+
ICEBERG_STATIC_BUILD_INTERFACE_LIBS
57+
nanoarrow::nanoarrow
58+
nlohmann_json::nlohmann_json
59+
ZLIB::ZLIB)
60+
list(APPEND
61+
ICEBERG_SHARED_BUILD_INTERFACE_LIBS
62+
nanoarrow::nanoarrow
63+
nlohmann_json::nlohmann_json
64+
ZLIB::ZLIB)
5865
list(APPEND ICEBERG_STATIC_INSTALL_INTERFACE_LIBS "Iceberg::nanoarrow"
5966
"Iceberg::nlohmann_json")
6067
list(APPEND ICEBERG_SHARED_INSTALL_INTERFACE_LIBS "Iceberg::nanoarrow"

src/iceberg/result.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ namespace iceberg {
3131
enum class ErrorKind {
3232
kAlreadyExists,
3333
kCommitStateUnknown,
34+
kDecompressError,
3435
kInvalidArgument,
3536
kInvalidExpression,
3637
kInvalidSchema,
@@ -74,6 +75,7 @@ using Status = Result<void>;
7475

7576
DEFINE_ERROR_FUNCTION(AlreadyExists)
7677
DEFINE_ERROR_FUNCTION(CommitStateUnknown)
78+
DEFINE_ERROR_FUNCTION(DecompressError)
7779
DEFINE_ERROR_FUNCTION(InvalidArgument)
7880
DEFINE_ERROR_FUNCTION(InvalidExpression)
7981
DEFINE_ERROR_FUNCTION(InvalidSchema)

src/iceberg/table_metadata.cc

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
#include "iceberg/table_metadata.h"
2121

2222
#include <format>
23-
#include <ranges>
2423
#include <string>
2524

2625
#include <nlohmann/json.hpp>
@@ -32,6 +31,7 @@
3231
#include "iceberg/schema.h"
3332
#include "iceberg/snapshot.h"
3433
#include "iceberg/sort_order.h"
34+
#include "iceberg/util/gzip_internal.h"
3535
#include "iceberg/util/macros.h"
3636

3737
namespace iceberg {
@@ -156,11 +156,16 @@ Result<MetadataFileCodecType> TableMetadataUtil::CodecFromFileName(
156156
Result<std::unique_ptr<TableMetadata>> TableMetadataUtil::Read(
157157
FileIO& io, const std::string& location, std::optional<size_t> length) {
158158
ICEBERG_ASSIGN_OR_RAISE(auto codec_type, CodecFromFileName(location));
159+
160+
ICEBERG_ASSIGN_OR_RAISE(auto content, io.ReadFile(location, length));
159161
if (codec_type == MetadataFileCodecType::kGzip) {
160-
return NotImplemented("Reading gzip-compressed metadata files is not supported yet");
162+
auto gzip_decompressor = std::make_unique<GZipDecompressor>();
163+
ICEBERG_RETURN_UNEXPECTED(gzip_decompressor->Init());
164+
auto result = gzip_decompressor->Decompress(content);
165+
ICEBERG_RETURN_UNEXPECTED(result);
166+
content = result.value();
161167
}
162168

163-
ICEBERG_ASSIGN_OR_RAISE(auto content, io.ReadFile(location, length));
164169
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(content));
165170
return TableMetadataFromJson(json);
166171
}

src/iceberg/util/gzip_internal.cc

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/util/gzip_internal.h"
21+
22+
#include <zlib.h>
23+
24+
#include <cstring>
25+
26+
#include "iceberg/util/macros.h"
27+
28+
namespace iceberg {
29+
30+
class ZlibImpl {
31+
public:
32+
ZlibImpl() { memset(&stream_, 0, sizeof(stream_)); }
33+
34+
~ZlibImpl() {
35+
if (initialized_) {
36+
inflateEnd(&stream_);
37+
}
38+
}
39+
40+
Status Init() {
41+
// Maximum window size
42+
constexpr int kWindowBits = 15;
43+
// Determine if this is libz or gzip from header.
44+
constexpr int kDetectCodec = 32;
45+
int ret = inflateInit2(&stream_, kWindowBits | kDetectCodec);
46+
if (ret != Z_OK) {
47+
return DecompressError("inflateInit2 failed, result:{}", ret);
48+
}
49+
initialized_ = true;
50+
return {};
51+
}
52+
53+
Result<std::string> Decompress(const std::string& compressed_data) {
54+
if (compressed_data.empty()) {
55+
return {};
56+
}
57+
if (!initialized_) {
58+
ICEBERG_RETURN_UNEXPECTED(Init());
59+
}
60+
stream_.avail_in = static_cast<uInt>(compressed_data.size());
61+
stream_.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(compressed_data.data()));
62+
63+
// TODO(xiao.dong) magic buffer, can we get a estimated size from compressed data?
64+
std::vector<char> out_buffer(32 * 1024);
65+
std::string result;
66+
int ret = 0;
67+
do {
68+
stream_.avail_out = static_cast<uInt>(out_buffer.size());
69+
stream_.next_out = reinterpret_cast<Bytef*>(out_buffer.data());
70+
ret = inflate(&stream_, Z_NO_FLUSH);
71+
if (ret != Z_OK && ret != Z_STREAM_END) {
72+
return DecompressError("inflate failed, result:{}", ret);
73+
}
74+
result.append(out_buffer.data(), out_buffer.size() - stream_.avail_out);
75+
} while (ret != Z_STREAM_END);
76+
return result;
77+
}
78+
79+
private:
80+
bool initialized_ = false;
81+
z_stream stream_;
82+
};
83+
84+
GZipDecompressor::GZipDecompressor() : zlib_impl_(std::make_unique<ZlibImpl>()) {}
85+
86+
GZipDecompressor::~GZipDecompressor() = default;
87+
88+
Status GZipDecompressor::Init() { return zlib_impl_->Init(); }
89+
90+
Result<std::string> GZipDecompressor::Decompress(const std::string& compressed_data) {
91+
return zlib_impl_->Decompress(compressed_data);
92+
}
93+
94+
} // namespace iceberg

src/iceberg/util/gzip_internal.h

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <memory>
23+
#include <string>
24+
25+
#include "iceberg/result.h"
26+
27+
namespace iceberg {
28+
29+
class ZlibImpl;
30+
31+
class GZipDecompressor {
32+
public:
33+
GZipDecompressor();
34+
35+
~GZipDecompressor();
36+
37+
Status Init();
38+
39+
Result<std::string> Decompress(const std::string& compressed_data);
40+
41+
private:
42+
std::unique_ptr<ZlibImpl> zlib_impl_;
43+
};
44+
45+
} // namespace iceberg

test/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ if(ICEBERG_BUILD_BUNDLE)
7878

7979
add_executable(arrow_test)
8080
target_sources(arrow_test PRIVATE arrow_test.cc arrow_fs_file_io_test.cc
81-
metadata_io_test.cc)
81+
metadata_io_test.cc gzip_decompress_test.cc)
8282
target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main
8383
GTest::gmock)
8484
add_test(NAME arrow_test COMMAND arrow_test)

test/gzip_decompress_test.cc

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <arrow/filesystem/localfs.h>
21+
#include <arrow/io/compressed.h>
22+
#include <arrow/io/file.h>
23+
#include <arrow/util/compression.h>
24+
#include <gtest/gtest.h>
25+
26+
#include "iceberg/arrow/arrow_fs_file_io.h"
27+
#include "iceberg/file_io.h"
28+
#include "iceberg/util/gzip_internal.h"
29+
#include "matchers.h"
30+
#include "temp_file_test_base.h"
31+
32+
namespace iceberg {
33+
34+
class GZipTest : public TempFileTestBase {
35+
protected:
36+
void SetUp() override {
37+
TempFileTestBase::SetUp();
38+
io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
39+
std::make_shared<::arrow::fs::LocalFileSystem>());
40+
temp_filepath_ = CreateNewTempFilePathWithSuffix("test.gz");
41+
}
42+
43+
std::shared_ptr<iceberg::FileIO> io_;
44+
std::string temp_filepath_;
45+
};
46+
47+
TEST_F(GZipTest, GZipDecompressedString) {
48+
auto test_string = GenerateRandomString(1024);
49+
50+
#define ARROW_ASSIGN_OR_THROW(var, expr) \
51+
{ \
52+
auto result = expr; \
53+
ASSERT_TRUE(result.ok()); \
54+
var = std::move(result.ValueUnsafe()); \
55+
}
56+
std::shared_ptr<::arrow::io::FileOutputStream> out_stream;
57+
ARROW_ASSIGN_OR_THROW(out_stream, ::arrow::io::FileOutputStream::Open(temp_filepath_));
58+
59+
std::shared_ptr<::arrow::util::Codec> gzip_codec;
60+
ARROW_ASSIGN_OR_THROW(gzip_codec,
61+
::arrow::util::Codec::Create(::arrow::Compression::GZIP));
62+
63+
std::shared_ptr<::arrow::io::OutputStream> compressed_stream;
64+
ARROW_ASSIGN_OR_THROW(compressed_stream, ::arrow::io::CompressedOutputStream::Make(
65+
gzip_codec.get(), out_stream));
66+
#undef ARROW_ASSIGN_OR_THROW
67+
68+
ASSERT_TRUE(compressed_stream->Write(test_string).ok());
69+
ASSERT_TRUE(compressed_stream->Flush().ok());
70+
ASSERT_TRUE(compressed_stream->Close().ok());
71+
72+
auto result = io_->ReadFile(temp_filepath_, test_string.size());
73+
EXPECT_THAT(result, IsOk());
74+
75+
auto gzip_decompressor = std::make_unique<GZipDecompressor>();
76+
EXPECT_THAT(gzip_decompressor->Init(), IsOk());
77+
EXPECT_THAT(result, IsOk());
78+
auto read_data = gzip_decompressor->Decompress(result.value());
79+
EXPECT_THAT(read_data, IsOk());
80+
ASSERT_EQ(read_data.value(), test_string);
81+
}
82+
83+
} // namespace iceberg

0 commit comments

Comments
 (0)