Skip to content

Commit 189fb56

Browse files
author
xiao.dong
committed
feat: add manifest&manifest list writer
1 add v1v2v3 writer definition 2
1 parent 55b0436 commit 189fb56

File tree

5 files changed

+324
-5
lines changed

5 files changed

+324
-5
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ set(ICEBERG_SOURCES
4747
type.cc
4848
manifest_reader.cc
4949
manifest_reader_internal.cc
50+
manifest_writer.cc
51+
manifest_writer_internal.cc
5052
arrow_c_data_guard_internal.cc
5153
util/murmurhash3_internal.cc
5254
util/timepoint.cc

src/iceberg/manifest_writer.cc

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/manifest_writer.h"
21+
22+
#include "iceberg/manifest_entry.h"
23+
#include "iceberg/manifest_list.h"
24+
#include "iceberg/manifest_writer_internal.h"
25+
#include "iceberg/schema.h"
26+
#include "iceberg/util/macros.h"
27+
28+
namespace iceberg {
29+
30+
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeWriter(
31+
int32_t format_version, int64_t first_row_id, std::string_view manifest_location,
32+
std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> partition_schema) {
33+
auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema);
34+
auto fields_span = manifest_entry_schema->fields();
35+
std::vector<SchemaField> fields(fields_span.begin(), fields_span.end());
36+
auto schema = std::make_shared<Schema>(fields);
37+
ICEBERG_ASSIGN_OR_RAISE(
38+
auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro,
39+
{.path = std::string(manifest_location),
40+
.schema = schema,
41+
.io = std::move(file_io)}));
42+
switch (format_version) {
43+
case 1:
44+
return std::make_unique<ManifestWriterV1>(first_row_id, std::move(writer),
45+
std::move(schema));
46+
case 2:
47+
return std::make_unique<ManifestWriterV2>(first_row_id, std::move(writer),
48+
std::move(schema));
49+
case 3:
50+
return std::make_unique<ManifestWriterV3>(first_row_id, std::move(writer),
51+
std::move(schema));
52+
53+
default:
54+
return InvalidArgument("Unsupported manifest format version: {}", format_version);
55+
}
56+
}
57+
58+
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeWriter(
59+
int32_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id,
60+
int64_t sequence_number, int64_t first_row_id,
61+
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
62+
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
63+
ManifestFile::Type().fields().end());
64+
auto schema = std::make_shared<Schema>(fields);
65+
ICEBERG_ASSIGN_OR_RAISE(auto writer, WriterFactoryRegistry::Open(
66+
FileFormatType::kAvro,
67+
{.path = std::string(manifest_list_location),
68+
.schema = schema,
69+
.io = std::move(file_io)}));
70+
switch (format_version) {
71+
case 1:
72+
return std::make_unique<ManifestListWriterV1>(snapshot_id, parent_snapshot_id,
73+
sequence_number, first_row_id,
74+
std::move(writer), std::move(schema));
75+
case 2:
76+
return std::make_unique<ManifestListWriterV2>(snapshot_id, parent_snapshot_id,
77+
sequence_number, first_row_id,
78+
std::move(writer), std::move(schema));
79+
case 3:
80+
return std::make_unique<ManifestListWriterV3>(snapshot_id, parent_snapshot_id,
81+
sequence_number, first_row_id,
82+
std::move(writer), std::move(schema));
83+
84+
default:
85+
return InvalidArgument("Unsupported manifest list format version: {}",
86+
format_version);
87+
}
88+
}
89+
90+
} // namespace iceberg

src/iceberg/manifest_writer.h

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,29 +35,51 @@ namespace iceberg {
3535
class ICEBERG_EXPORT ManifestWriter {
3636
public:
3737
virtual ~ManifestWriter() = default;
38-
virtual Status WriteManifestEntries(
39-
const std::vector<ManifestEntry>& entries) const = 0;
38+
39+
/// \brief Write manifest entry to file
40+
/// \param entry Manifest entry to write.
41+
/// \return Status::OK() if all entry was written successfully
42+
virtual Status WriteManifestEntry(const ManifestEntry& entry) const = 0;
43+
44+
/// \brief Close writer and flush to storage.
45+
virtual Status Close() = 0;
4046

4147
/// \brief Creates a writer for a manifest file.
48+
/// \param format_version Format version of the manifest.
49+
/// \param first_row_id First row ID of the snapshot.
4250
/// \param manifest_location Path to the manifest file.
4351
/// \param file_io File IO implementation to use.
4452
/// \return A Result containing the writer or an error.
4553
static Result<std::unique_ptr<ManifestWriter>> MakeWriter(
46-
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
47-
std::shared_ptr<Schema> partition_schema);
54+
int32_t format_version, int64_t first_row_id, std::string_view manifest_location,
55+
std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> partition_schema);
4856
};
4957

