Skip to content

Commit d4c528a

Browse files
author
mgajda
committed
Add delta tests back
1 parent 5abf951 commit d4c528a

File tree

6 files changed

+630
-0
lines changed

6 files changed

+630
-0
lines changed
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package io.envoyproxy.controlplane.server;
2+
3+
import io.envoyproxy.envoy.api.v2.DeltaDiscoveryRequest;
4+
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
5+
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
6+
import java.util.concurrent.ConcurrentHashMap;
7+
import java.util.concurrent.CountDownLatch;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
public class V3DeltaDiscoveryServerCallbacks implements DiscoveryServerCallbacks {
12+
13+
private static final Logger LOGGER = LoggerFactory
14+
.getLogger(V3DeltaDiscoveryServerCallbacks.class);
15+
16+
private final CountDownLatch onStreamOpenLatch;
17+
private final CountDownLatch onStreamRequestLatch;
18+
private StringBuffer nonce;
19+
private StringBuffer errorDetail;
20+
private ConcurrentHashMap<String, StringBuffer> resourceToNonceMap;
21+
22+
23+
/**
24+
* Returns an implementation of DiscoveryServerCallbacks that throws if it sees a v2 request, and counts down on
25+
* provided latches in response to certain events.
26+
*
27+
* @param onStreamOpenLatch latch to call countDown() on when a v3 stream is opened.
28+
* @param onStreamRequestLatch latch to call countDown() on when a v3 request is seen.
29+
*/
30+
public V3DeltaDiscoveryServerCallbacks(CountDownLatch onStreamOpenLatch,
31+
CountDownLatch onStreamRequestLatch,
32+
StringBuffer nonce,
33+
StringBuffer errorDetail,
34+
ConcurrentHashMap<String, StringBuffer> resourceToNonceMap
35+
) {
36+
this.onStreamOpenLatch = onStreamOpenLatch;
37+
this.onStreamRequestLatch = onStreamRequestLatch;
38+
this.nonce = nonce;
39+
this.errorDetail = errorDetail;
40+
this.resourceToNonceMap = resourceToNonceMap;
41+
}
42+
43+
@Override
44+
public void onStreamOpen(long streamId, String typeUrl) {
45+
LOGGER.info("onStreamOpen called");
46+
onStreamOpenLatch.countDown();
47+
}
48+
49+
@Override
50+
public void onV2StreamRequest(long streamId,
51+
io.envoyproxy.envoy.api.v2.DiscoveryRequest request) {
52+
throw new IllegalStateException("Unexpected v2 request in v3 test");
53+
}
54+
55+
@Override
56+
public void onV3StreamRequest(long streamId, DiscoveryRequest request) {
57+
LOGGER.error("request={}", request);
58+
throw new IllegalStateException("Unexpected stream request");
59+
60+
}
61+
62+
@Override
63+
public void onV2StreamDeltaRequest(long streamId, DeltaDiscoveryRequest request) {
64+
throw new IllegalStateException("Unexpected v2 request in v3 test");
65+
}
66+
67+
@Override
68+
public void onV3StreamDeltaRequest(long streamId,
69+
io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest request) {
70+
LOGGER.info("Got a v3StreamDeltaRequest");
71+
errorDetail.append(request.getErrorDetail().getMessage());
72+
StringBuffer resourceNonce = resourceToNonceMap
73+
.getOrDefault(request.getTypeUrl(), new StringBuffer());
74+
resourceNonce.append(request.getResponseNonce());
75+
resourceToNonceMap.put(request.getTypeUrl(), resourceNonce);
76+
nonce.append(request.getResponseNonce());
77+
onStreamRequestLatch.countDown();
78+
}
79+
80+
@Override
81+
public void onStreamResponse(long streamId,
82+
io.envoyproxy.envoy.api.v2.DiscoveryRequest request,
83+
io.envoyproxy.envoy.api.v2.DiscoveryResponse response) {
84+
throw new IllegalStateException("Unexpected v2 response in v3 test");
85+
}
86+
87+
@Override
88+
public void onV3StreamResponse(long streamId, DiscoveryRequest request,
89+
DiscoveryResponse response) {
90+
LOGGER.info("Got a v3StreamResponse");
91+
}
92+
}
93+
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
package io.envoyproxy.controlplane.server;
2+
3+
import static io.restassured.RestAssured.given;
4+
import static org.assertj.core.api.Assertions.assertThat;
5+
import static org.awaitility.Awaitility.await;
6+
import static org.hamcrest.Matchers.containsString;
7+
8+
import io.envoyproxy.controlplane.cache.NodeGroup;
9+
import io.envoyproxy.controlplane.cache.Resources.V3;
10+
import io.envoyproxy.controlplane.cache.v3.SimpleCache;
11+
import io.envoyproxy.controlplane.cache.v3.Snapshot;
12+
import io.envoyproxy.envoy.api.v2.core.Node;
13+
import io.grpc.netty.NettyServerBuilder;
14+
import io.restassured.http.ContentType;
15+
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.CountDownLatch;
17+
import java.util.concurrent.TimeUnit;
18+
import org.junit.AfterClass;
19+
import org.junit.ClassRule;
20+
import org.junit.Test;
21+
import org.junit.rules.RuleChain;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
import org.testcontainers.containers.Network;
25+
26+
public class V3DiscoveryServerAdsDeltaResourcesIT {
27+
28+
private static final Logger LOGGER = LoggerFactory.getLogger(V3DiscoveryServerAdsDeltaResourcesIT.class);
29+
30+
private static final String CONFIG = "envoy/ads.v3.delta.config.yaml";
31+
private static final String GROUP = "key";
32+
private static final Integer LISTENER_PORT = 10000;
33+
34+
private static final CountDownLatch onStreamOpenLatch = new CountDownLatch(1);
35+
private static final CountDownLatch onStreamRequestLatch = new CountDownLatch(1);
36+
37+
private static ConcurrentHashMap<String, StringBuffer> resourceToNonceMap = new ConcurrentHashMap();
38+
private static StringBuffer nonce = new StringBuffer();
39+
private static StringBuffer errorDetails = new StringBuffer();
40+
41+
private static final SimpleCache<String> cache = new SimpleCache<>(new NodeGroup<String>() {
42+
@Override
43+
public String hash(Node node) {
44+
throw new IllegalStateException("Unexpected v2 request in v3 test");
45+
}
46+
47+
@Override
48+
public String hash(io.envoyproxy.envoy.config.core.v3.Node node) {
49+
return GROUP;
50+
}
51+
});
52+
53+
private static final NettyGrpcServerRule ADS = new NettyGrpcServerRule() {
54+
@Override
55+
protected void configureServerBuilder(NettyServerBuilder builder) {
56+
57+
final DiscoveryServerCallbacks callbacks =
58+
new V3DeltaDiscoveryServerCallbacks(onStreamOpenLatch, onStreamRequestLatch, nonce,
59+
errorDetails, resourceToNonceMap);
60+
61+
Snapshot snapshot = V3TestSnapshots.createSnapshot(true,
62+
true,
63+
"upstream",
64+
UPSTREAM.ipAddress(),
65+
EchoContainer.PORT,
66+
"listener0",
67+
LISTENER_PORT,
68+
"route0",
69+
"1");
70+
LOGGER.info("snapshot={}", snapshot);
71+
cache.setSnapshot(
72+
GROUP,
73+
snapshot
74+
);
75+
76+
V3DiscoveryServer server = new V3DiscoveryServer(callbacks, cache);
77+
78+
builder.addService(server.getAggregatedDiscoveryServiceImpl());
79+
}
80+
};
81+
82+
private static final Network NETWORK = Network.newNetwork();
83+
84+
private static final EnvoyContainer ENVOY = new EnvoyContainer(CONFIG, () -> ADS.getServer().getPort())
85+
.withExposedPorts(LISTENER_PORT)
86+
.withNetwork(NETWORK);
87+
88+
private static final EchoContainer UPSTREAM = new EchoContainer()
89+
.withNetwork(NETWORK)
90+
.withNetworkAliases("upstream");
91+
92+
@ClassRule
93+
public static final RuleChain RULES = RuleChain.outerRule(UPSTREAM)
94+
.around(ADS)
95+
.around(ENVOY);
96+
97+
@Test
98+
public void validateTestRequestToEchoServerViaEnvoy() throws InterruptedException {
99+
assertThat(onStreamOpenLatch.await(15, TimeUnit.SECONDS)).isTrue()
100+
.overridingErrorMessage("failed to open ADS stream");
101+
102+
assertThat(onStreamRequestLatch.await(15, TimeUnit.SECONDS)).isTrue()
103+
.overridingErrorMessage("failed to receive ADS request");
104+
105+
// there is no onStreamResponseLatch because V3DiscoveryServer doesn't call the callbacks
106+
// when responding to a delta request
107+
108+
String baseUri = String
109+
.format("http://%s:%d", ENVOY.getContainerIpAddress(), ENVOY.getMappedPort(LISTENER_PORT));
110+
111+
await().atMost(5, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
112+
() -> given().baseUri(baseUri).contentType(ContentType.TEXT)
113+
.when().get("/")
114+
.then().statusCode(200)
115+
.and().body(containsString(UPSTREAM.response)));
116+
117+
// basically the nonces will count up from 0 to 3 as envoy receives more resources
118+
// and check that no messages have been sent to errorDetails
119+
// here just check that the nonceMap contains each of the resources we expect
120+
// as it's not guaranteed what order they'll be received in
121+
assertThat(nonce.toString()).isEqualTo("0123");
122+
assertThat(resourceToNonceMap.containsKey(V3.CLUSTER_TYPE_URL)).isTrue();
123+
assertThat(resourceToNonceMap.containsKey(V3.LISTENER_TYPE_URL)).isTrue();
124+
assertThat(resourceToNonceMap.containsKey(V3.ROUTE_TYPE_URL)).isTrue();
125+
assertThat(errorDetails.toString()).isEqualTo("");
126+
127+
// now write a new snapshot, with the only change being an update
128+
// to the listener name, wait for a few seconds for envoy to pick it up, and
129+
// check that the nonce envoy most recently ACK'd is "4"
130+
Snapshot snapshot = V3TestSnapshots.createSnapshot(true,
131+
true,
132+
"upstream",
133+
UPSTREAM.ipAddress(),
134+
EchoContainer.PORT,
135+
"listener1",
136+
LISTENER_PORT,
137+
"route0",
138+
"2");
139+
LOGGER.info("snapshot={}", snapshot);
140+
cache.setSnapshot(
141+
GROUP,
142+
snapshot
143+
);
144+
145+
await().atMost(3, TimeUnit.SECONDS).untilAsserted(
146+
() -> {
147+
assertThat(nonce.toString()).isEqualTo("01234");
148+
assertThat(errorDetails.toString()).isEqualTo("");
149+
assertThat(resourceToNonceMap.containsKey(V3.LISTENER_TYPE_URL)).isTrue();
150+
// we know that the most recent update was to the listener, so check
151+
// that it received the most recent nonce
152+
assertThat(resourceToNonceMap.get(V3.LISTENER_TYPE_URL).toString()).contains("4");
153+
}
154+
);
155+
}
156+
157+
@Test
158+
public void validateNewSnapshotVersionButSameUnderlyingResourcesDoesNotTriggerUpdate()
159+
throws InterruptedException {
160+
assertThat(onStreamOpenLatch.await(15, TimeUnit.SECONDS)).isTrue()
161+
.overridingErrorMessage("failed to open ADS stream");
162+
163+
assertThat(onStreamRequestLatch.await(15, TimeUnit.SECONDS)).isTrue()
164+
.overridingErrorMessage("failed to receive ADS request");
165+
166+
// there is no onStreamResponseLatch because V3DiscoveryServer doesn't call the callbacks
167+
// when responding to a delta request
168+
169+
String baseUri = String
170+
.format("http://%s:%d", ENVOY.getContainerIpAddress(), ENVOY.getMappedPort(LISTENER_PORT));
171+
172+
await().atMost(5, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
173+
() -> given().baseUri(baseUri).contentType(ContentType.TEXT)
174+
.when().get("/")
175+
.then().statusCode(200)
176+
.and().body(containsString(UPSTREAM.response)));
177+
178+
// basically the nonces will count up from 0 to 3 as envoy receives more resources
179+
// and check that no messages have been sent to errorDetails
180+
assertThat(nonce.toString()).isEqualTo("0123");
181+
assertThat(resourceToNonceMap.containsKey(V3.CLUSTER_TYPE_URL)).isTrue();
182+
assertThat(resourceToNonceMap.containsKey(V3.LISTENER_TYPE_URL)).isTrue();
183+
assertThat(resourceToNonceMap.containsKey(V3.ROUTE_TYPE_URL)).isTrue();
184+
assertThat(errorDetails.toString()).isEqualTo("");
185+
186+
// now write a new snapshot, with the only change being an update
187+
// to the version, wait for a few seconds for envoy to pick it up, and
188+
// check that the nonce doesn't change
189+
Snapshot snapshot = V3TestSnapshots.createSnapshot(true,
190+
true,
191+
"upstream",
192+
UPSTREAM.ipAddress(),
193+
EchoContainer.PORT,
194+
"listener0",
195+
LISTENER_PORT,
196+
"route0",
197+
"2");
198+
LOGGER.info("snapshot={}", snapshot);
199+
cache.setSnapshot(
200+
GROUP,
201+
snapshot
202+
);
203+
204+
await().atMost(3, TimeUnit.SECONDS).untilAsserted(
205+
() -> {
206+
assertThat(nonce.toString()).isEqualTo("0123");
207+
assertThat(errorDetails.toString()).isEqualTo("");
208+
}
209+
);
210+
}
211+
212+
@AfterClass
213+
public static void after() throws Exception {
214+
ENVOY.close();
215+
UPSTREAM.close();
216+
NETWORK.close();
217+
}
218+
}

0 commit comments

Comments
 (0)