Skip to content

Commit c2c5df5

Browse files
authored
Send responses in specific order (#128)
Signed-off-by: Lukasz Dziedziak <[email protected]>
1 parent fbf5591 commit c2c5df5

File tree

2 files changed

+68
-35
lines changed

2 files changed

+68
-35
lines changed

cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -212,26 +212,32 @@ public synchronized void setSnapshot(T group, Snapshot snapshot) {
212212
return;
213213
}
214214

215-
status.watchesRemoveIf((id, watch) -> {
216-
String version = snapshot.version(watch.request().getTypeUrl(), watch.request().getResourceNamesList());
217-
218-
if (!watch.request().getVersionInfo().equals(version)) {
219-
if (LOGGER.isDebugEnabled()) {
220-
LOGGER.debug("responding to open watch {}[{}] with new version {}",
221-
id,
222-
String.join(", ", watch.request().getResourceNamesList()),
223-
version);
215+
// Responses should be in specific order and TYPE_URLS has a list of resources in the right order.
216+
for (String typeUrl : Resources.TYPE_URLS) {
217+
status.watchesRemoveIf((id, watch) -> {
218+
if (!watch.request().getTypeUrl().equals(typeUrl)) {
219+
return false;
224220
}
221+
String version = snapshot.version(watch.request().getTypeUrl(), watch.request().getResourceNamesList());
222+
223+
if (!watch.request().getVersionInfo().equals(version)) {
224+
if (LOGGER.isDebugEnabled()) {
225+
LOGGER.debug("responding to open watch {}[{}] with new version {}",
226+
id,
227+
String.join(", ", watch.request().getResourceNamesList()),
228+
version);
229+
}
225230

226-
respond(watch, snapshot, group);
231+
respond(watch, snapshot, group);
227232

228-
// Discard the watch. A new watch will be created for future snapshots once envoy ACKs the response.
229-
return true;
230-
}
233+
// Discard the watch. A new watch will be created for future snapshots once envoy ACKs the response.
234+
return true;
235+
}
231236

232-
// Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond.
233-
return false;
234-
});
237+
// Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond.
238+
return false;
239+
});
240+
}
235241
}
236242

237243
/**

cache/src/test/java/io/envoyproxy/controlplane/cache/SimpleCacheTest.java

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.envoyproxy.envoy.api.v2.auth.Secret;
1414
import io.envoyproxy.envoy.api.v2.core.Node;
1515
import java.util.Collections;
16+
import java.util.HashMap;
1617
import java.util.LinkedList;
1718
import java.util.Map;
1819
import java.util.UUID;
@@ -165,25 +166,35 @@ public void successfullyWatchAllResourceTypesWithSetBeforeWatchWithRequestVersio
165166

166167
cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT1);
167168

168-
Map<String, WatchAndTracker> watches = Resources.TYPE_URLS.stream()
169-
.collect(Collectors.toMap(
170-
typeUrl -> typeUrl,
171-
typeUrl -> {
172-
ResponseTracker responseTracker = new ResponseTracker();
173-
174-
Watch watch = cache.createWatch(
175-
ADS,
176-
DiscoveryRequest.newBuilder()
177-
.setNode(Node.getDefaultInstance())
178-
.setTypeUrl(typeUrl)
179-
.setVersionInfo(SNAPSHOT1.version(typeUrl))
180-
.addAllResourceNames(SNAPSHOT1.resources(typeUrl).keySet())
181-
.build(),
182-
SNAPSHOT2.resources(typeUrl).keySet(),
183-
responseTracker);
184-
185-
return new WatchAndTracker(watch, responseTracker);
186-
}));
169+
ResponseOrderTracker responseOrderTracker = new ResponseOrderTracker();
170+
171+
HashMap<String, WatchAndTracker> watches = new HashMap<>();
172+
173+
for (int i = 0; i < 2; ++i) {
174+
watches.putAll(Resources.TYPE_URLS.stream()
175+
.collect(Collectors.toMap(
176+
typeUrl -> typeUrl,
177+
typeUrl -> {
178+
ResponseTracker responseTracker = new ResponseTracker();
179+
180+
Watch watch = cache.createWatch(
181+
ADS,
182+
DiscoveryRequest.newBuilder()
183+
.setNode(Node.getDefaultInstance())
184+
.setTypeUrl(typeUrl)
185+
.setVersionInfo(SNAPSHOT1.version(typeUrl))
186+
.addAllResourceNames(SNAPSHOT1.resources(typeUrl).keySet())
187+
.build(),
188+
SNAPSHOT2.resources(typeUrl).keySet(),
189+
r -> {
190+
responseTracker.accept(r);
191+
responseOrderTracker.accept(r);
192+
});
193+
194+
return new WatchAndTracker(watch, responseTracker);
195+
}))
196+
);
197+
}
187198

188199
// The request version matches the current snapshot version, so the watches shouldn't receive any responses.
189200
for (String typeUrl : Resources.TYPE_URLS) {
@@ -195,6 +206,12 @@ public void successfullyWatchAllResourceTypesWithSetBeforeWatchWithRequestVersio
195206
for (String typeUrl : Resources.TYPE_URLS) {
196207
assertThatWatchReceivesSnapshot(watches.get(typeUrl), SNAPSHOT2);
197208
}
209+
210+
// Verify that CDS and LDS always get triggered before EDS and RDS respectively.
211+
assertThat(responseOrderTracker.responseTypes).containsExactly(Resources.CLUSTER_TYPE_URL,
212+
Resources.CLUSTER_TYPE_URL, Resources.ENDPOINT_TYPE_URL, Resources.ENDPOINT_TYPE_URL,
213+
Resources.LISTENER_TYPE_URL, Resources.LISTENER_TYPE_URL, Resources.ROUTE_TYPE_URL,
214+
Resources.ROUTE_TYPE_URL, Resources.SECRET_TYPE_URL, Resources.SECRET_TYPE_URL);
198215
}
199216

200217
@Test
@@ -457,6 +474,16 @@ private static class ResponseTracker implements Consumer<Response> {
457474
public void accept(Response response) {
458475
responses.add(response);
459476
}
477+
478+
}
479+
480+
private static class ResponseOrderTracker implements Consumer<Response> {
481+
482+
private final LinkedList<String> responseTypes = new LinkedList<>();
483+
484+
@Override public void accept(Response response) {
485+
responseTypes.add(response.request().getTypeUrl());
486+
}
460487
}
461488

462489
private static class SingleNodeGroup implements NodeGroup<String> {

0 commit comments

Comments
 (0)