1818package org .apache .servicecomb .service .center .client ;
1919
2020import java .io .IOException ;
21+ import java .net .InetSocketAddress ;
22+ import java .net .Socket ;
2123import java .util .ArrayList ;
2224import java .util .Collections ;
2325import java .util .List ;
@@ -153,13 +155,12 @@ public void onPullInstanceEvent(PullInstanceEvent event) {
153155 startTask (new PullInstanceOnceTask ());
154156 }
155157
156- private List < SubscriptionKey > pullInstance (SubscriptionKey k , SubscriptionValue v , boolean sendChangedEvent ) {
158+ private void pullInstance (SubscriptionKey k , SubscriptionValue v , boolean sendChangedEvent ) {
157159 if (myselfServiceId == null ) {
158160 // registration not ready
159- return Collections . emptyList () ;
161+ return ;
160162 }
161163
162- List <SubscriptionKey > failedKeys = new ArrayList <>();
163164 try {
164165 FindMicroserviceInstancesResponse instancesResponse = serviceCenterClient
165166 .findMicroserviceInstance (myselfServiceId , k .appId , k .serviceName , ALL_VERSION , v .revision );
@@ -186,17 +187,9 @@ private List<SubscriptionKey> pullInstance(SubscriptionKey k, SubscriptionValue
186187 }
187188 }
188189 } catch (Exception e ) {
189- if (!(e .getCause () instanceof IOException )) {
190- // for IOException, do not remove cache, or when service center
191- // not available, invocation between microservices will fail.
192- failedKeys .add (k );
193- LOGGER .error ("find service {}#{} instance failed and remove local cache." , k .appId , k .serviceName , e );
194- } else {
195- LOGGER .warn ("find service {}#{} instance failed, remaining local instances cache, cause message: {}" ,
196- k .appId , k .serviceName , e .getMessage ());
197- }
190+ LOGGER .warn ("find service {}#{} instance failed, remaining local instances cache [{}], cause message: {}" ,
191+ k .appId , k .serviceName , instanceToString (v .instancesCache ), e .getMessage ());
198192 }
199- return failedKeys ;
200193 }
201194
202195 private void setMicroserviceInfo (List <MicroserviceInstance > instances ) {
@@ -236,10 +229,13 @@ public void execute() {
236229
237230 private synchronized void pullAllInstance () {
238231 List <SubscriptionKey > failedInstances = new ArrayList <>();
239- instancesCache .forEach ((k , v ) -> failedInstances .addAll (pullInstance (k , v , true )));
240- if (failedInstances .isEmpty ()) {
241- return ;
242- }
232+ instancesCache .forEach ((k , v ) -> {
233+ pullInstance (k , v , true );
234+ v .instancesCache .removeIf (instance -> isInstanceUnavailable (instance .getServiceName (), instance .getEndpoints ()));
235+ if (v .instancesCache .isEmpty ()) {
236+ failedInstances .add (k );
237+ }
238+ });
243239 failedInstances .forEach (instancesCache ::remove );
244240 failedInstances .clear ();
245241 }
@@ -261,4 +257,32 @@ private static String instanceToString(List<MicroserviceInstance> instances) {
261257 sb .append ("#" );
262258 return sb .toString ();
263259 }
260+
261+ protected boolean isInstanceUnavailable (String serviceName , List <String > endpoints ) {
262+ for (String endpoint : endpoints ) {
263+ String [] hostPort = getHostPort (endpoint );
264+ if (hostPort == null ) {
265+ continue ;
266+ }
267+ for (int k = 0 ; k < 3 ; k ++) {
268+ try (Socket s = new Socket ()) {
269+ s .connect (new InetSocketAddress (hostPort [0 ], Integer .parseInt (hostPort [1 ])), 3000 );
270+ return false ;
271+ } catch (IOException e ) {
272+ LOGGER .warn ("telnet endpoint [{}] failed, It will be try again." , endpoint );
273+ }
274+ }
275+ LOGGER .warn ("telnet three times failed, remove service [{}] endpoint [{}]." , serviceName , endpoint );
276+ return true ;
277+ }
278+ return true ;
279+ }
280+
281+ private String [] getHostPort (String endpoint ) {
282+ String hostPort = endpoint .substring ("rest://" .length ());
283+ if (hostPort .split (":" ).length == 2 ) {
284+ return hostPort .split (":" );
285+ }
286+ return null ;
287+ }
264288}
0 commit comments