Skip to content
Merged
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ set(ICEBERG_SOURCES
expression/literal.cc
file_reader.cc
file_writer.cc
inheritable_metadata.cc
json_internal.cc
manifest_entry.cc
manifest_list.cc
Expand Down
110 changes: 110 additions & 0 deletions src/iceberg/inheritable_metadata.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/inheritable_metadata.h"

#include <cassert>
#include <utility>

#include <iceberg/result.h>

#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_list.h"
#include "iceberg/snapshot.h"

namespace iceberg {

BaseInheritableMetadata::BaseInheritableMetadata(int32_t spec_id, int64_t snapshot_id,
int64_t sequence_number,
std::string manifest_location)
: spec_id_(spec_id),
snapshot_id_(snapshot_id),
sequence_number_(sequence_number),
manifest_location_(std::move(manifest_location)) {}

// Fills in the fields of `entry` that were left unset, using the values captured
// from the owning manifest:
//   - snapshot id: inherited when the entry has none;
//   - data / file sequence numbers: inherited when unset and the rule below allows;
//   - partition spec id of the data file: always overwritten.
//
// Returns InvalidManifest when the entry has no data file. That check runs before
// any field is written, so a failed call leaves `entry` unmodified instead of
// half-updated.
//
// NOTE(review): manifest_location_ is stored by the constructor but is not applied
// to the entry here - confirm whether the data file is expected to record it.
Status BaseInheritableMetadata::Apply(ManifestEntry& entry) {
  // Validate first: nothing below may run on an entry we are going to reject.
  if (!entry.data_file) {
    return InvalidManifest("Manifest entry has no data file");
  }

  if (!entry.snapshot_id.has_value()) {
    entry.snapshot_id = snapshot_id_;
  }

  // In v1 metadata, sequence numbers are not persisted and can be safely defaulted
  // to 0.
  // In v2 metadata, sequence numbers should be inherited iff the entry status is
  // ADDED.
  // The same rule governs both the data and the file sequence number.
  const bool may_inherit_sequence_number =
      sequence_number_ == 0 || entry.status == ManifestStatus::kAdded;

  if (may_inherit_sequence_number && !entry.sequence_number.has_value()) {
    entry.sequence_number = sequence_number_;
  }

  if (may_inherit_sequence_number && !entry.file_sequence_number.has_value()) {
    entry.file_sequence_number = sequence_number_;
  }

  entry.data_file->partition_spec_id = spec_id_;

  return {};
}

// There is no manifest-level metadata to inherit from, so the entry is never
// modified; it is only checked for an explicit snapshot id.
Status EmptyInheritableMetadata::Apply(ManifestEntry& entry) {
  if (entry.snapshot_id.has_value()) {
    return {};
  }

  return InvalidManifest(
      "Entries must have explicit snapshot ids if inherited metadata is empty");
}

CopyInheritableMetadata::CopyInheritableMetadata(int64_t snapshot_id)
: snapshot_id_(snapshot_id) {}

// Unconditionally replaces the entry's snapshot id with the one given at
// construction, whether or not the entry already had a value. Always succeeds.
Status CopyInheritableMetadata::Apply(ManifestEntry& entry) {
  entry.snapshot_id = snapshot_id_;
  return {};
}

// Returns an EmptyInheritableMetadata behind the InheritableMetadata interface.
Result<std::unique_ptr<InheritableMetadata>> InheritableMetadataFactory::Empty() {
  return std::unique_ptr<InheritableMetadata>{
      std::make_unique<EmptyInheritableMetadata>()};
}

// Builds a BaseInheritableMetadata from the partition spec id, added snapshot id,
// sequence number and path recorded on `manifest`.
//
// Returns InvalidManifest when the manifest's added_snapshot_id equals
// Snapshot::kInvalidSnapshotId, i.e. no snapshot id has been assigned.
Result<std::unique_ptr<InheritableMetadata>> InheritableMetadataFactory::FromManifest(
    const ManifestFile& manifest) {
  const bool has_snapshot_id =
      manifest.added_snapshot_id != Snapshot::kInvalidSnapshotId;
  if (!has_snapshot_id) {
    return InvalidManifest("Manifest file {} has no snapshot ID", manifest.manifest_path);
  }

  return std::unique_ptr<InheritableMetadata>{std::make_unique<BaseInheritableMetadata>(
      manifest.partition_spec_id, manifest.added_snapshot_id, manifest.sequence_number,
      manifest.manifest_path)};
}

// Returns a CopyInheritableMetadata that stamps `snapshot_id` onto entries.
Result<std::unique_ptr<InheritableMetadata>> InheritableMetadataFactory::ForCopy(
    int64_t snapshot_id) {
  return std::unique_ptr<InheritableMetadata>{
      std::make_unique<CopyInheritableMetadata>(snapshot_id)};
}

} // namespace iceberg
111 changes: 111 additions & 0 deletions src/iceberg/inheritable_metadata.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/inheritable_metadata.h
/// Metadata inheritance system for manifest entries.

#include <cstdint>
#include <memory>
#include <string>

#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"

namespace iceberg {

/// \brief Interface for completing manifest entries with manifest-level metadata.
///
/// Some fields of a manifest entry (snapshot id, data sequence number, file
/// sequence number) may be null in the manifest and are then expected to be
/// inherited from the manifest file. Implementations of this interface encode
/// the rule that is used to fill them in.
class ICEBERG_EXPORT InheritableMetadata {
 public:
  virtual ~InheritableMetadata() = default;

  /// \brief Apply the inheritance rule to a single manifest entry.
  /// \param entry The manifest entry to update in place.
  /// \return An empty Status on success, or an error if the entry is rejected.
  virtual Status Apply(ManifestEntry& entry) = 0;
};

/// \brief Standard inheritance rule, driven by values taken from a manifest file.
///
/// Apply() fills in an entry's snapshot id and sequence numbers when they are
/// unset and stamps the partition spec id onto the entry's data file.
class ICEBERG_EXPORT BaseInheritableMetadata : public InheritableMetadata {
 public:
  /// \brief Construct from the values recorded on the manifest file.
  /// \param spec_id Partition spec ID of the manifest.
  /// \param snapshot_id Snapshot ID inherited by entries that have none.
  /// \param sequence_number Sequence number inherited by entries that have none.
  /// \param manifest_location Path of the manifest file.
  BaseInheritableMetadata(int32_t spec_id, int64_t snapshot_id, int64_t sequence_number,
                          std::string manifest_location);

  /// \brief Fill in the unset inheritable fields of `entry`.
  /// \return An error if the entry has no data file, otherwise an empty Status.
  Status Apply(ManifestEntry& entry) override;

 private:
  int32_t spec_id_;
  int64_t snapshot_id_;
  int64_t sequence_number_;
  std::string manifest_location_;
};

/// \brief Implementation used when there is no manifest metadata to inherit from.
///
/// Apply() never modifies the entry; it only verifies that the entry already
/// carries an explicit snapshot id and reports an error otherwise.
class ICEBERG_EXPORT EmptyInheritableMetadata : public InheritableMetadata {
 public:
  Status Apply(ManifestEntry& entry) override;
};

/// \brief Inheritance rule used when a manifest is copied before commit.
///
/// Apply() overwrites the snapshot id of every entry with the configured one.
class ICEBERG_EXPORT CopyInheritableMetadata : public InheritableMetadata {
 public:
  /// \brief Construct with the snapshot ID to assign to entries.
  /// \param snapshot_id The snapshot ID written onto each entry by Apply().
  explicit CopyInheritableMetadata(int64_t snapshot_id);

  Status Apply(ManifestEntry& entry) override;

 private:
  int64_t snapshot_id_;
};

/// \brief Static factory for the InheritableMetadata implementations.
///
/// The class only exposes static functions; its constructor is private.
class ICEBERG_EXPORT InheritableMetadataFactory {
 public:
  /// \brief Create an instance that inherits nothing.
  /// \return Metadata whose Apply() requires entries to have explicit snapshot ids.
  static Result<std::unique_ptr<InheritableMetadata>> Empty();

  /// \brief Create an instance from the metadata recorded on a manifest file.
  /// \param manifest The manifest file whose values entries should inherit.
  /// \return The inheritable metadata, or an error if the manifest has no
  /// snapshot ID assigned.
  static Result<std::unique_ptr<InheritableMetadata>> FromManifest(
      const ManifestFile& manifest);

  /// \brief Create an instance for rewriting a manifest before commit.
  /// \param snapshot_id The snapshot ID to assign to the copied entries.
  /// \return Inheritable metadata for the copy operation.
  static Result<std::unique_ptr<InheritableMetadata>> ForCopy(int64_t snapshot_id);

 private:
  InheritableMetadataFactory() = default;
};

} // namespace iceberg
30 changes: 27 additions & 3 deletions src/iceberg/manifest_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,33 @@
#include "iceberg/manifest_list.h"
#include "iceberg/manifest_reader_internal.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/util/macros.h"

namespace iceberg {

Result<std::unique_ptr<ManifestReader>> ManifestReader::MakeReader(
// Creates a reader for the manifest described by `manifest`.
//
// The manifest-entry schema is derived from `partition_schema`, the file at
// manifest.manifest_path is opened through the Avro reader factory with that
// schema as projection, and the inheritable metadata built from `manifest` is
// handed to the reader implementation. Any failure of the open or of the
// metadata construction is propagated to the caller.
Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
    const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
    std::shared_ptr<Schema> partition_schema) {
  auto entry_struct_type = ManifestEntry::TypeFromPartitionType(partition_schema);
  std::shared_ptr<Schema> entry_schema =
      FromStructType(std::move(*entry_struct_type), std::nullopt);

  // `entry_schema` is copied into the options here because it is still needed below.
  ICEBERG_ASSIGN_OR_RAISE(auto file_reader,
                          ReaderFactoryRegistry::Open(FileFormatType::kAvro,
                                                      {.path = manifest.manifest_path,
                                                       .length = manifest.manifest_length,
                                                       .io = std::move(file_io),
                                                       .projection = entry_schema}));

  // Inheritance rules for the entries of this particular manifest.
  ICEBERG_ASSIGN_OR_RAISE(auto inherited_metadata,
                          InheritableMetadataFactory::FromManifest(manifest));

  return std::make_unique<ManifestReaderImpl>(
      std::move(file_reader), std::move(entry_schema), std::move(inherited_metadata));
}

Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<Schema> partition_schema) {
auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema);
Expand All @@ -39,10 +61,12 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::MakeReader(
{.path = std::string(manifest_location),
.io = std::move(file_io),
.projection = schema}));
return std::make_unique<ManifestReaderImpl>(std::move(reader), std::move(schema));
ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata, InheritableMetadataFactory::Empty());
return std::make_unique<ManifestReaderImpl>(std::move(reader), std::move(schema),
std::move(inheritable_metadata));
}

