Skip to content

Commit ed1e682

Browse files
author
xiao.dong
committed
add individual decompressor and test
1 parent b7d29bd commit ed1e682

File tree

7 files changed

+227
-55
lines changed

7 files changed

+227
-55
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ set(ICEBERG_SOURCES
4242
type.cc
4343
util/murmurhash3_internal.cc
4444
util/timepoint.cc
45-
util/unreachable.cc)
45+
util/unreachable.cc
46+
util/gzip_internal.cc)
4647

4748
set(ICEBERG_STATIC_BUILD_INTERFACE_LIBS)
4849
set(ICEBERG_SHARED_BUILD_INTERFACE_LIBS)

src/iceberg/result.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ enum class ErrorKind {
3838
kJsonParseError,
3939
kNoSuchNamespace,
4040
kNoSuchTable,
41+
kDecompressError,
4142
kNotFound,
4243
kNotImplemented,
4344
kNotSupported,
@@ -80,6 +81,7 @@ DEFINE_ERROR_FUNCTION(IOError)
8081
DEFINE_ERROR_FUNCTION(JsonParseError)
8182
DEFINE_ERROR_FUNCTION(NoSuchNamespace)
8283
DEFINE_ERROR_FUNCTION(NoSuchTable)
84+
DEFINE_ERROR_FUNCTION(DecompressError)
8385
DEFINE_ERROR_FUNCTION(NotFound)
8486
DEFINE_ERROR_FUNCTION(NotImplemented)
8587
DEFINE_ERROR_FUNCTION(NotSupported)

src/iceberg/table_metadata.cc

Lines changed: 1 addition & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
#include "iceberg/table_metadata.h"
2121

22-
#include <zlib.h>
23-
2422
#include <format>
2523
#include <string>
2624

@@ -33,6 +31,7 @@
3331
#include "iceberg/schema.h"
3432
#include "iceberg/snapshot.h"
3533
#include "iceberg/sort_order.h"
34+
#include "iceberg/util/gzip_internal.h"
3635
#include "iceberg/util/macros.h"
3736

3837
namespace iceberg {
@@ -154,57 +153,6 @@ Result<MetadataFileCodecType> TableMetadataUtil::CodecFromFileName(
154153
return MetadataFileCodecType::kNone;
155154
}
156155

157-
class GZipDecompressor {
158-
public:
159-
GZipDecompressor() { memset(&stream_, 0, sizeof(stream_)); }
160-
161-
~GZipDecompressor() {
162-
if (initialized_) {
163-
inflateEnd(&stream_);
164-
}
165-
}
166-
167-
Status Init() {
168-
int ret = inflateInit2(&stream_, 15 + 32);
169-
if (ret != Z_OK) {
170-
return IOError("inflateInit2 failed, result:{}", ret);
171-
}
172-
initialized_ = true;
173-
return {};
174-
}
175-
176-
Result<std::string> Decompress(const std::string& compressed_data) {
177-
if (compressed_data.empty()) {
178-
return {};
179-
}
180-
if (!initialized_) {
181-
ICEBERG_RETURN_UNEXPECTED(Init());
182-
}
183-
stream_.avail_in = static_cast<uInt>(compressed_data.size());
184-
stream_.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(compressed_data.data()));
185-
186-
// TODO(xiao.dong) magic buffer, can we get a estimated size from compressed data?
187-
std::vector<char> outBuffer(32 * 1024);
188-
std::string result;
189-
int ret = 0;
190-
do {
191-
outBuffer.resize(outBuffer.size());
192-
stream_.avail_out = static_cast<uInt>(outBuffer.size());
193-
stream_.next_out = reinterpret_cast<Bytef*>(outBuffer.data());
194-
ret = inflate(&stream_, Z_NO_FLUSH);
195-
if (ret != Z_OK && ret != Z_STREAM_END) {
196-
return IOError("inflate failed, result:{}", ret);
197-
}
198-
result.append(outBuffer.data(), outBuffer.size() - stream_.avail_out);
199-
} while (ret != Z_STREAM_END);
200-
return result;
201-
}
202-
203-
private:
204-
bool initialized_ = false;
205-
z_stream stream_;
206-
};
207-
208156
Result<std::unique_ptr<TableMetadata>> TableMetadataUtil::Read(
209157
FileIO& io, const std::string& location, std::optional<size_t> length) {
210158
ICEBERG_ASSIGN_OR_RAISE(auto codec_type, CodecFromFileName(location));

src/iceberg/util/gzip_internal.cc

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/util/gzip_internal.h"
21+
22+
#include <zlib.h>
23+
24+
#include "iceberg/util/macros.h"
25+
26+
namespace iceberg {
27+
28+
class ZlibImpl {
29+
public:
30+
ZlibImpl() { memset(&stream_, 0, sizeof(stream_)); }
31+
32+
~ZlibImpl() {
33+
if (initialized_) {
34+
inflateEnd(&stream_);
35+
}
36+
}
37+
38+
Status Init() {
39+
// Maximum window size
40+
static int WINDOW_BITS = 15;
41+
// Determine if this is libz or gzip from header.
42+
static int DETECT_CODEC = 32;
43+
44+
int ret = inflateInit2(&stream_, WINDOW_BITS | DETECT_CODEC);
45+
if (ret != Z_OK) {
46+
return DecompressError("inflateInit2 failed, result:{}", ret);
47+
}
48+
initialized_ = true;
49+
return {};
50+
}
51+
52+
Result<std::string> Decompress(const std::string& compressed_data) {
53+
if (compressed_data.empty()) {
54+
return {};
55+
}
56+
if (!initialized_) {
57+
ICEBERG_RETURN_UNEXPECTED(Init());
58+
}
59+
stream_.avail_in = static_cast<uInt>(compressed_data.size());
60+
stream_.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(compressed_data.data()));
61+
62+
// TODO(xiao.dong) magic buffer, can we get a estimated size from compressed data?
63+
std::vector<char> out_buffer(32 * 1024);
64+
std::string result;
65+
int ret = 0;
66+
do {
67+
stream_.avail_out = static_cast<uInt>(out_buffer.size());
68+
stream_.next_out = reinterpret_cast<Bytef*>(out_buffer.data());
69+
ret = inflate(&stream_, Z_NO_FLUSH);
70+
if (ret != Z_OK && ret != Z_STREAM_END) {
71+
return DecompressError("inflate failed, result:{}", ret);
72+
}
73+
result.append(out_buffer.data(), out_buffer.size() - stream_.avail_out);
74+
} while (ret != Z_STREAM_END);
75+
return result;
76+
}
77+
78+
private:
79+
bool initialized_ = false;
80+
z_stream stream_;
81+
};
82+
83+
GZipDecompressor::GZipDecompressor() : zlib_impl_(std::make_shared<ZlibImpl>()) {}
84+
85+
GZipDecompressor::~GZipDecompressor() = default;
86+
87+
Status GZipDecompressor::Init() { return zlib_impl_->Init(); }
88+
89+
Result<std::string> GZipDecompressor::Decompress(const std::string& compressed_data) {
90+
return zlib_impl_->Decompress(compressed_data);
91+
}
92+
93+
} // namespace iceberg

src/iceberg/util/gzip_internal.h

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <memory>
23+
#include <string>
24+
25+
#include "iceberg/result.h"
26+
27+
namespace iceberg {
28+
29+
class ZlibImpl;
30+
31+
class GZipDecompressor {
32+
public:
33+
GZipDecompressor();
34+
35+
~GZipDecompressor();
36+
37+
Status Init();
38+
39+
Result<std::string> Decompress(const std::string& compressed_data);
40+
41+
private:
42+
std::shared_ptr<ZlibImpl> zlib_impl_;
43+
};
44+
45+
} // namespace iceberg

test/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ if(ICEBERG_BUILD_BUNDLE)
7272

7373
add_executable(arrow_test)
7474
target_sources(arrow_test PRIVATE arrow_test.cc arrow_fs_file_io_test.cc
75-
metadata_io_test.cc)
75+
metadata_io_test.cc gzip_decompress_test.cc)
7676
target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main
7777
GTest::gmock)
7878
add_test(NAME arrow_test COMMAND arrow_test)

