Skip to content

Commit ab65faa

Browse files
authored
[fix][client]Producer stuck or geo-replication stuck due to wrong value of message.numMessagesInBatch (#25106)
1 parent 9c5e1c3 commit ab65faa

File tree

2 files changed

+72
-3
lines changed

2 files changed

+72
-3
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@
100100
import org.apache.pulsar.client.admin.PulsarAdminException;
101101
import org.apache.pulsar.client.api.schema.GenericRecord;
102102
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
103+
import org.apache.pulsar.client.impl.ClientBuilderImpl;
104+
import org.apache.pulsar.client.impl.ClientCnx;
103105
import org.apache.pulsar.client.impl.ConsumerBase;
104106
import org.apache.pulsar.client.impl.ConsumerImpl;
105107
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -111,10 +113,12 @@
111113
import org.apache.pulsar.client.impl.TopicMessageImpl;
112114
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
113115
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
116+
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
114117
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
115118
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
116119
import org.apache.pulsar.common.api.EncryptionContext;
117120
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
121+
import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
118122
import org.apache.pulsar.common.api.proto.MessageMetadata;
119123
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
120124
import org.apache.pulsar.common.compression.CompressionCodec;
@@ -5433,4 +5437,66 @@ public void testBacklogAfterCreatedSubscription(boolean trimLegderBeforeGetStats
54335437
// cleanup
54345438
admin.topics().delete(topic, false);
54355439
}
5440+
5441+
/**
5442+
* The internal producer of the replicator will resend messages after reconnecting. This test guarantees that the
5443+
* internal producer will continue to resend messages even if the client side encounters the following bugs.
5444+
* - A client-side issue causes `message.metadata.numMessagesInBatch` to be `0`, such as
5445+
* https://github.com/streamnative/pulsar-rs/issues/376.
5446+
* - Before the fix, the resend mechanism relied on `message.metadata.numMessagesInBatch`; after the fix, the
5447+
* producer only cares about whether there are pending messages.
5448+
* see also https://github.com/apache/pulsar/pull/25106.
5449+
*/
5450+
@Test
5451+
public void testResendMessagesWhichNumMessagesInBatchIsZero() throws Exception {
5452+
final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");
5453+
final String subscriptionName = "s1";
5454+
admin.topics().createNonPartitionedTopic(topic);
5455+
admin.topics().createSubscription(topic, subscriptionName, MessageId.earliest);
5456+
5457+
// Create a producer whose reconnection (and therefore publishing) can be paused.
5458+
AtomicBoolean stuckProducerReconnection = new AtomicBoolean(false);
5459+
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
5460+
PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
5461+
new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
5462+
protected void handleProducerSuccess(CommandProducerSuccess success) {
5463+
if (stuckProducerReconnection.get()) {
5464+
synchronized (stuckProducerReconnection) {
5465+
super.handleProducerSuccess(success);
5466+
}
5467+
} else {
5468+
super.handleProducerSuccess(success);
5469+
}
5470+
}
5471+
});
5472+
ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client.newProducer().topic(topic)
5473+
.sendTimeout(0, TimeUnit.SECONDS)
5474+
.enableBatching(false).create();
5475+
5476+
// Trigger a resending by unloading topics.
5477+
AtomicReference<CompletableFuture<MessageId>> latestPublishing = new AtomicReference<>();
5478+
synchronized (stuckProducerReconnection) {
5479+
stuckProducerReconnection.set(true);
5480+
admin.topics().unload(topic);
5481+
for (int i = 0; i < 10; i++) {
5482+
ByteBuf payload = PulsarByteBufAllocator.DEFAULT.heapBuffer(1);
5483+
MessageMetadata messageMetadata = new MessageMetadata();
5484+
messageMetadata.setUncompressedSize(1);
5485+
MessageImpl<byte[]> message1 = MessageImpl.create(topic, null, messageMetadata, payload,
5486+
Optional.empty(), null, Schema.BYTES, 0, true, 0);
5487+
// Simulate the client bug: publish messages whose numMessagesInBatch is 0.
5488+
message1.getMessageBuilder().setNumMessagesInBatch(0);
5489+
latestPublishing.set(producer1.sendAsync(message1));
5490+
}
5491+
stuckProducerReconnection.set(false);
5492+
}
5493+
5494+
// Verify: no messages are stuck.
5495+
latestPublishing.get().get(10, TimeUnit.SECONDS);
5496+
5497+
// cleanup.
5498+
producer1.close();
5499+
client.close();
5500+
admin.topics().delete(topic, false);
5501+
}
54365502
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import java.util.List;
5757
import java.util.Map;
5858
import java.util.Optional;
59-
import java.util.Queue;
6059
import java.util.concurrent.CompletableFuture;
6160
import java.util.concurrent.Semaphore;
6261
import java.util.concurrent.TimeUnit;
@@ -1781,7 +1780,7 @@ protected OpSendMsg newObject(Handle<OpSendMsg> handle) {
17811780
*/
17821781
protected static class OpSendMsgQueue implements Iterable<OpSendMsg> {
17831782
@VisibleForTesting
1784-
final Queue<OpSendMsg> delegate = new ArrayDeque<>();
1783+
final ArrayDeque<OpSendMsg> delegate = new ArrayDeque<>();
17851784
private int forEachDepth = 0;
17861785
private List<OpSendMsg> postponedOpSendMgs;
17871786
private final AtomicInteger messagesCount = new AtomicInteger(0);
@@ -1836,6 +1835,10 @@ public int messagesCount() {
18361835
return messagesCount.get();
18371836
}
18381837

1838+
public int size() {
1839+
return delegate.size();
1840+
}
1841+
18391842
@Override
18401843
public Iterator<OpSendMsg> iterator() {
18411844
Iterator<OpSendMsg> delegateIterator = delegate.iterator();
@@ -2148,7 +2151,7 @@ private void resendMessages(ClientCnx cnx, long expectedEpoch) {
21482151
}
21492152

21502153
int messagesToResend = pendingMessages.messagesCount();
2151-
if (messagesToResend == 0) {
2154+
if (pendingMessages.size() == 0) {
21522155
if (log.isDebugEnabled()) {
21532156
log.debug("[{}] [{}] No pending messages to resend {}", topic, producerName, messagesToResend);
21542157
}

0 commit comments

Comments
 (0)