Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/iceberg/avro/avro_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,16 @@ class AvroWriter::Impl {
ICEBERG_ASSIGN_OR_RAISE(auto output_stream,
CreateOutputStream(options, kDefaultBufferSize));
arrow_output_stream_ = output_stream->arrow_output_stream();
std::map<std::string, std::vector<uint8_t>> metadata;
for (const auto& [key, value] : options.properties) {
std::vector<uint8_t> vec;
vec.reserve(value.size());
vec.assign(value.begin(), value.end());
metadata.emplace(key, std::move(vec));
}
writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
std::move(output_stream), *avro_schema_);
std::move(output_stream), *avro_schema_, 16 * 1024 /*syncInterval*/,
::avro::NULL_CODEC /*codec*/, metadata);
datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
return {};
Expand Down
12 changes: 12 additions & 0 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ nlohmann::json ToJson(const Schema& schema) {
return json;
}

Result<std::string> ToJsonString(const Schema& schema) {
return ToJsonString(ToJson(schema));
}

nlohmann::json ToJson(const SnapshotRef& ref) {
nlohmann::json json;
json[kSnapshotId] = ref.snapshot_id;
Expand Down Expand Up @@ -490,6 +494,10 @@ nlohmann::json ToJson(const PartitionSpec& partition_spec) {
return json;
}

Result<std::string> ToJsonString(const PartitionSpec& partition_spec) {
return ToJsonString(ToJson(partition_spec));
}

Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
const nlohmann::json& json, bool allow_field_id_missing) {
ICEBERG_ASSIGN_OR_RAISE(auto source_id, GetJsonValue<int32_t>(json, kSourceId));
Expand Down Expand Up @@ -785,6 +793,10 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
return json;
}

Result<std::string> ToJsonString(const TableMetadata& table_metadata) {
return ToJsonString(ToJson(table_metadata));
}

namespace {

/// \brief Parse the schemas from the JSON object.
Expand Down
24 changes: 24 additions & 0 deletions src/iceberg/json_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ ICEBERG_EXPORT Result<std::unique_ptr<SortOrder>> SortOrderFromJson(
/// \return The JSON representation of the schema.
ICEBERG_EXPORT nlohmann::json ToJson(const Schema& schema);

/// \brief Convert an Iceberg Schema to JSON.
///
/// \param[in] schema The Iceberg schema to convert.
/// \return The JSON string of the schema.
ICEBERG_EXPORT Result<std::string> ToJsonString(const Schema& schema);

/// \brief Convert JSON to an Iceberg Schema.
///
/// \param[in] json The JSON representation of the schema.
Expand Down Expand Up @@ -148,6 +154,18 @@ ICEBERG_EXPORT Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
/// array.
ICEBERG_EXPORT nlohmann::json ToJson(const PartitionSpec& partition_spec);

/// \brief Serializes a `PartitionSpec` object to JSON.
///
/// This function converts a `PartitionSpec` object into a JSON representation.
/// The resulting JSON includes the spec ID and a list of `PartitionField` objects.
/// Each `PartitionField` is serialized as described in the `ToJson(PartitionField)`
/// function.
///
/// \param partition_spec The `PartitionSpec` object to be serialized.
/// \return A JSON string of the `PartitionSpec` with its order ID and fields
/// array.
ICEBERG_EXPORT Result<std::string> ToJsonString(const PartitionSpec& partition_spec);

/// \brief Deserializes a JSON object into a `PartitionSpec` object.
///
/// This function parses the provided JSON and creates a `PartitionSpec` object.
Expand Down Expand Up @@ -246,6 +264,12 @@ ICEBERG_EXPORT Result<MetadataLogEntry> MetadataLogEntryFromJson(
/// \return A JSON object representing the `TableMetadata`.
ICEBERG_EXPORT nlohmann::json ToJson(const TableMetadata& table_metadata);

/// \brief Serializes a `TableMetadata` object to JSON.
///
/// \param table_metadata The `TableMetadata` object to be serialized.
/// \return A JSON string of the `TableMetadata`.
ICEBERG_EXPORT Result<std::string> ToJsonString(const TableMetadata& table_metadata);

/// \brief Deserializes a JSON object into a `TableMetadata` object.
///
/// \param json The JSON object representing a `TableMetadata`.
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/manifest_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class ICEBERG_EXPORT ManifestAdapter {
Status StartAppending();
Result<ArrowArray*> FinishAppending();
int64_t size() const { return size_; }
const std::unordered_map<std::string, std::string>& metadata() const {
return metadata_;
}

protected:
ArrowArray array_;
Expand Down
52 changes: 27 additions & 25 deletions src/iceberg/manifest_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,16 @@ Status ManifestWriter::Close() {
return writer_->Close();
}

Result<std::unique_ptr<Writer>> OpenFileWriter(std::string_view location,
std::shared_ptr<Schema> schema,
std::shared_ptr<FileIO> file_io) {
Result<std::unique_ptr<Writer>> OpenFileWriter(
std::string_view location, std::shared_ptr<Schema> schema,
std::shared_ptr<FileIO> file_io,
std::unordered_map<std::string, std::string> properties) {
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
WriterFactoryRegistry::Open(FileFormatType::kAvro, {.path = std::string(location),
.schema = std::move(schema),
.io = std::move(file_io)}));
auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro,
{.path = std::string(location),
.schema = std::move(schema),
.io = std::move(file_io),
.properties = std::move(properties)}));
return writer;
}

Expand All @@ -73,9 +75,9 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io)));
ICEBERG_ASSIGN_OR_RAISE(auto writer,
OpenFileWriter(manifest_location, std::move(schema),
std::move(file_io), adapter->metadata()));
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
}

