Skip to content

Commit a61f923

Browse files
committed
[BugFix] In the 3.1 separated storage and computation version, the use of the "insert into" statement in Flink SQL will lose the data with Delete semantics.
Signed-off-by: andystenhe <andystenhe@hotmail.com>
1 parent cc8689d commit a61f923

File tree

2 files changed

+2
-28
lines changed

2 files changed

+2
-28
lines changed

src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -424,21 +424,8 @@ private void validateTableStructure(TableSchema flinkSchema) {
424424
}
425425
// validate primary keys
426426
List<String> primayKeys = new ArrayList<>();
427-
for (int i = 0; i < rows.size(); i++) {
428-
String keysType = rows.get(i).get("COLUMN_KEY").toString();
429-
if (!"PRI".equals(keysType)) {
430-
continue;
431-
}
432-
primayKeys.add(rows.get(i).get("COLUMN_NAME").toString().toLowerCase());
433-
}
427+
flinkSchema.getPrimaryKey().ifPresent(c -> c.getColumns().forEach(colName -> primayKeys.add(colName.toLowerCase())));
434428
if (!primayKeys.isEmpty()) {
435-
if (!constraint.isPresent()) {
436-
throw new IllegalArgumentException("Primary keys not defined in the sink `TableSchema`.");
437-
}
438-
if (constraint.get().getColumns().size() != primayKeys.size() ||
439-
!constraint.get().getColumns().stream().allMatch(col -> primayKeys.contains(col.toLowerCase()))) {
440-
throw new IllegalArgumentException("Primary keys of the flink `TableSchema` do not match with the ones from starrocks table.");
441-
}
442429
sinkOptions.enableUpsertDelete();
443430
}
444431

src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -102,21 +102,8 @@ public void validateTableStructure(StarRocksSinkOptions sinkOptions, TableSchema
102102
}
103103
// validate primary keys
104104
List<String> primaryKeys = new ArrayList<>();
105-
for (Map<String, Object> row : rows) {
106-
String keysType = row.get("COLUMN_KEY").toString();
107-
if (!"PRI".equals(keysType)) {
108-
continue;
109-
}
110-
primaryKeys.add(row.get("COLUMN_NAME").toString().toLowerCase());
111-
}
105+
flinkSchema.getPrimaryKey().ifPresent(c -> c.getColumns().forEach(colName -> primaryKeys.add(colName.toLowerCase())));
112106
if (!primaryKeys.isEmpty()) {
113-
if (!constraint.isPresent()) {
114-
throw new IllegalArgumentException("Primary keys not defined in the sink `TableSchema`.");
115-
}
116-
if (constraint.get().getColumns().size() != primaryKeys.size() ||
117-
!constraint.get().getColumns().stream().allMatch(col -> primaryKeys.contains(col.toLowerCase()))) {
118-
throw new IllegalArgumentException("Primary keys of the flink `TableSchema` do not match with the ones from starrocks table.");
119-
}
120107
sinkOptions.enableUpsertDelete();
121108
}
122109

0 commit comments

Comments
 (0)