Skip to content

Commit 83c4a8f

Browse files
drpmmalizhanhui
andauthored
Support long polling in rocketmq proxy in the protocol (#5788)
* Add long polling * Change rocketmq-proto version to 2.0.2 * fix checkstyle * Fix rocketmq-proto version Signed-off-by: Li Zhanhui <lizhanhui@gmail.com> * Change pollTime to timeRemaining * fix test Signed-off-by: Li Zhanhui <lizhanhui@gmail.com> Co-authored-by: Li Zhanhui <lizhanhui@gmail.com>
1 parent 59dfe8d commit 83c4a8f

File tree

7 files changed

+68
-41
lines changed

7 files changed

+68
-41
lines changed

WORKSPACE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ maven_install(
6767
"org.bouncycastle:bcpkix-jdk15on:1.69",
6868
"com.google.code.gson:gson:2.8.9",
6969
"com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2",
70-
"org.apache.rocketmq:rocketmq-proto:2.0.1",
70+
"org.apache.rocketmq:rocketmq-proto:2.0.2",
7171
"com.google.protobuf:protobuf-java:3.20.1",
7272
"com.google.protobuf:protobuf-java-util:3.20.1",
7373
"com.conversantmedia:disruptor:1.2.10",

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@
127127
<annotations-api.version>6.0.53</annotations-api.version>
128128
<extra-enforcer-rules.version>1.0-beta-4</extra-enforcer-rules.version>
129129
<concurrentlinkedhashmap-lru.version>1.4.2</concurrentlinkedhashmap-lru.version>
130-
<rocketmq-proto.version>2.0.1</rocketmq-proto.version>
130+
<rocketmq-proto.version>2.0.2</rocketmq-proto.version>
131131
<grpc.version>1.50.0</grpc.version>
132132
<protobuf.version>3.20.1</protobuf.version>
133133
<disruptor.version>1.2.10</disruptor.version>

proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ public class ProxyConfig implements ConfigFile {
121121
private long grpcClientProducerBackoffInitialMillis = 10;
122122
private long grpcClientProducerBackoffMaxMillis = 1000;
123123
private int grpcClientProducerBackoffMultiplier = 2;
124-
private long grpcClientConsumerLongPollingTimeoutMillis = Duration.ofSeconds(30).toMillis();
124+
private long grpcClientConsumerMinLongPollingTimeoutMillis = Duration.ofSeconds(5).toMillis();
125+
private long grpcClientConsumerMaxLongPollingTimeoutMillis = Duration.ofSeconds(20).toMillis();
125126
private int grpcClientConsumerLongPollingBatchSize = 32;
126127
private long grpcClientIdleTimeMills = Duration.ofSeconds(120).toMillis();
127128

@@ -598,12 +599,20 @@ public void setGrpcClientProducerBackoffMultiplier(int grpcClientProducerBackoff
598599
this.grpcClientProducerBackoffMultiplier = grpcClientProducerBackoffMultiplier;
599600
}
600601

601-
public long getGrpcClientConsumerLongPollingTimeoutMillis() {
602-
return grpcClientConsumerLongPollingTimeoutMillis;
602+
public long getGrpcClientConsumerMinLongPollingTimeoutMillis() {
603+
return grpcClientConsumerMinLongPollingTimeoutMillis;
603604
}
604605

605-
public void setGrpcClientConsumerLongPollingTimeoutMillis(long grpcClientConsumerLongPollingTimeoutMillis) {
606-
this.grpcClientConsumerLongPollingTimeoutMillis = grpcClientConsumerLongPollingTimeoutMillis;
606+
public void setGrpcClientConsumerMinLongPollingTimeoutMillis(long grpcClientConsumerMinLongPollingTimeoutMillis) {
607+
this.grpcClientConsumerMinLongPollingTimeoutMillis = grpcClientConsumerMinLongPollingTimeoutMillis;
608+
}
609+
610+
public long getGrpcClientConsumerMaxLongPollingTimeoutMillis() {
611+
return grpcClientConsumerMaxLongPollingTimeoutMillis;
612+
}
613+
614+
public void setGrpcClientConsumerMaxLongPollingTimeoutMillis(long grpcClientConsumerMaxLongPollingTimeoutMillis) {
615+
this.grpcClientConsumerMaxLongPollingTimeoutMillis = grpcClientConsumerMaxLongPollingTimeoutMillis;
607616
}
608617

609618
public int getGrpcClientConsumerLongPollingBatchSize() {

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ protected static Settings mergeSubscriptionData(Settings settings, SubscriptionG
143143

144144
resultSettingsBuilder.getSubscriptionBuilder()
145145
.setReceiveBatchSize(config.getGrpcClientConsumerLongPollingBatchSize())
146-
.setLongPollingTimeout(Durations.fromMillis(config.getGrpcClientConsumerLongPollingTimeoutMillis()))
146+
.setLongPollingTimeout(Durations.fromMillis(config.getGrpcClientConsumerMaxLongPollingTimeoutMillis()))
147147
.setFifo(groupConfig.isConsumeMessageOrderly());
148148

149149
resultSettingsBuilder.getBackoffPolicyBuilder().setMaxAttempts(groupConfig.getRetryMaxTimes() + 1);

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java

Lines changed: 48 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,27 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request,
6868
ProxyConfig config = ConfigurationManager.getProxyConfig();
6969

7070
Long timeRemaining = ctx.getRemainingMs();
71-
long pollTime = timeRemaining - Durations.toMillis(settings.getRequestTimeout()) / 2;
72-
if (pollTime < 0) {
73-
pollTime = 0;
71+
long pollingTime;
72+
if (request.hasLongPollingTimeout()) {
73+
pollingTime = Durations.toMillis(request.getLongPollingTimeout());
74+
} else {
75+
pollingTime = timeRemaining - Durations.toMillis(settings.getRequestTimeout()) / 2;
76+
}
77+
if (pollingTime < config.getGrpcClientConsumerMinLongPollingTimeoutMillis()) {
78+
pollingTime = config.getGrpcClientConsumerMinLongPollingTimeoutMillis();
79+
}
80+
if (pollingTime > config.getGrpcClientConsumerMaxLongPollingTimeoutMillis()) {
81+
pollingTime = config.getGrpcClientConsumerMaxLongPollingTimeoutMillis();
7482
}
75-
if (pollTime > config.getGrpcClientConsumerLongPollingTimeoutMillis()) {
76-
pollTime = config.getGrpcClientConsumerLongPollingTimeoutMillis();
83+
84+
if (pollingTime > timeRemaining) {
85+
if (timeRemaining >= config.getGrpcClientConsumerMinLongPollingTimeoutMillis()) {
86+
pollingTime = timeRemaining;
87+
} else {
88+
writer.writeAndComplete(ctx, Code.ILLEGAL_POLLING_TIME, "The deadline time remaining is not enough" +
89+
" for polling, please check network condition");
90+
return;
91+
}
7792
}
7893

7994
validateTopicAndConsumerGroup(request.getMessageQueue().getTopic(), request.getGroup());
@@ -100,37 +115,37 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request,
100115
}
101116

102117
this.messagingProcessor.popMessage(
103-
ctx,
104-
new ReceiveMessageQueueSelector(
105-
request.getMessageQueue().getBroker().getName()
106-
),
107-
group,
108-
topic,
109-
request.getBatchSize(),
110-
actualInvisibleTime,
111-
pollTime,
112-
ConsumeInitMode.MAX,
113-
subscriptionData,
114-
fifo,
115-
new PopMessageResultFilterImpl(maxAttempts),
116-
timeRemaining
117-
).thenAccept(popResult -> {
118-
if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) {
119-
if (PopStatus.FOUND.equals(popResult.getPopStatus())) {
120-
List<MessageExt> messageExtList = popResult.getMsgFoundList();
121-
for (MessageExt messageExt : messageExtList) {
122-
String receiptHandle = messageExt.getProperty(MessageConst.PROPERTY_POP_CK);
123-
if (receiptHandle != null) {
124-
MessageReceiptHandle messageReceiptHandle =
125-
new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
126-
messageExt.getQueueOffset(), messageExt.getReconsumeTimes());
127-
receiptHandleProcessor.addReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
118+
ctx,
119+
new ReceiveMessageQueueSelector(
120+
request.getMessageQueue().getBroker().getName()
121+
),
122+
group,
123+
topic,
124+
request.getBatchSize(),
125+
actualInvisibleTime,
126+
pollingTime,
127+
ConsumeInitMode.MAX,
128+
subscriptionData,
129+
fifo,
130+
new PopMessageResultFilterImpl(maxAttempts),
131+
timeRemaining
132+
).thenAccept(popResult -> {
133+
if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) {
134+
if (PopStatus.FOUND.equals(popResult.getPopStatus())) {
135+
List<MessageExt> messageExtList = popResult.getMsgFoundList();
136+
for (MessageExt messageExt : messageExtList) {
137+
String receiptHandle = messageExt.getProperty(MessageConst.PROPERTY_POP_CK);
138+
if (receiptHandle != null) {
139+
MessageReceiptHandle messageReceiptHandle =
140+
new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
141+
messageExt.getQueueOffset(), messageExt.getReconsumeTimes());
142+
receiptHandleProcessor.addReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
143+
}
128144
}
129145
}
130146
}
131-
}
132-
writer.writeAndComplete(ctx, request, popResult);
133-
})
147+
writer.writeAndComplete(ctx, request, popResult);
148+
})
134149
.exceptionally(t -> {
135150
writer.writeAndComplete(ctx, request, t);
136151
return null;

proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.rocketmq.common.MixAll;
3939
import org.apache.rocketmq.common.constant.PermName;
4040
import org.apache.rocketmq.proxy.common.ProxyContext;
41+
import org.apache.rocketmq.proxy.config.ConfigurationManager;
4142
import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
4243
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
4344
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
@@ -71,6 +72,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
7172
@Before
7273
public void before() throws Throwable {
7374
super.before();
75+
ConfigurationManager.getProxyConfig().setGrpcClientConsumerMinLongPollingTimeoutMillis(0);
7476
this.receiveMessageActivity = new ReceiveMessageActivity(messagingProcessor, receiptHandleProcessor,
7577
grpcClientSettingsManager, grpcChannelManager);
7678
}

test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ public void setUp() throws Exception {
158158
ConfigurationManager.getProxyConfig().setRocketMQClusterName(brokerController1.getBrokerConfig().getBrokerClusterName());
159159
ConfigurationManager.getProxyConfig().setHeartbeatSyncerTopicClusterName(brokerController1.getBrokerConfig().getBrokerClusterName());
160160
ConfigurationManager.getProxyConfig().setMinInvisibleTimeMillsForRecv(3);
161+
ConfigurationManager.getProxyConfig().setGrpcClientConsumerMinLongPollingTimeoutMillis(0);
161162
}
162163

163164
protected MessagingServiceGrpc.MessagingServiceStub createStub(Channel channel) {

0 commit comments

Comments
 (0)