Skip to content

Commit fb7b771

Browse files
committed
Made DiscoveryClientNameResolver extensible + include attributes for LB
Fixes #429
1 parent e081c91 commit fb7b771

File tree

2 files changed

+128
-84
lines changed

2 files changed

+128
-84
lines changed

grpc-client-spring-boot-autoconfigure/src/main/java/net/devh/boot/grpc/client/nameresolver/DiscoveryClientNameResolver.java

Lines changed: 103 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import static com.google.common.base.Preconditions.checkNotNull;
2121
import static com.google.common.base.Preconditions.checkState;
2222
import static java.util.Objects.requireNonNull;
23+
import static net.devh.boot.grpc.client.nameresolver.DiscoveryClientResolverFactory.DISCOVERY_INSTANCE_ID_KEY;
24+
import static net.devh.boot.grpc.client.nameresolver.DiscoveryClientResolverFactory.DISCOVERY_SERVICE_NAME_KEY;
2325
import static net.devh.boot.grpc.common.util.GrpcUtils.CLOUD_DISCOVERY_METADATA_PORT;
2426

2527
import java.net.InetSocketAddress;
@@ -36,6 +38,7 @@
3638
import com.google.common.collect.Lists;
3739

3840
import io.grpc.Attributes;
41+
import io.grpc.Attributes.Builder;
3942
import io.grpc.EquivalentAddressGroup;
4043
import io.grpc.NameResolver;
4144
import io.grpc.Status;
@@ -147,14 +150,85 @@ public void refreshFromExternal() {
147150
}
148151