Result<std::unique_ptr<ManifestListReader>> ManifestListReader::MakeReader(
Result<std::unique_ptr<ManifestListReader>> ManifestListReader::Make(
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
ManifestFile::Type().fields().end());
Expand Down
16 changes: 13 additions & 3 deletions src/iceberg/manifest_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
#include <memory>
#include <vector>

#include "iceberg/file_reader.h"
#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"

namespace iceberg {
Expand All @@ -37,11 +37,21 @@ class ICEBERG_EXPORT ManifestReader {
virtual ~ManifestReader() = default;
virtual Result<std::vector<ManifestEntry>> Entries() const = 0;

/// \brief Creates a reader for a manifest file.
/// \param manifest A ManifestFile object containing metadata about the manifest.
/// \param file_io File IO implementation to use.
/// \param partition_schema Schema for the partition.
/// \return A Result containing the reader or an error.
static Result<std::unique_ptr<ManifestReader>> Make(
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
std::shared_ptr<Schema> partition_schema);

/// \brief Creates a reader for a manifest file.
/// \param manifest_location Path to the manifest file.
/// \param file_io File IO implementation to use.
/// \param partition_schema Schema for the partition.
/// \return A Result containing the reader or an error.
static Result<std::unique_ptr<ManifestReader>> MakeReader(
static Result<std::unique_ptr<ManifestReader>> Make(
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<Schema> partition_schema);
};
Expand All @@ -56,7 +66,7 @@ class ICEBERG_EXPORT ManifestListReader {
/// \param manifest_list_location Path to the manifest list file.
/// \param file_io File IO implementation to use.
/// \return A Result containing the reader or an error.
static Result<std::unique_ptr<ManifestListReader>> MakeReader(
static Result<std::unique_ptr<ManifestListReader>> Make(
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io);
};

Expand Down
8 changes: 6 additions & 2 deletions src/iceberg/manifest_reader_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

#include "manifest_reader_internal.h"

#include <array>

#include <nanoarrow/nanoarrow.h>

#include "iceberg/arrow_c_data_guard_internal.h"
Expand Down Expand Up @@ -543,6 +541,12 @@ Result<std::vector<ManifestEntry>> ManifestReaderImpl::Entries() const {
break;
}
}

// Apply inheritance to all entries
for (auto& entry : manifest_entries) {
ICEBERG_RETURN_UNEXPECTED(inheritable_metadata_->Apply(entry));
}

return manifest_entries;
}

Expand Down
10 changes: 8 additions & 2 deletions src/iceberg/manifest_reader_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
/// \file iceberg/manifest_reader_internal.h
/// Reader implementation for manifest list files and manifest files.

#include "iceberg/file_reader.h"
#include "iceberg/inheritable_metadata.h"
#include "iceberg/manifest_reader.h"

namespace iceberg {
Expand All @@ -30,14 +32,18 @@ namespace iceberg {
class ManifestReaderImpl : public ManifestReader {
public:
explicit ManifestReaderImpl(std::unique_ptr<Reader> reader,
std::shared_ptr<Schema> schema)
: schema_(std::move(schema)), reader_(std::move(reader)) {}
std::shared_ptr<Schema> schema,
std::unique_ptr<InheritableMetadata> inheritable_metadata)
: schema_(std::move(schema)),
reader_(std::move(reader)),
inheritable_metadata_(std::move(inheritable_metadata)) {}

Result<std::vector<ManifestEntry>> Entries() const override;

private:
std::shared_ptr<Schema> schema_;
std::unique_ptr<Reader> reader_;
std::unique_ptr<InheritableMetadata> inheritable_metadata_;
};

/// \brief Read manifest files from a manifest list file.
Expand Down
8 changes: 4 additions & 4 deletions src/iceberg/table_scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,17 @@ DataTableScan::DataTableScan(TableScanContext context, std::shared_ptr<FileIO> f
Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() const {
ICEBERG_ASSIGN_OR_RAISE(
auto manifest_list_reader,
ManifestListReader::MakeReader(context_.snapshot->manifest_list, file_io_));
ManifestListReader::Make(context_.snapshot->manifest_list, file_io_));
ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files());

std::vector<std::shared_ptr<FileScanTask>> tasks;
ICEBERG_ASSIGN_OR_RAISE(auto partition_spec, context_.table_metadata->PartitionSpec());
auto partition_schema = partition_spec->schema();

for (const auto& manifest_file : manifest_files) {
ICEBERG_ASSIGN_OR_RAISE(auto manifest_reader,
ManifestReader::MakeReader(manifest_file.manifest_path,
file_io_, partition_schema));
ICEBERG_ASSIGN_OR_RAISE(
auto manifest_reader,
ManifestReader::Make(manifest_file, file_io_, partition_schema));
ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries());

// TODO(gty404): filter manifests using partition spec and filter expression
Expand Down
Loading
Loading