Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,16 @@ set(ICEBERG_SHARED_BUILD_INTERFACE_LIBS)
set(ICEBERG_STATIC_INSTALL_INTERFACE_LIBS)
set(ICEBERG_SHARED_INSTALL_INTERFACE_LIBS)

list(APPEND ICEBERG_STATIC_BUILD_INTERFACE_LIBS nanoarrow::nanoarrow
nlohmann_json::nlohmann_json)
list(APPEND ICEBERG_SHARED_BUILD_INTERFACE_LIBS nanoarrow::nanoarrow
nlohmann_json::nlohmann_json)
list(APPEND
ICEBERG_STATIC_BUILD_INTERFACE_LIBS
nanoarrow::nanoarrow
nlohmann_json::nlohmann_json
ZLIB::ZLIB)
list(APPEND
ICEBERG_SHARED_BUILD_INTERFACE_LIBS
nanoarrow::nanoarrow
nlohmann_json::nlohmann_json
ZLIB::ZLIB)
list(APPEND ICEBERG_STATIC_INSTALL_INTERFACE_LIBS "Iceberg::nanoarrow"
"Iceberg::nlohmann_json")
list(APPEND ICEBERG_SHARED_INSTALL_INTERFACE_LIBS "Iceberg::nanoarrow"
Expand Down
36 changes: 33 additions & 3 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

#include "iceberg/table_metadata.h"

#include <zlib.h>

#include <array>
#include <format>
#include <memory>
#include <ranges>
#include <string>
#include <type_traits>

#include <nlohmann/json.hpp>
Expand Down Expand Up @@ -153,14 +155,42 @@ Result<MetadataFileCodecType> TableMetadataUtil::CodecFromFileName(
return MetadataFileCodecType::kNone;
}

/// \brief Read a gzip-compressed file from the local filesystem and return
/// its fully decompressed contents.
///
/// \param filepath Path to a local gzip file.
/// \return The decompressed bytes, or IOError if the file cannot be opened
///         or zlib reports a decompression error.
Result<std::string> DecompressGZIPFile(const std::string& filepath) {
  gzFile file = gzopen(filepath.c_str(), "rb");
  if (!file) {
    return IOError("Failed to open gzip file: {}", filepath);
  }
  // RAII guard: gzclose runs on every exit path, including if
  // std::string::append throws std::bad_alloc below.
  std::unique_ptr<std::remove_pointer_t<gzFile>, decltype(&gzclose)> guard(file,
                                                                           &gzclose);

  constexpr size_t kChunkSize = 32768;  // 32KB chunks
  std::array<char, kChunkSize> buffer;
  std::string result;
  int bytes_read;

  // gzread returns the number of uncompressed bytes read, 0 at EOF, and a
  // negative value on error (diagnosed via gzerror after the loop).
  while ((bytes_read = gzread(file, buffer.data(), kChunkSize)) > 0) {
    result.append(buffer.data(), static_cast<size_t>(bytes_read));
  }

  int err = Z_OK;
  const char* error_msg = gzerror(file, &err);
  if (err != Z_OK) {
    return IOError("Error during gzip decompression: {}", std::string(error_msg));
  }

  return result;
}

/// \brief Load table metadata from a (possibly gzip-compressed) JSON file.
///
/// The codec is inferred from the file name suffix; gzip files are
/// decompressed before JSON parsing, all other files are read through `io`.
///
/// \param io FileIO used to read uncompressed metadata files.
/// \param location Path to the metadata file.
/// \param length Optional number of bytes to read (uncompressed path only).
Result<std::unique_ptr<TableMetadata>> TableMetadataUtil::Read(
    FileIO& io, const std::string& location, std::optional<size_t> length) {
  ICEBERG_ASSIGN_OR_RAISE(auto codec_type, CodecFromFileName(location));

  std::string content;
  if (codec_type == MetadataFileCodecType::kGzip) {
    // NOTE(review): this reads straight from the local path and bypasses
    // `io` (and `length`) — presumably only local files are expected here;
    // confirm against callers.
    ICEBERG_ASSIGN_OR_RAISE(content, DecompressGZIPFile(location));
  } else {
    ICEBERG_ASSIGN_OR_RAISE(content, io.ReadFile(location, length));
  }

  ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(content));
  return TableMetadataFromJson(json);
}
Expand Down
67 changes: 67 additions & 0 deletions test/metadata_io_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
*/

