Skip to content

Commit 8905f83

Browse files
bcaw-ofeerXYFCSLlizhimins
authored
[ISSUE #1081] [Java] Support message filtering in interceptors with proper acknowledgment handling (#1082)
Co-authored-by: xiaoying.ly <xiaoying.ly@alibaba-inc.com> Co-authored-by: terrance.lzm <terrance.lzm@alibaba-inc.com>
1 parent 444c764 commit 8905f83

File tree

6 files changed

+96
-10
lines changed

6 files changed

+96
-10
lines changed

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,15 @@ public interface PushConsumerBuilder {
9090
*/
9191
PushConsumerBuilder setEnableFifoConsumeAccelerator(boolean enableFifoConsumeAccelerator);
9292

93+
/**
94+
* Enable or disable message interceptor filtering functionality.
95+
* When enabled, it supports client-side message filtering by message interceptors.
96+
*
97+
* @param enableMessageInterceptorFiltering whether to enable message interceptor filtering
98+
* @return the consumer builder instance.
99+
*/
100+
PushConsumerBuilder setEnableMessageInterceptorFiltering(boolean enableMessageInterceptorFiltering);
101+
93102
/**
94103
* Finalize the build of {@link PushConsumer} and start.
95104
*

java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.ArrayList;
3838
import java.util.Collections;
3939
import java.util.List;
40+
import java.util.Set;
4041
import java.util.UUID;
4142
import java.util.concurrent.ScheduledExecutorService;
4243
import java.util.concurrent.TimeUnit;
@@ -256,13 +257,62 @@ public void onSuccess(ReceiveMessageResult result) {
256257
new MessageInterceptorContextImpl(context, MessageHookPointsStatus.OK);
257258
consumer.doAfter(context0, generalMessages);
258259

259-
try {
260-
onReceiveMessageResult(result);
261-
} catch (Throwable t) {
262-
// Should never reach here.
263-
log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, "
264-
+ "clientId={}", mq, endpoints, clientId, t);
265-
onReceiveMessageException(t, attemptId);
260+
// Only perform message filtering when enableMessageInterceptorFiltering is enabled.
261+
if (consumer.isEnableMessageInterceptorFiltering()) {
262+
final List<MessageViewImpl> originalMessages =
263+
new ArrayList<>(result.getMessageViewImpls());
264+
265+
final Set<MessageId> filteredMessageIds = generalMessages.stream()
266+
.filter(msg -> msg.getMessageId().isPresent())
267+
.map(msg -> msg.getMessageId().get())
268+
.collect(Collectors.toSet());
269+
270+
final List<MessageViewImpl> filteredOutMessages = new ArrayList<>();
271+
final List<MessageViewImpl> remainingMessages = new ArrayList<>();
272+
273+
for (MessageViewImpl originalMsg : originalMessages) {
274+
if (filteredMessageIds.contains(originalMsg.getMessageId())) {
275+
remainingMessages.add(originalMsg);
276+
} else {
277+
filteredOutMessages.add(originalMsg);
278+
}
279+
}
280+
281+
// Ack filtered out messages.
282+
if (!filteredOutMessages.isEmpty()) {
283+
log.info("Acking {} filtered out messages by interceptor, mq={}, clientId={}",
284+
filteredOutMessages.size(), mq, consumer.getClientId());
285+
286+
for (MessageViewImpl filteredOutMsg : filteredOutMessages) {
287+
ListenableFuture<Void> ackFuture = ackMessage(filteredOutMsg);
288+
ackFuture.addListener(() -> {
289+
log.debug("Successfully acked filtered out message, messageId={}, topic={}",
290+
filteredOutMsg.getMessageId(), filteredOutMsg.getTopic());
291+
}, MoreExecutors.directExecutor());
292+
}
293+
}
294+
295+
try {
296+
// Create new ReceiveMessageResult with filtered messages.
297+
ReceiveMessageResult filteredResult =
298+
ReceiveMessageResult.createFilteredResult(result, remainingMessages);
299+
onReceiveMessageResult(filteredResult);
300+
} catch (Throwable t) {
301+
// Should never reach here.
302+
log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, "
303+
+ "clientId={}", mq, endpoints, clientId, t);
304+
onReceiveMessageException(t, attemptId);
305+
}
306+
} else {
307+
// When filtering is disabled, use original result directly to avoid performance overhead.
308+
try {
309+
onReceiveMessageResult(result);
310+
} catch (Throwable t) {
311+
// Should never reach here.
312+
log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, "
313+
+ "clientId={}", mq, endpoints, clientId, t);
314+
onReceiveMessageException(t, attemptId);
315+
}
266316
}
267317
}
268318

java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class PushConsumerBuilderImpl implements PushConsumerBuilder {
4242
private int maxCacheMessageSizeInBytes = 64 * 1024 * 1024;
4343
private int consumptionThreadCount = 20;
4444
private boolean enableFifoConsumeAccelerator = false;
45+
private boolean enableMessageInterceptorFiltering = false;
4546

4647
/**
4748
* @see PushConsumerBuilder#setClientConfiguration(ClientConfiguration)
@@ -123,6 +124,14 @@ public PushConsumerBuilder setEnableFifoConsumeAccelerator(boolean enableFifoCon
123124
return this;
124125
}
125126

127+
/**
128+
* @see PushConsumerBuilder#setEnableMessageInterceptorFiltering(boolean)
129+
*/
130+
public PushConsumerBuilder setEnableMessageInterceptorFiltering(boolean enableMessageInterceptorFiltering) {
131+
this.enableMessageInterceptorFiltering = enableMessageInterceptorFiltering;
132+
return this;
133+
}
134+
126135
/**
127136
* @see PushConsumerBuilder#build()
128137
*/
@@ -134,7 +143,7 @@ public PushConsumer build() throws ClientException {
134143
checkArgument(!subscriptionExpressions.isEmpty(), "subscriptionExpressions have not been set yet");
135144
final PushConsumerImpl pushConsumer = new PushConsumerImpl(clientConfiguration, consumerGroup,
136145
subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes,
137-
consumptionThreadCount, enableFifoConsumeAccelerator);
146+
consumptionThreadCount, enableFifoConsumeAccelerator, enableMessageInterceptorFiltering);
138147
pushConsumer.startAsync().awaitRunning();
139148
return pushConsumer;
140149
}

java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
104104
private final int maxCacheMessageCount;
105105
private final int maxCacheMessageSizeInBytes;
106106
private final boolean enableFifoConsumeAccelerator;
107+
private final boolean enableMessageInterceptorFiltering;
107108
private final InflightRequestCountInterceptor inflightRequestCountInterceptor;
108109

109110
/**
@@ -129,6 +130,14 @@ public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumer
129130
Map<String, FilterExpression> subscriptionExpressions, MessageListener messageListener,
130131
int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount,
131132
boolean enableFifoConsumeAccelerator) {
133+
this(clientConfiguration, consumerGroup, subscriptionExpressions, messageListener, maxCacheMessageCount,
134+
maxCacheMessageSizeInBytes, consumptionThreadCount, enableFifoConsumeAccelerator, false);
135+
}
136+
137+
public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup,
138+
Map<String, FilterExpression> subscriptionExpressions, MessageListener messageListener,
139+
int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount,
140+
boolean enableFifoConsumeAccelerator, boolean enableMessageInterceptorFiltering) {
132141
super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
133142
this.clientConfiguration = clientConfiguration;
134143
Resource groupResource = new Resource(clientConfiguration.getNamespace(), consumerGroup);
@@ -141,6 +150,7 @@ public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumer
141150
this.maxCacheMessageCount = maxCacheMessageCount;
142151
this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes;
143152
this.enableFifoConsumeAccelerator = enableFifoConsumeAccelerator;
153+
this.enableMessageInterceptorFiltering = enableMessageInterceptorFiltering;
144154

145155
this.receptionTimes = new AtomicLong(0);
146156
this.receivedMessagesQuantity = new AtomicLong(0);
@@ -165,7 +175,7 @@ public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumer
165175
Map<String, FilterExpression> subscriptionExpressions, MessageListener messageListener,
166176
int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount) {
167177
this(clientConfiguration, consumerGroup, subscriptionExpressions, messageListener, maxCacheMessageCount,
168-
maxCacheMessageSizeInBytes, consumptionThreadCount, true);
178+
maxCacheMessageSizeInBytes, consumptionThreadCount, true, false);
169179
}
170180

171181
@Override
@@ -626,4 +636,8 @@ public RetryPolicy getRetryPolicy() {
626636
public ThreadPoolExecutor getConsumptionExecutor() {
627637
return consumptionExecutor;
628638
}
639+
640+
public boolean isEnableMessageInterceptorFiltering() {
641+
return enableMessageInterceptorFiltering;
642+
}
629643
}

java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ public ReceiveMessageResult(Endpoints endpoints, List<MessageViewImpl> messages)
3232
this.messages = messages;
3333
}
3434

35+
public static ReceiveMessageResult createFilteredResult(ReceiveMessageResult original,
36+
List<MessageViewImpl> filteredMessages) {
37+
return new ReceiveMessageResult(original.getEndpoints(), new ArrayList<>(filteredMessages));
38+
}
39+
3540
public List<MessageView> getMessageViews() {
3641
return new ArrayList<>(messages);
3742
}

protos

Submodule protos deleted from 5c9f841

0 commit comments

Comments
 (0)