1919import static com .google .common .base .Preconditions .checkNotNull ;
2020import static io .grpc .ConnectivityState .TRANSIENT_FAILURE ;
2121import static io .grpc .xds .XdsLbPolicies .CLUSTER_RESOLVER_POLICY_NAME ;
22+ import static io .grpc .xds .XdsLbPolicies .PRIORITY_POLICY_NAME ;
23+ import static java .util .Objects .requireNonNull ;
2224
2325import com .google .common .annotations .VisibleForTesting ;
2426import com .google .common .collect .ImmutableList ;
2527import com .google .errorprone .annotations .CheckReturnValue ;
2628import io .grpc .InternalLogId ;
2729import io .grpc .LoadBalancer ;
30+ import io .grpc .LoadBalancerProvider ;
2831import io .grpc .LoadBalancerRegistry ;
2932import io .grpc .NameResolver ;
3033import io .grpc .Status ;
3134import io .grpc .StatusOr ;
35+ import io .grpc .xds .PriorityLoadBalancerProvider ;
36+ import io .grpc .xds .PriorityLoadBalancerProvider .PriorityLbConfig .PriorityChildConfig ;
3237import io .grpc .util .GracefulSwitchLoadBalancer ;
3338import io .grpc .xds .CdsLoadBalancerProvider .CdsConfig ;
3439import io .grpc .xds .ClusterResolverLoadBalancerProvider .ClusterResolverConfig ;
4449import java .util .ArrayList ;
4550import java .util .Arrays ;
4651import java .util .Collections ;
52+ import java .util .HashMap ;
4753import java .util .List ;
54+ import java .util .Map ;
55+ import java .util .Objects ;
4856
4957/**
5058 * Load balancer for cds_experimental LB policy. One instance per top-level cluster.
@@ -91,7 +99,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
9199 if (clusterSubscription == null ) {
92100 // Should be impossible, because XdsDependencyManager wouldn't have generated this
93101 return fail (Status .INTERNAL .withDescription (
94- errorPrefix () + "Unable to find non-dynamic root cluster" ));
102+ errorPrefix () + "Unable to find non-dynamic cluster" ));
95103 }
96104 // The dynamic cluster must not have loaded yet
97105 return Status .OK ;
@@ -100,64 +108,9 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
100108 return fail (clusterConfigOr .getStatus ());
101109 }
102110 XdsClusterConfig clusterConfig = clusterConfigOr .getValue ();
103- List <String > leafNames ;
104- if (clusterConfig .getChildren () instanceof AggregateConfig ) {
105- leafNames = ((AggregateConfig ) clusterConfig .getChildren ()).getLeafNames ();
106- } else if (clusterConfig .getChildren () instanceof EndpointConfig ) {
107- leafNames = ImmutableList .of (clusterName );
108- } else {
109- return fail (Status .INTERNAL .withDescription (
110- errorPrefix () + "Unexpected cluster children type: "
111- + clusterConfig .getChildren ().getClass ()));
112- }
113- if (leafNames .isEmpty ()) {
114- // Should be impossible, because XdsClusterResource validated this
115- return fail (Status .UNAVAILABLE .withDescription (
116- errorPrefix () + "Zero leaf clusters for root cluster " + clusterName ));
117- }
118-
119- Status noneFoundError = Status .INTERNAL
120- .withDescription (errorPrefix () + "No leaves and no error; this is a bug" );
121- List <DiscoveryMechanism > instances = new ArrayList <>();
122- for (String leafName : leafNames ) {
123- StatusOr <XdsClusterConfig > leafConfigOr = xdsConfig .getClusters ().get (leafName );
124- if (!leafConfigOr .hasValue ()) {
125- noneFoundError = leafConfigOr .getStatus ();
126- continue ;
127- }
128- if (!(leafConfigOr .getValue ().getChildren () instanceof EndpointConfig )) {
129- noneFoundError = Status .INTERNAL .withDescription (
130- errorPrefix () + "Unexpected child " + leafName + " cluster children type: "
131- + leafConfigOr .getValue ().getChildren ().getClass ());
132- continue ;
133- }
134- CdsUpdate result = leafConfigOr .getValue ().getClusterResource ();
135- DiscoveryMechanism instance ;
136- if (result .clusterType () == ClusterType .EDS ) {
137- instance = DiscoveryMechanism .forEds (
138- leafName ,
139- result .edsServiceName (),
140- result .lrsServerInfo (),
141- result .maxConcurrentRequests (),
142- result .upstreamTlsContext (),
143- result .filterMetadata (),
144- result .outlierDetection ());
145- } else {
146- instance = DiscoveryMechanism .forLogicalDns (
147- leafName ,
148- result .dnsHostName (),
149- result .lrsServerInfo (),
150- result .maxConcurrentRequests (),
151- result .upstreamTlsContext (),
152- result .filterMetadata ());
153- }
154- instances .add (instance );
155- }
156- if (instances .isEmpty ()) {
157- return fail (noneFoundError );
158- }
159111
160112 NameResolver .ConfigOrError configOrError ;
113+ Object childConfig ;
161114 if (clusterConfig .getChildren () instanceof EndpointConfig ) {
162115 // The LB policy config is provided in service_config.proto/JSON format.
163116 configOrError =
@@ -168,19 +121,64 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
168121 return fail (Status .INTERNAL .withDescription (
169122 errorPrefix () + "Unable to parse the LB config: " + configOrError .getError ()));
170123 }
124+ CdsUpdate result = clusterConfig .getClusterResource ();
125+ DiscoveryMechanism instance ;
126+ if (result .clusterType () == ClusterType .EDS ) {
127+ instance = DiscoveryMechanism .forEds (
128+ clusterName ,
129+ result .edsServiceName (),
130+ result .lrsServerInfo (),
131+ result .maxConcurrentRequests (),
132+ result .upstreamTlsContext (),
133+ result .filterMetadata (),
134+ result .outlierDetection ());
135+ } else {
136+ instance = DiscoveryMechanism .forLogicalDns (
137+ clusterName ,
138+ result .dnsHostName (),
139+ result .lrsServerInfo (),
140+ result .maxConcurrentRequests (),
141+ result .upstreamTlsContext (),
142+ result .filterMetadata ());
143+ }
144+ childConfig = new ClusterResolverConfig (
145+ instance ,
146+ configOrError .getConfig (),
147+ clusterConfig .getClusterResource ().isHttp11ProxyAvailable ());
148+ if (childLb == null ) {
149+ childLb = lbRegistry .getProvider (CLUSTER_RESOLVER_POLICY_NAME ).newLoadBalancer (helper );
150+ }
151+ } else if (clusterConfig .getChildren () instanceof AggregateConfig ) {
152+ LoadBalancerProvider priorityLbProvider = lbRegistry .getProvider (PRIORITY_POLICY_NAME );
153+ if (childLb == null ) {
154+ childLb = priorityLbProvider .newLoadBalancer (helper );
155+ }
156+ Map <String , PriorityChildConfig > priorityChildConfigs = new HashMap <>();
157+ for (String childCluster : requireNonNull (clusterConfig .getClusterResource ().prioritizedClusterNames ())) {
158+ priorityChildConfigs .put (childCluster , new PriorityChildConfig (
159+ GracefulSwitchLoadBalancer .createLoadBalancingPolicyConfig (
160+ priorityLbProvider , getCdsPolicyConfig (childCluster )), false ));
161+ }
162+ childConfig = new PriorityLoadBalancerProvider .PriorityLbConfig (
163+ Collections .unmodifiableMap (priorityChildConfigs ),
164+ Collections .unmodifiableList (
165+ requireNonNull (clusterConfig .getClusterResource ().prioritizedClusterNames ())));
171166 } else {
172-
167+ return fail (Status .INTERNAL .withDescription (
168+ errorPrefix () + "Unexpected cluster children type: "
169+ + clusterConfig .getChildren ().getClass ()));
173170 }
174171
175- ClusterResolverConfig config = new ClusterResolverConfig (
176- Collections .unmodifiableList (instances ),
177- configOrError .getConfig (),
178- clusterConfig .getClusterResource ().isHttp11ProxyAvailable ());
179- if (childLb == null ) {
180- childLb = lbRegistry .getProvider (CLUSTER_RESOLVER_POLICY_NAME ).newLoadBalancer (helper );
181- }
182172 return childLb .acceptResolvedAddresses (
183- resolvedAddresses .toBuilder ().setLoadBalancingPolicyConfig (config ).build ());
173+ resolvedAddresses .toBuilder ().setLoadBalancingPolicyConfig (childConfig ).build ());
174+ }
175+
176+ private String getCdsPolicyConfig (String cluster ) {
177+ return String .format ("{\n " +
178+ " \" cds_experimental\" : {\n " +
179+ " \" cluster\" : \" %s\" \n " +
180+ " }\n " +
181+ " }\n " , cluster );
184182 }
185183
186184 @ Override
0 commit comments