@@ -68,36 +68,38 @@ public NegativeAcksTracker(ConsumerBase<?> consumer, ConsumerConfigurationData<?
6868 }
6969 }
7070
71- private synchronized void triggerRedelivery (Timeout t ) {
72- if (nackedMessages .isEmpty ()) {
73- this .timeout = null ;
74- return ;
75- }
76-
77- // Group all the nacked messages into one single re-delivery request
71+ private void triggerRedelivery (Timeout t ) {
7872 Set <MessageId > messagesToRedeliver = new HashSet <>();
79- long now = System .nanoTime ();
80- nackedMessages .forEach ((ledgerId , entryId , partitionIndex , timestamp ) -> {
81- if (timestamp < now ) {
82- MessageId msgId = new MessageIdImpl (ledgerId , entryId ,
83- // need to covert non-partitioned topic partition index to -1
84- (int ) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex ));
85- addChunkedMessageIdsAndRemoveFromSequenceMap (msgId , messagesToRedeliver , this .consumer );
86- messagesToRedeliver .add (msgId );
73+ synchronized (this ) {
74+ if (nackedMessages .isEmpty ()) {
75+ this .timeout = null ;
76+ return ;
8777 }
88- });
8978
90- if (!messagesToRedeliver .isEmpty ()) {
79+ long now = System .nanoTime ();
80+ nackedMessages .forEach ((ledgerId , entryId , partitionIndex , timestamp ) -> {
81+ if (timestamp < now ) {
82+ MessageId msgId = new MessageIdImpl (ledgerId , entryId ,
83+ // need to covert non-partitioned topic partition index to -1
84+ (int ) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex ));
85+ addChunkedMessageIdsAndRemoveFromSequenceMap (msgId , messagesToRedeliver , this .consumer );
86+ messagesToRedeliver .add (msgId );
87+ }
88+ });
9189 for (MessageId messageId : messagesToRedeliver ) {
9290 nackedMessages .remove (((MessageIdImpl ) messageId ).getLedgerId (),
9391 ((MessageIdImpl ) messageId ).getEntryId ());
9492 }
93+ this .timeout = timer .newTimeout (this ::triggerRedelivery , timerIntervalNanos , TimeUnit .NANOSECONDS );
94+ }
95+
96+ // release the lock of NegativeAcksTracker before calling consumer.redeliverUnacknowledgedMessages,
97+ // in which we may acquire the lock of consumer, leading to potential deadlock.
98+ if (!messagesToRedeliver .isEmpty ()) {
9599 consumer .onNegativeAcksSend (messagesToRedeliver );
96100 log .info ("[{}] {} messages will be re-delivered" , consumer , messagesToRedeliver .size ());
97101 consumer .redeliverUnacknowledgedMessages (messagesToRedeliver );
98102 }
99-
100- this .timeout = timer .newTimeout (this ::triggerRedelivery , timerIntervalNanos , TimeUnit .NANOSECONDS );
101103 }
102104
103105 public synchronized void add (MessageId messageId ) {
0 commit comments