Skip to content

Commit 3518ee8

Browse files
poorbarcodelhotari
authored andcommitted
[fix][client] Fix incorrect producer.getPendingQueueSize due to incomplete queue implementation (apache#24184)
Co-authored-by: Lari Hotari <[email protected]> Co-authored-by: Lari Hotari <[email protected]> (cherry picked from commit a4a3409) (cherry picked from commit 64f574a)
1 parent 6af888b commit 3518ee8

File tree

3 files changed

+78
-2
lines changed

3 files changed

+78
-2
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.concurrent.Executors;
4747
import java.util.concurrent.TimeUnit;
4848
import java.util.concurrent.atomic.AtomicInteger;
49+
import java.util.concurrent.atomic.AtomicReference;
4950
import java.util.regex.Pattern;
5051
import java.util.stream.Collectors;
5152
import lombok.Cleanup;
@@ -62,6 +63,7 @@
6263
import org.apache.pulsar.client.admin.PulsarAdminException;
6364
import org.apache.pulsar.client.api.Consumer;
6465
import org.apache.pulsar.client.api.Message;
66+
import org.apache.pulsar.client.api.MessageId;
6567
import org.apache.pulsar.client.api.Producer;
6668
import org.apache.pulsar.client.api.PulsarClientException;
6769
import org.apache.pulsar.client.api.Schema;
@@ -73,6 +75,7 @@
7375
import org.apache.pulsar.client.api.schema.SchemaDefinition;
7476
import org.apache.pulsar.client.impl.ConsumerImpl;
7577
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
78+
import org.apache.pulsar.client.impl.ProducerImpl;
7679
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
7780
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
7881
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -88,6 +91,7 @@
8891
import org.apache.pulsar.common.schema.SchemaInfo;
8992
import org.apache.pulsar.common.schema.SchemaType;
9093
import org.apache.pulsar.common.util.FutureUtil;
94+
import org.awaitility.Awaitility;
9195
import org.testng.Assert;
9296
import org.testng.annotations.AfterMethod;
9397
import org.testng.annotations.BeforeMethod;
@@ -1445,4 +1449,29 @@ public User(String name) {
14451449
}
14461450
}
14471451

1452+
1453+
@Test
1454+
public void testPendingQueueSizeIfIncompatible() throws Exception {
1455+
final String namespace = BrokerTestUtil.newUniqueName(PUBLIC_TENANT + "/ns");
1456+
admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME));
1457+
admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
1458+
final String topic = BrokerTestUtil.newUniqueName(namespace + "/tp");
1459+
admin.topics().createNonPartitionedTopic(topic);
1460+
1461+
ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
1462+
.maxPendingMessages(50).enableBatching(false).topic(topic).create();
1463+
producer.newMessage(Schema.STRING).value("msg").sendAsync();
1464+
AtomicReference<CompletableFuture<MessageId>> latestSend = new AtomicReference<>();
1465+
for (int i = 0; i < 100; i++) {
1466+
latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync());
1467+
}
1468+
Awaitility.await().untilAsserted(() -> {
1469+
assertTrue(latestSend.get().isDone());
1470+
assertEquals(producer.getPendingQueueSize(), 0);
1471+
});
1472+
1473+
// cleanup.
1474+
producer.close();
1475+
admin.topics().delete(topic, false);
1476+
}
14481477
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1645,7 +1645,8 @@ protected OpSendMsg newObject(Handle<OpSendMsg> handle) {
16451645
* This queue is not thread safe.
16461646
*/
16471647
protected static class OpSendMsgQueue implements Iterable<OpSendMsg> {
1648-
private final Queue<OpSendMsg> delegate = new ArrayDeque<>();
1648+
@VisibleForTesting
1649+
final Queue<OpSendMsg> delegate = new ArrayDeque<>();
16491650
private int forEachDepth = 0;
16501651
private List<OpSendMsg> postponedOpSendMgs;
16511652
private final AtomicInteger messagesCount = new AtomicInteger(0);
@@ -1702,7 +1703,35 @@ public int messagesCount() {
17021703

17031704
@Override
17041705
public Iterator<OpSendMsg> iterator() {
1705-
return delegate.iterator();
1706+
Iterator<OpSendMsg> delegateIterator = delegate.iterator();
1707+
return new Iterator<OpSendMsg>() {
1708+
OpSendMsg currentOp;
1709+
1710+
@Override
1711+
public boolean hasNext() {
1712+
return delegateIterator.hasNext();
1713+
}
1714+
1715+
@Override
1716+
public OpSendMsg next() {
1717+
currentOp = delegateIterator.next();
1718+
return currentOp;
1719+
}
1720+
1721+
@Override
1722+
public void remove() {
1723+
delegateIterator.remove();
1724+
if (currentOp != null) {
1725+
messagesCount.addAndGet(-currentOp.numMessagesInBatch);
1726+
currentOp = null;
1727+
}
1728+
}
1729+
1730+
@Override
1731+
public void forEachRemaining(Consumer<? super OpSendMsg> action) {
1732+
delegateIterator.forEachRemaining(action);
1733+
}
1734+
};
17061735
}
17071736
}
17081737

pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.testng.Assert.assertEquals;
2424
import com.google.common.collect.Lists;
2525
import java.util.Arrays;
26+
import java.util.Iterator;
2627
import org.testng.annotations.BeforeClass;
2728
import org.testng.annotations.Test;
2829

@@ -82,4 +83,21 @@ public void shouldPostponeAddsAlsoInRecursiveCalls() {
8283
// then
8384
assertEquals(Lists.newArrayList(queue), Arrays.asList(opSendMsg, opSendMsg2, opSendMsg3, opSendMsg4));
8485
}
86+
87+
@Test
88+
public void testIteratorRemove() {
89+
ProducerImpl.OpSendMsgQueue queue = new ProducerImpl.OpSendMsgQueue();
90+
for (int i = 0; i < 10; i++) {
91+
queue.add(createDummyOpSendMsg());
92+
}
93+
94+
Iterator<ProducerImpl.OpSendMsg> iterator = queue.iterator();
95+
while (iterator.hasNext()) {
96+
iterator.next();
97+
iterator.remove();
98+
}
99+
// Verify: the result of "messagesCount()" is 0 after removed all items.
100+
assertEquals(queue.delegate.size(), 0);
101+
assertEquals(queue.messagesCount(), 0);
102+
}
85103
}

0 commit comments

Comments
 (0)