Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,24 @@ set(ICEBERG_SOURCES
type.cc
util/murmurhash3_internal.cc
util/timepoint.cc
util/unreachable.cc)
util/unreachable.cc
util/gzip_internal.cc)

set(ICEBERG_STATIC_BUILD_INTERFACE_LIBS)
set(ICEBERG_SHARED_BUILD_INTERFACE_LIBS)
set(ICEBERG_STATIC_INSTALL_INTERFACE_LIBS)
set(ICEBERG_SHARED_INSTALL_INTERFACE_LIBS)

list(APPEND ICEBERG_STATIC_BUILD_INTERFACE_LIBS nanoarrow::nanoarrow
nlohmann_json::nlohmann_json)
list(APPEND ICEBERG_SHARED_BUILD_INTERFACE_LIBS nanoarrow::nanoarrow
nlohmann_json::nlohmann_json)
list(APPEND
ICEBERG_STATIC_BUILD_INTERFACE_LIBS
nanoarrow::nanoarrow
nlohmann_json::nlohmann_json
ZLIB::ZLIB)
list(APPEND
ICEBERG_SHARED_BUILD_INTERFACE_LIBS
nanoarrow::nanoarrow
nlohmann_json::nlohmann_json
ZLIB::ZLIB)
list(APPEND ICEBERG_STATIC_INSTALL_INTERFACE_LIBS "Iceberg::nanoarrow"
"Iceberg::nlohmann_json")
list(APPEND ICEBERG_SHARED_INSTALL_INTERFACE_LIBS "Iceberg::nanoarrow"
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ enum class ErrorKind {
kJsonParseError,
kNoSuchNamespace,
kNoSuchTable,
kDecompressError,
kNotFound,
kNotImplemented,
kNotSupported,
Expand Down Expand Up @@ -80,6 +81,7 @@ DEFINE_ERROR_FUNCTION(IOError)
DEFINE_ERROR_FUNCTION(JsonParseError)
DEFINE_ERROR_FUNCTION(NoSuchNamespace)
DEFINE_ERROR_FUNCTION(NoSuchTable)
DEFINE_ERROR_FUNCTION(DecompressError)
DEFINE_ERROR_FUNCTION(NotFound)
DEFINE_ERROR_FUNCTION(NotImplemented)
DEFINE_ERROR_FUNCTION(NotSupported)
Expand Down
11 changes: 8 additions & 3 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include "iceberg/table_metadata.h"

#include <format>
#include <ranges>
#include <string>

#include <nlohmann/json.hpp>
Expand All @@ -32,6 +31,7 @@
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/sort_order.h"
#include "iceberg/util/gzip_internal.h"
#include "iceberg/util/macros.h"

namespace iceberg {
Expand Down Expand Up @@ -156,11 +156,16 @@ Result<MetadataFileCodecType> TableMetadataUtil::CodecFromFileName(
Result<std::unique_ptr<TableMetadata>> TableMetadataUtil::Read(
FileIO& io, const std::string& location, std::optional<size_t> length) {
ICEBERG_ASSIGN_OR_RAISE(auto codec_type, CodecFromFileName(location));

ICEBERG_ASSIGN_OR_RAISE(auto content, io.ReadFile(location, length));
if (codec_type == MetadataFileCodecType::kGzip) {
return NotImplemented("Reading gzip-compressed metadata files is not supported yet");
auto gzip_decompressor = std::make_unique<GZipDecompressor>();
ICEBERG_RETURN_UNEXPECTED(gzip_decompressor->Init());
auto result = gzip_decompressor->Decompress(content);
ICEBERG_RETURN_UNEXPECTED(result);
content = result.value();
}

ICEBERG_ASSIGN_OR_RAISE(auto content, io.ReadFile(location, length));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(content));
return TableMetadataFromJson(json);
}
Expand Down
94 changes: 94 additions & 0 deletions src/iceberg/util/gzip_internal.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/util/gzip_internal.h"

#include <zlib.h>

#include <cstring>

#include "iceberg/util/macros.h"

namespace iceberg {

class ZlibImpl {
public:
ZlibImpl() { memset(&stream_, 0, sizeof(stream_)); }

~ZlibImpl() {
if (initialized_) {
inflateEnd(&stream_);
}
}

Status Init() {
// Maximum window size
static int kWindowBits = 15;
// Determine if this is libz or gzip from header.
static int kDetectCodec = 32;
int ret = inflateInit2(&stream_, kWindowBits | kDetectCodec);
if (ret != Z_OK) {
return DecompressError("inflateInit2 failed, result:{}", ret);
}
initialized_ = true;
return {};
}

Result<std::string> Decompress(const std::string& compressed_data) {
if (compressed_data.empty()) {
return {};
}
if (!initialized_) {
ICEBERG_RETURN_UNEXPECTED(Init());
}
stream_.avail_in = static_cast<uInt>(compressed_data.size());
stream_.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(compressed_data.data()));

// TODO(xiao.dong) magic buffer, can we get a estimated size from compressed data?
std::vector<char> out_buffer(32 * 1024);
std::string result;
int ret = 0;
do {
stream_.avail_out = static_cast<uInt>(out_buffer.size());
stream_.next_out = reinterpret_cast<Bytef*>(out_buffer.data());
ret = inflate(&stream_, Z_NO_FLUSH);
if (ret != Z_OK && ret != Z_STREAM_END) {
return DecompressError("inflate failed, result:{}", ret);
}
result.append(out_buffer.data(), out_buffer.size() - stream_.avail_out);
} while (ret != Z_STREAM_END);
return result;
}

private:
bool initialized_ = false;
z_stream stream_;
};

GZipDecompressor::GZipDecompressor() : zlib_impl_(std::make_unique<ZlibImpl>()) {}

GZipDecompressor::~GZipDecompressor() = default;

Status GZipDecompressor::Init() { return zlib_impl_->Init(); }

Result<std::string> GZipDecompressor::Decompress(const std::string& compressed_data) {
return zlib_impl_->Decompress(compressed_data);
}

} // namespace iceberg
45 changes: 45 additions & 0 deletions src/iceberg/util/gzip_internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <memory>
#include <string>

#include "iceberg/result.h"

namespace iceberg {

class ZlibImpl;

class GZipDecompressor {
public:
GZipDecompressor();

~GZipDecompressor();

Status Init();

Result<std::string> Decompress(const std::string& compressed_data);

private:
std::unique_ptr<ZlibImpl> zlib_impl_;
};

} // namespace iceberg
2 changes: 1 addition & 1 deletion test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ if(ICEBERG_BUILD_BUNDLE)

add_executable(arrow_test)
target_sources(arrow_test PRIVATE arrow_test.cc arrow_fs_file_io_test.cc
metadata_io_test.cc)
metadata_io_test.cc gzip_decompress_test.cc)
target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main
GTest::gmock)
add_test(NAME arrow_test COMMAND arrow_test)
Expand Down
83 changes: 83 additions & 0 deletions test/gzip_decompress_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <arrow/filesystem/localfs.h>
#include <arrow/io/compressed.h>
#include <arrow/io/file.h>
#include <arrow/util/compression.h>
#include <gtest/gtest.h>

