|
19 | 19 | package org.apache.pulsar.client.impl; |
20 | 20 |
|
21 | 21 | import static org.testng.Assert.assertEquals; |
| 22 | +import static org.testng.Assert.assertNotNull; |
22 | 23 | import static org.testng.Assert.assertNull; |
23 | 24 | import static org.testng.Assert.assertTrue; |
24 | 25 | import java.util.HashSet; |
@@ -311,19 +312,64 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception { |
311 | 312 | // negative topic message id |
312 | 313 | consumer.negativeAcknowledge(topicMessageId); |
313 | 314 | NegativeAcksTracker negativeAcksTracker = consumer.getNegativeAcksTracker(); |
314 | | - assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L); |
| 315 | + assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L); |
315 | 316 | assertEquals(unAckedMessageTracker.size(), 0); |
316 | 317 | negativeAcksTracker.close(); |
317 | 318 | // negative batch message id |
318 | 319 | unAckedMessageTracker.add(messageId); |
319 | 320 | consumer.negativeAcknowledge(batchMessageId); |
320 | 321 | consumer.negativeAcknowledge(batchMessageId2); |
321 | 322 | consumer.negativeAcknowledge(batchMessageId3); |
322 | | - assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L); |
| 323 | + assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L); |
323 | 324 | assertEquals(unAckedMessageTracker.size(), 0); |
324 | 325 | negativeAcksTracker.close(); |
325 | 326 | } |
326 | 327 |
|
| 328 | + /** |
| 329 | + * If we nack multiple messages in the same batch with different redelivery delays, the messages should be redelivered |
| 330 | + * with the correct delay. However, all messages are redelivered at the same time. |
| 331 | + * @throws Exception |
| 332 | + */ |
| 333 | + @Test |
| 334 | + public void testNegativeAcksWithBatch() throws Exception { |
| 335 | + cleanup(); |
| 336 | + conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); |
| 337 | + setup(); |
| 338 | + String topic = BrokerTestUtil.newUniqueName("testNegativeAcksWithBatch"); |
| 339 | + |
| 340 | + @Cleanup |
| 341 | + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) |
| 342 | + .topic(topic) |
| 343 | + .subscriptionName("sub1") |
| 344 | + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) |
| 345 | + .subscriptionType(SubscriptionType.Shared) |
| 346 | + .enableBatchIndexAcknowledgment(true) |
| 347 | + .negativeAckRedeliveryDelay(3, TimeUnit.SECONDS) |
| 348 | + .subscribe(); |
| 349 | + |
| 350 | + @Cleanup |
| 351 | + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) |
| 352 | + .topic(topic) |
| 353 | + .enableBatching(true) |
| 354 | + .batchingMaxPublishDelay(1, TimeUnit.HOURS) |
| 355 | + .batchingMaxMessages(2) |
| 356 | + .create(); |
| 357 | + // send two messages in the same batch |
| 358 | + producer.sendAsync("test-0"); |
| 359 | + producer.sendAsync("test-1"); |
| 360 | + producer.flush(); |
| 361 | + |
| 362 | + // negative ack the first message |
| 363 | + consumer.negativeAcknowledge(consumer.receive()); |
| 364 | + // wait for 2s, negative ack the second message |
| 365 | + Thread.sleep(2000); |
| 366 | + consumer.negativeAcknowledge(consumer.receive()); |
| 367 | + |
| 368 | + // now 2s has passed, the first message should be redelivered 1s later. |
| 369 | + Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS); |
| 370 | + assertNotNull(msg1); |
| 371 | + } |
| 372 | + |
327 | 373 | @Test |
328 | 374 | public void testNegativeAcksWithBatchAckEnabled() throws Exception { |
329 | 375 | cleanup(); |
|
0 commit comments