Skip to content

Commit 0f924db

Browse files
author
xiao.dong
committed
feat: support expire snapshots
1 parent 68fe381 commit 0f924db

16 files changed

+851
-13
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ set(ICEBERG_SOURCES
8080
transform.cc
8181
transform_function.cc
8282
type.cc
83+
update/expire_snapshots.cc
8384
update/pending_update.cc
8485
update/update_partition_spec.cc
8586
update/update_properties.cc

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ iceberg_sources = files(
101101
'transform.cc',
102102
'transform_function.cc',
103103
'type.cc',
104+
'update/expire_snapshots.cc',
104105
'update/pending_update.cc',
105106
'update/update_partition_spec.cc',
106107
'update/update_properties.cc',

src/iceberg/table.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "iceberg/table_properties.h"
3131
#include "iceberg/table_scan.h"
3232
#include "iceberg/transaction.h"
33+
#include "iceberg/update/expire_snapshots.h"
3334
#include "iceberg/update/update_partition_spec.h"
3435
#include "iceberg/update/update_properties.h"
3536
#include "iceberg/update/update_schema.h"
@@ -179,6 +180,13 @@ Result<std::shared_ptr<UpdateSchema>> Table::NewUpdateSchema() {
179180
return transaction->NewUpdateSchema();
180181
}
181182

183+
/// \brief Create an ExpireSnapshots operation backed by a new update transaction.
///
/// The transaction is created with auto-commit enabled (see the `auto_commit`
/// argument of Transaction::Make below).
///
/// \return The ExpireSnapshots handed out by the transaction, or the error produced
/// while creating the transaction.
Result<std::shared_ptr<ExpireSnapshots>> Table::NewExpireSnapshots() {
  constexpr bool kAutoCommit = true;
  ICEBERG_ASSIGN_OR_RAISE(auto txn,
                          Transaction::Make(shared_from_this(),
                                            Transaction::Kind::kUpdate, kAutoCommit));
  return txn->NewExpireSnapshots();
}
189+
182190
Result<std::shared_ptr<StagedTable>> StagedTable::Make(
183191
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
184192
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/table.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
144144
/// changes.
145145
virtual Result<std::shared_ptr<UpdateSchema>> NewUpdateSchema();
146146

147+
/// \brief Create a new ExpireSnapshots to remove expired snapshots and commit the
148+
/// changes.
149+
virtual Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();
150+
147151
protected:
148152
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
149153
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/table_metadata.cc

Lines changed: 89 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,12 @@ class TableMetadataBuilder::Impl {
592592
Result<int32_t> AddSchema(const Schema& schema, int32_t new_last_column_id);
593593
void SetLocation(std::string_view location);
594594

595+
Status SetRef(const std::string& name, std::shared_ptr<SnapshotRef> ref);
596+
Status RemoveRef(const std::string& name);
597+
Status AddSnapshot(std::shared_ptr<Snapshot> snapshot);
598+
Status RemoveSnapshots(const std::vector<int64_t>& snapshot_ids);
599+
Status RemovePartitionSpecs(const std::vector<int32_t>& spec_ids);
600+
595601
Result<std::unique_ptr<TableMetadata>> Build();
596602

597603
private:
@@ -1077,6 +1083,79 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
10771083
return new_schema_id;
10781084
}
10791085

1086+
/// \brief Register a new snapshot reference and record the matching SetSnapshotRef
/// change.
///
/// \param name Name of the reference; it must not already be present in
/// `metadata_.refs`.
/// \param ref Reference to store; must not be null.
/// \return An error status when a precondition is violated, otherwise a
/// default-constructed (success) status.
Status TableMetadataBuilder::Impl::SetRef(const std::string& name,
                                          std::shared_ptr<SnapshotRef> ref) {
  // Everything below dereferences `ref`, so reject a null pointer up front instead
  // of crashing.
  ICEBERG_PRECHECK(ref != nullptr, "Cannot set ref: {}, ref is null.", name);
  ICEBERG_PRECHECK(!metadata_.refs.contains(name),
                   "Cannot set ref: {}, which is already exist.", name);

  // Build the change first: if reading the retention variant throws, neither
  // `changes_` nor `metadata_.refs` has been modified yet.
  if (ref->type() == SnapshotRefType::kBranch) {
    const auto& branch = std::get<SnapshotRef::Branch>(ref->retention);
    changes_.push_back(std::make_unique<table::SetSnapshotRef>(
        name, ref->snapshot_id, ref->type(), branch.min_snapshots_to_keep,
        branch.max_snapshot_age_ms, branch.max_ref_age_ms));
  } else {
    // Non-branch refs only carry a maximum ref age; the two snapshot-retention
    // settings are passed as "unset".
    const auto& tag = std::get<SnapshotRef::Tag>(ref->retention);
    changes_.push_back(std::make_unique<table::SetSnapshotRef>(
        name, ref->snapshot_id, ref->type(), std::nullopt, std::nullopt,
        tag.max_ref_age_ms));
  }

  metadata_.refs[name] = std::move(ref);
  return {};
}
1104+
1105+
/// \brief Drop the snapshot reference called `name` and record a RemoveSnapshotRef
/// change.
///
/// \param name Name of the reference to remove; it must be present in
/// `metadata_.refs`.
/// \return An error status when no such reference exists, otherwise a
/// default-constructed (success) status.
Status TableMetadataBuilder::Impl::RemoveRef(const std::string& name) {
  auto it = metadata_.refs.find(name);
  ICEBERG_PRECHECK(it != metadata_.refs.end(),
                   "Cannot remove ref: {}, which is not exist.", name);

  // Reuse the iterator from the lookup so the key is not searched a second time.
  metadata_.refs.erase(it);
  changes_.push_back(std::make_unique<table::RemoveSnapshotRef>(name));
  return {};
}
1114+
1115+
/// \brief Append `snapshot` to the metadata's snapshot list.
///
/// No validation is performed and no entry is added to `changes_`.
///
/// \param snapshot Snapshot to append; ownership is moved into the metadata.
/// \return Always a default-constructed (success) status.
Status TableMetadataBuilder::Impl::AddSnapshot(std::shared_ptr<Snapshot> snapshot) {
  // TODO(xiao.dong): test-only for now; this is not the complete official
  // implementation.
  metadata_.snapshots.push_back(std::move(snapshot));
  return {};
}
1120+
1121+
/// \brief Remove the snapshots with the given ids and record a RemoveSnapshots
/// change.
///
/// An empty id list is a no-op. Ids that match no stored snapshot do not affect the
/// snapshot list, but the recorded change still carries `snapshot_ids` as given.
///
/// \param snapshot_ids Ids of the snapshots to drop; must not contain the current
/// snapshot id.
/// \return An error status when the current snapshot is among `snapshot_ids`,
/// otherwise a default-constructed (success) status.
Status TableMetadataBuilder::Impl::RemoveSnapshots(
    const std::vector<int64_t>& snapshot_ids) {
  if (snapshot_ids.empty()) {
    return {};
  }

  // Hash set for O(1) membership tests while scanning the snapshot list.
  const std::unordered_set<int64_t> doomed(snapshot_ids.cbegin(), snapshot_ids.cend());
  const auto current_id = metadata_.current_snapshot_id;
  ICEBERG_PRECHECK(!doomed.contains(current_id), "Cannot remove current snapshot: {}",
                   current_id);

  std::erase_if(metadata_.snapshots, [&doomed](const auto& snapshot) {
    return doomed.contains(snapshot->snapshot_id);
  });
  changes_.push_back(std::make_unique<table::RemoveSnapshots>(snapshot_ids));
  return {};
}
1139+
1140+
/// \brief Remove the partition specs with the given ids and record a
/// RemovePartitionSpecs change.
///
/// An empty id list is a no-op. Ids that match no stored spec do not affect the spec
/// list, but the recorded change still carries `spec_ids` as given.
///
/// \param spec_ids Ids of the partition specs to drop; must not contain the default
/// spec id.
/// \return An error status when the default spec is among `spec_ids`, otherwise a
/// default-constructed (success) status.
Status TableMetadataBuilder::Impl::RemovePartitionSpecs(
    const std::vector<int32_t>& spec_ids) {
  if (spec_ids.empty()) {
    return {};
  }

  // Hash set for O(1) membership tests while scanning the spec list.
  const std::unordered_set<int32_t> doomed(spec_ids.cbegin(), spec_ids.cend());
  const auto default_id = metadata_.default_spec_id;
  ICEBERG_PRECHECK(!doomed.contains(default_id), "Cannot remove default spec: {}",
                   default_id);

  std::erase_if(metadata_.partition_specs, [&doomed](const auto& spec) {
    return doomed.contains(spec->spec_id());
  });
  changes_.push_back(std::make_unique<table::RemovePartitionSpecs>(spec_ids));
  return {};
}
1158+
10801159
TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
10811160
: impl_(std::make_unique<Impl>(format_version)) {}
10821161

@@ -1179,7 +1258,8 @@ TableMetadataBuilder& TableMetadataBuilder::AddPartitionSpec(
11791258

11801259
TableMetadataBuilder& TableMetadataBuilder::RemovePartitionSpecs(
11811260
const std::vector<int32_t>& spec_ids) {
1182-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
1261+
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemovePartitionSpecs(spec_ids));
1262+
return *this;
11831263
}
11841264

11851265
TableMetadataBuilder& TableMetadataBuilder::RemoveSchemas(
@@ -1207,7 +1287,8 @@ TableMetadataBuilder& TableMetadataBuilder::AddSortOrder(
12071287

12081288
TableMetadataBuilder& TableMetadataBuilder::AddSnapshot(
12091289
std::shared_ptr<Snapshot> snapshot) {
1210-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
1290+
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSnapshot(std::move(snapshot)));
1291+
return *this;
12111292
}
12121293

12131294
TableMetadataBuilder& TableMetadataBuilder::SetBranchSnapshot(int64_t snapshot_id,
@@ -1217,11 +1298,13 @@ TableMetadataBuilder& TableMetadataBuilder::SetBranchSnapshot(int64_t snapshot_i
12171298

12181299
TableMetadataBuilder& TableMetadataBuilder::SetRef(const std::string& name,
12191300
std::shared_ptr<SnapshotRef> ref) {
1220-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
1301+
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetRef(name, std::move(ref)));
1302+
return *this;
12211303
}
12221304

12231305
TableMetadataBuilder& TableMetadataBuilder::RemoveRef(const std::string& name) {
1224-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
1306+
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveRef(name));
1307+
return *this;
12251308
}
12261309

12271310
TableMetadataBuilder& TableMetadataBuilder::RemoveSnapshots(
@@ -1231,7 +1314,8 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveSnapshots(
12311314

12321315
TableMetadataBuilder& TableMetadataBuilder::RemoveSnapshots(
12331316
const std::vector<int64_t>& snapshot_ids) {
1234-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
1317+
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveSnapshots(snapshot_ids));
1318+
return *this;
12351319
}
12361320

12371321
TableMetadataBuilder& TableMetadataBuilder::SuppressHistoricalSnapshots() {

src/iceberg/table_update.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ void SetDefaultPartitionSpec::GenerateRequirements(TableUpdateContext& context)
9292
// RemovePartitionSpecs
9393

9494
void RemovePartitionSpecs::ApplyTo(TableMetadataBuilder& builder) const {
95-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
95+
builder.RemovePartitionSpecs(spec_ids_);
9696
}
9797

9898
void RemovePartitionSpecs::GenerateRequirements(TableUpdateContext& context) const {
@@ -143,7 +143,9 @@ void AddSnapshot::GenerateRequirements(TableUpdateContext& context) const {
143143

144144
// RemoveSnapshots
145145

146-
void RemoveSnapshots::ApplyTo(TableMetadataBuilder& builder) const {}
146+
void RemoveSnapshots::ApplyTo(TableMetadataBuilder& builder) const {
147+
builder.RemoveSnapshots(snapshot_ids_);
148+
}
147149

148150
void RemoveSnapshots::GenerateRequirements(TableUpdateContext& context) const {
149151
// RemoveSnapshots doesn't generate any requirements
@@ -152,7 +154,7 @@ void RemoveSnapshots::GenerateRequirements(TableUpdateContext& context) const {
152154
// RemoveSnapshotRef
153155

154156
void RemoveSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const {
155-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
157+
builder.RemoveRef(ref_name_);
156158
}
157159

158160
void RemoveSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ if(ICEBERG_BUILD_BUNDLE)
163163
add_iceberg_test(table_update_test
164164
USE_BUNDLE
165165
SOURCES
166+
expire_snapshots_test.cc
166167
transaction_test.cc
167168
update_partition_spec_test.cc
168169
update_properties_test.cc
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/expire_snapshots.h"
21+
22+
#include "iceberg/test/matchers.h"
23+
#include "iceberg/test/update_test_base.h"
24+
25+
namespace iceberg {
26+
27+
/// Fixture for the ExpireSnapshots tests; it adds nothing on top of UpdateTestBase.
class ExpireSnapshotsTest : public UpdateTestBase {};
30+
31+
// Applies an ExpireSnapshots update with no options configured and expects exactly
// one snapshot id (3051729675574597004) to be selected for removal, with no refs,
// schemas or partition specs selected.
TEST_F(ExpireSnapshotsTest, Empty) {
  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());

  // Fatal check: the indexed access below is only meaningful when exactly one id
  // was returned; without it `.at(0)` would throw on an empty result.
  ASSERT_EQ(result.snapshot_ids_to_remove.size(), 1U);
  EXPECT_EQ(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
  EXPECT_TRUE(result.ref_to_remove.empty());
  EXPECT_TRUE(result.schema_to_remove.empty());
  EXPECT_TRUE(result.partition_spec_to_remove.empty());
}
40+
41+
// After RetainLast(2), Apply() is expected to select nothing for removal.
TEST_F(ExpireSnapshotsTest, Keep2) {
  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
  update->RetainLast(2);

  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
  EXPECT_TRUE(result.snapshot_ids_to_remove.empty());
  EXPECT_TRUE(result.ref_to_remove.empty());
  EXPECT_TRUE(result.schema_to_remove.empty());
  EXPECT_TRUE(result.partition_spec_to_remove.empty());
}
50+
51+
// Explicitly expiring snapshot 3051729675574597004 is expected to select exactly
// that id for removal even though RetainLast(2) is also set.
TEST_F(ExpireSnapshotsTest, ExpireById) {
  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
  update->ExpireSnapshotId(3051729675574597004);
  update->RetainLast(2);
  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());

  // Fatal check: the indexed access below is only meaningful when exactly one id
  // was returned; without it `.at(0)` would throw on an empty result.
  ASSERT_EQ(result.snapshot_ids_to_remove.size(), 1U);
  EXPECT_EQ(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
  EXPECT_TRUE(result.ref_to_remove.empty());
  EXPECT_TRUE(result.schema_to_remove.empty());
  EXPECT_TRUE(result.partition_spec_to_remove.empty());
}
62+
63+
// Expiring an id that the test expects to be unknown (3055729675574597003) must make
// Apply() fail with a message naming that snapshot.
TEST_F(ExpireSnapshotsTest, ExpireByIdNotExist) {
  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
  update->ExpireSnapshotId(3055729675574597003);
  update->RetainLast(2);

  auto result = update->Apply();
  // Fatal check: `error()` may only be read when the result holds an error, so stop
  // here if Apply() unexpectedly succeeded.
  ASSERT_FALSE(result.has_value());
  EXPECT_TRUE(
      result.error().message.contains("Snapshot:3055729675574597003 not exist"));
}
72+
73+
// With the cutoff set to 1515100955770 - 1, Apply() is expected to select nothing
// for removal.
TEST_F(ExpireSnapshotsTest, ExpireOlderThan1) {
  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
  update->ExpireOlderThan(1515100955770 - 1);

  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
  EXPECT_TRUE(result.snapshot_ids_to_remove.empty());
  EXPECT_TRUE(result.ref_to_remove.empty());
  EXPECT_TRUE(result.schema_to_remove.empty());
  EXPECT_TRUE(result.partition_spec_to_remove.empty());
}
82+
83+
// With the cutoff set to 1515100955770 + 1, Apply() is expected to select exactly
// snapshot 3051729675574597004 for removal.
TEST_F(ExpireSnapshotsTest, ExpireOlderThan2) {
  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
  update->ExpireOlderThan(1515100955770 + 1);
  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());

  // Fatal check: the indexed access below is only meaningful when exactly one id
  // was returned; without it `.at(0)` would throw on an empty result.
  ASSERT_EQ(result.snapshot_ids_to_remove.size(), 1U);
  EXPECT_EQ(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
  EXPECT_TRUE(result.ref_to_remove.empty());
  EXPECT_TRUE(result.schema_to_remove.empty());
  EXPECT_TRUE(result.partition_spec_to_remove.empty());
}
93+
94+
} // namespace iceberg

0 commit comments

Comments
 (0)