Skip to content

Commit c3727d3

Browse files
committed
[fix][client]Producer stuck or geo-replication stuck due to wrong value of message.numMessagesInBatch (#25106)
(cherry picked from commit ab65faa)
1 parent 8849484 commit c3727d3

File tree

2 files changed

+71
-3
lines changed

2 files changed

+71
-3
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
import org.apache.pulsar.client.api.schema.GenericRecord;
102102
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
103103
import org.apache.pulsar.client.impl.ClientBuilderImpl;
104+
import org.apache.pulsar.client.impl.ClientCnx;
104105
import org.apache.pulsar.client.impl.ConsumerBase;
105106
import org.apache.pulsar.client.impl.ConsumerImpl;
106107
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -112,10 +113,12 @@
112113
import org.apache.pulsar.client.impl.TopicMessageImpl;
113114
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
114115
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
116+
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
115117
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
116118
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
117119
import org.apache.pulsar.common.api.EncryptionContext;
118120
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
121+
import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
119122
import org.apache.pulsar.common.api.proto.MessageMetadata;
120123
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
121124
import org.apache.pulsar.common.compression.CompressionCodec;
@@ -5406,4 +5409,66 @@ public void testBacklogAfterCreatedSubscription(boolean trimLegderBeforeGetStats
54065409
// cleanup
54075410
admin.topics().delete(topic, false);
54085411
}
5412+
5413+
/**
5414+
* The internal producer of the replicator resends messages after reconnecting. This test guarantees that the
5415+
* internal producer keeps resending messages even when the client side encounters the following bugs.
5416+
* - A client-side issue causes `message.metadata.numMessagesInBatch` to be `0`, such as
5417+
* https://github.com/streamnative/pulsar-rs/issues/376.
5418+
* - Before the fix, the resend mechanism relied on `message.metadata.numMessagesInBatch`; after the fix, the
5419+
* producer only cares about whether there are pending messages.
5420+
* see also https://github.com/apache/pulsar/pull/25106.
5421+
*/
5422+
@Test
5423+
public void testResendMessagesWhichNumMessagesInBatchIsZero() throws Exception {
5424+
final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");
5425+
final String subscriptionName = "s1";
5426+
admin.topics().createNonPartitionedTopic(topic);
5427+
admin.topics().createSubscription(topic, subscriptionName, MessageId.earliest);
5428+
5429+
// Create a producer which can be paused to publish.
5430+
AtomicBoolean stuckProducerReconnection = new AtomicBoolean(false);
5431+
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
5432+
PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
5433+
new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
5434+
protected void handleProducerSuccess(CommandProducerSuccess success) {
5435+
if (stuckProducerReconnection.get()) {
5436+
synchronized (stuckProducerReconnection) {
5437+
super.handleProducerSuccess(success);
5438+
}
5439+
} else {
5440+
super.handleProducerSuccess(success);
5441+
}
5442+
}
5443+
});
5444+
ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client.newProducer().topic(topic)
5445+
.sendTimeout(0, TimeUnit.SECONDS)
5446+
.enableBatching(false).create();
5447+
5448+
// Trigger a resending by unloading topics.
5449+
AtomicReference<CompletableFuture<MessageId>> latestPublishing = new AtomicReference<>();
5450+
synchronized (stuckProducerReconnection) {
5451+
stuckProducerReconnection.set(true);
5452+
admin.topics().unload(topic);
5453+
for (int i = 0; i < 10; i++) {
5454+
ByteBuf payload = PulsarByteBufAllocator.DEFAULT.heapBuffer(1);
5455+
MessageMetadata messageMetadata = new MessageMetadata();
5456+
messageMetadata.setUncompressedSize(1);
5457+
MessageImpl<byte[]> message1 = MessageImpl.create(topic, null, messageMetadata, payload,
5458+
Optional.empty(), null, Schema.BYTES, 0, true, 0);
5459+
// Mock bugs, which publish messages with 0 numMessagesInBatch.
5460+
message1.getMessageBuilder().setNumMessagesInBatch(0);
5461+
latestPublishing.set(producer1.sendAsync(message1));
5462+
}
5463+
stuckProducerReconnection.set(false);
5464+
}
5465+
5466+
// Verify: no messages being stuck.
5467+
latestPublishing.get().get(10, TimeUnit.SECONDS);
5468+
5469+
// cleanup.
5470+
producer1.close();
5471+
client.close();
5472+
admin.topics().delete(topic, false);
5473+
}
54095474
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import java.util.List;
5757
import java.util.Map;
5858
import java.util.Optional;
59-
import java.util.Queue;
6059
import java.util.concurrent.CompletableFuture;
6160
import java.util.concurrent.Semaphore;
6261
import java.util.concurrent.TimeUnit;
@@ -1762,7 +1761,7 @@ protected OpSendMsg newObject(Handle<OpSendMsg> handle) {
17621761
*/
17631762
protected static class OpSendMsgQueue implements Iterable<OpSendMsg> {
17641763
@VisibleForTesting
1765-
final Queue<OpSendMsg> delegate = new ArrayDeque<>();
1764+
final ArrayDeque<OpSendMsg> delegate = new ArrayDeque<>();
17661765
private int forEachDepth = 0;
17671766
private List<OpSendMsg> postponedOpSendMgs;
17681767
private final AtomicInteger messagesCount = new AtomicInteger(0);
@@ -1817,6 +1816,10 @@ public int messagesCount() {
18171816
return messagesCount.get();
18181817
}
18191818

1819+
public int size() {
1820+
return delegate.size();
1821+
}
1822+
18201823
@Override
18211824
public Iterator<OpSendMsg> iterator() {
18221825
Iterator<OpSendMsg> delegateIterator = delegate.iterator();
@@ -2129,7 +2132,7 @@ private void resendMessages(ClientCnx cnx, long expectedEpoch) {
21292132
}
21302133

21312134
int messagesToResend = pendingMessages.messagesCount();
2132-
if (messagesToResend == 0) {
2135+
if (pendingMessages.size() == 0) {
21332136
if (log.isDebugEnabled()) {
21342137
log.debug("[{}] [{}] No pending messages to resend {}", topic, producerName, messagesToResend);
21352138
}

0 commit comments

Comments
 (0)