Skip to content

Commit 20757cc

Browse files
authored
[ISSUE #816] Fix ack fail when Java Pushconsumer shutdown (#992)
1 parent c246dff commit 20757cc

File tree

12 files changed

+146
-47
lines changed

12 files changed

+146
-47
lines changed

.github/workflows/csharp_build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ jobs:
88
strategy:
99
fail-fast: false
1010
matrix:
11-
os: [ubuntu-20.04, macos-latest, windows-2022]
11+
os: [ ubuntu-22.04, macos-latest, windows-2022 ]
1212
steps:
1313
- name: Checkout
1414
uses: actions/checkout@v3

.github/workflows/golang_build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ jobs:
88
strategy:
99
fail-fast: false
1010
matrix:
11-
os: [ubuntu-20.04, windows-2022]
11+
os: [ ubuntu-22.04, windows-2022 ]
1212
go: [1.17]
1313
steps:
1414
- name: Checkout

.github/workflows/php_build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ jobs:
99
fail-fast: false
1010
matrix:
1111
php-version: ["7.4", "8.0", "8.1"]
12-
os: [ubuntu-20.04, macos-11, windows-2022]
12+
os: [ ubuntu-22.04, macos-11, windows-2022 ]
1313
steps:
1414
- name: Checkout
1515
uses: actions/checkout@v2

csharp/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ dotnet add package RocketMQ.Client
3838

3939
You can obtain the latest version of `RocketMQ.Client`
4040
from [NuGet Gallery](https://www.nuget.org/packages/RocketMQ.Client). To assist with getting started quickly and working
41-
with various message types and clients, we offer examples [here](./examples).
41+
with various message types and clients, we offer [examples](./examples).
4242

4343
Layout of this project roughly
4444
follows [this guide](https://docs.microsoft.com/en-us/dotnet/core/tutorials/library-with-visual-studio-code?pivots=dotnet-5-0)

docs/design.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ Message identifier provides identity information for each message stored in brok
6666

6767
>**Note**: Internal retries during message publishing may cause message duplication, the duplicate messages here have the same message identifier.
6868
69-
The message identifier layout is redesigned, see more details [here](./message_id.md).
69+
The message identifier layout is redesigned, see more details in [message_id.md](./message_id.md).
7070

7171
## New and Unified APIs
7272

java/README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ English | [简体中文](README-CN.md) | [RocketMQ Website](https://rocketmq.apa
88

99
Here is the java implementation of the client for [Apache RocketMQ](https://rocketmq.apache.org/). Different from the [remoting-based client](https://github.com/apache/rocketmq/tree/develop/client), the current implementation is based on separating architecture for computing and storage, which is the more recommended way to access the RocketMQ service.
1010

11-
Here are some preparations you may need to know (or refer to [here](https://rocketmq.apache.org/docs/quickStart/02quickstart/)).
11+
Here are some preparations you may need to know (or refer
12+
to [quick start](https://rocketmq.apache.org/docs/quickStart/02quickstart/)).
1213

1314
1. Java 8+ for runtime, Java 11+ for the build;
1415
2. Setup namesrv, broker, and [proxy](https://github.com/apache/rocketmq/tree/develop/proxy).
@@ -57,7 +58,8 @@ implementation("org.apache.rocketmq:rocketmq-client-java-noshade:${rocketmq.vers
5758
implementation 'org.apache.rocketmq:rocketmq-client-java-noshade:${rocketmq.version}'
5859
```
5960

60-
More code examples are provided [here](./client/src/main/java/org/apache/rocketmq/client/java/example) to assist you in working with various clients and different message types.
61+
More code examples are provided [example](./client/src/main/java/org/apache/rocketmq/client/java/example) to assist you
62+
in working with various clients and different message types.
6163

6264
## Logging System
6365

java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.rocketmq.client.java.hook;
1919

20+
import java.util.ArrayList;
2021
import java.util.HashMap;
2122
import java.util.List;
2223
import java.util.Map;
@@ -29,10 +30,10 @@ public class CompositedMessageInterceptor implements MessageInterceptor {
2930
private static final Logger log = LoggerFactory.getLogger(MessageInterceptor.class);
3031
private static final AttributeKey<Map<Integer, Map<AttributeKey, Attribute>>> INTERCEPTOR_ATTRIBUTES_KEY =
3132
AttributeKey.create("composited_interceptor_attributes");
32-
private final List<MessageInterceptor> interceptors;
33+
private final List<MessageInterceptor> interceptors = new ArrayList<>();
3334

3435
public CompositedMessageInterceptor(List<MessageInterceptor> interceptors) {
35-
this.interceptors = interceptors;
36+
this.interceptors.addAll(interceptors);
3637
}
3738

3839
@Override
@@ -72,4 +73,8 @@ public void doAfter(MessageInterceptorContext context0, List<GeneralMessage> mes
7273
}
7374
}
7475
}
76+
77+
public void addInterceptor(MessageInterceptor interceptor) {
78+
interceptors.add(interceptor);
79+
}
7580
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.java.hook;
19+
20+
import java.util.List;
21+
import java.util.concurrent.atomic.AtomicLong;
22+
import org.apache.rocketmq.client.java.message.GeneralMessage;
23+
24+
public class InflightRequestCountInterceptor implements MessageInterceptor {
25+
private final AtomicLong inflightReceiveRequestCount = new AtomicLong(0);
26+
27+
@Override
28+
public void doBefore(MessageInterceptorContext context, List<GeneralMessage> messages) {
29+
if (context.getMessageHookPoints() == MessageHookPoints.RECEIVE) {
30+
inflightReceiveRequestCount.incrementAndGet();
31+
}
32+
}
33+
34+
@Override
35+
public void doAfter(MessageInterceptorContext context, List<GeneralMessage> messages) {
36+
if (context.getMessageHookPoints() == MessageHookPoints.RECEIVE) {
37+
inflightReceiveRequestCount.decrementAndGet();
38+
}
39+
}
40+
41+
public long getInflightReceiveRequestCount() {
42+
return inflightReceiveRequestCount.get();
43+
}
44+
}

java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,12 @@ protected void shutDown() throws InterruptedException {
229229
log.info("Shutdown the rocketmq client successfully, clientId={}", clientId);
230230
}
231231

232+
protected void addMessageInterceptor(MessageInterceptor messageInterceptor) {
233+
if (!this.isRunning()) {
234+
compositedMessageInterceptor.addInterceptor(messageInterceptor);
235+
}
236+
}
237+
232238
@Override
233239
public void doBefore(MessageInterceptorContext context, List<GeneralMessage> generalMessages) {
234240
try {

java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -246,46 +246,46 @@ private void receiveMessageImmediately(String attemptId) {
246246
final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,
247247
longPollingTimeout);
248248
Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {
249-
@Override
250-
public void onSuccess(ReceiveMessageResult result) {
251-
// Intercept after message reception.
252-
final List<GeneralMessage> generalMessages = result.getMessageViewImpls().stream()
253-
.map((Function<MessageView, GeneralMessage>) GeneralMessageImpl::new)
254-
.collect(Collectors.toList());
255-
final MessageInterceptorContextImpl context0 =
256-
new MessageInterceptorContextImpl(context, MessageHookPointsStatus.OK);
257-
consumer.doAfter(context0, generalMessages);
258-
259-
try {
260-
onReceiveMessageResult(result);
261-
} catch (Throwable t) {
262-
// Should never reach here.
263-
log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, "
264-
+ "clientId={}", mq, endpoints, clientId, t);
265-
onReceiveMessageException(t, attemptId);
249+
@Override
250+
public void onSuccess(ReceiveMessageResult result) {
251+
// Intercept after message reception.
252+
final List<GeneralMessage> generalMessages = result.getMessageViewImpls().stream()
253+
.map((Function<MessageView, GeneralMessage>) GeneralMessageImpl::new)
254+
.collect(Collectors.toList());
255+
final MessageInterceptorContextImpl context0 =
256+
new MessageInterceptorContextImpl(context, MessageHookPointsStatus.OK);
257+
consumer.doAfter(context0, generalMessages);
258+
259+
try {
260+
onReceiveMessageResult(result);
261+
} catch (Throwable t) {
262+
// Should never reach here.
263+
log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, "
264+
+ "clientId={}", mq, endpoints, clientId, t);
265+
onReceiveMessageException(t, attemptId);
266+
}
266267
}
267-
}
268268

269-
@Override
270-
public void onFailure(Throwable t) {
271-
String nextAttemptId = null;
272-
if (t instanceof StatusRuntimeException) {
273-
StatusRuntimeException exception = (StatusRuntimeException) t;
274-
if (io.grpc.Status.DEADLINE_EXCEEDED.getCode() == exception.getStatus().getCode()) {
275-
nextAttemptId = request.getAttemptId();
269+
@Override
270+
public void onFailure(Throwable t) {
271+
String nextAttemptId = null;
272+
if (t instanceof StatusRuntimeException) {
273+
StatusRuntimeException exception = (StatusRuntimeException) t;
274+
if (io.grpc.Status.DEADLINE_EXCEEDED.getCode() == exception.getStatus().getCode()) {
275+
nextAttemptId = request.getAttemptId();
276+
}
276277
}
278+
// Intercept after message reception.
279+
final MessageInterceptorContextImpl context0 =
280+
new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR);
281+
consumer.doAfter(context0, Collections.emptyList());
282+
283+
log.error("Exception raised during message reception, mq={}, endpoints={}, attemptId={}, " +
284+
"nextAttemptId={}, clientId={}", mq, endpoints, request.getAttemptId(), nextAttemptId,
285+
clientId, t);
286+
onReceiveMessageException(t, nextAttemptId);
277287
}
278-
// Intercept after message reception.
279-
final MessageInterceptorContextImpl context0 =
280-
new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR);
281-
consumer.doAfter(context0, Collections.emptyList());
282-
283-
log.error("Exception raised during message reception, mq={}, endpoints={}, attemptId={}, " +
284-
"nextAttemptId={}, clientId={}", mq, endpoints, request.getAttemptId(), nextAttemptId,
285-
clientId, t);
286-
onReceiveMessageException(t, nextAttemptId);
287-
}
288-
}, MoreExecutors.directExecutor());
288+
}, MoreExecutors.directExecutor());
289289
receptionTimes.getAndIncrement();
290290
consumer.getReceptionTimes().getAndIncrement();
291291
} catch (Throwable t) {

0 commit comments

Comments
 (0)