Skip to content

Commit 4dd96fe

Browse files
committed
tmp diag code for testing
1 parent 6b6ced8 commit 4dd96fe

File tree

5 files changed

+415
-368
lines changed

5 files changed

+415
-368
lines changed

.github/workflows/push_pr.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ jobs:
2828
compile_and_test:
2929
strategy:
3030
matrix:
31-
flink: &flink_versions [ 2.0.1, 2.1.1, 2.2.0 ]
31+
flink: &flink_versions [ 2.2.0 ]
3232
jdk: &jdk_versions [ '11, 17, 21' ]
3333
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
3434
with:
3535
flink_version: ${{ matrix.flink }}
3636
jdk_version: ${{ matrix.jdk }}
37+
timeout_test: 10
3738
python_test:
3839
strategy:
3940
matrix:
@@ -43,3 +44,4 @@ jobs:
4344
with:
4445
flink_version: ${{ matrix.flink }}
4546
jdk_version: ${{ matrix.jdk }}
47+
timeout_test: 10

flink-connector-kafka/pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,15 @@ under the License.
421421
</execution>
422422
</executions>
423423
</plugin>
424+
425+
<!-- Temporarily run only KafkaSinkITCase for faster feedback on hanging test issue -->
426+
<plugin>
427+
<groupId>org.apache.maven.plugins</groupId>
428+
<artifactId>maven-surefire-plugin</artifactId>
429+
<configuration>
430+
<test>KafkaSinkITCase</test>
431+
</configuration>
432+
</plugin>
424433
</plugins>
425434
</build>
426435

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.slf4j.LoggerFactory;
2727

2828
import javax.annotation.Nullable;
29-
import javax.annotation.concurrent.NotThreadSafe;
29+
import javax.annotation.concurrent.ThreadSafe;
3030

3131
import java.util.ArrayDeque;
3232
import java.util.ArrayList;
@@ -68,10 +68,9 @@
6868
* pool is still in the transaction or not.
6969
* </ul>
7070
*
71-
* <p>This pool is not thread-safe and is only intended to be accessed from the writer, which owns
72-
* it.
71+
* <p>This pool is thread-safe and can be accessed concurrently by multiple threads.
7372
*/
74-
@NotThreadSafe
73+
@ThreadSafe
7574
@Internal
7675
public class ProducerPoolImpl implements ProducerPool {
7776
private static final Logger LOG = LoggerFactory.getLogger(ProducerPoolImpl.class);
@@ -116,7 +115,7 @@ public ProducerPoolImpl(
116115
}
117116

118117
@Override
119-
public void recycleByTransactionId(String transactionalId, boolean success) {
118+
public synchronized void recycleByTransactionId(String transactionalId, boolean success) {
120119
ProducerEntry producerEntry = producerByTransactionalId.remove(transactionalId);
121120
LOG.debug("Transaction {} finished, producer {}", transactionalId, producerEntry);
122121

@@ -169,7 +168,7 @@ private void closeProducer(@Nullable FlinkKafkaInternalProducer<byte[], byte[]>
169168
}
170169

171170
@Override
172-
public void recycle(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
171+
public synchronized void recycle(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
173172
recycleProducer(producer);
174173
ProducerEntry producerEntry =
175174
producerByTransactionalId.remove(producer.getTransactionalId());
@@ -218,7 +217,7 @@ private void initPrecommittedTransactions(
218217
}
219218

220219
@Override
221-
public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
220+
public synchronized FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
222221
String transactionalId, long checkpointId) {
223222
FlinkKafkaInternalProducer<byte[], byte[]> producer = producerPool.poll();
224223
if (producer == null) {
@@ -249,17 +248,17 @@ public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
249248
}
250249

251250
@Override
252-
public Collection<CheckpointTransaction> getOngoingTransactions() {
251+
public synchronized Collection<CheckpointTransaction> getOngoingTransactions() {
253252
return new ArrayList<>(transactionalIdsByCheckpoint.keySet());
254253
}
255254

256255
@VisibleForTesting
257-
public Collection<FlinkKafkaInternalProducer<byte[], byte[]>> getProducers() {
258-
return producerPool;
256+
public synchronized Collection<FlinkKafkaInternalProducer<byte[], byte[]>> getProducers() {
257+
return new ArrayList<>(producerPool);
259258
}
260259

261260
@Override
262-
public void close() throws Exception {
261+
public synchronized void close() throws Exception {
263262
LOG.debug(
264263
"Closing used producers {} and free producers {}",
265264
producerByTransactionalId,

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/util/AdminUtils.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,17 @@ public static Set<String> getTopicsByPattern(Admin admin, Pattern topicPattern)
7272
public static Map<String, TopicDescription> getTopicMetadata(
7373
Admin admin, Collection<String> topicNames) {
7474
try {
75-
return admin.describeTopics(topicNames).allTopicNames().get();
75+
// Add timeout to prevent infinite hang during recovery with POOLING strategy +
76+
// chained=false
77+
return admin.describeTopics(topicNames)
78+
.allTopicNames()
79+
.get(30, java.util.concurrent.TimeUnit.SECONDS);
80+
} catch (java.util.concurrent.TimeoutException e) {
81+
throw new RuntimeException(
82+
String.format(
83+
"Timeout while getting metadata for topics %s. This may indicate network issues or slow Kafka responses during recovery.",
84+
topicNames),
85+
e);
7686
} catch (Exception e) {
7787
checkIfInterrupted(e);
7888
throw new RuntimeException(
@@ -83,7 +93,17 @@ public static Map<String, TopicDescription> getTopicMetadata(
8393
public static Map<TopicPartition, DescribeProducersResult.PartitionProducerState>
8494
getProducerStates(Admin admin, Collection<String> topicNames) {
8595
try {
86-
return admin.describeProducers(getTopicPartitions(admin, topicNames)).all().get();
96+
// Add timeout to prevent infinite hang during recovery with POOLING strategy +
97+
// chained=false
98+
return admin.describeProducers(getTopicPartitions(admin, topicNames))
99+
.all()
100+
.get(30, java.util.concurrent.TimeUnit.SECONDS);
101+
} catch (java.util.concurrent.TimeoutException e) {
102+
throw new RuntimeException(
103+
String.format(
104+
"Timeout while getting producers for topics %s. This may indicate network issues or slow Kafka responses during recovery.",
105+
topicNames),
106+
e);
87107
} catch (Exception e) {
88108
checkIfInterrupted(e);
89109
throw new RuntimeException(
@@ -103,12 +123,20 @@ public static Collection<Long> getProducerIds(Admin admin, Collection<String> to
103123
public static Collection<TransactionListing> getOpenTransactionsForTopics(
104124
Admin admin, Collection<String> topicNames) {
105125
try {
126+
// Add timeout to prevent infinite hang during recovery with POOLING strategy +
127+
// chained=false
106128
return admin.listTransactions(
107129
new ListTransactionsOptions()
108130
.filterProducerIds(getProducerIds(admin, topicNames))
109131
.filterStates(List.of(TransactionState.ONGOING)))
110132
.all()
111-
.get();
133+
.get(30, java.util.concurrent.TimeUnit.SECONDS);
134+
} catch (java.util.concurrent.TimeoutException e) {
135+
throw new RuntimeException(
136+
String.format(
137+
"Timeout while getting open transactions for topics %s. This may indicate network issues or slow Kafka responses during recovery.",
138+
topicNames),
139+
e);
112140
} catch (Exception e) {
113141
checkIfInterrupted(e);
114142
throw new RuntimeException(

0 commit comments

Comments
 (0)