test/gzip_decompress_test.cc

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <arrow/filesystem/localfs.h>
21+
#include <arrow/io/compressed.h>
22+
#include <arrow/io/file.h>
23+
#include <arrow/util/compression.h>
24+
#include <gtest/gtest.h>
25+
26+
#include "iceberg/arrow/arrow_fs_file_io.h"
27+
#include "iceberg/file_io.h"
28+
#include "iceberg/util/gzip_internal.h"
29+
#include "matchers.h"
30+
#include "temp_file_test_base.h"
31+
32+
namespace iceberg {
33+
34+
class GZipTest : public TempFileTestBase {
35+
protected:
36+
void SetUp() override {
37+
TempFileTestBase::SetUp();
38+
io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
39+
std::make_shared<::arrow::fs::LocalFileSystem>());
40+
temp_filepath_ = CreateNewTempFilePathWithSuffix("test.gz");
41+
}
42+
43+
std::shared_ptr<iceberg::FileIO> io_;
44+
std::string temp_filepath_;
45+
};
46+
47+
TEST_F(GZipTest, GZipDecompressedString) {
48+
auto test_string = GenerateRandomString(1024);
49+
50+
#define ARROW_ASSIGN_OR_THROW(var, expr) \
51+
{ \
52+
auto result = expr; \
53+
ASSERT_TRUE(result.ok()); \
54+
var = std::move(result.ValueUnsafe()); \
55+
}
56+
std::shared_ptr<::arrow::io::FileOutputStream> out_stream;
57+
ARROW_ASSIGN_OR_THROW(out_stream, ::arrow::io::FileOutputStream::Open(temp_filepath_));
58+
59+
std::shared_ptr<::arrow::util::Codec> gzip_codec;
60+
ARROW_ASSIGN_OR_THROW(gzip_codec,
61+
::arrow::util::Codec::Create(::arrow::Compression::GZIP));
62+
63+
std::shared_ptr<::arrow::io::OutputStream> compressed_stream;
64+
ARROW_ASSIGN_OR_THROW(compressed_stream, ::arrow::io::CompressedOutputStream::Make(
65+
gzip_codec.get(), out_stream));
66+
#undef ARROW_ASSIGN_OR_THROW
67+
68+
ASSERT_TRUE(compressed_stream->Write(test_string).ok());
69+
ASSERT_TRUE(compressed_stream->Flush().ok());
70+
ASSERT_TRUE(compressed_stream->Close().ok());
71+
72+
auto result = io_->ReadFile(temp_filepath_, test_string.size());
73+
EXPECT_THAT(result, IsOk());
74+
75+
auto gzip_decompressor = std::make_unique<GZipDecompressor>();
76+
EXPECT_THAT(gzip_decompressor->Init(), IsOk());
77+
EXPECT_THAT(result, IsOk());
78+
auto read_data = gzip_decompressor->Decompress(result.value());
79+
EXPECT_THAT(read_data, IsOk());
80+
ASSERT_EQ(read_data.value(), test_string);
81+
}
82+
83+
} // namespace iceberg

0 commit comments

Comments
 (0)