@@ -138,8 +138,6 @@ private void cancelEdsWatcher(EdsWatcher watcher, CdsWatcher parentContext) {
138138 }
139139 }
140140
141-
142-
143141 private <T extends ResourceUpdate > void cancelWatcher (XdsWatcherBase <T > watcher ) {
144142 syncContext .throwIfNotInThisSynchronizationContext ();
145143
@@ -428,6 +426,129 @@ public String toString() {
428426 return logId .toString ();
429427 }
430428
429+ // Returns true if the watcher was added, false if it already exists
430+ private boolean addEdsWatcher (String edsServiceName , CdsWatcher parentContext ) {
431+ TypeWatchers <?> typeWatchers = resourceWatchers .get (XdsEndpointResource .getInstance ());
432+ if (typeWatchers == null || !typeWatchers .watchers .containsKey (edsServiceName )) {
433+ addWatcher (new EdsWatcher (edsServiceName , parentContext ));
434+ return true ;
435+ }
436+
437+ EdsWatcher watcher = (EdsWatcher ) typeWatchers .watchers .get (edsServiceName );
438+ watcher .addParentContext (parentContext ); // Is a set, so don't need to check for existence
439+ return false ;
440+ }
441+
442+ private void addClusterWatcher (String clusterName , Object parentContext , int depth ) {
443+ TypeWatchers <?> clusterWatchers = resourceWatchers .get (CLUSTER_RESOURCE );
444+ if (clusterWatchers != null ) {
445+ CdsWatcher watcher = (CdsWatcher ) clusterWatchers .watchers .get (clusterName );
446+ if (watcher != null ) {
447+ watcher .parentContexts .put (parentContext , depth );
448+ return ;
449+ }
450+ }
451+
452+ addWatcher (new CdsWatcher (clusterName , parentContext , depth ));
453+ }
454+
455+ private void updateRoutes (List <VirtualHost > virtualHosts , Object newParentContext ,
456+ VirtualHost oldVirtualHost , boolean sameParentContext ) {
457+ VirtualHost virtualHost =
458+ RoutingUtils .findVirtualHostForHostName (virtualHosts , dataPlaneAuthority );
459+ if (virtualHost == null ) {
460+ String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority ;
461+ logger .log (XdsLogger .XdsLogLevel .WARNING , error );
462+ cleanUpRoutes ();
463+ xdsConfigWatcher .onError (
464+ "xDS node ID:" + dataPlaneAuthority , Status .UNAVAILABLE .withDescription (error ));
465+ return ;
466+ }
467+
468+ Set <String > newClusters = getClusterNamesFromVirtualHost (virtualHost );
469+ Set <String > oldClusters = getClusterNamesFromVirtualHost (oldVirtualHost );
470+
471+ if (sameParentContext ) {
472+ // Calculate diffs.
473+ Set <String > addedClusters = Sets .difference (newClusters , oldClusters );
474+ Set <String > deletedClusters = Sets .difference (oldClusters , newClusters );
475+
476+ deletedClusters .forEach (watcher ->
477+ cancelClusterWatcherTree (getCluster (watcher ), newParentContext ));
478+ addedClusters .forEach ((cluster ) -> addClusterWatcher (cluster , newParentContext , 1 ));
479+ } else {
480+ newClusters .forEach ((cluster ) -> addClusterWatcher (cluster , newParentContext , 1 ));
481+ }
482+ }
483+
484+ private static Set <String > getClusterNamesFromVirtualHost (VirtualHost virtualHost ) {
485+ if (virtualHost == null ) {
486+ return Collections .emptySet ();
487+ }
488+
489+ // Get all cluster names to which requests can be routed through the virtual host.
490+ Set <String > clusters = new HashSet <>();
491+ for (VirtualHost .Route route : virtualHost .routes ()) {
492+ VirtualHost .Route .RouteAction action = route .routeAction ();
493+ if (action == null ) {
494+ continue ;
495+ }
496+ if (action .cluster () != null ) {
497+ clusters .add (action .cluster ());
498+ } else if (action .weightedClusters () != null ) {
499+ for (ClusterWeight weighedCluster : action .weightedClusters ()) {
500+ clusters .add (weighedCluster .name ());
501+ }
502+ }
503+ }
504+
505+ return clusters ;
506+ }
507+
508+ @ Nullable
509+ private VirtualHost getActiveVirtualHost () {
510+ TypeWatchers <?> rdsWatchers = resourceWatchers .get (XdsRouteConfigureResource .getInstance ());
511+ if (rdsWatchers == null ) {
512+ return null ;
513+ }
514+
515+ RdsWatcher activeRdsWatcher =
516+ (RdsWatcher ) rdsWatchers .watchers .values ().stream ().findFirst ().orElse (null );
517+ if (activeRdsWatcher == null || activeRdsWatcher .missingResult ()
518+ || !activeRdsWatcher .getData ().hasValue ()) {
519+ return null ;
520+ }
521+
522+ return RoutingUtils .findVirtualHostForHostName (
523+ activeRdsWatcher .getData ().getValue ().virtualHosts , dataPlaneAuthority );
524+ }
525+
526+ // Must be in SyncContext
527+ private void cleanUpRoutes () {
528+ // Remove RdsWatcher & CDS Watchers
529+ TypeWatchers <?> rdsResourceWatcher =
530+ resourceWatchers .get (XdsRouteConfigureResource .getInstance ());
531+ if (rdsResourceWatcher == null || rdsResourceWatcher .watchers .isEmpty ()) {
532+ return ;
533+ }
534+
535+ XdsWatcherBase <?> watcher = rdsResourceWatcher .watchers .values ().stream ().findFirst ().get ();
536+ cancelWatcher (watcher );
537+
538+ // Remove CdsWatchers pointed to by the RdsWatcher
539+ RdsWatcher rdsWatcher = (RdsWatcher ) watcher ;
540+ for (String cName : rdsWatcher .getCdsNames ()) {
541+ CdsWatcher cdsWatcher = getCluster (cName );
542+ if (cdsWatcher != null ) {
543+ cancelClusterWatcherTree (cdsWatcher , rdsWatcher );
544+ }
545+ }
546+ }
547+
548+ private CdsWatcher getCluster (String clusterName ) {
549+ return (CdsWatcher ) resourceWatchers .get (CLUSTER_RESOURCE ).watchers .get (clusterName );
550+ }
551+
431552 private static class TypeWatchers <T extends ResourceUpdate > {
432553 // Key is resource name
433554 final Map <String , XdsWatcherBase <T >> watchers = new HashMap <>();
@@ -720,32 +841,6 @@ public void onResourceDoesNotExist(String resourceName) {
720841 }
721842 }
722843
723- // Returns true if the watcher was added, false if it already exists
724- private boolean addEdsWatcher (String edsServiceName , CdsWatcher parentContext ) {
725- TypeWatchers <?> typeWatchers = resourceWatchers .get (XdsEndpointResource .getInstance ());
726- if (typeWatchers == null || !typeWatchers .watchers .containsKey (edsServiceName )) {
727- addWatcher (new EdsWatcher (edsServiceName , parentContext ));
728- return true ;
729- }
730-
731- EdsWatcher watcher = (EdsWatcher ) typeWatchers .watchers .get (edsServiceName );
732- watcher .addParentContext (parentContext ); // Is a set, so don't need to check for existence
733- return false ;
734- }
735-
736- private void addClusterWatcher (String clusterName , Object parentContext , int depth ) {
737- TypeWatchers <?> clusterWatchers = resourceWatchers .get (CLUSTER_RESOURCE );
738- if (clusterWatchers != null ) {
739- CdsWatcher watcher = (CdsWatcher ) clusterWatchers .watchers .get (clusterName );
740- if (watcher != null ) {
741- watcher .parentContexts .put (parentContext , depth );
742- return ;
743- }
744- }
745-
746- addWatcher (new CdsWatcher (clusterName , parentContext , depth ));
747- }
748-
749844 private class EdsWatcher extends XdsWatcherBase <XdsEndpointResource .EdsUpdate > {
750845 private final Set <CdsWatcher > parentContexts = new HashSet <>();
751846
@@ -770,101 +865,4 @@ void addParentContext(CdsWatcher parentContext) {
770865 parentContexts .add (checkNotNull (parentContext , "parentContext" ));
771866 }
772867 }
773-
774- private void updateRoutes (List <VirtualHost > virtualHosts , Object newParentContext ,
775- VirtualHost oldVirtualHost , boolean sameParentContext ) {
776- VirtualHost virtualHost =
777- RoutingUtils .findVirtualHostForHostName (virtualHosts , dataPlaneAuthority );
778- if (virtualHost == null ) {
779- String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority ;
780- logger .log (XdsLogger .XdsLogLevel .WARNING , error );
781- cleanUpRoutes ();
782- xdsConfigWatcher .onError (
783- "xDS node ID:" + dataPlaneAuthority , Status .UNAVAILABLE .withDescription (error ));
784- return ;
785- }
786-
787- Set <String > newClusters = getClusterNamesFromVirtualHost (virtualHost );
788- Set <String > oldClusters = getClusterNamesFromVirtualHost (oldVirtualHost );
789-
790- if (sameParentContext ) {
791- // Calculate diffs.
792- Set <String > addedClusters = Sets .difference (newClusters , oldClusters );
793- Set <String > deletedClusters = Sets .difference (oldClusters , newClusters );
794-
795- deletedClusters .forEach (watcher ->
796- cancelClusterWatcherTree (getCluster (watcher ), newParentContext ));
797- addedClusters .forEach ((cluster ) -> addClusterWatcher (cluster , newParentContext , 1 ));
798- } else {
799- newClusters .forEach ((cluster ) -> addClusterWatcher (cluster , newParentContext , 1 ));
800- }
801- }
802-
803- private static Set <String > getClusterNamesFromVirtualHost (VirtualHost virtualHost ) {
804- if (virtualHost == null ) {
805- return Collections .emptySet ();
806- }
807-
808- // Get all cluster names to which requests can be routed through the virtual host.
809- Set <String > clusters = new HashSet <>();
810- for (VirtualHost .Route route : virtualHost .routes ()) {
811- VirtualHost .Route .RouteAction action = route .routeAction ();
812- if (action == null ) {
813- continue ;
814- }
815- if (action .cluster () != null ) {
816- clusters .add (action .cluster ());
817- } else if (action .weightedClusters () != null ) {
818- for (ClusterWeight weighedCluster : action .weightedClusters ()) {
819- clusters .add (weighedCluster .name ());
820- }
821- }
822- }
823-
824- return clusters ;
825- }
826-
827- @ Nullable
828- private VirtualHost getActiveVirtualHost () {
829- TypeWatchers <?> rdsWatchers = resourceWatchers .get (XdsRouteConfigureResource .getInstance ());
830- if (rdsWatchers == null ) {
831- return null ;
832- }
833-
834- RdsWatcher activeRdsWatcher =
835- (RdsWatcher ) rdsWatchers .watchers .values ().stream ().findFirst ().orElse (null );
836- if (activeRdsWatcher == null || activeRdsWatcher .missingResult ()
837- || !activeRdsWatcher .getData ().hasValue ()) {
838- return null ;
839- }
840-
841- return RoutingUtils .findVirtualHostForHostName (
842- activeRdsWatcher .getData ().getValue ().virtualHosts , dataPlaneAuthority );
843- }
844-
845- // Must be in SyncContext
846- private void cleanUpRoutes () {
847- // Remove RdsWatcher & CDS Watchers
848- TypeWatchers <?> rdsResourceWatcher =
849- resourceWatchers .get (XdsRouteConfigureResource .getInstance ());
850- if (rdsResourceWatcher == null || rdsResourceWatcher .watchers .isEmpty ()) {
851- return ;
852- }
853-
854- XdsWatcherBase <?> watcher = rdsResourceWatcher .watchers .values ().stream ().findFirst ().get ();
855- cancelWatcher (watcher );
856-
857- // Remove CdsWatchers pointed to by the RdsWatcher
858- RdsWatcher rdsWatcher = (RdsWatcher ) watcher ;
859- for (String cName : rdsWatcher .getCdsNames ()) {
860- CdsWatcher cdsWatcher = getCluster (cName );
861- if (cdsWatcher != null ) {
862- cancelClusterWatcherTree (cdsWatcher , rdsWatcher );
863- }
864- }
865- }
866-
867- private CdsWatcher getCluster (String clusterName ) {
868- return (CdsWatcher ) resourceWatchers .get (CLUSTER_RESOURCE ).watchers .get (clusterName );
869- }
870868}
0 commit comments