Skip to content

Commit 9bbfb78

Browse files
author
xiao.dong
committed
refactor interface
1 parent 9271699 commit 9bbfb78

File tree

10 files changed

+464
-450
lines changed

10 files changed

+464
-450
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ set(ICEBERG_SOURCES
4848
manifest_reader.cc
4949
manifest_reader_internal.cc
5050
manifest_writer.cc
51-
manifest_writer_internal.cc
5251
arrow_c_data_guard_internal.cc
5352
util/murmurhash3_internal.cc
5453
util/timepoint.cc

src/iceberg/manifest_writer.cc

Lines changed: 182 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -21,81 +21,205 @@
2121

2222
#include "iceberg/manifest_entry.h"
2323
#include "iceberg/manifest_list.h"
24-
#include "iceberg/manifest_writer_internal.h"
2524
#include "iceberg/schema.h"
2625
#include "iceberg/util/macros.h"
26+
#include "iceberg/v1_metadata.h"
27+
#include "iceberg/v2_metadata.h"
28+
#include "iceberg/v3_metadata.h"
29+
#include "iceberg/v4_metadata.h"
2730

2831
namespace iceberg {
2932

30-
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::Make(
31-
int32_t format_version, int64_t snapshot_id, std::optional<int64_t> first_row_id,
32-
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
33-
std::shared_ptr<Schema> partition_schema) {
33+
/// \brief Write manifest files to a manifest list file.
34+
class ManifestWriterImpl : public ManifestWriter {
35+
public:
36+
ManifestWriterImpl(std::unique_ptr<Writer> writer,
37+
std::unique_ptr<ManifestEntryAdapter> adapter)
38+
: writer_(std::move(writer)), adapter_(std::move(adapter)) {}
39+
40+
Status Add(const ManifestEntry& entry) override {
41+
if (adapter_->size() >= kBatchSize) {
42+
ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
43+
ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
44+
ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending());
45+
}
46+
return adapter_->Append(entry);
47+
}
48+
49+
Status AddAll(const std::vector<ManifestEntry>& entries) override {
50+
for (const auto& entry : entries) {
51+
ICEBERG_RETURN_UNEXPECTED(Add(entry));
52+
}
53+
return {};
54+
}
55+
56+
Status Close() override {
57+
if (adapter_->size() > 0) {
58+
ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
59+
ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
60+
}
61+
return {};
62+
}
63+
64+
private:
65+
static constexpr int64_t kBatchSize = 1024;
66+
std::unique_ptr<Writer> writer_;
67+
std::unique_ptr<ManifestEntryAdapter> adapter_;
68+
};
69+
70+
std::shared_ptr<Schema> ParseSchema(std::shared_ptr<Schema> partition_schema) {
3471
auto manifest_entry_schema =
3572
ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
3673
auto fields_span = manifest_entry_schema->fields();
3774
std::vector<SchemaField> fields(fields_span.begin(), fields_span.end());
38-
auto schema = std::make_shared<Schema>(fields);
75+
return std::make_shared<Schema>(fields);
76+
}
77+
78+
Result<std::unique_ptr<Writer>> OpenFileWriter(std::string_view location,
79+
const std::shared_ptr<Schema> schema,
80+
std::shared_ptr<FileIO> file_io) {
3981
ICEBERG_ASSIGN_OR_RAISE(
40-
auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro,
41-
{.path = std::string(manifest_location),
42-
.schema = schema,
43-
.io = std::move(file_io)}));
44-
switch (format_version) {
45-
case 1:
46-
return std::make_unique<ManifestWriterV1>(snapshot_id, std::move(writer),
47-
std::move(schema));
48-
case 2:
49-
return std::make_unique<ManifestWriterV2>(snapshot_id, std::move(writer),
50-
std::move(schema));
51-
case 3:
52-
// first_row_id is required for V3 manifest entry
53-
if (!first_row_id.has_value()) {
54-
return InvalidManifest("first_row_id is required for V3 manifest entry");
55-
}
56-
return std::make_unique<ManifestWriterV3>(snapshot_id, first_row_id.value(),
57-
std::move(writer), std::move(schema));
58-
59-
default:
60-
return NotSupported("Unsupported manifest format version: {}", format_version);
82+
auto writer,
83+
WriterFactoryRegistry::Open(
84+
FileFormatType::kAvro,
85+
{.path = std::string(location), .schema = schema, .io = std::move(file_io)}));
86+
return writer;
87+
}
88+
89+
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
90+
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
91+
std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> partition_schema) {
92+
auto schema = ParseSchema(partition_schema);
93+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
94+
OpenFileWriter(manifest_location, schema, std::move(file_io)));
95+
auto adapter = std::make_unique<ManifestEntryAdapterV1>(snapshot_id, std::move(schema));
96+
return std::make_unique<ManifestWriterImpl>(std::move(writer), std::move(adapter));
97+
}
98+
99+
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
100+
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
101+
std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> partition_schema) {
102+
auto schema = ParseSchema(partition_schema);
103+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
104+
OpenFileWriter(manifest_location, schema, std::move(file_io)));
105+
auto adapter = std::make_unique<ManifestEntryAdapterV2>(snapshot_id, std::move(schema));
106+
return std::make_unique<ManifestWriterImpl>(std::move(writer), std::move(adapter));
107+
}
108+
109+
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
110+
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
111+
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
112+
std::shared_ptr<Schema> partition_schema) {
113+
auto schema = ParseSchema(partition_schema);
114+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
115+
OpenFileWriter(manifest_location, schema, std::move(file_io)));
116+
auto adapter = std::make_unique<ManifestEntryAdapterV3>(snapshot_id, first_row_id,
117+
std::move(schema));
118+
return std::make_unique<ManifestWriterImpl>(std::move(writer), std::move(adapter));
119+
}
120+
121+
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV4Writer(
122+
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
123+
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
124+
std::shared_ptr<Schema> partition_schema) {
125+
auto schema = ParseSchema(partition_schema);
126+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
127+
OpenFileWriter(manifest_location, schema, std::move(file_io)));
128+
auto adapter = std::make_unique<ManifestEntryAdapterV4>(snapshot_id, first_row_id,
129+
std::move(schema));
130+
return std::make_unique<ManifestWriterImpl>(std::move(writer), std::move(adapter));
131+
}
132+
133+
/// \brief Write manifest files to a manifest list file.
134+
class ManifestListWriterImpl : public ManifestListWriter {
135+
public:
136+
ManifestListWriterImpl(std::unique_ptr<Writer> writer,
137+
std::unique_ptr<ManifestFileAdapter> adapter)
138+
: writer_(std::move(writer)), adapter_(std::move(adapter)) {}
139+
140+
Status Add(const ManifestFile& file) override {
141+
if (adapter_->size() >= kBatchSize) {
142+
ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
143+
ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
144+
ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending());
145+
}
146+
return adapter_->Append(file);
147+
}
148+
149+
Status AddAll(const std::vector<ManifestFile>& files) override {
150+
for (const auto& file : files) {
151+
ICEBERG_RETURN_UNEXPECTED(Add(file));
152+
}
153+
return {};
61154
}
155+
156+
Status Close() override {
157+
if (adapter_->size() > 0) {
158+
ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
159+
ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
160+
}
161+
return {};
162+
}
163+
164+
private:
165+
static constexpr int64_t kBatchSize = 1024;
166+
std::unique_ptr<Writer> writer_;
167+
std::unique_ptr<ManifestFileAdapter> adapter_;
168+
};
169+
170+
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV1Writer(
171+
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
172+
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
173+
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
174+
ManifestFile::Type().fields().end());
175+
auto schema = std::make_shared<Schema>(fields);
176+
ICEBERG_ASSIGN_OR_RAISE(
177+
auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io)));
178+
auto adapter = std::make_unique<ManifestFileAdapterV1>(snapshot_id, parent_snapshot_id,
179+
std::move(schema));
180+
return std::make_unique<ManifestListWriterImpl>(std::move(writer), std::move(adapter));
62181
}
63182

