Skip to content

Commit ea7c20e

Browse files
genussST-DDT
andauthored
Request connection on startup (#377)
* Check connection on startup * Fix spotless errors * Replace boolean field with duration * Add tests for immediate connect * Add copyright * Naming and timeout (1s is not enough in CI perhaps?) * Show logs in CI * Fix test * Fix test * Wait for READY state * Rearrange code * Check if channel moved to READY state too fast * Turn off logging as it was * Fix review * Fix spotless Co-authored-by: ST-DDT <[email protected]>
1 parent c2c3ee7 commit ea7c20e

File tree

3 files changed

+202
-0
lines changed

3 files changed

+202
-0
lines changed

grpc-client-spring-boot-autoconfigure/src/main/java/net/devh/boot/grpc/client/channelfactory/AbstractChannelFactory.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919

2020
import static java.util.Objects.requireNonNull;
2121

22+
import java.time.Duration;
2223
import java.util.Collections;
2324
import java.util.List;
2425
import java.util.Map;
2526
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.CountDownLatch;
2628
import java.util.concurrent.TimeUnit;
2729

2830
import javax.annotation.PreDestroy;
@@ -132,6 +134,10 @@ protected ManagedChannel newManagedChannel(final String name) {
132134
final T builder = newChannelBuilder(name);
133135
configure(builder, name);
134136
final ManagedChannel channel = builder.build();
137+
final Duration timeout = properties.getChannel(name).getImmediateConnectTimeout();
138+
if (!timeout.isZero()) {
139+
connectOnStartup(name, channel, timeout);
140+
}
135141
watchConnectivityState(name, channel);
136142
return channel;
137143
}
@@ -254,6 +260,36 @@ protected void watchConnectivityState(final String name, final ManagedChannel ch
254260
}
255261
}
256262

263+
private void connectOnStartup(String name, ManagedChannel channel, Duration timeout) {
264+
log.debug("Initiating connection to channel {}", name);
265+
channel.getState(true);
266+
267+
final CountDownLatch readyLatch = new CountDownLatch(1);
268+
waitForReady(channel, readyLatch);
269+
boolean connected;
270+
try {
271+
log.debug("Waiting for connection to channel {}", name);
272+
connected = !readyLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
273+
} catch (InterruptedException e) {
274+
Thread.currentThread().interrupt();
275+
connected = false;
276+
}
277+
if (connected) {
278+
throw new IllegalStateException("Can't connect to channel " + name);
279+
}
280+
log.info("Successfully connected to channel {}", name);
281+
}
282+
283+
private void waitForReady(ManagedChannel channel, CountDownLatch readySignal) {
284+
final ConnectivityState state = channel.getState(false);
285+
log.debug("Waiting for ready state. Currently in {}", state);
286+
if (state == ConnectivityState.READY) {
287+
readySignal.countDown();
288+
} else {
289+
channel.notifyWhenStateChanged(state, () -> waitForReady(channel, readySignal));
290+
}
291+
}
292+
257293
/**
258294
* Closes this channel factory and the channels created by this instance. The shutdown happens in two phases, first
259295
* an orderly shutdown is initiated on all channels and then the method waits for all channels to terminate. If the

grpc-client-spring-boot-autoconfigure/src/main/java/net/devh/boot/grpc/client/config/GrpcChannelProperties.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,38 @@ public void setNegotiationType(final NegotiationType negotiationType) {
361361

362362
// --------------------------------------------------
363363

364+
private Duration immediateConnectTimeout;
365+
private static final Duration DEFAULT_IMMEDIATE_CONNECT = Duration.ZERO;
366+
367+
/**
368+
* Get the connection timeout at application startup.
369+
*
370+
* @return connection timeout at application startup.
371+
*
372+
* @see #setImmediateConnectTimeout(Duration)
373+
*/
374+
public Duration getImmediateConnectTimeout() {
375+
return this.immediateConnectTimeout == null ? DEFAULT_IMMEDIATE_CONNECT : this.immediateConnectTimeout;
376+
}
377+
378+
/**
379+
* If set to a positive duration instructs a client to connect to GRPC-endpoint when GRPC stub is created. If it's
380+
* set to a positive timeout application startup will be slower due to connection process will be executed
381+
* synchronously with maximum to connection timeout. If connection fails stub will fail to create with an exception
382+
* which in turn causes context startup to If connection fails stub will fail to create with an exception which in
383+
* turn causes context fail. Defaults to false.
384+
*
385+
* @param immediateConnectTimeout Connection timeout at application startup.
386+
*/
387+
public void setImmediateConnectTimeout(final Duration immediateConnectTimeout) {
388+
if (immediateConnectTimeout.isNegative()) {
389+
throw new IllegalArgumentException("Timeout can't be negative");
390+
}
391+
this.immediateConnectTimeout = immediateConnectTimeout;
392+
}
393+
394+
// --------------------------------------------------
395+
364396
private final Security security = new Security();
365397

