Skip to content

Commit 80128c4

Browse files
authored
[flink] Flink batch delete supports aggregation.remove-record-on-delete option (#5402)
1 parent 9349579 commit 80128c4

File tree

1 file changed

+11
-0
lines changed

1 file changed

+11
-0
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.Set;
5454
import java.util.stream.Collectors;
5555

56+
import static org.apache.paimon.CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE;
5657
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
5758
import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
5859
import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
@@ -201,6 +202,16 @@ private void validateDeletable() {
201202
SEQUENCE_GROUP,
202203
PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP));
203204
}
205+
case AGGREGATE:
206+
if (options.get(AGGREGATION_REMOVE_RECORD_ON_DELETE)) {
207+
return;
208+
} else {
209+
throw new UnsupportedOperationException(
210+
String.format(
211+
"Merge engine %s doesn't support batch delete by default. To support batch delete, "
212+
+ "please set %s to true.",
213+
mergeEngine, AGGREGATION_REMOVE_RECORD_ON_DELETE.key()));
214+
}
204215
default:
205216
throw new UnsupportedOperationException(
206217
String.format(

0 commit comments

Comments
 (0)