Skip to content

Commit c9d7867

Browse files
authored
feat: implement manifest entry metadata inheritance (apache#178)
1 parent 1485ba8 commit c9d7867

10 files changed

+378
-78
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ set(ICEBERG_SOURCES
2424
expression/literal.cc
2525
file_reader.cc
2626
file_writer.cc
27+
inheritable_metadata.cc
2728
json_internal.cc
2829
manifest_entry.cc
2930
manifest_list.cc
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/inheritable_metadata.h"
21+
22+
#include <cassert>
23+
#include <utility>
24+
25+
#include <iceberg/result.h>
26+
27+
#include "iceberg/manifest_entry.h"
28+
#include "iceberg/manifest_list.h"
29+
#include "iceberg/snapshot.h"
30+
31+
namespace iceberg {
32+
33+
BaseInheritableMetadata::BaseInheritableMetadata(int32_t spec_id, int64_t snapshot_id,
34+
int64_t sequence_number,
35+
std::string manifest_location)
36+
: spec_id_(spec_id),
37+
snapshot_id_(snapshot_id),
38+
sequence_number_(sequence_number),
39+
manifest_location_(std::move(manifest_location)) {}
40+
41+
Status BaseInheritableMetadata::Apply(ManifestEntry& entry) {
42+
if (!entry.snapshot_id.has_value()) {
43+
entry.snapshot_id = snapshot_id_;
44+
}
45+
46+
// In v1 metadata, the data sequence number is not persisted and can be safely defaulted
47+
// to 0.
48+
// In v2 metadata, the data sequence number should be inherited iff the entry status
49+
// is ADDED.
50+
if (!entry.sequence_number.has_value() &&
51+
(sequence_number_ == 0 || entry.status == ManifestStatus::kAdded)) {
52+
entry.sequence_number = sequence_number_;
53+
}
54+
55+
// In v1 metadata, the file sequence number is not persisted and can be safely defaulted
56+
// to 0.
57+
// In v2 metadata, the file sequence number should be inherited iff the entry status
58+
// is ADDED.
59+
if (!entry.file_sequence_number.has_value() &&
60+
(sequence_number_ == 0 || entry.status == ManifestStatus::kAdded)) {
61+
entry.file_sequence_number = sequence_number_;
62+
}
63+
64+
if (entry.data_file) {
65+
entry.data_file->partition_spec_id = spec_id_;
66+
} else {
67+
return InvalidManifest("Manifest entry has no data file");
68+
}
69+
70+
return {};
71+
}
72+
73+
Status EmptyInheritableMetadata::Apply(ManifestEntry& entry) {
74+
if (!entry.snapshot_id.has_value()) {
75+
return InvalidManifest(
76+
"Entries must have explicit snapshot ids if inherited metadata is empty");
77+
}
78+
return {};
79+
}
80+
81+
CopyInheritableMetadata::CopyInheritableMetadata(int64_t snapshot_id)
82+
: snapshot_id_(snapshot_id) {}
83+
84+
Status CopyInheritableMetadata::Apply(ManifestEntry& entry) {
85+
entry.snapshot_id = snapshot_id_;
86+
return {};
87+
}
88+
89+
Result<std::unique_ptr<InheritableMetadata>> InheritableMetadataFactory::Empty() {
90+
return std::make_unique<EmptyInheritableMetadata>();
91+
}
92+
93+
Result<std::unique_ptr<InheritableMetadata>> InheritableMetadataFactory::FromManifest(
94+
const ManifestFile& manifest) {
95+
// Validate that the manifest has a snapshot ID assigned
96+
if (manifest.added_snapshot_id == Snapshot::kInvalidSnapshotId) {
97+
return InvalidManifest("Manifest file {} has no snapshot ID", manifest.manifest_path);
98+
}
99+
100+
return std::make_unique<BaseInheritableMetadata>(
101+
manifest.partition_spec_id, manifest.added_snapshot_id, manifest.sequence_number,
102+
manifest.manifest_path);
103+
}
104+
105+
Result<std::unique_ptr<InheritableMetadata>> InheritableMetadataFactory::ForCopy(
106+
int64_t snapshot_id) {
107+
return std::make_unique<CopyInheritableMetadata>(snapshot_id);
108+
}
109+
110+
} // namespace iceberg

src/iceberg/inheritable_metadata.h

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/inheritable_metadata.h
23+
/// Metadata inheritance system for manifest entries.
24+
25+
#include <cstdint>
26+
#include <memory>
27+
#include <string>
28+
29+
#include "iceberg/iceberg_export.h"
30+
#include "iceberg/result.h"
31+
#include "iceberg/type_fwd.h"
32+
33+
namespace iceberg {
34+
35+
/// \brief Interface for applying inheritable metadata to manifest entries.
36+
///
37+
/// When manifest entries have null values for certain fields (snapshot id,
38+
/// data sequence number, file sequence number), these values should be inherited
39+
/// from the manifest file. This interface provides a way to apply such inheritance rules.
40+
class ICEBERG_EXPORT InheritableMetadata {
41+
public:
42+
virtual ~InheritableMetadata() = default;
43+
44+
/// \brief Apply inheritable metadata to a manifest entry.
45+
/// \param entry The manifest entry to modify.
46+
/// \return Status indicating success or failure.
47+
virtual Status Apply(ManifestEntry& entry) = 0;
48+
};
49+
50+
/// \brief Base implementation of InheritableMetadata that handles standard inheritance
51+
/// rules.
52+
class ICEBERG_EXPORT BaseInheritableMetadata : public InheritableMetadata {
53+
public:
54+
/// \brief Constructor for base inheritable metadata.
55+
/// \param spec_id Partition spec ID from the manifest.
56+
/// \param snapshot_id Snapshot ID from the manifest.
57+
/// \param sequence_number Sequence number from the manifest.
58+
/// \param manifest_location Path to the manifest file.
59+
BaseInheritableMetadata(int32_t spec_id, int64_t snapshot_id, int64_t sequence_number,
60+
std::string manifest_location);
61+
62+
Status Apply(ManifestEntry& entry) override;
63+
64+
private:
65+
int32_t spec_id_;
66+
int64_t snapshot_id_;
67+
int64_t sequence_number_;
68+
std::string manifest_location_;
69+
};
70+
71+
/// \brief Empty implementation that applies no inheritance.
72+
class ICEBERG_EXPORT EmptyInheritableMetadata : public InheritableMetadata {
73+
public:
74+
Status Apply(ManifestEntry& entry) override;
75+
};
76+
77+
/// \brief Metadata inheritance for copying manifests before commit.
78+
class ICEBERG_EXPORT CopyInheritableMetadata : public InheritableMetadata {
79+
public:
80+
/// \brief Constructor for copy metadata.
81+
/// \param snapshot_id The snapshot ID to use for copying.
82+
explicit CopyInheritableMetadata(int64_t snapshot_id);
83+
84+
Status Apply(ManifestEntry& entry) override;
85+
86+
private:
87+
int64_t snapshot_id_;
88+
};
89+
90+
/// \brief Factory for creating InheritableMetadata instances.
91+
class ICEBERG_EXPORT InheritableMetadataFactory {
92+
public:
93+
/// \brief Create an empty metadata instance that applies no inheritance.
94+
static Result<std::unique_ptr<InheritableMetadata>> Empty();
95+
96+
/// \brief Create metadata instance from a manifest file.
97+
/// \param manifest The manifest file to extract metadata from.
98+
/// \return Inheritable metadata based on the manifest.
99+
static Result<std::unique_ptr<InheritableMetadata>> FromManifest(
100+
const ManifestFile& manifest);
101+
102+
/// \brief Create metadata instance for rewriting a manifest before commit.
103+
/// \param snapshot_id The snapshot ID for the copy operation.
104+
/// \return Inheritable metadata for copying.
105+
static Result<std::unique_ptr<InheritableMetadata>> ForCopy(int64_t snapshot_id);
106+
107+
private:
108+
InheritableMetadataFactory() = default;
109+
};
110+
111+
} // namespace iceberg

src/iceberg/manifest_reader.cc

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,33 @@
2323
#include "iceberg/manifest_list.h"
2424
#include "iceberg/manifest_reader_internal.h"
2525
#include "iceberg/schema.h"
26+
#include "iceberg/schema_internal.h"
2627
#include "iceberg/util/macros.h"
2728

2829
namespace iceberg {
2930

30-
Result<std::unique_ptr<ManifestReader>> ManifestReader::MakeReader(
31+
Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
32+
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
33+
std::shared_ptr<Schema> partition_schema) {
34+
auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema);
35+
std::shared_ptr<Schema> schema =
36+
FromStructType(std::move(*manifest_entry_schema), std::nullopt);
37+
38+
ICEBERG_ASSIGN_OR_RAISE(auto reader,
39+
ReaderFactoryRegistry::Open(FileFormatType::kAvro,
40+
{.path = manifest.manifest_path,
41+
.length = manifest.manifest_length,
42+
.io = std::move(file_io),
43+
.projection = schema}));
44+
// Create inheritable metadata for this manifest
45+
ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata,
46+
InheritableMetadataFactory::FromManifest(manifest));
47+
48+
return std::make_unique<ManifestReaderImpl>(std::move(reader), std::move(schema),
49+
std::move(inheritable_metadata));
50+
}
51+
52+
Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
3153
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
3254
std::shared_ptr<Schema> partition_schema) {
3355
auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema);
@@ -39,10 +61,12 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::MakeReader(
3961
{.path = std::string(manifest_location),
4062
.io = std::move(file_io),
4163
.projection = schema}));
42-
return std::make_unique<ManifestReaderImpl>(std::move(reader), std::move(schema));
64+
ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata, InheritableMetadataFactory::Empty());
65+
return std::make_unique<ManifestReaderImpl>(std::move(reader), std::move(schema),
66+
std::move(inheritable_metadata));
4367
}
4468

