Skip to content

Commit 5a63438

Browse files
committed
In progress changes.
1 parent 454f1c5 commit 5a63438

File tree

1 file changed

+127
-0
lines changed

1 file changed

+127
-0
lines changed

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,133 @@ private void shutdown() {
144144
}
145145
}
146146

147+
private void handleClusterDiscovered2() {
148+
if (root.isLeaf) {
149+
DiscoveryMechanism instance;
150+
if (root.result.clusterType() == ClusterType.EDS) {
151+
instance = DiscoveryMechanism.forEds(
152+
root.name, root.result.edsServiceName(),
153+
root.result.lrsServerInfo(),
154+
root.result.maxConcurrentRequests(),
155+
root.result.upstreamTlsContext(),
156+
root.result.filterMetadata(),
157+
root.result.outlierDetection());
158+
} else { // logical DNS
159+
instance = DiscoveryMechanism.forLogicalDns(
160+
root.name, root.result.dnsHostName(),
161+
root.result.lrsServerInfo(),
162+
root.result.maxConcurrentRequests(),
163+
root.result.upstreamTlsContext(),
164+
root.result.filterMetadata());
165+
}
166+
// The LB policy config is provided in service_config.proto/JSON format.
167+
NameResolver.ConfigOrError configOrError =
168+
GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
169+
Arrays.asList(root.result.lbPolicyConfig()), lbRegistry);
170+
if (configOrError.getError() != null) {
171+
throw configOrError.getError().augmentDescription("Unable to parse the LB config")
172+
.asRuntimeException();
173+
}
174+
ClusterResolverConfig config = new ClusterResolverConfig(
175+
List.of(instance),
176+
configOrError.getConfig(),
177+
root.result.isHttp11ProxyAvailable());
178+
if (childLb == null) {
179+
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
180+
}
181+
childLb.handleResolvedAddresses(
182+
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());
183+
} else {
184+
int numLeafClusters = 0;
185+
// Used for loop detection to break the infinite recursion that loops would cause
186+
Map<ClusterState, List<ClusterState>> parentClusters = new HashMap<>();
187+
Status loopStatus = null;
188+
189+
// Level-order traversal.
190+
// Collect configurations for all non-aggregate (leaf) clusters.
191+
Queue<ClusterState> queue = new ArrayDeque<>();
192+
queue.add(root);
193+
while (!queue.isEmpty()) {
194+
int size = queue.size();
195+
for (int i = 0; i < size; i++) {
196+
ClusterState clusterState = queue.remove();
197+
if (!clusterState.discovered) {
198+
return; // do not proceed until all clusters discovered
199+
}
200+
if (clusterState.result == null) { // resource revoked or not exists
201+
continue;
202+
}
203+
if (clusterState.isLeaf) {
204+
if (clusterState.childClusterStates == null) {
205+
continue;
206+
}
207+
// Do loop detection and break recursion if detected
208+
List<String> namesCausingLoops = identifyLoops(clusterState, parentClusters);
209+
if (namesCausingLoops.isEmpty()) {
210+
queue.addAll(clusterState.childClusterStates.values());
211+
} else {
212+
// Do cleanup
213+
if (childLb != null) {
214+
childLb.shutdown();
215+
childLb = null;
216+
}
217+
if (loopStatus != null) {
218+
logger.log(XdsLogLevel.WARNING,
219+
"Multiple loops in CDS config. Old msg: " + loopStatus.getDescription());
220+
}
221+
loopStatus = Status.UNAVAILABLE.withDescription(String.format(
222+
"CDS error: circular aggregate clusters directly under %s for "
223+
+ "root cluster %s, named %s, xDS node ID: %s",
224+
clusterState.name, root.name, namesCausingLoops,
225+
xdsClient.getBootstrapInfo().node().getId()));
226+
}
227+
}
228+
}
229+
}
230+
231+
if (loopStatus != null) {
232+
helper.updateBalancingState(
233+
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(loopStatus)));
234+
return;
235+
}
236+
237+
if (instances.isEmpty()) { // none of non-aggregate clusters exists
238+
if (childLb != null) {
239+
childLb.shutdown();
240+
childLb = null;
241+
}
242+
Status unavailable = Status.UNAVAILABLE.withDescription(String.format(
243+
"CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster %s"
244+
+ " xDS node ID: %s", root.name, xdsClient.getBootstrapInfo().node().getId()));
245+
helper.updateBalancingState(
246+
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(unavailable)));
247+
return;
248+
}
249+
250+
// The LB policy config is provided in service_config.proto/JSON format.
251+
NameResolver.ConfigOrError configOrError =
252+
GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
253+
Arrays.asList(root.result.lbPolicyConfig()), lbRegistry);
254+
if (configOrError.getError() != null) {
255+
throw configOrError.getError().augmentDescription("Unable to parse the LB config")
256+
.asRuntimeException();
257+
}
258+
259+
ClusterResolverConfig config = new ClusterResolverConfig(
260+
Collections.unmodifiableList(instances),
261+
configOrError.getConfig(),
262+
root.result.isHttp11ProxyAvailable());
263+
if (childLb == null) {
264+
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
265+
}
266+
childLb.handleResolvedAddresses(
267+
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());
268+
for (String childClusterName: root.result.prioritizedClusterNames()) {
269+
270+
}
271+
}
272+
}
273+
147274
private void handleClusterDiscovered() {
148275
List<DiscoveryMechanism> instances = new ArrayList<>();
149276

0 commit comments

Comments
 (0)