Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.File;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
Expand Down Expand Up @@ -63,7 +64,7 @@ public static void checkGroup(String group) throws MQClientException {
}
}

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer, Consumer<Message> msgConsumer) throws MQClientException {
if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
Expand All @@ -80,6 +81,9 @@ public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}

// compress
msgConsumer.accept(msg);

if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
Expand Down Expand Up @@ -138,4 +142,5 @@ public static void checkBrokerConfig(final Properties brokerConfig) throws MQCli
String.format("brokerPermission value: %s is invalid.", brokerConfig.getProperty("brokerPermission")));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.client.impl.producer;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -36,6 +35,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
Expand Down Expand Up @@ -685,7 +685,7 @@ public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector
final long timeout) throws MQClientException, RemotingTooMuchRequestException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
checkMessage(msg);

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
Expand Down Expand Up @@ -742,7 +742,7 @@ private SendResult sendDefaultImpl(
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
checkMessage(msg);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
Expand Down Expand Up @@ -893,6 +893,10 @@ private SendResult sendDefaultImpl(
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

private void checkMessage(Message msg) throws MQClientException {
Validators.checkMessage(msg, this.defaultMQProducer, defaultMQProducer::tryToCompressMessage);
}

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
Expand Down Expand Up @@ -942,13 +946,8 @@ private SendResult sendKernelImpl(final Message msg,
topicWithNamespace = true;
}

int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
sysFlag |= this.defaultMQProducer.getCompressType().getCompressionFlag();
msgBodyCompressed = true;
}
String sysFlagStr = MessageAccessor.removeProperty(msg, MessageConst.PROPERTY_SYS_FLAG);
int sysFlag = NumberUtils.isDigits(sysFlagStr) ? 0 : Integer.parseInt(sysFlagStr);

final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(tranMsg)) {
Expand Down Expand Up @@ -978,7 +977,7 @@ private SendResult sendKernelImpl(final Message msg,
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
if ("true".equals(isTrans)) {
context.setMsgType(MessageType.Trans_Msg_Half);
}

Expand Down Expand Up @@ -1021,7 +1020,8 @@ private SendResult sendKernelImpl(final Message msg,
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
String msgBodyCompressed = MessageAccessor.removeProperty(msg, MessageConst.PROPERTY_MSG_BODY_COMPRESSED);
if ("true".equals(msgBodyCompressed)) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using compressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
Expand Down Expand Up @@ -1107,32 +1107,6 @@ public MQClientInstance getmQClientFactory() {
return mQClientFactory;
}

private boolean tryToCompressMessage(final Message msg) {
if (msg instanceof MessageBatch) {
//batch does not support compressing right now
return false;
}
byte[] body = msg.getBody();
if (body != null) {
if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
try {
byte[] data = this.defaultMQProducer.getCompressor().compress(body, this.defaultMQProducer.getCompressLevel());
if (data != null) {
msg.setBody(data);
return true;
}
} catch (IOException e) {
log.error("tryToCompressMessage exception", e);
if (log.isDebugEnabled()) {
log.debug(msg.toString());
}
}
}
}

return false;
}

public boolean hasCheckForbiddenHook() {
return !checkForbiddenHookList.isEmpty();
}
Expand Down Expand Up @@ -1227,7 +1201,7 @@ public SendResult send(Message msg, MessageQueue mq, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
checkMessage(msg);

if (!msg.getTopic().equals(mq.getTopic())) {
throw new MQClientException("message's topic not equal mq's topic", null);
Expand Down Expand Up @@ -1270,7 +1244,7 @@ public void send(final Message msg, final MessageQueue mq, final SendCallback se
public void run() {
try {
makeSureStateOK();
Validators.checkMessage(msg, defaultMQProducer);
checkMessage(msg);

if (!msg.getTopic().equals(mq.getTopic())) {
throw new MQClientException("Topic of the message does not match its target message queue", null);
Expand Down Expand Up @@ -1302,7 +1276,7 @@ public void run() {
public void sendOneway(Message msg,
MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
checkMessage(msg);

try {
this.sendKernelImpl(msg, mq, CommunicationMode.ONEWAY, null, null, this.defaultMQProducer.getSendMsgTimeout());
Expand Down Expand Up @@ -1333,7 +1307,7 @@ private SendResult sendSelectImpl(
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
checkMessage(msg);

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
Expand Down Expand Up @@ -1441,7 +1415,7 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}

Validators.checkMessage(msg, this.defaultMQProducer);
checkMessage(msg);

SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.producer;

import java.io.IOException;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
Expand All @@ -40,6 +41,7 @@
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -781,7 +783,7 @@ public SendResult sendByAccumulator(Message msg, MessageQueue mq,
if (!canBatch(msg)) {
return sendDirect(msg, mq, sendCallback);
} else {
Validators.checkMessage(msg, this);
Validators.checkMessage(msg, this, this::tryToCompressMessage);
MessageClientIDSetter.setUniqID(msg);
if (sendCallback == null) {
return this.produceAccumulator.send(msg, mq, this);
Expand All @@ -792,6 +794,43 @@ public SendResult sendByAccumulator(Message msg, MessageQueue mq,
}
}

public boolean tryToCompressMessage(final Message msg) {
int sysFlag = 0;

try {
if (msg instanceof MessageBatch) {
//batch does not support compressing right now
return false;
}
byte[] body = msg.getBody();
if (body != null) {
if (body.length >= this.getCompressMsgBodyOverHowmuch()) {
try {
byte[] data = this.getCompressor().compress(body, this.getCompressLevel());
if (data != null) {
msg.setBody(data);

msg.putUserProperty(MessageConst.PROPERTY_MSG_BODY_COMPRESSED, "true");
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
sysFlag |= this.getCompressType().getCompressionFlag();

return true;
}
} catch (IOException e) {
logger.error("tryToCompressMessage exception", e);
if (logger.isDebugEnabled()) {
logger.debug(msg.toString());
}
}
}
}

return false;
} finally {
msg.putUserProperty(MessageConst.PROPERTY_SYS_FLAG, String.valueOf(sysFlag));
}
}

/**
* Send request message in synchronous mode. This method returns only when the consumer consume the request message and reply a message. </p>
*
Expand Down Expand Up @@ -1174,7 +1213,7 @@ private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
try {
msgBatch = MessageBatch.generateFromList(msgs);
for (Message message : msgBatch) {
Validators.checkMessage(message, this);
Validators.checkMessage(message, this, this::tryToCompressMessage);
MessageClientIDSetter.setUniqID(message);
message.setTopic(withNamespace(message.getTopic()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.producer.RecallMessageHandle;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -60,6 +61,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
Expand All @@ -78,6 +80,9 @@ public class DefaultMQProducerImplTest {
@Mock
private Message message;

@Mock
private Message messageOverMaxMessageSize;

@Mock
private MessageQueue messageQueue;

Expand Down Expand Up @@ -118,6 +123,11 @@ public void init() throws Exception {
when(message.getTopic()).thenReturn(defaultTopic);
when(message.getProperty(MessageConst.PROPERTY_CORRELATION_ID)).thenReturn("correlation-id");
when(message.getBody()).thenReturn(new byte[1]);

messageOverMaxMessageSize = new Message();
messageOverMaxMessageSize.setTopic(defaultTopic);
messageOverMaxMessageSize.setBody(new byte[5 * 1024 * 1024]);

TransactionMQProducer producer = new TransactionMQProducer("test-producer-group");
producer.setTransactionListener(mock(TransactionListener.class));
producer.setTopics(Collections.singletonList(defaultTopic));
Expand Down Expand Up @@ -175,6 +185,12 @@ public void testSend() throws RemotingException, InterruptedException, MQClientE
assertNull(defaultMQProducerImpl.send(message));
}

@Test
public void testSendMessageOverMaxMessageSize() {
MQClientException clientException = assertThrows(MQClientException.class, () -> defaultMQProducerImpl.send(messageOverMaxMessageSize));
assertNotEquals(ResponseCode.MESSAGE_ILLEGAL, clientException.getResponseCode());
}

@Test
public void assertSendByQueue() throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
SendResult actual = defaultMQProducerImpl.send(message, messageQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ public static void clearProperty(final Message msg, final String name) {
msg.clearProperty(name);
}

public static String removeProperty(final Message msg, final String name) {
String propertyValue = msg.getUserProperty(name);
msg.clearProperty(name);
return propertyValue;
}

public static void setProperties(final Message msg, Map<String, String> properties) {
msg.setProperties(properties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class MessageConst {
public static final String PROPERTY_TIMER_DELIVER_MS = "TIMER_DELIVER_MS";
public static final String PROPERTY_BORN_HOST = "__BORNHOST";
public static final String PROPERTY_BORN_TIMESTAMP = "BORN_TIMESTAMP";
public static final String PROPERTY_MSG_BODY_COMPRESSED = "MSG_BODY_COMPRESSED";
public static final String PROPERTY_SYS_FLAG = "SYS_FLAG";

/**
* property which name starts with "__RMQ.TRANSIENT." is called transient one that will not stored in broker disks.
Expand Down