Skip to content

Commit 7a79c78

Browse files
authored
[fix] [client] call redeliver 1 msg but did 2 msgs (#23943)
1 parent d421989 commit 7a79c78

File tree

3 files changed

+81
-20
lines changed

3 files changed

+81
-20
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.awaitility.Awaitility;
6060
import org.testng.Assert;
6161
import org.testng.annotations.BeforeClass;
62+
import org.testng.annotations.DataProvider;
6263
import org.testng.annotations.Test;
6364

6465
@Slf4j
@@ -137,6 +138,67 @@ public void testBatchMessageAck() {
137138
});
138139
}
139140

141+
@DataProvider
142+
public Object[][] enabledBatchSend() {
143+
return new Object[][] {
144+
{false},
145+
{true}
146+
};
147+
}
148+
149+
@Test(dataProvider = "enabledBatchSend")
150+
@SneakyThrows
151+
public void testBatchMessageNAck(boolean enabledBatchSend) {
152+
final String topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
153+
final String subscriptionName = "s1";
154+
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
155+
.subscriptionName(subscriptionName)
156+
.receiverQueueSize(21)
157+
.subscriptionType(SubscriptionType.Shared)
158+
.enableBatchIndexAcknowledgment(true)
159+
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
160+
.subscribe();
161+
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
162+
.batchingMaxMessages(20)
163+
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
164+
.enableBatching(enabledBatchSend)
165+
.create();
166+
final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
167+
final AbstractPersistentDispatcherMultipleConsumers dispatcher =
168+
(AbstractPersistentDispatcherMultipleConsumers) topic.getSubscription(subscriptionName).getDispatcher();
169+
170+
// Send messages: 20 * 2.
171+
for (int i = 0; i < 40; i++) {
172+
byte[] message = ("batch-message-" + i).getBytes();
173+
if (i == 19 || i == 39) {
174+
producer.newMessage().value(message).send();
175+
} else {
176+
producer.newMessage().value(message).sendAsync();
177+
}
178+
}
179+
Awaitility.await().untilAsserted(() -> {
180+
if (enabledBatchSend) {
181+
assertEquals(consumer.numMessagesInQueue(), 40);
182+
} else {
183+
assertEquals(consumer.numMessagesInQueue(), 21);
184+
}
185+
});
186+
187+
// Negative ack and verify result/
188+
Message<byte[]> receive1 = consumer.receive();
189+
consumer.pause();
190+
consumer.negativeAcknowledge(receive1);
191+
Awaitility.await().untilAsserted(() -> {
192+
assertEquals(consumer.numMessagesInQueue(), 20);
193+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 20);
194+
});
195+
196+
// cleanup.
197+
producer.close();
198+
consumer.close();
199+
admin.topics().delete(topicName);
200+
}
201+
140202
@Test
141203
public void testBatchMessageMultiNegtiveAck() throws Exception{
142204
final String topicName = "persistent://prop/ns-abc/batchMessageMultiNegtiveAck-" + UUID.randomUUID();

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2831,27 +2831,18 @@ private Optional<EncryptionContext> createEncryptionContext(MessageMetadata msgM
28312831

28322832
private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
28332833
int messagesFromQueue = 0;
2834-
Message<T> peek = incomingMessages.peek();
2835-
if (peek != null) {
2836-
MessageId messageId = NegativeAcksTracker.discardBatchAndPartitionIndex(peek.getMessageId());
2837-
if (!messageIds.contains(messageId)) {
2838-
// first message is not expired, then no message is expired in queue.
2839-
return 0;
2840-
}
2841-
2842-
// try not to remove elements that are added while we remove
2843-
Message<T> message = incomingMessages.poll();
2844-
while (message != null) {
2845-
decreaseIncomingMessageSize(message);
2846-
messagesFromQueue++;
2847-
MessageId id = NegativeAcksTracker.discardBatchAndPartitionIndex(message.getMessageId());
2848-
if (!messageIds.contains(id)) {
2849-
messageIds.add(id);
2850-
break;
2851-
}
2852-
message.release();
2853-
message = incomingMessages.poll();
2834+
Message<T> message;
2835+
while (true) {
2836+
message = incomingMessages.pollIf(msg -> {
2837+
MessageId idPolled = NegativeAcksTracker.discardBatchAndPartitionIndex(msg.getMessageId());
2838+
return messageIds.contains(idPolled);
2839+
});
2840+
if (message == null) {
2841+
break;
28542842
}
2843+
decreaseIncomingMessageSize(message);
2844+
messagesFromQueue++;
2845+
message.release();
28552846
}
28562847
return messagesFromQueue;
28572848
}

pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.locks.ReentrantLock;
3333
import java.util.concurrent.locks.StampedLock;
3434
import java.util.function.Consumer;
35+
import java.util.function.Predicate;
3536
import javax.annotation.Nullable;
3637

3738
/**
@@ -83,10 +84,17 @@ public T remove() {
8384

8485
@Override
8586
public T poll() {
87+
return pollIf(v -> true);
88+
}
89+
90+
public T pollIf(Predicate<T> predicate) {
8691
headLock.lock();
8792
try {
8893
if (SIZE_UPDATER.get(this) > 0) {
8994
T item = data[headIndex.value];
95+
if (!predicate.test(item)) {
96+
return null;
97+
}
9098
data[headIndex.value] = null;
9199
headIndex.value = (headIndex.value + 1) & (data.length - 1);
92100
SIZE_UPDATER.decrementAndGet(this);

0 commit comments

Comments
 (0)