Skip to content
This repository was archived by the owner on Dec 28, 2025. It is now read-only.

Commit c82b19a

Browse files
[Improve][Connector-V2] Add branch option for paimon sink (apache#9982)
1 parent ddd9238 commit c82b19a

File tree

12 files changed

+212
-12
lines changed

12 files changed

+212
-12
lines changed

docs/en/connector-v2/sink/Paimon.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ libfb303-xxx.jar
7878
| paimon.hadoop.conf | Map | No | - | Properties in hadoop conf |
7979
| paimon.hadoop.conf-path | String | No | - | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files |
8080
| paimon.table.non-primary-key | Boolean | No | false | Switch to create `table with PK` or `table without PK`. true : `table without PK`, false : `table with PK` |
81+
| branch | String | No | main | The name of the Paimon table branch to write data to. If the branch does not exist, an exception will be thrown. |
8182

8283

8384
## Checkpoint in batch mode

docs/zh/connector-v2/sink/Paimon.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ libfb303-xxx.jar
7777
| paimon.hadoop.conf | Map | 否 | - | Hadoop配置文件属性信息 |
7878
| paimon.hadoop.conf-path | 字符串 | 否 | - | Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置 |
7979
| paimon.table.non-primary-key | Boolean | 否 | false | 控制创建主键表或者非主键表. 当为true时,创建非主键表, 为false时,创建主键表 |
80+
| branch | 字符串 | 否 | main | 要写入数据的Paimon表分支名称。如果指定的分支不存在,将抛出异常。 |
8081

8182
## 批模式下的checkpoint
8283

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ public OptionRule optionRule() {
5555
PaimonSinkOptions.DATA_SAVE_MODE,
5656
PaimonSinkOptions.PRIMARY_KEYS,
5757
PaimonSinkOptions.PARTITION_KEYS,
58-
PaimonSinkOptions.WRITE_PROPS)
58+
PaimonSinkOptions.WRITE_PROPS,
59+
PaimonSinkOptions.BRANCH)
5960
.conditional(
6061
PaimonBaseOptions.CATALOG_TYPE,
6162
PaimonCatalogEnum.HIVE,

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class PaimonSinkConfig extends PaimonConfig {
4040
private final DataSaveMode dataSaveMode;
4141
private final CoreOptions.ChangelogProducer changelogProducer;
4242
private final String changelogTmpPath;
43+
private final String branch;
4344
private final Boolean nonPrimaryKey;
4445
private final List<String> primaryKeys;
4546
private final List<String> partitionKeys;
@@ -79,5 +80,6 @@ public PaimonSinkConfig(ReadonlyConfig readonlyConfig) {
7980
this.changelogTmpPath =
8081
writeProps.getOrDefault(
8182
PaimonSinkOptions.CHANGELOG_TMP_PATH, System.getProperty("java.io.tmpdir"));
83+
this.branch = readonlyConfig.get(PaimonSinkOptions.BRANCH);
8284
}
8385
}

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,7 @@ public class PaimonSinkOptions extends PaimonBaseOptions {
6868
.defaultValue(new HashMap<>())
6969
.withDescription(
7070
"Properties passed through to paimon table initialization, such as 'file.format', 'bucket'(org.apache.paimon.CoreOptions)");
71+
72+
public static final Option<String> BRANCH =
73+
Options.key("branch").stringType().noDefaultValue().withDescription("branch");
7174
}

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ public enum PaimonConnectorErrorCode implements SeaTunnelErrorCode {
3232
WRITE_PROPS_BUCKET_KEY_ERROR("PAIMON-09", "Cannot define 'bucket-key' in dynamic bucket mode"),
3333
NON_PRIMARY_KEY_CHECK_ERROR(
3434
"PAIMON-10", "Primary keys should be empty when nonPrimaryKey is true"),
35-
DECIMAL_PRECISION_INCOMPATIBLE("PAIMON-11", "decimal type precision is incompatible. ");
35+
DECIMAL_PRECISION_INCOMPATIBLE("PAIMON-11", "decimal type precision is incompatible. "),
36+
BRANCH_NOT_EXISTS("PAIMON-12", "Specified branch: %s does not exist. ");
3637

3738
private final String code;
3839
private final String description;

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.paimon.handler;
1919

20+
import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
21+
2022
import org.apache.seatunnel.api.sink.DataSaveMode;
2123
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
2224
import org.apache.seatunnel.api.sink.SchemaSaveMode;
@@ -26,35 +28,41 @@
2628
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
2729
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.SupportLoadTable;
2830

31+
import org.apache.paimon.table.FileStoreTable;
2932
import org.apache.paimon.table.Table;
3033

3134
public class PaimonSaveModeHandler extends DefaultSaveModeHandler {
3235

3336
private SupportLoadTable<Table> supportLoadTable;
3437
private Catalog catalog;
3538
private CatalogTable catalogTable;
39+
private String branch;
3640

3741
public PaimonSaveModeHandler(
3842
SupportLoadTable supportLoadTable,
3943
SchemaSaveMode schemaSaveMode,
4044
DataSaveMode dataSaveMode,
4145
Catalog catalog,
4246
CatalogTable catalogTable,
43-
String customSql) {
47+
String customSql,
48+
String branch) {
4449
super(schemaSaveMode, dataSaveMode, catalog, catalogTable, customSql);
4550
this.supportLoadTable = supportLoadTable;
4651
this.catalog = catalog;
4752
this.catalogTable = catalogTable;
53+
this.branch = branch;
4854
}
4955

5056
@Override
5157
public void handleSchemaSaveMode() {
5258
super.handleSchemaSaveMode();
5359
TablePath tablePath = catalogTable.getTablePath();
5460
Table paimonTable = ((PaimonCatalog) catalog).getPaimonTable(tablePath);
55-
// load paimon table and set it into paimon sink
5661
Table loadTable = this.supportLoadTable.getLoadTable();
5762
if (loadTable == null || this.schemaSaveMode == SchemaSaveMode.RECREATE_SCHEMA) {
63+
if (StringUtils.isNotEmpty(branch)) {
64+
paimonTable = ((FileStoreTable) paimonTable).switchToBranch(branch);
65+
}
5866
this.supportLoadTable.setLoadTable(paimonTable);
5967
}
6068
}

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
1919

20+
import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
21+
2022
import org.apache.seatunnel.api.common.JobContext;
2123
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2224
import org.apache.seatunnel.api.serialization.DefaultSerializer;
@@ -35,6 +37,8 @@
3537
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
3638
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
3739
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
40+
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
41+
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
3842
import org.apache.seatunnel.connectors.seatunnel.paimon.handler.PaimonSaveModeHandler;
3943
import org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
4044
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssignerFactory;
@@ -43,14 +47,19 @@
4347
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
4448
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
4549

50+
import org.apache.paimon.table.FileStoreTable;
4651
import org.apache.paimon.table.Table;
52+
import org.apache.paimon.utils.BranchManager;
53+
54+
import lombok.extern.slf4j.Slf4j;
4755

4856
import java.io.IOException;
4957
import java.util.Arrays;
5058
import java.util.List;
5159
import java.util.Optional;
5260
import java.util.UUID;
5361

62+
@Slf4j
5463
public class PaimonSink
5564
implements SeaTunnelSink<
5665
SeaTunnelRow,
@@ -66,7 +75,7 @@ public class PaimonSink
6675

6776
public static final String PLUGIN_NAME = "Paimon";
6877

69-
private Table paimonTable;
78+
private FileStoreTable paimonTable;
7079

7180
private JobContext jobContext;
7281

@@ -92,11 +101,25 @@ public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
92101
paimonCatalog.open();
93102
boolean databaseExists =
94103
paimonCatalog.databaseExists(this.paimonSinkConfig.getNamespace());
95-
if (databaseExists) {
96-
TablePath tablePath = catalogTable.getTablePath();
97-
boolean tableExists = paimonCatalog.tableExists(tablePath);
98-
if (tableExists) {
99-
this.paimonTable = paimonCatalog.getPaimonTable(tablePath);
104+
if (!databaseExists) {
105+
return;
106+
}
107+
TablePath tablePath = catalogTable.getTablePath();
108+
boolean tableExists = paimonCatalog.tableExists(tablePath);
109+
if (!tableExists) {
110+
return;
111+
}
112+
this.paimonTable = (FileStoreTable) paimonCatalog.getPaimonTable(tablePath);
113+
String branchName = paimonSinkConfig.getBranch();
114+
if (StringUtils.isNotEmpty(branchName)) {
115+
BranchManager branchManager = paimonTable.branchManager();
116+
if (!branchManager.branchExists(branchName)) {
117+
throw new PaimonConnectorException(
118+
PaimonConnectorErrorCode.BRANCH_NOT_EXISTS, branchName);
119+
}
120+
if (!branchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branchName)) {
121+
this.paimonTable = paimonTable.switchToBranch(branchName);
122+
log.info("Switch to branch {}", branchName);
100123
}
101124
}
102125
}
@@ -168,12 +191,13 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
168191
paimonSinkConfig.getDataSaveMode(),
169192
paimonCatalog,
170193
catalogTable,
171-
null));
194+
null,
195+
paimonSinkConfig.getBranch()));
172196
}
173197

