2222import static io .grpc .xds .client .XdsLogger .XdsLogLevel .DEBUG ;
2323
2424import com .google .common .annotations .VisibleForTesting ;
25+ import com .google .common .collect .ImmutableList ;
2526import com .google .common .collect .Sets ;
2627import io .grpc .InternalLogId ;
2728import io .grpc .NameResolver ;
4243import java .util .Collections ;
4344import java .util .HashMap ;
4445import java .util .HashSet ;
46+ import java .util .LinkedHashSet ;
4547import java .util .List ;
4648import java .util .Map ;
4749import java .util .Objects ;
4850import java .util .Set ;
4951import java .util .concurrent .ScheduledExecutorService ;
50- import java .util .stream .Collectors ;
5152import javax .annotation .Nullable ;
5253
5354/**
@@ -101,7 +102,7 @@ public Closeable subscribeToCluster(String clusterName) {
101102 subscription .closed = true ;
102103 return ; // shutdown() called
103104 }
104- addClusterWatcher (clusterName , subscription , 1 );
105+ addClusterWatcher (clusterName , subscription );
105106 });
106107
107108 return subscription ;
@@ -164,7 +165,7 @@ private static void throwIfParentContextsNotEmpty(XdsWatcherBase<?> watcher) {
164165 CdsWatcher cdsWatcher = (CdsWatcher ) watcher ;
165166 if (!cdsWatcher .parentContexts .isEmpty ()) {
166167 String msg = String .format ("CdsWatcher %s has parent contexts %s" ,
167- cdsWatcher .resourceName (), cdsWatcher .parentContexts . keySet () );
168+ cdsWatcher .resourceName (), cdsWatcher .parentContexts );
168169 throw new IllegalStateException (msg );
169170 }
170171 } else if (watcher instanceof EdsWatcher ) {
@@ -309,24 +310,14 @@ StatusOr<XdsConfig> buildUpdate() {
309310 }
310311 builder .setVirtualHost (activeVirtualHost );
311312
312- Map <String , XdsWatcherBase <XdsEndpointResource .EdsUpdate >> edsWatchers =
313- getWatchers (ENDPOINT_RESOURCE );
314- Map <String , XdsWatcherBase <XdsClusterResource .CdsUpdate >> cdsWatchers =
315- getWatchers (CLUSTER_RESOURCE );
316-
317- // Only care about aggregates from LDS/RDS or subscriptions and the leaf clusters
318- List <String > topLevelClusters =
319- cdsWatchers .values ().stream ()
320- .filter (XdsDependencyManager ::isTopLevelCluster )
321- .map (XdsWatcherBase <?>::resourceName )
322- .distinct ()
323- .collect (Collectors .toList ());
324-
325- // Flatten multi-level aggregates into lists of leaf clusters
326- Set <String > leafNames =
327- addTopLevelClustersToBuilder (builder , edsWatchers , cdsWatchers , topLevelClusters );
328-
329- addLeavesToBuilder (builder , edsWatchers , leafNames );
313+ Map <String , StatusOr <XdsConfig .XdsClusterConfig >> clusters = new HashMap <>();
314+ LinkedHashSet <String > ancestors = new LinkedHashSet <>();
315+ for (String cluster : getWatchers (CLUSTER_RESOURCE ).keySet ()) {
316+ addConfigForCluster (clusters , cluster , ancestors );
317+ }
318+ for (Map .Entry <String , StatusOr <XdsConfig .XdsClusterConfig >> me : clusters .entrySet ()) {
319+ builder .addCluster (me .getKey (), me .getValue ());
320+ }
330321
331322 return StatusOr .fromValue (builder .build ());
332323 }
@@ -344,111 +335,81 @@ private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
344335 return tTypeWatchers .watchers ;
345336 }
346337
347- private void addLeavesToBuilder (
348- XdsConfig .XdsConfigBuilder builder ,
349- Map <String , XdsWatcherBase <XdsEndpointResource .EdsUpdate >> edsWatchers ,
350- Set <String > leafNames ) {
351- for (String clusterName : leafNames ) {
352- CdsWatcher cdsWatcher = getCluster (clusterName );
353- StatusOr <XdsClusterResource .CdsUpdate > cdsUpdateOr = cdsWatcher .getData ();
338+ private void addConfigForCluster (
339+ Map <String , StatusOr <XdsConfig .XdsClusterConfig >> clusters ,
340+ String clusterName ,
341+ @ SuppressWarnings ("NonApiType" ) // Need order-preserving set for errors
342+ LinkedHashSet <String > ancestors ) {
343+ if (clusters .containsKey (clusterName )) {
344+ return ;
345+ }
346+ if (ancestors .contains (clusterName )) {
347+ clusters .put (clusterName , StatusOr .fromStatus (
348+ Status .INTERNAL .withDescription (
349+ "Aggregate cluster cycle detected: " + ancestors )));
350+ return ;
351+ }
352+ if (ancestors .size () > MAX_CLUSTER_RECURSION_DEPTH ) {
353+ clusters .put (clusterName , StatusOr .fromStatus (
354+ Status .INTERNAL .withDescription ("Recursion limit reached: " + ancestors )));
355+ return ;
356+ }
354357
355- if (!cdsUpdateOr .hasValue ()) {
356- builder .addCluster (clusterName , StatusOr .fromStatus (cdsUpdateOr .getStatus ()));
357- continue ;
358- }
358+ CdsWatcher cdsWatcher = (CdsWatcher ) getWatchers (CLUSTER_RESOURCE ).get (clusterName );
359+ StatusOr <XdsClusterResource .CdsUpdate > cdsWatcherDataOr = cdsWatcher .getData ();
360+ if (!cdsWatcherDataOr .hasValue ()) {
361+ clusters .put (clusterName , StatusOr .fromStatus (cdsWatcherDataOr .getStatus ()));
362+ return ;
363+ }
359364
360- XdsClusterResource .CdsUpdate cdsUpdate = cdsUpdateOr .getValue ();
361- if (cdsUpdate .clusterType () == ClusterType .EDS ) {
365+ XdsClusterResource .CdsUpdate cdsUpdate = cdsWatcherDataOr .getValue ();
366+ XdsConfig .XdsClusterConfig .ClusterChild child ;
367+ switch (cdsUpdate .clusterType ()) {
368+ case AGGREGATE :
369+ // Re-inserting a present element into a LinkedHashSet does not reorder the entries, so it
370+ // preserves the priority across all aggregate clusters
371+ LinkedHashSet <String > leafNames = new LinkedHashSet <String >();
372+ ancestors .add (clusterName );
373+ for (String childCluster : cdsUpdate .prioritizedClusterNames ()) {
374+ addConfigForCluster (clusters , childCluster , ancestors );
375+ StatusOr <XdsConfig .XdsClusterConfig > config = clusters .get (childCluster );
376+ if (!config .hasValue ()) {
377+ clusters .put (clusterName , StatusOr .fromStatus (Status .INTERNAL .withDescription (
378+ "Unable to get leaves for " + clusterName + ": "
379+ + config .getStatus ().getDescription ())));
380+ return ;
381+ }
382+ XdsConfig .XdsClusterConfig .ClusterChild children = config .getValue ().getChildren ();
383+ if (children instanceof AggregateConfig ) {
384+ leafNames .addAll (((AggregateConfig ) children ).getLeafNames ());
385+ } else {
386+ leafNames .add (childCluster );
387+ }
388+ }
389+ ancestors .remove (clusterName );
390+
391+ child = new AggregateConfig (ImmutableList .copyOf (leafNames ));
392+ break ;
393+ case EDS :
362394 XdsWatcherBase <XdsEndpointResource .EdsUpdate > edsWatcher =
363- edsWatchers .get (cdsWatcher .getEdsServiceName ());
364- EndpointConfig child ;
395+ getWatchers (ENDPOINT_RESOURCE ).get (cdsWatcher .getEdsServiceName ());
365396 if (edsWatcher != null ) {
366397 child = new EndpointConfig (edsWatcher .getData ());
367398 } else {
368399 child = new EndpointConfig (StatusOr .fromStatus (Status .INTERNAL .withDescription (
369400 "EDS resource not found for cluster " + clusterName )));
370401 }
371- builder . addCluster ( clusterName , StatusOr . fromValue (
372- new XdsConfig . XdsClusterConfig ( clusterName , cdsUpdate , child )));
373- } else if ( cdsUpdate . clusterType () == ClusterType . LOGICAL_DNS ) {
374- builder . addCluster ( clusterName , StatusOr .fromStatus (
402+ break ;
403+ case LOGICAL_DNS :
404+ // TODO get the resolved endpoint configuration
405+ child = new EndpointConfig ( StatusOr .fromStatus (
375406 Status .INTERNAL .withDescription ("Logical DNS in dependency manager unsupported" )));
376- }
377- }
378- }
379-
380- // Adds the top-level clusters to the builder and returns the leaf cluster names
381- private Set <String > addTopLevelClustersToBuilder (
382- XdsConfig .XdsConfigBuilder builder ,
383- Map <String , XdsWatcherBase <XdsEndpointResource .EdsUpdate >> edsWatchers ,
384- Map <String , XdsWatcherBase <XdsClusterResource .CdsUpdate >> cdsWatchers ,
385- List <String > topLevelClusters ) {
386-
387- Set <String > leafClusterNames = new HashSet <>();
388- for (String clusterName : topLevelClusters ) {
389- CdsWatcher cdsWatcher = (CdsWatcher ) cdsWatchers .get (clusterName );
390- StatusOr <XdsClusterResource .CdsUpdate > cdsWatcherDataOr = cdsWatcher .getData ();
391- if (!cdsWatcher .hasDataValue ()) {
392- builder .addCluster (clusterName , StatusOr .fromStatus (cdsWatcherDataOr .getStatus ()));
393- continue ;
394- }
395-
396- XdsClusterResource .CdsUpdate cdsUpdate = cdsWatcherDataOr .getValue ();
397- XdsConfig .XdsClusterConfig .ClusterChild child ;
398- switch (cdsUpdate .clusterType ()) {
399- case AGGREGATE :
400- Set <String > leafNames = new HashSet <>();
401- addLeafNames (leafNames , cdsUpdate );
402- child = new AggregateConfig (leafNames );
403- leafClusterNames .addAll (leafNames );
404- break ;
405- case EDS :
406- XdsWatcherBase <XdsEndpointResource .EdsUpdate > edsWatcher =
407- edsWatchers .get (cdsWatcher .getEdsServiceName ());
408- if (edsWatcher != null ) {
409- child = new EndpointConfig (edsWatcher .getData ());
410- } else {
411- child = new EndpointConfig (StatusOr .fromStatus (Status .INTERNAL .withDescription (
412- "EDS resource not found for cluster " + clusterName )));
413- }
414- break ;
415- case LOGICAL_DNS :
416- // TODO get the resolved endpoint configuration
417- child = new EndpointConfig (StatusOr .fromStatus (
418- Status .INTERNAL .withDescription ("Logical DNS in dependency manager unsupported" )));
419- break ;
420- default :
421- throw new IllegalStateException ("Unexpected value: " + cdsUpdate .clusterType ());
422- }
423- builder .addCluster (clusterName , StatusOr .fromValue (
424- new XdsConfig .XdsClusterConfig (clusterName , cdsUpdate , child )));
425- }
426-
427- return leafClusterNames ;
428- }
429-
430- private void addLeafNames (Set <String > leafNames , XdsClusterResource .CdsUpdate cdsUpdate ) {
431- for (String cluster : cdsUpdate .prioritizedClusterNames ()) {
432- if (leafNames .contains (cluster )) {
433- continue ;
434- }
435- StatusOr <XdsClusterResource .CdsUpdate > data = getCluster (cluster ).getData ();
436- if (data == null || !data .hasValue () || data .getValue () == null ) {
437- leafNames .add (cluster );
438- continue ;
439- }
440- if (data .getValue ().clusterType () == ClusterType .AGGREGATE ) {
441- addLeafNames (leafNames , data .getValue ());
442- } else {
443- leafNames .add (cluster );
444- }
407+ break ;
408+ default :
409+ throw new IllegalStateException ("Unexpected value: " + cdsUpdate .clusterType ());
445410 }
446- }
447-
448- private static boolean isTopLevelCluster (
449- XdsWatcherBase <XdsClusterResource .CdsUpdate > cdsWatcher ) {
450- return ((CdsWatcher )cdsWatcher ).parentContexts .values ().stream ()
451- .anyMatch (depth -> depth == 1 );
411+ clusters .put (clusterName , StatusOr .fromValue (
412+ new XdsConfig .XdsClusterConfig (clusterName , cdsUpdate , child )));
452413 }
453414
454415 @ Override
@@ -467,14 +428,14 @@ private void addEdsWatcher(String edsServiceName, CdsWatcher parentContext) {
467428 addWatcher (new EdsWatcher (edsServiceName , parentContext ));
468429 }
469430
470- private void addClusterWatcher (String clusterName , Object parentContext , int depth ) {
431+ private void addClusterWatcher (String clusterName , Object parentContext ) {
471432 CdsWatcher watcher = (CdsWatcher ) getWatchers (CLUSTER_RESOURCE ).get (clusterName );
472433 if (watcher != null ) {
473- watcher .parentContexts .put (parentContext , depth );
434+ watcher .parentContexts .add (parentContext );
474435 return ;
475436 }
476437
477- addWatcher (new CdsWatcher (clusterName , parentContext , depth ));
438+ addWatcher (new CdsWatcher (clusterName , parentContext ));
478439 }
479440
480441 private void updateRoutes (List <VirtualHost > virtualHosts , Object newParentContext ,
@@ -494,9 +455,9 @@ private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContex
494455
495456 deletedClusters .forEach (watcher ->
496457 cancelClusterWatcherTree (getCluster (watcher ), newParentContext ));
497- addedClusters .forEach ((cluster ) -> addClusterWatcher (cluster , newParentContext , 1 ));
458+ addedClusters .forEach ((cluster ) -> addClusterWatcher (cluster , newParentContext ));
498459 } else {
499- newClusters .forEach ((cluster ) -> addClusterWatcher (cluster , newParentContext , 1 ));
460+ newClusters .forEach ((cluster ) -> addClusterWatcher (cluster , newParentContext ));
500461 }
501462 }
502463
@@ -805,11 +766,11 @@ public StatusOr<RdsUpdate> getRdsUpdate() {
805766 }
806767
807768 private class CdsWatcher extends XdsWatcherBase <XdsClusterResource .CdsUpdate > {
808- Map <Object , Integer > parentContexts = new HashMap <>();
769+ Set <Object > parentContexts = new HashSet <>();
809770
810- CdsWatcher (String resourceName , Object parentContext , int depth ) {
771+ CdsWatcher (String resourceName , Object parentContext ) {
811772 super (CLUSTER_RESOURCE , checkNotNull (resourceName , "resourceName" ));
812- this .parentContexts .put (checkNotNull (parentContext , "parentContext" ), depth );
773+ this .parentContexts .add (checkNotNull (parentContext , "parentContext" ));
813774 }
814775
815776 @ Override
@@ -829,14 +790,6 @@ public void onChanged(XdsClusterResource.CdsUpdate update) {
829790 break ;
830791 case AGGREGATE :
831792 Object parentContext = this ;
832- int depth = parentContexts .values ().stream ().max (Integer ::compare ).orElse (0 ) + 1 ;
833- if (depth > MAX_CLUSTER_RECURSION_DEPTH ) {
834- logger .log (XdsLogger .XdsLogLevel .WARNING ,
835- "Cluster recursion depth limit exceeded for cluster {0}" , resourceName ());
836- Status error = Status .UNAVAILABLE .withDescription (
837- "aggregate cluster graph exceeds max depth at " + resourceName () + nodeInfo ());
838- setDataAsStatus (error );
839- }
840793 if (hasDataValue ()) {
841794 Set <String > oldNames = getData ().getValue ().clusterType () == ClusterType .AGGREGATE
842795 ? new HashSet <>(getData ().getValue ().prioritizedClusterNames ())
@@ -847,21 +800,18 @@ public void onChanged(XdsClusterResource.CdsUpdate update) {
847800 deletedClusters .forEach ((cluster )
848801 -> cancelClusterWatcherTree (getCluster (cluster ), parentContext ));
849802
850- if (depth <= MAX_CLUSTER_RECURSION_DEPTH ) {
851- setData (update );
852- Set <String > addedClusters = Sets .difference (newNames , oldNames );
853- addedClusters .forEach ((cluster ) -> addClusterWatcher (cluster , parentContext , depth ));
854- }
855-
856- } else if (depth <= MAX_CLUSTER_RECURSION_DEPTH ) {
803+ setData (update );
804+ Set <String > addedClusters = Sets .difference (newNames , oldNames );
805+ addedClusters .forEach ((cluster ) -> addClusterWatcher (cluster , parentContext ));
806+ } else {
857807 setData (update );
858808 update .prioritizedClusterNames ()
859- .forEach (name -> addClusterWatcher (name , parentContext , depth ));
809+ .forEach (name -> addClusterWatcher (name , parentContext ));
860810 }
861811 break ;
862812 default :
863813 Status error = Status .UNAVAILABLE .withDescription (
864- "aggregate cluster graph exceeds max depth at " + resourceName () + nodeInfo ());
814+ "unknown cluster type in " + resourceName () + " " + update . clusterType ());
865815 setDataAsStatus (error );
866816 }
867817 maybePublishConfig ();
0 commit comments