From b391f757392c7948d94a2ffd9c571d039a122d77 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Wed, 21 May 2025 16:28:00 +0800 Subject: [PATCH 1/8] feat: support decompress gzip metadata --- src/iceberg/CMakeLists.txt | 14 +++++--- src/iceberg/table_metadata.cc | 63 ++++++++++++++++++++++++++++++-- test/metadata_io_test.cc | 67 +++++++++++++++++++++++++++++++++++ 3 files changed, 137 insertions(+), 7 deletions(-) diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 44b5003e7..90a9bf350 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -49,10 +49,16 @@ set(ICEBERG_SHARED_BUILD_INTERFACE_LIBS) set(ICEBERG_STATIC_INSTALL_INTERFACE_LIBS) set(ICEBERG_SHARED_INSTALL_INTERFACE_LIBS) -list(APPEND ICEBERG_STATIC_BUILD_INTERFACE_LIBS nanoarrow::nanoarrow - nlohmann_json::nlohmann_json) -list(APPEND ICEBERG_SHARED_BUILD_INTERFACE_LIBS nanoarrow::nanoarrow - nlohmann_json::nlohmann_json) +list(APPEND + ICEBERG_STATIC_BUILD_INTERFACE_LIBS + nanoarrow::nanoarrow + nlohmann_json::nlohmann_json + ZLIB::ZLIB) +list(APPEND + ICEBERG_SHARED_BUILD_INTERFACE_LIBS + nanoarrow::nanoarrow + nlohmann_json::nlohmann_json + ZLIB::ZLIB) list(APPEND ICEBERG_STATIC_INSTALL_INTERFACE_LIBS "Iceberg::nanoarrow" "Iceberg::nlohmann_json") list(APPEND ICEBERG_SHARED_INSTALL_INTERFACE_LIBS "Iceberg::nanoarrow" diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index c1ada81d9..0566dcb1c 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -19,8 +19,9 @@ #include "iceberg/table_metadata.h" +#include + #include -#include #include #include @@ -153,14 +154,70 @@ Result TableMetadataUtil::CodecFromFileName( return MetadataFileCodecType::kNone; } +class GZipDecompressor { + public: + GZipDecompressor() : initialized_(false) {} + + ~GZipDecompressor() { + if (initialized_) { + inflateEnd(&stream_); + } + } + + Status Init() { + int ret = inflateInit2(&stream_, 15 + 32); + if (ret != Z_OK) { + return IOError("inflateInit2 failed, result:{}", ret); + } + initialized_ = true; + return {}; + } + + Result Decompress(const std::string& compressed_data) { + if (compressed_data.empty()) { + return {}; + } + if (!initialized_) { + ICEBERG_RETURN_UNEXPECTED(Init()); + } + stream_.avail_in = static_cast(compressed_data.size()); + stream_.next_in = reinterpret_cast(const_cast(compressed_data.data())); + + // TODO(xiao.dong) magic buffer 16k, can we get a estimated size from compressed data? + std::vector outBuffer(32 * 1024); + std::string result; + int ret = 0; + do { + outBuffer.resize(outBuffer.size()); + stream_.avail_out = static_cast(outBuffer.size()); + stream_.next_out = reinterpret_cast(outBuffer.data()); + ret = inflate(&stream_, Z_NO_FLUSH); + if (ret != Z_OK && ret != Z_STREAM_END) { + return IOError("inflate failed, result:{}", ret); + } + result.append(outBuffer.data(), outBuffer.size() - stream_.avail_out); + } while (ret != Z_STREAM_END); + return result; + } + + private: + bool initialized_ = false; + z_stream stream_; +}; + Result> TableMetadataUtil::Read( FileIO& io, const std::string& location, std::optional length) { ICEBERG_ASSIGN_OR_RAISE(auto codec_type, CodecFromFileName(location)); + + ICEBERG_ASSIGN_OR_RAISE(auto content, io.ReadFile(location, length)); if (codec_type == MetadataFileCodecType::kGzip) { - return NotImplemented("Reading gzip-compressed metadata files is not supported yet"); + auto gzip_decompressor = std::make_unique(); + ICEBERG_RETURN_UNEXPECTED(gzip_decompressor->Init()); + auto result = gzip_decompressor->Decompress(content); + ICEBERG_RETURN_UNEXPECTED(result); + content = result.value(); } - ICEBERG_ASSIGN_OR_RAISE(auto content, io.ReadFile(location, length)); ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(content)); return TableMetadataFromJson(json); } diff --git a/test/metadata_io_test.cc b/test/metadata_io_test.cc index cc98278b0..8f4f31d0d 100644 --- a/test/metadata_io_test.cc +++ b/test/metadata_io_test.cc @@ -18,10 +18,15 @@ */ #include +#include +#include +#include #include +#include #include "iceberg/arrow/arrow_fs_file_io.h" #include "iceberg/file_io.h" +#include "iceberg/json_internal.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" @@ -78,4 +83,66 @@ TEST_F(MetadataIOTest, ReadWriteMetadata) { EXPECT_EQ(*metadata_read, metadata); } +TEST_F(MetadataIOTest, ReadWriteCompressedMetadata) { + std::vector schema_fields; + schema_fields.emplace_back(/*field_id=*/1, "x", std::make_shared(), + /*optional=*/false); + auto schema = std::make_shared(std::move(schema_fields), /*schema_id=*/1); + + TableMetadata metadata{.format_version = 1, + .table_uuid = "1234567890", + .location = "s3://bucket/path", + .last_sequence_number = 0, + .schemas = {schema}, + .current_schema_id = 1, + .default_spec_id = 0, + .last_partition_id = 0, + .properties = {{"key", "value"}}, + .current_snapshot_id = 3051729675574597004, + .snapshots = {std::make_shared(Snapshot{ + .snapshot_id = 3051729675574597004, + .sequence_number = 0, + .timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(), + .manifest_list = "s3://a/b/1.avro", + .summary = {{"operation", "append"}}, + })}, + .default_sort_order_id = 0, + .next_row_id = 0}; + + auto json = ToJson(metadata); + auto ret = ToJsonString(json); + ASSERT_TRUE(ret.has_value()); + auto json_string = ret.value(); + +#define ARROW_ASSIGN_OR_THROW(var, expr) \ + { \ + auto result = expr; \ + ASSERT_TRUE(result.ok()); \ + var = std::move(result.ValueUnsafe()); \ + } + + auto file_path = CreateNewTempFilePathWithSuffix(".metadata.json.gz"); + std::shared_ptr<::arrow::io::FileOutputStream> out_stream; + ARROW_ASSIGN_OR_THROW(out_stream, ::arrow::io::FileOutputStream::Open(file_path)); + + std::shared_ptr<::arrow::util::Codec> gzip_codec; + ARROW_ASSIGN_OR_THROW(gzip_codec, + ::arrow::util::Codec::Create(::arrow::Compression::GZIP)); + + std::shared_ptr<::arrow::io::OutputStream> compressed_stream; + ARROW_ASSIGN_OR_THROW(compressed_stream, ::arrow::io::CompressedOutputStream::Make( + gzip_codec.get(), out_stream)); +#undef ARROW_ASSIGN_OR_THROW + + ASSERT_TRUE(compressed_stream->Write(json_string).ok()); + ASSERT_TRUE(compressed_stream->Flush().ok()); + ASSERT_TRUE(compressed_stream->Close().ok()); + + auto result = TableMetadataUtil::Read(*io_, file_path); + EXPECT_THAT(result, IsOk()); + + auto metadata_read = std::move(result.value()); + EXPECT_EQ(*metadata_read, metadata); +} + } // namespace iceberg From 2dfa6174a1997bcd737d6d060aa70dfecca3d7c8 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Fri, 23 May 2025 14:24:09 +0800 Subject: [PATCH 2/8] use gread instead of streaming api --- src/iceberg/table_metadata.cc | 74 +++++++++++------------------------ 1 file changed, 23 insertions(+), 51 deletions(-) diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 0566dcb1c..5e5b266b3 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -154,68 +154,40 @@ Result TableMetadataUtil::CodecFromFileName( return MetadataFileCodecType::kNone; } -class GZipDecompressor { - public: - GZipDecompressor() : initialized_(false) {} - - ~GZipDecompressor() { - if (initialized_) { - inflateEnd(&stream_); - } +Result DecompressGZIPFile(const std::string& filepath) { + gzFile file = gzopen(filepath.c_str(), "rb"); + if (!file) { + return IOError("Failed to open gzip file:{} ", filepath); } - Status Init() { - int ret = inflateInit2(&stream_, 15 + 32); - if (ret != Z_OK) { - return IOError("inflateInit2 failed, result:{}", ret); - } - initialized_ = true; - return {}; + const int CHUNK_SIZE = 32768; // 32KB chunks + char buffer[CHUNK_SIZE]; + std::string result; + int bytes_read; + + while ((bytes_read = gzread(file, buffer, CHUNK_SIZE)) > 0) { + result.append(buffer, bytes_read); } - Result Decompress(const std::string& compressed_data) { - if (compressed_data.empty()) { - return {}; - } - if (!initialized_) { - ICEBERG_RETURN_UNEXPECTED(Init()); - } - stream_.avail_in = static_cast(compressed_data.size()); - stream_.next_in = reinterpret_cast(const_cast(compressed_data.data())); - - // TODO(xiao.dong) magic buffer 16k, can we get a estimated size from compressed data? - std::vector outBuffer(32 * 1024); - std::string result; - int ret = 0; - do { - outBuffer.resize(outBuffer.size()); - stream_.avail_out = static_cast(outBuffer.size()); - stream_.next_out = reinterpret_cast(outBuffer.data()); - ret = inflate(&stream_, Z_NO_FLUSH); - if (ret != Z_OK && ret != Z_STREAM_END) { - return IOError("inflate failed, result:{}", ret); - } - result.append(outBuffer.data(), outBuffer.size() - stream_.avail_out); - } while (ret != Z_STREAM_END); - return result; + int err; + const char* error_msg = gzerror(file, &err); + if (err != Z_OK) { + gzclose(file); + return IOError("Error during gzip decompression:{} ", std::string(error_msg)); } - private: - bool initialized_ = false; - z_stream stream_; -}; + gzclose(file); + return result; +} Result> TableMetadataUtil::Read( FileIO& io, const std::string& location, std::optional length) { ICEBERG_ASSIGN_OR_RAISE(auto codec_type, CodecFromFileName(location)); - - ICEBERG_ASSIGN_OR_RAISE(auto content, io.ReadFile(location, length)); + std::string content; if (codec_type == MetadataFileCodecType::kGzip) { - auto gzip_decompressor = std::make_unique(); - ICEBERG_RETURN_UNEXPECTED(gzip_decompressor->Init()); - auto result = gzip_decompressor->Decompress(content); - ICEBERG_RETURN_UNEXPECTED(result); - content = result.value(); + ICEBERG_ASSIGN_OR_RAISE(content, DecompressGZIPFile(location)); + } else { + ICEBERG_ASSIGN_OR_RAISE(content, io.ReadFile(location, length)); } ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(content)); From d815b7cbd0fad4228ec327c3af5f61b71185be58 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Fri, 23 May 2025 14:50:10 +0800 Subject: [PATCH 3/8] fix format --- src/iceberg/table_metadata.cc | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 5e5b266b3..387607ce1 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -21,6 +21,7 @@ #include +#include #include #include @@ -160,13 +161,13 @@ Result DecompressGZIPFile(const std::string& filepath) { return IOError("Failed to open gzip file:{} ", filepath); } - const int CHUNK_SIZE = 32768; // 32KB chunks - char buffer[CHUNK_SIZE]; + static const int CHUNK_SIZE = 32768; // 32KB chunks + std::array buffer; std::string result; int bytes_read; - while ((bytes_read = gzread(file, buffer, CHUNK_SIZE)) > 0) { - result.append(buffer, bytes_read); + while ((bytes_read = gzread(file, buffer.data(), CHUNK_SIZE)) > 0) { + result.append(buffer.data(), bytes_read); } int err; From b7d29bd52529127088b9a8933e705a57fc1366fa Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Fri, 23 May 2025 16:23:50 +0800 Subject: [PATCH 4/8] revert --- src/iceberg/table_metadata.cc | 75 ++++++++++++++++++++++++----------- 1 file changed, 51 insertions(+), 24 deletions(-) diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 387607ce1..db900aac9 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -21,7 +21,6 @@ #include -#include #include #include @@ -155,40 +154,68 @@ Result TableMetadataUtil::CodecFromFileName( return MetadataFileCodecType::kNone; } -Result DecompressGZIPFile(const std::string& filepath) { - gzFile file = gzopen(filepath.c_str(), "rb"); - if (!file) { - return IOError("Failed to open gzip file:{} ", filepath); - } +class GZipDecompressor { + public: + GZipDecompressor() { memset(&stream_, 0, sizeof(stream_)); } - static const int CHUNK_SIZE = 32768; // 32KB chunks - std::array buffer; - std::string result; - int bytes_read; + ~GZipDecompressor() { + if (initialized_) { + inflateEnd(&stream_); + } + } - while ((bytes_read = gzread(file, buffer.data(), CHUNK_SIZE)) > 0) { - result.append(buffer.data(), bytes_read); + Status Init() { + int ret = inflateInit2(&stream_, 15 + 32); + if (ret != Z_OK) { + return IOError("inflateInit2 failed, result:{}", ret); + } + initialized_ = true; + return {}; } - int err; - const char* error_msg = gzerror(file, &err); - if (err != Z_OK) { - gzclose(file); - return IOError("Error during gzip decompression:{} ", std::string(error_msg)); + Result Decompress(const std::string& compressed_data) { + if (compressed_data.empty()) { + return {}; + } + if (!initialized_) { + ICEBERG_RETURN_UNEXPECTED(Init()); + } + stream_.avail_in = static_cast(compressed_data.size()); + stream_.next_in = reinterpret_cast(const_cast(compressed_data.data())); + + // TODO(xiao.dong) magic buffer, can we get a estimated size from compressed data? + std::vector outBuffer(32 * 1024); + std::string result; + int ret = 0; + do { + outBuffer.resize(outBuffer.size()); + stream_.avail_out = static_cast(outBuffer.size()); + stream_.next_out = reinterpret_cast(outBuffer.data()); + ret = inflate(&stream_, Z_NO_FLUSH); + if (ret != Z_OK && ret != Z_STREAM_END) { + return IOError("inflate failed, result:{}", ret); + } + result.append(outBuffer.data(), outBuffer.size() - stream_.avail_out); + } while (ret != Z_STREAM_END); + return result; } - gzclose(file); - return result; -} + private: + bool initialized_ = false; + z_stream stream_; +}; Result> TableMetadataUtil::Read( FileIO& io, const std::string& location, std::optional length) { ICEBERG_ASSIGN_OR_RAISE(auto codec_type, CodecFromFileName(location)); - std::string content; + + ICEBERG_ASSIGN_OR_RAISE(auto content, io.ReadFile(location, length)); if (codec_type == MetadataFileCodecType::kGzip) { - ICEBERG_ASSIGN_OR_RAISE(content, DecompressGZIPFile(location)); - } else { - ICEBERG_ASSIGN_OR_RAISE(content, io.ReadFile(location, length)); + auto gzip_decompressor = std::make_unique(); + ICEBERG_RETURN_UNEXPECTED(gzip_decompressor->Init()); + auto result = gzip_decompressor->Decompress(content); + ICEBERG_RETURN_UNEXPECTED(result); + content = result.value(); } ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(content)); From ed1e682fa346b847ca79916eafc3cb983e05b99d Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Mon, 26 May 2025 11:06:21 +0800 Subject: [PATCH 5/8] add individual decompressor and test --- src/iceberg/CMakeLists.txt | 3 +- src/iceberg/result.h | 2 + src/iceberg/table_metadata.cc | 54 +----------------- src/iceberg/util/gzip_internal.cc | 93 +++++++++++++++++++++++++++++++ src/iceberg/util/gzip_internal.h | 45 +++++++++++++++ test/CMakeLists.txt | 2 +- test/gzip_decompress_test.cc | 83 +++++++++++++++++++++++++++ 7 files changed, 227 insertions(+), 55 deletions(-) create mode 100644 src/iceberg/util/gzip_internal.cc create mode 100644 src/iceberg/util/gzip_internal.h create mode 100644 test/gzip_decompress_test.cc diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 90a9bf350..920db2590 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -42,7 +42,8 @@ set(ICEBERG_SOURCES type.cc util/murmurhash3_internal.cc util/timepoint.cc - util/unreachable.cc) + util/unreachable.cc + util/gzip_internal.cc) set(ICEBERG_STATIC_BUILD_INTERFACE_LIBS) set(ICEBERG_SHARED_BUILD_INTERFACE_LIBS) diff --git a/src/iceberg/result.h b/src/iceberg/result.h index 38d9e381f..f1af0ae38 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -38,6 +38,7 @@ enum class ErrorKind { kJsonParseError, kNoSuchNamespace, kNoSuchTable, + kDecompressError, kNotFound, kNotImplemented, kNotSupported, @@ -80,6 +81,7 @@ DEFINE_ERROR_FUNCTION(IOError) DEFINE_ERROR_FUNCTION(JsonParseError) DEFINE_ERROR_FUNCTION(NoSuchNamespace) DEFINE_ERROR_FUNCTION(NoSuchTable) +DEFINE_ERROR_FUNCTION(DecompressError) DEFINE_ERROR_FUNCTION(NotFound) DEFINE_ERROR_FUNCTION(NotImplemented) DEFINE_ERROR_FUNCTION(NotSupported) diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index db900aac9..4e112fd21 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -19,8 +19,6 @@ #include "iceberg/table_metadata.h" -#include - #include #include @@ -33,6 +31,7 @@ #include "iceberg/schema.h" #include "iceberg/snapshot.h" #include "iceberg/sort_order.h" +#include "iceberg/util/gzip_internal.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -154,57 +153,6 @@ Result TableMetadataUtil::CodecFromFileName( return MetadataFileCodecType::kNone; } -class GZipDecompressor { - public: - GZipDecompressor() { memset(&stream_, 0, sizeof(stream_)); } - - ~GZipDecompressor() { - if (initialized_) { - inflateEnd(&stream_); - } - } - - Status Init() { - int ret = inflateInit2(&stream_, 15 + 32); - if (ret != Z_OK) { - return IOError("inflateInit2 failed, result:{}", ret); - } - initialized_ = true; - return {}; - } - - Result Decompress(const std::string& compressed_data) { - if (compressed_data.empty()) { - return {}; - } - if (!initialized_) { - ICEBERG_RETURN_UNEXPECTED(Init()); - } - stream_.avail_in = static_cast(compressed_data.size()); - stream_.next_in = reinterpret_cast(const_cast(compressed_data.data())); - - // TODO(xiao.dong) magic buffer, can we get a estimated size from compressed data? - std::vector outBuffer(32 * 1024); - std::string result; - int ret = 0; - do { - outBuffer.resize(outBuffer.size()); - stream_.avail_out = static_cast(outBuffer.size()); - stream_.next_out = reinterpret_cast(outBuffer.data()); - ret = inflate(&stream_, Z_NO_FLUSH); - if (ret != Z_OK && ret != Z_STREAM_END) { - return IOError("inflate failed, result:{}", ret); - } - result.append(outBuffer.data(), outBuffer.size() - stream_.avail_out); - } while (ret != Z_STREAM_END); - return result; - } - - private: - bool initialized_ = false; - z_stream stream_; -}; - Result> TableMetadataUtil::Read( FileIO& io, const std::string& location, std::optional length) { ICEBERG_ASSIGN_OR_RAISE(auto codec_type, CodecFromFileName(location)); diff --git a/src/iceberg/util/gzip_internal.cc b/src/iceberg/util/gzip_internal.cc new file mode 100644 index 000000000..40530d77a --- /dev/null +++ b/src/iceberg/util/gzip_internal.cc @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/util/gzip_internal.h" + +#include + +#include "iceberg/util/macros.h" + +namespace iceberg { + +class ZlibImpl { + public: + ZlibImpl() { memset(&stream_, 0, sizeof(stream_)); } + + ~ZlibImpl() { + if (initialized_) { + inflateEnd(&stream_); + } + } + + Status Init() { + // Maximum window size + static int WINDOW_BITS = 15; + // Determine if this is libz or gzip from header. + static int DETECT_CODEC = 32; + + int ret = inflateInit2(&stream_, WINDOW_BITS | DETECT_CODEC); + if (ret != Z_OK) { + return DecompressError("inflateInit2 failed, result:{}", ret); + } + initialized_ = true; + return {}; + } + + Result Decompress(const std::string& compressed_data) { + if (compressed_data.empty()) { + return {}; + } + if (!initialized_) { + ICEBERG_RETURN_UNEXPECTED(Init()); + } + stream_.avail_in = static_cast(compressed_data.size()); + stream_.next_in = reinterpret_cast(const_cast(compressed_data.data())); + + // TODO(xiao.dong) magic buffer, can we get a estimated size from compressed data? + std::vector out_buffer(32 * 1024); + std::string result; + int ret = 0; + do { + stream_.avail_out = static_cast(out_buffer.size()); + stream_.next_out = reinterpret_cast(out_buffer.data()); + ret = inflate(&stream_, Z_NO_FLUSH); + if (ret != Z_OK && ret != Z_STREAM_END) { + return DecompressError("inflate failed, result:{}", ret); + } + result.append(out_buffer.data(), out_buffer.size() - stream_.avail_out); + } while (ret != Z_STREAM_END); + return result; + } + + private: + bool initialized_ = false; + z_stream stream_; +}; + +GZipDecompressor::GZipDecompressor() : zlib_impl_(std::make_shared()) {} + +GZipDecompressor::~GZipDecompressor() = default; + +Status GZipDecompressor::Init() { return zlib_impl_->Init(); } + +Result GZipDecompressor::Decompress(const std::string& compressed_data) { + return zlib_impl_->Decompress(compressed_data); +} + +} // namespace iceberg diff --git a/src/iceberg/util/gzip_internal.h b/src/iceberg/util/gzip_internal.h new file mode 100644 index 000000000..a97d92e83 --- /dev/null +++ b/src/iceberg/util/gzip_internal.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/result.h" + +namespace iceberg { + +class ZlibImpl; + +class GZipDecompressor { + public: + GZipDecompressor(); + + ~GZipDecompressor(); + + Status Init(); + + Result Decompress(const std::string& compressed_data); + + private: + std::shared_ptr zlib_impl_; +}; + +} // namespace iceberg diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 4a88229f5..65422ef4c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -72,7 +72,7 @@ if(ICEBERG_BUILD_BUNDLE) add_executable(arrow_test) target_sources(arrow_test PRIVATE arrow_test.cc arrow_fs_file_io_test.cc - metadata_io_test.cc) + metadata_io_test.cc gzip_decompress_test.cc) target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main GTest::gmock) add_test(NAME arrow_test COMMAND arrow_test) diff --git a/test/gzip_decompress_test.cc b/test/gzip_decompress_test.cc new file mode 100644 index 000000000..5edb36619 --- /dev/null +++ b/test/gzip_decompress_test.cc @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io.h" +#include "iceberg/file_io.h" +#include "iceberg/util/gzip_internal.h" +#include "matchers.h" +#include "temp_file_test_base.h" + +namespace iceberg { + +class GZipTest : public TempFileTestBase { + protected: + void SetUp() override { + TempFileTestBase::SetUp(); + io_ = std::make_shared( + std::make_shared<::arrow::fs::LocalFileSystem>()); + temp_filepath_ = CreateNewTempFilePathWithSuffix("test.gz"); + } + + std::shared_ptr io_; + std::string temp_filepath_; +}; + +TEST_F(GZipTest, GZipDecompressedString) { + auto test_string = GenerateRandomString(1024); + +#define ARROW_ASSIGN_OR_THROW(var, expr) \ + { \ + auto result = expr; \ + ASSERT_TRUE(result.ok()); \ + var = std::move(result.ValueUnsafe()); \ + } + std::shared_ptr<::arrow::io::FileOutputStream> out_stream; + ARROW_ASSIGN_OR_THROW(out_stream, ::arrow::io::FileOutputStream::Open(temp_filepath_)); + + std::shared_ptr<::arrow::util::Codec> gzip_codec; + ARROW_ASSIGN_OR_THROW(gzip_codec, + ::arrow::util::Codec::Create(::arrow::Compression::GZIP)); + + std::shared_ptr<::arrow::io::OutputStream> compressed_stream; + ARROW_ASSIGN_OR_THROW(compressed_stream, ::arrow::io::CompressedOutputStream::Make( + gzip_codec.get(), out_stream)); +#undef ARROW_ASSIGN_OR_THROW + + ASSERT_TRUE(compressed_stream->Write(test_string).ok()); + ASSERT_TRUE(compressed_stream->Flush().ok()); + ASSERT_TRUE(compressed_stream->Close().ok()); + + auto result = io_->ReadFile(temp_filepath_, test_string.size()); + EXPECT_THAT(result, IsOk()); + + auto gzip_decompressor = std::make_unique(); + EXPECT_THAT(gzip_decompressor->Init(), IsOk()); + EXPECT_THAT(result, IsOk()); + auto read_data = gzip_decompressor->Decompress(result.value()); + EXPECT_THAT(read_data, IsOk()); + ASSERT_EQ(read_data.value(), test_string); +} + +} // namespace iceberg From 643b1153ecf37f1a6946ddd31e0d45a3f154e51c Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Mon, 26 May 2025 11:12:01 +0800 Subject: [PATCH 6/8] fix header include --- src/iceberg/util/gzip_internal.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/iceberg/util/gzip_internal.cc b/src/iceberg/util/gzip_internal.cc index 40530d77a..0e03a4f0e 100644 --- a/src/iceberg/util/gzip_internal.cc +++ b/src/iceberg/util/gzip_internal.cc @@ -21,6 +21,8 @@ #include +#include + #include "iceberg/util/macros.h" namespace iceberg { From 0c1094bcf12b2a7f2a80cf70e5d2e62154a81995 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Mon, 26 May 2025 18:13:04 +0800 Subject: [PATCH 7/8] fix comments --- src/iceberg/util/gzip_internal.cc | 9 ++-- src/iceberg/util/gzip_internal.h | 2 +- test/metadata_io_test.cc | 79 ++++++++++++------------------- 3 files changed, 36 insertions(+), 54 deletions(-) diff --git a/src/iceberg/util/gzip_internal.cc b/src/iceberg/util/gzip_internal.cc index 0e03a4f0e..f9dceab83 100644 --- a/src/iceberg/util/gzip_internal.cc +++ b/src/iceberg/util/gzip_internal.cc @@ -39,11 +39,10 @@ class ZlibImpl { Status Init() { // Maximum window size - static int WINDOW_BITS = 15; + static int kWindowBits = 15; // Determine if this is libz or gzip from header. - static int DETECT_CODEC = 32; - - int ret = inflateInit2(&stream_, WINDOW_BITS | DETECT_CODEC); + static int kDetectCodec = 32; + int ret = inflateInit2(&stream_, kWindowBits | kDetectCodec); if (ret != Z_OK) { return DecompressError("inflateInit2 failed, result:{}", ret); } @@ -82,7 +81,7 @@ class ZlibImpl { z_stream stream_; }; -GZipDecompressor::GZipDecompressor() : zlib_impl_(std::make_shared()) {} +GZipDecompressor::GZipDecompressor() : zlib_impl_(std::make_unique()) {} GZipDecompressor::~GZipDecompressor() = default; diff --git a/src/iceberg/util/gzip_internal.h b/src/iceberg/util/gzip_internal.h index a97d92e83..61cdc00b3 100644 --- a/src/iceberg/util/gzip_internal.h +++ b/src/iceberg/util/gzip_internal.h @@ -39,7 +39,7 @@ class GZipDecompressor { Result Decompress(const std::string& compressed_data); private: - std::shared_ptr zlib_impl_; + std::unique_ptr zlib_impl_; }; } // namespace iceberg diff --git a/test/metadata_io_test.cc b/test/metadata_io_test.cc index 8f4f31d0d..7d987e25b 100644 --- a/test/metadata_io_test.cc +++ b/test/metadata_io_test.cc @@ -44,35 +44,41 @@ class MetadataIOTest : public TempFileTestBase { temp_filepath_ = CreateNewTempFilePathWithSuffix(".metadata.json"); } + TableMetadata PrepareMetadata() { + std::vector schema_fields; + schema_fields.emplace_back(/*field_id=*/1, "x", std::make_shared(), + /*optional=*/false); + auto schema = std::make_shared(std::move(schema_fields), /*schema_id=*/1); + + TableMetadata metadata{ + .format_version = 1, + .table_uuid = "1234567890", + .location = "s3://bucket/path", + .last_sequence_number = 0, + .schemas = {schema}, + .current_schema_id = 1, + .default_spec_id = 0, + .last_partition_id = 0, + .properties = {{"key", "value"}}, + .current_snapshot_id = 3051729675574597004, + .snapshots = {std::make_shared(Snapshot{ + .snapshot_id = 3051729675574597004, + .sequence_number = 0, + .timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(), + .manifest_list = "s3://a/b/1.avro", + .summary = {{"operation", "append"}}, + })}, + .default_sort_order_id = 0, + .next_row_id = 0}; + return metadata; + } + std::shared_ptr io_; std::string temp_filepath_; }; TEST_F(MetadataIOTest, ReadWriteMetadata) { - std::vector schema_fields; - schema_fields.emplace_back(/*field_id=*/1, "x", std::make_shared(), - /*optional=*/false); - auto schema = std::make_shared(std::move(schema_fields), /*schema_id=*/1); - - TableMetadata metadata{.format_version = 1, - .table_uuid = "1234567890", - .location = "s3://bucket/path", - .last_sequence_number = 0, - .schemas = {schema}, - .current_schema_id = 1, - .default_spec_id = 0, - .last_partition_id = 0, - .properties = {{"key", "value"}}, - .current_snapshot_id = 3051729675574597004, - .snapshots = {std::make_shared(Snapshot{ - .snapshot_id = 3051729675574597004, - .sequence_number = 0, - .timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(), - .manifest_list = "s3://a/b/1.avro", - .summary = {{"operation", "append"}}, - })}, - .default_sort_order_id = 0, - .next_row_id = 0}; + TableMetadata metadata = PrepareMetadata(); EXPECT_THAT(TableMetadataUtil::Write(*io_, temp_filepath_, metadata), IsOk()); @@ -84,30 +90,7 @@ TEST_F(MetadataIOTest, ReadWriteMetadata) { } TEST_F(MetadataIOTest, ReadWriteCompressedMetadata) { - std::vector schema_fields; - schema_fields.emplace_back(/*field_id=*/1, "x", std::make_shared(), - /*optional=*/false); - auto schema = std::make_shared(std::move(schema_fields), /*schema_id=*/1); - - TableMetadata metadata{.format_version = 1, - .table_uuid = "1234567890", - .location = "s3://bucket/path", - .last_sequence_number = 0, - .schemas = {schema}, - .current_schema_id = 1, - .default_spec_id = 0, - .last_partition_id = 0, - .properties = {{"key", "value"}}, - .current_snapshot_id = 3051729675574597004, - .snapshots = {std::make_shared(Snapshot{ - .snapshot_id = 3051729675574597004, - .sequence_number = 0, - .timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(), - .manifest_list = "s3://a/b/1.avro", - .summary = {{"operation", "append"}}, - })}, - .default_sort_order_id = 0, - .next_row_id = 0}; + TableMetadata metadata = PrepareMetadata(); auto json = ToJson(metadata); auto ret = ToJsonString(json); From 220ef62ad5c032d9d1c34e668dc933d20a9a4a12 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 27 May 2025 10:31:47 +0800 Subject: [PATCH 8/8] fix format --- src/iceberg/result.h | 4 ++-- src/iceberg/util/gzip_internal.cc | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/iceberg/result.h b/src/iceberg/result.h index f1af0ae38..8ca0d79c5 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -31,6 +31,7 @@ namespace iceberg { enum class ErrorKind { kAlreadyExists, kCommitStateUnknown, + kDecompressError, kInvalidArgument, kInvalidExpression, kInvalidSchema, @@ -38,7 +39,6 @@ enum class ErrorKind { kJsonParseError, kNoSuchNamespace, kNoSuchTable, - kDecompressError, kNotFound, kNotImplemented, kNotSupported, @@ -74,6 +74,7 @@ using Status = Result; DEFINE_ERROR_FUNCTION(AlreadyExists) DEFINE_ERROR_FUNCTION(CommitStateUnknown) +DEFINE_ERROR_FUNCTION(DecompressError) DEFINE_ERROR_FUNCTION(InvalidArgument) DEFINE_ERROR_FUNCTION(InvalidExpression) DEFINE_ERROR_FUNCTION(InvalidSchema) @@ -81,7 +82,6 @@ DEFINE_ERROR_FUNCTION(IOError) DEFINE_ERROR_FUNCTION(JsonParseError) DEFINE_ERROR_FUNCTION(NoSuchNamespace) DEFINE_ERROR_FUNCTION(NoSuchTable) -DEFINE_ERROR_FUNCTION(DecompressError) DEFINE_ERROR_FUNCTION(NotFound) DEFINE_ERROR_FUNCTION(NotImplemented) DEFINE_ERROR_FUNCTION(NotSupported) diff --git a/src/iceberg/util/gzip_internal.cc b/src/iceberg/util/gzip_internal.cc index f9dceab83..ab38bb00f 100644 --- a/src/iceberg/util/gzip_internal.cc +++ b/src/iceberg/util/gzip_internal.cc @@ -39,9 +39,9 @@ class ZlibImpl { Status Init() { // Maximum window size - static int kWindowBits = 15; + constexpr int kWindowBits = 15; // Determine if this is libz or gzip from header. - static int kDetectCodec = 32; + constexpr int kDetectCodec = 32; int ret = inflateInit2(&stream_, kWindowBits | kDetectCodec); if (ret != Z_OK) { return DecompressError("inflateInit2 failed, result:{}", ret);