Skip to content

Commit 7e982e4

Browse files
Xds: Aggregate cluster fixes (A75) (#12186)
Instead of representing an aggregate cluster as a single cluster whose priorities come from different underlying clusters, represent an aggregate cluster as an instance of a priority LB policy where each child is a cds LB policy for the underlying cluster.
1 parent c3ef1ab commit 7e982e4

File tree

6 files changed

+326
-403
lines changed

6 files changed

+326
-403
lines changed

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

Lines changed: 54 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818

1919
import static com.google.common.base.Preconditions.checkNotNull;
2020
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
21+
import static io.grpc.xds.XdsLbPolicies.CDS_POLICY_NAME;
2122
import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
23+
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
2224

23-
import com.google.common.annotations.VisibleForTesting;
24-
import com.google.common.collect.ImmutableList;
2525
import com.google.errorprone.annotations.CheckReturnValue;
2626
import io.grpc.InternalLogId;
2727
import io.grpc.LoadBalancer;
@@ -33,6 +33,7 @@
3333
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
3434
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
3535
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
36+
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
3637
import io.grpc.xds.XdsClusterResource.CdsUpdate;
3738
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
3839
import io.grpc.xds.XdsConfig.Subscription;
@@ -41,10 +42,11 @@
4142
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
4243
import io.grpc.xds.client.XdsLogger;
4344
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
44-
import java.util.ArrayList;
4545
import java.util.Arrays;
4646
import java.util.Collections;
47+
import java.util.HashMap;
4748
import java.util.List;
49+
import java.util.Map;
4850

4951
/**
5052
* Load balancer for cds_experimental LB policy. One instance per top-level cluster.
@@ -55,19 +57,15 @@ final class CdsLoadBalancer2 extends LoadBalancer {
5557
private final XdsLogger logger;
5658
private final Helper helper;
5759
private final LoadBalancerRegistry lbRegistry;
60+
private GracefulSwitchLoadBalancer delegate;
5861
// Following fields are effectively final.
5962
private String clusterName;
6063
private Subscription clusterSubscription;
61-
private LoadBalancer childLb;
6264

63-
CdsLoadBalancer2(Helper helper) {
64-
this(helper, LoadBalancerRegistry.getDefaultRegistry());
65-
}
66-
67-
@VisibleForTesting
6865
CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
6966
this.helper = checkNotNull(helper, "helper");
7067
this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
68+
this.delegate = new GracefulSwitchLoadBalancer(helper);
7169
logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
7270
logger.log(XdsLogLevel.INFO, "Created");
7371
}
@@ -91,7 +89,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
9189
if (clusterSubscription == null) {
9290
// Should be impossible, because XdsDependencyManager wouldn't have generated this
9391
return fail(Status.INTERNAL.withDescription(
94-
errorPrefix() + "Unable to find non-dynamic root cluster"));
92+
errorPrefix() + "Unable to find non-dynamic cluster"));
9593
}
9694
// The dynamic cluster must not have loaded yet
9795
return Status.OK;
@@ -100,42 +98,25 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
10098
return fail(clusterConfigOr.getStatus());
10199
}
102100
XdsClusterConfig clusterConfig = clusterConfigOr.getValue();
103-
List<String> leafNames;
104-
if (clusterConfig.getChildren() instanceof AggregateConfig) {
105-
leafNames = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();
106-
} else if (clusterConfig.getChildren() instanceof EndpointConfig) {
107-
leafNames = ImmutableList.of(clusterName);
108-
} else {
109-
return fail(Status.INTERNAL.withDescription(
110-
errorPrefix() + "Unexpected cluster children type: "
111-
+ clusterConfig.getChildren().getClass()));
112-
}
113-
if (leafNames.isEmpty()) {
114-
// Should be impossible, because XdsClusterResource validated this
115-
return fail(Status.UNAVAILABLE.withDescription(
116-
errorPrefix() + "Zero leaf clusters for root cluster " + clusterName));
117-
}
118101

119-
Status noneFoundError = Status.INTERNAL
120-
.withDescription(errorPrefix() + "No leaves and no error; this is a bug");
121-
List<DiscoveryMechanism> instances = new ArrayList<>();
122-
for (String leafName : leafNames) {
123-
StatusOr<XdsClusterConfig> leafConfigOr = xdsConfig.getClusters().get(leafName);
124-
if (!leafConfigOr.hasValue()) {
125-
noneFoundError = leafConfigOr.getStatus();
126-
continue;
127-
}
128-
if (!(leafConfigOr.getValue().getChildren() instanceof EndpointConfig)) {
129-
noneFoundError = Status.INTERNAL.withDescription(
130-
errorPrefix() + "Unexpected child " + leafName + " cluster children type: "
131-
+ leafConfigOr.getValue().getChildren().getClass());
132-
continue;
102+
NameResolver.ConfigOrError configOrError;
103+
Object gracefulConfig;
104+
if (clusterConfig.getChildren() instanceof EndpointConfig) {
105+
// The LB policy config is provided in service_config.proto/JSON format.
106+
configOrError =
107+
GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
108+
Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()),
109+
lbRegistry);
110+
if (configOrError.getError() != null) {
111+
// Should be impossible, because XdsClusterResource validated this
112+
return fail(Status.INTERNAL.withDescription(
113+
errorPrefix() + "Unable to parse the LB config: " + configOrError.getError()));
133114
}
134-
CdsUpdate result = leafConfigOr.getValue().getClusterResource();
115+
CdsUpdate result = clusterConfig.getClusterResource();
135116
DiscoveryMechanism instance;
136117
if (result.clusterType() == ClusterType.EDS) {
137118
instance = DiscoveryMechanism.forEds(
138-
leafName,
119+
clusterName,
139120
result.edsServiceName(),
140121
result.lrsServerInfo(),
141122
result.maxConcurrentRequests(),
@@ -144,45 +125,49 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
144125
result.outlierDetection());
145126
} else {
146127
instance = DiscoveryMechanism.forLogicalDns(
147-
leafName,
128+
clusterName,
148129
result.dnsHostName(),
149130
result.lrsServerInfo(),
150131
result.maxConcurrentRequests(),
151132
result.upstreamTlsContext(),
152133
result.filterMetadata());
153134
}
154-
instances.add(instance);
155-
}
156-
if (instances.isEmpty()) {
157-
return fail(noneFoundError);
158-
}
159-
160-
// The LB policy config is provided in service_config.proto/JSON format.
161-
NameResolver.ConfigOrError configOrError =
162-
GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
163-
Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()), lbRegistry);
164-
if (configOrError.getError() != null) {
165-
// Should be impossible, because XdsClusterResource validated this
135+
gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
136+
lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME),
137+
new ClusterResolverConfig(
138+
instance,
139+
configOrError.getConfig(),
140+
clusterConfig.getClusterResource().isHttp11ProxyAvailable()));
141+
} else if (clusterConfig.getChildren() instanceof AggregateConfig) {
142+
Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
143+
List<String> leafClusters = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();
144+
for (String childCluster: leafClusters) {
145+
priorityChildConfigs.put(childCluster,
146+
new PriorityChildConfig(
147+
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
148+
lbRegistry.getProvider(CDS_POLICY_NAME),
149+
new CdsConfig(childCluster)),
150+
false));
151+
}
152+
gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
153+
lbRegistry.getProvider(PRIORITY_POLICY_NAME),
154+
new PriorityLoadBalancerProvider.PriorityLbConfig(
155+
Collections.unmodifiableMap(priorityChildConfigs), leafClusters));
156+
} else {
166157
return fail(Status.INTERNAL.withDescription(
167-
errorPrefix() + "Unable to parse the LB config: " + configOrError.getError()));
158+
errorPrefix() + "Unexpected cluster children type: "
159+
+ clusterConfig.getChildren().getClass()));
168160
}
169161

170-
ClusterResolverConfig config = new ClusterResolverConfig(
171-
Collections.unmodifiableList(instances),
172-
configOrError.getConfig(),
173-
clusterConfig.getClusterResource().isHttp11ProxyAvailable());
174-
if (childLb == null) {
175-
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
176-
}
177-
return childLb.acceptResolvedAddresses(
178-
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());
162+
return delegate.acceptResolvedAddresses(
163+
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(gracefulConfig).build());
179164
}
180165

181166
@Override
182167
public void handleNameResolutionError(Status error) {
183168
logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
184-
if (childLb != null) {
185-
childLb.handleNameResolutionError(error);
169+
if (delegate != null) {
170+
delegate.handleNameResolutionError(error);
186171
} else {
187172
helper.updateBalancingState(
188173
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
@@ -192,10 +177,8 @@ public void handleNameResolutionError(Status error) {
192177
@Override
193178
public void shutdown() {
194179
logger.log(XdsLogLevel.INFO, "Shutdown");
195-
if (childLb != null) {
196-
childLb.shutdown();
197-
childLb = null;
198-
}
180+
delegate.shutdown();
181+
delegate = new GracefulSwitchLoadBalancer(helper);
199182
if (clusterSubscription != null) {
200183
clusterSubscription.close();
201184
clusterSubscription = null;
@@ -204,10 +187,7 @@ public void shutdown() {
204187

205188
@CheckReturnValue // don't forget to return up the stack after the fail call
206189
private Status fail(Status error) {
207-
if (childLb != null) {
208-
childLb.shutdown();
209-
childLb = null;
210-
}
190+
delegate.shutdown();
211191
helper.updateBalancingState(
212192
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
213193
return Status.OK; // XdsNameResolver isn't a polling NR, so this value doesn't matter

xds/src/main/java/io/grpc/xds/CdsLoadBalancerProvider.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.grpc.LoadBalancer;
2424
import io.grpc.LoadBalancer.Helper;
2525
import io.grpc.LoadBalancerProvider;
26+
import io.grpc.LoadBalancerRegistry;
2627
import io.grpc.NameResolver.ConfigOrError;
2728
import io.grpc.Status;
2829
import io.grpc.internal.JsonUtil;
@@ -51,9 +52,24 @@ public String getPolicyName() {
5152
return XdsLbPolicies.CDS_POLICY_NAME;
5253
}
5354

55+
private final LoadBalancerRegistry loadBalancerRegistry;
56+
57+
public CdsLoadBalancerProvider() {
58+
this.loadBalancerRegistry = null;
59+
}
60+
61+
public CdsLoadBalancerProvider(LoadBalancerRegistry loadBalancerRegistry) {
62+
this.loadBalancerRegistry = loadBalancerRegistry;
63+
}
64+
5465
@Override
5566
public LoadBalancer newLoadBalancer(Helper helper) {
56-
return new CdsLoadBalancer2(helper);
67+
LoadBalancerRegistry loadBalancerRegistry = this.loadBalancerRegistry;
68+
if (loadBalancerRegistry == null) {
69+
loadBalancerRegistry = LoadBalancerRegistry.getDefaultRegistry();
70+
}
71+
72+
return new CdsLoadBalancer2(helper, loadBalancerRegistry);
5773
}
5874

5975
@Override

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

Lines changed: 31 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,8 @@ public LoadBalancer newLoadBalancer(Helper helper) {
168168
*/
169169
private final class ClusterResolverLbState extends LoadBalancer {
170170
private final Helper helper;
171-
private final List<String> clusters = new ArrayList<>();
172-
private final Map<String, ClusterState> clusterStates = new HashMap<>();
171+
private ClusterState clusterState;
172+
private String cluster;
173173
private Object endpointLbConfig;
174174
private ResolvedAddresses resolvedAddresses;
175175
private LoadBalancer childLb;
@@ -185,21 +185,18 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
185185
ClusterResolverConfig config =
186186
(ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
187187
endpointLbConfig = config.lbConfig;
188-
for (DiscoveryMechanism instance : config.discoveryMechanisms) {
189-
clusters.add(instance.cluster);
190-
ClusterState state;
191-
if (instance.type == DiscoveryMechanism.Type.EDS) {
192-
state = new EdsClusterState(instance.cluster, instance.edsServiceName,
193-
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
194-
instance.filterMetadata, instance.outlierDetection);
195-
} else { // logical DNS
196-
state = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName,
197-
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
198-
instance.filterMetadata);
199-
}
200-
clusterStates.put(instance.cluster, state);
201-
state.start();
202-
}
188+
DiscoveryMechanism instance = config.discoveryMechanism;
189+
cluster = instance.cluster;
190+
if (instance.type == DiscoveryMechanism.Type.EDS) {
191+
clusterState = new EdsClusterState(instance.cluster, instance.edsServiceName,
192+
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
193+
instance.filterMetadata, instance.outlierDetection);
194+
} else { // logical DNS
195+
clusterState = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName,
196+
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
197+
instance.filterMetadata);
198+
}
199+
clusterState.start();
203200
return Status.OK;
204201
}
205202

