18
18
19
19
import static com .google .common .base .Preconditions .checkArgument ;
20
20
import static com .google .common .base .Preconditions .checkNotNull ;
21
+ import static com .google .common .base .Preconditions .checkState ;
21
22
import static io .grpc .xds .client .XdsClient .ResourceUpdate ;
22
23
23
24
import com .google .common .annotations .VisibleForTesting ;
34
35
import io .grpc .xds .client .XdsClient ;
35
36
import io .grpc .xds .client .XdsClient .ResourceWatcher ;
36
37
import io .grpc .xds .client .XdsResourceType ;
37
- import java .io .Closeable ;
38
- import java .io .IOException ;
39
38
import java .util .Collections ;
40
39
import java .util .HashMap ;
41
40
import java .util .HashSet ;
56
55
final class XdsDependencyManager implements XdsConfig .XdsClusterSubscriptionRegistry {
57
56
public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource .getInstance ();
58
57
public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource .getInstance ();
59
- private static final int MAX_CLUSTER_RECURSION_DEPTH = 16 ; // Matches C++
58
+ private static final int MAX_CLUSTER_RECURSION_DEPTH = 16 ; // Specified by gRFC A37
60
59
private final String listenerName ;
61
60
private final XdsClient xdsClient ;
62
- private final XdsConfigWatcher xdsConfigWatcher ;
63
61
private final SynchronizationContext syncContext ;
64
62
private final String dataPlaneAuthority ;
63
+ private XdsConfigWatcher xdsConfigWatcher ;
65
64
66
65
private StatusOr <XdsConfig > lastUpdate = null ;
67
66
private final Map <XdsResourceType <?>, TypeWatchers <?>> resourceWatchers = new HashMap <>();
68
67
private final Set <ClusterSubscription > subscriptions = new HashSet <>();
69
68
70
- XdsDependencyManager (XdsClient xdsClient , XdsConfigWatcher xdsConfigWatcher ,
69
+ XdsDependencyManager (XdsClient xdsClient ,
71
70
SynchronizationContext syncContext , String dataPlaneAuthority ,
72
71
String listenerName , NameResolver .Args nameResolverArgs ,
73
72
ScheduledExecutorService scheduler ) {
74
73
this .listenerName = checkNotNull (listenerName , "listenerName" );
75
74
this .xdsClient = checkNotNull (xdsClient , "xdsClient" );
76
- this .xdsConfigWatcher = checkNotNull (xdsConfigWatcher , "xdsConfigWatcher" );
77
75
this .syncContext = checkNotNull (syncContext , "syncContext" );
78
76
this .dataPlaneAuthority = checkNotNull (dataPlaneAuthority , "dataPlaneAuthority" );
79
77
checkNotNull (nameResolverArgs , "nameResolverArgs" );
80
78
checkNotNull (scheduler , "scheduler" );
81
-
82
- // start the ball rolling
83
- syncContext .execute (() -> addWatcher (new LdsWatcher (listenerName )));
84
79
}
85
80
86
81
public static String toContextStr (String typeName , String resourceName ) {
87
82
return typeName + " resource " + resourceName ;
88
83
}
89
84
85
+ public void start (XdsConfigWatcher xdsConfigWatcher ) {
86
+ checkState (this .xdsConfigWatcher == null , "dep manager may not be restarted" );
87
+ this .xdsConfigWatcher = checkNotNull (xdsConfigWatcher , "xdsConfigWatcher" );
88
+ // start the ball rolling
89
+ syncContext .execute (() -> addWatcher (new LdsWatcher (listenerName )));
90
+ }
91
+
90
92
@ Override
91
- public Closeable subscribeToCluster (String clusterName ) {
93
+ public XdsConfig .Subscription subscribeToCluster (String clusterName ) {
94
+ checkState (this .xdsConfigWatcher != null , "dep manager must first be started" );
92
95
checkNotNull (clusterName , "clusterName" );
93
96
ClusterSubscription subscription = new ClusterSubscription (clusterName );
94
97
@@ -291,10 +294,17 @@ private static void addConfigForCluster(
291
294
addConfigForCluster (clusters , childCluster , ancestors , tracer );
292
295
StatusOr <XdsConfig .XdsClusterConfig > config = clusters .get (childCluster );
293
296
if (!config .hasValue ()) {
294
- clusters .put (clusterName , StatusOr .fromStatus (Status .INTERNAL .withDescription (
295
- "Unable to get leaves for " + clusterName + ": "
296
- + config .getStatus ().getDescription ())));
297
- return ;
297
+ // gRFC A37 says: If any of a CDS policy's watchers reports that the resource does not
298
+ // exist the policy should report that it is in TRANSIENT_FAILURE. If any of the
299
+ // watchers reports a transient ADS stream error, the policy should report that it is in
300
+ // TRANSIENT_FAILURE if it has never passed a config to its child.
301
+ //
302
+ // But there's currently disagreement about whether that is actually what we want, and
303
+ // that was not originally implemented in gRPC Java. So we're keeping Java's old
304
+ // behavior for now and only failing the "leaves" (which is a bit arbitrary for a
305
+ // cycle).
306
+ leafNames .add (childCluster );
307
+ continue ;
298
308
}
299
309
XdsConfig .XdsClusterConfig .ClusterChild children = config .getValue ().getChildren ();
300
310
if (children instanceof AggregateConfig ) {
@@ -325,6 +335,11 @@ private static void addConfigForCluster(
325
335
default :
326
336
throw new IllegalStateException ("Unexpected value: " + cdsUpdate .clusterType ());
327
337
}
338
+ if (clusters .containsKey (clusterName )) {
339
+ // If a cycle is detected, we'll have detected it while recursing, so now there will be a key
340
+ // present. We don't want to overwrite it with a non-error value.
341
+ return ;
342
+ }
328
343
clusters .put (clusterName , StatusOr .fromValue (
329
344
new XdsConfig .XdsClusterConfig (clusterName , cdsUpdate , child )));
330
345
}
@@ -406,7 +421,7 @@ public interface XdsConfigWatcher {
406
421
void onUpdate (StatusOr <XdsConfig > config );
407
422
}
408
423
409
- private final class ClusterSubscription implements Closeable {
424
+ private final class ClusterSubscription implements XdsConfig . Subscription {
410
425
private final String clusterName ;
411
426
boolean closed ; // Accessed from syncContext
412
427
@@ -419,7 +434,7 @@ String getClusterName() {
419
434
}
420
435
421
436
@ Override
422
- public void close () throws IOException {
437
+ public void close () {
423
438
releaseSubscription (this );
424
439
}
425
440
}
0 commit comments