64-
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::Make(
65-
int32_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id,
66-
std::optional<int64_t> sequence_number, std::optional<int64_t> first_row_id,
183+
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV2Writer(
184+
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
185+
int64_t sequence_number, std::string_view manifest_list_location,
186+
std::shared_ptr<FileIO> file_io) {
187+
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
188+
ManifestFile::Type().fields().end());
189+
auto schema = std::make_shared<Schema>(fields);
190+
ICEBERG_ASSIGN_OR_RAISE(
191+
auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io)));
192+
auto adapter = std::make_unique<ManifestFileAdapterV2>(
193+
snapshot_id, parent_snapshot_id, sequence_number, std::move(schema));
194+
return std::make_unique<ManifestListWriterImpl>(std::move(writer), std::move(adapter));
195+
}
196+
197+
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
198+
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
199+
int64_t sequence_number, std::optional<int64_t> first_row_id,
67200
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
68201
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
69202
ManifestFile::Type().fields().end());
70203
auto schema = std::make_shared<Schema>(fields);
71-
ICEBERG_ASSIGN_OR_RAISE(auto writer, WriterFactoryRegistry::Open(
72-
FileFormatType::kAvro,
73-
{.path = std::string(manifest_list_location),
74-
.schema = schema,
75-
.io = std::move(file_io)}));
76-
switch (format_version) {
77-
case 1:
78-
return std::make_unique<ManifestListWriterV1>(snapshot_id, parent_snapshot_id,
79-
std::move(writer), std::move(schema));
80-
case 2:
81-
return std::make_unique<ManifestListWriterV2>(snapshot_id, parent_snapshot_id,
82-
sequence_number.value(),
83-
std::move(writer), std::move(schema));
84-
case 3:
85-
// sequence_number&first_row_id is required for V3 manifest list
86-
if (!sequence_number.has_value()) {
87-
return InvalidManifestList("sequence_number is required for V3 manifest list");
88-
}
89-
if (!first_row_id.has_value()) {
90-
return InvalidManifestList("first_row_id is required for V3 manifest list");
91-
}
92-
return std::make_unique<ManifestListWriterV3>(
93-
snapshot_id, parent_snapshot_id, sequence_number.value(), first_row_id.value(),
94-
std::move(writer), std::move(schema));
95-
96-
default:
97-
return NotSupported("Unsupported manifest list format version: {}", format_version);
98-
}
204+
ICEBERG_ASSIGN_OR_RAISE(
205+
auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io)));
206+
auto adapter = std::make_unique<ManifestFileAdapterV3>(
207+
snapshot_id, parent_snapshot_id, sequence_number, first_row_id, std::move(schema));
208+
return std::make_unique<ManifestListWriterImpl>(std::move(writer), std::move(adapter));
209+
}
210+
211+
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV4Writer(
212+
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
213+
int64_t sequence_number, std::optional<int64_t> first_row_id,
214+
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
215+
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
216+
ManifestFile::Type().fields().end());
217+
auto schema = std::make_shared<Schema>(fields);
218+
ICEBERG_ASSIGN_OR_RAISE(
219+
auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io)));
220+
auto adapter = std::make_unique<ManifestFileAdapterV4>(
221+
snapshot_id, parent_snapshot_id, sequence_number, first_row_id, std::move(schema));
222+
return std::make_unique<ManifestListWriterImpl>(std::move(writer), std::move(adapter));
99223
}
100224

