Skip to content

Commit 71598c1

Browse files
authored
[fix][client]Fixed getting an incorrect maxMessageSize value when accessing multiple clusters in the same process (#22306)
Co-authored-by: atomchchen <[email protected]>
1 parent 5cabcac commit 71598c1

File tree

9 files changed

+60
-40
lines changed

9 files changed

+60
-40
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,13 @@
9393
import org.apache.pulsar.client.admin.PulsarAdminException;
9494
import org.apache.pulsar.client.api.schema.GenericRecord;
9595
import org.apache.pulsar.client.impl.ClientBuilderImpl;
96-
import org.apache.pulsar.client.impl.ClientCnx;
9796
import org.apache.pulsar.client.impl.ConsumerBase;
9897
import org.apache.pulsar.client.impl.ConsumerImpl;
9998
import org.apache.pulsar.client.impl.MessageIdImpl;
10099
import org.apache.pulsar.client.impl.MessageImpl;
101100
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
102101
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
102+
import org.apache.pulsar.client.impl.ProducerImpl;
103103
import org.apache.pulsar.client.impl.TopicMessageImpl;
104104
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
105105
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
@@ -3906,11 +3906,11 @@ public void testReleaseSemaphoreOnFailMessages() throws Exception {
39063906
.topic("persistent://my-property/my-ns/my-topic2");
39073907

39083908
@Cleanup
3909-
Producer<byte[]> producer = producerBuilder.create();
3909+
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)producerBuilder.create();
39103910
List<Future<MessageId>> futures = new ArrayList<>();
39113911

39123912
// Asynchronously produce messages
3913-
byte[] message = new byte[ClientCnx.getMaxMessageSize() + 1];
3913+
byte[] message = new byte[producer.getConnectionHandler().getMaxMessageSize() + 1];
39143914
for (int i = 0; i < maxPendingMessages + 10; i++) {
39153915
Future<MessageId> future = producer.sendAsync(message);
39163916
try {

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.mockito.ArgumentMatchers.anyInt;
2222
import static org.mockito.Mockito.doAnswer;
2323
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.when;
2425
import io.netty.buffer.ByteBufAllocator;
2526
import java.lang.reflect.Field;
2627
import java.nio.charset.StandardCharsets;
@@ -33,7 +34,6 @@
3334
import org.apache.pulsar.client.api.PulsarClient;
3435
import org.apache.pulsar.client.api.PulsarClientException;
3536
import org.apache.pulsar.client.api.SizeUnit;
36-
import org.mockito.MockedStatic;
3737
import org.mockito.Mockito;
3838
import org.testng.Assert;
3939
import org.testng.annotations.AfterMethod;
@@ -69,10 +69,12 @@ public void testProducerInvalidMessageMemoryRelease() throws Exception {
6969
.create();
7070
this.stopBroker();
7171
try {
72-
try (MockedStatic<ClientCnx> mockedStatic = Mockito.mockStatic(ClientCnx.class)) {
73-
mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(8);
74-
producer.send("memory-test".getBytes(StandardCharsets.UTF_8));
75-
}
72+
ConnectionHandler connectionHandler = Mockito.spy(producer.getConnectionHandler());
73+
Field field = producer.getClass().getDeclaredField("connectionHandler");
74+
field.setAccessible(true);
75+
field.set(producer, connectionHandler);
76+
when(connectionHandler.getMaxMessageSize()).thenReturn(8);
77+
producer.send("memory-test".getBytes(StandardCharsets.UTF_8));
7678
throw new IllegalStateException("can not reach here");
7779
} catch (PulsarClientException.InvalidMessageException ex) {
7880
PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.mockito.ArgumentMatchers.anyInt;
2323
import static org.mockito.Mockito.doAnswer;
2424
import static org.mockito.Mockito.mock;
25+
import static org.mockito.Mockito.when;
2526
import io.netty.buffer.ByteBufAllocator;
2627
import java.lang.reflect.Field;
2728
import java.nio.charset.StandardCharsets;
@@ -33,7 +34,6 @@
3334
import org.apache.pulsar.client.api.Schema;
3435
import org.apache.pulsar.common.api.proto.MessageMetadata;
3536
import org.apache.pulsar.common.util.FutureUtil;
36-
import org.mockito.MockedStatic;
3737
import org.mockito.Mockito;
3838
import org.testng.Assert;
3939
import org.testng.annotations.AfterMethod;
@@ -72,24 +72,22 @@ public void testProducerSemaphoreInvalidMessage() throws Exception {
7272
.maxPendingMessages(pendingQueueSize)
7373
.enableBatching(true)
7474
.create();
75-
7675
this.stopBroker();
7776
try {
78-
try (MockedStatic<ClientCnx> mockedStatic = Mockito.mockStatic(ClientCnx.class)) {
79-
mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2);
80-
producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
81-
}
77+
ConnectionHandler connectionHandler = Mockito.spy(producer.getConnectionHandler());
78+
Field field = producer.getClass().getDeclaredField("connectionHandler");
79+
field.setAccessible(true);
80+
field.set(producer, connectionHandler);
81+
when(connectionHandler.getMaxMessageSize()).thenReturn(2);
82+
producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
8283
throw new IllegalStateException("can not reach here");
8384
} catch (PulsarClientException.InvalidMessageException ex) {
8485
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
8586
}
8687

8788
producer.conf.setBatchingEnabled(false);
8889
try {
89-
try (MockedStatic<ClientCnx> mockedStatic = Mockito.mockStatic(ClientCnx.class)) {
90-
mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2);
91-
producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
92-
}
90+
producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
9391
throw new IllegalStateException("can not reach here");
9492
} catch (PulsarClientException.InvalidMessageException ex) {
9593
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.pulsar.common.api.proto.CompressionType;
2626
import org.apache.pulsar.common.compression.CompressionCodec;
2727
import org.apache.pulsar.common.compression.CompressionCodecProvider;
28+
import org.apache.pulsar.common.protocol.Commands;
2829

2930
/**
3031
* Batch message container framework.
@@ -59,14 +60,18 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta
5960
public boolean haveEnoughSpace(MessageImpl<?> msg) {
6061
int messageSize = msg.getDataBuffer().readableBytes();
6162
return (
62-
(maxBytesInBatch <= 0 && (messageSize + currentBatchSizeBytes) <= ClientCnx.getMaxMessageSize())
63+
(maxBytesInBatch <= 0 && (messageSize + currentBatchSizeBytes) <= getMaxMessageSize())
6364
|| (maxBytesInBatch > 0 && (messageSize + currentBatchSizeBytes) <= maxBytesInBatch)
6465
) && (maxNumMessagesInBatch <= 0 || numMessagesInBatch < maxNumMessagesInBatch);
6566
}
67+
protected int getMaxMessageSize() {
68+
return producer != null && producer.getConnectionHandler() != null
69+
? producer.getConnectionHandler().getMaxMessageSize() : Commands.DEFAULT_MAX_MESSAGE_SIZE;
70+
}
6671

6772
protected boolean isBatchFull() {
6873
return (maxBytesInBatch > 0 && currentBatchSizeBytes >= maxBytesInBatch)
69-
|| (maxBytesInBatch <= 0 && currentBatchSizeBytes >= ClientCnx.getMaxMessageSize())
74+
|| (maxBytesInBatch <= 0 && currentBatchSizeBytes >= getMaxMessageSize())
7075
|| (maxNumMessagesInBatch > 0 && numMessagesInBatch >= maxNumMessagesInBatch);
7176
}
7277

pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public boolean add(MessageImpl<?> msg, SendCallback callback) {
101101
lowestSequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
102102
this.firstCallback = callback;
103103
batchedMessageMetadataAndPayload = allocator.buffer(
104-
Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
104+
Math.min(maxBatchSize, getMaxMessageSize()));
105105
updateAndReserveBatchAllocatedSize(batchedMessageMetadataAndPayload.capacity());
106106
if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) {
107107
currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits();
@@ -272,26 +272,26 @@ public OpSendMsg createOpSendMsg() throws IOException {
272272
op.setBatchSizeByte(encryptedPayload.readableBytes());
273273

274274
// handle mgs size check as non-batched in `ProducerImpl.isMessageSizeExceeded`
275-
if (op.getMessageHeaderAndPayloadSize() > ClientCnx.getMaxMessageSize()) {
275+
if (op.getMessageHeaderAndPayloadSize() > getMaxMessageSize()) {
276276
producer.semaphoreRelease(1);
277277
producer.client.getMemoryLimitController().releaseMemory(
278278
messages.get(0).getUncompressedSize() + batchAllocatedSizeBytes);
279279
discard(new PulsarClientException.InvalidMessageException(
280-
"Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
280+
"Message size is bigger than " + getMaxMessageSize() + " bytes"));
281281
return null;
282282
}
283283
lowestSequenceId = -1L;
284284
return op;
285285
}
286286
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
287287
updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
288-
if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
288+
if (encryptedPayload.readableBytes() > getMaxMessageSize()) {
289289
producer.semaphoreRelease(messages.size());
290290
messages.forEach(msg -> producer.client.getMemoryLimitController()
291291
.releaseMemory(msg.getUncompressedSize()));
292292
producer.client.getMemoryLimitController().releaseMemory(batchAllocatedSizeBytes);
293293
discard(new PulsarClientException.InvalidMessageException(
294-
"Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
294+
"Message size is bigger than " + getMaxMessageSize() + " bytes"));
295295
return null;
296296
}
297297
messageMetadata.setNumMessagesInBatch(numMessagesInBatch);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,7 @@ public class ClientCnx extends PulsarHandler {
168168
private volatile int numberOfRejectRequests = 0;
169169

170170
@Getter
171-
private static int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
172-
171+
private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
173172
private final int maxNumberOfRejectedRequestPerConnection;
174173
private final int rejectedRequestResetTimeSec = 60;
175174
protected final int protocolVersion;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@
2626
import java.util.concurrent.atomic.AtomicBoolean;
2727
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2828
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
29+
import lombok.Getter;
30+
import lombok.Setter;
2931
import org.apache.pulsar.client.api.PulsarClientException;
3032
import org.apache.pulsar.client.impl.HandlerState.State;
33+
import org.apache.pulsar.common.protocol.Commands;
3134
import org.slf4j.Logger;
3235
import org.slf4j.LoggerFactory;
3336

@@ -36,6 +39,10 @@ public class ConnectionHandler {
3639
AtomicReferenceFieldUpdater.newUpdater(ConnectionHandler.class, ClientCnx.class, "clientCnx");
3740
@SuppressWarnings("unused")
3841
private volatile ClientCnx clientCnx = null;
42+
@Getter
43+
@Setter
44+
// Since the `clientCnx` variable will be set to null at some times, it is necessary to save this value here.
45+
private volatile int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
3946

4047
protected final HandlerState state;
4148
protected final Backoff backoff;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -776,6 +776,7 @@ public void negativeAcknowledge(Message<?> message) {
776776
@Override
777777
public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
778778
previousExceptions.clear();
779+
getConnectionHandler().setMaxMessageSize(cnx.getMaxMessageSize());
779780

780781
final State state = getState();
781782
if (state == State.Closing || state == State.Closed) {
@@ -1896,7 +1897,7 @@ private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetada
18961897
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
18971898
int uncompressedSize = msgMetadata.getUncompressedSize();
18981899
int payloadSize = payload.readableBytes();
1899-
if (checkMaxMessageSize && payloadSize > ClientCnx.getMaxMessageSize()) {
1900+
if (checkMaxMessageSize && payloadSize > getConnectionHandler().getMaxMessageSize()) {
19001901
// payload size is itself corrupted since it cannot be bigger than the MaxMessageSize
19011902
log.error("[{}][{}] Got corrupted payload message size {} at {}", topic, subscription, payloadSize,
19021903
messageId);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,6 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
180180
this.userProvidedProducerName = StringUtils.isNotBlank(producerName);
181181
this.partitionIndex = partitionIndex;
182182
this.pendingMessages = createPendingMessagesQueue();
183-
this.chunkMaxMessageSize = conf.getChunkMaxMessageSize() > 0
184-
? Math.min(conf.getChunkMaxMessageSize(), ClientCnx.getMaxMessageSize())
185-
: ClientCnx.getMaxMessageSize();
186183
if (conf.getMaxPendingMessages() > 0) {
187184
this.semaphore = Optional.of(new Semaphore(conf.getMaxPendingMessages(), true));
188185
} else {
@@ -275,10 +272,16 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
275272
.setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS)
276273
.create(),
277274
this);
278-
275+
setChunkMaxMessageSize();
279276
grabCnx();
280277
}
281278

279+
private void setChunkMaxMessageSize() {
280+
this.chunkMaxMessageSize = conf.getChunkMaxMessageSize() > 0
281+
? Math.min(conf.getChunkMaxMessageSize(), getMaxMessageSize())
282+
: getMaxMessageSize();
283+
}
284+
282285
protected void semaphoreRelease(final int releaseCountRequest) {
283286
if (semaphore.isPresent()) {
284287
if (!errorState) {
@@ -455,14 +458,14 @@ public void sendAsync(Message<?> message, SendCallback callback) {
455458

456459
// validate msg-size (For batching this will be check at the batch completion size)
457460
int compressedSize = compressedPayload.readableBytes();
458-
if (compressedSize > ClientCnx.getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
461+
if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
459462
compressedPayload.release();
460463
String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";
461464
PulsarClientException.InvalidMessageException invalidMessageException =
462465
new PulsarClientException.InvalidMessageException(
463466
format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds"
464467
+ " %d bytes",
465-
producerName, topic, compressedStr, compressedSize, ClientCnx.getMaxMessageSize()));
468+
producerName, topic, compressedStr, compressedSize, getMaxMessageSize()));
466469
completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
467470
return;
468471
}
@@ -492,19 +495,19 @@ public void sendAsync(Message<?> message, SendCallback callback) {
492495
int payloadChunkSize;
493496
if (canAddToBatch(msg) || !conf.isChunkingEnabled()) {
494497
totalChunks = 1;
495-
payloadChunkSize = ClientCnx.getMaxMessageSize();
498+
payloadChunkSize = getMaxMessageSize();
496499
} else {
497500
// Reserve current metadata size for chunk size to avoid message size overflow.
498501
// NOTE: this is not strictly bounded, as metadata will be updated after chunking.
499502
// So there is a small chance that the final message size is larger than ClientCnx.getMaxMessageSize().
500503
// But it won't cause produce failure as broker have 10 KB padding space for these cases.
501-
payloadChunkSize = ClientCnx.getMaxMessageSize() - msgMetadata.getSerializedSize();
504+
payloadChunkSize = getMaxMessageSize() - msgMetadata.getSerializedSize();
502505
if (payloadChunkSize <= 0) {
503506
PulsarClientException.InvalidMessageException invalidMessageException =
504507
new PulsarClientException.InvalidMessageException(
505508
format("The producer %s of the topic %s sends a message with %d bytes metadata that "
506509
+ "exceeds %d bytes", producerName, topic,
507-
msgMetadata.getSerializedSize(), ClientCnx.getMaxMessageSize()));
510+
msgMetadata.getSerializedSize(), getMaxMessageSize()));
508511
completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
509512
compressedPayload.release();
510513
return;
@@ -1663,7 +1666,8 @@ public Iterator<OpSendMsg> iterator() {
16631666
@Override
16641667
public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
16651668
previousExceptions.clear();
1666-
chunkMaxMessageSize = Math.min(chunkMaxMessageSize, ClientCnx.getMaxMessageSize());
1669+
getConnectionHandler().setMaxMessageSize(cnx.getMaxMessageSize());
1670+
setChunkMaxMessageSize();
16671671

16681672
final long epoch;
16691673
synchronized (this) {
@@ -2323,18 +2327,22 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e
23232327
private boolean isMessageSizeExceeded(OpSendMsg op) {
23242328
if (op.msg != null && !conf.isChunkingEnabled()) {
23252329
int messageSize = op.getMessageHeaderAndPayloadSize();
2326-
if (messageSize > ClientCnx.getMaxMessageSize()) {
2330+
if (messageSize > getMaxMessageSize()) {
23272331
releaseSemaphoreForSendOp(op);
23282332
op.sendComplete(new PulsarClientException.InvalidMessageException(
23292333
format("The producer %s of the topic %s sends a message with %d bytes that exceeds %d bytes",
2330-
producerName, topic, messageSize, ClientCnx.getMaxMessageSize()),
2334+
producerName, topic, messageSize, getMaxMessageSize()),
23312335
op.sequenceId));
23322336
return true;
23332337
}
23342338
}
23352339
return false;
23362340
}
23372341

2342+
private int getMaxMessageSize() {
2343+
return getConnectionHandler().getMaxMessageSize();
2344+
}
2345+
23382346
public long getDelayInMillis() {
23392347
OpSendMsg firstMsg = pendingMessages.peek();
23402348
if (firstMsg != null) {

0 commit comments

Comments
 (0)