#include <arrow/filesystem/localfs.h>
#include <arrow/io/compressed.h>
#include <arrow/io/file.h>
#include <arrow/util/compression.h>
#include <gtest/gtest.h>
#include <nlohmann/json.hpp>

#include "iceberg/arrow/arrow_fs_file_io.h"
#include "iceberg/file_io.h"
#include "iceberg/json_internal.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
Expand Down Expand Up @@ -78,4 +83,66 @@ TEST_F(MetadataIOTest, ReadWriteMetadata) {
EXPECT_EQ(*metadata_read, metadata);
}

// Round-trip test: serialize TableMetadata to JSON, write it through an
// Arrow gzip CompressedOutputStream to a *.metadata.json.gz file, then
// verify TableMetadataUtil::Read transparently decompresses and yields an
// equal object.
TEST_F(MetadataIOTest, ReadWriteCompressedMetadata) {
  std::vector<SchemaField> schema_fields;
  schema_fields.emplace_back(/*field_id=*/1, "x", std::make_shared<LongType>(),
                             /*optional=*/false);
  auto schema = std::make_shared<Schema>(std::move(schema_fields), /*schema_id=*/1);

  TableMetadata metadata{.format_version = 1,
                         .table_uuid = "1234567890",
                         .location = "s3://bucket/path",
                         .last_sequence_number = 0,
                         .schemas = {schema},
                         .current_schema_id = 1,
                         .default_spec_id = 0,
                         .last_partition_id = 0,
                         .properties = {{"key", "value"}},
                         .current_snapshot_id = 3051729675574597004,
                         .snapshots = {std::make_shared<Snapshot>(Snapshot{
                             .snapshot_id = 3051729675574597004,
                             .sequence_number = 0,
                             .timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(),
                             .manifest_list = "s3://a/b/1.avro",
                             .summary = {{"operation", "append"}},
                         })},
                         .default_sort_order_id = 0,
                         .next_row_id = 0};

  auto json = ToJson(metadata);
  auto ret = ToJsonString(json);
  ASSERT_TRUE(ret.has_value());
  auto json_string = ret.value();

// Local helper: unwrap an arrow::Result or fail the test with the underlying
// status. Wrapped in do/while(0) so the macro expands to a single statement
// (safe under unbraced if/else, consumes the trailing semicolon).
#define ARROW_ASSIGN_OR_THROW(var, expr)           \
  do {                                             \
    auto result_ = (expr);                         \
    ASSERT_TRUE(result_.ok()) << result_.status(); \
    var = std::move(result_).ValueUnsafe();        \
  } while (0)

  // The ".gz" suffix is what triggers the gzip path in the reader.
  auto file_path = CreateNewTempFilePathWithSuffix(".metadata.json.gz");
  std::shared_ptr<::arrow::io::FileOutputStream> out_stream;
  ARROW_ASSIGN_OR_THROW(out_stream, ::arrow::io::FileOutputStream::Open(file_path));

  std::shared_ptr<::arrow::util::Codec> gzip_codec;
  ARROW_ASSIGN_OR_THROW(gzip_codec,
                        ::arrow::util::Codec::Create(::arrow::Compression::GZIP));

  std::shared_ptr<::arrow::io::OutputStream> compressed_stream;
  ARROW_ASSIGN_OR_THROW(compressed_stream, ::arrow::io::CompressedOutputStream::Make(
                                               gzip_codec.get(), out_stream));
#undef ARROW_ASSIGN_OR_THROW

  ASSERT_TRUE(compressed_stream->Write(json_string).ok());
  ASSERT_TRUE(compressed_stream->Flush().ok());
  // Close() flushes the gzip trailer; without it the file would be truncated.
  ASSERT_TRUE(compressed_stream->Close().ok());

  auto result = TableMetadataUtil::Read(*io_, file_path);
  EXPECT_THAT(result, IsOk());

  auto metadata_read = std::move(result.value());
  EXPECT_EQ(*metadata_read, metadata);
}

} // namespace iceberg
Loading