Skip to content

Commit 097aa9a

Browse files
authored
[FLINK-38834][pipline-connector/starrocks] Support alter column type DDL for starrocks sink connector (#4198)
1 parent c3f6657 commit 097aa9a

File tree

5 files changed

+133
-17
lines changed

5 files changed

+133
-17
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222

2323
import com.starrocks.connector.flink.catalog.StarRocksCatalog;
2424
import com.starrocks.connector.flink.catalog.StarRocksCatalogException;
25+
import com.starrocks.connector.flink.catalog.StarRocksColumn;
2526
import org.slf4j.Logger;
2627
import org.slf4j.LoggerFactory;
2728

2829
import java.lang.reflect.InvocationTargetException;
2930
import java.lang.reflect.Method;
31+
import java.util.Optional;
3032

3133
/** An enriched {@code StarRocksCatalog} with more schema evolution abilities. */
3234
public class StarRocksEnrichedCatalog extends StarRocksCatalog {
@@ -104,6 +106,37 @@ public void renameColumn(
104106
}
105107
}
106108

109+
public void alterColumnType(String databaseName, String tableName, StarRocksColumn column)
110+
throws StarRocksCatalogException {
111+
checkTableArgument(databaseName, tableName);
112+
Preconditions.checkArgument(
113+
!StringUtils.isNullOrWhitespaceOnly(column.getColumnName()),
114+
"column name cannot be null or empty.");
115+
String alterSql = buildAlterColumnTypeSql(databaseName, tableName, buildColumnStmt(column));
116+
try {
117+
long startTimeMillis = System.currentTimeMillis();
118+
executeUpdateStatement(alterSql);
119+
LOG.info(
120+
"Success to alter table {}.{} modify column type, duration: {}ms, sql: {}",
121+
databaseName,
122+
tableName,
123+
System.currentTimeMillis() - startTimeMillis,
124+
alterSql);
125+
} catch (Exception e) {
126+
LOG.error(
127+
"Failed to alter table {}.{} modify column type, sql: {}",
128+
databaseName,
129+
tableName,
130+
alterSql,
131+
e);
132+
throw new StarRocksCatalogException(
133+
String.format(
134+
"Failed to alter table %s.%s modify column type",
135+
databaseName, tableName),
136+
e);
137+
}
138+
}
139+
107140
private String buildTruncateTableSql(String databaseName, String tableName) {
108141
return String.format("TRUNCATE TABLE `%s`.`%s`;", databaseName, tableName);
109142
}
@@ -119,6 +152,12 @@ private String buildRenameColumnSql(
119152
databaseName, tableName, oldColumnName, newColumnName);
120153
}
121154

155+
private String buildAlterColumnTypeSql(
156+
String databaseName, String tableName, String columnStmt) {
157+
return String.format(
158+
"ALTER TABLE `%s`.`%s` MODIFY COLUMN %s", databaseName, tableName, columnStmt);
159+
}
160+
122161
private void executeUpdateStatement(String sql) throws StarRocksCatalogException {
123162
try {
124163
Method m =
@@ -140,4 +179,44 @@ private void checkTableArgument(String databaseName, String tableName) {
140179
!StringUtils.isNullOrWhitespaceOnly(tableName),
141180
"Table name cannot be null or empty.");
142181
}
182+
183+
private String buildColumnStmt(StarRocksColumn column) {
184+
StringBuilder builder = new StringBuilder();
185+
builder.append("`");
186+
builder.append(column.getColumnName());
187+
builder.append("` ");
188+
builder.append(
189+
getFullColumnType(
190+
column.getDataType(), column.getColumnSize(), column.getDecimalDigits()));
191+
builder.append(" ");
192+
builder.append(column.isNullable() ? "NULL" : "NOT NULL");
193+
if (column.getDefaultValue().isPresent()) {
194+
builder.append(String.format(" DEFAULT \"%s\"", column.getDefaultValue().get()));
195+
}
196+
197+
if (column.getColumnComment().isPresent()) {
198+
builder.append(String.format(" COMMENT \"%s\"", column.getColumnComment().get()));
199+
}
200+
return builder.toString();
201+
}
202+
203+
private String getFullColumnType(
204+
String type, Optional<Integer> columnSize, Optional<Integer> decimalDigits) {
205+
String dataType = type.toUpperCase();
206+
switch (dataType) {
207+
case "DECIMAL":
208+
Preconditions.checkArgument(
209+
columnSize.isPresent(), "DECIMAL type must have column size");
210+
Preconditions.checkArgument(
211+
decimalDigits.isPresent(), "DECIMAL type must have decimal digits");
212+
return String.format("DECIMAL(%d, %s)", columnSize.get(), decimalDigits.get());
213+
case "CHAR":
214+
case "VARCHAR":
215+
Preconditions.checkArgument(
216+
columnSize.isPresent(), type + " type must have column size");
217+
return String.format("%s(%d)", dataType, columnSize.get());
218+
default:
219+
return dataType;
220+
}
221+
}
143222
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
import org.apache.flink.cdc.common.event.TruncateTableEvent;
3030
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
3131
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
32-
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
3332
import org.apache.flink.cdc.common.schema.Column;
3433
import org.apache.flink.cdc.common.sink.MetadataApplier;
34+
import org.apache.flink.cdc.common.types.DataType;
3535

3636
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
3737

@@ -91,6 +91,7 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
9191
SchemaChangeEventType.ADD_COLUMN,
9292
SchemaChangeEventType.DROP_COLUMN,
9393
SchemaChangeEventType.RENAME_COLUMN,
94+
SchemaChangeEventType.ALTER_COLUMN_TYPE,
9495
SchemaChangeEventType.DROP_TABLE,
9596
SchemaChangeEventType.TRUNCATE_TABLE);
9697
}
@@ -325,15 +326,21 @@ private void applyRenameColumn(RenameColumnEvent renameColumnEvent)
325326
}
326327
}
327328