5058
/// \brief Write manifest files to a manifest list file.
5159
class ICEBERG_EXPORT ManifestListWriter {
5260
public:
5361
virtual ~ManifestListWriter() = default;
54-
virtual Status WriteManifestFiles(const std::vector<ManifestFile>& files) const = 0;
62+
63+
/// \brief Write manifest file list to manifest list file.
64+
/// \param file Manifest file to write.
65+
/// \return Status::OK() if all file was written successfully
66+
virtual Status WriteManifestFile(const ManifestFile& file) const = 0;
67+
68+
/// \brief Close writer and flush to storage.
69+
virtual Status Close() = 0;
5570

5671
/// \brief Creates a writer for the manifest list.
72+
/// \param format_version Format version of the manifest list.
73+
/// \param snapshot_id ID of the snapshot.
74+
/// \param parent_snapshot_id ID of the parent snapshot.
75+
/// \param sequence_number Sequence number of the snapshot.
76+
/// \param first_row_id First row ID of the snapshot.
5777
/// \param manifest_list_location Path to the manifest list file.
5878
/// \param file_io File IO implementation to use.
5979
/// \return A Result containing the writer or an error.
6080
static Result<std::unique_ptr<ManifestListWriter>> MakeWriter(
81+
int32_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id,
82+
int64_t sequence_number, int64_t first_row_id,
6183
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io);
6284
};
6385

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "manifest_writer_internal.h"
21+
22+
#include "iceberg/manifest_entry.h"
23+
#include "iceberg/manifest_list.h"
24+
#include "iceberg/schema.h"
25+
26+
namespace iceberg {
27+
28+
Status ManifestWriterV1::WriteManifestEntry(const ManifestEntry& entry) const {
29+
// TODO(xiao.dong) convert entries to arrow data
30+
return {};
31+
}
32+
33+
Status ManifestWriterV1::Close() { return {}; }
34+
35+
Status ManifestWriterV2::WriteManifestEntry(const ManifestEntry& entry) const {
36+
// TODO(xiao.dong) convert entries to arrow data
37+
return {};
38+
}
39+
40+
Status ManifestWriterV2::Close() { return {}; }
41+
42+
Status ManifestWriterV3::WriteManifestEntry(const ManifestEntry& entry) const {
43+
// TODO(xiao.dong) convert entries to arrow data
44+
return {};
45+
}
46+
47+
Status ManifestWriterV3::Close() { return {}; }
48+
49+
Status ManifestListWriterV1::WriteManifestFile(const ManifestFile& file) const {
50+
// TODO(xiao.dong) convert manifest files to arrow data
51+
return {};
52+
}
53+
54+
Status ManifestListWriterV1::Close() { return {}; }
55+
56+
Status ManifestListWriterV2::WriteManifestFile(const ManifestFile& file) const {
57+
// TODO(xiao.dong) convert manifest files to arrow data
58+
return {};
59+
}
60+
61+
Status ManifestListWriterV2::Close() { return {}; }
62+
63+
Status ManifestListWriterV3::WriteManifestFile(const ManifestFile& file) const {
64+
// TODO(xiao.dong) convert manifest files to arrow data
65+
return {};
66+
}
67+
68+
Status ManifestListWriterV3::Close() { return {}; }
69+
} // namespace iceberg
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/internal/manifest_writer_internal.h
23+
/// Writer implementation for manifest list files and manifest files.
24+
25+
#include "iceberg/manifest_writer.h"
26+
27+
namespace iceberg {
28+
29+
/// \brief Write manifest entries to a manifest file.
30+
class ManifestWriterImpl : public ManifestWriter {
31+
public:
32+
explicit ManifestWriterImpl(int64_t first_row_id, std::unique_ptr<Writer> writer,
33+
std::shared_ptr<Schema> schema)
34+
: schema_(std::move(schema)), writer_(std::move(writer)) {}
35+
36+
private:
37+
std::shared_ptr<Schema> schema_;
38+
std::unique_ptr<Writer> writer_;
39+
};
40+
41+
/// \brief Write v1 manifest entries to a manifest file.
42+
class ManifestWriterV1 : public ManifestWriterImpl {
43+
public:
44+
explicit ManifestWriterV1(int64_t first_row_id, std::unique_ptr<Writer> writer,
45+
std::shared_ptr<Schema> schema)
46+
: ManifestWriterImpl(first_row_id, std::move(writer), std::move(schema)) {}
47+
48+
Status WriteManifestEntry(const ManifestEntry& entry) const override;
49+
50+
Status Close() override;
51+
};
52+
53+
/// \brief Write v2 manifest entries to a manifest file.
54+
class ManifestWriterV2 : public ManifestWriterImpl {
55+
public:
56+
explicit ManifestWriterV2(int64_t first_row_id, std::unique_ptr<Writer> writer,
57+
std::shared_ptr<Schema> schema)
58+
: ManifestWriterImpl(first_row_id, std::move(writer), std::move(schema)) {}
59+
60+
Status WriteManifestEntry(const ManifestEntry& entry) const override;
61+
62+
Status Close() override;
63+
};
64+
65+
/// \brief Write v3 manifest entries to a manifest file.
66+
class ManifestWriterV3 : public ManifestWriterImpl {
67+
public:
68+
explicit ManifestWriterV3(int64_t first_row_id, std::unique_ptr<Writer> writer,
69+
std::shared_ptr<Schema> schema)
70+
: ManifestWriterImpl(first_row_id, std::move(writer), std::move(schema)) {}
71+
72+
Status WriteManifestEntry(const ManifestEntry& entry) const override;
73+
74+
Status Close() override;
75+
};
76+
77+
/// \brief Write manifest files to a manifest list file.
78+
class ManifestListWriterImpl : public ManifestListWriter {
79+
public:
80+
explicit ManifestListWriterImpl(int64_t snapshot_id, int64_t parent_snapshot_id,
81+
int64_t sequence_number, int64_t first_row_id,
82+
std::unique_ptr<Writer> writer,
83+
std::shared_ptr<Schema> schema)
84+
: schema_(std::move(schema)), writer_(std::move(writer)) {}
85+
86+
private:
87+
std::shared_ptr<Schema> schema_;
88+
std::unique_ptr<Writer> writer_;
89+
};
90+
91+
/// \brief Write v1 manifest files to a manifest list file.
92+
class ManifestListWriterV1 : public ManifestListWriterImpl {
93+
public:
94+
explicit ManifestListWriterV1(int64_t snapshot_id, int64_t parent_snapshot_id,
95+
int64_t sequence_number, int64_t first_row_id,
96+
std::unique_ptr<Writer> writer,
97+
std::shared_ptr<Schema> schema)
98+
: ManifestListWriterImpl(snapshot_id, parent_snapshot_id, sequence_number,
99+
first_row_id, std::move(writer), std::move(schema)) {}
100+
101+
Status WriteManifestFile(const ManifestFile& file) const override;
102+
103+
Status Close() override;
104+
};
105+
106+
/// \brief Write v2 manifest files to a manifest list file.
107+
class ManifestListWriterV2 : public ManifestListWriterImpl {
108+
public:
109+
explicit ManifestListWriterV2(int64_t snapshot_id, int64_t parent_snapshot_id,
110+
int64_t sequence_number, int64_t first_row_id,
111+
std::unique_ptr<Writer> writer,
112+
std::shared_ptr<Schema> schema)
113+
: ManifestListWriterImpl(snapshot_id, parent_snapshot_id, sequence_number,
114+
first_row_id, std::move(writer), std::move(schema)) {}
115+
116+
Status WriteManifestFile(const ManifestFile& file) const override;
117+
118+
Status Close() override;
119+
};
120+
121+
/// \brief Write v3 manifest files to a manifest list file.
122+
class ManifestListWriterV3 : public ManifestListWriterImpl {
123+
public:
124+
explicit ManifestListWriterV3(int64_t snapshot_id, int64_t parent_snapshot_id,
125+
int64_t sequence_number, int64_t first_row_id,
126+
std::unique_ptr<Writer> writer,
127+
std::shared_ptr<Schema> schema)
128+
: ManifestListWriterImpl(snapshot_id, parent_snapshot_id, sequence_number,
129+
first_row_id, std::move(writer), std::move(schema)) {}
130+
131+
Status WriteManifestFile(const ManifestFile& file) const override;
132+
133+
Status Close() override;
134+
};
135+
136+
} // namespace iceberg

0 commit comments

Comments
 (0)