Skip to content

Commit 4afe70e

Browse files
authored
[ISSUE #984] Support fifo parallel consume in java pushconsumer (#985)
1 parent 14e9d55 commit 4afe70e

File tree

10 files changed

+458
-149
lines changed

10 files changed

+458
-149
lines changed

.github/workflows/java_build.yml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ jobs:
88
strategy:
99
fail-fast: false
1010
matrix:
11-
os: [ubuntu-20.04, macos-latest, windows-2022]
11+
os: [ ubuntu-22.04, macos-latest, windows-2022 ]
1212
jdk: [11, 17]
1313
steps:
1414
- name: Checkout
15-
uses: actions/checkout@v2
15+
uses: actions/checkout@v3
1616
- name: Set up JDK ${{ matrix.jdk }}
17-
uses: actions/setup-java@v2
17+
uses: actions/setup-java@v3
1818
with:
1919
java-version: ${{ matrix.jdk }}
2020
distribution: "adopt"
@@ -27,10 +27,10 @@ jobs:
2727
runs-on: ubuntu-latest
2828
steps:
2929
- name: Checkout Current Repository
30-
uses: actions/checkout@v2
30+
uses: actions/checkout@v3
3131
# Use JDK 17.
3232
- name: Use JDK 17
33-
uses: actions/setup-java@v2
33+
uses: actions/setup-java@v3
3434
with:
3535
java-version: 17
3636
distribution: "adopt"
@@ -53,7 +53,7 @@ jobs:
5353
| xargs echo "::set-output name=tag_name::"
5454
# Clone the opentelemetry-java-instrumentation repository.
5555
- name: Checkout Latest Release
56-
uses: actions/checkout@v2
56+
uses: actions/checkout@v3
5757
with:
5858
repository: open-telemetry/opentelemetry-java-instrumentation
5959
ref: ${{ steps.get_release_tag.outputs.tag_name }}
@@ -67,7 +67,7 @@ jobs:
6767
sed -i 's/org\.apache\.rocketmq:rocketmq-client-java:[^"]*/org.apache.rocketmq:rocketmq-client-java:${{ steps.get_version.outputs.version }}/' instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/build.gradle.kts
6868
# Use JDK 17.
6969
- name: Use JDK 17
70-
uses: actions/setup-java@v2
70+
uses: actions/setup-java@v3
7171
with:
7272
java-version: 17
7373
distribution: "adopt"

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,15 @@ public interface PushConsumerBuilder {
8181
*/
8282
PushConsumerBuilder setConsumptionThreadCount(int count);
8383

84+
/**
85+
* Set enable fifo consume accelerator. If enabled, the consumer will consume messages in parallel by messageGroup,
86+
* it may increase the probability of repeatedly consuming the same message.
87+
*
88+
* @param enableFifoConsumeAccelerator enable fifo parallel processing.
89+
* @return the consumer builder instance.
90+
*/
91+
PushConsumerBuilder setEnableFifoConsumeAccelerator(boolean enableFifoConsumeAccelerator);
92+
8493
/**
8594
* Finalize the build of {@link PushConsumer} and start.
8695
*

java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,12 @@
2020
import com.google.common.util.concurrent.Futures;
2121
import com.google.common.util.concurrent.ListenableFuture;
2222
import com.google.common.util.concurrent.MoreExecutors;
23+
import java.util.ArrayList;
24+
import java.util.HashMap;
2325
import java.util.Iterator;
2426
import java.util.List;
27+
import java.util.Map;
28+
import java.util.Optional;
2529
import java.util.concurrent.ScheduledExecutorService;
2630
import java.util.concurrent.ThreadPoolExecutor;
2731
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
@@ -35,16 +39,38 @@
3539
@SuppressWarnings("UnstableApiUsage")
3640
class FifoConsumeService extends ConsumeService {
3741
private static final Logger log = LoggerFactory.getLogger(FifoConsumeService.class);
42+
private final boolean enableFifoConsumeAccelerator;
3843

3944
public FifoConsumeService(ClientId clientId, MessageListener messageListener,
4045
ThreadPoolExecutor consumptionExecutor, MessageInterceptor messageInterceptor,
41-
ScheduledExecutorService scheduler) {
46+
ScheduledExecutorService scheduler, boolean enableFifoConsumeAccelerator) {
4247
super(clientId, messageListener, consumptionExecutor, messageInterceptor, scheduler);
48+
this.enableFifoConsumeAccelerator = enableFifoConsumeAccelerator;
4349
}
4450

4551
@Override
4652
public void consume(ProcessQueue pq, List<MessageViewImpl> messageViews) {
47-
consumeIteratively(pq, messageViews.iterator());
53+
if (!enableFifoConsumeAccelerator || messageViews.size() <= 1) {
54+
consumeIteratively(pq, messageViews.iterator());
55+
return;
56+
}
57+
Map<String, List<MessageViewImpl>> messageViewsGroupByMessageGroup = new HashMap<>();
58+
List<MessageViewImpl> messageViewsWithoutMessageGroup = new ArrayList<>();
59+
for (MessageViewImpl messageView : messageViews) {
60+
Optional<String> messageGroup = messageView.getMessageGroup();
61+
if (messageGroup.isPresent()) {
62+
messageViewsGroupByMessageGroup.computeIfAbsent(messageGroup.get(), k -> new ArrayList<>())
63+
.add(messageView);
64+
} else {
65+
messageViewsWithoutMessageGroup.add(messageView);
66+
}
67+
}
68+
69+
log.debug("FifoConsumeService parallel consume, messageViewsNum={}, groupNum={}", messageViews.size(),
70+
messageViewsGroupByMessageGroup.size() + (messageViewsWithoutMessageGroup.isEmpty() ? 0 : 1));
71+
72+
messageViewsGroupByMessageGroup.values().forEach(list -> consumeIteratively(pq, list.iterator()));
73+
consumeIteratively(pq, messageViewsWithoutMessageGroup.iterator());
4874
}
4975

5076
public void consumeIteratively(ProcessQueue pq, Iterator<MessageViewImpl> iterator) {

java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class PushConsumerBuilderImpl implements PushConsumerBuilder {
4141
private int maxCacheMessageCount = 1024;
4242
private int maxCacheMessageSizeInBytes = 64 * 1024 * 1024;
4343
private int consumptionThreadCount = 20;
44+
private boolean enableFifoConsumeAccelerator = false;
4445

4546
/**
4647
* @see PushConsumerBuilder#setClientConfiguration(ClientConfiguration)
@@ -113,6 +114,15 @@ public PushConsumerBuilder setConsumptionThreadCount(int consumptionThreadCount)
113114
return this;
114115
}
115116

117+
/**
118+
* @see PushConsumerBuilder#setEnableFifoConsumeAccelerator(boolean)
119+
*/
120+
@Override
121+
public PushConsumerBuilder setEnableFifoConsumeAccelerator(boolean enableFifoConsumeAccelerator) {
122+
this.enableFifoConsumeAccelerator = enableFifoConsumeAccelerator;
123+
return this;
124+
}
125+
116126
/**
117127
* @see PushConsumerBuilder#build()
118128
*/
@@ -124,7 +134,7 @@ public PushConsumer build() throws ClientException {
124134
checkArgument(!subscriptionExpressions.isEmpty(), "subscriptionExpressions have not been set yet");
125135
final PushConsumerImpl pushConsumer = new PushConsumerImpl(clientConfiguration, consumerGroup,
126136
subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes,
127-
consumptionThreadCount);
137+
consumptionThreadCount, enableFifoConsumeAccelerator);
128138
pushConsumer.startAsync().awaitRunning();
129139
return pushConsumer;
130140
}

java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
102102
private final MessageListener messageListener;
103103
private final int maxCacheMessageCount;
104104
private final int maxCacheMessageSizeInBytes;
105+
private final boolean enableFifoConsumeAccelerator;
105106

106107
/**
107108
* Indicates the times of message reception.
@@ -124,7 +125,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
124125
*/
125126
public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup,
126127
Map<String, FilterExpression> subscriptionExpressions, MessageListener messageListener,
127-
int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount) {
128+
int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount,
129+
boolean enableFifoConsumeAccelerator) {
128130
super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
129131
this.clientConfiguration = clientConfiguration;
130132
Resource groupResource = new Resource(clientConfiguration.getNamespace(), consumerGroup);
@@ -136,6 +138,7 @@ public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumer
136138
this.messageListener = messageListener;
137139
this.maxCacheMessageCount = maxCacheMessageCount;
138140
this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes;
141+
this.enableFifoConsumeAccelerator = enableFifoConsumeAccelerator;
139142

140143
this.receptionTimes = new AtomicLong(0);
141144
this.receivedMessagesQuantity = new AtomicLong(0);
@@ -153,6 +156,13 @@ public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumer
153156
new ThreadFactoryImpl("MessageConsumption", this.getClientId().getIndex()));
154157
}
155158

