diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 44b5003e7..920db2590 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -42,17 +42,24 @@ set(ICEBERG_SOURCES
     type.cc
     util/murmurhash3_internal.cc
     util/timepoint.cc
-    util/unreachable.cc)
+    util/unreachable.cc
+    util/gzip_internal.cc)
 
 set(ICEBERG_STATIC_BUILD_INTERFACE_LIBS)
 set(ICEBERG_SHARED_BUILD_INTERFACE_LIBS)
 set(ICEBERG_STATIC_INSTALL_INTERFACE_LIBS)
 set(ICEBERG_SHARED_INSTALL_INTERFACE_LIBS)
 
-list(APPEND ICEBERG_STATIC_BUILD_INTERFACE_LIBS nanoarrow::nanoarrow
-     nlohmann_json::nlohmann_json)
-list(APPEND ICEBERG_SHARED_BUILD_INTERFACE_LIBS nanoarrow::nanoarrow
-     nlohmann_json::nlohmann_json)
+list(APPEND
+     ICEBERG_STATIC_BUILD_INTERFACE_LIBS
+     nanoarrow::nanoarrow
+     nlohmann_json::nlohmann_json
+     ZLIB::ZLIB)
+list(APPEND
+     ICEBERG_SHARED_BUILD_INTERFACE_LIBS
+     nanoarrow::nanoarrow
+     nlohmann_json::nlohmann_json
+     ZLIB::ZLIB)
 list(APPEND ICEBERG_STATIC_INSTALL_INTERFACE_LIBS "Iceberg::nanoarrow"
      "Iceberg::nlohmann_json")
 list(APPEND ICEBERG_SHARED_INSTALL_INTERFACE_LIBS "Iceberg::nanoarrow"
diff --git a/src/iceberg/result.h b/src/iceberg/result.h
index 38d9e381f..8ca0d79c5 100644
--- a/src/iceberg/result.h
+++ b/src/iceberg/result.h
@@ -31,6 +31,7 @@ namespace iceberg {
 enum class ErrorKind {
   kAlreadyExists,
   kCommitStateUnknown,
+  kDecompressError,
   kInvalidArgument,
   kInvalidExpression,
   kInvalidSchema,
@@ -73,6 +74,7 @@ using Status = Result<void>;
 
 DEFINE_ERROR_FUNCTION(AlreadyExists)
 DEFINE_ERROR_FUNCTION(CommitStateUnknown)
+DEFINE_ERROR_FUNCTION(DecompressError)
 DEFINE_ERROR_FUNCTION(InvalidArgument)
 DEFINE_ERROR_FUNCTION(InvalidExpression)
 DEFINE_ERROR_FUNCTION(InvalidSchema)
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index c1ada81d9..4e112fd21 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -20,7 +20,6 @@
 #include "iceberg/table_metadata.h"
 
 #include
-#include
 #include
 #include
 
@@ -32,6 +31,7 @@
 #include "iceberg/schema.h"
 #include "iceberg/snapshot.h"
 #include "iceberg/sort_order.h"
+#include "iceberg/util/gzip_internal.h"
 #include "iceberg/util/macros.h"
 
 namespace iceberg {
@@ -156,11 +156,16 @@ Result<MetadataFileCodecType> TableMetadataUtil::CodecFromFileName(
 
 Result<std::unique_ptr<TableMetadata>> TableMetadataUtil::Read(
     FileIO& io, const std::string& location, std::optional<size_t> length) {
   ICEBERG_ASSIGN_OR_RAISE(auto codec_type, CodecFromFileName(location));
+
+  ICEBERG_ASSIGN_OR_RAISE(auto content, io.ReadFile(location, length));
   if (codec_type == MetadataFileCodecType::kGzip) {
-    return NotImplemented("Reading gzip-compressed metadata files is not supported yet");
+    auto gzip_decompressor = std::make_unique<GZipDecompressor>();
+    ICEBERG_RETURN_UNEXPECTED(gzip_decompressor->Init());
+    auto result = gzip_decompressor->Decompress(content);
+    ICEBERG_RETURN_UNEXPECTED(result);
+    content = result.value();
   }
-  ICEBERG_ASSIGN_OR_RAISE(auto content, io.ReadFile(location, length));
   ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(content));
   return TableMetadataFromJson(json);
 }
diff --git a/src/iceberg/util/gzip_internal.cc b/src/iceberg/util/gzip_internal.cc
new file mode 100644
index 000000000..ab38bb00f
--- /dev/null
+++ b/src/iceberg/util/gzip_internal.cc
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/util/gzip_internal.h"
+
+#include <zlib.h>
+
+#include <cstring>
+
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+class ZlibImpl {
+ public:
+  ZlibImpl() { memset(&stream_, 0, sizeof(stream_)); }
+
+  ~ZlibImpl() {
+    if (initialized_) {
+      inflateEnd(&stream_);
+    }
+  }
+
+  Status Init() {
+    // Maximum window size
+    constexpr int kWindowBits = 15;
+    // Determine if this is zlib or gzip from the header.
+    constexpr int kDetectCodec = 32;
+    int ret = inflateInit2(&stream_, kWindowBits | kDetectCodec);
+    if (ret != Z_OK) {
+      return DecompressError("inflateInit2 failed, result:{}", ret);
+    }
+    initialized_ = true;
+    return {};
+  }
+
+  Result<std::string> Decompress(const std::string& compressed_data) {
+    if (compressed_data.empty()) {
+      return {};
+    }
+    if (!initialized_) {
+      ICEBERG_RETURN_UNEXPECTED(Init());
+    }
+    stream_.avail_in = static_cast<uInt>(compressed_data.size());
+    stream_.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(compressed_data.data()));
+
+    // TODO(xiao.dong) magic buffer, can we get an estimated size from compressed data?
+    std::vector<char> out_buffer(32 * 1024);
+    std::string result;
+    int ret = 0;
+    do {
+      stream_.avail_out = static_cast<uInt>(out_buffer.size());
+      stream_.next_out = reinterpret_cast<Bytef*>(out_buffer.data());
+      ret = inflate(&stream_, Z_NO_FLUSH);
+      if (ret != Z_OK && ret != Z_STREAM_END) {
+        return DecompressError("inflate failed, result:{}", ret);
+      }
+      result.append(out_buffer.data(), out_buffer.size() - stream_.avail_out);
+    } while (ret != Z_STREAM_END);
+    return result;
+  }
+
+ private:
+  bool initialized_ = false;
+  z_stream stream_;
+};
+
+GZipDecompressor::GZipDecompressor() : zlib_impl_(std::make_unique<ZlibImpl>()) {}
+
+GZipDecompressor::~GZipDecompressor() = default;
+
+Status GZipDecompressor::Init() { return zlib_impl_->Init(); }
+
+Result<std::string> GZipDecompressor::Decompress(const std::string& compressed_data) {
+  return zlib_impl_->Decompress(compressed_data);
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/util/gzip_internal.h b/src/iceberg/util/gzip_internal.h
new file mode 100644
index 000000000..61cdc00b3
--- /dev/null
+++ b/src/iceberg/util/gzip_internal.h
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "iceberg/result.h"
+
+namespace iceberg {
+
+class ZlibImpl;
+
+class GZipDecompressor {
+ public:
+  GZipDecompressor();
+
+  ~GZipDecompressor();
+
+  Status Init();
+
+  Result<std::string> Decompress(const std::string& compressed_data);
+
+ private:
+  std::unique_ptr<ZlibImpl> zlib_impl_;
+};
+
+}  // namespace iceberg
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 4a88229f5..65422ef4c 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -72,7 +72,7 @@ if(ICEBERG_BUILD_BUNDLE)
   add_executable(arrow_test)
   target_sources(arrow_test PRIVATE arrow_test.cc arrow_fs_file_io_test.cc
-                 metadata_io_test.cc)
+                 metadata_io_test.cc gzip_decompress_test.cc)
   target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main
                         GTest::gmock)
   add_test(NAME arrow_test COMMAND arrow_test)
 
diff --git a/test/gzip_decompress_test.cc b/test/gzip_decompress_test.cc
new file mode 100644
index 000000000..5edb36619
--- /dev/null
+++ b/test/gzip_decompress_test.cc
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/filesystem/localfs.h>
+#include <arrow/io/compressed.h>
+#include <arrow/io/file.h>
+#include <arrow/util/compression.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_fs_file_io.h"
+#include "iceberg/file_io.h"
+#include "iceberg/util/gzip_internal.h"
+#include "matchers.h"
+#include "temp_file_test_base.h"
+
+namespace iceberg {
+
+class GZipTest : public TempFileTestBase {
+ protected:
+  void SetUp() override {
+    TempFileTestBase::SetUp();
+    io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
+        std::make_shared<::arrow::fs::LocalFileSystem>());
+    temp_filepath_ = CreateNewTempFilePathWithSuffix("test.gz");
+  }
+
+  std::shared_ptr<FileIO> io_;
+  std::string temp_filepath_;
+};
+
+TEST_F(GZipTest, GZipDecompressedString) {
+  auto test_string = GenerateRandomString(1024);
+
+#define ARROW_ASSIGN_OR_THROW(var, expr)    \
+  {                                         \
+    auto result = expr;                     \
+    ASSERT_TRUE(result.ok());               \
+    var = std::move(result.ValueUnsafe());  \
+  }
+
+  std::shared_ptr<::arrow::io::FileOutputStream> out_stream;
+  ARROW_ASSIGN_OR_THROW(out_stream, ::arrow::io::FileOutputStream::Open(temp_filepath_));
+
+  std::shared_ptr<::arrow::util::Codec> gzip_codec;
+  ARROW_ASSIGN_OR_THROW(gzip_codec,
+                        ::arrow::util::Codec::Create(::arrow::Compression::GZIP));
+
+  std::shared_ptr<::arrow::io::OutputStream> compressed_stream;
+  ARROW_ASSIGN_OR_THROW(compressed_stream, ::arrow::io::CompressedOutputStream::Make(
+                                               gzip_codec.get(), out_stream));
+#undef ARROW_ASSIGN_OR_THROW
+
+  ASSERT_TRUE(compressed_stream->Write(test_string).ok());
+  ASSERT_TRUE(compressed_stream->Flush().ok());
+  ASSERT_TRUE(compressed_stream->Close().ok());
+
+  auto result = io_->ReadFile(temp_filepath_, test_string.size());
+  EXPECT_THAT(result, IsOk());
+
+  auto gzip_decompressor = std::make_unique<GZipDecompressor>();
+  EXPECT_THAT(gzip_decompressor->Init(), IsOk());
+  auto read_data = gzip_decompressor->Decompress(result.value());
+  EXPECT_THAT(read_data, IsOk());
+  ASSERT_EQ(read_data.value(), test_string);
+}
+
+}  // namespace iceberg
diff --git a/test/metadata_io_test.cc b/test/metadata_io_test.cc
index cc98278b0..7d987e25b 100644
--- a/test/metadata_io_test.cc
+++ b/test/metadata_io_test.cc
@@ -18,10 +18,15 @@
  */
 
 #include <arrow/filesystem/localfs.h>
+#include <arrow/io/compressed.h>
+#include <arrow/io/file.h>
+#include <arrow/util/compression.h>
 #include <gtest/gtest.h>
+#include <nlohmann/json.hpp>
 
 #include "iceberg/arrow/arrow_fs_file_io.h"
 #include "iceberg/file_io.h"
+#include "iceberg/json_internal.h"
 #include "iceberg/schema.h"
 #include "iceberg/snapshot.h"
 #include "iceberg/table_metadata.h"
@@ -39,35 +44,41 @@ class MetadataIOTest : public TempFileTestBase {
     temp_filepath_ = CreateNewTempFilePathWithSuffix(".metadata.json");
   }
 
+  TableMetadata PrepareMetadata() {
+    std::vector<SchemaField> schema_fields;
+    schema_fields.emplace_back(/*field_id=*/1, "x", std::make_shared<LongType>(),
+                               /*optional=*/false);
+    auto schema = std::make_shared<Schema>(std::move(schema_fields), /*schema_id=*/1);
+
+    TableMetadata metadata{
+        .format_version = 1,
+        .table_uuid = "1234567890",
+        .location = "s3://bucket/path",
+        .last_sequence_number = 0,
+        .schemas = {schema},
+        .current_schema_id = 1,
+        .default_spec_id = 0,
+        .last_partition_id = 0,
+        .properties = {{"key", "value"}},
+        .current_snapshot_id = 3051729675574597004,
+        .snapshots = {std::make_shared<Snapshot>(Snapshot{
+            .snapshot_id = 3051729675574597004,
+            .sequence_number = 0,
+            .timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(),
+            .manifest_list = "s3://a/b/1.avro",
+            .summary = {{"operation", "append"}},
+        })},
+        .default_sort_order_id = 0,
+        .next_row_id = 0};
+    return metadata;
+  }
+
   std::shared_ptr<FileIO> io_;
   std::string temp_filepath_;
 };
 
 TEST_F(MetadataIOTest, ReadWriteMetadata) {
-  std::vector<SchemaField> schema_fields;
-  schema_fields.emplace_back(/*field_id=*/1, "x", std::make_shared<LongType>(),
-                             /*optional=*/false);
-  auto schema = std::make_shared<Schema>(std::move(schema_fields), /*schema_id=*/1);
-
-  TableMetadata metadata{.format_version = 1,
-                         .table_uuid = "1234567890",
-                         .location = "s3://bucket/path",
-                         .last_sequence_number = 0,
-                         .schemas = {schema},
-                         .current_schema_id = 1,
-                         .default_spec_id = 0,
-                         .last_partition_id = 0,
-                         .properties = {{"key", "value"}},
-                         .current_snapshot_id = 3051729675574597004,
-                         .snapshots = {std::make_shared<Snapshot>(Snapshot{
-                             .snapshot_id = 3051729675574597004,
-                             .sequence_number = 0,
-                             .timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(),
-                             .manifest_list = "s3://a/b/1.avro",
-                             .summary = {{"operation", "append"}},
-                         })},
-                         .default_sort_order_id = 0,
-                         .next_row_id = 0};
+  TableMetadata metadata = PrepareMetadata();
 
   EXPECT_THAT(TableMetadataUtil::Write(*io_, temp_filepath_, metadata), IsOk());
 
@@ -78,4 +89,43 @@ TEST_F(MetadataIOTest, ReadWriteMetadata) {
   EXPECT_EQ(*metadata_read, metadata);
 }
 
+TEST_F(MetadataIOTest, ReadWriteCompressedMetadata) {
+  TableMetadata metadata = PrepareMetadata();
+
+  auto json = ToJson(metadata);
+  auto ret = ToJsonString(json);
+  ASSERT_TRUE(ret.has_value());
+  auto json_string = ret.value();
+
+#define ARROW_ASSIGN_OR_THROW(var, expr)    \
+  {                                         \
+    auto result = expr;                     \
+    ASSERT_TRUE(result.ok());               \
+    var = std::move(result.ValueUnsafe());  \
+  }
+
+  auto file_path = CreateNewTempFilePathWithSuffix(".metadata.json.gz");
+  std::shared_ptr<::arrow::io::FileOutputStream> out_stream;
+  ARROW_ASSIGN_OR_THROW(out_stream, ::arrow::io::FileOutputStream::Open(file_path));
+
+  std::shared_ptr<::arrow::util::Codec> gzip_codec;
+  ARROW_ASSIGN_OR_THROW(gzip_codec,
+                        ::arrow::util::Codec::Create(::arrow::Compression::GZIP));
+
+  std::shared_ptr<::arrow::io::OutputStream> compressed_stream;
+  ARROW_ASSIGN_OR_THROW(compressed_stream, ::arrow::io::CompressedOutputStream::Make(
+                                               gzip_codec.get(), out_stream));
+#undef ARROW_ASSIGN_OR_THROW
+
+  ASSERT_TRUE(compressed_stream->Write(json_string).ok());
+  ASSERT_TRUE(compressed_stream->Flush().ok());
+  ASSERT_TRUE(compressed_stream->Close().ok());
+
+  auto result = TableMetadataUtil::Read(*io_, file_path);
+  EXPECT_THAT(result, IsOk());
+
+  auto metadata_read = std::move(result.value());
+  EXPECT_EQ(*metadata_read, metadata);
+}
+
 }  // namespace iceberg
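
Usage sketch: a minimal, self-contained example of the GZipDecompressor API this patch introduces. The main() harness, the empty `compressed` buffer, and the error messages are illustrative assumptions rather than code from the patch; it also assumes Status/Result expose the std::expected-style has_value()/value() accessors used elsewhere in the diff.

    #include <iostream>
    #include <string>

    #include "iceberg/util/gzip_internal.h"

    int main() {
      // Illustrative only: assume this holds the raw bytes of a gzip stream,
      // e.g. the contents of a ".metadata.json.gz" file read via FileIO::ReadFile.
      std::string compressed;

      iceberg::GZipDecompressor decompressor;
      if (!decompressor.Init().has_value()) {
        // Init() surfaces inflateInit2 failures as ErrorKind::kDecompressError.
        std::cerr << "failed to initialize zlib inflate state\n";
        return 1;
      }

      // Decompress() streams the input through a fixed 32 KiB buffer internally
      // and returns the whole decompressed payload; empty input yields "".
      auto decompressed = decompressor.Decompress(compressed);
      if (!decompressed.has_value()) {
        std::cerr << "decompression failed\n";
        return 1;
      }
      std::cout << decompressed.value() << '\n';
      return 0;
    }

This mirrors the call sequence TableMetadataUtil::Read now performs for .gz metadata files: read the raw bytes first, then Init() and Decompress() before handing the buffer to FromJsonString.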