1919
2020#include " iceberg/manifest_adapter.h"
2121
22+ #include < memory>
2223#include < utility>
2324
2425#include < nanoarrow/nanoarrow.h>
2526
2627#include " iceberg/arrow/nanoarrow_status_internal.h"
2728#include " iceberg/manifest_entry.h"
2829#include " iceberg/manifest_list.h"
30+ #include " iceberg/partition_summary_internal.h"
2931#include " iceberg/result.h"
3032#include " iceberg/schema.h"
31- #include " iceberg/schema_internal.h"
3233#include " iceberg/util/checked_cast.h"
3334#include " iceberg/util/macros.h"
3435
@@ -141,10 +142,12 @@ Result<ArrowArray*> ManifestAdapter::FinishAppending() {
141142 return &array_;
142143}
143144
144- ManifestEntryAdapter::ManifestEntryAdapter (std::shared_ptr<PartitionSpec> partition_spec,
145+ ManifestEntryAdapter::ManifestEntryAdapter (std::optional<int64_t > snapshot_id_,
146+ std::shared_ptr<PartitionSpec> partition_spec,
145147 std::shared_ptr<Schema> current_schema,
146148 ManifestContent content)
147- : partition_spec_(std::move(partition_spec)),
149+ : snapshot_id_(snapshot_id_),
150+ partition_spec_ (std::move(partition_spec)),
148151 current_schema_(std::move(current_schema)),
149152 content_(content) {
150153 if (!partition_spec_) {
@@ -161,6 +164,110 @@ ManifestEntryAdapter::~ManifestEntryAdapter() {
161164 }
162165}
163166
167+ Status ManifestEntryAdapter::AddEntry (ManifestEntry& entry) {
168+ ICEBERG_RETURN_UNEXPECTED (CheckDataFile (*entry.data_file ));
169+ entry.status = ManifestStatus::kAdded ;
170+ entry.snapshot_id = snapshot_id_;
171+ if (entry.sequence_number .has_value () &&
172+ entry.sequence_number .value () < TableMetadata::kInitialSequenceNumber ) {
173+ entry.sequence_number = std::nullopt ;
174+ }
175+ entry.file_sequence_number = std::nullopt ;
176+ return AddEntryInternal (entry);
177+ }
178+
179+ Status ManifestEntryAdapter::AddDeleteEntry (ManifestEntry& entry) {
180+ ICEBERG_RETURN_UNEXPECTED (CheckDataFile (*entry.data_file ));
181+ entry.status = ManifestStatus::kDeleted ;
182+ entry.snapshot_id = snapshot_id_;
183+ return AddEntryInternal (entry);
184+ }
185+
186+ Status ManifestEntryAdapter::AddExistingEntry (ManifestEntry& entry) {
187+ ICEBERG_RETURN_UNEXPECTED (CheckDataFile (*entry.data_file ));
188+ entry.status = ManifestStatus::kExisting ;
189+ return AddEntryInternal (entry);
190+ }
191+
192+ ManifestFile ManifestEntryAdapter::ToManifestFile () const {
193+ ManifestFile manifest_file;
194+ manifest_file.partition_spec_id = partition_spec_->spec_id ();
195+ manifest_file.content = content_;
196+ // sequence_number and min_sequence_number with kInvalidSequenceNumber will be
197+ // replace with real sequence number in `ManifestListWriter`.
198+ manifest_file.sequence_number = TableMetadata::kInvalidSequenceNumber ;
199+ manifest_file.min_sequence_number =
200+ min_sequence_number_.value_or (TableMetadata::kInvalidSequenceNumber );
201+ manifest_file.existing_files_count = existing_files_count_;
202+ manifest_file.added_snapshot_id = snapshot_id_.value_or (Snapshot::kInvalidSnapshotId );
203+ manifest_file.added_files_count = add_files_count_;
204+ manifest_file.existing_files_count = existing_files_count_;
205+ manifest_file.deleted_files_count = delete_files_count_;
206+ manifest_file.added_rows_count = add_rows_count_;
207+ manifest_file.existing_rows_count = existing_rows_count_;
208+ manifest_file.deleted_rows_count = delete_rows_count_;
209+ manifest_file.partitions = std::move (partition_summary_->Summaries ());
210+ return manifest_file;
211+ }
212+
213+ Status ManifestEntryAdapter::CheckDataFile (const DataFile& file) const {
214+ switch (content_) {
215+ case ManifestContent::kData :
216+ if (file.content != DataFile::Content::kData ) {
217+ return InvalidArgument (
218+ " Manifest content type: data, data file content should be: data, but got: {}" ,
219+ ToString (file.content ));
220+ }
221+ break ;
222+ case ManifestContent::kDeletes :
223+ if (file.content != DataFile::Content::kPositionDeletes &&
224+ file.content != DataFile::Content::kEqualityDeletes ) {
225+ return InvalidArgument (
226+ " Manifest content type: deletes, data file content should be: "
227+ " position_deletes or equality_deletes, but got: {}" ,
228+ ToString (file.content ));
229+ }
230+ break ;
231+ default :
232+ std::unreachable ();
233+ }
234+ return {};
235+ }
236+
237+ Status ManifestEntryAdapter::AddEntryInternal (const ManifestEntry& entry) {
238+ if (entry.data_file == nullptr ) [[unlikely]] {
239+ return InvalidManifest (" Missing required data_file field from manifest entry." );
240+ }
241+
242+ switch (entry.status ) {
243+ case ManifestStatus::kAdded :
244+ add_files_count_++;
245+ add_rows_count_ += entry.data_file ->record_count ;
246+ break ;
247+ case ManifestStatus::kExisting :
248+ existing_files_count_++;
249+ existing_rows_count_ += entry.data_file ->record_count ;
250+ break ;
251+ case ManifestStatus::kDeleted :
252+ delete_files_count_++;
253+ delete_rows_count_ += entry.data_file ->record_count ;
254+ break ;
255+ default :
256+ std::unreachable ();
257+ }
258+
259+ ICEBERG_RETURN_UNEXPECTED (partition_summary_->Update (entry.data_file ->partition ));
260+
261+ if (entry.IsAlive () && entry.sequence_number .has_value ()) {
262+ if (!min_sequence_number_.has_value () ||
263+ entry.sequence_number .value () < min_sequence_number_.value ()) {
264+ min_sequence_number_ = entry.sequence_number .value ();
265+ }
266+ }
267+
268+ return AppendInternal (entry);
269+ }
270+
164271Status ManifestEntryAdapter::AppendPartitionValues (
165272 ArrowArray* array, const std::shared_ptr<StructType>& partition_type,
166273 const std::vector<Literal>& partition_values) {
0 commit comments