174198
@Override
175199
public void setLoadTable(Table table) {
176-
this.paimonTable = table;
200+
this.paimonTable = (FileStoreTable) table;
177201
}
178202

179203
@Override

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public OptionRule optionRule() {
5959
PaimonSinkOptions.PRIMARY_KEYS,
6060
PaimonSinkOptions.PARTITION_KEYS,
6161
PaimonSinkOptions.WRITE_PROPS,
62+
PaimonSinkOptions.BRANCH,
6263
SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
6364
.conditional(
6465
PaimonSinkOptions.CATALOG_TYPE,

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
1919

2020
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
21+
import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
2122

2223
import org.apache.seatunnel.api.common.JobContext;
2324
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -58,6 +59,7 @@
5859
import org.apache.paimon.table.sink.StreamTableWrite;
5960
import org.apache.paimon.table.sink.TableCommitImpl;
6061
import org.apache.paimon.table.sink.TableWrite;
62+
import org.apache.paimon.utils.BranchManager;
6163

6264
import lombok.extern.slf4j.Slf4j;
6365

@@ -274,6 +276,18 @@ public void applySchemaChange(SchemaChangeEvent event) throws IOException {
274276
private void reOpenTableWrite() {
275277
this.seaTunnelRowType = this.sourceTableSchema.toPhysicalRowDataType();
276278
this.paimonTable = (FileStoreTable) paimonCatalog.getPaimonTable(paimonTablePath);
279+
String branchName = paimonSinkConfig.getBranch();
280+
if (StringUtils.isNotEmpty(branchName)) {
281+
BranchManager branchManager = paimonTable.branchManager();
282+
if (!branchManager.branchExists(branchName)) {
283+
throw new PaimonConnectorException(
284+
PaimonConnectorErrorCode.BRANCH_NOT_EXISTS, branchName);
285+
}
286+
if (!branchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branchName)) {
287+
this.paimonTable = this.paimonTable.switchToBranch(branchName);
288+
log.info("Re-switched to branch {} after reopening table", branchName);
289+
}
290+
}
277291
this.sinkPaimonTableSchema = this.paimonTable.schema();
278292
this.newTableWrite();
279293
}

0 commit comments

Comments
 (0)