45-
Result<std::unique_ptr<ManifestListReader>> ManifestListReader::MakeReader(
69+
Result<std::unique_ptr<ManifestListReader>> ManifestListReader::Make(
4670
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
4771
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
4872
ManifestFile::Type().fields().end());

src/iceberg/manifest_reader.h

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
#include <memory>
2626
#include <vector>
2727

28-
#include "iceberg/file_reader.h"
2928
#include "iceberg/iceberg_export.h"
29+
#include "iceberg/result.h"
3030
#include "iceberg/type_fwd.h"
3131

3232
namespace iceberg {
@@ -37,11 +37,21 @@ class ICEBERG_EXPORT ManifestReader {
3737
virtual ~ManifestReader() = default;
3838
virtual Result<std::vector<ManifestEntry>> Entries() const = 0;
3939

40+
/// \brief Creates a reader for a manifest file.
41+
/// \param manifest A ManifestFile object containing metadata about the manifest.
42+
/// \param file_io File IO implementation to use.
43+
/// \param partition_schema Schema for the partition.
44+
/// \return A Result containing the reader or an error.
45+
static Result<std::unique_ptr<ManifestReader>> Make(
46+
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
47+
std::shared_ptr<Schema> partition_schema);
48+
4049
/// \brief Creates a reader for a manifest file.
4150
/// \param manifest_location Path to the manifest file.
4251
/// \param file_io File IO implementation to use.
52+
/// \param partition_schema Schema for the partition.
4353
/// \return A Result containing the reader or an error.
44-
static Result<std::unique_ptr<ManifestReader>> MakeReader(
54+
static Result<std::unique_ptr<ManifestReader>> Make(
4555
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
4656
std::shared_ptr<Schema> partition_schema);
4757
};
@@ -56,7 +66,7 @@ class ICEBERG_EXPORT ManifestListReader {
5666
/// \param manifest_list_location Path to the manifest list file.
5767
/// \param file_io File IO implementation to use.
5868
/// \return A Result containing the reader or an error.
59-
static Result<std::unique_ptr<ManifestListReader>> MakeReader(
69+
static Result<std::unique_ptr<ManifestListReader>> Make(
6070
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io);
6171
};
6272

src/iceberg/manifest_reader_internal.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
#include "manifest_reader_internal.h"
2121

22-
#include <array>
23-
2422
#include <nanoarrow/nanoarrow.h>
2523

2624
#include "iceberg/arrow_c_data_guard_internal.h"
@@ -543,6 +541,12 @@ Result<std::vector<ManifestEntry>> ManifestReaderImpl::Entries() const {
543541
break;
544542
}
545543
}
544+
545+
// Apply inheritance to all entries
546+
for (auto& entry : manifest_entries) {
547+
ICEBERG_RETURN_UNEXPECTED(inheritable_metadata_->Apply(entry));
548+
}
549+
546550
return manifest_entries;
547551
}
548552

src/iceberg/manifest_reader_internal.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
/// \file iceberg/internal/manifest_reader_internal.h
2323
/// Reader implementation for manifest list files and manifest files.
2424

25+
#include "iceberg/file_reader.h"
26+
#include "iceberg/inheritable_metadata.h"
2527
#include "iceberg/manifest_reader.h"
2628

2729
namespace iceberg {
@@ -30,14 +32,18 @@ namespace iceberg {
3032
class ManifestReaderImpl : public ManifestReader {
3133
public:
3234
explicit ManifestReaderImpl(std::unique_ptr<Reader> reader,
33-
std::shared_ptr<Schema> schema)
34-
: schema_(std::move(schema)), reader_(std::move(reader)) {}
35+
std::shared_ptr<Schema> schema,
36+
std::unique_ptr<InheritableMetadata> inheritable_metadata)
37+
: schema_(std::move(schema)),
38+
reader_(std::move(reader)),
39+
inheritable_metadata_(std::move(inheritable_metadata)) {}
3540

3641
Result<std::vector<ManifestEntry>> Entries() const override;
3742

3843
private:
3944
std::shared_ptr<Schema> schema_;
4045
std::unique_ptr<Reader> reader_;
46+
std::unique_ptr<InheritableMetadata> inheritable_metadata_;
4147
};
4248

4349
/// \brief Read manifest files from a manifest list file.

src/iceberg/table_scan.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,17 +147,17 @@ DataTableScan::DataTableScan(TableScanContext context, std::shared_ptr<FileIO> f
147147
Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() const {
148148
ICEBERG_ASSIGN_OR_RAISE(
149149
auto manifest_list_reader,
150-
ManifestListReader::MakeReader(context_.snapshot->manifest_list, file_io_));
150+
ManifestListReader::Make(context_.snapshot->manifest_list, file_io_));
151151
ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files());
152152

153153
std::vector<std::shared_ptr<FileScanTask>> tasks;
154154
ICEBERG_ASSIGN_OR_RAISE(auto partition_spec, context_.table_metadata->PartitionSpec());
155155
auto partition_schema = partition_spec->schema();
156156

157157
for (const auto& manifest_file : manifest_files) {
158-
ICEBERG_ASSIGN_OR_RAISE(auto manifest_reader,
159-
ManifestReader::MakeReader(manifest_file.manifest_path,
160-
file_io_, partition_schema));
158+
ICEBERG_ASSIGN_OR_RAISE(
159+
auto manifest_reader,
160+
ManifestReader::Make(manifest_file, file_io_, partition_schema));
161161
ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries());
162162

163163
// TODO(gty404): filter manifests using partition spec and filter expression

0 commit comments

Comments
 (0)