159+
public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup,
160+
Map<String, FilterExpression> subscriptionExpressions, MessageListener messageListener,
161+
int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount) {
162+
this(clientConfiguration, consumerGroup, subscriptionExpressions, messageListener, maxCacheMessageCount,
163+
maxCacheMessageSizeInBytes, consumptionThreadCount, true);
164+
}
165+
156166
@Override
157167
protected void startUp() throws Exception {
158168
try {
@@ -193,8 +203,10 @@ protected void shutDown() throws InterruptedException {
193203
private ConsumeService createConsumeService() {
194204
final ScheduledExecutorService scheduler = this.getClientManager().getScheduler();
195205
if (pushSubscriptionSettings.isFifo()) {
196-
log.info("Create FIFO consume service, consumerGroup={}, clientId={}", consumerGroup, clientId);
197-
return new FifoConsumeService(clientId, messageListener, consumptionExecutor, this, scheduler);
206+
log.info("Create FIFO consume service, consumerGroup={}, clientId={}, enableFifoConsumeAccelerator={}",
207+
consumerGroup, clientId, enableFifoConsumeAccelerator);
208+
return new FifoConsumeService(clientId, messageListener, consumptionExecutor, this,
209+
scheduler, enableFifoConsumeAccelerator);
198210
}
199211
log.info("Create standard consume service, consumerGroup={}, clientId={}", consumerGroup, clientId);
200212
return new StandardConsumeService(clientId, messageListener, consumptionExecutor, this, scheduler);

0 commit comments

Comments
 (0)