Skip to content

Commit cfe4f79

Browse files
thetumbledlhotari
authored andcommitted
[fix][client] Fix deadlock of NegativeAcksTracker (#23651)
(cherry picked from commit 68eb8f2)
1 parent d755bef commit cfe4f79

File tree

1 file changed

+21
-19
lines changed

1 file changed

+21
-19
lines changed

pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -68,36 +68,38 @@ public NegativeAcksTracker(ConsumerBase<?> consumer, ConsumerConfigurationData<?
6868
}
6969
}
7070

71-
private synchronized void triggerRedelivery(Timeout t) {
72-
if (nackedMessages.isEmpty()) {
73-
this.timeout = null;
74-
return;
75-
}
76-
77-
// Group all the nacked messages into one single re-delivery request
71+
private void triggerRedelivery(Timeout t) {
7872
Set<MessageId> messagesToRedeliver = new HashSet<>();
79-
long now = System.nanoTime();
80-
nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> {
81-
if (timestamp < now) {
82-
MessageId msgId = new MessageIdImpl(ledgerId, entryId,
83-
// need to covert non-partitioned topic partition index to -1
84-
(int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
85-
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
86-
messagesToRedeliver.add(msgId);
73+
synchronized (this) {
74+
if (nackedMessages.isEmpty()) {
75+
this.timeout = null;
76+
return;
8777
}
88-
});
8978

90-
if (!messagesToRedeliver.isEmpty()) {
79+
long now = System.nanoTime();
80+
nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> {
81+
if (timestamp < now) {
82+
MessageId msgId = new MessageIdImpl(ledgerId, entryId,
83+
// need to covert non-partitioned topic partition index to -1
84+
(int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
85+
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
86+
messagesToRedeliver.add(msgId);
87+
}
88+
});
9189
for (MessageId messageId : messagesToRedeliver) {
9290
nackedMessages.remove(((MessageIdImpl) messageId).getLedgerId(),
9391
((MessageIdImpl) messageId).getEntryId());
9492
}
93+
this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
94+
}
95+
96+
// release the lock of NegativeAcksTracker before calling consumer.redeliverUnacknowledgedMessages,
97+
// in which we may acquire the lock of consumer, leading to potential deadlock.
98+
if (!messagesToRedeliver.isEmpty()) {
9599
consumer.onNegativeAcksSend(messagesToRedeliver);
96100
log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size());
97101
consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
98102
}
99-
100-
this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
101103
}
102104

103105
public synchronized void add(MessageId messageId) {

0 commit comments

Comments
 (0)