1
1
package io .envoyproxy .controlplane .server ;
2
2
3
- import static io .envoyproxy .controlplane .server .V2TestSnapshots .createSnapshot ;
4
3
import static io .envoyproxy .envoy .api .v2 .core .ApiVersion .V2 ;
5
4
import static io .restassured .RestAssured .given ;
6
5
import static org .assertj .core .api .Assertions .assertThat ;
7
6
import static org .awaitility .Awaitility .await ;
8
7
import static org .hamcrest .Matchers .containsString ;
9
8
10
9
import com .google .protobuf .util .Durations ;
11
- import io .envoyproxy .controlplane .cache .CacheStatusInfo ;
12
10
import io .envoyproxy .controlplane .cache .NodeGroup ;
13
- import io .envoyproxy .controlplane .cache .Resources ;
14
11
import io .envoyproxy .controlplane .cache .TestResources ;
15
12
import io .envoyproxy .controlplane .cache .v2 .SimpleCache ;
16
13
import io .envoyproxy .controlplane .cache .v2 .Snapshot ;
26
23
import io .envoyproxy .envoy .api .v2 .core .Node ;
27
24
import io .grpc .netty .NettyServerBuilder ;
28
25
import io .restassured .http .ContentType ;
29
- import java .util .concurrent .ConcurrentMap ;
30
26
import java .util .concurrent .CountDownLatch ;
31
- import java .util .concurrent .ExecutorService ;
32
- import java .util .concurrent .Executors ;
33
27
import java .util .concurrent .TimeUnit ;
34
28
import org .junit .ClassRule ;
35
29
import org .junit .Test ;
@@ -43,7 +37,7 @@ public class V2DiscoveryServerAdsWarmingClusterIT {
43
37
private static final String CONFIG = "envoy/ads.v2.config.yaml" ;
44
38
private static final String GROUP = "key" ;
45
39
private static final Integer LISTENER_PORT = 10000 ;
46
- private static final CustomCache <String > cache = new CustomCache <>(new NodeGroup <String >() {
40
+ private static final SimpleCache <String > cache = new SimpleCache <>(new NodeGroup <String >() {
47
41
@ Override public String hash (Node node ) {
48
42
return GROUP ;
49
43
}
@@ -60,7 +54,6 @@ public class V2DiscoveryServerAdsWarmingClusterIT {
60
54
private static final NettyGrpcServerRule ADS = new NettyGrpcServerRule () {
61
55
@ Override
62
56
protected void configureServerBuilder (NettyServerBuilder builder ) {
63
- ExecutorService executorService = Executors .newSingleThreadExecutor ();
64
57
final DiscoveryServerCallbacks callbacks = new DiscoveryServerCallbacks () {
65
58
@ Override
66
59
public void onStreamOpen (long streamId , String typeUrl ) {
@@ -82,7 +75,6 @@ public void onV3StreamRequest(long streamId,
82
75
public void onStreamResponse (long streamId , DiscoveryRequest request , DiscoveryResponse response ) {
83
76
// Here we update a Snapshot with working cluster, but we change only CDS version, not EDS version.
84
77
// This change allows to test if EDS will be sent anyway after CDS was sent.
85
- createSnapshotWithWorkingClusterWithTheSameEdsVersion (request , executorService );
86
78
onStreamResponseLatch .countDown ();
87
79
}
88
80
};
@@ -131,30 +123,35 @@ public void validateTestRequestToEchoServerViaEnvoy() throws InterruptedExceptio
131
123
132
124
String baseUri = String .format ("http://%s:%d" , ENVOY .getContainerIpAddress (), ENVOY .getMappedPort (LISTENER_PORT ));
133
125
126
+ await ().atMost (5 , TimeUnit .SECONDS ).ignoreExceptions ().untilAsserted (
127
+ () -> given ().baseUri (baseUri ).contentType (ContentType .TEXT )
128
+ .when ().get ("/" )
129
+ .then ().statusCode (503 ));
130
+
131
+ // Here we update a Snapshot with working cluster, but we change only CDS version, not EDS version.
132
+ // This change allows to test if EDS will be sent anyway after CDS was sent.
133
+ createSnapshotWithWorkingClusterWithTheSameEdsVersion ();
134
+
134
135
await ().atMost (5 , TimeUnit .SECONDS ).ignoreExceptions ().untilAsserted (
135
136
() -> given ().baseUri (baseUri ).contentType (ContentType .TEXT )
136
137
.when ().get ("/" )
137
138
.then ().statusCode (200 )
138
139
.and ().body (containsString (UPSTREAM .response )));
139
140
}
140
141
141
- private static void createSnapshotWithWorkingClusterWithTheSameEdsVersion (DiscoveryRequest request ,
142
- ExecutorService executorService ) {
143
- if (request .getTypeUrl ().equals (Resources .V2 .CLUSTER_TYPE_URL )) {
144
- executorService .submit (() -> cache .setSnapshot (
145
- GROUP ,
146
- createSnapshot (true ,
147
- "upstream" ,
148
- UPSTREAM .ipAddress (),
149
- EchoContainer .PORT ,
150
- "listener0" ,
151
- LISTENER_PORT ,
152
- "route0" ,
153
- "2" ))
154
- );
155
- }
142
+ private static void createSnapshotWithWorkingClusterWithTheSameEdsVersion () {
143
+ cache .setSnapshot (GROUP ,
144
+ V2TestSnapshots .createSnapshot (true ,
145
+ "upstream" ,
146
+ UPSTREAM .ipAddress (),
147
+ EchoContainer .PORT ,
148
+ "listener0" ,
149
+ LISTENER_PORT ,
150
+ "route0" ,
151
+ "2" ));
156
152
}
157
153
154
+
158
155
private static Snapshot createSnapshotWithNotWorkingCluster (boolean ads ,
159
156
String clusterName ,
160
157
String endpointAddress ,
@@ -197,34 +194,21 @@ private static Snapshot createSnapshotWithNotWorkingCluster(boolean ads,
197
194
}
198
195
199
196
200
- /**
201
- * Code has been copied from io.envoyproxy.controlplane.cache.SimpleCache to show specific case when
202
- * Envoy might stuck with warming cluster. Class has changed lines from method respondWithSpecificOrder which are
203
- * responsible for responding for watches. Because to reproduce this problem we need a lot of connected Envoy's and
204
- * changes to snapshot it is easier to reproduce this way.
197
+ /*
198
+ * In the previous versions of this tests we had a copied SimpleCache with respondWithSpecificOrder removed.
199
+ * With new versions of Envoy hitting this edge-case became highly improbable.
200
+ * Now this test checks only if a CDS change will also send EDS.
201
+ * 1. Envoy connects to control-plane
202
+ * 2. Snapshot already exists in control-plane <- other instance share same group
203
+ * 3. Control-plane respond with CDS in createWatch method
204
+ * 4. There is snapshot update which change CDS and EDS versions
205
+ * 5. Envoy sends EDS request
206
+ * 6. Control-plane respond with EDS in createWatch method
207
+ * 7. Envoy resume CDS and EDS requests.
208
+ * 8. Envoy sends request CDS
209
+ * 9. Control plane respond with CDS in createWatch method
210
+ * 10. Envoy sends EDS requests
211
+ * 11. Control plane doesn't respond because version hasn't changed
212
+ * 12. Cluster of service stays in warming phase
205
213
*/
206
- static class CustomCache <T > extends SimpleCache <T > {
207
-
208
- public CustomCache (NodeGroup <T > groups ) {
209
- super (groups );
210
- }
211
-
212
- @ Override
213
- protected void respondWithSpecificOrder (T group , Snapshot snapshot ,
214
- ConcurrentMap <Resources .ResourceType , CacheStatusInfo <T >> status ) {
215
- // This code has been removed to show specific case which is hard to reproduce in integration test:
216
- // 1. Envoy connects to control-plane
217
- // 2. Snapshot already exists in control-plane <- other instance share same group
218
- // 3. Control-plane respond with CDS in createWatch method
219
- // 4. There is snapshot update which change CDS and EDS versions
220
- // 5. Envoy sends EDS request
221
- // 6. Control-plane respond with EDS in createWatch method
222
- // 7. Envoy resume CDS and EDS requests.
223
- // 8. Envoy sends request CDS
224
- // 9. Control plane respond with CDS in createWatch method
225
- // 10. Envoy sends EDS requests
226
- // 11. Control plane doesn't respond because version hasn't changed
227
- // 12. Cluster of service stays in warming phase
228
- }
229
- }
230
214
}
0 commit comments