Skip to content

Commit db1d744

Browse files
MarcinFalkowskijoeyb
authored andcommitted
server: add posibility to validate request and return custom error response (#105)
Signed-off-by: Marcin Falkowski <[email protected]>
1 parent 5877a99 commit db1d744

File tree

4 files changed

+125
-1
lines changed

4 files changed

+125
-1
lines changed

server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServer.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.envoyproxy.controlplane.cache.Resources;
99
import io.envoyproxy.controlplane.cache.Response;
1010
import io.envoyproxy.controlplane.cache.Watch;
11+
import io.envoyproxy.controlplane.server.exception.RequestException;
1112
import io.envoyproxy.controlplane.server.serializer.DefaultProtoResourcesSerializer;
1213
import io.envoyproxy.controlplane.server.serializer.ProtoResourcesSerializer;
1314
import io.envoyproxy.envoy.api.v2.ClusterDiscoveryServiceGrpc.ClusterDiscoveryServiceImplBase;
@@ -29,6 +30,7 @@
2930
import java.util.Set;
3031
import java.util.concurrent.ConcurrentHashMap;
3132
import java.util.concurrent.Executor;
33+
import java.util.concurrent.atomic.AtomicBoolean;
3234
import java.util.concurrent.atomic.AtomicLong;
3335
import java.util.stream.Collectors;
3436
import org.slf4j.Logger;
@@ -199,6 +201,7 @@ private class DiscoveryRequestStreamObserver implements StreamObserver<Discovery
199201
private final long streamId;
200202
private final boolean ads;
201203
private final Executor executor;
204+
private final AtomicBoolean isClosing = new AtomicBoolean();
202205

203206
private AtomicLong streamNonce;
204207

@@ -243,7 +246,12 @@ public void onNext(DiscoveryRequest request) {
243246
nonce,
244247
request.getVersionInfo());
245248

246-
callbacks.forEach(cb -> cb.onStreamRequest(streamId, request));
249+
try {
250+
callbacks.forEach(cb -> cb.onStreamRequest(streamId, request));
251+
} catch (RequestException e) {
252+
closeWithError(e);
253+
return;
254+
}
247255

248256
for (String typeUrl : Resources.TYPE_URLS) {
249257
DiscoveryResponse response = latestResponse.get(typeUrl);
@@ -307,6 +315,13 @@ void onCancelled() {
307315
cancel();
308316
}
309317

318+
private void closeWithError(Throwable exception) {
319+
if (isClosing.compareAndSet(false, true)) {
320+
responseObserver.onError(exception);
321+
}
322+
cancel();
323+
}
324+
310325
private void cancel() {
311326
watches.values().forEach(Watch::cancel);
312327
}

server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServerCallbacks.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.envoyproxy.controlplane.server;
22

3+
import io.envoyproxy.controlplane.server.exception.RequestException;
34
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
45
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
56

@@ -48,6 +49,9 @@ default void onStreamOpen(long streamId, String typeUrl) {
4849
*
4950
* @param streamId an ID for this stream that is only unique to this discovery server instance
5051
* @param request the discovery request sent by the envoy instance
52+
*
53+
* @throws RequestException optionally can throw {@link RequestException} with custom status. That status
54+
* will be returned to the client and the stream will be closed with error.
5155
*/
5256
default void onStreamRequest(long streamId, DiscoveryRequest request) {
5357

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.envoyproxy.controlplane.server.exception;
2+
3+
import io.grpc.Metadata;
4+
import io.grpc.Status;
5+
import io.grpc.StatusRuntimeException;
6+
7+
import javax.annotation.Nullable;
8+
9+
public class RequestException extends StatusRuntimeException {
10+
public RequestException(Status status) {
11+
this(status, null);
12+
}
13+
14+
public RequestException(Status status, @Nullable Metadata trailers) {
15+
super(status, trailers);
16+
}
17+
}

server/src/test/java/io/envoyproxy/controlplane/server/DiscoveryServerTest.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.envoyproxy.controlplane.cache.TestResources;
1717
import io.envoyproxy.controlplane.cache.Watch;
1818
import io.envoyproxy.controlplane.cache.WatchCancelledException;
19+
import io.envoyproxy.controlplane.server.exception.RequestException;
1920
import io.envoyproxy.envoy.api.v2.Cluster;
2021
import io.envoyproxy.envoy.api.v2.ClusterDiscoveryServiceGrpc;
2122
import io.envoyproxy.envoy.api.v2.ClusterDiscoveryServiceGrpc.ClusterDiscoveryServiceStub;
@@ -56,6 +57,7 @@
5657
import java.util.concurrent.atomic.AtomicReference;
5758
import java.util.function.Consumer;
5859
import java.util.stream.Collectors;
60+
5961
import org.assertj.core.api.Condition;
6062
import org.junit.Rule;
6163
import org.junit.Test;
@@ -864,6 +866,90 @@ public Watch createWatch(boolean ads, DiscoveryRequest request, Set<String> know
864866
assertThat(callbacks.streamResponseCount).hasValue(1);
865867
}
866868

869+
@Test
870+
public void testCallbacksRequestException() throws InterruptedException {
871+
MockDiscoveryServerCallbacks callbacks = new MockDiscoveryServerCallbacks() {
872+
@Override
873+
public void onStreamRequest(long streamId, DiscoveryRequest request) {
874+
super.onStreamRequest(streamId, request);
875+
throw new RequestException(Status.INVALID_ARGUMENT.withDescription("request not valid"));
876+
}
877+
};
878+
879+
MockConfigWatcher configWatcher = new MockConfigWatcher(false, createResponses());
880+
DiscoveryServer server = new DiscoveryServer(callbacks, configWatcher);
881+
882+
grpcServer.getServiceRegistry().addService(server.getAggregatedDiscoveryServiceImpl());
883+
AggregatedDiscoveryServiceStub stub = AggregatedDiscoveryServiceGrpc.newStub(grpcServer.getChannel());
884+
885+
MockDiscoveryResponseObserver responseObserver = new MockDiscoveryResponseObserver();
886+
StreamObserver<DiscoveryRequest> requestObserver = stub.streamAggregatedResources(responseObserver);
887+
888+
requestObserver.onNext(DiscoveryRequest.newBuilder()
889+
.setNode(NODE)
890+
.setTypeUrl(Resources.LISTENER_TYPE_URL)
891+
.build());
892+
893+
if (!responseObserver.errorLatch.await(1, TimeUnit.SECONDS) || responseObserver.completed.get()) {
894+
fail(format("failed to error before timeout, completed = %b", responseObserver.completed.get()));
895+
}
896+
897+
callbacks.assertThatNoErrors();
898+
899+
assertThat(responseObserver.errorException).isInstanceOfSatisfying(StatusRuntimeException.class, ex -> {
900+
assertThat(ex.getStatus().getCode()).isEqualTo(Status.Code.INVALID_ARGUMENT);
901+
assertThat(ex.getStatus().getDescription()).isEqualTo("request not valid");
902+
});
903+
904+
assertThat(callbacks.streamCloseCount).hasValue(0);
905+
assertThat(callbacks.streamCloseWithErrorCount).hasValue(0);
906+
assertThat(callbacks.streamOpenCount).hasValue(1);
907+
assertThat(callbacks.streamRequestCount).hasValue(1);
908+
assertThat(callbacks.streamResponseCount).hasValue(0);
909+
}
910+
911+
@Test
912+
public void testCallbacksOtherStatusException() throws InterruptedException {
913+
MockDiscoveryServerCallbacks callbacks = new MockDiscoveryServerCallbacks() {
914+
@Override
915+
public void onStreamRequest(long streamId, DiscoveryRequest request) {
916+
super.onStreamRequest(streamId, request);
917+
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("request not valid"));
918+
}
919+
};
920+
921+
MockConfigWatcher configWatcher = new MockConfigWatcher(false, createResponses());
922+
DiscoveryServer server = new DiscoveryServer(callbacks, configWatcher);
923+
924+
grpcServer.getServiceRegistry().addService(server.getAggregatedDiscoveryServiceImpl());
925+
AggregatedDiscoveryServiceStub stub = AggregatedDiscoveryServiceGrpc.newStub(grpcServer.getChannel());
926+
927+
MockDiscoveryResponseObserver responseObserver = new MockDiscoveryResponseObserver();
928+
StreamObserver<DiscoveryRequest> requestObserver = stub.streamAggregatedResources(responseObserver);
929+
930+
requestObserver.onNext(DiscoveryRequest.newBuilder()
931+
.setNode(NODE)
932+
.setTypeUrl(Resources.LISTENER_TYPE_URL)
933+
.build());
934+
935+
if (!responseObserver.errorLatch.await(1, TimeUnit.SECONDS) || responseObserver.completed.get()) {
936+
fail(format("failed to error before timeout, completed = %b", responseObserver.completed.get()));
937+
}
938+
939+
callbacks.assertThatNoErrors();
940+
941+
assertThat(responseObserver.errorException).isInstanceOfSatisfying(StatusRuntimeException.class, ex -> {
942+
assertThat(ex.getStatus().getCode()).isEqualTo(Status.Code.UNKNOWN);
943+
assertThat(ex.getStatus().getDescription()).isNull();
944+
});
945+
946+
assertThat(callbacks.streamCloseCount).hasValue(0);
947+
assertThat(callbacks.streamCloseWithErrorCount).hasValue(0);
948+
assertThat(callbacks.streamOpenCount).hasValue(1);
949+
assertThat(callbacks.streamRequestCount).hasValue(1);
950+
assertThat(callbacks.streamResponseCount).hasValue(0);
951+
}
952+
867953
private static Table<String, String, Collection<? extends Message>> createResponses() {
868954
return ImmutableTable.<String, String, Collection<? extends Message>>builder()
869955
.put(Resources.CLUSTER_TYPE_URL, VERSION, ImmutableList.of(CLUSTER))
@@ -1005,6 +1091,7 @@ private static class MockDiscoveryResponseObserver implements StreamObserver<Dis
10051091
private final AtomicInteger nonce = new AtomicInteger();
10061092
private final Collection<DiscoveryResponse> responses = new LinkedList<>();
10071093

1094+
private Throwable errorException;
10081095
private boolean sendError = false;
10091096

10101097
void assertThatNoErrors() {
@@ -1054,6 +1141,7 @@ public void onNext(DiscoveryResponse value) {
10541141
@Override
10551142
public void onError(Throwable t) {
10561143
error.set(true);
1144+
errorException = t;
10571145
errorLatch.countDown();
10581146
}
10591147

0 commit comments

Comments
 (0)