Skip to content

Commit b9c48f4

Browse files
authored
[feat][iceberg] Support Iceberg Meta Procedure implementations (apache#56257)
### What problem does this PR solve?

This PR extends the OPTIMIZE TABLE framework introduced in apache#55679 by implementing additional Iceberg meta procedure actions. Building upon the foundation established for Iceberg table optimization, this enhancement adds critical snapshot management operations that enable more sophisticated Iceberg table maintenance workflows.

#### New Iceberg Actions Implemented

This PR introduces **5 new Iceberg meta procedure actions**:

1. **`cherrypick_snapshot`** - Cherry-picks changes from a specific snapshot
2. **`fast_forward`** - Fast-forwards one branch to match another branch's latest snapshot
3. **`rollback_to_snapshot`** - Rolls back table to a specific snapshot
4. **`rollback_to_timestamp`** - Rolls back table to a specific timestamp
5. **`set_current_snapshot`** - Sets a specific snapshot as current

#### Example Usage

```sql
-- Cherry-pick changes from a snapshot
OPTIMIZE TABLE iceberg_catalog.db.table PROPERTIES("action" = "cherrypick_snapshot", "snapshot_id" = "123456789");
```

```sql
-- Fast-forward branch to match another branch
OPTIMIZE TABLE iceberg_catalog.db.table PROPERTIES("action" = "fast_forward", "branch" = "feature", "to" = "main");
```

```sql
-- Rollback to specific snapshot
OPTIMIZE TABLE iceberg_catalog.db.table PROPERTIES("action" = "rollback_to_snapshot", "snapshot_id" = "987654321");
```

The regression testing strategy utilizes internal Iceberg catalog operations for table creation, data insertion, and branch/tag management, ensuring test stability and eliminating dependencies on external tools like Spark SQL for test data preparation.
1 parent 50b4c42 commit b9c48f4

File tree

7 files changed

+657
-72
lines changed

7 files changed

+657
-72
lines changed

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergCherrypickSnapshotAction.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,21 @@
1717

1818
package org.apache.doris.datasource.iceberg.action;
1919

20+
import org.apache.doris.catalog.Column;
21+
import org.apache.doris.catalog.Env;
2022
import org.apache.doris.catalog.TableIf;
23+
import org.apache.doris.catalog.Type;
2124
import org.apache.doris.common.ArgumentParsers;
22-
import org.apache.doris.common.DdlException;
2325
import org.apache.doris.common.UserException;
26+
import org.apache.doris.datasource.ExternalTable;
2427
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
2528
import org.apache.doris.info.PartitionNamesInfo;
2629
import org.apache.doris.nereids.trees.expressions.Expression;
2730

31+
import com.google.common.collect.Lists;
32+
import org.apache.iceberg.Snapshot;
33+
import org.apache.iceberg.Table;
34+
2835
import java.util.List;
2936
import java.util.Map;
3037
import java.util.Optional;
@@ -41,8 +48,7 @@ public class IcebergCherrypickSnapshotAction extends BaseIcebergAction {
4148
public static final String SNAPSHOT_ID = "snapshot_id";
4249

4350
/**
 * Creates the {@code cherrypick_snapshot} action. Every argument is forwarded unchanged to
 * the superclass constructor together with the fixed action name "cherrypick_snapshot".
 */
public IcebergCherrypickSnapshotAction(Map<String, String> properties,
44-
Optional<PartitionNamesInfo> partitionNamesInfo,
45-
Optional<Expression> whereCondition,
51+
Optional<PartitionNamesInfo> partitionNamesInfo, Optional<Expression> whereCondition,
4652
IcebergExternalTable icebergTable) {
4753
super("cherrypick_snapshot", properties, partitionNamesInfo, whereCondition, icebergTable);
4854
}
@@ -65,7 +71,38 @@ protected void validateIcebergAction() throws UserException {
6571

6672
@Override
6773
protected List<String> executeAction(TableIf table) throws UserException {
68-
throw new DdlException("Iceberg cherrypick_snapshot procedure is not implemented yet");
74+
Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
75+
Long sourceSnapshotId = namedArguments.getLong(SNAPSHOT_ID);
76+
77+
try {
78+
Snapshot targetSnapshot = icebergTable.snapshot(sourceSnapshotId);
79+
if (targetSnapshot == null) {
80+
throw new UserException("Snapshot not found in table");
81+
}
82+
83+
icebergTable.manageSnapshots().cherrypick(sourceSnapshotId).commit();
84+
Snapshot currentSnapshot = icebergTable.currentSnapshot();
85+
86+
// invalid iceberg catalog table cache.
87+
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
88+
return Lists.newArrayList(
89+
String.valueOf(sourceSnapshotId),
90+
String.valueOf(currentSnapshot.snapshotId()
91+
)
92+
);
93+
94+
} catch (Exception e) {
95+
throw new UserException("Failed to cherry-pick snapshot " + sourceSnapshotId + ": " + e.getMessage(), e);
96+
}
97+
}
98+
99+
@Override
100+
protected List<Column> getResultSchema() {
101+
return Lists.newArrayList(new Column("source_snapshot_id", Type.BIGINT, false,
102+
"ID of the snapshot whose changes were cherry-picked into the current table state"),
103+
new Column("current_snapshot_id", Type.BIGINT, false,
104+
"ID of the new snapshot created as a result of the cherry-pick operation, "
105+
+ "now set as the current snapshot"));
69106
}
70107

71108
@Override

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergFastForwardAction.java

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,20 @@
1717

1818
package org.apache.doris.datasource.iceberg.action;
1919

20+
import org.apache.doris.catalog.Column;
21+
import org.apache.doris.catalog.Env;
2022
import org.apache.doris.catalog.TableIf;
23+
import org.apache.doris.catalog.Type;
2124
import org.apache.doris.common.ArgumentParsers;
22-
import org.apache.doris.common.DdlException;
2325
import org.apache.doris.common.UserException;
26+
import org.apache.doris.datasource.ExternalTable;
2427
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
2528
import org.apache.doris.info.PartitionNamesInfo;
2629
import org.apache.doris.nereids.trees.expressions.Expression;
2730

31+
import com.google.common.collect.Lists;
32+
import org.apache.iceberg.Table;
33+
2834
import java.util.List;
2935
import java.util.Map;
3036
import java.util.Optional;
@@ -49,11 +55,11 @@ public IcebergFastForwardAction(Map<String, String> properties,
4955
protected void registerIcebergArguments() {
5056
// Register required arguments for branch and to
5157
namedArguments.registerRequiredArgument(BRANCH,
52-
"Name of the target branch to fast-forward to",
58+
"Name of the branch to fast-forward to",
5359
ArgumentParsers.nonEmptyString(BRANCH));
5460
namedArguments.registerRequiredArgument(TO,
55-
"Target snapshot ID to fast-forward to",
56-
ArgumentParsers.positiveLong(TO));
61+
"Target branch to fast-forward to",
62+
ArgumentParsers.nonEmptyString(TO));
5763
}
5864

5965
@Override
@@ -65,7 +71,41 @@ protected void validateIcebergAction() throws UserException {
6571

6672
@Override
6773
protected List<String> executeAction(TableIf table) throws UserException {
68-
throw new DdlException("Iceberg fast_forward procedure is not implemented yet");
74+
Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
75+
76+
String sourceBranch = namedArguments.getString(BRANCH);
77+
String desBranch = namedArguments.getString(TO);
78+
79+
try {
80+
Long snapshotBefore =
81+
icebergTable.snapshot(sourceBranch) != null ? icebergTable.snapshot(sourceBranch).snapshotId()
82+
: null;
83+
icebergTable.manageSnapshots().fastForwardBranch(sourceBranch, desBranch).commit();
84+
long snapshotAfter = icebergTable.snapshot(sourceBranch).snapshotId();
85+
// invalid iceberg catalog table cache.
86+
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
87+
return Lists.newArrayList(
88+
sourceBranch.trim(),
89+
String.valueOf(snapshotBefore),
90+
String.valueOf(snapshotAfter)
91+
);
92+
93+
} catch (Exception e) {
94+
throw new UserException(
95+
"Failed to fast-forward branch " + sourceBranch + " to snapshot " + desBranch + ": "
96+
+ e.getMessage(), e);
97+
}
98+
}
99+
100+
@Override
101+
protected List<Column> getResultSchema() {
102+
return Lists.newArrayList(
103+
new Column("branch_updated", Type.STRING, false,
104+
"Name of the branch that was fast-forwarded to match the target branch"),
105+
new Column("previous_ref", Type.BIGINT, true,
106+
"Snapshot ID that the branch was pointing to before the fast-forward operation"),
107+
new Column("updated_ref", Type.BIGINT, false,
108+
"Snapshot ID that the branch is pointing to after the fast-forward operation"));
69109
}
70110

71111
@Override

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToSnapshotAction.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,21 @@
1717

1818
package org.apache.doris.datasource.iceberg.action;
1919

20+
import org.apache.doris.catalog.Column;
21+
import org.apache.doris.catalog.Env;
2022
import org.apache.doris.catalog.TableIf;
23+
import org.apache.doris.catalog.Type;
2124
import org.apache.doris.common.ArgumentParsers;
22-
import org.apache.doris.common.DdlException;
2325
import org.apache.doris.common.UserException;
26+
import org.apache.doris.datasource.ExternalTable;
2427
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
2528
import org.apache.doris.info.PartitionNamesInfo;
2629
import org.apache.doris.nereids.trees.expressions.Expression;
2730

31+
import com.google.common.collect.Lists;
32+
import org.apache.iceberg.Snapshot;
33+
import org.apache.iceberg.Table;
34+
2835
import java.util.List;
2936
import java.util.Map;
3037
import java.util.Optional;
@@ -62,7 +69,43 @@ protected void validateIcebergAction() throws UserException {
6269

6370
@Override
6471
protected List<String> executeAction(TableIf table) throws UserException {
65-
throw new DdlException("Iceberg rollback_to_snapshot procedure is not implemented yet");
72+
Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
73+
Long targetSnapshotId = namedArguments.getLong(SNAPSHOT_ID);
74+
75+
Snapshot targetSnapshot = icebergTable.snapshot(targetSnapshotId);
76+
if (targetSnapshot == null) {
77+
throw new UserException("Snapshot " + targetSnapshotId + " not found in table " + icebergTable.name());
78+
}
79+
80+
try {
81+
Snapshot previousSnapshot = icebergTable.currentSnapshot();
82+
Long previousSnapshotId = previousSnapshot != null ? previousSnapshot.snapshotId() : null;
83+
if (previousSnapshot != null && previousSnapshot.snapshotId() == targetSnapshotId) {
84+
return Lists.newArrayList(
85+
String.valueOf(previousSnapshotId),
86+
String.valueOf(targetSnapshotId)
87+
);
88+
}
89+
icebergTable.manageSnapshots().rollbackTo(targetSnapshotId).commit();
90+
// invalid iceberg catalog table cache.
91+
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
92+
return Lists.newArrayList(
93+
String.valueOf(previousSnapshotId),
94+
String.valueOf(targetSnapshotId)
95+
);
96+
97+
} catch (Exception e) {
98+
throw new UserException("Failed to rollback to snapshot " + targetSnapshotId + ": " + e.getMessage(), e);
99+
}
100+
}
101+
102+
@Override
103+
protected List<Column> getResultSchema() {
104+
return Lists.newArrayList(
105+
new Column("previous_snapshot_id", Type.BIGINT, false,
106+
"ID of the snapshot that was current before the rollback operation"),
107+
new Column("current_snapshot_id", Type.BIGINT, false,
108+
"ID of the snapshot that is now current after rolling back to the specified snapshot"));
66109
}
67110

68111
@Override

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRollbackToTimestampAction.java

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,22 @@
1717

1818
package org.apache.doris.datasource.iceberg.action;
1919

20+
import org.apache.doris.catalog.Column;
21+
import org.apache.doris.catalog.Env;
2022
import org.apache.doris.catalog.TableIf;
21-
import org.apache.doris.common.DdlException;
23+
import org.apache.doris.catalog.Type;
2224
import org.apache.doris.common.UserException;
25+
import org.apache.doris.common.util.TimeUtils;
26+
import org.apache.doris.datasource.ExternalTable;
2327
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
2428
import org.apache.doris.info.PartitionNamesInfo;
2529
import org.apache.doris.nereids.trees.expressions.Expression;
2630

31+
import com.google.common.collect.Lists;
32+
import org.apache.iceberg.Snapshot;
33+
import org.apache.iceberg.Table;
34+
35+
import java.time.format.DateTimeFormatter;
2736
import java.util.List;
2837
import java.util.Map;
2938
import java.util.Optional;
@@ -34,6 +43,7 @@
3443
* at a specific timestamp.
3544
*/
3645
public class IcebergRollbackToTimestampAction extends BaseIcebergAction {
46+
private static final DateTimeFormatter DATETIME_MS_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
3747
public static final String TIMESTAMP = "timestamp";
3848

3949
public IcebergRollbackToTimestampAction(Map<String, String> properties,
@@ -48,7 +58,7 @@ protected void registerIcebergArguments() {
4858
// Create a custom timestamp parser that supports both ISO datetime and
4959
// millisecond formats
5060
namedArguments.registerRequiredArgument(TIMESTAMP,
51-
"A timestamp to rollback to (ISO datetime 'yyyy-MM-ddTHH:mm:ss' or milliseconds since epoch)",
61+
"A timestamp to rollback to (formats: 'yyyy-MM-dd HH:mm:ss.SSS' or milliseconds since epoch)",
5262
value -> {
5363
if (value == null || value.trim().isEmpty()) {
5464
throw new IllegalArgumentException("timestamp cannot be empty");
@@ -64,14 +74,13 @@ protected void registerIcebergArguments() {
6474
}
6575
return trimmed;
6676
} catch (NumberFormatException e) {
67-
// If not a number, try as ISO datetime format
77+
// Second attempt: parse as a 'yyyy-MM-dd HH:mm:ss.SSS' datetime string (space-separated, not ISO-8601 'T' form)
6878
try {
69-
java.time.LocalDateTime.parse(trimmed,
70-
java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME);
79+
java.time.LocalDateTime.parse(trimmed, DATETIME_MS_FORMAT);
7180
return trimmed;
7281
} catch (java.time.format.DateTimeParseException dte) {
7382
throw new IllegalArgumentException("Invalid timestamp format. Expected ISO datetime "
74-
+ "(yyyy-MM-ddTHH:mm:ss) or timestamp in milliseconds: " + trimmed);
83+
+ "(yyyy-MM-dd HH:mm:ss.SSS) or timestamp in milliseconds: " + trimmed);
7584
}
7685
}
7786
});
@@ -87,7 +96,38 @@ protected void validateIcebergAction() throws UserException {
8796

8897
@Override
8998
protected List<String> executeAction(TableIf table) throws UserException {
90-
throw new DdlException("Iceberg rollback_to_timestamp procedure is not implemented yet");
99+
Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
100+
101+
String timestampStr = namedArguments.getString(TIMESTAMP);
102+
103+
Snapshot previousSnapshot = icebergTable.currentSnapshot();
104+
Long previousSnapshotId = previousSnapshot != null ? previousSnapshot.snapshotId() : null;
105+
106+
try {
107+
long targetTimestamp = TimeUtils.msTimeStringToLong(timestampStr, TimeUtils.getTimeZone());
108+
icebergTable.manageSnapshots().rollbackToTime(targetTimestamp).commit();
109+
110+
Snapshot currentSnapshot = icebergTable.currentSnapshot();
111+
Long currentSnapshotId = currentSnapshot != null ? currentSnapshot.snapshotId() : null;
112+
// invalid iceberg catalog table cache.
113+
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
114+
return Lists.newArrayList(
115+
String.valueOf(previousSnapshotId),
116+
String.valueOf(currentSnapshotId)
117+
);
118+
119+
} catch (Exception e) {
120+
throw new UserException("Failed to rollback to timestamp " + timestampStr + ": " + e.getMessage(), e);
121+
}
122+
}
123+
124+
@Override
125+
protected List<Column> getResultSchema() {
126+
return Lists.newArrayList(
127+
new Column("previous_snapshot_id", Type.BIGINT, false,
128+
"ID of the snapshot that was current before the rollback operation"),
129+
new Column("current_snapshot_id", Type.BIGINT, false,
130+
"ID of the snapshot that was current at the specified timestamp and is now set as current"));
91131
}
92132

93133
@Override

0 commit comments

Comments
 (0)