Skip to content

Commit c661697

Browse files
author
xiao.dong
committed
feat: write manifest avro metadata
1 parent 8906ff3 commit c661697

File tree

6 files changed: 61 additions and 41 deletions

src/iceberg/avro/avro_writer.cc

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -68,8 +68,16 @@ class AvroWriter::Impl {
6868
ICEBERG_ASSIGN_OR_RAISE(auto output_stream,
6969
CreateOutputStream(options, kDefaultBufferSize));
7070
arrow_output_stream_ = output_stream->arrow_output_stream();
71+
std::map<std::string, std::vector<uint8_t>> metadata;
72+
for (const auto& [key, value] : options.properties) {
73+
std::vector<uint8_t> vec;
74+
vec.reserve(value.size());
75+
vec.assign(value.begin(), value.end());
76+
metadata.emplace(key, std::move(vec));
77+
}
7178
writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
72-
std::move(output_stream), *avro_schema_);
79+
std::move(output_stream), *avro_schema_, 16 * 1024 /*syncInterval*/,
80+
::avro::NULL_CODEC /*codec*/, metadata);
7381
datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
7482
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
7583
return {};

src/iceberg/manifest_adapter.h

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,9 @@ class ICEBERG_EXPORT ManifestAdapter {
4444
Status StartAppending();
4545
Result<ArrowArray*> FinishAppending();
4646
int64_t size() const { return size_; }
47+
const std::unordered_map<std::string, std::string>& metadata() const {
48+
return metadata_;
49+
}
4750

4851
protected:
4952
ArrowArray array_;

src/iceberg/manifest_writer.cc

Lines changed: 27 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -53,14 +53,16 @@ Status ManifestWriter::Close() {
5353
return writer_->Close();
5454
}
5555

56-
Result<std::unique_ptr<Writer>> OpenFileWriter(std::string_view location,
57-
std::shared_ptr<Schema> schema,
58-
std::shared_ptr<FileIO> file_io) {
56+
Result<std::unique_ptr<Writer>> OpenFileWriter(
57+
std::string_view location, std::shared_ptr<Schema> schema,
58+
std::shared_ptr<FileIO> file_io,
59+
std::unordered_map<std::string, std::string> properties) {
5960
ICEBERG_ASSIGN_OR_RAISE(
60-
auto writer,
61-
WriterFactoryRegistry::Open(FileFormatType::kAvro, {.path = std::string(location),
62-
.schema = std::move(schema),
63-
.io = std::move(file_io)}));
61+
auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro,
62+
{.path = std::string(location),
63+
.schema = std::move(schema),
64+
.io = std::move(file_io),
65+
.properties = std::move(properties)}));
6466
return writer;
6567
}
6668

@@ -73,9 +75,9 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
7375
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
7476

7577
auto schema = adapter->schema();
76-
ICEBERG_ASSIGN_OR_RAISE(
77-
auto writer,
78-
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io)));
78+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
79+
OpenFileWriter(manifest_location, std::move(schema),
80+
std::move(file_io), adapter->metadata()));
7981
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
8082
}
8183

@@ -88,9 +90,9 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
8890
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
8991

9092
auto schema = adapter->schema();
91-
ICEBERG_ASSIGN_OR_RAISE(
92-
auto writer,
93-
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io)));
93+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
94+
OpenFileWriter(manifest_location, std::move(schema),
95+
std::move(file_io), adapter->metadata()));
9496
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
9597
}
9698

@@ -104,9 +106,9 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
104106
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
105107

106108
auto schema = adapter->schema();
107-
ICEBERG_ASSIGN_OR_RAISE(
108-
auto writer,
109-
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io)));
109+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
110+
OpenFileWriter(manifest_location, std::move(schema),
111+
std::move(file_io), adapter->metadata()));
110112
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
111113
}
112114

@@ -142,9 +144,9 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV1Writer(
142144
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
143145

144146
auto schema = adapter->schema();
145-
ICEBERG_ASSIGN_OR_RAISE(
146-
auto writer,
147-
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io)));
147+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
148+
OpenFileWriter(manifest_list_location, std::move(schema),
149+
std::move(file_io), adapter->metadata()));
148150
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
149151
}
150152

@@ -158,9 +160,9 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV2Writer(
158160
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
159161

160162
auto schema = adapter->schema();
161-
ICEBERG_ASSIGN_OR_RAISE(
162-
auto writer,
163-
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io)));
163+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
164+
OpenFileWriter(manifest_list_location, std::move(schema),
165+
std::move(file_io), adapter->metadata()));
164166

