|
29 | 29 | import io.grpc.StatusOr; |
30 | 30 | import io.grpc.SynchronizationContext; |
31 | 31 | import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight; |
| 32 | +import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType; |
| 33 | +import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig; |
| 34 | +import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig; |
32 | 35 | import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; |
33 | 36 | import io.grpc.xds.client.XdsClient; |
34 | 37 | import io.grpc.xds.client.XdsClient.ResourceWatcher; |
35 | 38 | import io.grpc.xds.client.XdsLogger; |
36 | 39 | import io.grpc.xds.client.XdsResourceType; |
37 | 40 | import java.io.Closeable; |
38 | 41 | import java.io.IOException; |
| 42 | +import java.util.ArrayList; |
39 | 43 | import java.util.Collections; |
40 | 44 | import java.util.HashMap; |
41 | 45 | import java.util.HashSet; |
42 | 46 | import java.util.List; |
43 | 47 | import java.util.Map; |
44 | 48 | import java.util.Objects; |
45 | 49 | import java.util.Set; |
| 50 | +import java.util.stream.Collectors; |
46 | 51 | import javax.annotation.Nullable; |
47 | 52 |
|
48 | 53 | /** |
@@ -299,27 +304,123 @@ XdsConfig buildConfig() { |
299 | 304 | Map<String, ? extends XdsWatcherBase<?>> cdsWatchers = |
300 | 305 | resourceWatchers.get(CLUSTER_RESOURCE).watchers; |
301 | 306 |
|
302 | | - // Iterate CDS watchers |
303 | | - for (XdsWatcherBase<?> watcher : cdsWatchers.values()) { |
304 | | - CdsWatcher cdsWatcher = (CdsWatcher) watcher; |
305 | | - String clusterName = cdsWatcher.resourceName(); |
306 | | - StatusOr<XdsClusterResource.CdsUpdate> cdsUpdate = cdsWatcher.getData(); |
307 | | - if (cdsUpdate.hasValue()) { |
308 | | - XdsConfig.XdsClusterConfig clusterConfig; |
309 | | - String edsName = cdsUpdate.getValue().edsServiceName(); |
310 | | - EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(edsName); |
311 | | - |
312 | | - // Only EDS type clusters have endpoint data |
313 | | - StatusOr<XdsEndpointResource.EdsUpdate> data = |
314 | | - edsWatcher != null ? edsWatcher.getData() : null; |
315 | | - clusterConfig = new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate.getValue(), data); |
316 | | - builder.addCluster(clusterName, StatusOr.fromValue(clusterConfig)); |
| 307 | + // Only care about aggregates from LDS/RDS or subscriptions and the leaf clusters |
| 308 | + List<String> topLevelClusters = |
| 309 | + cdsWatchers.values().stream() |
| 310 | + .filter(XdsDependencyManager::isTopLevelCluster) |
| 311 | + .map(w -> w.resourceName()) |
| 312 | + .collect(Collectors.toList()); |
| 313 | + |
| 314 | + // Flatten multi-level aggregates into lists of leaf clusters |
| 315 | + Set<String> leafNames = |
| 316 | + addTopLevelClustersToBuilder(builder, edsWatchers, cdsWatchers, topLevelClusters); |
| 317 | + |
| 318 | + addLeavesToBuilder(builder, edsWatchers, leafNames); |
| 319 | + |
| 320 | + return builder.build(); |
| 321 | + } |
| 322 | + |
| 323 | + private void addLeavesToBuilder(XdsConfig.XdsConfigBuilder builder, |
| 324 | + Map<String, ? extends XdsWatcherBase<?>> edsWatchers, |
| 325 | + Set<String> leafNames) { |
| 326 | + for (String clusterName : leafNames) { |
| 327 | + CdsWatcher cdsWatcher = getCluster(clusterName); |
| 328 | + StatusOr<XdsClusterResource.CdsUpdate> cdsUpdateOr = cdsWatcher.getData(); |
| 329 | + |
| 330 | + if (cdsUpdateOr.hasValue()) { |
| 331 | + XdsClusterResource.CdsUpdate cdsUpdate = cdsUpdateOr.getValue(); |
| 332 | + if (cdsUpdate.clusterType() == ClusterType.EDS) { |
| 333 | + EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(cdsUpdate.edsServiceName()); |
| 334 | + if (edsWatcher != null) { |
| 335 | + EndpointConfig child = new EndpointConfig(edsWatcher.getData()); |
| 336 | + builder.addCluster(clusterName, StatusOr.fromValue( |
| 337 | + new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child))); |
| 338 | + } else { |
| 339 | + builder.addCluster(clusterName, StatusOr.fromStatus(Status.UNAVAILABLE.withDescription( |
| 340 | + "EDS resource not found for cluster " + clusterName))); |
| 341 | + } |
| 342 | + } else if (cdsUpdate.clusterType() == ClusterType.LOGICAL_DNS) { |
| 343 | + // TODO get the resolved endpoint configuration |
| 344 | + builder.addCluster(clusterName, StatusOr.fromValue( |
| 345 | + new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, new EndpointConfig(null)))); |
| 346 | + } |
317 | 347 | } else { |
318 | | - builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdate.getStatus())); |
| 348 | + builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdateOr.getStatus())); |
319 | 349 | } |
320 | 350 | } |
| 351 | + } |
321 | 352 |
|
322 | | - return builder.build(); |
| 353 | + // Adds the top-level clusters to the builder and returns the leaf cluster names |
| 354 | + private Set<String> addTopLevelClustersToBuilder( |
| 355 | + XdsConfig.XdsConfigBuilder builder, Map<String, ? extends XdsWatcherBase<?>> edsWatchers, |
| 356 | + Map<String, ? extends XdsWatcherBase<?>> cdsWatchers, List<String> topLevelClusters) { |
| 357 | + |
| 358 | + Set<String> leafClusterNames = new HashSet<>(); |
| 359 | + for (String clusterName : topLevelClusters) { |
| 360 | + CdsWatcher cdsWatcher = (CdsWatcher) cdsWatchers.get(clusterName); |
| 361 | + StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData(); |
| 362 | + if (!cdsWatcher.hasDataValue()) { |
| 363 | + builder.addCluster(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus())); |
| 364 | + continue; |
| 365 | + } |
| 366 | + |
| 367 | + XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue(); |
| 368 | + XdsConfig.XdsClusterConfig.ClusterChild child; |
| 369 | + switch (cdsUpdate.clusterType()) { |
| 370 | + case AGGREGATE: |
| 371 | + List<String> leafNames = getLeafNames(cdsUpdate); |
| 372 | + child = new AggregateConfig(leafNames); |
| 373 | + leafClusterNames.addAll(leafNames); |
| 374 | + break; |
| 375 | + case EDS: |
| 376 | + EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(cdsUpdate.edsServiceName()); |
| 377 | + if (edsWatcher != null) { |
| 378 | + child = new EndpointConfig(edsWatcher.getData()); |
| 379 | + } else { |
| 380 | + builder.addCluster(clusterName, StatusOr.fromStatus(Status.UNAVAILABLE.withDescription( |
| 381 | + "EDS resource not found for cluster " + clusterName))); |
| 382 | + continue; |
| 383 | + } |
| 384 | + break; |
| 385 | + case LOGICAL_DNS: |
| 386 | + // TODO get the resolved endpoint configuration |
| 387 | + child = new EndpointConfig(null); |
| 388 | + break; |
| 389 | + default: |
| 390 | + throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType()); |
| 391 | + } |
| 392 | + builder.addCluster(clusterName, StatusOr.fromValue( |
| 393 | + new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child))); |
| 394 | + } |
| 395 | + |
| 396 | + return leafClusterNames; |
| 397 | + } |
| 398 | + |
| 399 | + private List<String> getLeafNames(XdsClusterResource.CdsUpdate cdsUpdate) { |
| 400 | + List<String> childNames = new ArrayList<>(); |
| 401 | + |
| 402 | + for (String cluster : cdsUpdate.prioritizedClusterNames()) { |
| 403 | + StatusOr<XdsClusterResource.CdsUpdate> data = getCluster(cluster).getData(); |
| 404 | + if (data == null || !data.hasValue() || data.getValue() == null) { |
| 405 | + childNames.add(cluster); |
| 406 | + continue; |
| 407 | + } |
| 408 | + if (data.getValue().clusterType() == ClusterType.AGGREGATE) { |
| 409 | + childNames.addAll(getLeafNames(data.getValue())); |
| 410 | + } else { |
| 411 | + childNames.add(cluster); |
| 412 | + } |
| 413 | + } |
| 414 | + |
| 415 | + return childNames; |
| 416 | + } |
| 417 | + |
| 418 | + private static boolean isTopLevelCluster(XdsWatcherBase<?> cdsWatcher) { |
| 419 | + if (! (cdsWatcher instanceof CdsWatcher)) { |
| 420 | + return false; |
| 421 | + } |
| 422 | + return ((CdsWatcher)cdsWatcher).parentContexts.values().stream() |
| 423 | + .anyMatch(depth -> depth == 1); |
323 | 424 | } |
324 | 425 |
|
325 | 426 | @Override |
|
0 commit comments