Expand All @@ -88,9 +90,9 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io)));
ICEBERG_ASSIGN_OR_RAISE(auto writer,
OpenFileWriter(manifest_location, std::move(schema),
std::move(file_io), adapter->metadata()));
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
}

Expand All @@ -104,9 +106,9 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io)));
ICEBERG_ASSIGN_OR_RAISE(auto writer,
OpenFileWriter(manifest_location, std::move(schema),
std::move(file_io), adapter->metadata()));
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
}

Expand Down Expand Up @@ -142,9 +144,9 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV1Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io)));
ICEBERG_ASSIGN_OR_RAISE(auto writer,
OpenFileWriter(manifest_list_location, std::move(schema),
std::move(file_io), adapter->metadata()));
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
}

Expand All @@ -158,9 +160,9 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV2Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io)));
ICEBERG_ASSIGN_OR_RAISE(auto writer,
OpenFileWriter(manifest_list_location, std::move(schema),
std::move(file_io), adapter->metadata()));

return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
}
Expand All @@ -175,9 +177,9 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io)));
ICEBERG_ASSIGN_OR_RAISE(auto writer,
OpenFileWriter(manifest_list_location, std::move(schema),
std::move(file_io), adapter->metadata()));
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
}

Expand Down
3 changes: 1 addition & 2 deletions src/iceberg/test/metadata_io_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ TEST_F(MetadataIOTest, ReadWriteMetadata) {
TEST_F(MetadataIOTest, ReadWriteCompressedMetadata) {
TableMetadata metadata = PrepareMetadata();

auto json = ToJson(metadata);
auto ret = ToJsonString(json);
auto ret = ToJsonString(metadata);
ASSERT_TRUE(ret.has_value());
auto json_string = ret.value();

Expand Down
10 changes: 5 additions & 5 deletions src/iceberg/v1_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "iceberg/v1_metadata.h"

#include "iceberg/json_internal.h"
#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_list.h"
#include "iceberg/schema.h"
Expand Down Expand Up @@ -47,15 +48,14 @@ Status ManifestEntryAdapterV1::Init() {
DataFile::kSplitOffsets.field_id(),
DataFile::kSortOrderId.field_id(),
};
// TODO(xiao.dong) schema to json
metadata_["schema"] = "{}";
// TODO(xiao.dong) partition spec to json
metadata_["partition-spec"] = "{}";
ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
if (partition_spec_ != nullptr) {
ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_));
metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
}
metadata_["format-version"] = "1";
return InitSchema(kManifestEntryFieldIds);
return {};
}

Status ManifestEntryAdapterV1::Append(const ManifestEntry& entry) {
Expand Down
11 changes: 6 additions & 5 deletions src/iceberg/v2_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

#include "iceberg/v2_metadata.h"

#include "iceberg/json_internal.h"
#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_list.h"
#include "iceberg/schema.h"
#include "iceberg/util/macros.h"

namespace iceberg {

Expand Down Expand Up @@ -50,16 +52,15 @@ Status ManifestEntryAdapterV2::Init() {
DataFile::kSortOrderId.field_id(),
DataFile::kReferencedDataFile.field_id(),
};
// TODO(xiao.dong) schema to json
metadata_["schema"] = "{}";
// TODO(xiao.dong) partition spec to json
metadata_["partition-spec"] = "{}";
ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
if (partition_spec_ != nullptr) {
ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_));
metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
}
metadata_["format-version"] = "2";
metadata_["content"] = "data";
return InitSchema(kManifestEntryFieldIds);
return {};
}

Status ManifestEntryAdapterV2::Append(const ManifestEntry& entry) {
Expand Down
10 changes: 5 additions & 5 deletions src/iceberg/v3_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "iceberg/v3_metadata.h"

#include "iceberg/json_internal.h"
#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_list.h"
#include "iceberg/schema.h"
Expand Down Expand Up @@ -54,16 +55,15 @@ Status ManifestEntryAdapterV3::Init() {
DataFile::kContentOffset.field_id(),
DataFile::kContentSize.field_id(),
};
// TODO(xiao.dong) schema to json
metadata_["schema"] = "{}";
// TODO(xiao.dong) partition spec to json
metadata_["partition-spec"] = "{}";
ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
if (partition_spec_ != nullptr) {
ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_));
metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
}
metadata_["format-version"] = "3";
metadata_["content"] = "data";
return InitSchema(kManifestEntryFieldIds);
return {};
}

Status ManifestEntryAdapterV3::Append(const ManifestEntry& entry) {
Expand Down
Loading