328-
private void applyAlterColumnType(AlterColumnTypeEvent alterColumnTypeEvent)
329-
throws SchemaEvolveException {
330-
// TODO There are limitations for data type conversions. We should know the data types
331-
// before and after changing so that we can make a validation. But the event only contains
332-
// data
333-
// types after changing. One way is that the framework delivers the old schema. We can
334-
// support
335-
// the alter after a discussion.
336-
throw new UnsupportedSchemaChangeEventException(alterColumnTypeEvent);
329+
private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolveException {
330+
try {
331+
TableId tableId = event.tableId();
332+
Map<String, DataType> typeMapping = event.getTypeMapping();
333+
334+
for (Map.Entry<String, DataType> entry : typeMapping.entrySet()) {
335+
StarRocksColumn.Builder builder =
336+
new StarRocksColumn.Builder().setColumnName(entry.getKey());
337+
toStarRocksDataType(entry.getValue(), false, builder);
338+
catalog.alterColumnType(
339+
tableId.getSchemaName(), tableId.getTableName(), builder.build());
340+
}
341+
} catch (Exception e) {
342+
throw new SchemaEvolveException(event, "fail to apply alter column type event", e);
343+
}
337344
}
338345

339346
private void applyTruncateTable(TruncateTableEvent truncateTableEvent) {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,15 @@ public static StarRocksTable toStarRocksTable(
113113

114114
/** Convert CDC data type to StarRocks data type. */
115115
public static void toStarRocksDataType(
116-
Column cdcColumn, boolean isPrimaryKeys, StarRocksColumn.Builder builder) {
116+
DataType cdcDataType, boolean isPrimaryKeys, StarRocksColumn.Builder builder) {
117117
CdcDataTypeTransformer dataTypeTransformer =
118118
new CdcDataTypeTransformer(isPrimaryKeys, builder);
119-
cdcColumn.getType().accept(dataTypeTransformer);
119+
cdcDataType.accept(dataTypeTransformer);
120+
}
121+
122+
public static void toStarRocksDataType(
123+
Column cdcColumn, boolean isPrimaryKeys, StarRocksColumn.Builder builder) {
124+
toStarRocksDataType(cdcColumn.getType(), isPrimaryKeys, builder);
120125
}
121126

122127
/** Format DATE type data. */

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import org.junit.jupiter.api.AfterEach;
5656
import org.junit.jupiter.api.BeforeAll;
5757
import org.junit.jupiter.api.BeforeEach;
58-
import org.junit.jupiter.api.Disabled;
5958
import org.junit.jupiter.api.Test;
6059

6160
import java.util.ArrayList;
@@ -163,7 +162,7 @@ private List<Event> generateAlterColumnTypeEvents(TableId tableId) {
163162
Schema.newBuilder()
164163
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
165164
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
166-
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
165+
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17).notNull(), null))
167166
.primaryKey("id")
168167
.build();
169168

@@ -327,15 +326,14 @@ void testStarRocksRenameColumn() throws Exception {
327326
}
328327

329328
@Test
330-
@Disabled("Alter column type is not supported currently.")
331329
void testStarRocksAlterColumnType() throws Exception {
332330
TableId tableId =
333331
TableId.tableId(
334332
StarRocksContainer.STARROCKS_DATABASE_NAME,
335333
StarRocksContainer.STARROCKS_TABLE_NAME);
336334

337335
runJobWithEvents(generateAlterColumnTypeEvents(tableId));
338-
336+
waitAlterDone(tableId, 60000L);
339337
List<String> actual = inspectTableSchema(tableId);
340338

341339
List<String> expected =
@@ -348,7 +346,6 @@ void testStarRocksAlterColumnType() throws Exception {
348346
}
349347

350348
@Test
351-
@Disabled("Alter column type is not supported currently.")
352349
void testStarRocksNarrowingAlterColumnType() throws Exception {
353350
Assertions.assertThatThrownBy(
354351
() -> {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
4040
import org.testcontainers.lifecycle.Startables;
4141

42+
import java.sql.Connection;
4243
import java.sql.ResultSet;
4344
import java.sql.SQLException;
4445
import java.time.Duration;
@@ -227,6 +228,33 @@ public List<String> fetchTableContent(TableId tableId, int columnCount) throws S
227228
return results;
228229
}
229230

231+
// Starrocks alter column is asynchronous and does not support Light mode.
232+
public void waitAlterDone(TableId tableId, long timeout)
233+
throws SQLException, InterruptedException {
234+
Connection conn = STARROCKS_CONTAINER.createConnection("");
235+
conn.createStatement().execute(String.format("USE `%s`", tableId.getSchemaName()));
236+
long t0 = System.currentTimeMillis();
237+
while (System.currentTimeMillis() - t0 < timeout) {
238+
ResultSet rs =
239+
conn.createStatement()
240+
.executeQuery(
241+
String.format(
242+
"SHOW ALTER TABLE COLUMN WHERE TableName = '%s' ORDER BY CreateTime DESC LIMIT 1",
243+
tableId.getTableName()));
244+
if (rs.next()) {
245+
String state = rs.getString("State");
246+
if ("FINISHED".equals(state)) {
247+
return;
248+
}
249+
if ("CANCELLED".equals(state)) {
250+
throw new RuntimeException("Alter failed: " + rs.getString("Msg"));
251+
}
252+
}
253+
Thread.sleep(1000L);
254+
}
255+
throw new RuntimeException("Alter job timeout");
256+
}
257+
230258
public static <T> void assertEqualsInAnyOrder(List<T> expected, List<T> actual) {
231259
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
232260
}

0 commit comments

Comments
 (0)