Skip to content

Commit 2293651

Browse files
authored
[FLINK-38829][pipline-connector/starrocks] Support rename column DDL for starrocks sink connector (#4197)
1 parent e2b8076 commit 2293651

File tree

8 files changed

+69
-21
lines changed

8 files changed

+69
-21
lines changed

docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ under the License.
6767
version: '2.1'
6868
services:
6969
StarRocks:
70-
image: starrocks/allin1-ubuntu:3.2.6
70+
image: starrocks/allin1-ubuntu:3.5.10
7171
ports:
7272
- "8080:8080"
7373
- "9030:9030"

docs/content/docs/get-started/quickstart/mysql-to-starrocks.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ Create a `docker-compose.yml` file using the content provided below:
7070
version: '2.1'
7171
services:
7272
StarRocks:
73-
image: starrocks/allin1-ubuntu:3.2.6
73+
image: starrocks/allin1-ubuntu:3.5.10
7474
ports:
7575
- "8080:8080"
7676
- "9030:9030"

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,7 @@ public StarRocksEnrichedCatalog(String jdbcUrl, String username, String password
3838

3939
public void truncateTable(String databaseName, String tableName)
4040
throws StarRocksCatalogException {
41-
Preconditions.checkArgument(
42-
!StringUtils.isNullOrWhitespaceOnly(databaseName),
43-
"Database name cannot be null or empty.");
44-
Preconditions.checkArgument(
45-
!StringUtils.isNullOrWhitespaceOnly(tableName),
46-
"Table name cannot be null or empty.");
41+
checkTableArgument(databaseName, tableName);
4742
String alterSql = this.buildTruncateTableSql(databaseName, tableName);
4843
try {
4944
// TRUNCATE TABLE is not regarded as a column-based schema change for StarRocks, so
@@ -62,12 +57,7 @@ public void truncateTable(String databaseName, String tableName)
6257
}
6358

6459
public void dropTable(String databaseName, String tableName) throws StarRocksCatalogException {
65-
Preconditions.checkArgument(
66-
!StringUtils.isNullOrWhitespaceOnly(databaseName),
67-
"Database name cannot be null or empty.");
68-
Preconditions.checkArgument(
69-
!StringUtils.isNullOrWhitespaceOnly(tableName),
70-
"Table name cannot be null or empty.");
60+
checkTableArgument(databaseName, tableName);
7161
String alterSql = this.buildDropTableSql(databaseName, tableName);
7262
try {
7363
// like TRUNCATE TABLE, DROP TABLE isn't a column-affecting operation and `executeAlter`
@@ -84,6 +74,36 @@ public void dropTable(String databaseName, String tableName) throws StarRocksCat
8474
}
8575
}
8676

77+
public void renameColumn(
78+
String databaseName, String tableName, String oldColumnName, String newColumnName)
79+
throws StarRocksCatalogException {
80+
checkTableArgument(databaseName, tableName);
81+
Preconditions.checkArgument(
82+
!StringUtils.isNullOrWhitespaceOnly(oldColumnName),
83+
"old column name cannot be null or empty.");
84+
Preconditions.checkArgument(
85+
!StringUtils.isNullOrWhitespaceOnly(newColumnName),
86+
"new column name cannot be null or empty.");
87+
String alterSql =
88+
this.buildRenameColumnSql(databaseName, tableName, oldColumnName, newColumnName);
89+
try {
90+
executeUpdateStatement(alterSql);
91+
} catch (Exception e) {
92+
LOG.error(
93+
"Failed to alter table `{}`.`{}` rename column {} to {}. SQL executed: {}",
94+
databaseName,
95+
tableName,
96+
oldColumnName,
97+
newColumnName,
98+
alterSql);
99+
throw new StarRocksCatalogException(
100+
String.format(
101+
"Failed to alter table `%s`.`%s` rename column %s to %s.",
102+
databaseName, tableName, oldColumnName, newColumnName),
103+
e);
104+
}
105+
}
106+
87107
private String buildTruncateTableSql(String databaseName, String tableName) {
88108
return String.format("TRUNCATE TABLE `%s`.`%s`;", databaseName, tableName);
89109
}
@@ -92,6 +112,13 @@ private String buildDropTableSql(String databaseName, String tableName) {
92112
return String.format("DROP TABLE `%s`.`%s`;", databaseName, tableName);
93113
}
94114

115+
private String buildRenameColumnSql(
116+
String databaseName, String tableName, String oldColumnName, String newColumnName) {
117+
return String.format(
118+
"ALTER TABLE `%s`.`%s` RENAME COLUMN %s TO %s;",
119+
databaseName, tableName, oldColumnName, newColumnName);
120+
}
121+
95122
private void executeUpdateStatement(String sql) throws StarRocksCatalogException {
96123
try {
97124
Method m =
@@ -104,4 +131,13 @@ private void executeUpdateStatement(String sql) throws StarRocksCatalogException
104131
throw new RuntimeException(e);
105132
}
106133
}
134+
135+
private void checkTableArgument(String databaseName, String tableName) {
136+
Preconditions.checkArgument(
137+
!StringUtils.isNullOrWhitespaceOnly(databaseName),
138+
"Database name cannot be null or empty.");
139+
Preconditions.checkArgument(
140+
!StringUtils.isNullOrWhitespaceOnly(tableName),
141+
"Table name cannot be null or empty.");
142+
}
107143
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343

4444
import java.util.ArrayList;
4545
import java.util.List;
46+
import java.util.Map;
4647
import java.util.Set;
4748

4849
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksUtils.toStarRocksDataType;
@@ -89,6 +90,7 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
8990
SchemaChangeEventType.CREATE_TABLE,
9091
SchemaChangeEventType.ADD_COLUMN,
9192
SchemaChangeEventType.DROP_COLUMN,
93+
SchemaChangeEventType.RENAME_COLUMN,
9294
SchemaChangeEventType.DROP_TABLE,
9395
SchemaChangeEventType.TRUNCATE_TABLE);
9496
}
@@ -307,9 +309,20 @@ private void applyDropColumn(DropColumnEvent dropColumnEvent) throws SchemaEvolv
307309

308310
private void applyRenameColumn(RenameColumnEvent renameColumnEvent)
309311
throws SchemaEvolveException {
310-
// TODO StarRocks plans to support column rename since 3.3 which has not been released.
311-
// Support it later.
312-
throw new UnsupportedSchemaChangeEventException(renameColumnEvent);
312+
try {
313+
TableId tableId = renameColumnEvent.tableId();
314+
Map<String, String> nameMapping = renameColumnEvent.getNameMapping();
315+
for (Map.Entry<String, String> entry : nameMapping.entrySet()) {
316+
catalog.renameColumn(
317+
tableId.getSchemaName(),
318+
tableId.getTableName(),
319+
entry.getKey(),
320+
entry.getValue());
321+
}
322+
} catch (Exception e) {
323+
throw new SchemaEvolveException(
324+
renameColumnEvent, "fail to apply rename column event", e);
325+
}
313326
}
314327

315328
private void applyAlterColumnType(AlterColumnTypeEvent alterColumnTypeEvent)

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,6 @@ void testStarRocksDropColumn() throws Exception {
307307
}
308308

309309
@Test
310-
@Disabled("Rename column is not supported currently.")
311310
void testStarRocksRenameColumn() throws Exception {
312311
TableId tableId =
313312
TableId.tableId(

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
@Testcontainers
3434
public class StarRocksContainer extends JdbcDatabaseContainer<StarRocksContainer> {
3535

36-
private static final String DOCKER_IMAGE_NAME = "starrocks/allin1-ubuntu:3.2.6";
36+
private static final String DOCKER_IMAGE_NAME = "starrocks/allin1-ubuntu:3.5.10";
3737

3838
// exposed ports
3939
public static final int FE_HTTP_SERVICE_PORT = 8080;

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public static void startContainers() {
8383
long startWaitingTimestamp = System.currentTimeMillis();
8484

8585
new LogMessageWaitStrategy()
86-
.withRegEx(".*Enjoy the journal to StarRocks blazing-fast lake-house engine!.*\\s")
86+
.withRegEx(".*Enjoy the journey to StarRocks blazing-fast lake-house engine!.*\\s")
8787
.withTimes(1)
8888
.withStartupTimeout(
8989
Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS))

tools/cdcup/src/sink/star_rocks.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def connector_name
2424

2525
def prepend_to_docker_compose_yaml(docker_compose_yaml)
2626
docker_compose_yaml['services']['starrocks'] = {
27-
'image' => 'starrocks/allin1-ubuntu:3.2.6',
27+
'image' => 'starrocks/allin1-ubuntu:3.5.10',
2828
'hostname' => 'starrocks',
2929
'ports' => %w[8080 9030],
3030
'volumes' => ["#{CDC_DATA_VOLUME}:/data"]

0 commit comments

Comments
 (0)