Skip to content

Commit 691387f

Browse files
committed
feat: adopt partition summary & add AddEntry variants
including AddEntry, AddExistingEntry, AddDeleteEntry
1 parent 6fe80fe commit 691387f

29 files changed

+1142
-160
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# under the License.
1717

1818
build/
19+
cmake-build/
1920
cmake-build-debug/
2021
cmake-build-release/
2122
.DS_Store

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ set(ICEBERG_SOURCES
3939
name_mapping.cc
4040
partition_field.cc
4141
partition_spec.cc
42+
partition_summary_internal.cc
4243
row/arrow_array_wrapper.cc
4344
row/manifest_wrapper.cc
4445
schema.cc

src/iceberg/expression/literal.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,17 @@ bool Literal::IsAboveMax() const { return std::holds_alternative<AboveMax>(value
504504

505505
bool Literal::IsNull() const { return std::holds_alternative<std::monostate>(value_); }
506506

507+
bool Literal::IsNan() const {
508+
if (type_->type_id() == TypeId::kFloat) {
509+
auto val = std::get<float>(value_);
510+
return std::isnan(val);
511+
} else if (type_->type_id() == TypeId::kDouble) {
512+
auto val = std::get<double>(value_);
513+
return std::isnan(val);
514+
}
515+
return false;
516+
}
517+
507518
// LiteralCaster implementation
508519

509520
Result<Literal> LiteralCaster::CastTo(const Literal& literal,

src/iceberg/expression/literal.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,14 @@ class ICEBERG_EXPORT Literal : public util::Formattable {
150150
/// \return true if this literal represents a BelowMin value, false otherwise
151151
bool IsBelowMin() const;
152152

153-
/// Check if this literal is null.
153+
/// \brief Check if this literal is null.
154154
/// \return true if this literal is null, false otherwise
155155
bool IsNull() const;
156156

157+
/// \brief Check if this literal is NaN (Not a Number).
158+
/// \return true if this literal is NaN, false otherwise
159+
bool IsNan() const;
160+
157161
std::string ToString() const override;
158162

159163
private:

src/iceberg/manifest_adapter.cc

Lines changed: 110 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919

2020
#include "iceberg/manifest_adapter.h"
2121

22+
#include <memory>
2223
#include <utility>
2324

2425
#include <nanoarrow/nanoarrow.h>
2526

2627
#include "iceberg/arrow/nanoarrow_status_internal.h"
2728
#include "iceberg/manifest_entry.h"
2829
#include "iceberg/manifest_list.h"
30+
#include "iceberg/partition_summary_internal.h"
2931
#include "iceberg/result.h"
3032
#include "iceberg/schema.h"
3133
#include "iceberg/schema_internal.h"
@@ -141,10 +143,12 @@ Result<ArrowArray*> ManifestAdapter::FinishAppending() {
141143
return &array_;
142144
}
143145

144-
ManifestEntryAdapter::ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
146+
ManifestEntryAdapter::ManifestEntryAdapter(std::optional<int64_t> snapshot_id_,
147+
std::shared_ptr<PartitionSpec> partition_spec,
145148
std::shared_ptr<Schema> current_schema,
146149
ManifestContent content)
147-
: partition_spec_(std::move(partition_spec)),
150+
: snapshot_id_(snapshot_id_),
151+
partition_spec_(std::move(partition_spec)),
148152
current_schema_(std::move(current_schema)),
149153
content_(content) {
150154
if (!partition_spec_) {
@@ -161,6 +165,110 @@ ManifestEntryAdapter::~ManifestEntryAdapter() {
161165
}
162166
}
163167

168+
/// Append `entry` to the manifest as a newly added file.
///
/// The entry is modified in place before it is appended:
/// - status becomes `kAdded`
/// - snapshot_id becomes this adapter's snapshot id
/// - a sequence number below `TableMetadata::kInitialSequenceNumber` is cleared
/// - file_sequence_number is cleared
///
/// \return an error if `data_file` is missing, if the data file content does not
/// match the manifest content, or if appending fails.
Status ManifestEntryAdapter::AddEntry(ManifestEntry& entry) {
  // Reject a missing data file up front: CheckDataFile takes a reference, so
  // dereferencing a null `data_file` below would be undefined behavior.
  if (entry.data_file == nullptr) [[unlikely]] {
    return InvalidManifest("Missing required data_file field from manifest entry.");
  }
  ICEBERG_RETURN_UNEXPECTED(CheckDataFile(*entry.data_file));
  entry.status = ManifestStatus::kAdded;
  entry.snapshot_id = snapshot_id_;
  if (entry.sequence_number.has_value() &&
      entry.sequence_number.value() < TableMetadata::kInitialSequenceNumber) {
    entry.sequence_number = std::nullopt;
  }
  entry.file_sequence_number = std::nullopt;
  return AddEntryInternal(entry);
}
179+
180+
/// Append `entry` to the manifest as a deleted file.
///
/// The entry is modified in place before it is appended: its status becomes
/// `kDeleted` and its snapshot_id becomes this adapter's snapshot id.
///
/// \return an error if `data_file` is missing, if the data file content does not
/// match the manifest content, or if appending fails.
Status ManifestEntryAdapter::AddDeleteEntry(ManifestEntry& entry) {
  // Reject a missing data file up front: CheckDataFile takes a reference, so
  // dereferencing a null `data_file` below would be undefined behavior.
  if (entry.data_file == nullptr) [[unlikely]] {
    return InvalidManifest("Missing required data_file field from manifest entry.");
  }
  ICEBERG_RETURN_UNEXPECTED(CheckDataFile(*entry.data_file));
  entry.status = ManifestStatus::kDeleted;
  entry.snapshot_id = snapshot_id_;
  return AddEntryInternal(entry);
}
186+
187+
/// Append `entry` to the manifest as an existing file.
///
/// The entry is modified in place before it is appended: its status becomes
/// `kExisting`. Snapshot id and sequence numbers are left as provided.
///
/// \return an error if `data_file` is missing, if the data file content does not
/// match the manifest content, or if appending fails.
Status ManifestEntryAdapter::AddExistingEntry(ManifestEntry& entry) {
  // Reject a missing data file up front: CheckDataFile takes a reference, so
  // dereferencing a null `data_file` below would be undefined behavior.
  if (entry.data_file == nullptr) [[unlikely]] {
    return InvalidManifest("Missing required data_file field from manifest entry.");
  }
  ICEBERG_RETURN_UNEXPECTED(CheckDataFile(*entry.data_file));
  entry.status = ManifestStatus::kExisting;
  return AddEntryInternal(entry);
}
192+
193+
/// Build a `ManifestFile` from the statistics accumulated by this adapter.
///
/// Only the fields assigned below are populated; every other field of the
/// returned object keeps its default value.
ManifestFile ManifestEntryAdapter::ToManifestFile() const {
  ManifestFile manifest_file;
  manifest_file.partition_spec_id = partition_spec_->spec_id();
  manifest_file.content = content_;
  // A sequence_number / min_sequence_number equal to kInvalidSequenceNumber is a
  // placeholder that will be replaced with the real sequence number in
  // `ManifestListWriter`.
  manifest_file.sequence_number = TableMetadata::kInvalidSequenceNumber;
  manifest_file.min_sequence_number =
      min_sequence_number_.value_or(TableMetadata::kInvalidSequenceNumber);
  manifest_file.added_snapshot_id = snapshot_id_.value_or(Snapshot::kInvalidSnapshotId);
  manifest_file.added_files_count = add_files_count_;
  manifest_file.existing_files_count = existing_files_count_;
  manifest_file.deleted_files_count = delete_files_count_;
  manifest_file.added_rows_count = add_rows_count_;
  manifest_file.existing_rows_count = existing_rows_count_;
  manifest_file.deleted_rows_count = delete_rows_count_;
  // No std::move here: this is a const method, so it must not drain the summary
  // state it reads from. If Summaries() returns by value the assignment already
  // moves from the temporary.
  manifest_file.partitions = partition_summary_->Summaries();
  return manifest_file;
}
213+
214+
/// Verify that the content type of `file` is allowed in this manifest.
///
/// A data manifest accepts only data files; a deletes manifest accepts only
/// position-delete or equality-delete files.
///
/// \return an empty status on success, otherwise an InvalidArgument error naming
/// the content type that was found.
Status ManifestEntryAdapter::CheckDataFile(const DataFile& file) const {
  const auto file_content = file.content;

  if (content_ == ManifestContent::kData) {
    if (file_content != DataFile::Content::kData) {
      return InvalidArgument(
          "Manifest content type: data, data file content should be: data, but got: {}",
          ToString(file_content));
    }
    return {};
  }

  if (content_ == ManifestContent::kDeletes) {
    const bool is_delete_file = file_content == DataFile::Content::kPositionDeletes ||
                                file_content == DataFile::Content::kEqualityDeletes;
    if (!is_delete_file) {
      return InvalidArgument(
          "Manifest content type: deletes, data file content should be: "
          "position_deletes or equality_deletes, but got: {}",
          ToString(file_content));
    }
    return {};
  }

  // No other manifest content value is expected here.
  std::unreachable();
}
237+
238+
/// Record the statistics for `entry` and append it to the underlying array.
///
/// Updates the per-status file/row counters, feeds the partition tuple to the
/// partition summary, and tracks the smallest sequence number seen among live
/// (added or existing) entries.
///
/// \return an InvalidManifest error if `data_file` is missing, otherwise the
/// result of `AppendInternal`.
Status ManifestEntryAdapter::AddEntryInternal(const ManifestEntry& entry) {
  const auto& file = entry.data_file;
  if (file == nullptr) [[unlikely]] {
    return InvalidManifest("Missing required data_file field from manifest entry.");
  }

  const auto rows = file->record_count;
  switch (entry.status) {
    case ManifestStatus::kAdded:
      ++add_files_count_;
      add_rows_count_ += rows;
      break;
    case ManifestStatus::kExisting:
      ++existing_files_count_;
      existing_rows_count_ += rows;
      break;
    case ManifestStatus::kDeleted:
      ++delete_files_count_;
      delete_rows_count_ += rows;
      break;
    default:
      std::unreachable();
  }

  partition_summary_->Update(file->partition);

  // Only live entries with a known sequence number contribute to the minimum.
  if (entry.IsAlive() && entry.sequence_number.has_value()) {
    const auto sequence = *entry.sequence_number;
    if (!min_sequence_number_.has_value() || sequence < *min_sequence_number_) {
      min_sequence_number_ = sequence;
    }
  }

  return AppendInternal(entry);
}
271+
164272
Status ManifestEntryAdapter::AppendPartitionValues(
165273
ArrowArray* array, const std::shared_ptr<StructType>& partition_type,
166274
const std::vector<Literal>& partition_values) {

src/iceberg/manifest_adapter.h

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
#include <memory>
2626
#include <optional>
2727
#include <unordered_map>
28-
#include <unordered_set>
2928
#include <vector>
3029

3130
#include "iceberg/arrow_c_data.h"
@@ -61,18 +60,45 @@ class ICEBERG_EXPORT ManifestAdapter {
6160
/// Implemented by different versions with version-specific schemas.
6261
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
6362
public:
64-
ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
63+
ManifestEntryAdapter(std::optional<int64_t> snapshot_id_,
64+
std::shared_ptr<PartitionSpec> partition_spec,
6565
std::shared_ptr<Schema> current_schema, ManifestContent content);
6666

6767
~ManifestEntryAdapter() override;
6868

69-
virtual Status Append(const ManifestEntry& entry) = 0;
69+
/// \brief Add a new entry to the manifest.
70+
///
71+
/// This method updates the following fields of the entry:
72+
/// - Update the entry status to `Added`
73+
/// - Set the snapshot id to the current snapshot id
74+
/// - Set the sequence number to nullopt if it is invalid (smaller than 0)
75+
/// - Set the file sequence number to nullopt
76+
virtual Status AddEntry(ManifestEntry& entry);
77+
78+
/// \brief Add a delete entry to the manifest.
79+
///
80+
/// This method updates the following fields of the entry:
81+
/// - Update the entry status to `Deleted`
82+
/// - Set the snapshot id to the current snapshot id
83+
virtual Status AddDeleteEntry(ManifestEntry& entry);
84+
85+
/// \brief Add an existing entry to the manifest.
86+
///
87+
/// This method updates the following fields of the entry:
88+
/// - Update the entry status to `Existing`
89+
virtual Status AddExistingEntry(ManifestEntry& entry);
7090

7191
const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }
7292

7393
ManifestContent content() const { return content_; }
7494

95+
/// \brief Create a ManifestFile object without setting file metadata, such as
96+
/// location, file size, key metadata, etc.
97+
ManifestFile ToManifestFile() const;
98+
7599
protected:
100+
Status CheckDataFile(const DataFile& file) const;
101+
Status AddEntryInternal(const ManifestEntry& entry);
76102
Status AppendInternal(const ManifestEntry& entry);
77103
Status AppendDataFile(ArrowArray* array,
78104
const std::shared_ptr<StructType>& data_file_type,
@@ -91,10 +117,19 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
91117
const DataFile& file) const;
92118

93119
protected:
120+
std::optional<int64_t> snapshot_id_;
94121
std::shared_ptr<PartitionSpec> partition_spec_;
95122
std::shared_ptr<Schema> current_schema_;
96123
std::shared_ptr<Schema> manifest_schema_;
97124
const ManifestContent content_;
125+
int32_t add_files_count_{0};
126+
int32_t existing_files_count_{0};
127+
int32_t delete_files_count_{0};
128+
int64_t add_rows_count_{0L};
129+
int64_t existing_rows_count_{0L};
130+
int64_t delete_rows_count_{0L};
131+
std::optional<int64_t> min_sequence_number_{std::nullopt};
132+
std::unique_ptr<PartitionSummary> partition_summary_;
98133
};
99134

100135
/// \brief Adapter for appending a list of `ManifestFile`s to an `ArrowArray`.

src/iceberg/manifest_entry.h

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,6 @@ ICEBERG_EXPORT constexpr Result<ManifestStatus> ManifestStatusFromInt(
5757
}
5858
}
5959

60-
enum class ManifestContent {
61-
kData = 0,
62-
kDeletes = 1,
63-
};
64-
65-
ICEBERG_EXPORT constexpr std::string_view ToString(ManifestContent content) noexcept;
66-
ICEBERG_EXPORT constexpr Result<ManifestContent> ManifestContentFromString(
67-
std::string_view str) noexcept;
68-
6960
/// \brief DataFile carries data file path, partition tuple, metrics, ...
7061
struct ICEBERG_EXPORT DataFile {
7162
/// \brief Content of a data file
@@ -315,6 +306,17 @@ struct ICEBERG_EXPORT ManifestEntry {
315306
inline static const SchemaField kFileSequenceNumber =
316307
SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64());
317308

309+
/// \brief Check if this manifest entry is deleted.
310+
constexpr bool IsAlive() const {
311+
return status == ManifestStatus::kAdded || status == ManifestStatus::kExisting;
312+
}
313+
314+
/// \brief Create a copy of this manifest entry.
315+
ManifestEntry Copy() const {
316+
ManifestEntry copy = *this;
317+
return copy;
318+
}
319+
318320
bool operator==(const ManifestEntry& other) const;
319321

320322
static std::shared_ptr<StructType> TypeFromPartitionType(
@@ -323,6 +325,19 @@ struct ICEBERG_EXPORT ManifestEntry {
323325
std::shared_ptr<StructType> datafile_type);
324326
};
325327

328+
/// \brief Get the relative datafile content type name
329+
ICEBERG_EXPORT constexpr std::string_view ToString(DataFile::Content type) noexcept {
330+
switch (type) {
331+
case DataFile::Content::kData:
332+
return "data";
333+
case DataFile::Content::kPositionDeletes:
334+
return "position_deletes";
335+
case DataFile::Content::kEqualityDeletes:
336+
return "equality_deletes";
337+
}
338+
std::unreachable();
339+
}
340+
326341
/// \brief Get the relative data file content type from int
327342
ICEBERG_EXPORT constexpr Result<DataFile::Content> DataFileContentFromInt(
328343
int content) noexcept {

src/iceberg/manifest_list.h

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
#include <optional>
#include <string>
#include <string_view>
#include <utility>

#include "iceberg/iceberg_export.h"
#include "iceberg/partition_spec.h"
@@ -72,17 +71,17 @@ struct ICEBERG_EXPORT PartitionFieldSummary {
7271
static const StructType& Type();
7372
};
7473

74+
/// \brief Kind of files a manifest tracks: data files or delete files.
///
/// The value is 0 (data) for all v1 manifests.
enum class ManifestContent {
  /// The manifest tracks data files.
  kData = 0,
  /// The manifest tracks delete files.
  kDeletes = 1,
};
82+
7583
/// \brief Entry in a manifest list.
7684
struct ICEBERG_EXPORT ManifestFile {
77-
/// \brief The type of files tracked by the manifest, either data or delete files; 0 for
78-
/// all v1 manifests
79-
enum class Content {
80-
/// The manifest content is data.
81-
kData = 0,
82-
/// The manifest content is deletes.
83-
kDeletes = 1,
84-
};
85-
8685
/// Field id: 500
8786
/// Location of the manifest file
8887
std::string manifest_path;
@@ -96,7 +95,7 @@ struct ICEBERG_EXPORT ManifestFile {
9695
/// Field id: 517
9796
/// The type of files tracked by the manifest, either data or delete files; 0 for all v1
9897
/// manifests
99-
Content content = Content::kData;
98+
ManifestContent content = ManifestContent::kData;
10099
/// Field id: 515
101100
/// The sequence number when the manifest was added to the table; use 0 when reading v1
102101
/// manifest lists
@@ -218,21 +217,20 @@ struct ICEBERG_EXPORT ManifestList {
218217
};
219218

220219
/// \brief Get the relative manifest content type name
221-
ICEBERG_EXPORT constexpr std::string_view ToString(ManifestFile::Content type) noexcept {
220+
ICEBERG_EXPORT constexpr std::string_view ToString(ManifestContent type) noexcept {
222221
switch (type) {
223-
case ManifestFile::Content::kData:
222+
case ManifestContent::kData:
224223
return "data";
225-
case ManifestFile::Content::kDeletes:
224+
case ManifestContent::kDeletes:
226225
return "deletes";
227226
}
228-
std::unreachable();
229227
}
230228

231229
/// \brief Get the relative manifest content type from name
232-
ICEBERG_EXPORT constexpr Result<ManifestFile::Content> ManifestFileContentFromString(
230+
ICEBERG_EXPORT constexpr Result<ManifestContent> ManifestContentFromString(
233231
std::string_view str) noexcept {
234-
if (str == "data") return ManifestFile::Content::kData;
235-
if (str == "deletes") return ManifestFile::Content::kDeletes;
232+
if (str == "data") return ManifestContent::kData;
233+
if (str == "deletes") return ManifestContent::kDeletes;
236234
return InvalidArgument("Invalid manifest content type: {}", str);
237235
}
238236

0 commit comments

Comments
 (0)