@@ -215,9 +212,7 @@ public void handleNameResolutionError(Status error) {
215212

216213
@Override
217214
public void shutdown() {
218-
for (ClusterState state : clusterStates.values()) {
219-
state.shutdown();
220-
}
215+
clusterState.shutdown();
221216
if (childLb != null) {
222217
childLb.shutdown();
223218
}
@@ -229,24 +224,21 @@ private void handleEndpointResourceUpdate() {
229224
List<String> priorities = new ArrayList<>(); // totally ordered priority list
230225

231226
Status endpointNotFound = Status.OK;
232-
for (String cluster : clusters) {
233-
ClusterState state = clusterStates.get(cluster);
234-
// Propagate endpoints to the child LB policy only after all clusters have been resolved.
235-
if (!state.resolved && state.status.isOk()) {
236-
return;
237-
}
238-
if (state.result != null) {
239-
addresses.addAll(state.result.addresses);
240-
priorityChildConfigs.putAll(state.result.priorityChildConfigs);
241-
priorities.addAll(state.result.priorities);
242-
} else {
243-
endpointNotFound = state.status;
244-
}
227+
// Propagate endpoints to the child LB policy only after all clusters have been resolved.
228+
if (!clusterState.resolved && clusterState.status.isOk()) {
229+
return;
230+
}
231+
if (clusterState.result != null) {
232+
addresses.addAll(clusterState.result.addresses);
233+
priorityChildConfigs.putAll(clusterState.result.priorityChildConfigs);
234+
priorities.addAll(clusterState.result.priorities);
235+
} else {
236+
endpointNotFound = clusterState.status;
245237
}
246238
if (addresses.isEmpty()) {
247239
if (endpointNotFound.isOk()) {
248240
endpointNotFound = Status.UNAVAILABLE.withDescription(
249-
"No usable endpoint from cluster(s): " + clusters);
241+
"No usable endpoint from cluster: " + cluster);
250242
} else {
251243
endpointNotFound =
252244
Status.UNAVAILABLE.withCause(endpointNotFound.getCause())
@@ -274,22 +266,12 @@ private void handleEndpointResourceUpdate() {
274266
}
275267

276268
private void handleEndpointResolutionError() {
277-
boolean allInError = true;
278-
Status error = null;
279-
for (String cluster : clusters) {
280-
ClusterState state = clusterStates.get(cluster);
281-
if (state.status.isOk()) {
282-
allInError = false;
283-
} else {
284-
error = state.status;
285-
}
286-
}
287-
if (allInError) {
269+
if (!clusterState.status.isOk()) {
288270
if (childLb != null) {
289-
childLb.handleNameResolutionError(error);
271+
childLb.handleNameResolutionError(clusterState.status);
290272
} else {
291273
helper.updateBalancingState(
292-
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
274+
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(clusterState.status)));
293275
}
294276
}
295277
}
@@ -306,10 +288,8 @@ private RefreshableHelper(Helper delegate) {
306288

307289
@Override
308290
public void refreshNameResolution() {
309-
for (ClusterState state : clusterStates.values()) {
310-
if (state instanceof LogicalDnsClusterState) {
311-
((LogicalDnsClusterState) state).refresh();
312-
}
291+
if (clusterState instanceof LogicalDnsClusterState) {
292+
((LogicalDnsClusterState) clusterState).refresh();
313293
}
314294
}
315295

0 commit comments

Comments
 (0)