101225
} // namespace iceberg

src/iceberg/manifest_writer.h

Lines changed: 69 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,42 @@ class ICEBERG_EXPORT ManifestWriter {
5050
virtual Status Close() = 0;
5151

5252
/// \brief Creates a writer for a manifest file.
53-
/// \param format_version Format version of the manifest.
53+
/// \param snapshot_id ID of the snapshot.
54+
/// \param manifest_location Path to the manifest file.
55+
/// \param file_io File IO implementation to use.
56+
/// \return A Result containing the writer or an error.
57+
static Result<std::unique_ptr<ManifestWriter>> MakeV1Writer(
58+
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
59+
std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> partition_schema);
60+
61+
/// \brief Creates a writer for a manifest file.
62+
/// \param snapshot_id ID of the snapshot.
63+
/// \param manifest_location Path to the manifest file.
64+
/// \param file_io File IO implementation to use.
65+
/// \return A Result containing the writer or an error.
66+
static Result<std::unique_ptr<ManifestWriter>> MakeV2Writer(
67+
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
68+
std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> partition_schema);
69+
70+
/// \brief Creates a writer for a manifest file.
5471
/// \param snapshot_id ID of the snapshot.
5572
/// \param first_row_id First row ID of the snapshot.
5673
/// \param manifest_location Path to the manifest file.
5774
/// \param file_io File IO implementation to use.
5875
/// \return A Result containing the writer or an error.
59-
static Result<std::unique_ptr<ManifestWriter>> Make(
60-
int32_t format_version, int64_t snapshot_id, std::optional<int64_t> first_row_id,
76+
static Result<std::unique_ptr<ManifestWriter>> MakeV3Writer(
77+
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
78+
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
79+
std::shared_ptr<Schema> partition_schema);
80+
81+
/// \brief Creates a writer for a manifest file.
82+
/// \param snapshot_id ID of the snapshot.
83+
/// \param first_row_id First row ID of the snapshot.
84+
/// \param manifest_location Path to the manifest file.
85+
/// \param file_io File IO implementation to use.
86+
/// \return A Result containing the writer or an error.
87+
static Result<std::unique_ptr<ManifestWriter>> MakeV4Writer(
88+
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
6189
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
6290
std::shared_ptr<Schema> partition_schema);
6391
};
@@ -80,18 +108,52 @@ class ICEBERG_EXPORT ManifestListWriter {
80108
/// \brief Close writer and flush to storage.
81109
virtual Status Close() = 0;
82110

111+
/// \brief Creates a writer for the v1 manifest list.
112+
/// \param snapshot_id ID of the snapshot.
113+
/// \param parent_snapshot_id ID of the parent snapshot.
114+
/// \param manifest_list_location Path to the manifest list file.
115+
/// \param file_io File IO implementation to use.
116+
/// \return A Result containing the writer or an error.
117+
static Result<std::unique_ptr<ManifestListWriter>> MakeV1Writer(
118+
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
119+
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io);
120+
121+
/// \brief Creates a writer for the manifest list.
122+
/// \param snapshot_id ID of the snapshot.
123+
/// \param parent_snapshot_id ID of the parent snapshot.
124+
/// \param sequence_number Sequence number of the snapshot.
125+
/// \param manifest_list_location Path to the manifest list file.
126+
/// \param file_io File IO implementation to use.
127+
/// \return A Result containing the writer or an error.
128+
static Result<std::unique_ptr<ManifestListWriter>> MakeV2Writer(
129+
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
130+
int64_t sequence_number, std::string_view manifest_list_location,
131+
std::shared_ptr<FileIO> file_io);
132+
133+
/// \brief Creates a writer for the manifest list.
134+
/// \param snapshot_id ID of the snapshot.
135+
/// \param parent_snapshot_id ID of the parent snapshot.
136+
/// \param sequence_number Sequence number of the snapshot.
137+
/// \param first_row_id First row ID of the snapshot.
138+
/// \param manifest_list_location Path to the manifest list file.
139+
/// \param file_io File IO implementation to use.
140+
/// \return A Result containing the writer or an error.
141+
static Result<std::unique_ptr<ManifestListWriter>> MakeV3Writer(
142+
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
143+
int64_t sequence_number, std::optional<int64_t> first_row_id,
144+
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io);
145+
83146
/// \brief Creates a writer for the manifest list.
84-
/// \param format_version Format version of the manifest list.
85147
/// \param snapshot_id ID of the snapshot.
86148
/// \param parent_snapshot_id ID of the parent snapshot.
87149
/// \param sequence_number Sequence number of the snapshot.
88150
/// \param first_row_id First row ID of the snapshot.
89151
/// \param manifest_list_location Path to the manifest list file.
90152
/// \param file_io File IO implementation to use.
91153
/// \return A Result containing the writer or an error.
92-
static Result<std::unique_ptr<ManifestListWriter>> Make(
93-
int32_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id,
94-
std::optional<int64_t> sequence_number, std::optional<int64_t> first_row_id,
154+
static Result<std::unique_ptr<ManifestListWriter>> MakeV4Writer(
155+
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
156+
int64_t sequence_number, std::optional<int64_t> first_row_id,
95157
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io);
96158
};
97159

0 commit comments

Comments
 (0)