Skip to content

Commit ac19e79

Browse files
authored
[ISSUE #1133] The java client adds the maxStartupAttempts parameter to configure the retry times at startup (#1134)
1 parent 90d9f15 commit ac19e79

File tree

5 files changed

+48
-12
lines changed

5 files changed

+48
-12
lines changed

.github/workflows/java_build.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ jobs:
2828
steps:
2929
- name: Checkout Current Repository
3030
uses: actions/checkout@v3
31-
# Use JDK 17.
32-
- name: Use JDK 17
31+
# Use JDK 21.
32+
- name: Use JDK 21
3333
uses: actions/setup-java@v3
3434
with:
35-
java-version: 17
35+
java-version: 21
3636
distribution: "adopt"
3737
# Build the code of the current repository, skipping tests and code style checks.
3838
- name: Build Current Repository
@@ -66,10 +66,10 @@ jobs:
6666
run: |
6767
sed -i 's/org\.apache\.rocketmq:rocketmq-client-java:[^"]*/org.apache.rocketmq:rocketmq-client-java:${{ steps.get_version.outputs.version }}/' instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/build.gradle.kts
6868
# Use JDK 17.
69-
- name: Use JDK 17
69+
- name: Use JDK 21
7070
uses: actions/setup-java@v3
7171
with:
72-
java-version: 17
72+
java-version: 21
7373
distribution: "adopt"
7474
cache: gradle
7575
# Build the rocketmq opentelemetry test.

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,20 @@ public class ClientConfiguration {
2929
private final Duration requestTimeout;
3030
private final boolean sslEnabled;
3131
private final String namespace;
32+
private final int maxStartupAttempts;
3233

3334
/**
3435
* The caller is supposed to have validated the arguments and handled throwing exceptions or
3536
* logging warnings already, so we avoid repeating args check here.
3637
*/
3738
ClientConfiguration(String endpoints, SessionCredentialsProvider sessionCredentialsProvider,
38-
Duration requestTimeout, boolean sslEnabled, String namespace) {
39+
Duration requestTimeout, boolean sslEnabled, String namespace, int maxStartupAttempts) {
3940
this.endpoints = endpoints;
4041
this.sessionCredentialsProvider = sessionCredentialsProvider;
4142
this.requestTimeout = requestTimeout;
4243
this.sslEnabled = sslEnabled;
4344
this.namespace = namespace;
45+
this.maxStartupAttempts = maxStartupAttempts;
4446
}
4547

4648
public static ClientConfigurationBuilder newBuilder() {
@@ -66,4 +68,8 @@ public boolean isSslEnabled() {
6668
public String getNamespace() {
6769
return namespace;
6870
}
71+
72+
public int getMaxStartupAttempts() {
73+
return maxStartupAttempts;
74+
}
6975
}

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.rocketmq.client.apis;
1919

20+
import static com.google.common.base.Preconditions.checkArgument;
2021
import static com.google.common.base.Preconditions.checkNotNull;
2122

2223
import java.time.Duration;
@@ -32,6 +33,7 @@ public class ClientConfigurationBuilder {
3233
private Duration requestTimeout = Duration.ofSeconds(3);
3334
private boolean sslEnabled = true;
3435
private String namespace = "";
36+
private int maxStartupAttempts = 3;
3537

3638
/**
3739
* Configure the access point with which the SDK should communicate.
@@ -93,6 +95,18 @@ public ClientConfigurationBuilder setNamespace(String namespace) {
9395
return this;
9496
}
9597

98+
/**
99+
* Configure maxStartupAttempts for client
100+
*
101+
* @param maxStartupAttempts max attempt times when client startup
102+
* @return The {@link ClientConfigurationBuilder} instance, to allow for method chaining.
103+
*/
104+
public ClientConfigurationBuilder setMaxStartupAttempts(int maxStartupAttempts) {
105+
checkArgument(maxStartupAttempts > 0, "maxStartupAttempts should more than 0");
106+
this.maxStartupAttempts = maxStartupAttempts;
107+
return this;
108+
}
109+
96110
/**
97111
* Finalize the build of {@link ClientConfiguration}.
98112
*
@@ -101,6 +115,7 @@ public ClientConfigurationBuilder setNamespace(String namespace) {
101115
public ClientConfiguration build() {
102116
checkNotNull(endpoints, "endpoints should not be null");
103117
checkNotNull(requestTimeout, "requestTimeout should not be null");
104-
return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, sslEnabled, namespace);
118+
return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, sslEnabled, namespace,
119+
maxStartupAttempts);
105120
}
106121
}

java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ private static Producer buildProducer(TransactionChecker checker, String... topi
5454
// On some Windows platforms, you may encounter SSL compatibility issues. Try turning off the SSL option in
5555
// client configuration to solve the problem please if SSL is not essential.
5656
// .enableSsl(false)
57+
// Due to the lazy loading of gRPC, when the network conditions are poor or the load of the application
58+
// at startup is high, the first startup may fail, and you can try multiple startups.
59+
// .setMaxStartupAttempts(3)
5760
.setCredentialProvider(sessionCredentialsProvider)
5861
.build();
5962
final ProducerBuilder builder = provider.newProducerBuilder()

java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -186,12 +186,24 @@ protected void startUp() throws Exception {
186186
// Fetch topic route from remote.
187187
log.info("Begin to fetch topic(s) route data from remote during client startup, clientId={}, topics={}",
188188
clientId, topics);
189-
for (String topic : topics) {
190-
final ListenableFuture<TopicRouteData> future = fetchTopicRoute(topic);
191-
future.get();
189+
for (int attempt = 1; attempt <= clientConfiguration.getMaxStartupAttempts(); attempt++) {
190+
try {
191+
for (String topic : topics) {
192+
final ListenableFuture<TopicRouteData> future = fetchTopicRoute(topic);
193+
future.get();
194+
}
195+
log.info("Fetch topic route data from remote successfully during startup, clientId={}, topics={}",
196+
clientId, topics);
197+
break;
198+
} catch (Exception e) {
199+
log.error("Fetch topics failed when client start, clientId={}, topics={}, attemptTime={}", clientId,
200+
topics, attempt, e);
201+
if (attempt == clientConfiguration.getMaxStartupAttempts()) {
202+
throw new RuntimeException(
203+
String.format("Failed to fetch topics after %d attempts", attempt), e);
204+
}
205+
}
192206
}
193-
log.info("Fetch topic route data from remote successfully during startup, clientId={}, topics={}",
194-
clientId, topics);
195207
// Update route cache periodically.
196208
final ScheduledExecutorService scheduler = clientManager.getScheduler();
197209
this.updateRouteCacheFuture = scheduler.scheduleWithFixedDelay(() -> {

0 commit comments

Comments
 (0)