Skip to content

Commit 9b206b0

Browse files
committed
support modify nullable
1 parent 62eeba5 commit 9b206b0

File tree

2 files changed

+24
-13
lines changed

2 files changed

+24
-13
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -112,15 +112,7 @@ public void alterColumnType(String databaseName, String tableName, StarRocksColu
112112
Preconditions.checkArgument(
113113
!StringUtils.isNullOrWhitespaceOnly(column.getColumnName()),
114114
"column name cannot be null or empty.");
115-
String alterSql =
116-
buildAlterColumnTypeSql(
117-
databaseName,
118-
tableName,
119-
column.getColumnName(),
120-
getFullColumnType(
121-
column.getDataType(),
122-
column.getColumnSize(),
123-
column.getDecimalDigits()));
115+
String alterSql = buildAlterColumnTypeSql(databaseName, tableName, buildColumnStmt(column));
124116
try {
125117
long startTimeMillis = System.currentTimeMillis();
126118
executeUpdateStatement(alterSql);
@@ -161,10 +153,9 @@ private String buildRenameColumnSql(
161153
}
162154

163155
private String buildAlterColumnTypeSql(
164-
String databaseName, String tableName, String columnName, String dataType) {
156+
String databaseName, String tableName, String columnStmt) {
165157
return String.format(
166-
"ALTER TABLE `%s`.`%s` MODIFY COLUMN %s %s",
167-
databaseName, tableName, columnName, dataType);
158+
"ALTER TABLE `%s`.`%s` MODIFY COLUMN %s", databaseName, tableName, columnStmt);
168159
}
169160

170161
private void executeUpdateStatement(String sql) throws StarRocksCatalogException {
@@ -189,6 +180,26 @@ private void checkTableArgument(String databaseName, String tableName) {
189180
"Table name cannot be null or empty.");
190181
}
191182

183+
private String buildColumnStmt(StarRocksColumn column) {
184+
StringBuilder builder = new StringBuilder();
185+
builder.append("`");
186+
builder.append(column.getColumnName());
187+
builder.append("` ");
188+
builder.append(
189+
getFullColumnType(
190+
column.getDataType(), column.getColumnSize(), column.getDecimalDigits()));
191+
builder.append(" ");
192+
builder.append(column.isNullable() ? "NULL" : "NOT NULL");
193+
if (column.getDefaultValue().isPresent()) {
194+
builder.append(String.format(" DEFAULT \"%s\"", column.getDefaultValue().get()));
195+
}
196+
197+
if (column.getColumnComment().isPresent()) {
198+
builder.append(String.format(" COMMENT \"%s\"", column.getColumnComment().get()));
199+
}
200+
return builder.toString();
201+
}
202+
192203
private String getFullColumnType(
193204
String type, Optional<Integer> columnSize, Optional<Integer> decimalDigits) {
194205
String dataType = type.toUpperCase();

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ private List<Event> generateAlterColumnTypeEvents(TableId tableId) {
162162
Schema.newBuilder()
163163
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
164164
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
165-
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
165+
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17).notNull(), null))
166166
.primaryKey("id")
167167
.build();
168168

0 commit comments

Comments
 (0)