diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java
index 7f588d56eae..f495aef05cd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/Validators.java
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -19,6 +19,7 @@
import java.io.File;
import java.util.Properties;
+import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -63,7 +64,7 @@ public static void checkGroup(String group) throws MQClientException {
}
}
- public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
+ public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer, Consumer msgConsumer) throws MQClientException {
if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
@@ -80,6 +81,9 @@ public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}
+ // compress
+ msgConsumer.accept(msg);
+
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
@@ -138,4 +142,5 @@ public static void checkBrokerConfig(final Properties brokerConfig) throws MQCli
String.format("brokerPermission value: %s is invalid.", brokerConfig.getProperty("brokerPermission")));
}
}
+
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index b54091e3ced..754f09754a5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.client.impl.producer;
-import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -36,6 +35,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
@@ -685,7 +685,7 @@ public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector
final long timeout) throws MQClientException, RemotingTooMuchRequestException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
- Validators.checkMessage(msg, this.defaultMQProducer);
+ checkMessage(msg);
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
@@ -742,7 +742,7 @@ private SendResult sendDefaultImpl(
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
- Validators.checkMessage(msg, this.defaultMQProducer);
+ checkMessage(msg);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
@@ -893,6 +893,10 @@ private SendResult sendDefaultImpl(
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
+ private void checkMessage(Message msg) throws MQClientException {
+ Validators.checkMessage(msg, this.defaultMQProducer, defaultMQProducer::tryToCompressMessage);
+ }
+
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
@@ -942,13 +946,8 @@ private SendResult sendKernelImpl(final Message msg,
topicWithNamespace = true;
}
- int sysFlag = 0;
- boolean msgBodyCompressed = false;
- if (this.tryToCompressMessage(msg)) {
- sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
- sysFlag |= this.defaultMQProducer.getCompressType().getCompressionFlag();
- msgBodyCompressed = true;
- }
+ String sysFlagStr = MessageAccessor.removeProperty(msg, MessageConst.PROPERTY_SYS_FLAG);
+ int sysFlag = NumberUtils.isDigits(sysFlagStr) ? 0 : Integer.parseInt(sysFlagStr);
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(tranMsg)) {
@@ -978,7 +977,7 @@ private SendResult sendKernelImpl(final Message msg,
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
- if (isTrans != null && isTrans.equals("true")) {
+ if ("true".equals(isTrans)) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
@@ -1021,7 +1020,8 @@ private SendResult sendKernelImpl(final Message msg,
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
- if (msgBodyCompressed) {
+ String msgBodyCompressed = MessageAccessor.removeProperty(msg, MessageConst.PROPERTY_MSG_BODY_COMPRESSED);
+ if ("true".equals(msgBodyCompressed)) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using compressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
@@ -1107,32 +1107,6 @@ public MQClientInstance getmQClientFactory() {
return mQClientFactory;
}
- private boolean tryToCompressMessage(final Message msg) {
- if (msg instanceof MessageBatch) {
- //batch does not support compressing right now
- return false;
- }
- byte[] body = msg.getBody();
- if (body != null) {
- if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
- try {
- byte[] data = this.defaultMQProducer.getCompressor().compress(body, this.defaultMQProducer.getCompressLevel());
- if (data != null) {
- msg.setBody(data);
- return true;
- }
- } catch (IOException e) {
- log.error("tryToCompressMessage exception", e);
- if (log.isDebugEnabled()) {
- log.debug(msg.toString());
- }
- }
- }
- }
-
- return false;
- }
-
public boolean hasCheckForbiddenHook() {
return !checkForbiddenHookList.isEmpty();
}
@@ -1227,7 +1201,7 @@ public SendResult send(Message msg, MessageQueue mq, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
- Validators.checkMessage(msg, this.defaultMQProducer);
+ checkMessage(msg);
if (!msg.getTopic().equals(mq.getTopic())) {
throw new MQClientException("message's topic not equal mq's topic", null);
@@ -1270,7 +1244,7 @@ public void send(final Message msg, final MessageQueue mq, final SendCallback se
public void run() {
try {
makeSureStateOK();
- Validators.checkMessage(msg, defaultMQProducer);
+ checkMessage(msg);
if (!msg.getTopic().equals(mq.getTopic())) {
throw new MQClientException("Topic of the message does not match its target message queue", null);
@@ -1302,7 +1276,7 @@ public void run() {
public void sendOneway(Message msg,
MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
this.makeSureStateOK();
- Validators.checkMessage(msg, this.defaultMQProducer);
+ checkMessage(msg);
try {
this.sendKernelImpl(msg, mq, CommunicationMode.ONEWAY, null, null, this.defaultMQProducer.getSendMsgTimeout());
@@ -1333,7 +1307,7 @@ private SendResult sendSelectImpl(
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
- Validators.checkMessage(msg, this.defaultMQProducer);
+ checkMessage(msg);
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
@@ -1441,7 +1415,7 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
- Validators.checkMessage(msg, this.defaultMQProducer);
+ checkMessage(msg);
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 2091bbabbff..a55bcf7593a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.producer;
+import java.io.IOException;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
@@ -40,6 +41,7 @@
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -781,7 +783,7 @@ public SendResult sendByAccumulator(Message msg, MessageQueue mq,
if (!canBatch(msg)) {
return sendDirect(msg, mq, sendCallback);
} else {
- Validators.checkMessage(msg, this);
+ Validators.checkMessage(msg, this, this::tryToCompressMessage);
MessageClientIDSetter.setUniqID(msg);
if (sendCallback == null) {
return this.produceAccumulator.send(msg, mq, this);
@@ -792,6 +794,43 @@ public SendResult sendByAccumulator(Message msg, MessageQueue mq,
}
}
+ public boolean tryToCompressMessage(final Message msg) {
+ int sysFlag = 0;
+
+ try {
+ if (msg instanceof MessageBatch) {
+ //batch does not support compressing right now
+ return false;
+ }
+ byte[] body = msg.getBody();
+ if (body != null) {
+ if (body.length >= this.getCompressMsgBodyOverHowmuch()) {
+ try {
+ byte[] data = this.getCompressor().compress(body, this.getCompressLevel());
+ if (data != null) {
+ msg.setBody(data);
+
+ msg.putUserProperty(MessageConst.PROPERTY_MSG_BODY_COMPRESSED, "true");
+ sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
+ sysFlag |= this.getCompressType().getCompressionFlag();
+
+ return true;
+ }
+ } catch (IOException e) {
+ logger.error("tryToCompressMessage exception", e);
+ if (logger.isDebugEnabled()) {
+ logger.debug(msg.toString());
+ }
+ }
+ }
+ }
+
+ return false;
+ } finally {
+ msg.putUserProperty(MessageConst.PROPERTY_SYS_FLAG, String.valueOf(sysFlag));
+ }
+ }
+
/**
* Send request message in synchronous mode. This method returns only when the consumer consume the request message and reply a message.
*
@@ -1174,7 +1213,7 @@ private MessageBatch batch(Collection msgs) throws MQClientException {
try {
msgBatch = MessageBatch.generateFromList(msgs);
for (Message message : msgBatch) {
- Validators.checkMessage(message, this);
+ Validators.checkMessage(message, this, this::tryToCompressMessage);
MessageClientIDSetter.setUniqID(message);
message.setTopic(withNamespace(message.getTopic()));
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/selector/DefaultMQProducerImplTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/selector/DefaultMQProducerImplTest.java
index 77a83af19c0..1bb2f2cb7c6 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/selector/DefaultMQProducerImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/selector/DefaultMQProducerImplTest.java
@@ -41,6 +41,7 @@
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.producer.RecallMessageHandle;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader;
import org.junit.Before;
import org.junit.Test;
@@ -60,6 +61,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -78,6 +80,9 @@ public class DefaultMQProducerImplTest {
@Mock
private Message message;
+ @Mock
+ private Message messageOverMaxMessageSize;
+
@Mock
private MessageQueue messageQueue;
@@ -118,6 +123,11 @@ public void init() throws Exception {
when(message.getTopic()).thenReturn(defaultTopic);
when(message.getProperty(MessageConst.PROPERTY_CORRELATION_ID)).thenReturn("correlation-id");
when(message.getBody()).thenReturn(new byte[1]);
+
+ messageOverMaxMessageSize = new Message();
+ messageOverMaxMessageSize.setTopic(defaultTopic);
+ messageOverMaxMessageSize.setBody(new byte[5 * 1024 * 1024]);
+
TransactionMQProducer producer = new TransactionMQProducer("test-producer-group");
producer.setTransactionListener(mock(TransactionListener.class));
producer.setTopics(Collections.singletonList(defaultTopic));
@@ -175,6 +185,12 @@ public void testSend() throws RemotingException, InterruptedException, MQClientE
assertNull(defaultMQProducerImpl.send(message));
}
+ @Test
+ public void testSendMessageOverMaxMessageSize() {
+ MQClientException clientException = assertThrows(MQClientException.class, () -> defaultMQProducerImpl.send(messageOverMaxMessageSize));
+ assertNotEquals(ResponseCode.MESSAGE_ILLEGAL, clientException.getResponseCode());
+ }
+
@Test
public void assertSendByQueue() throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
SendResult actual = defaultMQProducerImpl.send(message, messageQueue);
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
index 62e3bbd7e6e..8ef37e3b0ff 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
@@ -26,6 +26,12 @@ public static void clearProperty(final Message msg, final String name) {
msg.clearProperty(name);
}
+ public static String removeProperty(final Message msg, final String name) {
+ String propertyValue = msg.getUserProperty(name);
+ msg.clearProperty(name);
+ return propertyValue;
+ }
+
public static void setProperties(final Message msg, Map properties) {
msg.setProperties(properties);
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index 24f7bdb99a5..b5e0f3689a5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -70,6 +70,8 @@ public class MessageConst {
public static final String PROPERTY_TIMER_DELIVER_MS = "TIMER_DELIVER_MS";
public static final String PROPERTY_BORN_HOST = "__BORNHOST";
public static final String PROPERTY_BORN_TIMESTAMP = "BORN_TIMESTAMP";
+ public static final String PROPERTY_MSG_BODY_COMPRESSED = "MSG_BODY_COMPRESSED";
+ public static final String PROPERTY_SYS_FLAG = "SYS_FLAG";
/**
* property which name starts with "__RMQ.TRANSIENT." is called transient one that will not stored in broker disks.