Skip to content

Commit bb3d84b

Browse files
committed
Add Stork support to the new Vert.x gRPC impl
1 parent 4859ae3 commit bb3d84b

File tree

34 files changed

+856
-151
lines changed

34 files changed

+856
-151
lines changed

extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientProcessor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import io.quarkus.grpc.runtime.config.GrpcClientBuildTimeConfig;
7474
import io.quarkus.grpc.runtime.stork.GrpcStorkRecorder;
7575
import io.quarkus.grpc.runtime.stork.StorkMeasuringGrpcInterceptor;
76+
import io.quarkus.grpc.runtime.stork.VertxStorkMeasuringGrpcInterceptor;
7677
import io.quarkus.grpc.runtime.supports.Channels;
7778
import io.quarkus.grpc.runtime.supports.GrpcClientConfigProvider;
7879
import io.quarkus.grpc.runtime.supports.IOThreadClientInterceptor;
@@ -97,6 +98,7 @@ void registerBeans(BuildProducer<AdditionalBeanBuildItem> beans) {
9798
@BuildStep
9899
void registerStorkInterceptor(BuildProducer<AdditionalBeanBuildItem> beans) {
99100
beans.produce(new AdditionalBeanBuildItem(StorkMeasuringGrpcInterceptor.class));
101+
beans.produce(new AdditionalBeanBuildItem(VertxStorkMeasuringGrpcInterceptor.class));
100102
}
101103

102104
@BuildStep
@@ -407,6 +409,7 @@ SyntheticBeanBuildItem clientInterceptorStorage(GrpcClientRecorder recorder, Rec
407409

408410
// it's okay if this one is not used:
409411
superfluousInterceptors.remove(StorkMeasuringGrpcInterceptor.class.getName());
412+
superfluousInterceptors.remove(VertxStorkMeasuringGrpcInterceptor.class.getName());
410413
if (!superfluousInterceptors.isEmpty()) {
411414
LOGGER.warnf("At least one unused gRPC client interceptor found: %s. If there are meant to be used globally, " +
412415
"annotate them with @GlobalInterceptor.", String.join(", ", superfluousInterceptors));

extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,12 @@ public void initializeGrpcServer(RuntimeValue<Vertx> vertxSupplier,
9494
if (grpcContainer == null) {
9595
throw new IllegalStateException("gRPC not initialized, GrpcContainer not found");
9696
}
97-
Vertx vertx = vertxSupplier.getValue();
9897
if (hasNoServices(grpcContainer.getServices()) && LaunchMode.current() != LaunchMode.DEVELOPMENT) {
9998
LOGGER.error("Unable to find beans exposing the `BindableService` interface - not starting the gRPC server");
99+
return; // OK?
100100
}
101101

102+
Vertx vertx = vertxSupplier.getValue();
102103
GrpcServerConfiguration configuration = cfg.server;
103104
GrpcBuilderProvider<?> provider = GrpcBuilderProvider.findServerBuilderProvider(configuration);
104105

extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcClientConfiguration.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ public class GrpcClientConfiguration {
3434
@ConfigItem
3535
public InProcess inProcess;
3636

37+
/**
38+
* Configure Stork usage with new Vert.x gRPC, if enabled.
39+
*/
40+
@ConfigItem
41+
public StorkConfig stork;
42+
3743
/**
3844
* The gRPC service port.
3945
*/
@@ -168,7 +174,7 @@ public class GrpcClientConfiguration {
168174

169175
/**
170176
* Use a custom load balancing policy.
171-
* Accepted values are: {@code pick_value}, {@code round_robin}, {@code grpclb}.
177+
* Accepted values are: {@code pick_first}, {@code round_robin}, {@code grpclb}.
172178
* This value is ignored if name-resolver is set to 'stork'.
173179
*/
174180
@ConfigItem(defaultValue = "pick_first")
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package io.quarkus.grpc.runtime.config;
2+
3+
import io.quarkus.runtime.annotations.ConfigGroup;
4+
import io.quarkus.runtime.annotations.ConfigItem;
5+
6+
/**
7+
* Stork config for new Vert.x gRPC
8+
*/
9+
@ConfigGroup
10+
public class StorkConfig {
11+
/**
12+
* Number of threads on a delayed gRPC ClientCall
13+
*/
14+
@ConfigItem(defaultValue = "10")
15+
public int threads;
16+
17+
/**
18+
* Deadline in milliseconds of delayed gRPC call
19+
*/
20+
@ConfigItem(defaultValue = "5000")
21+
public long deadline;
22+
23+
/**
24+
* Number of retries on a gRPC ClientCall
25+
*/
26+
@ConfigItem(defaultValue = "3")
27+
public int retries;
28+
29+
/**
30+
* Initial delay in seconds on refresh check
31+
*/
32+
@ConfigItem(defaultValue = "60")
33+
public long delay;
34+
35+
/**
36+
* Refresh period in seconds
37+
*/
38+
@ConfigItem(defaultValue = "120")
39+
public long period;
40+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.quarkus.grpc.runtime.stork;
2+
3+
import io.grpc.ClientCall;
4+
import io.grpc.ForwardingClientCall;
5+
import io.smallrye.stork.api.ServiceInstance;
6+
7+
abstract class AbstractStorkMeasuringCall<ReqT, RespT> extends ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>
8+
implements StorkMeasuringCollector {
9+
final boolean recordTime;
10+
11+
protected AbstractStorkMeasuringCall(ClientCall<ReqT, RespT> delegate, boolean recordTime) {
12+
super(delegate);
13+
this.recordTime = recordTime;
14+
}
15+
16+
protected abstract ServiceInstance serviceInstance();
17+
18+
public void recordReply() {
19+
if (serviceInstance() != null && recordTime) {
20+
serviceInstance().recordReply();
21+
}
22+
}
23+
24+
public void recordEnd(Throwable error) {
25+
if (serviceInstance() != null) {
26+
serviceInstance().recordEnd(error);
27+
}
28+
}
29+
}

extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/GrpcLoadBalancerProvider.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
import static io.grpc.ConnectivityState.IDLE;
44
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
5-
import static io.quarkus.grpc.runtime.stork.StorkMeasuringGrpcInterceptor.STORK_MEASURE_TIME;
6-
import static io.quarkus.grpc.runtime.stork.StorkMeasuringGrpcInterceptor.STORK_SERVICE_INSTANCE;
5+
import static io.quarkus.grpc.runtime.stork.StorkMeasuringCollector.STORK_MEASURE_TIME;
6+
import static io.quarkus.grpc.runtime.stork.StorkMeasuringCollector.STORK_SERVICE_INSTANCE;
77

88
import java.util.Collections;
99
import java.util.Comparator;

extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/stork/GrpcStorkServiceDiscovery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ private void informListener(List<ServiceInstance> instances) {
121121
socketAddresses.add(new InetSocketAddress(inetAddress, instance.getPort()));
122122
}
123123
} catch (UnknownHostException e) {
124-
log.errorf(e, "Ignoring wrong host: '%s' for service name '%s'", instance.getHost(),
124+
log.warnf(e, "Ignoring wrong host: '%s' for service name '%s'", instance.getHost(),
125125
serviceName);
126126
}
127127

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
package io.quarkus.grpc.runtime.stork;
2+
3+
import static io.quarkus.grpc.runtime.stork.StorkMeasuringCollector.STORK_MEASURE_TIME;
4+
import static io.quarkus.grpc.runtime.stork.StorkMeasuringCollector.STORK_SERVICE_INSTANCE;
5+
6+
import java.net.InetAddress;
7+
import java.net.InetSocketAddress;
8+
import java.net.UnknownHostException;
9+
import java.util.ArrayList;
10+
import java.util.Comparator;
11+
import java.util.HashSet;
12+
import java.util.List;
13+
import java.util.Map;
14+
import java.util.Set;
15+
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.Executor;
17+
import java.util.concurrent.ScheduledExecutorService;
18+
import java.util.concurrent.ScheduledThreadPoolExecutor;
19+
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.atomic.AtomicReference;
21+
22+
import javax.annotation.Nullable;
23+
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
import io.grpc.CallOptions;
28+
import io.grpc.Channel;
29+
import io.grpc.ClientCall;
30+
import io.grpc.Deadline;
31+
import io.grpc.MethodDescriptor;
32+
import io.grpc.internal.DelayedClientCall;
33+
import io.quarkus.grpc.runtime.config.StorkConfig;
34+
import io.smallrye.mutiny.Uni;
35+
import io.smallrye.stork.Stork;
36+
import io.smallrye.stork.api.Service;
37+
import io.smallrye.stork.api.ServiceInstance;
38+
import io.vertx.core.net.SocketAddress;
39+
import io.vertx.grpc.client.GrpcClient;
40+
import io.vertx.grpc.client.GrpcClientChannel;
41+
42+
public class StorkGrpcChannel extends Channel implements AutoCloseable {
43+
private static final Logger log = LoggerFactory.getLogger(StorkGrpcChannel.class);
44+
45+
private final Map<Long, ServiceInstance> services = new ConcurrentHashMap<>();
46+
private final Map<Long, Channel> channels = new ConcurrentHashMap<>();
47+
private final ScheduledExecutorService scheduler;
48+
49+
private final GrpcClient client;
50+
private final String serviceName;
51+
private final StorkConfig stork;
52+
private final Executor executor;
53+
54+
private static class Context {
55+
Service service;
56+
boolean measureTime;
57+
ServiceInstance instance;
58+
InetSocketAddress address;
59+
Channel channel;
60+
AtomicReference<ServiceInstance> ref;
61+
}
62+
63+
public StorkGrpcChannel(GrpcClient client, String serviceName, StorkConfig stork, Executor executor) {
64+
this.client = client;
65+
this.serviceName = serviceName;
66+
this.stork = stork;
67+
this.executor = executor;
68+
this.scheduler = new ScheduledThreadPoolExecutor(stork.threads);
69+
this.scheduler.scheduleAtFixedRate(this::refresh, stork.delay, stork.period, TimeUnit.SECONDS);
70+
}
71+
72+
@Override
73+
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(MethodDescriptor<RequestT, ResponseT> methodDescriptor,
74+
CallOptions callOptions) {
75+
Service service = Stork.getInstance().getService(serviceName);
76+
if (service == null) {
77+
throw new IllegalStateException("No service definition for serviceName " + serviceName + " found.");
78+
}
79+
80+
Context context = new Context();
81+
context.service = service;
82+
// handle this calls here
83+
Boolean measureTime = STORK_MEASURE_TIME.get();
84+
context.measureTime = measureTime != null && measureTime;
85+
context.ref = STORK_SERVICE_INSTANCE.get();
86+
87+
DelayedClientCall<RequestT, ResponseT> delayed = new StorkDelayedClientCall<>(executor, scheduler,
88+
Deadline.after(stork.deadline, TimeUnit.MILLISECONDS));
89+
90+
asyncCall(methodDescriptor, callOptions, context)
91+
.onFailure()
92+
.retry()
93+
.atMost(stork.retries)
94+
.subscribe()
95+
.asCompletionStage()
96+
.thenApply(delayed::setCall)
97+
.thenAccept(Runnable::run)
98+
.exceptionally(t -> {
99+
delayed.cancel("Failed to create new Stork ClientCall", t);
100+
return null;
101+
});
102+
103+
return delayed;
104+
}
105+
106+
private <RequestT, ResponseT> Uni<ClientCall<RequestT, ResponseT>> asyncCall(
107+
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions, Context context) {
108+
Uni<Context> entry = pickServiceInstanceWithChannel(context);
109+
return entry.map(c -> {
110+
ServiceInstance instance = c.instance;
111+
long serviceId = instance.getId();
112+
Channel channel = c.channel;
113+
try {
114+
services.put(serviceId, instance);
115+
channels.put(serviceId, channel);
116+
return channel.newCall(methodDescriptor, callOptions);
117+
} catch (Exception ex) {
118+
// remove, no good
119+
services.remove(serviceId);
120+
channels.remove(serviceId);
121+
throw new IllegalStateException(ex);
122+
}
123+
});
124+
}
125+
126+
@Override
127+
public String authority() {
128+
return null;
129+
}
130+
131+
@Override
132+
public void close() {
133+
scheduler.shutdown();
134+
}
135+
136+
@Override
137+
public String toString() {
138+
return super.toString() + String.format(" [%s]", serviceName);
139+
}
140+
141+
private void refresh() {
142+
// any better way to know which are OK / bad?
143+
services.clear();
144+
channels.clear();
145+
}
146+
147+
private Uni<Context> pickServiceInstanceWithChannel(Context context) {
148+
Uni<ServiceInstance> uni = pickServerInstance(context.service, context.measureTime);
149+
return uni
150+
.map(si -> {
151+
context.instance = si;
152+
if (si.gatherStatistics() && context.ref != null) {
153+
context.ref.set(si);
154+
}
155+
return context;
156+
})
157+
.invoke(this::checkSocketAddress)
158+
.invoke(c -> {
159+
ServiceInstance instance = context.instance;
160+
InetSocketAddress isa = context.address;
161+
context.channel = channels.computeIfAbsent(instance.getId(), id -> {
162+
SocketAddress address = SocketAddress.inetSocketAddress(isa.getPort(), isa.getHostName());
163+
return new GrpcClientChannel(client, address);
164+
});
165+
});
166+
}
167+
168+
private Uni<ServiceInstance> pickServerInstance(Service service, boolean measureTime) {
169+
return Uni.createFrom()
170+
.deferred(() -> {
171+
if (services.isEmpty()) {
172+
return service.getInstances()
173+
.invoke(l -> l.forEach(s -> services.put(s.getId(), s)));
174+
} else {
175+
List<ServiceInstance> list = new ArrayList<>(services.values());
176+
return Uni.createFrom().item(list);
177+
}
178+
})
179+
.invoke(list -> {
180+
// list should not be empty + sort by id
181+
list.sort(Comparator.comparing(ServiceInstance::getId));
182+
})
183+
.map(list -> service.selectInstanceAndRecordStart(list, measureTime));
184+
}
185+
186+
private void checkSocketAddress(Context context) {
187+
ServiceInstance instance = context.instance;
188+
Set<InetSocketAddress> socketAddresses = new HashSet<>();
189+
try {
190+
for (InetAddress inetAddress : InetAddress.getAllByName(instance.getHost())) {
191+
socketAddresses.add(new InetSocketAddress(inetAddress, instance.getPort()));
192+
}
193+
} catch (UnknownHostException e) {
194+
log.warn("Ignoring wrong host: '{}' for service name '{}'", instance.getHost(), serviceName, e);
195+
}
196+
197+
if (!socketAddresses.isEmpty()) {
198+
context.address = socketAddresses.iterator().next(); // pick first
199+
} else {
200+
long serviceId = instance.getId();
201+
services.remove(serviceId);
202+
channels.remove(serviceId);
203+
throw new IllegalStateException("Failed to determine working socket addresses for service-name: " + serviceName);
204+
}
205+
}
206+
207+
private static class StorkDelayedClientCall<RequestT, ResponseT> extends DelayedClientCall<RequestT, ResponseT> {
208+
public StorkDelayedClientCall(Executor callExecutor, ScheduledExecutorService scheduler, @Nullable Deadline deadline) {
209+
super(callExecutor, scheduler, deadline);
210+
}
211+
}
212+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.quarkus.grpc.runtime.stork;
2+
3+
import io.grpc.ClientCall;
4+
import io.grpc.ForwardingClientCall;
5+
import io.smallrye.stork.api.ServiceInstance;
6+
7+
abstract class StorkMeasuringCall<ReqT, RespT> extends ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>
8+
implements StorkMeasuringCollector {
9+
final boolean recordTime;
10+
11+
protected StorkMeasuringCall(ClientCall<ReqT, RespT> delegate, boolean recordTime) {
12+
super(delegate);
13+
this.recordTime = recordTime;
14+
}
15+
16+
protected abstract ServiceInstance serviceInstance();
17+
18+
public void recordReply() {
19+
if (serviceInstance() != null && recordTime) {
20+
serviceInstance().recordReply();
21+
}
22+
}
23+
24+
public void recordEnd(Throwable error) {
25+
if (serviceInstance() != null) {
26+
serviceInstance().recordEnd(error);
27+
}
28+
}
29+
}

0 commit comments

Comments
 (0)