Skip to content

Commit 5a7200d

Browse files
committed
[FLINK-36406][cdc-runtime] Close MetadataApplier when the job stops
1 parent 6b13868 commit 5a7200d

File tree

2 files changed

+18
-0
lines changed

2 files changed

+18
-0
lines changed

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.io.ByteArrayOutputStream;
5353
import java.io.DataInputStream;
5454
import java.io.DataOutputStream;
55+
import java.io.IOException;
5556
import java.time.Duration;
5657
import java.util.ArrayList;
5758
import java.util.Collection;
@@ -131,6 +132,17 @@ public void start() throws Exception {
131132
"Started SchemaRegistry for {}. Parallelism: {}", operatorName, currentParallelism);
132133
}
133134

135+
@Override
136+
public void close() throws Exception {
137+
super.close();
138+
try {
139+
metadataApplier.close();
140+
} catch (Exception e) {
141+
LOG.error("Failed to close metadata applier.", e);
142+
throw new IOException("Failed to close metadata applier.", e);
143+
}
144+
}
145+
134146
// --------------------------
135147
// Checkpoint related methods
136148
// --------------------------

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,12 @@ public void close() throws Exception {
117117
if (schemaChangeThreadPool != null && !schemaChangeThreadPool.isShutdown()) {
118118
schemaChangeThreadPool.shutdownNow();
119119
}
120+
try {
121+
metadataApplier.close();
122+
} catch (Exception e) {
123+
LOG.error("Failed to close metadata applier.", e);
124+
throw new IOException("Failed to close metadata applier.", e);
125+
}
120126
}
121127

122128
@Override

0 commit comments

Comments
 (0)