Skip to content

Commit c142ef9

Browse files
authored
feat: add partition summary and write added/existing/deleted entries to manifest writer (#317)
1 parent de7f1dc commit c142ef9

38 files changed

+1372
-197
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# under the License.
1717

1818
build/
19+
cmake-build/
1920
cmake-build-debug/
2021
cmake-build-release/
2122
.DS_Store

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ set(ICEBERG_SOURCES
4242
name_mapping.cc
4343
partition_field.cc
4444
partition_spec.cc
45+
partition_summary.cc
4546
row/arrow_array_wrapper.cc
4647
row/manifest_wrapper.cc
4748
row/struct_like.cc

src/iceberg/avro/avro_writer.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,19 +154,19 @@ Status AvroWriter::Close() {
154154
return {};
155155
}
156156

157-
std::optional<Metrics> AvroWriter::metrics() {
157+
Result<Metrics> AvroWriter::metrics() {
158158
if (impl_->Closed()) {
159159
// TODO(xiao.dong) implement metrics
160160
return {};
161161
}
162-
return std::nullopt;
162+
return Invalid("AvroWriter is not closed");
163163
}
164164

165-
std::optional<int64_t> AvroWriter::length() {
165+
Result<int64_t> AvroWriter::length() {
166166
if (impl_->Closed()) {
167167
return impl_->length();
168168
}
169-
return std::nullopt;
169+
return Invalid("AvroWriter is not closed");
170170
}
171171

172172
std::vector<int64_t> AvroWriter::split_offsets() { return {}; }

src/iceberg/avro/avro_writer.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer {
3737

3838
Status Write(ArrowArray* data) final;
3939

40-
std::optional<Metrics> metrics() final;
40+
Result<Metrics> metrics() final;
4141

42-
std::optional<int64_t> length() final;
42+
Result<int64_t> length() final;
4343

4444
std::vector<int64_t> split_offsets() final;
4545

src/iceberg/expression/literal.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,11 @@ class ICEBERG_EXPORT Literal : public util::Formattable {
150150
/// \return true if this literal represents a BelowMin value, false otherwise
151151
bool IsBelowMin() const;
152152

153-
/// Check if this literal is null.
153+
/// \brief Check if this literal is null.
154154
/// \return true if this literal is null, false otherwise
155155
bool IsNull() const;
156156

157-
/// Check if this literal is NaN.
157+
/// \brief Check if this literal is NaN.
158158
/// \return true if this literal is NaN, false otherwise
159159
bool IsNaN() const;
160160

src/iceberg/file_writer.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,11 @@ class ICEBERG_EXPORT Writer {
9797

9898
/// \brief Get the file statistics.
9999
/// Only valid after the file is closed.
100-
virtual std::optional<Metrics> metrics() = 0;
100+
virtual Result<Metrics> metrics() = 0;
101101

102102
/// \brief Get the file length.
103103
/// Only valid after the file is closed.
104-
virtual std::optional<int64_t> length() = 0;
104+
virtual Result<int64_t> length() = 0;
105105

106106
/// \brief Returns a list of recommended split locations, if applicable, empty
107107
/// otherwise. When available, this information is used for planning scan tasks whose

src/iceberg/manifest_adapter.cc

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "iceberg/manifest_adapter.h"
2121

22+
#include <memory>
2223
#include <utility>
2324

2425
#include <nanoarrow/nanoarrow.h>
@@ -28,7 +29,6 @@
2829
#include "iceberg/manifest_list.h"
2930
#include "iceberg/result.h"
3031
#include "iceberg/schema.h"
31-
#include "iceberg/schema_internal.h"
3232
#include "iceberg/util/checked_cast.h"
3333
#include "iceberg/util/macros.h"
3434

@@ -141,10 +141,12 @@ Result<ArrowArray*> ManifestAdapter::FinishAppending() {
141141
return &array_;
142142
}
143143

144-
ManifestEntryAdapter::ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
144+
ManifestEntryAdapter::ManifestEntryAdapter(std::optional<int64_t> snapshot_id_,
145+
std::shared_ptr<PartitionSpec> partition_spec,
145146
std::shared_ptr<Schema> current_schema,
146147
ManifestContent content)
147-
: partition_spec_(std::move(partition_spec)),
148+
: snapshot_id_(snapshot_id_),
149+
partition_spec_(std::move(partition_spec)),
148150
current_schema_(std::move(current_schema)),
149151
content_(content) {
150152
if (!partition_spec_) {
@@ -386,6 +388,10 @@ Result<std::optional<int64_t>> ManifestEntryAdapter::GetContentSizeInBytes(
386388
}
387389

388390
Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
391+
if (entry.data_file == nullptr) [[unlikely]] {
392+
return InvalidManifest("Missing required data_file field from manifest entry.");
393+
}
394+
389395
const auto& fields = manifest_schema_->fields();
390396
for (size_t i = 0; i < fields.size(); i++) {
391397
const auto& field = fields[i];

src/iceberg/manifest_adapter.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
#include <memory>
2626
#include <optional>
2727
#include <unordered_map>
28-
#include <unordered_set>
2928
#include <vector>
3029

3130
#include "iceberg/arrow_c_data.h"
@@ -61,7 +60,8 @@ class ICEBERG_EXPORT ManifestAdapter {
6160
/// Implemented by different versions with version-specific schemas.
6261
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
6362
public:
64-
ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
63+
ManifestEntryAdapter(std::optional<int64_t> snapshot_id_,
64+
std::shared_ptr<PartitionSpec> partition_spec,
6565
std::shared_ptr<Schema> current_schema, ManifestContent content);
6666

6767
~ManifestEntryAdapter() override;
@@ -72,6 +72,12 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
7272

7373
ManifestContent content() const { return content_; }
7474

75+
std::optional<int64_t> snapshot_id() const { return snapshot_id_; }
76+
77+
const std::shared_ptr<PartitionSpec>& partition_spec() const { return partition_spec_; }
78+
79+
const std::shared_ptr<StructType>& partition_type() const { return partition_type_; }
80+
7581
protected:
7682
Status AppendInternal(const ManifestEntry& entry);
7783
Status AppendDataFile(ArrowArray* array,
@@ -91,8 +97,10 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
9197
const DataFile& file) const;
9298

9399
protected:
100+
std::optional<int64_t> snapshot_id_;
94101
std::shared_ptr<PartitionSpec> partition_spec_;
95102
std::shared_ptr<Schema> current_schema_;
103+
std::shared_ptr<StructType> partition_type_;
96104
std::shared_ptr<Schema> manifest_schema_;
97105
const ManifestContent content_;
98106
};

src/iceberg/manifest_entry.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
#include <memory>
2323
#include <vector>
2424

25-
#include "iceberg/schema.h"
2625
#include "iceberg/schema_field.h"
2726
#include "iceberg/type.h"
2827

src/iceberg/manifest_entry.h

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,6 @@ ICEBERG_EXPORT constexpr Result<ManifestStatus> ManifestStatusFromInt(
5757
}
5858
}
5959

60-
enum class ManifestContent {
61-
kData = 0,
62-
kDeletes = 1,
63-
};
64-
65-
ICEBERG_EXPORT constexpr std::string_view ToString(ManifestContent content) noexcept;
66-
ICEBERG_EXPORT constexpr Result<ManifestContent> ManifestContentFromString(
67-
std::string_view str) noexcept;
68-
6960
/// \brief DataFile carries data file path, partition tuple, metrics, ...
7061
struct ICEBERG_EXPORT DataFile {
7162
/// \brief Content of a data file
@@ -277,6 +268,7 @@ struct ICEBERG_EXPORT DataFile {
277268

278269
bool operator==(const DataFile& other) const = default;
279270

271+
/// \brief Get the schema of the data file with the given partition type.
280272
static std::shared_ptr<StructType> Type(std::shared_ptr<StructType> partition_type);
281273
};
282274

@@ -315,6 +307,33 @@ struct ICEBERG_EXPORT ManifestEntry {
315307
inline static const SchemaField kFileSequenceNumber =
316308
SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64());
317309

310+
/// \brief Check if this manifest entry is deleted.
311+
constexpr bool IsAlive() const {
312+
return status == ManifestStatus::kAdded || status == ManifestStatus::kExisting;
313+
}
314+
315+
/// \brief Return a copy of this entry with its status set to kAdded.
///
/// If the data file has a first_row_id, the copy is given a freshly allocated
/// DataFile with first_row_id cleared; the reset is applied to that new object
/// rather than to the one data_file pointed to before. An entry whose data_file
/// is null is copied as is.
/// \return the copied entry marked as added
ManifestEntry AsAdded() const {
  ManifestEntry copy = *this;
  copy.status = ManifestStatus::kAdded;
  // Guard against a missing data file: dereferencing a null pointer here would be
  // undefined behavior, so a null data_file is simply carried over to the copy.
  if (copy.data_file != nullptr && copy.data_file->first_row_id.has_value()) {
    // Clone before mutating so only the copy's DataFile loses its first_row_id.
    copy.data_file = std::make_unique<DataFile>(*copy.data_file);
    copy.data_file->first_row_id = std::nullopt;
  }
  return copy;
}
324+
325+
/// \brief Return a copy of this entry with its status set to kExisting.
/// \return the copied entry marked as existing
ManifestEntry AsExisting() const {
  auto existing = *this;
  existing.status = ManifestStatus::kExisting;
  return existing;
}
330+
331+
/// \brief Return a copy of this entry with its status set to kDeleted.
/// \return the copied entry marked as deleted
ManifestEntry AsDeleted() const {
  auto deleted = *this;
  deleted.status = ManifestStatus::kDeleted;
  return deleted;
}
336+
318337
bool operator==(const ManifestEntry& other) const;
319338

320339
static std::shared_ptr<StructType> TypeFromPartitionType(
@@ -323,6 +342,19 @@ struct ICEBERG_EXPORT ManifestEntry {
323342
std::shared_ptr<StructType> datafile_type);
324343
};
325344

345+
/// \brief Get the relative datafile content type name
346+
ICEBERG_EXPORT constexpr std::string_view ToString(DataFile::Content type) noexcept {
347+
switch (type) {
348+
case DataFile::Content::kData:
349+
return "data";
350+
case DataFile::Content::kPositionDeletes:
351+
return "position_deletes";
352+
case DataFile::Content::kEqualityDeletes:
353+
return "equality_deletes";
354+
}
355+
std::unreachable();
356+
}
357+
326358
/// \brief Get the relative data file content type from int
327359
ICEBERG_EXPORT constexpr Result<DataFile::Content> DataFileContentFromInt(
328360
int content) noexcept {

0 commit comments

Comments
 (0)