diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 9b634014b..1a8c33491 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -32,7 +32,8 @@ set(ICEBERG_SOURCES table_metadata.cc transform.cc transform_function.cc - type.cc) + type.cc + snapshot.cc) set(ICEBERG_STATIC_BUILD_INTERFACE_LIBS) set(ICEBERG_SHARED_BUILD_INTERFACE_LIBS) diff --git a/src/iceberg/snapshot.cc b/src/iceberg/snapshot.cc new file mode 100644 index 000000000..a17d9e8b2 --- /dev/null +++ b/src/iceberg/snapshot.cc @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/snapshot.h" + +namespace iceberg { + +SnapshotRefType SnapshotRef::type() const noexcept { + return std::visit( + [&](const auto& retention) -> SnapshotRefType { + using T = std::decay_t; + if constexpr (std::is_same_v) { + return SnapshotRefType::kBranch; + } else { + return SnapshotRefType::kTag; + } + }, + retention); +} + +std::optional Snapshot::operation() const { + auto it = summary.find(SnapshotSummaryFields::kOperation); + if (it != summary.end()) { + return it->second; + } + return std::nullopt; +} + +bool Snapshot::Equals(const Snapshot& other) const { + if (this == &other) { + return true; + } + return snapshot_id == other.snapshot_id && + parent_snapshot_id == other.parent_snapshot_id && + sequence_number == other.sequence_number && timestamp_ms == other.timestamp_ms && + schema_id == other.schema_id; +} + +} // namespace iceberg diff --git a/src/iceberg/snapshot.h b/src/iceberg/snapshot.h new file mode 100644 index 000000000..29577e9b1 --- /dev/null +++ b/src/iceberg/snapshot.h @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" + +namespace iceberg { + +/// \brief The type of snapshot reference +enum class SnapshotRefType { + /// Branches are mutable named references that can be updated by committing a new + /// snapshot as the branch’s referenced snapshot using the Commit Conflict Resolution + /// and Retry procedures. + kBranch, + /// Tags are labels for individual snapshots + kTag, +}; + +/// \brief A reference to a snapshot, either a branch or a tag. +struct ICEBERG_EXPORT SnapshotRef { + struct ICEBERG_EXPORT Branch { + /// A positive number for the minimum number of snapshots to keep in a branch while + /// expiring snapshots. Defaults to table property + /// history.expire.min-snapshots-to-keep. + std::optional min_snapshots_to_keep; + /// A positive number for the max age of snapshots to keep when + /// expiring, including the latest snapshot. Defaults to table property + /// history.expire.max-snapshot-age-ms. + std::optional max_snapshot_age_ms; + /// For snapshot references except the main branch, a positive number for the max age + /// of the snapshot reference to keep while expiring snapshots. Defaults to table + /// property history.expire.max-ref-age-ms. The main branch never expires. + std::optional max_ref_age_ms; + }; + + struct ICEBERG_EXPORT Tag { + /// For snapshot references except the main branch, a positive number for the max age + /// of the snapshot reference to keep while expiring snapshots. Defaults to table + /// property history.expire.max-ref-age-ms. The main branch never expires. + std::optional max_ref_age_ms; + }; + + /// A reference's snapshot ID. The tagged snapshot or latest snapshot of a branch. + int64_t snapshot_id; + /// Snapshot retention policy + std::variant retention; + + SnapshotRefType type() const noexcept; +}; + +/// \brief Optional Snapshot Summary Fields +struct SnapshotSummaryFields { + /// \brief The operation field key + inline static const std::string kOperation = "operation"; + + /// Metrics, see https://iceberg.apache.org/spec/#metrics + + /// \brief Number of data files added in the snapshot + inline static const std::string kAddedDataFiles = "added-data-files"; + /// \brief Number of data files deleted in the snapshot + inline static const std::string kDeletedDataFiles = "deleted-data-files"; + /// \brief Total number of live data files in the snapshot + inline static const std::string kTotalDataFiles = "total-data-files"; + /// \brief Number of positional/equality delete files and deletion vectors added in the + /// snapshot + inline static const std::string kAddedDeleteFiles = "added-delete-files"; + /// \brief Number of equality delete files added in the snapshot + inline static const std::string kAddedEqDeleteFiles = "added-equality-delete-files"; + /// \brief Number of equality delete files removed in the snapshot + inline static const std::string kRemovedEqDeleteFiles = "removed-equality-delete-files"; + /// \brief Number of position delete files added in the snapshot + inline static const std::string kAddedPosDeleteFiles = "added-position-delete-files"; + /// \brief Number of position delete files removed in the snapshot + inline static const std::string kRemovedPosDeleteFiles = + "removed-position-delete-files"; + /// \brief Number of deletion vectors added in the snapshot + inline static const std::string kAddedDVs = "added-dvs"; + /// \brief Number of deletion vectors removed in the snapshot + inline static const std::string kRemovedDVs = "removed-dvs"; + /// \brief Number of positional/equality delete files and deletion vectors removed in + /// the snapshot + inline static const std::string kRemovedDeleteFiles = "removed-delete-files"; + /// \brief Total number of live positional/equality delete files and deletion vectors in + /// the snapshot + inline static const std::string kTotalDeleteFiles = "total-delete-files"; + /// \brief Number of records added in the snapshot + inline static const std::string kAddedRecords = "added-records"; + /// \brief Number of records deleted in the snapshot + inline static const std::string kDeletedRecords = "deleted-records"; + /// \brief Total number of records in the snapshot + inline static const std::string kTotalRecords = "total-records"; + /// \brief The size of files added in the snapshot + inline static const std::string kAddedFileSize = "added-files-size"; + /// \brief The size of files removed in the snapshot + inline static const std::string kRemovedFileSize = "removed-files-size"; + /// \brief Total size of live files in the snapshot + inline static const std::string kTotalFileSize = "total-files-size"; + /// \brief Number of position delete records added in the snapshot + inline static const std::string kAddedPosDeletes = "added-position-deletes"; + /// \brief Number of position delete records removed in the snapshot + inline static const std::string kRemovedPosDeletes = "removed-position-deletes"; + /// \brief Total number of position delete records in the snapshot + inline static const std::string kTotalPosDeletes = "total-position-deletes"; + /// \brief Number of equality delete records added in the snapshot + inline static const std::string kAddedEqDeletes = "added-equality-deletes"; + /// \brief Number of equality delete records removed in the snapshot + inline static const std::string kRemovedEqDeletes = "removed-equality-deletes"; + /// \brief Total number of equality delete records in the snapshot + inline static const std::string kTotalEqDeletes = "total-equality-deletes"; + /// \brief Number of duplicate files deleted (duplicates are files recorded more than + /// once in the manifest) + inline static const std::string kDeletedDuplicatedFiles = "deleted-duplicate-files"; + /// \brief Number of partitions with files added or removed in the snapshot + inline static const std::string kChangedPartitionCountProp = "changed-partition-count"; + + /// Other Fields, see https://iceberg.apache.org/spec/#other-fields + + /// \brief The Write-Audit-Publish id of a staged snapshot + inline static const std::string kWAPID = "wap.id"; + /// \brief The Write-Audit-Publish id of a snapshot already been published + inline static const std::string kPublishedWAPID = "published-wap-id"; + /// \brief The original id of a cherry-picked snapshot + inline static const std::string kSourceSnapshotID = "source-snapshot-id"; + /// \brief Name of the engine that created the snapshot + inline static const std::string kEngineName = "engine-name"; + /// \brief Version of the engine that created the snapshot + inline static const std::string kEngineVersion = "engine-version"; +}; + +/// \brief Data operation that produce snapshots. +/// +/// A snapshot can return the operation that created the snapshot to help other components +/// ignore snapshots that are not needed for some tasks. For example, snapshot expiration +/// does not need to clean up deleted files for appends, which have no deleted files. +struct ICEBERG_EXPORT DataOperation { + /// \brief Only data files were added and no files were removed. + inline static const std::string kAppend = "append"; + /// \brief Data and delete files were added and removed without changing table data; + /// i.e. compaction, change the data file format, or relocating data files. + inline static const std::string kReplace = "replace"; + /// \brief Data and delete files were added and removed in a logical overwrite + /// operation. + inline static const std::string kOverwrite = "overwrite"; + /// \brief Data files were removed and their contents logically deleted and/or delete + /// files were added to delete rows. + inline static const std::string kDelete = "delete"; +}; + +/// \brief A snapshot of the data in a table at a point in time. +/// +/// A snapshot consist of one or more file manifests, and the complete table contents is +/// the union of all the data files in those manifests. +/// +/// Snapshots are created by table operations. +struct ICEBERG_EXPORT Snapshot { + /// A unqiue long ID. + int64_t snapshot_id; + /// The snapshot ID of the snapshot's parent. Omitted for any snapshot with no parent. + std::optional parent_snapshot_id; + /// A monotonically increasing long that tracks the order of changes to a table. + int64_t sequence_number; + /// A timestamp when the snapshot was created, used for garbage collection and table + /// inspection. + int64_t timestamp_ms; + /// The location of a manifest list for this snapshot that tracks manifest files with + /// additional metadata. + std::string manifest_list; + /// A string map that summaries the snapshot changes, including operation. + std::unordered_map summary; + /// ID of the table's current schema when the snapshot was created. + std::optional schema_id; + + /// \brief Return the name of the DataOperations data operation that produced this + /// snapshot. + /// + /// \return the operation that produced this snapshot, or nullopt if the operation is + /// unknown. + std::optional operation() const; + + /// \brief Compare two snapshots for equality. + friend bool operator==(const Snapshot& lhs, const Snapshot& rhs) { + return lhs.Equals(rhs); + } + + /// \brief Compare two snapshots for inequality. + friend bool operator!=(const Snapshot& lhs, const Snapshot& rhs) { + return !(lhs == rhs); + } + + private: + /// \brief Compare two snapshots for equality. + bool Equals(const Snapshot& other) const; +}; + +} // namespace iceberg diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 81028804b..4c41364d0 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -34,7 +34,8 @@ target_sources(schema_test partition_field_test.cc partition_spec_test.cc sort_field_test.cc - sort_order_test.cc) + sort_order_test.cc + snapshot_test.cc) target_link_libraries(schema_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock) add_test(NAME schema_test COMMAND schema_test) diff --git a/test/snapshot_test.cc b/test/snapshot_test.cc new file mode 100644 index 000000000..995c64a86 --- /dev/null +++ b/test/snapshot_test.cc @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "iceberg/snapshot.h" + +#include + +namespace iceberg { + +TEST(SnapshotRefTest, SnapshotRefBranchInitialization) { + SnapshotRef snapshot_ref; + + snapshot_ref.snapshot_id = 12345; + snapshot_ref.retention = SnapshotRef::Branch{ + .min_snapshots_to_keep = 10, .max_snapshot_age_ms = 5000, .max_ref_age_ms = 10000}; + + EXPECT_EQ(snapshot_ref.snapshot_id, 12345); + EXPECT_TRUE(std::holds_alternative(snapshot_ref.retention)); + EXPECT_EQ(snapshot_ref.type(), SnapshotRefType::kBranch); + + auto* branch = std::get_if(&snapshot_ref.retention); + ASSERT_NE(branch, nullptr); + EXPECT_TRUE(branch->min_snapshots_to_keep.has_value()); + EXPECT_EQ(branch->min_snapshots_to_keep.value(), 10); + EXPECT_TRUE(branch->max_snapshot_age_ms.has_value()); + EXPECT_EQ(branch->max_snapshot_age_ms.value(), 5000); + EXPECT_TRUE(branch->max_ref_age_ms.has_value()); + EXPECT_EQ(branch->max_ref_age_ms.value(), 10000); +} + +TEST(SnapshotRefTest, SnapshotRefTagInitialization) { + SnapshotRef snapshot_ref; + + snapshot_ref.snapshot_id = 67890; + snapshot_ref.retention = SnapshotRef::Tag{.max_ref_age_ms = 20000}; + + EXPECT_EQ(snapshot_ref.snapshot_id, 67890); + EXPECT_TRUE(std::holds_alternative(snapshot_ref.retention)); + EXPECT_EQ(snapshot_ref.type(), SnapshotRefType::kTag); + + auto* tag = std::get_if(&snapshot_ref.retention); + ASSERT_NE(tag, nullptr); + EXPECT_TRUE(tag->max_ref_age_ms.has_value()); + EXPECT_EQ(tag->max_ref_age_ms.value(), 20000); +} + +class SnapshotTest : public ::testing::Test { + protected: + void SetUp() override { + // Initialize some common test data + summary1 = {{SnapshotSummaryFields::kOperation, DataOperation::kAppend}, + {SnapshotSummaryFields::kAddedDataFiles, "101"}}; + + summary2 = {{SnapshotSummaryFields::kOperation, DataOperation::kAppend}, + {SnapshotSummaryFields::kAddedDataFiles, "101"}}; + + summary3 = {{SnapshotSummaryFields::kOperation, DataOperation::kDelete}, + {SnapshotSummaryFields::kDeletedDataFiles, "20"}}; + } + + std::unordered_map summary1; + std::unordered_map summary2; + std::unordered_map summary3; +}; + +TEST_F(SnapshotTest, ConstructionAndFieldAccess) { + // Test the constructor and field access + Snapshot snapshot{.snapshot_id = 12345, + .parent_snapshot_id = 54321, + .sequence_number = 1, + .timestamp_ms = 1615569200000, + .manifest_list = "s3://example/manifest_list.avro", + .summary = summary1, + .schema_id = 10}; + + EXPECT_EQ(snapshot.snapshot_id, 12345); + EXPECT_TRUE(snapshot.parent_snapshot_id.has_value()); + EXPECT_EQ(*snapshot.parent_snapshot_id, 54321); + EXPECT_EQ(snapshot.sequence_number, 1); + EXPECT_EQ(snapshot.timestamp_ms, 1615569200000); + EXPECT_EQ(snapshot.manifest_list, "s3://example/manifest_list.avro"); + EXPECT_EQ(snapshot.operation().value(), DataOperation::kAppend); + EXPECT_EQ(snapshot.summary.at(std::string(SnapshotSummaryFields::kAddedDataFiles)), + "101"); + EXPECT_EQ(snapshot.summary.at(std::string(SnapshotSummaryFields::kOperation)), + DataOperation::kAppend); + EXPECT_TRUE(snapshot.schema_id.has_value()); + EXPECT_EQ(snapshot.schema_id.value(), 10); +} + +TEST_F(SnapshotTest, EqualityComparison) { + // Test the == and != operators + Snapshot snapshot1(12345, {}, 1, 1615569200000, "s3://example/manifest_list.avro", + summary1, {}); + + Snapshot snapshot2(12345, {}, 1, 1615569200000, "s3://example/manifest_list.avro", + summary2, {}); + + Snapshot snapshot3(67890, {}, 1, 1615569200000, "s3://example/manifest_list.avro", + summary3, {}); + + EXPECT_EQ(snapshot1, snapshot2); + EXPECT_NE(snapshot1, snapshot3); +} + +} // namespace iceberg