366398
/**
@@ -409,6 +441,9 @@ public void copyDefaultsFrom(final GrpcChannelProperties config) {
409441
if (this.negotiationType == null) {
410442
this.negotiationType = config.negotiationType;
411443
}
444+
if (this.immediateConnectTimeout == null) {
445+
this.immediateConnectTimeout = config.immediateConnectTimeout;
446+
}
412447
this.security.copyDefaultsFrom(config.security);
413448
}
414449

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright (c) 2016-2020 Michael Zhang <[email protected]>
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
5+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
6+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
7+
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
8+
*
9+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
10+
* Software.
11+
*
12+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
13+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
14+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
15+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
16+
*/
17+
18+
package net.devh.boot.grpc.test.setup;
19+
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
23+
24+
import org.junit.jupiter.api.Test;
25+
import org.springframework.boot.test.context.SpringBootTest;
26+
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
27+
import org.springframework.test.annotation.DirtiesContext;
28+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
29+
30+
import io.grpc.Channel;
31+
import io.grpc.ConnectivityState;
32+
import io.grpc.ManagedChannel;
33+
import lombok.extern.slf4j.Slf4j;
34+
import net.devh.boot.grpc.client.config.GrpcChannelProperties;
35+
import net.devh.boot.grpc.client.inject.GrpcClient;
36+
import net.devh.boot.grpc.test.config.BaseAutoConfiguration;
37+
import net.devh.boot.grpc.test.config.ServiceConfiguration;
38+
39+
/**
40+
* These tests check the property {@link GrpcChannelProperties#getImmediateConnectTimeout()}. They check for
41+
* backwards-compatibility when this property didn't existed and various cases when it's enabled (for successful
42+
* connection and failed one).
43+
*/
44+
public class ImmediateConnectTests {
45+
46+
@Slf4j
47+
@SpringBootTest(properties = {
48+
"grpc.client.GLOBAL.address=localhost:9090",
49+
"grpc.client.GLOBAL.negotiationType=PLAINTEXT",
50+
"grpc.client.GLOBAL.immediateConnectTimeout=10s",
51+
})
52+
@SpringJUnitConfig(classes = {ServiceConfiguration.class, BaseAutoConfiguration.class})
53+
@DirtiesContext
54+
static class ImmediateConnectEnabledAndSuccessfulTest extends AbstractSimpleServerClientTest {
55+
56+
ImmediateConnectEnabledAndSuccessfulTest() {
57+
log.info("--- ImmediateConnectEnabledAnsSuccessfulTest ---");
58+
}
59+
60+
@Test
61+
@DirtiesContext
62+
void immediateConnectEnabledAndSuccessful() {
63+
assumeTrue(channel instanceof ManagedChannel,
64+
"To run this test channel must be ManagedChannel");
65+
ManagedChannel managedChannel = (ManagedChannel) channel;
66+
67+
ConnectivityState state = managedChannel.getState(false);
68+
assertEquals(
69+
"When immediateConnectTimeout property is set to positive duration channel must be in READY state if connection was successful",
70+
ConnectivityState.READY, state);
71+
}
72+
}
73+
74+
@Slf4j
75+
@SpringBootTest(properties = {
76+
"grpc.client.GLOBAL.address=localhost:9090",
77+
"grpc.client.GLOBAL.negotiationType=PLAINTEXT",
78+
})
79+
@SpringJUnitConfig(classes = {ServiceConfiguration.class, BaseAutoConfiguration.class})
80+
@DirtiesContext
81+
static class ImmediateConnectDisabledTest extends AbstractSimpleServerClientTest {
82+
83+
ImmediateConnectDisabledTest() {
84+
log.info("--- ImmediateConnectDisabledTest ---");
85+
}
86+
87+
@Test
88+
@DirtiesContext
89+
void immediateConnectDisabled() {
90+
assumeTrue(channel instanceof ManagedChannel,
91+
"To run this test channel must be ManagedChannel");
92+
ManagedChannel managedChannel = (ManagedChannel) channel;
93+
94+
ConnectivityState state = managedChannel.getState(false);
95+
assertEquals(
96+
"When immediateConnectTimeout property is set to zero or unset grpc must not attempt to connect until first request",
97+
ConnectivityState.IDLE, state);
98+
}
99+
}
100+
101+
@Slf4j
102+
static class ImmediateConnectEnabledAndFailedToConnectTest {
103+
104+
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
105+
.withPropertyValues(
106+
"grpc.client.GLOBAL.address=localhost:9999",
107+
"grpc.client.GLOBAL.negotiationType=PLAINTEXT",
108+
"grpc.client.GLOBAL.immediateConnectTimeout=1s")
109+
.withUserConfiguration(ServiceConfiguration.class, BaseAutoConfiguration.class)
110+
.withBean(FailedChannelHolder.class);
111+
112+
public ImmediateConnectEnabledAndFailedToConnectTest() {
113+
log.info("--- ImmediateConnectEnabledAndFailedToConnectTest ---");
114+
}
115+
116+
@Test
117+
void immediateConnectEnabledAndFailedToConnect() {
118+
contextRunner.run(context -> {
119+
assertThat(context).hasFailed();
120+
assertThat(context.getStartupFailure())
121+
.getCause()
122+
.isOfAnyClassIn(IllegalStateException.class);
123+
});
124+
}
125+
126+
private static class FailedChannelHolder {
127+
@GrpcClient("test")
128+
private Channel channel;
129+
}
130+
}
131+
}

0 commit comments

Comments
 (0)