23
23
import org .apache .flink .configuration .CoreOptions ;
24
24
import org .apache .flink .core .execution .JobClient ;
25
25
import org .apache .flink .core .execution .SavepointFormatType ;
26
- import org .apache .flink .core .testutils .FlinkAssertions ;
27
26
import org .apache .flink .runtime .jobgraph .SavepointConfigOptions ;
27
+ import org .apache .flink .runtime .messages .FlinkJobTerminatedWithoutCancellationException ;
28
28
import org .apache .flink .streaming .api .datastream .DataStream ;
29
29
import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
30
30
import org .apache .flink .streaming .api .functions .sink .SinkFunction ;
36
36
import org .apache .flink .table .utils .EncodingUtils ;
37
37
import org .apache .flink .test .util .SuccessException ;
38
38
import org .apache .flink .types .Row ;
39
+ import org .apache .flink .util .function .RunnableWithException ;
39
40
40
41
import org .apache .kafka .clients .consumer .NoOffsetForPartitionException ;
41
42
import org .apache .kafka .clients .consumer .OffsetAndMetadata ;
42
43
import org .apache .kafka .common .TopicPartition ;
44
+ import org .apache .kafka .common .errors .UnknownTopicOrPartitionException ;
43
45
import org .assertj .core .api .Assertions ;
46
+ import org .assertj .core .api .ThrowingConsumer ;
44
47
import org .junit .Before ;
45
48
import org .junit .Test ;
46
49
import org .junit .runner .RunWith ;
65
68
import java .util .stream .IntStream ;
66
69
67
70
import static org .apache .flink .core .testutils .CommonTestUtils .waitUtil ;
71
+ import static org .apache .flink .core .testutils .FlinkAssertions .anyCauseMatches ;
68
72
import static org .apache .flink .streaming .connectors .kafka .table .KafkaTableTestUtils .collectAllRows ;
69
73
import static org .apache .flink .streaming .connectors .kafka .table .KafkaTableTestUtils .collectRows ;
70
74
import static org .apache .flink .streaming .connectors .kafka .table .KafkaTableTestUtils .readLines ;
@@ -186,7 +190,7 @@ public void testKafkaSourceSink() throws Exception {
186
190
187
191
// ------------- cleanup -------------------
188
192
189
- deleteTestTopic (topic );
193
+ cleanupTopic (topic );
190
194
}
191
195
192
196
@ Test
@@ -266,8 +270,8 @@ public void testKafkaSourceSinkWithTopicList() throws Exception {
266
270
assertThat (topic2Results ).containsExactly (Row .of (topic2 , 2 , 1103 , "behavior 2" ));
267
271
268
272
// ------------- cleanup -------------------
269
- deleteTestTopic (topic1 );
270
- deleteTestTopic (topic2 );
273
+ cleanupTopic (topic1 );
274
+ cleanupTopic (topic2 );
271
275
}
272
276
273
277
@ Test
@@ -349,8 +353,8 @@ public void testKafkaSourceSinkWithTopicPattern() throws Exception {
349
353
350
354
// ------------- cleanup -------------------
351
355
352
- deleteTestTopic (topic1 );
353
- deleteTestTopic (topic2 );
356
+ cleanupTopic (topic1 );
357
+ cleanupTopic (topic2 );
354
358
}
355
359
356
360
@ Test
@@ -406,7 +410,7 @@ public void testKafkaSourceEmptyResultOnDeletedOffsets() throws Exception {
406
410
407
411
// ------------- cleanup -------------------
408
412
409
- deleteTestTopic (topic );
413
+ cleanupTopic (topic );
410
414
}
411
415
412
416
@ Test
@@ -460,7 +464,7 @@ public void testKafkaSourceSinkWithBoundedSpecificOffsets() throws Exception {
460
464
461
465
// ------------- cleanup -------------------
462
466
463
- deleteTestTopic (topic );
467
+ cleanupTopic (topic );
464
468
}
465
469
466
470
@ Test
@@ -517,7 +521,7 @@ public void testKafkaSourceSinkWithBoundedTimestamp() throws Exception {
517
521
518
522
// ------------- cleanup -------------------
519
523
520
- deleteTestTopic (topic );
524
+ cleanupTopic (topic );
521
525
}
522
526
523
527
@ Test
@@ -702,7 +706,7 @@ public void testKafkaSourceSinkWithMetadata() throws Exception {
702
706
703
707
// ------------- cleanup -------------------
704
708
705
- deleteTestTopic (topic );
709
+ cleanupTopic (topic );
706
710
}
707
711
708
712
@ Test
@@ -783,7 +787,7 @@ public void testKafkaSourceSinkWithKeyAndPartialValue() throws Exception {
783
787
784
788
// ------------- cleanup -------------------
785
789
786
- deleteTestTopic (topic );
790
+ cleanupTopic (topic );
787
791
}
788
792
789
793
@ Test
@@ -861,7 +865,7 @@ public void testKafkaSourceSinkWithKeyAndFullValue() throws Exception {
861
865
862
866
// ------------- cleanup -------------------
863
867
864
- deleteTestTopic (topic );
868
+ cleanupTopic (topic );
865
869
}
866
870
867
871
@ Test
@@ -977,8 +981,8 @@ public void testKafkaTemporalJoinChangelog() throws Exception {
977
981
978
982
// ------------- cleanup -------------------
979
983
980
- deleteTestTopic (orderTopic );
981
- deleteTestTopic (productTopic );
984
+ cleanupTopic (orderTopic );
985
+ cleanupTopic (productTopic );
982
986
}
983
987
984
988
private void initialProductChangelog (String topic , String bootstraps ) throws Exception {
@@ -1093,8 +1097,8 @@ public void testPerPartitionWatermarkKafka() throws Exception {
1093
1097
1094
1098
// ------------- cleanup -------------------
1095
1099
1096
- tableResult . getJobClient (). ifPresent ( JobClient :: cancel );
1097
- deleteTestTopic (topic );
1100
+ cancelJob ( tableResult );
1101
+ cleanupTopic (topic );
1098
1102
}
1099
1103
1100
1104
@ Test
@@ -1168,8 +1172,8 @@ public void testPerPartitionWatermarkWithIdleSource() throws Exception {
1168
1172
1169
1173
// ------------- cleanup -------------------
1170
1174
1171
- tableResult . getJobClient (). ifPresent ( JobClient :: cancel );
1172
- deleteTestTopic (topic );
1175
+ cancelJob ( tableResult );
1176
+ cleanupTopic (topic );
1173
1177
}
1174
1178
1175
1179
@ Test
@@ -1300,8 +1304,8 @@ public void testLatestOffsetStrategyResume() throws Exception {
1300
1304
1301
1305
// ------------- cleanup -------------------
1302
1306
1303
- tableResult . getJobClient (). ifPresent ( JobClient :: cancel );
1304
- deleteTestTopic (topic );
1307
+ cancelJob ( tableResult );
1308
+ cleanupTopic (topic );
1305
1309
}
1306
1310
1307
1311
@ Test
@@ -1317,7 +1321,7 @@ public void testStartFromGroupOffsetsEarliest() throws Exception {
1317
1321
@ Test
1318
1322
public void testStartFromGroupOffsetsNone () {
1319
1323
Assertions .assertThatThrownBy (() -> testStartFromGroupOffsetsWithNoneResetStrategy ())
1320
- .satisfies (FlinkAssertions . anyCauseMatches (NoOffsetForPartitionException .class ));
1324
+ .satisfies (anyCauseMatches (NoOffsetForPartitionException .class ));
1321
1325
}
1322
1326
1323
1327
private List <String > appendNewData (
@@ -1433,10 +1437,8 @@ private void testStartFromGroupOffsets(String resetStrategy) throws Exception {
1433
1437
KafkaTableTestUtils .waitingExpectedResults (sinkName , expected , Duration .ofSeconds (15 ));
1434
1438
} finally {
1435
1439
// ------------- cleanup -------------------
1436
- if (tableResult != null ) {
1437
- tableResult .getJobClient ().ifPresent (JobClient ::cancel );
1438
- }
1439
- deleteTestTopic (topic );
1440
+ cancelJob (tableResult );
1441
+ cleanupTopic (topic );
1440
1442
}
1441
1443
}
1442
1444
@@ -1455,10 +1457,8 @@ private void testStartFromGroupOffsetsWithNoneResetStrategy()
1455
1457
tableResult .await ();
1456
1458
} finally {
1457
1459
// ------------- cleanup -------------------
1458
- if (tableResult != null ) {
1459
- tableResult .getJobClient ().ifPresent (JobClient ::cancel );
1460
- }
1461
- deleteTestTopic (topic );
1460
+ cancelJob (tableResult );
1461
+ cleanupTopic (topic );
1462
1462
}
1463
1463
}
1464
1464
@@ -1514,4 +1514,33 @@ private static boolean isCausedByJobFinished(Throwable e) {
1514
1514
return false ;
1515
1515
}
1516
1516
}
1517
+
1518
+ private void cleanupTopic (String topic ) {
1519
+ ignoreExceptions (
1520
+ () -> deleteTestTopic (topic ),
1521
+ anyCauseMatches (UnknownTopicOrPartitionException .class ));
1522
+ }
1523
+
1524
+ private static void cancelJob (TableResult tableResult ) {
1525
+ if (tableResult != null && tableResult .getJobClient ().isPresent ()) {
1526
+ ignoreExceptions (
1527
+ () -> tableResult .getJobClient ().get ().cancel ().get (),
1528
+ anyCauseMatches (FlinkJobTerminatedWithoutCancellationException .class ),
1529
+ anyCauseMatches (
1530
+ "MiniCluster is not yet running or has already been shut down." ));
1531
+ }
1532
+ }
1533
+
1534
+ @ SafeVarargs
1535
+ private static void ignoreExceptions (
1536
+ RunnableWithException runnable , ThrowingConsumer <? super Throwable >... ignoreIf ) {
1537
+ try {
1538
+ runnable .run ();
1539
+ } catch (InterruptedException e ) {
1540
+ Thread .currentThread ().interrupt ();
1541
+ } catch (Exception ex ) {
1542
+ // check if the exception is one of the ignored ones
1543
+ assertThat (ex ).satisfiesAnyOf (ignoreIf );
1544
+ }
1545
+ }
1517
1546
}
0 commit comments