149152
/**
150-
* Discovers matching service instances.
153+
* Discovers matching service instances. Can be overwritten to apply some custom filtering.
151154
*
152155
* @return A list of service instances to use.
153156
*/
154-
private List<ServiceInstance> discoverServices() {
157+
protected List<ServiceInstance> discoverServers() {
155158
return this.client.getInstances(this.name);
156159
}
157160

161+
/**
162+
* Extracts the gRPC server port from the given service instance. Can be overwritten for a custom port mapping.
163+
*
164+
* @param instance The instance to extract the port from.
165+
* @return The gRPC server port.
166+
* @throws IllegalArgumentException If the specified port definition couldn't be parsed.
167+
*/
168+
protected int getGrpcPort(final ServiceInstance instance) {
169+
final Map<String, String> metadata = instance.getMetadata();
170+
if (metadata == null || metadata.isEmpty()) {
171+
return instance.getPort();
172+
}
173+
String portString = metadata.get(CLOUD_DISCOVERY_METADATA_PORT);
174+
if (portString == null) {
175+
portString = metadata.get(LEGACY_CLOUD_DISCOVERY_METADATA_PORT);
176+
if (portString == null) {
177+
return instance.getPort();
178+
} else {
179+
log.warn("Found legacy grpc port metadata '{}' for client '{}' use '{}' instead",
180+
LEGACY_CLOUD_DISCOVERY_METADATA_PORT, getName(), CLOUD_DISCOVERY_METADATA_PORT);
181+
}
182+
}
183+
try {
184+
return Integer.parseInt(portString);
185+
} catch (final NumberFormatException e) {
186+
// TODO: How to handle this case?
187+
throw new IllegalArgumentException("Failed to parse gRPC port information from: " + instance, e);
188+
}
189+
}
190+
191+
/**
192+
* Gets the attributes from the service instance for later use in a load balancer. Can be overwritten to convert
193+
* custom attributes.
194+
*
195+
* @param serviceInstance The service instance to get them from.
196+
* @return The newly created attributes for the given instance.
197+
*/
198+
protected Attributes getAttributes(final ServiceInstance serviceInstance) {
199+
final Builder builder = Attributes.newBuilder();
200+
builder.set(DISCOVERY_SERVICE_NAME_KEY, this.name);
201+
builder.set(DISCOVERY_INSTANCE_ID_KEY, serviceInstance.getInstanceId());
202+
return builder.build();
203+
}
204+
205+
/**
206+
* Checks whether this instance should update its connections.
207+
*
208+
* @param newInstanceList The new instances that should be compared to the stored ones.
209+
* @return True, if the given instance list contains different entries than the stored ones.
210+
*/
211+
protected boolean needsToUpdateConnections(final List<ServiceInstance> newInstanceList) {
212+
if (this.instanceList.size() != newInstanceList.size()) {
213+
return true;
214+
}
215+
for (final ServiceInstance instance : this.instanceList) {
216+
final int port = getGrpcPort(instance);
217+
boolean isSame = false;
218+
for (final ServiceInstance newInstance : newInstanceList) {
219+
final int newPort = getGrpcPort(newInstance);
220+
if (newInstance.getHost().equals(instance.getHost()) && port == newPort) {
221+
isSame = true;
222+
break;
223+
}
224+
}
225+
if (!isSame) {
226+
return true;
227+
}
228+
}
229+
return false;
230+
}
231+
158232
private void resolve() {
159233
log.debug("Scheduled resolve for {}", this.name);
160234
if (this.resolving) {
@@ -186,6 +260,7 @@ public String toString() {
186260
*/
187261
private final class Resolve implements Runnable {
188262

263+
// The listener is stored in an extra variable to avoid NPEs if the resolver is shutdown while resolving
189264
private final Listener2 savedListener;
190265

191266
/**
@@ -199,7 +274,7 @@ private final class Resolve implements Runnable {
199274

200275
@Override
201276
public void run() {
202-
final AtomicReference<List<ServiceInstance>> resultContainer = new AtomicReference<>();
277+
final AtomicReference<List<ServiceInstance>> resultContainer = new AtomicReference<>(KEEP_PREVIOUS);
203278
try {
204279
resultContainer.set(resolveInternal());
205280
} catch (final Exception e) {
@@ -224,96 +299,45 @@ public void run() {
224299
* should be used.
225300
*/
226301
private List<ServiceInstance> resolveInternal() {
227-
final List<ServiceInstance> newInstanceList = discoverServices();
228-
log.debug("Got {} candidate servers for {}", newInstanceList.size(), getName());
302+
// Discover servers
303+
final List<ServiceInstance> newInstanceList = discoverServers();
229304
if (CollectionUtils.isEmpty(newInstanceList)) {
230305
log.error("No servers found for {}", getName());
231-
this.savedListener.onError(Status.UNAVAILABLE
232-
.withDescription("No servers found for " + getName()));
306+
this.savedListener.onError(Status.UNAVAILABLE.withDescription("No servers found for " + getName()));
233307
return Lists.newArrayList();
308+
} else {
309+
log.debug("Got {} candidate servers for {}", newInstanceList.size(), getName());
234310
}
311+
312+
// Check for changes
235313
if (!needsToUpdateConnections(newInstanceList)) {
236314
log.debug("Nothing has changed... skipping update for {}", getName());
237315
return KEEP_PREVIOUS;
238316
}
317+
318+
// Set new servers
239319
log.debug("Ready to update server list for {}", getName());
240-
final List<EquivalentAddressGroup> targets = Lists.newArrayList();
241-
for (final ServiceInstance instance : newInstanceList) {
242-
final int port = getGRPCPort(instance);
243-
log.debug("Found gRPC server {}:{} for {}", instance.getHost(), port, getName());
244-
targets.add(new EquivalentAddressGroup(
245-
new InetSocketAddress(instance.getHost(), port), Attributes.EMPTY));
246-
}
247-
if (targets.isEmpty()) {
248-
log.error("None of the servers for {} specified a gRPC port", getName());
249-
this.savedListener.onError(Status.UNAVAILABLE
250-
.withDescription("None of the servers for " + getName() + " specified a gRPC port"));
251-
return Lists.newArrayList();
252-
} else {
253-
this.savedListener.onResult(ResolutionResult.newBuilder()
254-
.setAddresses(targets)
255-
.build());
256-
log.info("Done updating server list for {}", getName());
257-
return newInstanceList;
258-
}
320+
this.savedListener.onResult(ResolutionResult.newBuilder()
321+
.setAddresses(toTargets(newInstanceList))
322+
.build());
323+
log.info("Done updating server list for {}", getName());
324+
return newInstanceList;
259325
}
260326

261-
/**
262-
* Extracts the gRPC server port from the given service instance.
263-
*
264-
* @param instance The instance to extract the port from.
265-
* @return The gRPC server port.
266-
* @throws IllegalArgumentException If the specified port definition couldn't be parsed.
267-
*/
268-
private int getGRPCPort(final ServiceInstance instance) {
269-
final Map<String, String> metadata = instance.getMetadata();
270-
if (metadata == null) {
271-
return instance.getPort();
272-
}
273-
String portString = metadata.get(CLOUD_DISCOVERY_METADATA_PORT);
274-
if (portString == null) {
275-
portString = metadata.get(LEGACY_CLOUD_DISCOVERY_METADATA_PORT);
276-
if (portString == null) {
277-
return instance.getPort();
278-
} else {
279-
log.warn("Found legacy grpc port metadata '{}' for client '{}' use '{}' instead",
280-
LEGACY_CLOUD_DISCOVERY_METADATA_PORT, getName(), CLOUD_DISCOVERY_METADATA_PORT);
281-
}
282-
}
283-
try {
284-
return Integer.parseInt(portString);
285-
} catch (final NumberFormatException e) {
286-
// TODO: How to handle this case?
287-
throw new IllegalArgumentException("Failed to parse gRPC port information from: " + instance, e);
327+
private List<EquivalentAddressGroup> toTargets(final List<ServiceInstance> newInstanceList) {
328+
final List<EquivalentAddressGroup> targets = Lists.newArrayList();
329+
for (final ServiceInstance instance : newInstanceList) {
330+
targets.add(toTarget(instance));
288331
}
332+
return targets;
289333
}
290334

291-
/**
292-
* Checks whether this instance should update its connections.
293-
*
294-
* @param newInstanceList The new instances that should be compared to the stored ones.
295-
* @return True, if the given instance list contains different entries than the stored ones.
296-
*/
297-
private boolean needsToUpdateConnections(final List<ServiceInstance> newInstanceList) {
298-
if (DiscoveryClientNameResolver.this.instanceList.size() != newInstanceList.size()) {
299-
return true;
300-
}
301-
for (final ServiceInstance instance : DiscoveryClientNameResolver.this.instanceList) {
302-
final int port = getGRPCPort(instance);
303-
boolean isSame = false;
304-
for (final ServiceInstance newInstance : newInstanceList) {
305-
final int newPort = getGRPCPort(newInstance);
306-
if (newInstance.getHost().equals(instance.getHost())
307-
&& port == newPort) {
308-
isSame = true;
309-
break;
310-
}
311-
}
312-
if (!isSame) {
313-
return true;
314-
}
315-
}
316-
return false;
335+
private EquivalentAddressGroup toTarget(final ServiceInstance instance) {
336+
final String host = instance.getHost();
337+
final int port = getGrpcPort(instance);
338+
final Attributes attributes = getAttributes(instance);
339+
log.debug("Found gRPC server {}:{} for {}", host, port, getName());
340+
return new EquivalentAddressGroup(new InetSocketAddress(host, port), attributes);
317341
}
318342

319343
}

grpc-client-spring-boot-autoconfigure/src/main/java/net/devh/boot/grpc/client/nameresolver/DiscoveryClientResolverFactory.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@
2626
import javax.annotation.Nullable;
2727
import javax.annotation.PreDestroy;
2828

29+
import org.springframework.cloud.client.ServiceInstance;
2930
import org.springframework.cloud.client.discovery.DiscoveryClient;
3031
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
3132
import org.springframework.cloud.client.discovery.event.HeartbeatMonitor;
3233
import org.springframework.context.event.EventListener;
3334

35+
import io.grpc.Attributes.Key;
3436
import io.grpc.NameResolver;
3537
import io.grpc.NameResolverProvider;
3638
import io.grpc.internal.GrpcUtil;
@@ -47,6 +49,14 @@ public class DiscoveryClientResolverFactory extends NameResolverProvider {
4749
* The constant containing the scheme that will be used by this factory.
4850
*/
4951
public static final String DISCOVERY_SCHEME = "discovery";
52+
/**
53+
* A key for the service name used to related {@link ServiceInstance}s from the {@link DiscoveryClient}.
54+
*/
55+
public static final Key<String> DISCOVERY_SERVICE_NAME_KEY = Key.create("serviceName");
56+
/**
57+
* A key for the {@link ServiceInstance#getInstanceId() instance id}.
58+
*/
59+
public static final Key<String> DISCOVERY_INSTANCE_ID_KEY = Key.create("instanceId");
5060

5161
private final Set<DiscoveryClientNameResolver> discoveryClientNameResolvers = ConcurrentHashMap.newKeySet();
5262
private final HeartbeatMonitor monitor = new HeartbeatMonitor();
@@ -72,15 +82,25 @@ public NameResolver newNameResolver(final URI targetUri, final NameResolver.Args
7282
+ "expected: '" + DISCOVERY_SCHEME + ":[//]/<service-name>'; "
7383
+ "but was '" + targetUri.toString() + "'");
7484
}
75-
final DiscoveryClientNameResolver discoveryClientNameResolver =
76-
new DiscoveryClientNameResolver(serviceName.substring(1), this.client, args,
77-
GrpcUtil.SHARED_CHANNEL_EXECUTOR, this.discoveryClientNameResolvers::remove);
78-
this.discoveryClientNameResolvers.add(discoveryClientNameResolver);
79-
return discoveryClientNameResolver;
85+
final DiscoveryClientNameResolver nameResolver = newNameResolver(serviceName.substring(1), args);
86+
this.discoveryClientNameResolvers.add(nameResolver);
87+
return nameResolver;
8088
}
8189
return null;
8290
}
8391

92+
/**
93+
* Factory method to create the resolver for the given service name.
94+
*
95+
* @param serviceName The service name to create it for.
96+
* @param args The NameResolver arguments to use.
97+
* @return A newly created DiscoveryClientNameResolver.
98+
*/
99+
protected DiscoveryClientNameResolver newNameResolver(final String serviceName, final NameResolver.Args args) {
100+
return new DiscoveryClientNameResolver(serviceName, this.client, args,
101+
GrpcUtil.SHARED_CHANNEL_EXECUTOR, this.discoveryClientNameResolvers::remove);
102+
}
103+
84104
@Override
85105
public String getDefaultScheme() {
86106
return DISCOVERY_SCHEME;

0 commit comments

Comments
 (0)