Skip to content

Commit 7404f14

Browse files
dongxiao1198xiao.dong
andauthored
feat: add manifest&manifest list writer (#176)
1 add v1v2v3 writer definition 2 add v1v2v3 metadata wrapper --------- Co-authored-by: xiao.dong <[email protected]>
1 parent b696713 commit 7404f14

File tree

7 files changed

+534
-6
lines changed

7 files changed

+534
-6
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ set(ICEBERG_SOURCES
4747
type.cc
4848
manifest_reader.cc
4949
manifest_reader_internal.cc
50+
manifest_writer.cc
5051
arrow_c_data_guard_internal.cc
5152
util/murmurhash3_internal.cc
5253
util/timepoint.cc

src/iceberg/manifest_adapter.h

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/metadata_adapter.h
23+
/// Base class of adapter for v1v2v3v4 metadata.
24+
25+
#include "iceberg/arrow_c_data.h"
26+
#include "iceberg/result.h"
27+
#include "iceberg/type_fwd.h"
28+
29+
namespace iceberg {
30+
31+
// \brief Base class to append manifest metadata to Arrow array.
32+
class ICEBERG_EXPORT ManifestAdapter {
33+
public:
34+
ManifestAdapter() = default;
35+
virtual ~ManifestAdapter() = default;
36+
37+
virtual Status StartAppending() = 0;
38+
virtual Result<ArrowArray> FinishAppending() = 0;
39+
int64_t size() const { return size_; }
40+
41+
protected:
42+
ArrowArray array_;
43+
int64_t size_ = 0;
44+
};
45+
46+
// \brief Implemented by different versions with different schemas to
47+
// append a list of `ManifestEntry`s to an `ArrowArray`.
48+
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
49+
public:
50+
ManifestEntryAdapter() = default;
51+
~ManifestEntryAdapter() override = default;
52+
53+
virtual Status Append(const ManifestEntry& entry) = 0;
54+
};
55+
56+
// \brief Implemented by different versions with different schemas to
57+
// append a list of `ManifestFile`s to an `ArrowArray`.
58+
class ICEBERG_EXPORT ManifestFileAdapter : public ManifestAdapter {
59+
public:
60+
ManifestFileAdapter() = default;
61+
~ManifestFileAdapter() override = default;
62+
63+
virtual Status Append(const ManifestFile& file) = 0;
64+
};
65+
66+
} // namespace iceberg

src/iceberg/manifest_writer.cc

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/manifest_writer.h"
21+
22+
#include "iceberg/manifest_entry.h"
23+
#include "iceberg/manifest_list.h"
24+
#include "iceberg/schema.h"
25+
#include "iceberg/util/macros.h"
26+
#include "iceberg/v1_metadata.h"
27+
#include "iceberg/v2_metadata.h"
28+
#include "iceberg/v3_metadata.h"
29+
30+
namespace iceberg {
31+
32+
Status ManifestWriter::Add(const ManifestEntry& entry) {
33+
if (adapter_->size() >= kBatchSize) {
34+
ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
35+
ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
36+
ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending());
37+
}
38+
return adapter_->Append(entry);
39+
}
40+
41+
Status ManifestWriter::AddAll(const std::vector<ManifestEntry>& entries) {
42+
for (const auto& entry : entries) {
43+
ICEBERG_RETURN_UNEXPECTED(Add(entry));
44+
}
45+
return {};
46+
}
47+
48+
Status ManifestWriter::Close() {
49+
if (adapter_->size() > 0) {
50+
ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
51+
ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
52+
}
53+
return {};
54+
}
55+
56+
Result<std::unique_ptr<Writer>> OpenFileWriter(std::string_view location,
57+
std::shared_ptr<Schema> schema,
58+
std::shared_ptr<FileIO> file_io) {
59+
ICEBERG_ASSIGN_OR_RAISE(
60+
auto writer,
61+
WriterFactoryRegistry::Open(FileFormatType::kAvro, {.path = std::string(location),
62+
.schema = std::move(schema),
63+
.io = std::move(file_io)}));
64+
return writer;
65+
}
66+
67+
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
68+
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
69+
std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> partition_schema) {
70+
// TODO(xiao.dong) parse v1 schema
71+
auto manifest_entry_schema =
72+
ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
73+
auto fields_span = manifest_entry_schema->fields();
74+
std::vector<SchemaField> fields(fields_span.begin(), fields_span.end());
75+
auto schema = std::make_shared<Schema>(fields);
76+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
77+
OpenFileWriter(manifest_location, schema, std::move(file_io)));
78+
auto adapter = std::make_unique<ManifestEntryAdapterV1>(snapshot_id, std::move(schema));
79+
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
80+
}
81+
82+
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
83+
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
84+
std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> partition_schema) {
85+
// TODO(xiao.dong) parse v2 schema
86+
auto manifest_entry_schema =
87+
ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
88+
auto fields_span = manifest_entry_schema->fields();
89+
std::vector<SchemaField> fields(fields_span.begin(), fields_span.end());
90+
auto schema = std::make_shared<Schema>(fields);
91+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
92+
OpenFileWriter(manifest_location, schema, std::move(file_io)));
93+
auto adapter = std::make_unique<ManifestEntryAdapterV2>(snapshot_id, std::move(schema));
94+
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
95+
}
96+
97+
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
98+
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
99+
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
100+
std::shared_ptr<Schema> partition_schema) {
101+
// TODO(xiao.dong) parse v3 schema
102+
auto manifest_entry_schema =
103+
ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
104+
auto fields_span = manifest_entry_schema->fields();
105+
std::vector<SchemaField> fields(fields_span.begin(), fields_span.end());
106+
auto schema = std::make_shared<Schema>(fields);
107+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
108+
OpenFileWriter(manifest_location, schema, std::move(file_io)));
109+
auto adapter = std::make_unique<ManifestEntryAdapterV3>(snapshot_id, first_row_id,
110+
std::move(schema));
111+
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
112+
}
113+
114+
Status ManifestListWriter::Add(const ManifestFile& file) {
115+
if (adapter_->size() >= kBatchSize) {
116+
ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
117+
ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
118+
ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending());
119+
}
120+
return adapter_->Append(file);
121+
}
122+
123+
Status ManifestListWriter::AddAll(const std::vector<ManifestFile>& files) {
124+
for (const auto& file : files) {
125+
ICEBERG_RETURN_UNEXPECTED(Add(file));
126+
}
127+
return {};
128+
}
129+
130+
Status ManifestListWriter::Close() {
131+
if (adapter_->size() > 0) {
132+
ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
133+
ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
134+
}
135+
return {};
136+
}
137+
138+
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV1Writer(
139+
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
140+
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
141+
// TODO(xiao.dong) parse v1 schema
142+
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
143+
ManifestFile::Type().fields().end());
144+
auto schema = std::make_shared<Schema>(fields);
145+
ICEBERG_ASSIGN_OR_RAISE(
146+
auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io)));
147+
auto adapter = std::make_unique<ManifestFileAdapterV1>(snapshot_id, parent_snapshot_id,
148+
std::move(schema));
149+
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
150+
}
151+
152+
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV2Writer(
153+
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
154+
int64_t sequence_number, std::string_view manifest_list_location,
155+
std::shared_ptr<FileIO> file_io) {
156+
// TODO(xiao.dong) parse v2 schema
157+
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
158+
ManifestFile::Type().fields().end());
159+
auto schema = std::make_shared<Schema>(fields);
160+
ICEBERG_ASSIGN_OR_RAISE(
161+
auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io)));
162+
auto adapter = std::make_unique<ManifestFileAdapterV2>(
163+
snapshot_id, parent_snapshot_id, sequence_number, std::move(schema));
164+
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
165+
}
166+
167+
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
168+
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
169+
int64_t sequence_number, std::optional<int64_t> first_row_id,
170+
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
171+
// TODO(xiao.dong) parse v3 schema
172+
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
173+
ManifestFile::Type().fields().end());
174+
auto schema = std::make_shared<Schema>(fields);
175+
ICEBERG_ASSIGN_OR_RAISE(
176+
auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io)));
177+
auto adapter = std::make_unique<ManifestFileAdapterV3>(
178+
snapshot_id, parent_snapshot_id, sequence_number, first_row_id, std::move(schema));
179+
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
180+
}
181+
182+
} // namespace iceberg

0 commit comments

Comments
 (0)