#include "iceberg/arrow/arrow_fs_file_io.h"
#include "iceberg/file_io.h"
#include "iceberg/util/gzip_internal.h"
#include "matchers.h"
#include "temp_file_test_base.h"

namespace iceberg {

class GZipTest : public TempFileTestBase {
protected:
void SetUp() override {
TempFileTestBase::SetUp();
io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
std::make_shared<::arrow::fs::LocalFileSystem>());
temp_filepath_ = CreateNewTempFilePathWithSuffix("test.gz");
}

std::shared_ptr<iceberg::FileIO> io_;
std::string temp_filepath_;
};

TEST_F(GZipTest, GZipDecompressedString) {
auto test_string = GenerateRandomString(1024);

#define ARROW_ASSIGN_OR_THROW(var, expr) \
{ \
auto result = expr; \
ASSERT_TRUE(result.ok()); \
var = std::move(result.ValueUnsafe()); \
}
std::shared_ptr<::arrow::io::FileOutputStream> out_stream;
ARROW_ASSIGN_OR_THROW(out_stream, ::arrow::io::FileOutputStream::Open(temp_filepath_));

std::shared_ptr<::arrow::util::Codec> gzip_codec;
ARROW_ASSIGN_OR_THROW(gzip_codec,
::arrow::util::Codec::Create(::arrow::Compression::GZIP));

std::shared_ptr<::arrow::io::OutputStream> compressed_stream;
ARROW_ASSIGN_OR_THROW(compressed_stream, ::arrow::io::CompressedOutputStream::Make(
gzip_codec.get(), out_stream));
#undef ARROW_ASSIGN_OR_THROW

ASSERT_TRUE(compressed_stream->Write(test_string).ok());
ASSERT_TRUE(compressed_stream->Flush().ok());
ASSERT_TRUE(compressed_stream->Close().ok());

auto result = io_->ReadFile(temp_filepath_, test_string.size());
EXPECT_THAT(result, IsOk());

auto gzip_decompressor = std::make_unique<GZipDecompressor>();
EXPECT_THAT(gzip_decompressor->Init(), IsOk());
EXPECT_THAT(result, IsOk());
auto read_data = gzip_decompressor->Decompress(result.value());
EXPECT_THAT(read_data, IsOk());
ASSERT_EQ(read_data.value(), test_string);
}

} // namespace iceberg
Loading
Loading