Skip to content

Commit 2a8931e

Browse files
rdhabaliamukesh-ctds
authored andcommitted
[fix][broker] fix broker identifying incorrect stuck topic (apache#24006)
(cherry picked from commit 28f7845) (cherry picked from commit 95101f3)
1 parent b37569b commit 2a8931e

File tree

3 files changed

+14
-6
lines changed

3 files changed

+14
-6
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1227,7 +1227,7 @@ public boolean checkAndUnblockIfStuck() {
12271227
return false;
12281228
}
12291229
// consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read
1230-
if (totalAvailablePermits > 0 && !havePendingReplayRead && !havePendingRead
1230+
if (isAtleastOneConsumerAvailable() && !havePendingReplayRead && !havePendingRead
12311231
&& cursor.getNumberOfEntriesInBacklog(false) > 0) {
12321232
log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name);
12331233
readMoreEntries();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -615,9 +615,9 @@ public boolean checkAndUnblockIfStuck() {
615615
if (consumer == null || cursor.checkAndUpdateReadPositionChanged()) {
616616
return false;
617617
}
618-
int totalAvailablePermits = consumer.getAvailablePermits();
618+
boolean isConsumerAvailable = !consumer.isBlocked() && consumer.getAvailablePermits() > 0;
619619
// consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read
620-
if (totalAvailablePermits > 0 && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) {
620+
if (isConsumerAvailable && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) {
621621
log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name);
622622
readMoreEntries(consumer);
623623
return true;

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
*/
1919
package org.apache.pulsar.broker.service.persistent;
2020

21-
import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
21+
import com.google.common.collect.Multimap;
22+
import com.google.common.collect.Sets;
2223
import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
2324
import static org.mockito.ArgumentMatchers.any;
2425
import static org.mockito.ArgumentMatchers.anyInt;
@@ -35,8 +36,6 @@
3536
import static org.testng.Assert.assertNull;
3637
import static org.testng.Assert.assertTrue;
3738
import static org.testng.Assert.fail;
38-
import com.google.common.collect.Multimap;
39-
import com.google.common.collect.Sets;
4039
import java.io.ByteArrayOutputStream;
4140
import java.lang.reflect.Field;
4241
import java.nio.charset.StandardCharsets;
@@ -74,6 +73,7 @@
7473
import org.apache.pulsar.broker.service.BrokerTestBase;
7574
import org.apache.pulsar.broker.service.Topic;
7675
import org.apache.pulsar.broker.service.TopicPoliciesService;
76+
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
7777
import org.apache.pulsar.client.admin.PulsarAdminException;
7878
import org.apache.pulsar.client.api.Consumer;
7979
import org.apache.pulsar.client.api.Message;
@@ -102,6 +102,7 @@
102102
import org.testng.annotations.DataProvider;
103103
import org.testng.annotations.Test;
104104

105+
105106
@Slf4j
106107
@Test(groups = "broker")
107108
public class PersistentTopicTest extends BrokerTestBase {
@@ -216,6 +217,13 @@ public void testUnblockStuckSubscription() throws Exception {
216217
assertNotNull(msg);
217218
msg = consumer2.receive(5, TimeUnit.SECONDS);
218219
assertNotNull(msg);
220+
221+
org.apache.pulsar.broker.service.Consumer sharedConsumer = sharedDispatcher.getConsumers().get(0);
222+
Field blockField = org.apache.pulsar.broker.service.Consumer.class.getDeclaredField("blockedConsumerOnUnackedMsgs");
223+
blockField.setAccessible(true);
224+
blockField.set(sharedConsumer, true);
225+
producer.newMessage().value("test").eventTime(5).send();
226+
assertFalse(sharedSub.checkAndUnblockIfStuck());
219227
}
220228

221229
@Test

0 commit comments

Comments
 (0)