Skip to content

Commit f78f6f1

Browse files
committed
Support procedure expire_snapshots for iceberg
1 parent 9135371 commit f78f6f1

File tree

5 files changed

+445
-0
lines changed

5 files changed

+445
-0
lines changed

presto-docs/src/main/sphinx/connector/iceberg.rst

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -912,6 +912,40 @@ procedure on the catalog's ``system`` schema::
912912
``unregister_table`` only when using the Hive catalog. This is similar to
913913
the behavior listed above for the ``DROP TABLE`` command.
914914

915+
Expire snapshots
916+
^^^^^^^^^^^^^^^^
917+
918+
Each DML (Data Manipulation Language) action in Iceberg produces a new snapshot while keeping the old data and metadata for snapshot isolation and time travel. Use `expire_snapshots` to remove older snapshots and their files.
919+
920+
This procedure removes old snapshots and their corresponding files, and never removes files which are required by a non-expired snapshot.
921+
922+
The following arguments are available:
923+
924+
===================== ========== =============== =======================================================================
925+
Argument Name required type Description
926+
===================== ========== =============== =======================================================================
927+
``schema`` ✔️ string Schema of the table to update
928+
929+
``table_name`` ✔️ string Name of the table to update
930+
931+
``older_than`` timestamp Timestamp before which snapshots will be removed (Default: 5 days ago)
932+
933+
``retain_last`` int Number of ancestor snapshots to preserve regardless of older_than
934+
(defaults to 1)
935+
936+
``snapshot_ids`` array of long Array of snapshot IDs to expire
937+
===================== ========== =============== =======================================================================
938+
939+
Examples:
940+
941+
* Remove snapshots older than a specific day and time, but retain the last 10 snapshots::
942+
943+
CALL iceberg.system.expire_snapshots('schema_name', 'table_name', TIMESTAMP '2023-08-31 00:00:00.000', 10);
944+
945+
* Remove snapshots with snapshot ID 10001 and 10002 (note that these snapshot IDs should not be the current snapshot)::
946+
947+
CALL iceberg.system.expire_snapshots(schema => 'schema_name', table_name => 'table_name', snapshot_ids => ARRAY[10001, 10002]);
948+
915949
Schema Evolution
916950
-----------------
917951

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer;
4040
import com.facebook.presto.iceberg.nessie.NessieConfig;
4141
import com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizerProvider;
42+
import com.facebook.presto.iceberg.procedure.ExpireSnapshotsProcedure;
4243
import com.facebook.presto.iceberg.procedure.RegisterTableProcedure;
4344
import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure;
4445
import com.facebook.presto.iceberg.procedure.UnregisterTableProcedure;
@@ -149,6 +150,7 @@ public void setup(Binder binder)
149150
procedures.addBinding().toProvider(RollbackToSnapshotProcedure.class).in(Scopes.SINGLETON);
150151
procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);
151152
procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
153+
procedures.addBinding().toProvider(ExpireSnapshotsProcedure.class).in(Scopes.SINGLETON);
152154

153155
// for orc
154156
binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.iceberg.procedure;
15+
16+
import com.facebook.presto.common.type.SqlTimestamp;
17+
import com.facebook.presto.iceberg.IcebergAbstractMetadata;
18+
import com.facebook.presto.iceberg.IcebergMetadataFactory;
19+
import com.facebook.presto.iceberg.IcebergUtil;
20+
import com.facebook.presto.spi.ConnectorSession;
21+
import com.facebook.presto.spi.SchemaTableName;
22+
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
23+
import com.facebook.presto.spi.procedure.Procedure;
24+
import com.google.common.collect.ImmutableList;
25+
import org.apache.iceberg.ExpireSnapshots;
26+
import org.apache.iceberg.Table;
27+
28+
import javax.inject.Inject;
29+
import javax.inject.Provider;
30+
31+
import java.lang.invoke.MethodHandle;
32+
import java.util.List;
33+
34+
import static com.facebook.presto.common.block.MethodHandleUtil.methodHandle;
35+
import static com.facebook.presto.common.type.StandardTypes.INTEGER;
36+
import static com.facebook.presto.common.type.StandardTypes.TIMESTAMP;
37+
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
38+
import static java.util.Objects.requireNonNull;
39+
40+
public class ExpireSnapshotsProcedure
41+
implements Provider<Procedure>
42+
{
43+
private static final MethodHandle EXPIRE_SNAPSHOTS = methodHandle(
44+
ExpireSnapshotsProcedure.class,
45+
"expireSnapshots",
46+
ConnectorSession.class,
47+
String.class,
48+
String.class,
49+
SqlTimestamp.class,
50+
Integer.class,
51+
List.class);
52+
private final IcebergMetadataFactory metadataFactory;
53+
54+
@Inject
55+
public ExpireSnapshotsProcedure(IcebergMetadataFactory metadataFactory)
56+
{
57+
this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
58+
}
59+
60+
@Override
61+
public Procedure get()
62+
{
63+
return new Procedure(
64+
"system",
65+
"expire_snapshots",
66+
ImmutableList.of(
67+
new Procedure.Argument("schema", VARCHAR),
68+
new Procedure.Argument("table_name", VARCHAR),
69+
new Procedure.Argument("older_than", TIMESTAMP, false, null),
70+
new Procedure.Argument("retain_last", INTEGER, false, null),
71+
new Procedure.Argument("snapshot_ids", "array(bigint)", false, null)),
72+
EXPIRE_SNAPSHOTS.bindTo(this));
73+
}
74+
75+
public void expireSnapshots(ConnectorSession clientSession, String schema, String tableName, SqlTimestamp olderThan, Integer retainLast, List<Long> snapshotIds)
76+
{
77+
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
78+
doExpireSnapshots(clientSession, schema, tableName, olderThan, retainLast, snapshotIds);
79+
}
80+
}
81+
82+
private void doExpireSnapshots(ConnectorSession clientSession, String schema, String tableName, SqlTimestamp olderThan, Integer retainLast, List<Long> snapshotIds)
83+
{
84+
IcebergAbstractMetadata metadata = (IcebergAbstractMetadata) metadataFactory.create();
85+
SchemaTableName schemaTableName = new SchemaTableName(schema, tableName);
86+
Table icebergTable = IcebergUtil.getIcebergTable(metadata, clientSession, schemaTableName);
87+
88+
ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();
89+
90+
if (snapshotIds != null) {
91+
for (long id : snapshotIds) {
92+
expireSnapshots = expireSnapshots.expireSnapshotId(id);
93+
}
94+
}
95+
96+
if (olderThan != null) {
97+
expireSnapshots = expireSnapshots.expireOlderThan(olderThan.isLegacyTimestamp() ? olderThan.getMillisUtc() : olderThan.getMillis());
98+
}
99+
100+
if (retainLast != null) {
101+
expireSnapshots = expireSnapshots.retainLast(retainLast);
102+
}
103+
104+
expireSnapshots.cleanExpiredFiles(true)
105+
.commit();
106+
}
107+
}

0 commit comments

Comments
 (0)