Skip to content

Commit ab69e90

Browse files
committed
feat: adopt partition summary & add AddEntry variants
including AddEntry, AddExistingEntry, AddDeleteEntry
1 parent 7f7f85b commit ab69e90

24 files changed

+419
-74
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ set(ICEBERG_SOURCES
3939
name_mapping.cc
4040
partition_field.cc
4141
partition_spec.cc
42+
partition_summary_internal.cc
4243
row/arrow_array_wrapper.cc
4344
row/manifest_wrapper.cc
4445
schema.cc

src/iceberg/expression/literal.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,17 @@ bool Literal::IsAboveMax() const { return std::holds_alternative<AboveMax>(value
504504

505505
bool Literal::IsNull() const { return std::holds_alternative<std::monostate>(value_); }
506506

507+
bool Literal::IsNan() const {
508+
if (type_->type_id() == TypeId::kFloat) {
509+
auto val = std::get<float>(value_);
510+
return std::isnan(val);
511+
} else if (type_->type_id() == TypeId::kDouble) {
512+
auto val = std::get<double>(value_);
513+
return std::isnan(val);
514+
}
515+
return false;
516+
}
517+
507518
// LiteralCaster implementation
508519

509520
Result<Literal> LiteralCaster::CastTo(const Literal& literal,

src/iceberg/expression/literal.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,14 @@ class ICEBERG_EXPORT Literal : public util::Formattable {
150150
/// \return true if this literal represents a BelowMin value, false otherwise
151151
bool IsBelowMin() const;
152152

153-
/// Check if this literal is null.
153+
/// \brief Check if this literal is null.
154154
/// \return true if this literal is null, false otherwise
155155
bool IsNull() const;
156156

157+
/// \brief Check if this literal is NaN (Not a Number).
158+
/// \return true if this literal is NaN, false otherwise
159+
bool IsNan() const;
160+
157161
std::string ToString() const override;
158162

159163
private:

src/iceberg/manifest_adapter.cc

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "iceberg/arrow/nanoarrow_status_internal.h"
2727
#include "iceberg/manifest_entry.h"
2828
#include "iceberg/manifest_list.h"
29+
#include "iceberg/partition_summary_internal.h"
2930
#include "iceberg/result.h"
3031
#include "iceberg/schema.h"
3132
#include "iceberg/schema_internal.h"
@@ -161,6 +162,39 @@ ManifestEntryAdapter::~ManifestEntryAdapter() {
161162
}
162163
}
163164

165+
// Accounts for `entry` in the adapter's running statistics and then appends it
// via AppendInternal().
//
// Steps, in order:
//   1. Reject entries without a data file.
//   2. Bump the file/row counters that match the entry status.
//   3. Feed the data file's partition tuple to partition_summary_.
//   4. Track the smallest sequence number seen on alive entries.
//   5. Delegate the actual append to AppendInternal().
//
// Returns InvalidManifest when `entry.data_file` is null or `entry.status` is
// not one of kAdded / kExisting / kDeleted; otherwise returns whatever
// AppendInternal() returns.
Status ManifestEntryAdapter::AddEntryInternal(const ManifestEntry& entry) {
  if (entry.data_file == nullptr) [[unlikely]] {
    return InvalidManifest("Missing required data_file field from manifest entry.");
  }

  switch (entry.status) {
    case ManifestStatus::kAdded:
      ++add_files_count_;
      add_rows_count_ += entry.data_file->record_count;
      break;
    case ManifestStatus::kExisting:
      ++existing_files_count_;
      existing_rows_count_ += entry.data_file->record_count;
      break;
    case ManifestStatus::kDeleted:
      ++delete_files_count_;
      delete_rows_count_ += entry.data_file->record_count;
      break;
    default:
      // An out-of-range status (e.g. produced by a cast) must surface as an
      // error: std::unreachable() would be undefined behavior here. No counter
      // has been modified yet, so the adapter state stays consistent.
      return InvalidManifest("Unknown status in manifest entry.");
  }

  // NOTE(review): if Update() reports failures through its return value, that
  // value is dropped here -- confirm against PartitionSummary's declaration.
  partition_summary_->Update(entry.data_file->partition);

  // Only alive (added or existing) entries contribute to the minimum data
  // sequence number.
  if (entry.IsAlive() && entry.sequence_number.has_value()) {
    const auto sequence_number = entry.sequence_number.value();
    if (!min_data_sequence_number_.has_value() ||
        sequence_number < min_data_sequence_number_.value()) {
      min_data_sequence_number_ = sequence_number;
    }
  }

  return AppendInternal(entry);
}
197+
164198
Status ManifestEntryAdapter::AppendPartitionValues(
165199
ArrowArray* array, const std::shared_ptr<StructType>& partition_type,
166200
const std::vector<Literal>& partition_values) {

src/iceberg/manifest_adapter.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
#include <memory>
2626
#include <optional>
2727
#include <unordered_map>
28-
#include <unordered_set>
2928
#include <vector>
3029

3130
#include "iceberg/arrow_c_data.h"
@@ -66,13 +65,16 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
6665

6766
~ManifestEntryAdapter() override;
6867

69-
virtual Status Append(const ManifestEntry& entry) = 0;
68+
virtual Status AddEntry(ManifestEntry& entry) = 0;
69+
virtual Status AddDeleteEntry(ManifestEntry& entry) = 0;
70+
virtual Status AddExistingEntry(ManifestEntry& entry) = 0;
7071

7172
const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }
7273

7374
ManifestContent content() const { return content_; }
7475

7576
protected:
77+
Status AddEntryInternal(const ManifestEntry& entry);
7678
Status AppendInternal(const ManifestEntry& entry);
7779
Status AppendDataFile(ArrowArray* array,
7880
const std::shared_ptr<StructType>& data_file_type,
@@ -95,6 +97,14 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
9597
std::shared_ptr<Schema> current_schema_;
9698
std::shared_ptr<Schema> manifest_schema_;
9799
const ManifestContent content_;
100+
int32_t add_files_count_{0};
101+
int32_t existing_files_count_{0};
102+
int32_t delete_files_count_{0};
103+
int64_t add_rows_count_{0L};
104+
int64_t existing_rows_count_{0L};
105+
int64_t delete_rows_count_{0L};
106+
std::optional<int64_t> min_data_sequence_number_{std::nullopt};
107+
std::unique_ptr<PartitionSummary> partition_summary_;
98108
};
99109

100110
/// \brief Adapter for appending a list of `ManifestFile`s to an `ArrowArray`.

src/iceberg/manifest_entry.h

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,6 @@ ICEBERG_EXPORT constexpr Result<ManifestStatus> ManifestStatusFromInt(
5757
}
5858
}
5959

60-
enum class ManifestContent {
61-
kData = 0,
62-
kDeletes = 1,
63-
};
64-
65-
ICEBERG_EXPORT constexpr std::string_view ToString(ManifestContent content) noexcept;
66-
ICEBERG_EXPORT constexpr Result<ManifestContent> ManifestContentFromString(
67-
std::string_view str) noexcept;
68-
6960
/// \brief DataFile carries data file path, partition tuple, metrics, ...
7061
struct ICEBERG_EXPORT DataFile {
7162
/// \brief Content of a data file
@@ -315,6 +306,17 @@ struct ICEBERG_EXPORT ManifestEntry {
315306
inline static const SchemaField kFileSequenceNumber =
316307
SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64());
317308

309+
/// \brief Check if this manifest entry is deleted.
310+
constexpr bool IsAlive() const {
311+
return status == ManifestStatus::kAdded || status == ManifestStatus::kExisting;
312+
}
313+
314+
/// \brief Create a copy of this manifest entry.
315+
ManifestEntry Copy() const {
316+
ManifestEntry copy = *this;
317+
return copy;
318+
}
319+
318320
bool operator==(const ManifestEntry& other) const;
319321

320322
static std::shared_ptr<StructType> TypeFromPartitionType(

src/iceberg/manifest_list.h

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
#include <optional>
2626
#include <string>
2727
#include <string_view>
28-
#include <utility>
2928

3029
#include "iceberg/iceberg_export.h"
3130
#include "iceberg/partition_spec.h"
@@ -72,17 +71,17 @@ struct ICEBERG_EXPORT PartitionFieldSummary {
7271
static const StructType& Type();
7372
};
7473

74+
/// \brief The type of files tracked by the manifest, either data or delete files; 0 for
75+
/// all v1 manifests
76+
enum class ManifestContent {
77+
/// The manifest content is data.
78+
kData = 0,
79+
/// The manifest content is deletes.
80+
kDeletes = 1,
81+
};
82+
7583
/// \brief Entry in a manifest list.
7684
struct ICEBERG_EXPORT ManifestFile {
77-
/// \brief The type of files tracked by the manifest, either data or delete files; 0 for
78-
/// all v1 manifests
79-
enum class Content {
80-
/// The manifest content is data.
81-
kData = 0,
82-
/// The manifest content is deletes.
83-
kDeletes = 1,
84-
};
85-
8685
/// Field id: 500
8786
/// Location of the manifest file
8887
std::string manifest_path;
@@ -96,7 +95,7 @@ struct ICEBERG_EXPORT ManifestFile {
9695
/// Field id: 517
9796
/// The type of files tracked by the manifest, either data or delete files; 0 for all v1
9897
/// manifests
99-
Content content = Content::kData;
98+
ManifestContent content = ManifestContent::kData;
10099
/// Field id: 515
101100
/// The sequence number when the manifest was added to the table; use 0 when reading v1
102101
/// manifest lists
@@ -218,21 +217,20 @@ struct ICEBERG_EXPORT ManifestList {
218217
};
219218

220219
/// \brief Get the name of the given manifest content type
221-
ICEBERG_EXPORT constexpr std::string_view ToString(ManifestFile::Content type) noexcept {
220+
ICEBERG_EXPORT constexpr std::string_view ToString(ManifestContent type) noexcept {
222221
switch (type) {
223-
case ManifestFile::Content::kData:
222+
case ManifestContent::kData:
224223
return "data";
225-
case ManifestFile::Content::kDeletes:
224+
case ManifestContent::kDeletes:
226225
return "deletes";
227226
}
228-
std::unreachable();
229227
}
230228

231229
/// \brief Parse the manifest content type from its name
232-
ICEBERG_EXPORT constexpr Result<ManifestFile::Content> ManifestFileContentFromString(
230+
ICEBERG_EXPORT constexpr Result<ManifestContent> ManifestContentFromString(
233231
std::string_view str) noexcept {
234-
if (str == "data") return ManifestFile::Content::kData;
235-
if (str == "deletes") return ManifestFile::Content::kDeletes;
232+
if (str == "data") return ManifestContent::kData;
233+
if (str == "deletes") return ManifestContent::kDeletes;
236234
return InvalidArgument("Invalid manifest content type: {}", str);
237235
}
238236

src/iceberg/manifest_reader_internal.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ Result<std::vector<ManifestFile>> ParseManifestList(ArrowSchema* schema,
237237
break;
238238
case ManifestFileField::kContent:
239239
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].content, view_of_column,
240-
ManifestFile::Content);
240+
ManifestContent);
241241
break;
242242
case ManifestFileField::kSequenceNumber:
243243
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number, view_of_column,

src/iceberg/manifest_writer.cc

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,39 @@
2929

3030
namespace iceberg {
3131

32-
Status ManifestWriter::Add(const ManifestEntry& entry) {
32+
Status ManifestWriter::AddEntry(const ManifestEntry& entry) {
3333
if (adapter_->size() >= kBatchSize) {
3434
ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
3535
ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
3636
ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending());
3737
}
38-
return adapter_->Append(entry);
38+
auto copy = entry.Copy();
39+
return adapter_->AddEntry(copy);
40+
}
41+
42+
// Forwards a copy of `entry` to the adapter's AddDeleteEntry().
//
// When the adapter already buffers kBatchSize entries or more, the pending
// batch is finished, handed to writer_, and a fresh batch is started first.
// Any failure in those steps is returned to the caller immediately.
Status ManifestWriter::AddDelete(const ManifestEntry& entry) {
  const bool batch_is_full = adapter_->size() >= kBatchSize;
  if (batch_is_full) {
    ICEBERG_ASSIGN_OR_RAISE(auto pending_batch, adapter_->FinishAppending());
    ICEBERG_RETURN_UNEXPECTED(writer_->Write(pending_batch));
    ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending());
  }

  // The adapter takes a mutable reference, so it gets its own copy and the
  // caller's entry object is never passed to it directly.
  auto deleted_entry = entry.Copy();
  return adapter_->AddDeleteEntry(deleted_entry);
}
51+
52+
// Forwards a copy of `entry` to the adapter's AddExistingEntry().
//
// When the adapter already buffers kBatchSize entries or more, the pending
// batch is finished, handed to writer_, and a fresh batch is started first.
// Any failure in those steps is returned to the caller immediately.
Status ManifestWriter::AddExisting(const ManifestEntry& entry) {
  const bool batch_is_full = adapter_->size() >= kBatchSize;
  if (batch_is_full) {
    ICEBERG_ASSIGN_OR_RAISE(auto pending_batch, adapter_->FinishAppending());
    ICEBERG_RETURN_UNEXPECTED(writer_->Write(pending_batch));
    ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending());
  }

  // The adapter takes a mutable reference, so it gets its own copy and the
  // caller's entry object is never passed to it directly.
  auto existing_entry = entry.Copy();
  return adapter_->AddExistingEntry(existing_entry);
}
4061

4162
Status ManifestWriter::AddAll(const std::vector<ManifestEntry>& entries) {
4263
for (const auto& entry : entries) {
43-
ICEBERG_RETURN_UNEXPECTED(Add(entry));
64+
ICEBERG_RETURN_UNEXPECTED(AddEntry(entry));
4465
}
4566
return {};
4667
}
@@ -53,6 +74,8 @@ Status ManifestWriter::Close() {
5374
return writer_->Close();
5475
}
5576

77+
std::optional<Metrics> ManifestWriter::Metrics() const { return writer_->metrics(); }
78+
5679
ManifestContent ManifestWriter::content() const { return adapter_->content(); }
5780

5881
Result<std::unique_ptr<Writer>> OpenFileWriter(

src/iceberg/manifest_writer.h

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "iceberg/file_writer.h"
2929
#include "iceberg/iceberg_export.h"
3030
#include "iceberg/manifest_adapter.h"
31+
#include "iceberg/metrics.h"
3132
#include "iceberg/type_fwd.h"
3233

3334
namespace iceberg {
@@ -41,10 +42,20 @@ class ICEBERG_EXPORT ManifestWriter {
4142

4243
~ManifestWriter() = default;
4344

44-
/// \brief Write manifest entry to file.
45+
/// \brief Add a new Manifest entry.
4546
/// \param entry Manifest entry to write.
4647
/// \return Status::OK() if entry was written successfully
47-
Status Add(const ManifestEntry& entry);
48+
Status AddEntry(const ManifestEntry& entry);
49+
50+
/// \brief Add an existing Manifest entry.
51+
/// \param entry Manifest entry to write.
52+
/// \return Status::OK() if entry was written successfully
53+
Status AddExisting(const ManifestEntry& entry);
54+
55+
/// \brief Add a delete Manifest entry.
56+
/// \param entry Manifest entry to write.
57+
/// \return Status::OK() if entry was written successfully
58+
Status AddDelete(const ManifestEntry& entry);
4859

4960
/// \brief Write manifest entries to file.
5061
/// \param entries Manifest entries to write.
@@ -54,6 +65,9 @@ class ICEBERG_EXPORT ManifestWriter {
5465
/// \brief Close writer and flush to storage.
5566
Status Close();
5667

68+
/// \brief Get the metrics of the written manifest file.
69+
std::optional<Metrics> Metrics() const;
70+
5771
/// \brief Get the content of the manifest.
5872
ManifestContent content() const;
5973

0 commit comments

Comments
 (0)