20
20
import static com .google .common .base .Preconditions .checkNotNull ;
21
21
import static com .google .common .base .Preconditions .checkState ;
22
22
import static java .util .Objects .requireNonNull ;
23
+ import static net .devh .boot .grpc .common .util .GrpcUtils .CLOUD_DISCOVERY_METADATA_PORT ;
23
24
24
25
import java .net .InetSocketAddress ;
25
26
import java .util .List ;
26
27
import java .util .Map ;
27
28
import java .util .concurrent .Executor ;
28
29
import java .util .concurrent .atomic .AtomicReference ;
30
+ import java .util .function .Consumer ;
29
31
30
32
import org .springframework .cloud .client .ServiceInstance ;
31
33
import org .springframework .cloud .client .discovery .DiscoveryClient ;
40
42
import io .grpc .SynchronizationContext ;
41
43
import io .grpc .internal .SharedResourceHolder ;
42
44
import lombok .extern .slf4j .Slf4j ;
43
- import net .devh .boot .grpc .common .util .GrpcUtils ;
44
45
45
46
/**
46
47
* The DiscoveryClientNameResolver resolves the service hosts and their associated gRPC port using the channel's name
@@ -59,7 +60,7 @@ public class DiscoveryClientNameResolver extends NameResolver {
59
60
private final String name ;
60
61
private final DiscoveryClient client ;
61
62
private final SynchronizationContext syncContext ;
62
- private final Runnable externalCleaner ;
63
+ private final Consumer < DiscoveryClientNameResolver > shutdownHook ;
63
64
private final SharedResourceHolder .Resource <Executor > executorResource ;
64
65
private final boolean usingExecutorResource ;
65
66
@@ -78,27 +79,46 @@ public class DiscoveryClientNameResolver extends NameResolver {
78
79
* @param client The client used to look up the service addresses.
79
80
* @param args The name resolver args.
80
81
* @param executorResource The executor resource.
81
- * @param externalCleaner The optional cleaner used during {@link #shutdown()}
82
+ * @param shutdownHook The optional cleaner used during {@link #shutdown()}
82
83
*/
83
84
public DiscoveryClientNameResolver (final String name , final DiscoveryClient client , final Args args ,
84
- final SharedResourceHolder .Resource <Executor > executorResource , final Runnable externalCleaner ) {
85
+ final SharedResourceHolder .Resource <Executor > executorResource ,
86
+ final Consumer <DiscoveryClientNameResolver > shutdownHook ) {
85
87
this .name = name ;
86
88
this .client = client ;
87
89
this .syncContext = requireNonNull (args .getSynchronizationContext (), "syncContext" );
88
- this .externalCleaner = externalCleaner ;
90
+ this .shutdownHook = shutdownHook ;
89
91
this .executor = args .getOffloadExecutor ();
90
92
this .usingExecutorResource = this .executor == null ;
91
93
this .executorResource = executorResource ;
92
94
}
93
95
96
+ /**
97
+ * Gets the name of the service to get the instances of.
98
+ *
99
+ * @return The name associated with this resolver.
100
+ */
101
+ protected final String getName () {
102
+ return this .name ;
103
+ }
104
+
105
+ /**
106
+ * Checks whether this resolver is active. E.g. {@code #start} has been called, but not {@code #shutdown()}.
107
+ *
108
+ * @return True, if there is a listener attached. False, otherwise.
109
+ */
110
+ protected final boolean isActive () {
111
+ return this .listener != null ;
112
+ }
113
+
94
114
@ Override
95
115
public final String getServiceAuthority () {
96
116
return this .name ;
97
117
}
98
118
99
119
@ Override
100
120
public void start (final Listener2 listener ) {
101
- checkState (this . listener == null , "already started" );
121
+ checkState (! isActive () , "already started" );
102
122
if (this .usingExecutorResource ) {
103
123
this .executor = SharedResourceHolder .get (this .executorResource );
104
124
}
@@ -108,7 +128,7 @@ public void start(final Listener2 listener) {
108
128
109
129
@ Override
110
130
public void refresh () {
111
- checkState (this . listener != null , "not started" );
131
+ checkState (isActive () , "not started" );
112
132
resolve ();
113
133
}
114
134
@@ -120,19 +140,28 @@ public void refresh() {
120
140
*/
121
141
public void refreshFromExternal () {
122
142
this .syncContext .execute (() -> {
123
- if (this . listener != null ) {
143
+ if (isActive () ) {
124
144
resolve ();
125
145
}
126
146
});
127
147
}
128
148
149
+ /**
150
+ * Discovers matching service instances.
151
+ *
152
+ * @return A list of service instances to use.
153
+ */
154
+ private List <ServiceInstance > discoverServices () {
155
+ return this .client .getInstances (this .name );
156
+ }
157
+
129
158
private void resolve () {
130
159
log .debug ("Scheduled resolve for {}" , this .name );
131
160
if (this .resolving ) {
132
161
return ;
133
162
}
134
163
this .resolving = true ;
135
- this .executor .execute (new Resolve (this .listener , this . instanceList ));
164
+ this .executor .execute (new Resolve (this .listener ));
136
165
}
137
166
138
167
@ Override
@@ -142,8 +171,8 @@ public void shutdown() {
142
171
this .executor = SharedResourceHolder .release (this .executorResource , this .executor );
143
172
}
144
173
this .instanceList = Lists .newArrayList ();
145
- if (this .externalCleaner != null ) {
146
- this .externalCleaner . run ( );
174
+ if (this .shutdownHook != null ) {
175
+ this .shutdownHook . accept ( this );
147
176
}
148
177
}
149
178
@@ -158,17 +187,14 @@ public String toString() {
158
187
private final class Resolve implements Runnable {
159
188
160
189
private final Listener2 savedListener ;
161
- private final List <ServiceInstance > savedInstanceList ;
162
190
163
191
/**
164
192
* Creates a new Resolve that stores a snapshot of the relevant states of the resolver.
165
193
*
166
194
* @param listener The listener to send the results to.
167
- * @param instanceList The current server instance list.
168
195
*/
169
- Resolve (final Listener2 listener , final List < ServiceInstance > instanceList ) {
196
+ Resolve (final Listener2 listener ) {
170
197
this .savedListener = requireNonNull (listener , "listener" );
171
- this .savedInstanceList = requireNonNull (instanceList , "instanceList" );
172
198
}
173
199
174
200
@ Override
@@ -178,13 +204,13 @@ public void run() {
178
204
resultContainer .set (resolveInternal ());
179
205
} catch (final Exception e ) {
180
206
this .savedListener .onError (Status .UNAVAILABLE .withCause (e )
181
- .withDescription ("Failed to update server list for " + DiscoveryClientNameResolver . this . name ));
207
+ .withDescription ("Failed to update server list for " + getName () ));
182
208
resultContainer .set (Lists .newArrayList ());
183
209
} finally {
184
210
DiscoveryClientNameResolver .this .syncContext .execute (() -> {
185
211
DiscoveryClientNameResolver .this .resolving = false ;
186
212
final List <ServiceInstance > result = resultContainer .get ();
187
- if (result != KEEP_PREVIOUS && DiscoveryClientNameResolver . this . listener != null ) {
213
+ if (result != KEEP_PREVIOUS && isActive () ) {
188
214
DiscoveryClientNameResolver .this .instanceList = result ;
189
215
}
190
216
});
@@ -198,37 +224,36 @@ public void run() {
198
224
* should be used.
199
225
*/
200
226
private List <ServiceInstance > resolveInternal () {
201
- final String name = DiscoveryClientNameResolver .this .name ;
202
- final List <ServiceInstance > newInstanceList =
203
- DiscoveryClientNameResolver .this .client .getInstances (name );
204
- log .debug ("Got {} candidate servers for {}" , newInstanceList .size (), name );
227
+ final List <ServiceInstance > newInstanceList = discoverServices ();
228
+ log .debug ("Got {} candidate servers for {}" , newInstanceList .size (), getName ());
205
229
if (CollectionUtils .isEmpty (newInstanceList )) {
206
- log .error ("No servers found for {}" , name );
207
- this .savedListener .onError (Status .UNAVAILABLE .withDescription ("No servers found for " + name ));
230
+ log .error ("No servers found for {}" , getName ());
231
+ this .savedListener .onError (Status .UNAVAILABLE
232
+ .withDescription ("No servers found for " + getName ()));
208
233
return Lists .newArrayList ();
209
234
}
210
235
if (!needsToUpdateConnections (newInstanceList )) {
211
- log .debug ("Nothing has changed... skipping update for {}" , name );
236
+ log .debug ("Nothing has changed... skipping update for {}" , getName () );
212
237
return KEEP_PREVIOUS ;
213
238
}
214
- log .debug ("Ready to update server list for {}" , name );
239
+ log .debug ("Ready to update server list for {}" , getName () );
215
240
final List <EquivalentAddressGroup > targets = Lists .newArrayList ();
216
241
for (final ServiceInstance instance : newInstanceList ) {
217
242
final int port = getGRPCPort (instance );
218
- log .debug ("Found gRPC server {}:{} for {}" , instance .getHost (), port , name );
243
+ log .debug ("Found gRPC server {}:{} for {}" , instance .getHost (), port , getName () );
219
244
targets .add (new EquivalentAddressGroup (
220
245
new InetSocketAddress (instance .getHost (), port ), Attributes .EMPTY ));
221
246
}
222
247
if (targets .isEmpty ()) {
223
- log .error ("None of the servers for {} specified a gRPC port" , name );
248
+ log .error ("None of the servers for {} specified a gRPC port" , getName () );
224
249
this .savedListener .onError (Status .UNAVAILABLE
225
- .withDescription ("None of the servers for " + name + " specified a gRPC port" ));
250
+ .withDescription ("None of the servers for " + getName () + " specified a gRPC port" ));
226
251
return Lists .newArrayList ();
227
252
} else {
228
253
this .savedListener .onResult (ResolutionResult .newBuilder ()
229
254
.setAddresses (targets )
230
255
.build ());
231
- log .info ("Done updating server list for {}" , name );
256
+ log .info ("Done updating server list for {}" , getName () );
232
257
return newInstanceList ;
233
258
}
234
259
}
@@ -245,15 +270,14 @@ private int getGRPCPort(final ServiceInstance instance) {
245
270
if (metadata == null ) {
246
271
return instance .getPort ();
247
272
}
248
- String portString = metadata .get (GrpcUtils . CLOUD_DISCOVERY_METADATA_PORT );
273
+ String portString = metadata .get (CLOUD_DISCOVERY_METADATA_PORT );
249
274
if (portString == null ) {
250
275
portString = metadata .get (LEGACY_CLOUD_DISCOVERY_METADATA_PORT );
251
276
if (portString == null ) {
252
277
return instance .getPort ();
253
278
} else {
254
279
log .warn ("Found legacy grpc port metadata '{}' for client '{}' use '{}' instead" ,
255
- LEGACY_CLOUD_DISCOVERY_METADATA_PORT , DiscoveryClientNameResolver .this .name ,
256
- GrpcUtils .CLOUD_DISCOVERY_METADATA_PORT );
280
+ LEGACY_CLOUD_DISCOVERY_METADATA_PORT , getName (), CLOUD_DISCOVERY_METADATA_PORT );
257
281
}
258
282
}
259
283
try {
@@ -271,10 +295,10 @@ private int getGRPCPort(final ServiceInstance instance) {
271
295
* @return True, if the given instance list contains different entries than the stored ones.
272
296
*/
273
297
private boolean needsToUpdateConnections (final List <ServiceInstance > newInstanceList ) {
274
- if (this .savedInstanceList .size () != newInstanceList .size ()) {
298
+ if (DiscoveryClientNameResolver . this .instanceList .size () != newInstanceList .size ()) {
275
299
return true ;
276
300
}
277
- for (final ServiceInstance instance : this .savedInstanceList ) {
301
+ for (final ServiceInstance instance : DiscoveryClientNameResolver . this .instanceList ) {
278
302
final int port = getGRPCPort (instance );
279
303
boolean isSame = false ;
280
304
for (final ServiceInstance newInstance : newInstanceList ) {
0 commit comments