|
26 | 26 | import java.util.Map; |
27 | 27 | import java.util.Objects; |
28 | 28 | import java.util.concurrent.ConcurrentHashMap; |
| 29 | +import java.util.concurrent.Executors; |
| 30 | +import java.util.concurrent.ScheduledExecutorService; |
| 31 | +import java.util.concurrent.TimeUnit; |
29 | 32 |
|
30 | 33 | import org.apache.servicecomb.http.client.task.AbstractTask; |
31 | 34 | import org.apache.servicecomb.http.client.task.Task; |
@@ -126,6 +129,7 @@ public void startDiscovery() { |
126 | 129 | if (!started) { |
127 | 130 | started = true; |
128 | 131 | startTask(new PullInstanceTask()); |
| 132 | + startCheckInstancesHealth(); |
129 | 133 | } |
130 | 134 | } |
131 | 135 |
|
@@ -228,16 +232,33 @@ public void execute() { |
228 | 232 | } |
229 | 233 |
|
230 | 234 | private synchronized void pullAllInstance() { |
231 | | - List<SubscriptionKey> failedInstances = new ArrayList<>(); |
232 | 235 | instancesCache.forEach((k, v) -> { |
233 | 236 | pullInstance(k, v, true); |
234 | | - v.instancesCache.removeIf(instance -> isInstanceUnavailable(instance.getServiceName(), instance.getEndpoints())); |
235 | | - if (v.instancesCache.isEmpty()) { |
236 | | - failedInstances.add(k); |
237 | | - } |
238 | 237 | }); |
239 | | - failedInstances.forEach(instancesCache::remove); |
240 | | - failedInstances.clear(); |
| 238 | + } |
| 239 | + |
| 240 | + private void startCheckInstancesHealth() { |
| 241 | + ScheduledExecutorService executor = |
| 242 | + Executors.newScheduledThreadPool(1, (t) -> new Thread(t, "instance-health-check")); |
| 243 | + executor.scheduleWithFixedDelay(new CheckInstancesHealthTask(), 0, pollInterval, TimeUnit.MILLISECONDS); |
| 244 | + } |
| 245 | + |
| 246 | + class CheckInstancesHealthTask implements Runnable { |
| 247 | + @Override |
| 248 | + public void run() { |
| 249 | + if (instancesCache.isEmpty()) { |
| 250 | + return; |
| 251 | + } |
| 252 | + List<SubscriptionKey> failedInstances = new ArrayList<>(); |
| 253 | + instancesCache.forEach((k, v) -> { |
| 254 | + v.instancesCache.removeIf(item -> isInstanceUnavailable(item.getServiceName(), item.getEndpoints())); |
| 255 | + if (v.instancesCache.isEmpty()) { |
| 256 | + failedInstances.add(k); |
| 257 | + } |
| 258 | + }); |
| 259 | + failedInstances.forEach(instancesCache::remove); |
| 260 | + failedInstances.clear(); |
| 261 | + } |
241 | 262 | } |
242 | 263 |
|
243 | 264 | private static String instanceToString(List<MicroserviceInstance> instances) { |
|
0 commit comments