Skip to content

Commit 39cd5e7

Browse files
3paccccccsrinath-ctds
authored andcommitted
[improve][client] Terminate consumer.receive() when consumer is closed (apache#24550)
(cherry picked from commit 6ad57da) (cherry picked from commit ba9147a)
1 parent c8aaae5 commit 39cd5e7

File tree

5 files changed

+230
-12
lines changed

5 files changed

+230
-12
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static java.nio.charset.StandardCharsets.UTF_8;
2222
import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
2323
import static org.assertj.core.api.Assertions.assertThat;
24+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2425
import static org.mockito.Mockito.any;
2526
import static org.mockito.Mockito.atLeastOnce;
2627
import static org.mockito.Mockito.doAnswer;
@@ -4124,6 +4125,73 @@ public void testReceiveAsyncCompletedWhenClosing() throws Exception {
41244125
countDownLatch3.await();
41254126
}
41264127

4128+
@Test(timeOut = 100000)
4129+
public void consumerReceiveThrowExceptionWhenConsumerClose() throws Exception {
4130+
Consumer<byte[]> consumer = pulsarClient
4131+
.newConsumer()
4132+
.topic("persistent://my-property/my-ns/my-topic2")
4133+
.receiverQueueSize(10)
4134+
.subscriptionType(SubscriptionType.Shared)
4135+
.subscriptionName("my-sub")
4136+
.subscribe();
4137+
4138+
Thread thread = new Thread(() -> {
4139+
try {
4140+
// sleep 0.1 second to close consumer to ensure consumer.receive() is triggerd
4141+
Thread.sleep(1000);
4142+
consumer.close();
4143+
} catch (Exception e) {
4144+
throw new RuntimeException(e);
4145+
}
4146+
});
4147+
thread.start();
4148+
4149+
assertThatThrownBy(
4150+
() -> consumer.receive()
4151+
)
4152+
.isInstanceOf(PulsarClientException.class)
4153+
.hasMessage("java.lang.InterruptedException: Queue is terminated")
4154+
.hasCauseInstanceOf(InterruptedException.class);
4155+
}
4156+
4157+
4158+
@Test(timeOut = 100000)
4159+
public void multiThreadConsumerReceiveThrowExceptionWhenConsumerClose() throws Exception {
4160+
Consumer<byte[]> consumer = pulsarClient
4161+
.newConsumer()
4162+
.topic("persistent://my-property/my-ns/my-topic2")
4163+
.receiverQueueSize(10)
4164+
.subscriptionType(SubscriptionType.Shared)
4165+
.subscriptionName("my-sub")
4166+
.subscribe();
4167+
int threadCount = 10;
4168+
4169+
CountDownLatch terminateCompletedLatch = new CountDownLatch(threadCount);
4170+
CountDownLatch allThreadReadyLatch = new CountDownLatch(threadCount);
4171+
AtomicInteger interruptedThreadCount = new AtomicInteger(0);
4172+
for (int i = 0; i < threadCount; i++) {
4173+
new Thread(() -> {
4174+
allThreadReadyLatch.countDown();
4175+
try {
4176+
consumer.receive();
4177+
fail("thread should have been interrupted");
4178+
} catch (PulsarClientException e) {
4179+
terminateCompletedLatch.countDown();
4180+
interruptedThreadCount.incrementAndGet();
4181+
}
4182+
}).start();
4183+
}
4184+
// all threads should be ready in at most 3 seconds
4185+
assertTrue(allThreadReadyLatch.await(3, TimeUnit.SECONDS));
4186+
// close consumer, and all threads should be interrupted by thrown PulsarClientException
4187+
consumer.close();
4188+
// all threads should be terminated in at most 3 seconds
4189+
assertTrue(terminateCompletedLatch.await(3, TimeUnit.SECONDS));
4190+
// Verify all threads were properly terminated
4191+
assertEquals(interruptedThreadCount.get(), threadCount);
4192+
}
4193+
4194+
41274195
@Test(timeOut = 20000)
41284196
public void testResetPosition() throws Exception {
41294197
final String topicName = "persistent://my-property/my-ns/testResetPosition";

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.client.impl;
2020

21+
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
2122
import static org.testng.Assert.assertEquals;
2223
import static org.testng.Assert.assertNotNull;
2324
import static org.testng.Assert.assertTrue;
@@ -422,6 +423,38 @@ public void testZeroQueueSizeMessageRedeliveryForAsyncReceive()
422423
producer.close();
423424
}
424425

426+
@Test(timeOut = 30000)
427+
public void testZeroQueueGetExceptionWhenReceiveBatchMessage() throws PulsarClientException {
428+
429+
int batchMessageDelayMs = 100;
430+
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://prop-xyz/use/ns-abc/topic1")
431+
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(0)
432+
.subscribe();
433+
434+
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
435+
.topic("persistent://prop-xyz/use/ns-abc/topic1")
436+
.messageRoutingMode(MessageRoutingMode.SinglePartition);
437+
438+
producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
439+
.batchingMaxMessages(5);
440+
441+
Producer<byte[]> producer = producerBuilder.create();
442+
443+
// send a batch message to trigger zeroQueueConsumer to close
444+
for (int i = 0; i < 10; i++) {
445+
String message = "my-message-" + i;
446+
producer.sendAsync(message.getBytes());
447+
}
448+
449+
// when zeroQueueConsumer receive a batch message, it will close and receive method will throw exception
450+
assertThatThrownBy(
451+
consumer::receive
452+
)
453+
.isInstanceOf(PulsarClientException.class)
454+
.hasMessage("java.lang.InterruptedException: Queue is terminated")
455+
.hasCauseInstanceOf(InterruptedException.class);
456+
}
457+
425458
@Test(timeOut = 30000)
426459
public void testPauseAndResume() throws Exception {
427460
final String topicName = "persistent://prop/ns-abc/zero-queue-pause-and-resume";

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1243,17 +1243,8 @@ private void closeConsumerTasks() {
12431243
}
12441244
negativeAcksTracker.close();
12451245
stats.getStatTimeout().ifPresent(Timeout::cancel);
1246-
if (poolMessages) {
1247-
releasePooledMessagesAndStopAcceptNew();
1248-
}
1249-
}
1250-
1251-
/**
1252-
* If enabled pooled messages, we should release the messages after closing consumer and stop accept the new
1253-
* messages.
1254-
*/
1255-
private void releasePooledMessagesAndStopAcceptNew() {
1256-
incomingMessages.terminate(message -> message.release());
1246+
//terminate incomingMessages queue, stop accept the new messages and waking up blocked thread.
1247+
incomingMessages.terminate(Message::release);
12571248
clearIncomingMessages();
12581249
}
12591250

pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,9 @@ public T take() throws InterruptedException {
194194

195195
try {
196196
while (SIZE_UPDATER.get(this) == 0) {
197+
if (terminated) {
198+
throw new InterruptedException("Queue is terminated");
199+
}
197200
isNotEmpty.await();
198201
}
199202

@@ -220,6 +223,9 @@ public T poll(long timeout, TimeUnit unit) throws InterruptedException {
220223
if (timeoutNanos <= 0) {
221224
return null;
222225
}
226+
if (terminated) {
227+
return null;
228+
}
223229

224230
timeoutNanos = isNotEmpty.awaitNanos(timeoutNanos);
225231
}
@@ -422,7 +428,8 @@ public String toString() {
422428
}
423429

424430
/**
425-
* Make the queue not accept new items. if there are still new data trying to enter the queue, it will be handed
431+
* Make the queue not accept new items and waking up blocked consume.
432+
* if there are still new data trying to enter the queue, it will be handed
426433
* by {@param itemAfterTerminatedHandler}.
427434
*/
428435
public void terminate(@Nullable Consumer<T> itemAfterTerminatedHandler) {
@@ -436,6 +443,14 @@ public void terminate(@Nullable Consumer<T> itemAfterTerminatedHandler) {
436443
} finally {
437444
tailLock.unlockWrite(stamp);
438445
}
446+
447+
// Signal waiting consumer threads to prevent indefinite blocking after termination
448+
headLock.lock();
449+
try {
450+
isNotEmpty.signalAll();
451+
} finally {
452+
headLock.unlock();
453+
}
439454
}
440455

441456
public boolean isTerminated() {

pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueueTest.java

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,117 @@ public void pollTimeout2() throws Exception {
208208
latch.await();
209209
}
210210

211+
/**
212+
* test that multi-threads calling take(), and then terminate the queue, these threads will be properly interrupted.
213+
*/
214+
@Test(timeOut = 10000)
215+
public void testTakeBlockingThreadsTermination() throws InterruptedException {
216+
GrowableArrayBlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>(2);
217+
int threadCount = 10;
218+
CountDownLatch terminateCompletedLatch = new CountDownLatch(threadCount);
219+
CountDownLatch allThreadReadyLatch = new CountDownLatch(threadCount);
220+
AtomicInteger interruptedThreadCount = new AtomicInteger(0);
221+
for (int i = 0; i < threadCount; i++) {
222+
new Thread(() -> {
223+
try {
224+
allThreadReadyLatch.countDown();
225+
queue.take();
226+
fail("thread should have been interrupted");
227+
} catch (InterruptedException e) {
228+
// Expected interruption, record and notify
229+
terminateCompletedLatch.countDown();
230+
interruptedThreadCount.incrementAndGet();
231+
}
232+
}).start();
233+
}
234+
// all threads should be ready in at most 1 second
235+
assertTrue(allThreadReadyLatch.await(1, TimeUnit.SECONDS));
236+
// Terminate the queue - this should interrupt all threads blocked on take()
237+
queue.terminate(null);
238+
// all threads should be interrupted in at most 1 second
239+
assertTrue(terminateCompletedLatch.await(1, TimeUnit.SECONDS));
240+
// Verify all threads were properly interrupted
241+
assertEquals(interruptedThreadCount.get(), threadCount);
242+
}
243+
244+
/**
245+
* test that multi-threads calling poll(), and then terminate the queue,
246+
* these threads will be terminated by return null value.
247+
*/
248+
@Test(timeOut = 10000)
249+
public void testPollBlockingThreadsTermination() throws InterruptedException {
250+
GrowableArrayBlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>(2);
251+
int threadCount = 10;
252+
CountDownLatch terminateCompletedLatch = new CountDownLatch(threadCount);
253+
CountDownLatch allThreadReadyLatch = new CountDownLatch(threadCount);
254+
AtomicInteger terminateThreadCount = new AtomicInteger(0);
255+
for (int i = 0; i < threadCount; i++) {
256+
new Thread(() -> {
257+
try {
258+
allThreadReadyLatch.countDown();
259+
// poll message at a very long timeout, so it will keep pending
260+
Integer poll = queue.poll(1, TimeUnit.HOURS);
261+
// should return a null value if queue is terminated
262+
assertNull(poll);
263+
terminateCompletedLatch.countDown();
264+
terminateThreadCount.incrementAndGet();
265+
} catch (InterruptedException e) {
266+
throw new RuntimeException(e);
267+
}
268+
}).start();
269+
}
270+
// all threads should be ready in at most 1 second
271+
assertTrue(allThreadReadyLatch.await(1, TimeUnit.SECONDS));
272+
// Terminate the queue - this should make all threads return null value
273+
queue.terminate(null);
274+
// all threads should be terminated in at most 1 second
275+
assertTrue(terminateCompletedLatch.await(1, TimeUnit.SECONDS));
276+
assertEquals(terminateThreadCount.get(), threadCount);
277+
}
278+
279+
/**
280+
* test that multi-threads calling take() and poll() mix, and then terminate the queue,
281+
* the queue will signal all these threads properly.
282+
*/
283+
@Test(timeOut = 10000)
284+
public void testPollTakeMixBlockingThreadsTermination() throws InterruptedException {
285+
GrowableArrayBlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>(2);
286+
int threadCount = 10;
287+
CountDownLatch terminateCompletedLatch = new CountDownLatch(threadCount);
288+
CountDownLatch allThreadReadyLatch = new CountDownLatch(threadCount);
289+
AtomicInteger terminateThreadCount = new AtomicInteger(0);
290+
for (int i = 0; i < threadCount; i++) {
291+
int finalI = i;
292+
new Thread(() -> {
293+
allThreadReadyLatch.countDown();
294+
if (finalI % 2 == 0) {
295+
try {
296+
queue.take();
297+
} catch (InterruptedException e) {
298+
terminateCompletedLatch.countDown();
299+
terminateThreadCount.incrementAndGet();
300+
}
301+
} else {
302+
try {
303+
Integer poll = queue.poll(1, TimeUnit.HOURS);
304+
assertNull(poll);
305+
terminateCompletedLatch.countDown();
306+
terminateThreadCount.incrementAndGet();
307+
} catch (InterruptedException e) {
308+
throw new RuntimeException(e);
309+
}
310+
}
311+
}).start();
312+
}
313+
// all threads should be ready in at most 1 second
314+
assertTrue(allThreadReadyLatch.await(1, TimeUnit.SECONDS));
315+
// Terminate the queue - this should make all threads return null value
316+
queue.terminate(null);
317+
// all threads should be terminated in at most 1 second
318+
assertTrue(terminateCompletedLatch.await(1, TimeUnit.SECONDS));
319+
assertEquals(terminateThreadCount.get(), threadCount);
320+
}
321+
211322
@Test
212323
public void removeTest() {
213324
BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>(4);

0 commit comments

Comments
 (0)