Skip to content

Commit 6c3c551

Browse files
xylaaaaa and Chenjunwei authored
[Feature](Iceberg) Implement publish_changes procedure for Iceberg tables (#58755)
### What problem does this PR solve?

- **Issue Number**: part of #58199
- **Related PR**: N/A

Problem Summary: This PR implements the `publish_changes` action for Iceberg tables. This action serves as the "Publish" step in the Write-Audit-Publish (WAP) pattern. The procedure locates a snapshot tagged with a specific `wap.id` property and cherry-picks it into the current table state. This allows users to atomically make "staged" data visible after validation.

Syntax:

```sql
ALTER TABLE catalog.db.table_name EXECUTE publish_changes("wap_id" = "batch_123");
```

Output: Returns `previous_snapshot_id` (STRING) and `current_snapshot_id` (STRING) indicating the state transition.

Use cases:
1. Implement Write-Audit-Publish (WAP) workflows.
2. Atomically publish validated data to the main branch.
3. Manage staged snapshots based on custom WAP IDs.

Co-authored-by: Chenjunwei <chenjunwei@ChenjunweideMacBook-Pro.local>
1 parent 955ac51 commit 6c3c551

File tree

5 files changed

+274
-1
lines changed

5 files changed

+274
-1
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
2+
-- Fixture data for the publish_changes regression test: creates one WAP-enabled
-- table (orders_wap) and one plain table (orders_non_wap) in demo.wap_test.
CREATE DATABASE IF NOT EXISTS demo.wap_test;
3+
4+
5+
USE demo.wap_test;
6+
7+
8+
DROP TABLE IF EXISTS orders_wap;
9+
10+
-- WAP-enabled orders table
11+
CREATE TABLE orders_wap (
12+
order_id INT,
13+
customer_id INT,
14+
amount DECIMAL(10, 2),
15+
order_date STRING
16+
)
17+
USING iceberg;
18+
-- Turn on write-audit-publish for this table only; orders_non_wap below never gets this property.
ALTER TABLE wap_test.orders_wap SET TBLPROPERTIES ('write.wap.enabled'='true');
19+
20+
-- The regression test later publishes this exact id via publish_changes("wap_id" = "test_wap_001").
SET spark.wap.id = test_wap_001;
21+
22+
23+
24+
-- The regression test expects these two rows to be invisible before publish_changes
-- and visible afterwards, i.e. this insert is expected to produce a staged snapshot.
INSERT INTO orders_wap VALUES
25+
(1, 103, 150.00, '2025-12-03'),
26+
(2, 104, 320.25, '2025-12-04');
27+
28+
29+
DROP TABLE IF EXISTS orders_non_wap;
30+
-- Non WAP-enabled orders table
31+
CREATE TABLE orders_non_wap (
32+
order_id INT,
33+
customer_id INT,
34+
amount DECIMAL(10, 2),
35+
order_date STRING
36+
)
37+
USING iceberg;
38+
39+
-- NOTE(review): spark.wap.id is still set at this point; presumably it has no effect here
-- because write.wap.enabled is not set on this table. The regression test relies on no
-- snapshot of orders_non_wap carrying wap.id = test_wap_001 -- confirm.
INSERT INTO orders_non_wap VALUES
40+
(1, 201, 10.00, '2025-12-01');

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class IcebergExecuteActionFactory {
3939
public static final String FAST_FORWARD = "fast_forward";
4040
public static final String EXPIRE_SNAPSHOTS = "expire_snapshots";
4141
public static final String REWRITE_DATA_FILES = "rewrite_data_files";
42+
public static final String PUBLISH_CHANGES = "publish_changes";
4243

4344
/**
4445
* Create an Iceberg-specific ExecuteAction instance.
@@ -80,6 +81,9 @@ public static ExecuteAction createAction(String actionType, Map<String, String>
8081
case REWRITE_DATA_FILES:
8182
return new IcebergRewriteDataFilesAction(properties, partitionNamesInfo,
8283
whereCondition);
84+
case PUBLISH_CHANGES:
85+
return new IcebergPublishChangesAction(properties, partitionNamesInfo,
86+
whereCondition);
8387
default:
8488
throw new DdlException("Unsupported Iceberg procedure: " + actionType
8589
+ ". Supported procedures: " + String.join(", ", getSupportedActions()));
@@ -99,7 +103,8 @@ public static String[] getSupportedActions() {
99103
CHERRYPICK_SNAPSHOT,
100104
FAST_FORWARD,
101105
EXPIRE_SNAPSHOTS,
102-
REWRITE_DATA_FILES
106+
REWRITE_DATA_FILES,
107+
PUBLISH_CHANGES
103108
};
104109
}
105110
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.iceberg.action;
19+
20+
import org.apache.doris.catalog.Column;
21+
import org.apache.doris.catalog.Env;
22+
import org.apache.doris.catalog.TableIf;
23+
import org.apache.doris.catalog.Type;
24+
import org.apache.doris.common.ArgumentParsers;
25+
import org.apache.doris.common.UserException;
26+
import org.apache.doris.datasource.ExternalTable;
27+
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
28+
import org.apache.doris.info.PartitionNamesInfo;
29+
import org.apache.doris.nereids.trees.expressions.Expression;
30+
31+
import com.google.common.collect.Lists;
32+
import org.apache.iceberg.Snapshot;
33+
import org.apache.iceberg.Table;
34+
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.Optional;
38+
39+
/**
40+
* Implements Iceberg's publish_changes action (Core of the WAP pattern).
41+
* This action finds a snapshot tagged with a specific 'wap.id' and cherry-picks it
42+
* into the current table state.
43+
* Corresponds to Spark syntax: CALL catalog.system.publish_changes('table', 'wap_id_123')
44+
*/
45+
public class IcebergPublishChangesAction extends BaseIcebergAction {
46+
public static final String WAP_ID = "wap_id";
47+
private static final String WAP_ID_PROP = "wap.id";
48+
49+
public IcebergPublishChangesAction(Map<String, String> properties,
50+
Optional<PartitionNamesInfo> partitionNamesInfo, Optional<Expression> whereCondition) {
51+
super("publish_changes", properties, partitionNamesInfo, whereCondition);
52+
}
53+
54+
@Override
55+
protected void registerIcebergArguments() {
56+
namedArguments.registerRequiredArgument(WAP_ID,
57+
"The WAP ID matching the snapshot to publish",
58+
ArgumentParsers.nonEmptyString(WAP_ID));
59+
}
60+
61+
@Override
62+
protected void validateIcebergAction() throws UserException {
63+
validateNoPartitions();
64+
validateNoWhereCondition();
65+
}
66+
67+
@Override
68+
protected List<String> executeAction(TableIf table) throws UserException {
69+
Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
70+
String targetWapId = namedArguments.getString(WAP_ID);
71+
72+
// Find the target WAP snapshot
73+
Snapshot wapSnapshot = null;
74+
for (Snapshot snapshot : icebergTable.snapshots()) {
75+
if (targetWapId.equals(snapshot.summary().get(WAP_ID_PROP))) {
76+
wapSnapshot = snapshot;
77+
break;
78+
}
79+
}
80+
81+
if (wapSnapshot == null) {
82+
throw new UserException("Cannot find snapshot with " + WAP_ID_PROP + " = " + targetWapId);
83+
}
84+
85+
long wapSnapshotId = wapSnapshot.snapshotId();
86+
87+
try {
88+
// Get previous snapshot ID for result
89+
Snapshot previousSnapshot = icebergTable.currentSnapshot();
90+
Long previousSnapshotId = previousSnapshot != null ? previousSnapshot.snapshotId() : null;
91+
92+
// Execute Cherry-pick
93+
icebergTable.manageSnapshots().cherrypick(wapSnapshotId).commit();
94+
95+
// Get current snapshot ID after commit
96+
Snapshot currentSnapshot = icebergTable.currentSnapshot();
97+
Long currentSnapshotId = currentSnapshot != null ? currentSnapshot.snapshotId() : null;
98+
99+
// Invalidate iceberg catalog table cache
100+
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
101+
102+
String previousSnapshotIdString = previousSnapshotId != null ? String.valueOf(previousSnapshotId) : "null";
103+
String currentSnapshotIdString = currentSnapshotId != null ? String.valueOf(currentSnapshotId) : "null";
104+
105+
return Lists.newArrayList(
106+
previousSnapshotIdString,
107+
currentSnapshotIdString
108+
);
109+
110+
} catch (Exception e) {
111+
throw new UserException("Failed to publish changes for wap.id " + targetWapId + ": " + e.getMessage(), e);
112+
}
113+
}
114+
115+
@Override
116+
protected List<Column> getResultSchema() {
117+
return Lists.newArrayList(
118+
new Column("previous_snapshot_id", Type.STRING, false,
119+
"ID of the snapshot before the publish operation"),
120+
new Column("current_snapshot_id", Type.STRING, false,
121+
"ID of the new snapshot created as a result of the publish operation"));
122+
}
123+
124+
@Override
125+
public String getDescription() {
126+
return "Publish a WAP snapshot by cherry-picking it to the current table state";
127+
}
128+
}

regression-test/data/external_table_p0/iceberg/action/test_iceberg_execute_actions.out

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,9 @@
6666
2 record2 200
6767
3 record3 300
6868

69+
-- !wap_before_publish --
70+
71+
-- !wap_after_publish --
72+
1 103 150.00 2025-12-03
73+
2 104 320.25 2025-12-04
74+

regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,4 +636,98 @@ suite("test_iceberg_optimize_actions_ddl", "p0,external,doris,external_docker,ex
636636
"""
637637
exception "Action 'expire_snapshots' does not support partition specification"
638638
}
639+
640+
// =====================================================================================
641+
// Test Case 6: publish_changes action with WAP (Write-Audit-Publish) pattern
642+
// Simplified workflow:
643+
//
644+
// - Main branch is initially empty (0 rows)
645+
// - A WAP snapshot exists with wap.id = "test_wap_001" and 2 rows
646+
// - publish_changes should cherry-pick the WAP snapshot into the main branch
647+
// =====================================================================================
648+
649+
logger.info("Starting simplified WAP (Write-Audit-Publish) workflow verification test")
650+
651+
// WAP test database and table
652+
String wap_db = "wap_test"
653+
String wap_table = "orders_wap"
654+
655+
// Step 1: Verify no data is visible before publish_changes
656+
logger.info("Step 1: Verifying table is empty before publish_changes")
657+
qt_wap_before_publish """
658+
SELECT order_id, customer_id, amount, order_date
659+
FROM ${catalog_name}.${wap_db}.${wap_table}
660+
ORDER BY order_id
661+
"""
662+
663+
// Step 2: Publish the WAP changes with wap_id = "test_wap_001"
664+
logger.info("Step 2: Publishing WAP changes with wap_id=test_wap_001")
665+
sql """
666+
ALTER TABLE ${catalog_name}.${wap_db}.${wap_table}
667+
EXECUTE publish_changes("wap_id" = "test_wap_001")
668+
"""
669+
logger.info("Publish changes executed successfully")
670+
671+
// Step 3: Verify WAP data is visible after publish_changes
672+
logger.info("Step 3: Verifying WAP data is visible after publish_changes")
673+
qt_wap_after_publish """
674+
SELECT order_id, customer_id, amount, order_date
675+
FROM ${catalog_name}.${wap_db}.${wap_table}
676+
ORDER BY order_id
677+
"""
678+
679+
logger.info("Simplified WAP (Write-Audit-Publish) workflow verification completed successfully")
680+
681+
// Negative tests for publish_changes
682+
683+
// publish_changes on table without write.wap.enabled = true (should fail)
684+
test {
685+
String nonWapDb = "wap_test"
686+
String nonWapTable = "orders_non_wap"
687+
688+
sql """
689+
ALTER TABLE ${catalog_name}.${nonWapDb}.${nonWapTable}
690+
EXECUTE publish_changes("wap_id" = "test_wap_001")
691+
"""
692+
exception "Cannot find snapshot with wap.id = test_wap_001"
693+
}
694+
695+
696+
// publish_changes with missing wap_id (should fail)
697+
test {
698+
sql """
699+
ALTER TABLE ${catalog_name}.${db_name}.${table_name}
700+
EXECUTE publish_changes ()
701+
"""
702+
exception "Missing required argument: wap_id"
703+
}
704+
705+
// publish_changes with invalid wap_id (should fail)
706+
test {
707+
sql """
708+
ALTER TABLE ${catalog_name}.${wap_db}.${wap_table}
709+
EXECUTE publish_changes("wap_id" = "non_existing_wap_id")
710+
"""
711+
exception "Cannot find snapshot with wap.id = non_existing_wap_id"
712+
}
713+
714+
// publish_changes with partition specification (should fail)
715+
test {
716+
sql """
717+
ALTER TABLE ${catalog_name}.${db_name}.${table_name}
718+
EXECUTE publish_changes ("wap_id" = "test_wap_001") PARTITIONS (part1)
719+
"""
720+
exception "Action 'publish_changes' does not support partition specification"
721+
}
722+
723+
// publish_changes with WHERE condition (should fail)
724+
test {
725+
sql """
726+
ALTER TABLE ${catalog_name}.${db_name}.${table_name}
727+
EXECUTE publish_changes ("wap_id" = "test_wap_001") WHERE id > 0
728+
"""
729+
exception "Action 'publish_changes' does not support WHERE condition"
730+
}
731+
732+
639733
}

0 commit comments

Comments
 (0)