Skip to content

Commit c72c8ca

Browse files
add allowDefaultEmptyEdsUpdate in server startup configs (#287)
Signed-off-by: Yifei Sun <[email protected]> Co-authored-by: Yifei Sun <[email protected]>
1 parent c6e3bf3 commit c72c8ca

File tree

17 files changed

+1516
-1270
lines changed

17 files changed

+1516
-1270
lines changed

cache/src/main/java/io/envoyproxy/controlplane/cache/ConfigWatcher.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,18 @@ public interface ConfigWatcher {
2222
* @param responseConsumer the response handler, used to process outgoing response messages
2323
* @param hasClusterChanged Indicates if EDS should be sent immediately, even if version has not been changed.
2424
* Supported in ADS mode.
25+
*
26+
* @param allowDefaultEmptyEdsUpdate indicates if default empty EDS response should be sent when some clusters
27+
* in request are missing in snapshot. Supported in ADS mode.
28+
*
2529
*/
2630
Watch createWatch(
2731
boolean ads,
2832
XdsRequest request,
2933
Set<String> knownResourceNames,
3034
Consumer<Response> responseConsumer,
31-
boolean hasClusterChanged);
35+
boolean hasClusterChanged,
36+
boolean allowDefaultEmptyEdsUpdate);
3237

3338
/**
3439
* Returns a new configuration resource {@link Watch} for the given discovery request.

cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java

Lines changed: 47 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@
99
import com.google.common.collect.Sets;
1010
import com.google.protobuf.Message;
1111
import io.envoyproxy.controlplane.cache.Resources.ResourceType;
12-
12+
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
1313
import java.util.Collection;
1414
import java.util.Collections;
1515
import java.util.HashMap;
1616
import java.util.List;
1717
import java.util.Map;
1818
import java.util.Objects;
1919
import java.util.Set;
20+
import java.util.UUID;
2021
import java.util.concurrent.atomic.AtomicLong;
2122
import java.util.concurrent.locks.Lock;
2223
import java.util.concurrent.locks.ReadWriteLock;
@@ -26,16 +27,18 @@
2627
import java.util.stream.Collectors;
2728
import java.util.stream.Stream;
2829
import javax.annotation.concurrent.GuardedBy;
29-
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
3232

3333
/**
3434
* {@code SimpleCache} provides a default implementation of {@link SnapshotCache}. It maintains a single versioned
3535
* {@link Snapshot} per node group. For the protocol to work correctly in ADS mode, EDS/RDS requests are responded to
36-
* only when all resources in the snapshot xDS response are named as part of the request. It is expected that the CDS
37-
* response names all EDS clusters, and the LDS response names all RDS routes in a snapshot, to ensure that Envoy makes
38-
* the request for all EDS clusters or RDS routes eventually.
36+
* only when all resources in the snapshot xDS response are named as part of the request by default. It is expected
37+
* that the CDS response names all EDS clusters, and the LDS response names all RDS routes in a snapshot, to ensure
38+
* that Envoy makes the request for all EDS clusters or RDS routes eventually.
39+
*<p/>
40+
* when allowIncompleteEdsUpdate is true, we will send EDS response even if some clusters names are missing in the
41+
* snapshot in ADS.
3942
*
4043
* <p>The snapshot can be partial, e.g. only include RDS or EDS resources.
4144
*/
@@ -93,7 +96,7 @@ public Watch createWatch(
9396
XdsRequest request,
9497
Set<String> knownResourceNames,
9598
Consumer<Response> responseConsumer) {
96-
return createWatch(ads, request, knownResourceNames, responseConsumer, false);
99+
return createWatch(ads, request, knownResourceNames, responseConsumer, false, false);
97100
}
98101

99102
/**
@@ -105,7 +108,8 @@ public Watch createWatch(
105108
XdsRequest request,
106109
Set<String> knownResourceNames,
107110
Consumer<Response> responseConsumer,
108-
boolean hasClusterChanged) {
111+
boolean hasClusterChanged,
112+
boolean allowDefaultEmptyEdsUpdate) {
109113
ResourceType requestResourceType = request.getResourceType();
110114
Preconditions.checkNotNull(requestResourceType, "unsupported type URL %s",
111115
request.getTypeUrl());
@@ -124,7 +128,7 @@ public Watch createWatch(
124128
String version = snapshot == null ? "" : snapshot.version(requestResourceType,
125129
request.getResourceNamesList());
126130

127-
Watch watch = new Watch(ads, request, responseConsumer);
131+
Watch watch = new Watch(ads, allowDefaultEmptyEdsUpdate, request, responseConsumer);
128132

129133
if (snapshot != null) {
130134
Set<String> requestedResources = ImmutableSet.copyOf(request.getResourceNamesList());
@@ -440,13 +444,14 @@ protected void respondWithSpecificOrder(T group,
440444
}
441445

442446
private Response createResponse(XdsRequest request, Map<String, VersionedResource<?>> resources,
443-
String version) {
447+
String version, boolean allowDefaultResource) {
444448
Collection<? extends Message> filtered = request.getResourceNamesList().isEmpty()
445449
? resources.values().stream()
446450
.map(VersionedResource::resource)
447451
.collect(Collectors.toList())
448452
: request.getResourceNamesList().stream()
449-
.map(resources::get)
453+
.map(name -> resources.getOrDefault(name,
454+
allowDefaultResource ? defaultResource(name, request.getResourceType()) : null))
450455
.filter(Objects::nonNull)
451456
.map(VersionedResource::resource)
452457
.collect(Collectors.toList());
@@ -458,21 +463,35 @@ private boolean respond(Watch watch, U snapshot, T group) {
458463
Map<String, VersionedResource<?>> snapshotResources =
459464
snapshot.versionedResources(watch.request().getResourceType());
460465

466+
boolean allowDefaultResource = false;
461467
if (!watch.request().getResourceNamesList().isEmpty() && watch.ads()) {
462468
Collection<String> missingNames = watch.request().getResourceNamesList().stream()
463469
.filter(name -> !snapshotResources.containsKey(name))
464470
.collect(Collectors.toList());
465471

466472
if (!missingNames.isEmpty()) {
467-
LOGGER.info(
468-
"not responding in ADS mode for {} from node {} at version {} for request [{}] since [{}] not in snapshot",
469-
watch.request().getTypeUrl(),
470-
group,
471-
snapshot.version(watch.request().getResourceType(), watch.request().getResourceNamesList()),
472-
String.join(", ", watch.request().getResourceNamesList()),
473-
String.join(", ", missingNames));
473+
if (watch.allowDefaultEmptyEdsUpdate() && watch.request().getResourceType().equals(ResourceType.ENDPOINT)) {
474+
allowDefaultResource = true;
475+
LOGGER.info(
476+
"responding with empty ClusterLoadAssignments in ADS mode for {} from node {} at version {} "
477+
+ "for request [{}] and [{}] not in snapshot",
478+
watch.request().getTypeUrl(),
479+
group,
480+
snapshot.version(watch.request().getResourceType(), watch.request().getResourceNamesList()),
481+
String.join(", ", watch.request().getResourceNamesList()),
482+
String.join(", ", missingNames));
483+
} else {
484+
LOGGER.info(
485+
"not responding in ADS mode for {} from node {} at version {} for request [{}] since [{}] not in"
486+
+ " snapshot",
487+
watch.request().getTypeUrl(),
488+
group,
489+
snapshot.version(watch.request().getResourceType(), watch.request().getResourceNamesList()),
490+
String.join(", ", watch.request().getResourceNamesList()),
491+
String.join(", ", missingNames));
474492

475-
return false;
493+
return false;
494+
}
476495
}
477496
}
478497

@@ -488,7 +507,8 @@ private boolean respond(Watch watch, U snapshot, T group) {
488507
Response response = createResponse(
489508
watch.request(),
490509
snapshotResources,
491-
version);
510+
version,
511+
allowDefaultResource);
492512

493513
try {
494514
watch.respond(response);
@@ -573,6 +593,14 @@ private ResponseState respondDelta(DeltaWatch watch,
573593
return ResponseState.CANCELLED;
574594
}
575595

596+
private VersionedResource<?> defaultResource(String resourceName, ResourceType resourceType) {
597+
if (resourceType.equals(ResourceType.ENDPOINT)) {
598+
return VersionedResource.create(ClusterLoadAssignment.newBuilder().setClusterName(resourceName).build(),
599+
UUID.randomUUID().toString());
600+
}
601+
throw new IllegalArgumentException(String.format("no default resource for resourceType: [%s]", resourceType));
602+
}
603+
576604
private enum ResponseState {
577605
RESPONDED,
578606
UNRESPONDED,

cache/src/main/java/io/envoyproxy/controlplane/cache/TestResources.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.common.annotations.VisibleForTesting;
44
import com.google.protobuf.Any;
5+
import com.google.protobuf.Duration;
56
import com.google.protobuf.util.Durations;
67
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
78
import io.envoyproxy.envoy.config.core.v3.Address;
@@ -50,6 +51,7 @@ public class TestResources {
5051
public static Cluster createCluster(String clusterName) {
5152
ConfigSource edsSource =
5253
ConfigSource.newBuilder()
54+
.setInitialFetchTimeout(Duration.newBuilder().setSeconds(0))
5355
.setAds(AggregatedConfigSource.getDefaultInstance())
5456
.setResourceApiVersion(ApiVersion.V3)
5557
.build();

cache/src/main/java/io/envoyproxy/controlplane/cache/Watch.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,22 @@
88
*/
99
public class Watch extends AbstractWatch<XdsRequest, Response> {
1010
private final boolean ads;
11+
private final boolean allowDefaultEmptyEdsUpdate;
1112

1213
/**
1314
* Construct a watch.
1415
*
15-
* @param ads is this watch for an ADS request?
16-
* @param request the original request for the watch
17-
* @param responseConsumer handler for outgoing response messages
16+
* @param ads is this watch for an ADS request?
17+
* @param allowDefaultEmptyEdsUpdate whether allow responding incomplete EDS response
18+
* when some clusters are missing in the snapshot
19+
* @param request the original request for the watch
20+
* @param responseConsumer handler for outgoing response messages
1821
*/
19-
public Watch(boolean ads, XdsRequest request, Consumer<Response> responseConsumer) {
22+
public Watch(boolean ads, boolean allowDefaultEmptyEdsUpdate,
23+
XdsRequest request, Consumer<Response> responseConsumer) {
2024
super(request, responseConsumer);
2125
this.ads = ads;
26+
this.allowDefaultEmptyEdsUpdate = allowDefaultEmptyEdsUpdate;
2227
}
2328

2429
/**
@@ -28,4 +33,7 @@ public boolean ads() {
2833
return ads;
2934
}
3035

36+
public boolean allowDefaultEmptyEdsUpdate() {
37+
return allowDefaultEmptyEdsUpdate;
38+
}
3139
}

cache/src/test/java/io/envoyproxy/controlplane/cache/CacheStatusInfoTest.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,20 +44,21 @@ public void lastWatchRequestTimeReturnsExpectedValueIfSet() {
4444
@Test
4545
public void numWatchesReturnsExpectedSize() {
4646
final boolean ads = ThreadLocalRandom.current().nextBoolean();
47+
final boolean allowDefaultEmptyEdsUpdate = ThreadLocalRandom.current().nextBoolean();
4748
final long watchId1 = ThreadLocalRandom.current().nextLong(10000, 50000);
4849
final long watchId2 = ThreadLocalRandom.current().nextLong(50000, 100000);
4950

5051
CacheStatusInfo<Node> info = new CacheStatusInfo<>(Node.getDefaultInstance());
5152

5253
assertThat(info.numWatches()).isZero();
5354

54-
info.setWatch(watchId1, new Watch(ads,
55+
info.setWatch(watchId1, new Watch(ads, allowDefaultEmptyEdsUpdate,
5556
XdsRequest.create(DiscoveryRequest.getDefaultInstance()), r -> { }));
5657

5758
assertThat(info.numWatches()).isEqualTo(1);
5859
assertThat(info.watchIds()).containsExactlyInAnyOrder(watchId1);
5960

60-
info.setWatch(watchId2, new Watch(ads,
61+
info.setWatch(watchId2, new Watch(ads, allowDefaultEmptyEdsUpdate,
6162
XdsRequest.create(DiscoveryRequest.getDefaultInstance()), r -> { }));
6263

6364
assertThat(info.numWatches()).isEqualTo(2);
@@ -72,14 +73,15 @@ public void numWatchesReturnsExpectedSize() {
7273
@Test
7374
public void watchesRemoveIfRemovesExpectedWatches() {
7475
final boolean ads = ThreadLocalRandom.current().nextBoolean();
76+
final boolean allowDefaultEmptyEdsUpdate = ThreadLocalRandom.current().nextBoolean();
7577
final long watchId1 = ThreadLocalRandom.current().nextLong(10000, 50000);
7678
final long watchId2 = ThreadLocalRandom.current().nextLong(50000, 100000);
7779

7880
CacheStatusInfo<Node> info = new CacheStatusInfo<>(Node.getDefaultInstance());
7981

80-
info.setWatch(watchId1, new Watch(ads,
82+
info.setWatch(watchId1, new Watch(ads, allowDefaultEmptyEdsUpdate,
8183
XdsRequest.create(DiscoveryRequest.getDefaultInstance()), r -> { }));
82-
info.setWatch(watchId2, new Watch(ads,
84+
info.setWatch(watchId2, new Watch(ads, allowDefaultEmptyEdsUpdate,
8385
XdsRequest.create(DiscoveryRequest.getDefaultInstance()), r -> { }));
8486

8587
assertThat(info.numWatches()).isEqualTo(2);
@@ -94,15 +96,16 @@ public void watchesRemoveIfRemovesExpectedWatches() {
9496
@Test
9597
public void testConcurrentSetWatchAndRemove() {
9698
final boolean ads = ThreadLocalRandom.current().nextBoolean();
99+
final boolean allowDefaultEmptyEdsUpdate = ThreadLocalRandom.current().nextBoolean();
97100
final int watchCount = 50;
98101

99102
CacheStatusInfo<Node> info = new CacheStatusInfo<>(Node.getDefaultInstance());
100103

101104
Collection<Long> watchIds = LongStream.range(0, watchCount).boxed().collect(Collectors.toList());
102105

103106
watchIds.parallelStream().forEach(watchId -> {
104-
Watch watch = new Watch(ads, XdsRequest.create(DiscoveryRequest.getDefaultInstance()),
105-
r -> { });
107+
Watch watch = new Watch(ads, allowDefaultEmptyEdsUpdate,
108+
XdsRequest.create(DiscoveryRequest.getDefaultInstance()), r -> { });
106109

107110
info.setWatch(watchId, watch);
108111
});

cache/src/test/java/io/envoyproxy/controlplane/cache/WatchTest.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,21 @@ public class WatchTest {
1919
@Test
2020
public void adsReturnsGivenValue() {
2121
final boolean ads = ThreadLocalRandom.current().nextBoolean();
22+
final boolean allowDefaultEmptyEdsUpdate = ThreadLocalRandom.current().nextBoolean();
2223

23-
Watch watch = new Watch(ads, XdsRequest.create(DiscoveryRequest.getDefaultInstance()),
24-
r -> { });
24+
Watch watch = new Watch(ads, allowDefaultEmptyEdsUpdate,
25+
XdsRequest.create(DiscoveryRequest.getDefaultInstance()), r -> { });
2526

2627
assertThat(watch.ads()).isEqualTo(ads);
2728
}
2829

2930
@Test
3031
public void isCancelledTrueAfterCancel() {
3132
final boolean ads = ThreadLocalRandom.current().nextBoolean();
33+
final boolean allowDefaultEmptyEdsUpdate = ThreadLocalRandom.current().nextBoolean();
3234

33-
Watch watch = new Watch(ads, XdsRequest.create(DiscoveryRequest.getDefaultInstance()), r -> { });
35+
Watch watch = new Watch(ads, allowDefaultEmptyEdsUpdate,
36+
XdsRequest.create(DiscoveryRequest.getDefaultInstance()), r -> { });
3437

3538
assertThat(watch.isCancelled()).isFalse();
3639

@@ -42,10 +45,12 @@ public void isCancelledTrueAfterCancel() {
4245
@Test
4346
public void cancelWithStopCallsStop() {
4447
final boolean ads = ThreadLocalRandom.current().nextBoolean();
48+
final boolean allowDefaultEmptyEdsUpdate = ThreadLocalRandom.current().nextBoolean();
4549

4650
AtomicInteger stopCount = new AtomicInteger();
4751

48-
Watch watch = new Watch(ads, XdsRequest.create(DiscoveryRequest.getDefaultInstance()), r -> { });
52+
Watch watch = new Watch(ads, allowDefaultEmptyEdsUpdate,
53+
XdsRequest.create(DiscoveryRequest.getDefaultInstance()), r -> { });
4954

5055
watch.setStop(stopCount::getAndIncrement);
5156

@@ -62,6 +67,7 @@ public void cancelWithStopCallsStop() {
6267
@Test
6368
public void responseHandlerExecutedForResponsesUntilCancelled() {
6469
final boolean ads = ThreadLocalRandom.current().nextBoolean();
70+
final boolean allowDefaultEmptyEdsUpdate = ThreadLocalRandom.current().nextBoolean();
6571

6672
Response response1 = Response.create(
6773
XdsRequest.create(DiscoveryRequest.getDefaultInstance()),
@@ -80,7 +86,8 @@ public void responseHandlerExecutedForResponsesUntilCancelled() {
8086

8187
List<Response> responses = new LinkedList<>();
8288

83-
Watch watch = new Watch(ads, XdsRequest.create(DiscoveryRequest.getDefaultInstance()), responses::add);
89+
Watch watch = new Watch(ads, allowDefaultEmptyEdsUpdate,
90+
XdsRequest.create(DiscoveryRequest.getDefaultInstance()), responses::add);
8491

8592
try {
8693
watch.respond(response1);

cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SimpleCacheTest.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,36 @@ public void invalidNamesListShouldReturnWatcherWithNoResponseInAdsMode() {
108108
assertThatWatchIsOpenWithNoResponses(new WatchAndTracker(watch, responseTracker));
109109
}
110110

111+
@Test
112+
public void invalidNamesListShouldReturnWatcherWithDefaultEmptyResponseInAdsModeAndAllowDefaultEmptyEdsUpdate() {
113+
SimpleCache<String> cache = new SimpleCache<>(new SingleNodeGroup());
114+
115+
cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT1);
116+
117+
ResponseTracker responseTracker = new ResponseTracker();
118+
119+
Watch watch = cache.createWatch(
120+
true,
121+
XdsRequest.create(DiscoveryRequest.newBuilder()
122+
.setNode(Node.getDefaultInstance())
123+
.setTypeUrl(Resources.V3.ENDPOINT_TYPE_URL)
124+
.addResourceNames("none")
125+
.build()),
126+
Collections.emptySet(),
127+
responseTracker,
128+
false,
129+
true);
130+
131+
assertThat(watch.isCancelled()).isFalse();
132+
Assertions.assertThat(responseTracker.responses).isNotEmpty();
133+
Assertions.assertThat(responseTracker.responses.size()).isEqualTo(1);
134+
Message[] messages = responseTracker.responses.getFirst().resources().toArray(new Message[0]);
135+
136+
assertThat(messages).containsExactlyElementsOf(
137+
Collections.singleton(ClusterLoadAssignment.newBuilder()
138+
.setClusterName("none").build()));
139+
}
140+
111141
@Test
112142
public void invalidNamesListShouldReturnWatcherWithResponseInXdsMode() {
113143
SimpleCache<String> cache = new SimpleCache<>(new SingleNodeGroup());
@@ -175,7 +205,8 @@ public void shouldSendEdsWhenClusterChangedButEdsVersionDidnt() {
175205
.build()),
176206
Sets.newHashSet(""),
177207
responseTracker,
178-
true);
208+
true,
209+
false);
179210

180211
assertThat(watch.request().getTypeUrl()).isEqualTo(Resources.V3.ENDPOINT_TYPE_URL);
181212
assertThat(watch.request().getResourceNamesList()).containsExactlyElementsOf(
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.envoyproxy.controlplane.server;
2+
3+
public class DefaultStartupConfigs implements StartupConfigs {
4+
@Override
5+
public boolean allowDefaultEmptyEdsUpdate() {
6+
return false;
7+
}
8+
}

0 commit comments

Comments
 (0)