Skip to content

Commit 1692741

Browse files
authored
server: allow throwing checked exception from stream callbacks (#257)
This PR introduces throwing a checked exception from stream callbacks, and catching them to trigger clean up tasks such as calling onStreamClose(WithError) This follows a similar pattern to the go-control-plane where certain callbacks can return errors https://github.com/envoyproxy/go-control-plane/blob/bf9fc1db9d0fefb8df1291fc4638b0f2d7f39a7e/pkg/server/v3/server.go#L70-L81 And a defer statement calls OnStreamClosed https://github.com/envoyproxy/go-control-plane/blob/bf9fc1db9d0fefb8df1291fc4638b0f2d7f39a7e/pkg/server/sotw/v3/server.go#L93-L98 Signed-off-by: Shulin Jia <[email protected]> Signed-off-by: Shulin Jia <[email protected]>
1 parent a50d51f commit 1692741

10 files changed

+221
-38
lines changed

server/src/main/java/io/envoyproxy/controlplane/server/AdsDiscoveryRequestStreamObserver.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.envoyproxy.controlplane.cache.Resources;
66
import io.envoyproxy.controlplane.cache.Watch;
77
import io.grpc.Status;
8+
import io.grpc.StatusException;
89
import io.grpc.stub.StreamObserver;
910
import java.util.Collections;
1011
import java.util.Set;
@@ -37,11 +38,8 @@ public class AdsDiscoveryRequestStreamObserver<T, U> extends DiscoveryRequestStr
3738
@Override
3839
public void onNext(T request) {
3940
if (discoveryServer.wrapXdsRequest(request).getTypeUrl().isEmpty()) {
40-
closeWithError(
41-
Status.UNKNOWN
42-
.withDescription(String.format("[%d] type URL is required for ADS", streamId))
43-
.asRuntimeException());
44-
41+
onError(new StatusException(Status.UNKNOWN.withDescription(
42+
String.format("[%d] type URL is required for ADS", streamId))));
4543
return;
4644
}
4745

server/src/main/java/io/envoyproxy/controlplane/server/DeltaDiscoveryRequestStreamObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void onNext(V rawRequest) {
6767
try {
6868
discoveryServer.runStreamDeltaRequestCallbacks(streamId, rawRequest);
6969
} catch (RequestException e) {
70-
closeWithError(e);
70+
onError(e);
7171
return;
7272
}
7373

server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryRequestStreamObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public void onNext(T rawRequest) {
7070
try {
7171
discoveryServer.runStreamRequestCallbacks(streamId, rawRequest);
7272
} catch (RequestException e) {
73-
closeWithError(e);
73+
onError(e);
7474
return;
7575
}
7676

server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServer.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
import io.envoyproxy.controlplane.cache.ConfigWatcher;
66
import io.envoyproxy.controlplane.cache.DeltaXdsRequest;
77
import io.envoyproxy.controlplane.cache.XdsRequest;
8+
import io.envoyproxy.controlplane.server.exception.RequestException;
89
import io.envoyproxy.controlplane.server.serializer.ProtoResourcesSerializer;
10+
import io.grpc.StatusRuntimeException;
911
import io.grpc.stub.ServerCallStreamObserver;
1012
import io.grpc.stub.StreamObserver;
1113
import java.util.Collection;
@@ -58,9 +60,9 @@ public abstract X makeDeltaResponse(String typeUrl, String version, String nonce
5860

5961
protected abstract Y makeResource(String name, String version, Any resource);
6062

61-
protected abstract void runStreamRequestCallbacks(long streamId, T request);
63+
protected abstract void runStreamRequestCallbacks(long streamId, T request) throws RequestException;
6264

63-
protected abstract void runStreamDeltaRequestCallbacks(long streamId, V request);
65+
protected abstract void runStreamDeltaRequestCallbacks(long streamId, V request) throws RequestException;
6466

6567
protected abstract void runStreamResponseCallbacks(long streamId, XdsRequest request, U response);
6668

@@ -76,7 +78,14 @@ StreamObserver<T> createRequestHandler(
7678

7779
LOGGER.debug("[{}] open stream from {}", streamId, defaultTypeUrl);
7880

79-
callbacks.forEach(cb -> cb.onStreamOpen(streamId, defaultTypeUrl));
81+
for (DiscoveryServerCallbacks cb : callbacks) {
82+
try {
83+
cb.onStreamOpen(streamId, defaultTypeUrl);
84+
} catch (RequestException e) {
85+
callbacks.forEach(cb2 -> cb2.onStreamCloseWithError(streamId, defaultTypeUrl, e));
86+
throw new StatusRuntimeException(e.getStatus());
87+
}
88+
}
8089

8190
final DiscoveryRequestStreamObserver<T, U> requestStreamObserver;
8291
if (ads) {
@@ -111,7 +120,14 @@ StreamObserver<V> createDeltaRequestHandler(
111120

112121
LOGGER.debug("[{}] open stream from {}", streamId, defaultTypeUrl);
113122

114-
callbacks.forEach(cb -> cb.onStreamOpen(streamId, defaultTypeUrl));
123+
for (DiscoveryServerCallbacks cb : callbacks) {
124+
try {
125+
cb.onStreamOpen(streamId, defaultTypeUrl);
126+
} catch (RequestException e) {
127+
callbacks.forEach(cb2 -> cb2.onStreamCloseWithError(streamId, defaultTypeUrl, e));
128+
throw new StatusRuntimeException(e.getStatus());
129+
}
130+
}
115131

116132
final DeltaDiscoveryRequestStreamObserver<V, X, Y> requestStreamObserver;
117133
if (ads) {

server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServerCallbacks.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,10 @@ default void onStreamCloseWithError(long streamId, String typeUrl, Throwable err
4141
*
4242
* @param streamId an ID for this stream that is only unique to this discovery server instance
4343
* @param typeUrl the resource type of the stream, or {@link DiscoveryServer#ANY_TYPE_URL} for ADS
44+
* @throws RequestException can throw {@link RequestException} with custom status. That status
45+
* will be returned to the client and the stream will be closed with error.
4446
*/
45-
default void onStreamOpen(long streamId, String typeUrl) {
47+
default void onStreamOpen(long streamId, String typeUrl) throws RequestException {
4648

4749
}
4850

@@ -53,10 +55,10 @@ default void onStreamOpen(long streamId, String typeUrl) {
5355
* @param streamId an ID for this stream that is only unique to this discovery server instance
5456
* @param request the discovery request sent by the envoy instance
5557
*
56-
* @throws RequestException optionally can throw {@link RequestException} with custom status. That status
58+
* @throws RequestException can throw {@link RequestException} with custom status. That status
5759
* will be returned to the client and the stream will be closed with error.
5860
*/
59-
void onV3StreamRequest(long streamId, DiscoveryRequest request);
61+
void onV3StreamRequest(long streamId, DiscoveryRequest request) throws RequestException;
6062

6163
/**
6264
* {@code onV3StreamRequest} is called for each {@link io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest}
@@ -65,11 +67,11 @@ default void onStreamOpen(long streamId, String typeUrl) {
6567
* @param streamId an ID for this stream that is only unique to this discovery server instance
6668
* @param request the discovery request sent by the envoy instance
6769
*
68-
* @throws RequestException optionally can throw {@link RequestException} with custom status. That status
70+
* @throws RequestException can throw {@link RequestException} with custom status. That status
6971
* will be returned to the client and the stream will be closed with error.
7072
*/
7173
void onV3StreamDeltaRequest(long streamId,
72-
DeltaDiscoveryRequest request);
74+
DeltaDiscoveryRequest request) throws RequestException;
7375

7476
/**
7577
* {@code onV3StreamResponse} is called just before each

server/src/main/java/io/envoyproxy/controlplane/server/V3DiscoveryServer.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.envoyproxy.controlplane.cache.DeltaXdsRequest;
1313
import io.envoyproxy.controlplane.cache.Resources;
1414
import io.envoyproxy.controlplane.cache.XdsRequest;
15+
import io.envoyproxy.controlplane.server.exception.RequestException;
1516
import io.envoyproxy.controlplane.server.serializer.DefaultProtoResourcesSerializer;
1617
import io.envoyproxy.controlplane.server.serializer.ProtoResourcesSerializer;
1718
import io.envoyproxy.envoy.service.cluster.v3.ClusterDiscoveryServiceGrpc.ClusterDiscoveryServiceImplBase;
@@ -187,15 +188,17 @@ protected DeltaXdsRequest wrapDeltaXdsRequest(DeltaDiscoveryRequest request) {
187188
}
188189

189190
@Override
190-
protected void runStreamRequestCallbacks(long streamId, DiscoveryRequest discoveryRequest) {
191-
callbacks.forEach(
192-
cb -> cb.onV3StreamRequest(streamId, discoveryRequest));
191+
protected void runStreamRequestCallbacks(long streamId, DiscoveryRequest discoveryRequest) throws RequestException {
192+
for (DiscoveryServerCallbacks cb : callbacks) {
193+
cb.onV3StreamRequest(streamId, discoveryRequest);
194+
}
193195
}
194196

195197
@Override
196-
protected void runStreamDeltaRequestCallbacks(long streamId, DeltaDiscoveryRequest request) {
197-
callbacks.forEach(
198-
cb -> cb.onV3StreamDeltaRequest(streamId, request));
198+
protected void runStreamDeltaRequestCallbacks(long streamId, DeltaDiscoveryRequest request) throws RequestException {
199+
for (DiscoveryServerCallbacks cb : callbacks) {
200+
cb.onV3StreamDeltaRequest(streamId, request);
201+
}
199202
}
200203

201204
@Override

server/src/main/java/io/envoyproxy/controlplane/server/exception/RequestException.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@
22

33
import io.grpc.Metadata;
44
import io.grpc.Status;
5-
import io.grpc.StatusRuntimeException;
6-
5+
import io.grpc.StatusException;
76
import javax.annotation.Nullable;
87

9-
public class RequestException extends StatusRuntimeException {
8+
public class RequestException extends StatusException {
109
public RequestException(Status status) {
1110
this(status, null);
1211
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package io.envoyproxy.controlplane.server;
2+
3+
import static io.envoyproxy.controlplane.server.V3TestSnapshots.createSnapshot;
4+
import static io.restassured.RestAssured.given;
5+
import static org.assertj.core.api.Assertions.assertThat;
6+
import static org.awaitility.Awaitility.await;
7+
import static org.hamcrest.Matchers.containsString;
8+
9+
import io.envoyproxy.controlplane.cache.v3.SimpleCache;
10+
import io.envoyproxy.controlplane.server.exception.RequestException;
11+
import io.grpc.Status;
12+
import io.grpc.netty.NettyServerBuilder;
13+
import io.restassured.http.ContentType;
14+
import java.util.concurrent.CountDownLatch;
15+
import java.util.concurrent.TimeUnit;
16+
import org.junit.ClassRule;
17+
import org.junit.Test;
18+
import org.junit.rules.RuleChain;
19+
import org.testcontainers.containers.Network;
20+
21+
public class V3DiscoveryServerAdsStreamOpenExceptionIT {
22+
23+
private static final String CONFIG = "envoy/ads.v3.config.yaml";
24+
private static final String GROUP = "key";
25+
private static final Integer LISTENER_PORT = 10000;
26+
27+
private static final CountDownLatch onStreamOpenLatch = new CountDownLatch(1);
28+
private static final CountDownLatch onStreamCloseWithErrorLatch = new CountDownLatch(1);
29+
private static final CountDownLatch onStreamRequestLatch = new CountDownLatch(1);
30+
private static final CountDownLatch onStreamResponseLatch = new CountDownLatch(1);
31+
32+
private static final NettyGrpcServerRule ADS =
33+
new NettyGrpcServerRule() {
34+
@Override
35+
protected void configureServerBuilder(NettyServerBuilder builder) {
36+
final SimpleCache<String> cache = new SimpleCache<>(node -> GROUP);
37+
final DiscoveryServerCallbacks callbacks = new V3OnlyDiscoveryServerCallbacks(
38+
onStreamOpenLatch, onStreamRequestLatch, onStreamResponseLatch) {
39+
@Override public void onStreamOpen(long streamId, String typeUrl) throws RequestException {
40+
if (streamId == 0) {
41+
throw new RequestException(Status.INVALID_ARGUMENT);
42+
} else {
43+
onStreamOpenLatch.countDown();
44+
}
45+
}
46+
47+
@Override public void onStreamCloseWithError(long streamId, String typeUrl, Throwable error) {
48+
onStreamCloseWithErrorLatch.countDown();
49+
}
50+
};
51+
52+
cache.setSnapshot(
53+
GROUP,
54+
createSnapshot(
55+
true,
56+
false,
57+
"upstream",
58+
UPSTREAM.ipAddress(),
59+
EchoContainer.PORT,
60+
"listener0",
61+
LISTENER_PORT,
62+
"route0",
63+
"1"));
64+
65+
V3DiscoveryServer server = new V3DiscoveryServer(callbacks, cache);
66+
67+
builder.addService(server.getAggregatedDiscoveryServiceImpl());
68+
}
69+
};
70+
71+
private static final Network NETWORK = Network.newNetwork();
72+
73+
private static final EnvoyContainer ENVOY = new EnvoyContainer(CONFIG, () -> ADS.getServer().getPort())
74+
.withExposedPorts(LISTENER_PORT)
75+
.withNetwork(NETWORK);
76+
77+
private static final EchoContainer UPSTREAM =
78+
new EchoContainer().withNetwork(NETWORK).withNetworkAliases("upstream");
79+
80+
@ClassRule
81+
public static final RuleChain RULES = RuleChain.outerRule(UPSTREAM).around(ADS).around(ENVOY);
82+
83+
@Test
84+
public void validateTestRequestToEchoServerViaEnvoy() throws InterruptedException {
85+
assertThat(onStreamCloseWithErrorLatch.await(15, TimeUnit.SECONDS))
86+
.isTrue()
87+
.overridingErrorMessage("failed to close ADS stream");
88+
89+
assertThat(onStreamOpenLatch.await(15, TimeUnit.SECONDS))
90+
.isTrue()
91+
.overridingErrorMessage("failed to open ADS stream");
92+
93+
assertThat(onStreamRequestLatch.await(15, TimeUnit.SECONDS))
94+
.isTrue()
95+
.overridingErrorMessage("failed to receive ADS request");
96+
97+
assertThat(onStreamResponseLatch.await(15, TimeUnit.SECONDS))
98+
.isTrue()
99+
.overridingErrorMessage("failed to send ADS response");
100+
101+
String baseUri =
102+
String.format(
103+
"http://%s:%d", ENVOY.getContainerIpAddress(), ENVOY.getMappedPort(LISTENER_PORT));
104+
105+
await()
106+
.atMost(5, TimeUnit.SECONDS)
107+
.ignoreExceptions()
108+
.untilAsserted(
109+
() ->
110+
given()
111+
.baseUri(baseUri)
112+
.contentType(ContentType.TEXT)
113+
.when()
114+
.get("/")
115+
.then()
116+
.statusCode(200)
117+
.and()
118+
.body(containsString(UPSTREAM.response)));
119+
}
120+
}

0 commit comments

Comments
 (0)