14 changes: 10 additions & 4 deletions src/iceberg/CMakeLists.txt
@@ -49,10 +49,16 @@ set(ICEBERG_SHARED_BUILD_INTERFACE_LIBS)
 set(ICEBERG_STATIC_INSTALL_INTERFACE_LIBS)
 set(ICEBERG_SHARED_INSTALL_INTERFACE_LIBS)
 
-list(APPEND ICEBERG_STATIC_BUILD_INTERFACE_LIBS nanoarrow::nanoarrow
-            nlohmann_json::nlohmann_json)
-list(APPEND ICEBERG_SHARED_BUILD_INTERFACE_LIBS nanoarrow::nanoarrow
-            nlohmann_json::nlohmann_json)
+list(APPEND
+     ICEBERG_STATIC_BUILD_INTERFACE_LIBS
+     nanoarrow::nanoarrow
+     nlohmann_json::nlohmann_json
+     ZLIB::ZLIB)
+list(APPEND
+     ICEBERG_SHARED_BUILD_INTERFACE_LIBS
+     nanoarrow::nanoarrow
+     nlohmann_json::nlohmann_json
+     ZLIB::ZLIB)
 list(APPEND ICEBERG_STATIC_INSTALL_INTERFACE_LIBS "Iceberg::nanoarrow"
             "Iceberg::nlohmann_json")
 list(APPEND ICEBERG_SHARED_INSTALL_INTERFACE_LIBS "Iceberg::nanoarrow"
63 changes: 60 additions & 3 deletions src/iceberg/table_metadata.cc
@@ -19,8 +19,12 @@
 
 #include "iceberg/table_metadata.h"
 
+#include <zlib.h>
+
+#include <cstring>  // memset in GZipDecompressor
 #include <format>
 #include <ranges>
 #include <string>
+#include <vector>
 
 #include <nlohmann/json.hpp>
@@ -153,14 +154,70 @@ Result<MetadataFileCodecType> TableMetadataUtil::CodecFromFileName(
   return MetadataFileCodecType::kNone;
 }
 
+class GZipDecompressor {
+ public:
+  GZipDecompressor() { memset(&stream_, 0, sizeof(stream_)); }
+
+  ~GZipDecompressor() {
+    if (initialized_) {
+      inflateEnd(&stream_);
+    }
+  }
+
+  Status Init() {
+    // windowBits = 15 (max history window) + 32 (auto-detect zlib or gzip headers).
+    int ret = inflateInit2(&stream_, 15 + 32);
+    if (ret != Z_OK) {
+      return IOError("inflateInit2 failed, result:{}", ret);
+    }
+    initialized_ = true;
+    return {};
+  }
+
+  Result<std::string> Decompress(const std::string& compressed_data) {
+    if (compressed_data.empty()) {
+      return {};
+    }
+    if (!initialized_) {
+      ICEBERG_RETURN_UNEXPECTED(Init());
+    }
+    stream_.avail_in = static_cast<uInt>(compressed_data.size());
+    stream_.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(compressed_data.data()));
+
+    // TODO(xiao.dong) magic buffer, can we get an estimated size from compressed data?
+    std::vector<char> out_buffer(32 * 1024);
+    std::string result;
+    int ret = 0;
+    do {
+      // Reuse the same 32 KiB buffer each round: inflate fills it, we append the
+      // produced bytes, then loop until the stream ends.
+      stream_.avail_out = static_cast<uInt>(out_buffer.size());
+      stream_.next_out = reinterpret_cast<Bytef*>(out_buffer.data());
+      ret = inflate(&stream_, Z_NO_FLUSH);
+      if (ret != Z_OK && ret != Z_STREAM_END) {
+        return IOError("inflate failed, result:{}", ret);
+      }
+      result.append(out_buffer.data(), out_buffer.size() - stream_.avail_out);
+    } while (ret != Z_STREAM_END);
+    return result;
+  }
+
+ private:
+  bool initialized_ = false;
+  z_stream stream_;
+};
+
 Result<std::unique_ptr<TableMetadata>> TableMetadataUtil::Read(
     FileIO& io, const std::string& location, std::optional<size_t> length) {
   ICEBERG_ASSIGN_OR_RAISE(auto codec_type, CodecFromFileName(location));
 
+  ICEBERG_ASSIGN_OR_RAISE(auto content, io.ReadFile(location, length));
   if (codec_type == MetadataFileCodecType::kGzip) {
-    return NotImplemented("Reading gzip-compressed metadata files is not supported yet");
+    auto gzip_decompressor = std::make_unique<GZipDecompressor>();
+    ICEBERG_RETURN_UNEXPECTED(gzip_decompressor->Init());
+    auto result = gzip_decompressor->Decompress(content);
+    ICEBERG_RETURN_UNEXPECTED(result);
+    content = result.value();
   }
 
-  ICEBERG_ASSIGN_OR_RAISE(auto content, io.ReadFile(location, length));
   ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(content));
   return TableMetadataFromJson(json);
 }
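A note on the TODO in Decompress above: for single-member gzip files, RFC 1952 stores the uncompressed size modulo 2^32 in the last four bytes of the stream (the little-endian ISIZE field), so the output buffer could be pre-sized from the input instead of fixed at 32 KiB. A minimal sketch of that idea, not part of this diff; EstimatedUncompressedSize is a hypothetical helper, and the fallback constant mirrors the existing default:

#include <cstddef>
#include <cstdint>
#include <string>

// Sketch: read ISIZE (uncompressed size mod 2^32) from a gzip trailer.
// Only reliable for single-member streams smaller than 4 GiB, so treat the
// value as a sizing hint rather than a hard bound.
inline size_t EstimatedUncompressedSize(const std::string& compressed_data) {
  constexpr size_t kTrailerSize = 8;       // CRC32 (4 bytes) + ISIZE (4 bytes)
  constexpr size_t kDefaultSize = 32 * 1024;
  if (compressed_data.size() < kTrailerSize) {
    return kDefaultSize;
  }
  const auto* p = reinterpret_cast<const unsigned char*>(
      compressed_data.data() + compressed_data.size() - 4);
  const uint32_t isize = static_cast<uint32_t>(p[0]) |
                         (static_cast<uint32_t>(p[1]) << 8) |
                         (static_cast<uint32_t>(p[2]) << 16) |
                         (static_cast<uint32_t>(p[3]) << 24);
  return isize == 0 ? kDefaultSize : isize;
}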
67 changes: 67 additions & 0 deletions test/metadata_io_test.cc
@@ -18,10 +18,15 @@
  */
 
 #include <arrow/filesystem/localfs.h>
+#include <arrow/io/compressed.h>
+#include <arrow/io/file.h>
+#include <arrow/util/compression.h>
 #include <gtest/gtest.h>
 #include <nlohmann/json.hpp>
 
 #include "iceberg/arrow/arrow_fs_file_io.h"
+#include "iceberg/file_io.h"
+#include "iceberg/json_internal.h"
 #include "iceberg/schema.h"
 #include "iceberg/snapshot.h"
 #include "iceberg/table_metadata.h"
@@ -78,4 +83,66 @@ TEST_F(MetadataIOTest, ReadWriteMetadata) {
   EXPECT_EQ(*metadata_read, metadata);
 }
 
+TEST_F(MetadataIOTest, ReadWriteCompressedMetadata) {
+  std::vector<SchemaField> schema_fields;
+  schema_fields.emplace_back(/*field_id=*/1, "x", std::make_shared<LongType>(),
+                             /*optional=*/false);
+  auto schema = std::make_shared<Schema>(std::move(schema_fields), /*schema_id=*/1);
+
+  TableMetadata metadata{.format_version = 1,
+                         .table_uuid = "1234567890",
+                         .location = "s3://bucket/path",
+                         .last_sequence_number = 0,
+                         .schemas = {schema},
+                         .current_schema_id = 1,
+                         .default_spec_id = 0,
+                         .last_partition_id = 0,
+                         .properties = {{"key", "value"}},
+                         .current_snapshot_id = 3051729675574597004,
+                         .snapshots = {std::make_shared<Snapshot>(Snapshot{
+                             .snapshot_id = 3051729675574597004,
+                             .sequence_number = 0,
+                             .timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(),
+                             .manifest_list = "s3://a/b/1.avro",
+                             .summary = {{"operation", "append"}},
+                         })},
+                         .default_sort_order_id = 0,
+                         .next_row_id = 0};
+
+  auto json = ToJson(metadata);
+  auto ret = ToJsonString(json);
+  ASSERT_TRUE(ret.has_value());
+  auto json_string = ret.value();
+
+#define ARROW_ASSIGN_OR_THROW(var, expr)      \
+  {                                           \
+    auto result = (expr);                     \
+    ASSERT_TRUE(result.ok());                 \
+    var = std::move(result.ValueUnsafe());    \
+  }
+
+  auto file_path = CreateNewTempFilePathWithSuffix(".metadata.json.gz");
+  std::shared_ptr<::arrow::io::FileOutputStream> out_stream;
+  ARROW_ASSIGN_OR_THROW(out_stream, ::arrow::io::FileOutputStream::Open(file_path));
+
+  std::shared_ptr<::arrow::util::Codec> gzip_codec;
+  ARROW_ASSIGN_OR_THROW(gzip_codec,
+                        ::arrow::util::Codec::Create(::arrow::Compression::GZIP));
+
+  std::shared_ptr<::arrow::io::OutputStream> compressed_stream;
+  ARROW_ASSIGN_OR_THROW(compressed_stream, ::arrow::io::CompressedOutputStream::Make(
+                                               gzip_codec.get(), out_stream));
+#undef ARROW_ASSIGN_OR_THROW
+
+  ASSERT_TRUE(compressed_stream->Write(json_string).ok());
+  ASSERT_TRUE(compressed_stream->Flush().ok());
+  ASSERT_TRUE(compressed_stream->Close().ok());
+
+  auto result = TableMetadataUtil::Read(*io_, file_path);
+  EXPECT_THAT(result, IsOk());
+
+  auto metadata_read = std::move(result.value());
+  EXPECT_EQ(*metadata_read, metadata);
+}
+
 }  // namespace iceberg
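The test produces its gzip fixture through Arrow's Codec and CompressedOutputStream. If that dependency were ever unwanted, zlib (already linked via ZLIB::ZLIB above) can generate an equivalent payload directly: deflateInit2 with windowBits = 15 + 16 requests a gzip wrapper, the encoding counterpart of the 15 + 32 used by GZipDecompressor. A minimal sketch under that assumption; GzipCompress is a hypothetical helper, not part of this diff:

#include <zlib.h>

#include <stdexcept>
#include <string>

// Sketch: gzip-compress a whole in-memory string with raw zlib.
inline std::string GzipCompress(const std::string& input) {
  z_stream stream{};
  // 15 + 16 = max window with a gzip (not zlib) header and trailer.
  if (deflateInit2(&stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 + 16,
                   /*memLevel=*/8, Z_DEFAULT_STRATEGY) != Z_OK) {
    throw std::runtime_error("deflateInit2 failed");
  }
  stream.avail_in = static_cast<uInt>(input.size());
  stream.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(input.data()));

  std::string output;
  char buffer[16 * 1024];
  int ret = Z_OK;
  do {
    stream.avail_out = static_cast<uInt>(sizeof(buffer));
    stream.next_out = reinterpret_cast<Bytef*>(buffer);
    ret = deflate(&stream, Z_FINISH);  // single-shot: all input is already available
    output.append(buffer, sizeof(buffer) - stream.avail_out);
  } while (ret != Z_STREAM_END);
  deflateEnd(&stream);
  return output;
}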