20
20
import static com .google .common .base .Preconditions .checkNotNull ;
21
21
import static com .google .common .base .Preconditions .checkState ;
22
22
import static java .util .Objects .requireNonNull ;
23
+ import static net .devh .boot .grpc .client .nameresolver .DiscoveryClientResolverFactory .DISCOVERY_INSTANCE_ID_KEY ;
24
+ import static net .devh .boot .grpc .client .nameresolver .DiscoveryClientResolverFactory .DISCOVERY_SERVICE_NAME_KEY ;
25
+ import static net .devh .boot .grpc .common .util .GrpcUtils .CLOUD_DISCOVERY_METADATA_PORT ;
23
26
24
27
import java .net .InetSocketAddress ;
25
28
import java .util .List ;
26
29
import java .util .Map ;
27
30
import java .util .concurrent .Executor ;
28
31
import java .util .concurrent .atomic .AtomicReference ;
32
+ import java .util .function .Consumer ;
29
33
30
34
import org .springframework .cloud .client .ServiceInstance ;
31
35
import org .springframework .cloud .client .discovery .DiscoveryClient ;
34
38
import com .google .common .collect .Lists ;
35
39
36
40
import io .grpc .Attributes ;
41
+ import io .grpc .Attributes .Builder ;
37
42
import io .grpc .EquivalentAddressGroup ;
38
43
import io .grpc .NameResolver ;
39
44
import io .grpc .Status ;
40
45
import io .grpc .SynchronizationContext ;
41
46
import io .grpc .internal .SharedResourceHolder ;
42
47
import lombok .extern .slf4j .Slf4j ;
43
- import net .devh .boot .grpc .common .util .GrpcUtils ;
44
48
45
49
/**
46
50
* The DiscoveryClientNameResolver resolves the service hosts and their associated gRPC port using the channel's name
@@ -59,7 +63,7 @@ public class DiscoveryClientNameResolver extends NameResolver {
59
63
private final String name ;
60
64
private final DiscoveryClient client ;
61
65
private final SynchronizationContext syncContext ;
62
- private final Runnable externalCleaner ;
66
+ private final Consumer < DiscoveryClientNameResolver > shutdownHook ;
63
67
private final SharedResourceHolder .Resource <Executor > executorResource ;
64
68
private final boolean usingExecutorResource ;
65
69
@@ -78,27 +82,46 @@ public class DiscoveryClientNameResolver extends NameResolver {
78
82
* @param client The client used to look up the service addresses.
79
83
* @param args The name resolver args.
80
84
* @param executorResource The executor resource.
81
- * @param externalCleaner The optional cleaner used during {@link #shutdown()}
85
+ * @param shutdownHook The optional cleaner used during {@link #shutdown()}
82
86
*/
83
87
public DiscoveryClientNameResolver (final String name , final DiscoveryClient client , final Args args ,
84
- final SharedResourceHolder .Resource <Executor > executorResource , final Runnable externalCleaner ) {
88
+ final SharedResourceHolder .Resource <Executor > executorResource ,
89
+ final Consumer <DiscoveryClientNameResolver > shutdownHook ) {
85
90
this .name = name ;
86
91
this .client = client ;
87
92
this .syncContext = requireNonNull (args .getSynchronizationContext (), "syncContext" );
88
- this .externalCleaner = externalCleaner ;
93
+ this .shutdownHook = shutdownHook ;
89
94
this .executor = args .getOffloadExecutor ();
90
95
this .usingExecutorResource = this .executor == null ;
91
96
this .executorResource = executorResource ;
92
97
}
93
98
99
+ /**
100
+ * Gets the name of the service to get the instances of.
101
+ *
102
+ * @return The name associated with this resolver.
103
+ */
104
+ protected final String getName () {
105
+ return this .name ;
106
+ }
107
+
108
+ /**
109
+ * Checks whether this resolver is active. E.g. {@code #start} has been called, but not {@code #shutdown()}.
110
+ *
111
+ * @return True, if there is a listener attached. False, otherwise.
112
+ */
113
+ protected final boolean isActive () {
114
+ return this .listener != null ;
115
+ }
116
+
94
117
@ Override
95
118
public final String getServiceAuthority () {
96
119
return this .name ;
97
120
}
98
121
99
122
@ Override
100
123
public void start (final Listener2 listener ) {
101
- checkState (this . listener == null , "already started" );
124
+ checkState (! isActive () , "already started" );
102
125
if (this .usingExecutorResource ) {
103
126
this .executor = SharedResourceHolder .get (this .executorResource );
104
127
}
@@ -108,7 +131,7 @@ public void start(final Listener2 listener) {
108
131
109
132
@ Override
110
133
public void refresh () {
111
- checkState (this . listener != null , "not started" );
134
+ checkState (isActive () , "not started" );
112
135
resolve ();
113
136
}
114
137
@@ -120,19 +143,99 @@ public void refresh() {
120
143
*/
121
144
public void refreshFromExternal () {
122
145
this .syncContext .execute (() -> {
123
- if (this . listener != null ) {
146
+ if (isActive () ) {
124
147
resolve ();
125
148
}
126
149
});
127
150
}
128
151
152
+ /**
153
+ * Discovers matching service instances. Can be overwritten to apply some custom filtering.
154
+ *
155
+ * @return A list of service instances to use.
156
+ */
157
+ protected List <ServiceInstance > discoverServers () {
158
+ return this .client .getInstances (this .name );
159
+ }
160
+
161
+ /**
162
+ * Extracts the gRPC server port from the given service instance. Can be overwritten for a custom port mapping.
163
+ *
164
+ * @param instance The instance to extract the port from.
165
+ * @return The gRPC server port.
166
+ * @throws IllegalArgumentException If the specified port definition couldn't be parsed.
167
+ */
168
+ protected int getGrpcPort (final ServiceInstance instance ) {
169
+ final Map <String , String > metadata = instance .getMetadata ();
170
+ if (metadata == null || metadata .isEmpty ()) {
171
+ return instance .getPort ();
172
+ }
173
+ String portString = metadata .get (CLOUD_DISCOVERY_METADATA_PORT );
174
+ if (portString == null ) {
175
+ portString = metadata .get (LEGACY_CLOUD_DISCOVERY_METADATA_PORT );
176
+ if (portString == null ) {
177
+ return instance .getPort ();
178
+ } else {
179
+ log .warn ("Found legacy grpc port metadata '{}' for client '{}' use '{}' instead" ,
180
+ LEGACY_CLOUD_DISCOVERY_METADATA_PORT , getName (), CLOUD_DISCOVERY_METADATA_PORT );
181
+ }
182
+ }
183
+ try {
184
+ return Integer .parseInt (portString );
185
+ } catch (final NumberFormatException e ) {
186
+ // TODO: How to handle this case?
187
+ throw new IllegalArgumentException ("Failed to parse gRPC port information from: " + instance , e );
188
+ }
189
+ }
190
+
191
+ /**
192
+ * Gets the attributes from the service instance for later use in a load balancer. Can be overwritten to convert
193
+ * custom attributes.
194
+ *
195
+ * @param serviceInstance The service instance to get them from.
196
+ * @return The newly created attributes for the given instance.
197
+ */
198
+ protected Attributes getAttributes (final ServiceInstance serviceInstance ) {
199
+ final Builder builder = Attributes .newBuilder ();
200
+ builder .set (DISCOVERY_SERVICE_NAME_KEY , this .name );
201
+ builder .set (DISCOVERY_INSTANCE_ID_KEY , serviceInstance .getInstanceId ());
202
+ return builder .build ();
203
+ }
204
+
205
+ /**
206
+ * Checks whether this instance should update its connections.
207
+ *
208
+ * @param newInstanceList The new instances that should be compared to the stored ones.
209
+ * @return True, if the given instance list contains different entries than the stored ones.
210
+ */
211
+ protected boolean needsToUpdateConnections (final List <ServiceInstance > newInstanceList ) {
212
+ if (this .instanceList .size () != newInstanceList .size ()) {
213
+ return true ;
214
+ }
215
+ for (final ServiceInstance instance : this .instanceList ) {
216
+ final int port = getGrpcPort (instance );
217
+ boolean isSame = false ;
218
+ for (final ServiceInstance newInstance : newInstanceList ) {
219
+ final int newPort = getGrpcPort (newInstance );
220
+ if (newInstance .getHost ().equals (instance .getHost ()) && port == newPort ) {
221
+ isSame = true ;
222
+ break ;
223
+ }
224
+ }
225
+ if (!isSame ) {
226
+ return true ;
227
+ }
228
+ }
229
+ return false ;
230
+ }
231
+
129
232
private void resolve () {
130
233
log .debug ("Scheduled resolve for {}" , this .name );
131
234
if (this .resolving ) {
132
235
return ;
133
236
}
134
237
this .resolving = true ;
135
- this .executor .execute (new Resolve (this .listener , this . instanceList ));
238
+ this .executor .execute (new Resolve (this .listener ));
136
239
}
137
240
138
241
@ Override
@@ -142,8 +245,8 @@ public void shutdown() {
142
245
this .executor = SharedResourceHolder .release (this .executorResource , this .executor );
143
246
}
144
247
this .instanceList = Lists .newArrayList ();
145
- if (this .externalCleaner != null ) {
146
- this .externalCleaner . run ( );
248
+ if (this .shutdownHook != null ) {
249
+ this .shutdownHook . accept ( this );
147
250
}
148
251
}
149
252
@@ -157,34 +260,32 @@ public String toString() {
157
260
*/
158
261
private final class Resolve implements Runnable {
159
262
263
+ // The listener is stored in an extra variable to avoid NPEs if the resolver is shutdown while resolving
160
264
private final Listener2 savedListener ;
161
- private final List <ServiceInstance > savedInstanceList ;
162
265
163
266
/**
164
267
* Creates a new Resolve that stores a snapshot of the relevant states of the resolver.
165
268
*
166
269
* @param listener The listener to send the results to.
167
- * @param instanceList The current server instance list.
168
270
*/
169
- Resolve (final Listener2 listener , final List < ServiceInstance > instanceList ) {
271
+ Resolve (final Listener2 listener ) {
170
272
this .savedListener = requireNonNull (listener , "listener" );
171
- this .savedInstanceList = requireNonNull (instanceList , "instanceList" );
172
273
}
173
274
174
275
@ Override
175
276
public void run () {
176
- final AtomicReference <List <ServiceInstance >> resultContainer = new AtomicReference <>();
277
+ final AtomicReference <List <ServiceInstance >> resultContainer = new AtomicReference <>(KEEP_PREVIOUS );
177
278
try {
178
279
resultContainer .set (resolveInternal ());
179
280
} catch (final Exception e ) {
180
281
this .savedListener .onError (Status .UNAVAILABLE .withCause (e )
181
- .withDescription ("Failed to update server list for " + DiscoveryClientNameResolver . this . name ));
282
+ .withDescription ("Failed to update server list for " + getName () ));
182
283
resultContainer .set (Lists .newArrayList ());
183
284
} finally {
184
285
DiscoveryClientNameResolver .this .syncContext .execute (() -> {
185
286
DiscoveryClientNameResolver .this .resolving = false ;
186
287
final List <ServiceInstance > result = resultContainer .get ();
187
- if (result != KEEP_PREVIOUS && DiscoveryClientNameResolver . this . listener != null ) {
288
+ if (result != KEEP_PREVIOUS && isActive () ) {
188
289
DiscoveryClientNameResolver .this .instanceList = result ;
189
290
}
190
291
});
@@ -198,98 +299,45 @@ public void run() {
198
299
* should be used.
199
300
*/
200
301
private List <ServiceInstance > resolveInternal () {
201
- final String name = DiscoveryClientNameResolver .this .name ;
202
- final List <ServiceInstance > newInstanceList =
203
- DiscoveryClientNameResolver .this .client .getInstances (name );
204
- log .debug ("Got {} candidate servers for {}" , newInstanceList .size (), name );
302
+ // Discover servers
303
+ final List <ServiceInstance > newInstanceList = discoverServers ();
205
304
if (CollectionUtils .isEmpty (newInstanceList )) {
206
- log .error ("No servers found for {}" , name );
207
- this .savedListener .onError (Status .UNAVAILABLE .withDescription ("No servers found for " + name ));
305
+ log .error ("No servers found for {}" , getName () );
306
+ this .savedListener .onError (Status .UNAVAILABLE .withDescription ("No servers found for " + getName () ));
208
307
return Lists .newArrayList ();
308
+ } else {
309
+ log .debug ("Got {} candidate servers for {}" , newInstanceList .size (), getName ());
209
310
}
311
+
312
+ // Check for changes
210
313
if (!needsToUpdateConnections (newInstanceList )) {
211
- log .debug ("Nothing has changed... skipping update for {}" , name );
314
+ log .debug ("Nothing has changed... skipping update for {}" , getName () );
212
315
return KEEP_PREVIOUS ;
213
316
}
214
- log .debug ("Ready to update server list for {}" , name );
215
- final List <EquivalentAddressGroup > targets = Lists .newArrayList ();
216
- for (final ServiceInstance instance : newInstanceList ) {
217
- final int port = getGRPCPort (instance );
218
- log .debug ("Found gRPC server {}:{} for {}" , instance .getHost (), port , name );
219
- targets .add (new EquivalentAddressGroup (
220
- new InetSocketAddress (instance .getHost (), port ), Attributes .EMPTY ));
221
- }
222
- if (targets .isEmpty ()) {
223
- log .error ("None of the servers for {} specified a gRPC port" , name );
224
- this .savedListener .onError (Status .UNAVAILABLE
225
- .withDescription ("None of the servers for " + name + " specified a gRPC port" ));
226
- return Lists .newArrayList ();
227
- } else {
228
- this .savedListener .onResult (ResolutionResult .newBuilder ()
229
- .setAddresses (targets )
230
- .build ());
231
- log .info ("Done updating server list for {}" , name );
232
- return newInstanceList ;
233
- }
317
+
318
+ // Set new servers
319
+ log .debug ("Ready to update server list for {}" , getName ());
320
+ this .savedListener .onResult (ResolutionResult .newBuilder ()
321
+ .setAddresses (toTargets (newInstanceList ))
322
+ .build ());
323
+ log .info ("Done updating server list for {}" , getName ());
324
+ return newInstanceList ;
234
325
}
235
326
236
- /**
237
- * Extracts the gRPC server port from the given service instance.
238
- *
239
- * @param instance The instance to extract the port from.
240
- * @return The gRPC server port.
241
- * @throws IllegalArgumentException If the specified port definition couldn't be parsed.
242
- */
243
- private int getGRPCPort (final ServiceInstance instance ) {
244
- final Map <String , String > metadata = instance .getMetadata ();
245
- if (metadata == null ) {
246
- return instance .getPort ();
247
- }
248
- String portString = metadata .get (GrpcUtils .CLOUD_DISCOVERY_METADATA_PORT );
249
- if (portString == null ) {
250
- portString = metadata .get (LEGACY_CLOUD_DISCOVERY_METADATA_PORT );
251
- if (portString == null ) {
252
- return instance .getPort ();
253
- } else {
254
- log .warn ("Found legacy grpc port metadata '{}' for client '{}' use '{}' instead" ,
255
- LEGACY_CLOUD_DISCOVERY_METADATA_PORT , DiscoveryClientNameResolver .this .name ,
256
- GrpcUtils .CLOUD_DISCOVERY_METADATA_PORT );
257
- }
258
- }
259
- try {
260
- return Integer .parseInt (portString );
261
- } catch (final NumberFormatException e ) {
262
- // TODO: How to handle this case?
263
- throw new IllegalArgumentException ("Failed to parse gRPC port information from: " + instance , e );
327
+ private List <EquivalentAddressGroup > toTargets (final List <ServiceInstance > newInstanceList ) {
328
+ final List <EquivalentAddressGroup > targets = Lists .newArrayList ();
329
+ for (final ServiceInstance instance : newInstanceList ) {
330
+ targets .add (toTarget (instance ));
264
331
}
332
+ return targets ;
265
333
}
266
334
267
- /**
268
- * Checks whether this instance should update its connections.
269
- *
270
- * @param newInstanceList The new instances that should be compared to the stored ones.
271
- * @return True, if the given instance list contains different entries than the stored ones.
272
- */
273
- private boolean needsToUpdateConnections (final List <ServiceInstance > newInstanceList ) {
274
- if (this .savedInstanceList .size () != newInstanceList .size ()) {
275
- return true ;
276
- }
277
- for (final ServiceInstance instance : this .savedInstanceList ) {
278
- final int port = getGRPCPort (instance );
279
- boolean isSame = false ;
280
- for (final ServiceInstance newInstance : newInstanceList ) {
281
- final int newPort = getGRPCPort (newInstance );
282
- if (newInstance .getHost ().equals (instance .getHost ())
283
- && port == newPort ) {
284
- isSame = true ;
285
- break ;
286
- }
287
- }
288
- if (!isSame ) {
289
- return true ;
290
- }
291
- }
292
- return false ;
335
+ private EquivalentAddressGroup toTarget (final ServiceInstance instance ) {
336
+ final String host = instance .getHost ();
337
+ final int port = getGrpcPort (instance );
338
+ final Attributes attributes = getAttributes (instance );
339
+ log .debug ("Found gRPC server {}:{} for {}" , host , port , getName ());
340
+ return new EquivalentAddressGroup (new InetSocketAddress (host , port ), attributes );
293
341
}
294
342
295
343
}
0 commit comments