2424import io .netty .util .Timer ;
2525import java .io .Closeable ;
2626import java .util .HashSet ;
27- import java .util .Optional ;
2827import java .util .Set ;
2928import java .util .concurrent .TimeUnit ;
29+
30+ import it .unimi .dsi .fastutil .longs .Long2ObjectMap ;
31+ import it .unimi .dsi .fastutil .longs .Long2ObjectOpenHashMap ;
32+ import it .unimi .dsi .fastutil .longs .Long2ObjectRBTreeMap ;
33+ import it .unimi .dsi .fastutil .longs .Long2ObjectSortedMap ;
34+ import it .unimi .dsi .fastutil .longs .LongBidirectionalIterator ;
3035import org .apache .pulsar .client .api .Message ;
3136import org .apache .pulsar .client .api .MessageId ;
3237import org .apache .pulsar .client .api .MessageIdAdv ;
3338import org .apache .pulsar .client .api .RedeliveryBackoff ;
3439import org .apache .pulsar .client .impl .conf .ConsumerConfigurationData ;
35- import org .apache . pulsar . common . util . collections . ConcurrentLongLongPairHashMap ;
40+ import org .roaringbitmap . longlong . Roaring64Bitmap ;
3641import org .slf4j .Logger ;
3742import org .slf4j .LoggerFactory ;
3843
3944class NegativeAcksTracker implements Closeable {
4045 private static final Logger log = LoggerFactory .getLogger (NegativeAcksTracker .class );
4146
42- private ConcurrentLongLongPairHashMap nackedMessages = null ;
47+ // timestamp -> ledgerId -> entryId, no need to batch index, if different messages have
48+ // different timestamp, there will be multiple entries in the map
49+ // RB Tree -> LongOpenHashMap -> Roaring64Bitmap
50+ private Long2ObjectSortedMap <Long2ObjectMap <Roaring64Bitmap >> nackedMessages = null ;
4351
4452 private final ConsumerBase <?> consumer ;
4553 private final Timer timer ;
46- private final long nackDelayNanos ;
47- private final long timerIntervalNanos ;
54+ private final long nackDelayMs ;
4855 private final RedeliveryBackoff negativeAckRedeliveryBackoff ;
56+ private final int negativeAckPrecisionBitCnt ;
4957
5058 private Timeout timeout ;
5159
5260 // Set a min delay to allow for grouping nacks within a single batch
53- private static final long MIN_NACK_DELAY_NANOS = TimeUnit . MILLISECONDS . toNanos ( 100 ) ;
54- private static final long NON_PARTITIONED_TOPIC_PARTITION_INDEX = Long . MAX_VALUE ;
61+ private static final long MIN_NACK_DELAY_MS = 100 ;
62+ private static final int DUMMY_PARTITION_INDEX = - 2 ;
5563
5664 public NegativeAcksTracker (ConsumerBase <?> consumer , ConsumerConfigurationData <?> conf ) {
5765 this .consumer = consumer ;
5866 this .timer = consumer .getClient ().timer ();
59- this .nackDelayNanos = Math .max (TimeUnit .MICROSECONDS .toNanos (conf .getNegativeAckRedeliveryDelayMicros ()),
60- MIN_NACK_DELAY_NANOS );
67+ this .nackDelayMs = Math .max (TimeUnit .MICROSECONDS .toMillis (conf .getNegativeAckRedeliveryDelayMicros ()),
68+ MIN_NACK_DELAY_MS );
6169 this .negativeAckRedeliveryBackoff = conf .getNegativeAckRedeliveryBackoff ();
62- if (negativeAckRedeliveryBackoff != null ) {
63- this .timerIntervalNanos = Math .max (
64- TimeUnit .MILLISECONDS .toNanos (negativeAckRedeliveryBackoff .next (0 )),
65- MIN_NACK_DELAY_NANOS ) / 3 ;
66- } else {
67- this .timerIntervalNanos = nackDelayNanos / 3 ;
68- }
70+ this .negativeAckPrecisionBitCnt = conf .getNegativeAckPrecisionBitCnt ();
6971 }
7072
7173 private void triggerRedelivery (Timeout t ) {
@@ -76,21 +78,48 @@ private void triggerRedelivery(Timeout t) {
7678 return ;
7779 }
7880
79- long now = System .nanoTime ();
80- nackedMessages .forEach ((ledgerId , entryId , partitionIndex , timestamp ) -> {
81- if (timestamp < now ) {
82- MessageId msgId = new MessageIdImpl (ledgerId , entryId ,
83- // need to covert non-partitioned topic partition index to -1
84- (int ) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex ));
85- addChunkedMessageIdsAndRemoveFromSequenceMap (msgId , messagesToRedeliver , this .consumer );
86- messagesToRedeliver .add (msgId );
81+ long currentTimestamp = System .currentTimeMillis ();
82+ for (long timestamp : nackedMessages .keySet ()) {
83+ if (timestamp > currentTimestamp ) {
84+ // We are done with all the messages that need to be redelivered
85+ break ;
86+ }
87+
88+ Long2ObjectMap <Roaring64Bitmap > ledgerMap = nackedMessages .get (timestamp );
89+ for (Long2ObjectMap .Entry <Roaring64Bitmap > ledgerEntry : ledgerMap .long2ObjectEntrySet ()) {
90+ long ledgerId = ledgerEntry .getLongKey ();
91+ Roaring64Bitmap entrySet = ledgerEntry .getValue ();
92+ entrySet .forEach (entryId -> {
93+ MessageId msgId = new MessageIdImpl (ledgerId , entryId , DUMMY_PARTITION_INDEX );
94+ addChunkedMessageIdsAndRemoveFromSequenceMap (msgId , messagesToRedeliver , this .consumer );
95+ messagesToRedeliver .add (msgId );
96+ });
97+ }
98+ }
99+
100+ // remove entries from the nackedMessages map
101+ LongBidirectionalIterator iterator = nackedMessages .keySet ().iterator ();
102+ while (iterator .hasNext ()) {
103+ long timestamp = iterator .nextLong ();
104+ if (timestamp <= currentTimestamp ) {
105+ iterator .remove ();
106+ } else {
107+ break ;
108+ }
109+ }
110+
111+ // Schedule the next redelivery if there are still messages to redeliver
112+ if (!nackedMessages .isEmpty ()) {
113+ long nextTriggerTimestamp = nackedMessages .firstLongKey ();
114+ long delayMs = Math .max (nextTriggerTimestamp - currentTimestamp , 0 );
115+ if (delayMs > 0 ) {
116+ this .timeout = timer .newTimeout (this ::triggerRedelivery , delayMs , TimeUnit .MILLISECONDS );
117+ } else {
118+ this .timeout = timer .newTimeout (this ::triggerRedelivery , 0 , TimeUnit .MILLISECONDS );
87119 }
88- });
89- for (MessageId messageId : messagesToRedeliver ) {
90- nackedMessages .remove (((MessageIdImpl ) messageId ).getLedgerId (),
91- ((MessageIdImpl ) messageId ).getEntryId ());
120+ } else {
121+ this .timeout = null ;
92122 }
93- this .timeout = timer .newTimeout (this ::triggerRedelivery , timerIntervalNanos , TimeUnit .NANOSECONDS );
94123 }
95124
96125 // release the lock of NegativeAcksTracker before calling consumer.redeliverUnacknowledgedMessages,
@@ -110,39 +139,56 @@ public synchronized void add(Message<?> message) {
110139 add (message .getMessageId (), message .getRedeliveryCount ());
111140 }
112141
142+ static long trimLowerBit (long timestamp , int bits ) {
143+ return timestamp & (-1L << bits );
144+ }
145+
113146 private synchronized void add (MessageId messageId , int redeliveryCount ) {
114147 if (nackedMessages == null ) {
115- nackedMessages = ConcurrentLongLongPairHashMap .newBuilder ()
116- .autoShrink (true )
117- .concurrencyLevel (1 )
118- .build ();
148+ nackedMessages = new Long2ObjectRBTreeMap <>();
119149 }
120150
121- long backoffNs ;
151+ long backoffMs ;
122152 if (negativeAckRedeliveryBackoff != null ) {
123- backoffNs = TimeUnit .MILLISECONDS .toNanos (negativeAckRedeliveryBackoff .next (redeliveryCount ));
153+ backoffMs = TimeUnit .MILLISECONDS .toMillis (negativeAckRedeliveryBackoff .next (redeliveryCount ));
124154 } else {
125- backoffNs = nackDelayNanos ;
155+ backoffMs = nackDelayMs ;
126156 }
127- MessageIdAdv messageIdAdv = MessageIdAdvUtils .discardBatch (messageId );
128- // ConcurrentLongLongPairHashMap requires the key and value >=0.
129- // partitionIndex is -1 if the message is from a non-partitioned topic, but we don't use
130- // partitionIndex actually, so we can set it to Long.MAX_VALUE in the case of non-partitioned topic to
131- // avoid exception from ConcurrentLongLongPairHashMap.
132- nackedMessages .put (messageIdAdv .getLedgerId (), messageIdAdv .getEntryId (),
133- messageIdAdv .getPartitionIndex () >= 0 ? messageIdAdv .getPartitionIndex () :
134- NON_PARTITIONED_TOPIC_PARTITION_INDEX , System .nanoTime () + backoffNs );
157+ MessageIdAdv messageIdAdv = (MessageIdAdv ) messageId ;
158+ long timestamp = trimLowerBit (System .currentTimeMillis () + backoffMs , negativeAckPrecisionBitCnt );
159+ nackedMessages .computeIfAbsent (timestamp , k -> new Long2ObjectOpenHashMap <>())
160+ .computeIfAbsent (messageIdAdv .getLedgerId (), k -> new Roaring64Bitmap ())
161+ .add (messageIdAdv .getEntryId ());
135162
136163 if (this .timeout == null ) {
137164 // Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for
138165 // nack immediately following the current one will be batched into the same redeliver request.
139- this .timeout = timer .newTimeout (this ::triggerRedelivery , timerIntervalNanos , TimeUnit .NANOSECONDS );
166+ this .timeout = timer .newTimeout (this ::triggerRedelivery , backoffMs , TimeUnit .MILLISECONDS );
140167 }
141168 }
142169
170+ /**
171+ * Discard the batch index and partition index from the message id.
172+ *
173+ * @param messageId
174+ * @return
175+ */
176+ public static MessageIdAdv discardBatchAndPartitionIndex (MessageId messageId ) {
177+ if (messageId instanceof ChunkMessageIdImpl ) {
178+ return (MessageIdAdv ) messageId ;
179+ }
180+ MessageIdAdv msgId = (MessageIdAdv ) messageId ;
181+ return new MessageIdImpl (msgId .getLedgerId (), msgId .getEntryId (), DUMMY_PARTITION_INDEX );
182+ }
183+
143184 @ VisibleForTesting
144- Optional <Long > getNackedMessagesCount () {
145- return Optional .ofNullable (nackedMessages ).map (ConcurrentLongLongPairHashMap ::size );
185+ synchronized long getNackedMessagesCount () {
186+ if (nackedMessages == null ) {
187+ return 0 ;
188+ }
189+ return nackedMessages .values ().stream ().mapToLong (
190+ ledgerMap -> ledgerMap .values ().stream ().mapToLong (
191+ Roaring64Bitmap ::getLongCardinality ).sum ()).sum ();
146192 }
147193
148194 @ Override
0 commit comments