
Commit 0240447

[FLINK-38829][pipeline-connector/starrocks] Support rename column DDL for StarRocks sink connector
1 parent f520424 commit 0240447

7 files changed (+128, -27 lines)


docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md

Lines changed: 1 addition & 1 deletion
@@ -67,7 +67,7 @@ under the License.
 version: '2.1'
 services:
   StarRocks:
-    image: starrocks/allin1-ubuntu:3.2.6
+    image: starrocks/allin1-ubuntu:3.5.10
     ports:
       - "8080:8080"
       - "9030:9030"

docs/content/docs/get-started/quickstart/mysql-to-starrocks.md

Lines changed: 1 addition & 1 deletion
@@ -70,7 +70,7 @@ Create a `docker-compose.yml` file using the content provided below:
 version: '2.1'
 services:
   StarRocks:
-    image: starrocks/allin1-ubuntu:3.2.6
+    image: starrocks/allin1-ubuntu:3.5.10
     ports:
       - "8080:8080"
       - "9030:9030"

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java

Lines changed: 48 additions & 12 deletions
@@ -38,12 +38,7 @@ public StarRocksEnrichedCatalog(String jdbcUrl, String username, String password
 
     public void truncateTable(String databaseName, String tableName)
             throws StarRocksCatalogException {
-        Preconditions.checkArgument(
-                !StringUtils.isNullOrWhitespaceOnly(databaseName),
-                "Database name cannot be null or empty.");
-        Preconditions.checkArgument(
-                !StringUtils.isNullOrWhitespaceOnly(tableName),
-                "Table name cannot be null or empty.");
+        checkTableArgument(databaseName, tableName);
         String alterSql = this.buildTruncateTableSql(databaseName, tableName);
         try {
             // TRUNCATE TABLE is not regarded as a column-based schema change for StarRocks, so
@@ -62,12 +57,7 @@ public void truncateTable(String databaseName, String tableName)
     }
 
     public void dropTable(String databaseName, String tableName) throws StarRocksCatalogException {
-        Preconditions.checkArgument(
-                !StringUtils.isNullOrWhitespaceOnly(databaseName),
-                "Database name cannot be null or empty.");
-        Preconditions.checkArgument(
-                !StringUtils.isNullOrWhitespaceOnly(tableName),
-                "Table name cannot be null or empty.");
+        checkTableArgument(databaseName, tableName);
         String alterSql = this.buildDropTableSql(databaseName, tableName);
         try {
             // like TRUNCATE TABLE, DROP TABLE isn't a column-affecting operation and `executeAlter`
@@ -84,6 +74,36 @@ public void dropTable(String databaseName, String tableName) throws StarRocksCat
         }
     }
 
+    public void renameColumn(
+            String databaseName, String tableName, String oldColumnName, String newColumnName)
+            throws StarRocksCatalogException {
+        checkTableArgument(databaseName, tableName);
+        Preconditions.checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(oldColumnName),
+                "old column name cannot be null or empty.");
+        Preconditions.checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(newColumnName),
+                "new column name cannot be null or empty.");
+        String alterSql =
+                this.buildRenameColumnSql(databaseName, tableName, oldColumnName, newColumnName);
+        try {
+            executeUpdateStatement(alterSql);
+        } catch (Exception e) {
+            LOG.error(
+                    "Failed to alter table `{}`.`{}` rename column {} to {}. SQL executed: {}",
+                    databaseName,
+                    tableName,
+                    oldColumnName,
+                    newColumnName,
+                    alterSql);
+            throw new StarRocksCatalogException(
+                    String.format(
+                            "Failed to alter table `%s`.`%s` rename column %s to %s.",
+                            databaseName, tableName, oldColumnName, newColumnName),
+                    e);
+        }
+    }
+
     private String buildTruncateTableSql(String databaseName, String tableName) {
         return String.format("TRUNCATE TABLE `%s`.`%s`;", databaseName, tableName);
     }
@@ -92,6 +112,13 @@ private String buildDropTableSql(String databaseName, String tableName) {
         return String.format("DROP TABLE `%s`.`%s`;", databaseName, tableName);
     }
 
+    private String buildRenameColumnSql(
+            String databaseName, String tableName, String oldColumnName, String newColumnName) {
+        return String.format(
+                "ALTER TABLE `%s`.`%s` RENAME COLUMN %s TO %s;",
+                databaseName, tableName, oldColumnName, newColumnName);
+    }
+
     private void executeUpdateStatement(String sql) throws StarRocksCatalogException {
         try {
             Method m =
@@ -104,4 +131,13 @@ private void executeUpdateStatement(String sql) throws StarRocksCatalogException
             throw new RuntimeException(e);
         }
     }
+
+    private void checkTableArgument(String databaseName, String tableName) {
+        Preconditions.checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(databaseName),
+                "Database name cannot be null or empty.");
+        Preconditions.checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(tableName),
+                "Table name cannot be null or empty.");
+    }
 }
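
For reference, a minimal usage sketch of the new renameColumn method. The JDBC URL, credentials, and the RenameColumnSketch class name are placeholders (not from this commit); a reachable StarRocks FE is assumed, and any catalog lifecycle/connection handling is omitted:

import org.apache.flink.cdc.connectors.starrocks.sink.StarRocksEnrichedCatalog;

public class RenameColumnSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder FE address and credentials; a running StarRocks frontend is assumed.
        StarRocksEnrichedCatalog catalog =
                new StarRocksEnrichedCatalog("jdbc:mysql://127.0.0.1:9030", "root", "");

        // Executes: ALTER TABLE `test`.`tbl4` RENAME COLUMN col2 TO newCol2;
        catalog.renameColumn("test", "tbl4", "col2", "newCol2");
    }
}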

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java

Lines changed: 15 additions & 3 deletions
@@ -43,6 +43,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksUtils.toStarRocksDataType;
@@ -307,9 +308,20 @@ private void applyDropColumn(DropColumnEvent dropColumnEvent) throws SchemaEvolv
 
     private void applyRenameColumn(RenameColumnEvent renameColumnEvent)
             throws SchemaEvolveException {
-        // TODO StarRocks plans to support column rename since 3.3 which has not been released.
-        // Support it later.
-        throw new UnsupportedSchemaChangeEventException(renameColumnEvent);
+        try {
+            TableId tableId = renameColumnEvent.tableId();
+            Map<String, String> nameMapping = renameColumnEvent.getNameMapping();
+            for (Map.Entry<String, String> entry : nameMapping.entrySet()) {
+                catalog.renameColumn(
+                        tableId.getSchemaName(),
+                        tableId.getTableName(),
+                        entry.getKey(),
+                        entry.getValue());
+            }
+        } catch (Exception e) {
+            throw new SchemaEvolveException(
+                    renameColumnEvent, "fail to apply rename column event", e);
+        }
     }
 
     private void applyAlterColumnType(AlterColumnTypeEvent alterColumnTypeEvent)
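
For context on applyRenameColumn above, a small sketch of the RenameColumnEvent it consumes: the event carries an old-to-new name mapping, and the applier issues one ALTER TABLE ... RENAME COLUMN statement per entry. The mapping values mirror the test below; the RenameColumnEventSketch class and the printout are illustrative only:

import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;

import java.util.HashMap;
import java.util.Map;

public class RenameColumnEventSketch {
    public static void main(String[] args) {
        // Old column name -> new column name, the mapping applyRenameColumn iterates over.
        Map<String, String> nameMapping = new HashMap<>();
        nameMapping.put("col2", "newCol2");
        nameMapping.put("col3", "newCol3");

        RenameColumnEvent event =
                new RenameColumnEvent(TableId.parse("test.tbl4"), nameMapping);
        // tableId() and getNameMapping() are the accessors used in applyRenameColumn above.
        System.out.println(event.tableId() + " -> " + event.getNameMapping());
    }
}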

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java

Lines changed: 61 additions & 8 deletions
@@ -18,10 +18,7 @@
 package org.apache.flink.cdc.connectors.starrocks.sink;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
-import org.apache.flink.cdc.common.event.AddColumnEvent;
-import org.apache.flink.cdc.common.event.CreateTableEvent;
-import org.apache.flink.cdc.common.event.DropColumnEvent;
-import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.event.*;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.BooleanType;
@@ -38,10 +35,7 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
 
 import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.TABLE_CREATE_NUM_BUCKETS;
 import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.TABLE_SCHEMA_CHANGE_TIMEOUT;
@@ -226,4 +220,63 @@ void testDropColumn() throws Exception {
                         .build();
         Assertions.assertThat(actualTable).isEqualTo(expectTable);
     }
+
+    @Test
+    void testRenameColumn() throws Exception {
+        TableId tableId = TableId.parse("test.tbl4");
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("col1", new IntType())
+                        .physicalColumn("col2", new BooleanType())
+                        .physicalColumn("col3", new TimestampType())
+                        .primaryKey("col1")
+                        .build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+        metadataApplier.applySchemaChange(createTableEvent);
+
+        Map<String, String> nameMapping = new HashMap<>();
+        nameMapping.put("col2", "newCol2");
+        nameMapping.put("col3", "newCol3");
+        RenameColumnEvent renameColumnEvent = new RenameColumnEvent(tableId, nameMapping);
+        metadataApplier.applySchemaChange(renameColumnEvent);
+
+        StarRocksTable actualTable =
+                catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
+        Assertions.assertThat(actualTable).isNotNull();
+
+        List<StarRocksColumn> columns = new ArrayList<>();
+        columns.add(
+                new StarRocksColumn.Builder()
+                        .setColumnName("col1")
+                        .setOrdinalPosition(0)
+                        .setDataType("int")
+                        .setNullable(true)
+                        .build());
+        columns.add(
+                new StarRocksColumn.Builder()
+                        .setColumnName("newCol2")
+                        .setOrdinalPosition(1)
+                        .setDataType("boolean")
+                        .setNullable(true)
+                        .build());
+        columns.add(
+                new StarRocksColumn.Builder()
+                        .setColumnName("newCol3")
+                        .setOrdinalPosition(2)
+                        .setDataType("datetime")
+                        .setNullable(true)
+                        .build());
+        StarRocksTable expectTable =
+                new StarRocksTable.Builder()
+                        .setDatabaseName(tableId.getSchemaName())
+                        .setTableName(tableId.getTableName())
+                        .setTableType(StarRocksTable.TableType.PRIMARY_KEY)
+                        .setColumns(columns)
+                        .setTableKeys(schema.primaryKeys())
+                        .setDistributionKeys(schema.primaryKeys())
+                        .setNumBuckets(10)
+                        .setTableProperties(Collections.singletonMap("replication_num", "5"))
+                        .build();
+        Assertions.assertThat(actualTable).isEqualTo(expectTable);
+    }
 }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksContainer.java

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@
 @Testcontainers
 public class StarRocksContainer extends JdbcDatabaseContainer<StarRocksContainer> {
 
-    private static final String DOCKER_IMAGE_NAME = "starrocks/allin1-ubuntu:3.2.6";
+    private static final String DOCKER_IMAGE_NAME = "starrocks/allin1-ubuntu:3.5.10";
 
     // exposed ports
     public static final int FE_HTTP_SERVICE_PORT = 8080;

tools/cdcup/src/sink/star_rocks.rb

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@ def connector_name
 
   def prepend_to_docker_compose_yaml(docker_compose_yaml)
     docker_compose_yaml['services']['starrocks'] = {
-      'image' => 'starrocks/allin1-ubuntu:3.2.6',
+      'image' => 'starrocks/allin1-ubuntu:3.5.10',
      'hostname' => 'starrocks',
      'ports' => %w[8080 9030],
      'volumes' => ["#{CDC_DATA_VOLUME}:/data"]
