Skip to content

Commit a7997cc

Browse files
authored
Merge branch 'master' into feat/add_http_header_size_config
2 parents 6fdcffd + ea7c20e commit a7997cc

File tree

10 files changed

+453
-44
lines changed

10 files changed

+453
-44
lines changed

grpc-client-spring-boot-autoconfigure/src/main/java/net/devh/boot/grpc/client/autoconfigure/GrpcClientAutoConfiguration.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@
4646
import net.devh.boot.grpc.client.interceptor.AnnotationGlobalClientInterceptorConfigurer;
4747
import net.devh.boot.grpc.client.interceptor.GlobalClientInterceptorRegistry;
4848
import net.devh.boot.grpc.client.nameresolver.NameResolverRegistration;
49+
import net.devh.boot.grpc.client.stubfactory.AsyncStubFactory;
50+
import net.devh.boot.grpc.client.stubfactory.BlockingStubFactory;
51+
import net.devh.boot.grpc.client.stubfactory.FutureStubFactory;
4952
import net.devh.boot.grpc.common.autoconfigure.GrpcCommonCodecAutoConfiguration;
5053

5154
/**
@@ -65,6 +68,21 @@ static GrpcClientBeanPostProcessor grpcClientBeanPostProcessor(final Application
6568
return new GrpcClientBeanPostProcessor(applicationContext);
6669
}
6770

71+
@Bean
72+
AsyncStubFactory asyncStubFactory() {
73+
return new AsyncStubFactory();
74+
}
75+
76+
@Bean
77+
BlockingStubFactory blockingStubFactory() {
78+
return new BlockingStubFactory();
79+
}
80+
81+
@Bean
82+
FutureStubFactory futureStubFactory() {
83+
return new FutureStubFactory();
84+
}
85+
6886
@ConditionalOnMissingBean
6987
@Bean
7088
GrpcChannelsProperties grpcChannelsProperties() {

grpc-client-spring-boot-autoconfigure/src/main/java/net/devh/boot/grpc/client/channelfactory/AbstractChannelFactory.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919

2020
import static java.util.Objects.requireNonNull;
2121

22+
import java.time.Duration;
2223
import java.util.Collections;
2324
import java.util.List;
2425
import java.util.Map;
2526
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.CountDownLatch;
2628
import java.util.concurrent.TimeUnit;
2729

2830
import javax.annotation.PreDestroy;
@@ -132,6 +134,10 @@ protected ManagedChannel newManagedChannel(final String name) {
132134
final T builder = newChannelBuilder(name);
133135
configure(builder, name);
134136
final ManagedChannel channel = builder.build();
137+
final Duration timeout = properties.getChannel(name).getImmediateConnectTimeout();
138+
if (!timeout.isZero()) {
139+
connectOnStartup(name, channel, timeout);
140+
}
135141
watchConnectivityState(name, channel);
136142
return channel;
137143
}
@@ -254,6 +260,36 @@ protected void watchConnectivityState(final String name, final ManagedChannel ch
254260
}
255261
}
256262

263+
private void connectOnStartup(String name, ManagedChannel channel, Duration timeout) {
264+
log.debug("Initiating connection to channel {}", name);
265+
channel.getState(true);
266+
267+
final CountDownLatch readyLatch = new CountDownLatch(1);
268+
waitForReady(channel, readyLatch);
269+
boolean connected;
270+
try {
271+
log.debug("Waiting for connection to channel {}", name);
272+
connected = !readyLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
273+
} catch (InterruptedException e) {
274+
Thread.currentThread().interrupt();
275+
connected = false;
276+
}
277+
if (connected) {
278+
throw new IllegalStateException("Can't connect to channel " + name);
279+
}
280+
log.info("Successfully connected to channel {}", name);
281+
}
282+
283+
private void waitForReady(ManagedChannel channel, CountDownLatch readySignal) {
284+
final ConnectivityState state = channel.getState(false);
285+
log.debug("Waiting for ready state. Currently in {}", state);
286+
if (state == ConnectivityState.READY) {
287+
readySignal.countDown();
288+
} else {
289+
channel.notifyWhenStateChanged(state, () -> waitForReady(channel, readySignal));
290+
}
291+
}
292+
257293
/**
258294
* Closes this channel factory and the channels created by this instance. The shutdown happens in two phases, first
259295
* an orderly shutdown is initiated on all channels and then the method waits for all channels to terminate. If the

grpc-client-spring-boot-autoconfigure/src/main/java/net/devh/boot/grpc/client/config/GrpcChannelProperties.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,38 @@ public void setNegotiationType(final NegotiationType negotiationType) {
361361

362362
// --------------------------------------------------
363363

364+
private Duration immediateConnectTimeout;
365+
private static final Duration DEFAULT_IMMEDIATE_CONNECT = Duration.ZERO;
366+
367+
/**
368+
* Get the connection timeout at application startup.
369+
*
370+
* @return connection timeout at application startup.
371+
*
372+
* @see #setImmediateConnectTimeout(Duration)
373+
*/
374+
public Duration getImmediateConnectTimeout() {
375+
return this.immediateConnectTimeout == null ? DEFAULT_IMMEDIATE_CONNECT : this.immediateConnectTimeout;
376+
}
377+
378+
/**
379+
* If set to a positive duration instructs a client to connect to GRPC-endpoint when GRPC stub is created. If it's
380+
* set to a positive timeout application startup will be slower due to connection process will be executed
381+
* synchronously with maximum to connection timeout. If connection fails stub will fail to create with an exception
382+
* which in turn causes context startup to If connection fails stub will fail to create with an exception which in
383+
* turn causes context fail. Defaults to false.
384+
*
385+
* @param immediateConnectTimeout Connection timeout at application startup.
386+
*/
387+
public void setImmediateConnectTimeout(final Duration immediateConnectTimeout) {
388+
if (immediateConnectTimeout.isNegative()) {
389+
throw new IllegalArgumentException("Timeout can't be negative");
390+
}
391+
this.immediateConnectTimeout = immediateConnectTimeout;
392+
}
393+
394+
// --------------------------------------------------
395+
364396
private final Security security = new Security();
365397