165167
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
166168
}
@@ -175,9 +177,9 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
175177
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
176178

177179
auto schema = adapter->schema();
178-
ICEBERG_ASSIGN_OR_RAISE(
179-
auto writer,
180-
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io)));
180+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
181+
OpenFileWriter(manifest_list_location, std::move(schema),
182+
std::move(file_io), adapter->metadata()));
181183
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
182184
}
183185

src/iceberg/v1_metadata.cc

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,9 @@
1919

2020
#include "iceberg/v1_metadata.h"
2121

22+
#include <nlohmann/json.hpp>
23+
24+
#include "iceberg/json_internal.h"
2225
#include "iceberg/manifest_entry.h"
2326
#include "iceberg/manifest_list.h"
2427
#include "iceberg/schema.h"
@@ -47,15 +50,14 @@ Status ManifestEntryAdapterV1::Init() {
4750
DataFile::kSplitOffsets.field_id(),
4851
DataFile::kSortOrderId.field_id(),
4952
};
50-
// TODO(xiao.dong) schema to json
51-
metadata_["schema"] = "{}";
52-
// TODO(xiao.dong) partition spec to json
53-
metadata_["partition-spec"] = "{}";
53+
ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
54+
metadata_["schema"] = ToJson(*manifest_schema_).dump();
5455
if (partition_spec_ != nullptr) {
56+
metadata_["partition-spec"] = ToJson(*partition_spec_).dump();
5557
metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
5658
}
5759
metadata_["format-version"] = "1";
58-
return InitSchema(kManifestEntryFieldIds);
60+
return {};
5961
}
6062

6163
Status ManifestEntryAdapterV1::Append(const ManifestEntry& entry) {

src/iceberg/v2_metadata.cc

Lines changed: 8 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -19,9 +19,13 @@
1919

2020
#include "iceberg/v2_metadata.h"
2121

22+
#include <nlohmann/json.hpp>
23+
24+
#include "iceberg/json_internal.h"
2225
#include "iceberg/manifest_entry.h"
2326
#include "iceberg/manifest_list.h"
2427
#include "iceberg/schema.h"
28+
#include "iceberg/util/macros.h"
2529

2630
namespace iceberg {
2731

@@ -50,16 +54,15 @@ Status ManifestEntryAdapterV2::Init() {
5054
DataFile::kSortOrderId.field_id(),
5155
DataFile::kReferencedDataFile.field_id(),
5256
};
53-
// TODO(xiao.dong) schema to json
54-
metadata_["schema"] = "{}";
55-
// TODO(xiao.dong) partition spec to json
56-
metadata_["partition-spec"] = "{}";
57+
ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
58+
metadata_["schema"] = ToJson(*manifest_schema_).dump();
5759
if (partition_spec_ != nullptr) {
60+
metadata_["partition-spec"] = ToJson(*partition_spec_).dump();
5861
metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
5962
}
6063
metadata_["format-version"] = "2";
6164
metadata_["content"] = "data";
62-
return InitSchema(kManifestEntryFieldIds);
65+
return {};
6366
}
6467

6568
Status ManifestEntryAdapterV2::Append(const ManifestEntry& entry) {

src/iceberg/v3_metadata.cc

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,9 @@
1919

2020
#include "iceberg/v3_metadata.h"
2121

22+
#include <nlohmann/json.hpp>
23+
24+
#include "iceberg/json_internal.h"
2225
#include "iceberg/manifest_entry.h"
2326
#include "iceberg/manifest_list.h"
2427
#include "iceberg/schema.h"
@@ -54,16 +57,15 @@ Status ManifestEntryAdapterV3::Init() {
5457
DataFile::kContentOffset.field_id(),
5558
DataFile::kContentSize.field_id(),
5659
};
57-
// TODO(xiao.dong) schema to json
58-
metadata_["schema"] = "{}";
59-
// TODO(xiao.dong) partition spec to json
60-
metadata_["partition-spec"] = "{}";
60+
ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
61+
metadata_["schema"] = ToJson(*manifest_schema_).dump();
6162
if (partition_spec_ != nullptr) {
63+
metadata_["partition-spec"] = ToJson(*partition_spec_).dump();
6264
metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
6365
}
6466
metadata_["format-version"] = "3";
6567
metadata_["content"] = "data";
66-
return InitSchema(kManifestEntryFieldIds);
68+
return {};
6769
}
6870

6971
Status ManifestEntryAdapterV3::Append(const ManifestEntry& entry) {

0 commit comments

Comments
 (0)