Skip to content

Commit 23e4432

Browse files
committed
tmp diag code for testing
1 parent 6b6ced8 commit 23e4432

File tree

5 files changed

+60
-4
lines changed

5 files changed

+60
-4
lines changed

.github/workflows/push_pr.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ jobs:
2828
compile_and_test:
2929
strategy:
3030
matrix:
31-
flink: &flink_versions [ 2.0.1, 2.1.1, 2.2.0 ]
31+
flink: &flink_versions [ 2.2.0 ]
3232
jdk: &jdk_versions [ '11, 17, 21' ]
3333
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
3434
with:

flink-connector-kafka/pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,15 @@ under the License.
421421
</execution>
422422
</executions>
423423
</plugin>
424+
425+
<!-- Temporarily run only KafkaSinkITCase for faster feedback on hanging test issue -->
426+
<plugin>
427+
<groupId>org.apache.maven.plugins</groupId>
428+
<artifactId>maven-surefire-plugin</artifactId>
429+
<configuration>
430+
<test>KafkaSinkITCase</test>
431+
</configuration>
432+
</plugin>
424433
</plugins>
425434
</build>
426435

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/util/AdminUtils.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,17 @@ public static Set<String> getTopicsByPattern(Admin admin, Pattern topicPattern)
7272
public static Map<String, TopicDescription> getTopicMetadata(
7373
Admin admin, Collection<String> topicNames) {
7474
try {
75-
return admin.describeTopics(topicNames).allTopicNames().get();
75+
// Add timeout to prevent infinite hang during recovery with POOLING strategy +
76+
// chained=false
77+
return admin.describeTopics(topicNames)
78+
.allTopicNames()
79+
.get(30, java.util.concurrent.TimeUnit.SECONDS);
80+
} catch (java.util.concurrent.TimeoutException e) {
81+
throw new RuntimeException(
82+
String.format(
83+
"Timeout while getting metadata for topics %s. This may indicate network issues or slow Kafka responses during recovery.",
84+
topicNames),
85+
e);
7686
} catch (Exception e) {
7787
checkIfInterrupted(e);
7888
throw new RuntimeException(
@@ -83,7 +93,17 @@ public static Map<String, TopicDescription> getTopicMetadata(
8393
public static Map<TopicPartition, DescribeProducersResult.PartitionProducerState>
8494
getProducerStates(Admin admin, Collection<String> topicNames) {
8595
try {
86-
return admin.describeProducers(getTopicPartitions(admin, topicNames)).all().get();
96+
// Add timeout to prevent infinite hang during recovery with POOLING strategy +
97+
// chained=false
98+
return admin.describeProducers(getTopicPartitions(admin, topicNames))
99+
.all()
100+
.get(30, java.util.concurrent.TimeUnit.SECONDS);
101+
} catch (java.util.concurrent.TimeoutException e) {
102+
throw new RuntimeException(
103+
String.format(
104+
"Timeout while getting producers for topics %s. This may indicate network issues or slow Kafka responses during recovery.",
105+
topicNames),
106+
e);
87107
} catch (Exception e) {
88108
checkIfInterrupted(e);
89109
throw new RuntimeException(
@@ -103,12 +123,20 @@ public static Collection<Long> getProducerIds(Admin admin, Collection<String> to
103123
public static Collection<TransactionListing> getOpenTransactionsForTopics(
104124
Admin admin, Collection<String> topicNames) {
105125
try {
126+
// Add timeout to prevent infinite hang during recovery with POOLING strategy +
127+
// chained=false
106128
return admin.listTransactions(
107129
new ListTransactionsOptions()
108130
.filterProducerIds(getProducerIds(admin, topicNames))
109131
.filterStates(List.of(TransactionState.ONGOING)))
110132
.all()
111-
.get();
133+
.get(30, java.util.concurrent.TimeUnit.SECONDS);
134+
} catch (java.util.concurrent.TimeoutException e) {
135+
throw new RuntimeException(
136+
String.format(
137+
"Timeout while getting open transactions for topics %s. This may indicate network issues or slow Kafka responses during recovery.",
138+
topicNames),
139+
e);
112140
} catch (Exception e) {
113141
checkIfInterrupted(e);
114142
throw new RuntimeException(

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,13 +282,21 @@ public void testRecoveryWithAtLeastOnceGuarantee() throws Exception {
282282
@MethodSource("getEOSParameters")
283283
public void testRecoveryWithExactlyOnceGuarantee(
284284
TransactionNamingStrategy namingStrategy, boolean chained) throws Exception {
285+
LOG.info(
286+
"========== STARTING testRecoveryWithExactlyOnceGuarantee with namingStrategy={}, chained={} ==========",
287+
namingStrategy,
288+
chained);
285289
testRecoveryWithAssertion(DeliveryGuarantee.EXACTLY_ONCE, 1, namingStrategy, chained);
286290
}
287291

288292
@ParameterizedTest(name = "{0}, chained={1}")
289293
@MethodSource("getEOSParameters")
290294
public void testRecoveryWithExactlyOnceGuaranteeAndConcurrentCheckpoints(
291295
TransactionNamingStrategy namingStrategy, boolean chained) throws Exception {
296+
LOG.info(
297+
"========== STARTING testRecoveryWithExactlyOnceGuaranteeAndConcurrentCheckpoints with namingStrategy={}, chained={} ==========",
298+
namingStrategy,
299+
chained);
292300
testRecoveryWithAssertion(DeliveryGuarantee.EXACTLY_ONCE, 2, namingStrategy, chained);
293301
}
294302

@@ -653,7 +661,15 @@ private void testRecoveryWithAssertion(
653661
.setTransactionalIdPrefix("kafka-sink")
654662
.setTransactionNamingStrategy(namingStrategy)
655663
.build());
664+
LOG.info(
665+
"========== ABOUT TO CALL env.execute() with namingStrategy={}, chained={} ==========",
666+
namingStrategy,
667+
chained);
656668
env.execute();
669+
LOG.info(
670+
"========== SUCCESSFULLY COMPLETED env.execute() with namingStrategy={}, chained={} ==========",
671+
namingStrategy,
672+
chained);
657673

658674
List<Long> committedRecords =
659675
deserializeValues(

pom.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,9 @@ under the License.
580580
<plugin>
581581
<groupId>com.diffplug.spotless</groupId>
582582
<artifactId>spotless-maven-plugin</artifactId>
583+
<configuration>
584+
<skip>false</skip>
585+
</configuration>
583586
</plugin>
584587
<plugin>
585588
<groupId>org.apache.maven.plugins</groupId>

0 commit comments

Comments
 (0)