Skip to content

Commit cf01c08

Browse files
author
xiao.dong
committed
fix comment
1 parent 73e115e commit cf01c08

File tree

7 files changed

+156
-45
lines changed

7 files changed

+156
-45
lines changed

src/iceberg/manifest_writer.cc

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@
2727

2828
namespace iceberg {
2929

30-
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeWriter(
31-
int32_t format_version, int64_t snapshot_id, int64_t first_row_id,
30+
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::Make(
31+
int32_t format_version, int64_t snapshot_id, std::optional<int64_t> first_row_id,
3232
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
3333
std::shared_ptr<Schema> partition_schema) {
34-
auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema);
34+
auto manifest_entry_schema =
35+
ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
3536
auto fields_span = manifest_entry_schema->fields();
3637
std::vector<SchemaField> fields(fields_span.begin(), fields_span.end());
3738
auto schema = std::make_shared<Schema>(fields);
@@ -48,17 +49,21 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeWriter(
4849
return std::make_unique<ManifestWriterV2>(snapshot_id, std::move(writer),
4950
std::move(schema));
5051
case 3:
51-
return std::make_unique<ManifestWriterV3>(snapshot_id, first_row_id,
52+
// first_row_id is required for V3 manifest entry
53+
if (!first_row_id.has_value()) {
54+
return InvalidManifest("first_row_id is required for V3 manifest entry");
55+
}
56+
return std::make_unique<ManifestWriterV3>(snapshot_id, first_row_id.value(),
5257
std::move(writer), std::move(schema));
5358

5459
default:
55-
return InvalidArgument("Unsupported manifest format version: {}", format_version);
60+
return NotSupported("Unsupported manifest format version: {}", format_version);
5661
}
5762
}
5863

59-
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeWriter(
64+
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::Make(
6065
int32_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id,
61-
int64_t sequence_number, int64_t first_row_id,
66+
std::optional<int64_t> sequence_number, std::optional<int64_t> first_row_id,
6267
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
6368
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
6469
ManifestFile::Type().fields().end());
@@ -71,20 +76,25 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeWriter(
7176
switch (format_version) {
7277
case 1:
7378
return std::make_unique<ManifestListWriterV1>(snapshot_id, parent_snapshot_id,
74-
7579
std::move(writer), std::move(schema));
7680
case 2:
7781
return std::make_unique<ManifestListWriterV2>(snapshot_id, parent_snapshot_id,
78-
sequence_number, std::move(writer),
79-
std::move(schema));
80-
case 3:
81-
return std::make_unique<ManifestListWriterV3>(snapshot_id, parent_snapshot_id,
82-
sequence_number, first_row_id,
82+
sequence_number.value(),
8383
std::move(writer), std::move(schema));
84+
case 3:
85+
// sequence_number&first_row_id is required for V3 manifest list
86+
if (!sequence_number.has_value()) {
87+
return InvalidManifestList("sequence_number is required for V3 manifest list");
88+
}
89+
if (!first_row_id.has_value()) {
90+
return InvalidManifestList("first_row_id is required for V3 manifest list");
91+
}
92+
return std::make_unique<ManifestListWriterV3>(
93+
snapshot_id, parent_snapshot_id, sequence_number.value(), first_row_id.value(),
94+
std::move(writer), std::move(schema));
8495

8596
default:
86-
return InvalidArgument("Unsupported manifest list format version: {}",
87-
format_version);
97+
return NotSupported("Unsupported manifest list format version: {}", format_version);
8898
}
8999
}
90100

src/iceberg/manifest_writer.h

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,15 @@ class ICEBERG_EXPORT ManifestWriter {
3636
public:
3737
virtual ~ManifestWriter() = default;
3838

39-
/// \brief Write manifest entry to file
39+
/// \brief Write manifest entry to file.
4040
/// \param entry Manifest entry to write.
41-
/// \return Status::OK() if all entry was written successfully
42-
virtual Status WriteManifestEntry(const ManifestEntry& entry) const = 0;
41+
/// \return Status::OK() if entry was written successfully
42+
virtual Status Add(const ManifestEntry& entry) = 0;
43+
44+
/// \brief Write manifest entries to file.
45+
/// \param entries Manifest entries to write.
46+
/// \return Status::OK() if all entries were written successfully
47+
virtual Status AddAll(const std::vector<ManifestEntry>& entries) = 0;
4348

4449
/// \brief Close writer and flush to storage.
4550
virtual Status Close() = 0;
@@ -51,8 +56,8 @@ class ICEBERG_EXPORT ManifestWriter {
5156
/// \param manifest_location Path to the manifest file.
5257
/// \param file_io File IO implementation to use.
5358
/// \return A Result containing the writer or an error.
54-
static Result<std::unique_ptr<ManifestWriter>> MakeWriter(
55-
int32_t format_version, int64_t snapshot_id, int64_t first_row_id,
59+
static Result<std::unique_ptr<ManifestWriter>> Make(
60+
int32_t format_version, int64_t snapshot_id, std::optional<int64_t> first_row_id,
5661
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
5762
std::shared_ptr<Schema> partition_schema);
5863
};
@@ -62,10 +67,15 @@ class ICEBERG_EXPORT ManifestListWriter {
6267
public:
6368
virtual ~ManifestListWriter() = default;
6469

65-
/// \brief Write manifest file list to manifest list file.
70+
/// \brief Write manifest file to manifest list file.
6671
/// \param file Manifest file to write.
67-
/// \return Status::OK() if all file was written successfully
68-
virtual Status WriteManifestFile(const ManifestFile& file) const = 0;
72+
/// \return Status::OK() if file was written successfully
73+
virtual Status Add(const ManifestFile& file) = 0;
74+
75+
/// \brief Write manifest file list to manifest list file.
76+
/// \param files Manifest file list to write.
77+
/// \return Status::OK() if all files were written successfully
78+
virtual Status AddAll(const std::vector<ManifestFile>& files) = 0;
6979

7080
/// \brief Close writer and flush to storage.
7181
virtual Status Close() = 0;
@@ -79,9 +89,9 @@ class ICEBERG_EXPORT ManifestListWriter {
7989
/// \param manifest_list_location Path to the manifest list file.
8090
/// \param file_io File IO implementation to use.
8191
/// \return A Result containing the writer or an error.
82-
static Result<std::unique_ptr<ManifestListWriter>> MakeWriter(
92+
static Result<std::unique_ptr<ManifestListWriter>> Make(
8393
int32_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id,
84-
int64_t sequence_number, int64_t first_row_id,
94+
std::optional<int64_t> sequence_number, std::optional<int64_t> first_row_id,
8595
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io);
8696
};
8797

src/iceberg/manifest_writer_internal.cc

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,45 +25,105 @@
2525

2626
namespace iceberg {
2727

28-
Status ManifestWriterV1::WriteManifestEntry(const ManifestEntry& entry) const {
28+
Status ManifestWriterV1::Add(const ManifestEntry& entry) {
29+
// TODO(xiao.dong) convert entries to arrow data
30+
return {};
31+
}
32+
33+
Status ManifestWriterV1::AddAll(const std::vector<ManifestEntry>& files) {
2934
// TODO(xiao.dong) convert entries to arrow data
3035
return {};
3136
}
3237

3338
Status ManifestWriterV1::Close() { return {}; }
3439

35-
Status ManifestWriterV2::WriteManifestEntry(const ManifestEntry& entry) const {
40+
ManifestEntry ManifestWriterV1::prepare(const ManifestEntry& entry) {
41+
return wrapper_.Wrap(entry);
42+
}
43+
44+
Status ManifestWriterV2::Add(const ManifestEntry& entry) {
45+
// TODO(xiao.dong) convert entries to arrow data
46+
return {};
47+
}
48+
49+
Status ManifestWriterV2::AddAll(const std::vector<ManifestEntry>& files) {
3650
// TODO(xiao.dong) convert entries to arrow data
3751
return {};
3852
}
3953

4054
Status ManifestWriterV2::Close() { return {}; }
4155

42-
Status ManifestWriterV3::WriteManifestEntry(const ManifestEntry& entry) const {
56+
ManifestEntry ManifestWriterV2::prepare(const ManifestEntry& entry) {
57+
return wrapper_.Wrap(entry);
58+
}
59+
60+
Status ManifestWriterV3::Add(const ManifestEntry& entry) {
61+
// TODO(xiao.dong) convert entries to arrow data
62+
return {};
63+
}
64+
65+
Status ManifestWriterV3::AddAll(const std::vector<ManifestEntry>& files) {
4366
// TODO(xiao.dong) convert entries to arrow data
4467
return {};
4568
}
4669

4770
Status ManifestWriterV3::Close() { return {}; }
4871

49-
Status ManifestListWriterV1::WriteManifestFile(const ManifestFile& file) const {
72+
ManifestEntry ManifestWriterV3::prepare(const ManifestEntry& entry) {
73+
return wrapper_.Wrap(entry);
74+
}
75+
76+
Status ManifestListWriterV1::Add(const ManifestFile& file) {
77+
// TODO(xiao.dong) convert manifest files to arrow data
78+
return {};
79+
}
80+
81+
Status ManifestListWriterV1::AddAll(const std::vector<ManifestFile>& files) {
5082
// TODO(xiao.dong) convert manifest files to arrow data
5183
return {};
5284
}
5385

5486
Status ManifestListWriterV1::Close() { return {}; }
5587

56-
Status ManifestListWriterV2::WriteManifestFile(const ManifestFile& file) const {
88+
ManifestFile ManifestListWriterV1::prepare(const ManifestFile& file) {
89+
return wrapper_.Wrap(file);
90+
}
91+
92+
Status ManifestListWriterV2::Add(const ManifestFile& file) {
93+
// TODO(xiao.dong) convert manifest files to arrow data
94+
return {};
95+
}
96+
97+
Status ManifestListWriterV2::AddAll(const std::vector<ManifestFile>& files) {
5798
// TODO(xiao.dong) convert manifest files to arrow data
5899
return {};
59100
}
60101

61102
Status ManifestListWriterV2::Close() { return {}; }
62103

63-
Status ManifestListWriterV3::WriteManifestFile(const ManifestFile& file) const {
104+
ManifestFile ManifestListWriterV2::prepare(const ManifestFile& file) {
105+
return wrapper_.Wrap(file);
106+
}
107+
108+
Status ManifestListWriterV3::Add(const ManifestFile& file) {
109+
// TODO(xiao.dong) convert manifest files to arrow data
110+
return {};
111+
}
112+
113+
Status ManifestListWriterV3::AddAll(const std::vector<ManifestFile>& files) {
64114
// TODO(xiao.dong) convert manifest files to arrow data
65115
return {};
66116
}
67117

68118
Status ManifestListWriterV3::Close() { return {}; }
119+
120+
ManifestFile ManifestListWriterV3::prepare(const ManifestFile& file) {
121+
if (file.content != ManifestFile::Content::kData || file.first_row_id.has_value()) {
122+
return wrapper_.Wrap(file, std::nullopt);
123+
}
124+
auto result = wrapper_.Wrap(file, next_row_id_);
125+
next_row_id_ +=
126+
file.existing_rows_count.value_or(0) + file.added_rows_count.value_or(0);
127+
return result;
128+
}
69129
} // namespace iceberg

src/iceberg/manifest_writer_internal.h

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ class ManifestWriterImpl : public ManifestWriter {
3636
std::shared_ptr<Schema> schema)
3737
: schema_(std::move(schema)), writer_(std::move(writer)) {}
3838

39+
protected:
40+
virtual ManifestEntry prepare(const ManifestEntry& entry) = 0;
41+
3942
private:
4043
std::shared_ptr<Schema> schema_;
4144
std::unique_ptr<Writer> writer_;
@@ -48,10 +51,13 @@ class ManifestWriterV1 : public ManifestWriterImpl {
4851
std::shared_ptr<Schema> schema)
4952
: ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)) {}
5053

51-
Status WriteManifestEntry(const ManifestEntry& entry) const override;
52-
54+
Status Add(const ManifestEntry& entry) override;
55+
Status AddAll(const std::vector<ManifestEntry>& entries) override;
5356
Status Close() override;
5457

58+
protected:
59+
ManifestEntry prepare(const ManifestEntry& entry) override;
60+
5561
private:
5662
V1MetaData::ManifestEntryWrapper wrapper_;
5763
};
@@ -64,10 +70,14 @@ class ManifestWriterV2 : public ManifestWriterImpl {
6470
: ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)),
6571
wrapper_(snapshot_id) {}
6672

67-
Status WriteManifestEntry(const ManifestEntry& entry) const override;
73+
Status Add(const ManifestEntry& entry) override;
74+
Status AddAll(const std::vector<ManifestEntry>& entries) override;
6875

6976
Status Close() override;
7077

78+
protected:
79+
ManifestEntry prepare(const ManifestEntry& entry) override;
80+
7181
private:
7282
V2MetaData::ManifestEntryWrapper wrapper_;
7383
};
@@ -81,10 +91,14 @@ class ManifestWriterV3 : public ManifestWriterImpl {
8191
: ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)),
8292
wrapper_(snapshot_id) {}
8393

84-
Status WriteManifestEntry(const ManifestEntry& entry) const override;
94+
Status Add(const ManifestEntry& entry) override;
95+
Status AddAll(const std::vector<ManifestEntry>& entries) override;
8596

8697
Status Close() override;
8798

99+
protected:
100+
ManifestEntry prepare(const ManifestEntry& entry) override;
101+
88102
private:
89103
V3MetaData::ManifestEntryWrapper wrapper_;
90104
};
@@ -97,6 +111,9 @@ class ManifestListWriterImpl : public ManifestListWriter {
97111
std::shared_ptr<Schema> schema)
98112
: schema_(std::move(schema)), writer_(std::move(writer)) {}
99113

114+
protected:
115+
virtual ManifestFile prepare(const ManifestFile& file) = 0;
116+
100117
private:
101118
std::shared_ptr<Schema> schema_;
102119
std::unique_ptr<Writer> writer_;
@@ -112,10 +129,13 @@ class ManifestListWriterV1 : public ManifestListWriterImpl {
112129
: ManifestListWriterImpl(snapshot_id, parent_snapshot_id, std::move(writer),
113130
std::move(schema)) {}
114131

115-
Status WriteManifestFile(const ManifestFile& file) const override;
116-
132+
Status Add(const ManifestFile& file) override;
133+
Status AddAll(const std::vector<ManifestFile>& files) override;
117134
Status Close() override;
118135

136+
protected:
137+
ManifestFile prepare(const ManifestFile& file) override;
138+
119139
private:
120140
V1MetaData::ManifestFileWrapper wrapper_;
121141
};
@@ -130,10 +150,14 @@ class ManifestListWriterV2 : public ManifestListWriterImpl {
130150
std::move(schema)),
131151
wrapper_(snapshot_id, sequence_number) {}
132152

133-
Status WriteManifestFile(const ManifestFile& file) const override;
153+
Status Add(const ManifestFile& file) override;
154+
Status AddAll(const std::vector<ManifestFile>& files) override;
134155

135156
Status Close() override;
136157

158+
protected:
159+
ManifestFile prepare(const ManifestFile& file) override;
160+
137161
private:
138162
V2MetaData::ManifestFileWrapper wrapper_;
139163
};
@@ -149,11 +173,16 @@ class ManifestListWriterV3 : public ManifestListWriterImpl {
149173
std::move(schema)),
150174
wrapper_(snapshot_id, sequence_number) {}
151175

152-
Status WriteManifestFile(const ManifestFile& file) const override;
176+
Status Add(const ManifestFile& file) override;
177+
Status AddAll(const std::vector<ManifestFile>& files) override;
153178

154179
Status Close() override;
155180

181+
protected:
182+
ManifestFile prepare(const ManifestFile& file) override;
183+
156184
private:
185+
int64_t next_row_id_ = 0;
157186
V3MetaData::ManifestFileWrapper wrapper_;
158187
};
159188

src/iceberg/v1_metadata.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@ class V1MetaData {
3535
struct ManifestFileWrapper : public ManifestFile {
3636
ManifestFileWrapper() = default;
3737

38-
ManifestFile wrap(ManifestFile file, int64_t first_row_id) { return *this; }
38+
ManifestFile Wrap(ManifestFile file) { return *this; }
3939
};
4040

4141
/// \brief v1 manifest entry wrapper.
4242
struct ManifestEntryWrapper : public ManifestEntry {
4343
ManifestEntryWrapper() = default;
4444

45-
ManifestEntry wrap(ManifestEntry entry) { return *this; }
45+
ManifestEntry Wrap(ManifestEntry entry) { return *this; }
4646
};
4747

4848
static ManifestFileWrapper manifestFileWrapper() { return {}; }

0 commit comments

Comments
 (0)