366398
/**
@@ -409,6 +441,9 @@ public void copyDefaultsFrom(final GrpcChannelProperties config) {
409441
if (this.negotiationType == null) {
410442
this.negotiationType = config.negotiationType;
411443
}
444+
if (this.immediateConnectTimeout == null) {
445+
this.immediateConnectTimeout = config.immediateConnectTimeout;
446+
}
412447
this.security.copyDefaultsFrom(config.security);
413448
}
414449

grpc-client-spring-boot-autoconfigure/src/main/java/net/devh/boot/grpc/client/inject/GrpcClientBeanPostProcessor.java

Lines changed: 28 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import static java.util.Objects.requireNonNull;
2121

22-
import java.lang.reflect.Constructor;
2322
import java.lang.reflect.Field;
2423
import java.lang.reflect.Member;
2524
import java.lang.reflect.Method;
@@ -41,12 +40,10 @@
4140

4241
import io.grpc.Channel;
4342
import io.grpc.ClientInterceptor;
44-
import io.grpc.stub.AbstractAsyncStub;
45-
import io.grpc.stub.AbstractBlockingStub;
46-
import io.grpc.stub.AbstractFutureStub;
4743
import io.grpc.stub.AbstractStub;
4844
import net.devh.boot.grpc.client.channelfactory.GrpcChannelFactory;
4945
import net.devh.boot.grpc.client.nameresolver.NameResolverRegistration;
46+
import net.devh.boot.grpc.client.stubfactory.StubFactory;
5047

5148
/**
5249
* This {@link BeanPostProcessor} searches for fields and methods in beans that are annotated with {@link GrpcClient}
@@ -63,6 +60,7 @@ public class GrpcClientBeanPostProcessor implements BeanPostProcessor {
6360
// which could lead to problems with the correct bean setup.
6461
private GrpcChannelFactory channelFactory = null;
6562
private List<StubTransformer> stubTransformers = null;
63+
private List<StubFactory> stubFactories = null;
6664

6765
/**
6866
* Creates a new GrpcClientBeanPostProcessor with the given ApplicationContext.
@@ -217,7 +215,8 @@ protected <T> T valueForMember(final String name, final Member injectionTarget,
217215
} else if (AbstractStub.class.isAssignableFrom(injectionType)) {
218216

219217
@SuppressWarnings("unchecked") // Eclipse incorrectly marks this as not required
220-
AbstractStub<?> stub = createStub(injectionType.asSubclass(AbstractStub.class), channel);
218+
AbstractStub<?> stub = createStub(
219+
(Class<? extends AbstractStub<?>>) injectionType.asSubclass(AbstractStub.class), channel);
221220
for (final StubTransformer stubTransformer : getStubTransformers()) {
222221
stub = stubTransformer.transform(name, stub);
223222
}
@@ -229,53 +228,38 @@ protected <T> T valueForMember(final String name, final Member injectionTarget,
229228
}
230229

231230
/**
232-
* Creates a stub of the given type.
233-
*
234-
* @param <T> The type of the instance to be injected.
235-
* @param stubType The type of the stub to create.
236-
* @param channel The channel used to create the stub.
237-
* @return The newly created stub.
238-
*
239-
* @throws BeanInstantiationException If the stub couldn't be created.
231+
* Creates a stub instance for the specified stub type using the resolved {@link StubFactory}.
232+
*
233+
* @param stubClass The stub class that needs to be created.
234+
* @param channel The gRPC channel associated with the created stub, passed as a parameter to the stub factory.
235+
* @throws BeanInstantiationException If the stub couldn't be created, either because the type isn't supported or
236+
* because of a failure in creation.
237+
* @return A newly created gRPC stub.
240238
*/
241-
protected <T extends AbstractStub<T>> T createStub(final Class<T> stubType, final Channel channel) {
239+
private AbstractStub<?> createStub(Class<? extends AbstractStub<?>> stubClass, Channel channel) {
240+
final StubFactory factory = getStubFactories().stream()
241+
.filter(stubFactory -> stubFactory.isApplicable(stubClass))
242+
.findFirst()
243+
.orElseThrow(() -> new BeanInstantiationException(stubClass,
244+
"Unsupported stub type: " + stubClass.getName() + " -> Please report this issue."));
245+
242246
try {
243-
// First try the public static factory method
244-
final String methodName = deriveStubFactoryMethodName(stubType);
245-
final Class<?> enclosingClass = stubType.getEnclosingClass();
246-
final Method factoryMethod = enclosingClass.getMethod(methodName, Channel.class);
247-
return stubType.cast(factoryMethod.invoke(null, channel));
248-
} catch (final Exception e) {
249-
try {
250-
// Use the private constructor as backup
251-
final Constructor<T> constructor = stubType.getDeclaredConstructor(Channel.class);
252-
constructor.setAccessible(true);
253-
return constructor.newInstance(channel);
254-
} catch (final Exception e1) {
255-
e.addSuppressed(e1);
256-
}
257-
throw new BeanInstantiationException(stubType, "Failed to create gRPC client", e);
247+
return factory.createStub(stubClass, channel);
248+
} catch (Exception exception) {
249+
throw new BeanInstantiationException(stubClass, "Failed to create gRPC stub of type " + stubClass.getName(),
250+
exception);
258251
}
259252
}
260253

261254
/**
262-
* Derives the name of the factory method from the given stub type.
255+
* Lazy getter for the list of defined {@link StubFactory} beans.
263256
*
264-
* @param stubType The type of the stub to get it for.
265-
* @return The name of the factory method.
266-
* @throws IllegalArgumentException If the method was called with an unsupported stub type.
257+
* @return A list of all defined {@link StubFactory} beans.
267258
*/
268-
protected String deriveStubFactoryMethodName(final Class<? extends AbstractStub<?>> stubType) {
269-
if (AbstractAsyncStub.class.isAssignableFrom(stubType)) {
270-
return "newStub";
271-
} else if (AbstractBlockingStub.class.isAssignableFrom(stubType)) {
272-
return "newBlockingStub";
273-
} else if (AbstractFutureStub.class.isAssignableFrom(stubType)) {
274-
return "newFutureStub";
275-
} else {
276-
throw new IllegalArgumentException(
277-
"Unsupported stub type: " + stubType.getName() + " -> Please report this issue.");
259+
private List<StubFactory> getStubFactories() {
260+
if (this.stubFactories == null) {
261+
stubFactories = new ArrayList<>(applicationContext.getBeansOfType(StubFactory.class).values());
278262
}
263+
return stubFactories;
279264
}
280-
281265
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2016-2020 Michael Zhang <[email protected]>
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
5+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
6+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
7+
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
8+
*
9+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
10+
* Software.
11+
*
12+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
13+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
14+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
15+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
16+
*/
17+
18+
package net.devh.boot.grpc.client.stubfactory;
19+
20+
import io.grpc.stub.AbstractAsyncStub;
21+
import io.grpc.stub.AbstractStub;
22+
23+
public class AsyncStubFactory extends StandardJavaGrpcStubFactory {
24+
25+
@Override
26+
public boolean isApplicable(Class<? extends AbstractStub<?>> stubType) {
27+
return AbstractAsyncStub.class.isAssignableFrom(stubType);
28+
}
29+
30+
@Override
31+
protected String getFactoryMethodName() {
32+
return "newStub";
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2016-2020 Michael Zhang <[email protected]>
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
5+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
6+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
7+
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
8+
*
9+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
10+
* Software.
11+
*
12+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
13+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
14+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
15+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
16+
*/
17+
18+
package net.devh.boot.grpc.client.stubfactory;
19+
20+
import io.grpc.stub.AbstractBlockingStub;
21+
import io.grpc.stub.AbstractStub;
22+
23+
public class BlockingStubFactory extends StandardJavaGrpcStubFactory {
24+
25+
@Override
26+
public boolean isApplicable(Class<? extends AbstractStub<?>> stubType) {
27+
return AbstractBlockingStub.class.isAssignableFrom(stubType);
28+
}
29+
30+
@Override
31+
protected String getFactoryMethodName() {
32+
return "newBlockingStub";
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2016-2020 Michael Zhang <[email protected]>
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
5+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
6+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
7+
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
8+
*
9+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
10+
* Software.
11+
*
12+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
13+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
14+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
15+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
16+
*/
17+
18+
package net.devh.boot.grpc.client.stubfactory;
19+
20+
import io.grpc.stub.AbstractFutureStub;
21+
import io.grpc.stub.AbstractStub;
22+
23+
public class FutureStubFactory extends StandardJavaGrpcStubFactory {
24+
25+
@Override
26+
public boolean isApplicable(Class<? extends AbstractStub<?>> stubType) {
27+
return AbstractFutureStub.class.isAssignableFrom(stubType);
28+
}
29+
30+
@Override
31+
protected String getFactoryMethodName() {
32+
return "newFutureStub";
33+
}
34+
}

0 commit comments

Comments
 (0)