Skip to content

Commit c56c848

Browse files
sschepenssnowp
authored andcommitted
server: handle call cancellation gracefully (#80)
Immediately cancel watches when responseObserver is cancelled. Catch cancellation exceptions when responding watches. Correctly handle cancellations on error. Fixes #79 Signed-off-by: Sebastian Schepens <[email protected]>
1 parent aa7cc51 commit c56c848

File tree

2 files changed

+193
-95
lines changed

2 files changed

+193
-95
lines changed

server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServer.java

Lines changed: 132 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import io.envoyproxy.controlplane.cache.Response;
1818
import io.envoyproxy.controlplane.cache.Watch;
1919
import io.grpc.Status;
20+
import io.grpc.StatusRuntimeException;
21+
import io.grpc.stub.ServerCallStreamObserver;
2022
import io.grpc.stub.StreamObserver;
2123
import java.util.Collections;
2224
import java.util.List;
@@ -152,123 +154,158 @@ private StreamObserver<DiscoveryRequest> createRequestHandler(
152154

153155
callbacks.forEach(cb -> cb.onStreamOpen(streamId, defaultTypeUrl));
154156

155-
return new StreamObserver<DiscoveryRequest>() {
157+
final DiscoveryRequestStreamObserver requestStreamObserver =
158+
new DiscoveryRequestStreamObserver(defaultTypeUrl, responseObserver, streamId, ads);
156159

157-
private final Map<String, Watch> watches = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
158-
private final Map<String, DiscoveryResponse> latestResponse =
159-
new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
160-
private final Map<String, Set<String>> ackedResources = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
160+
if (responseObserver instanceof ServerCallStreamObserver) {
161+
((ServerCallStreamObserver) responseObserver).setOnCancelHandler(requestStreamObserver::onCancelled);
162+
}
161163

162-
private AtomicLong streamNonce = new AtomicLong();
164+
return requestStreamObserver;
165+
}
163166

164-
@Override
165-
public void onNext(DiscoveryRequest request) {
166-
String nonce = request.getResponseNonce();
167-
String requestTypeUrl = request.getTypeUrl();
168-
169-
if (defaultTypeUrl.equals(ANY_TYPE_URL)) {
170-
if (requestTypeUrl.isEmpty()) {
171-
responseObserver.onError(
172-
Status.UNKNOWN
173-
.withDescription(String.format("[%d] type URL is required for ADS", streamId))
174-
.asRuntimeException());
175-
176-
return;
177-
}
178-
} else if (requestTypeUrl.isEmpty()) {
179-
requestTypeUrl = defaultTypeUrl;
167+
private class DiscoveryRequestStreamObserver implements StreamObserver<DiscoveryRequest> {
168+
169+
private final Map<String, Watch> watches;
170+
private final Map<String, DiscoveryResponse> latestResponse;
171+
private final Map<String, Set<String>> ackedResources;
172+
private final String defaultTypeUrl;
173+
private final StreamObserver<DiscoveryResponse> responseObserver;
174+
private final long streamId;
175+
private final boolean ads;
176+
177+
private AtomicLong streamNonce;
178+
179+
public DiscoveryRequestStreamObserver(String defaultTypeUrl, StreamObserver<DiscoveryResponse> responseObserver,
180+
long streamId, boolean ads) {
181+
this.defaultTypeUrl = defaultTypeUrl;
182+
this.responseObserver = responseObserver;
183+
this.streamId = streamId;
184+
this.ads = ads;
185+
watches = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
186+
latestResponse = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
187+
ackedResources = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
188+
streamNonce = new AtomicLong();
189+
}
190+
191+
@Override
192+
public void onNext(DiscoveryRequest request) {
193+
String nonce = request.getResponseNonce();
194+
String requestTypeUrl = request.getTypeUrl();
195+
196+
if (defaultTypeUrl.equals(ANY_TYPE_URL)) {
197+
if (requestTypeUrl.isEmpty()) {
198+
responseObserver.onError(
199+
Status.UNKNOWN
200+
.withDescription(String.format("[%d] type URL is required for ADS", streamId))
201+
.asRuntimeException());
202+
203+
return;
180204
}
205+
} else if (requestTypeUrl.isEmpty()) {
206+
requestTypeUrl = defaultTypeUrl;
207+
}
181208

182-
LOGGER.info("[{}] request {}[{}] with nonce {} from version {}",
183-
streamId,
184-
requestTypeUrl,
185-
String.join(", ", request.getResourceNamesList()),
186-
nonce,
187-
request.getVersionInfo());
188-
189-
callbacks.forEach(cb -> cb.onStreamRequest(streamId, request));
190-
191-
for (String typeUrl : Resources.TYPE_URLS) {
192-
DiscoveryResponse response = latestResponse.get(typeUrl);
193-
String resourceNonce = response == null ? null : response.getNonce();
194-
195-
if (requestTypeUrl.equals(typeUrl) && (isNullOrEmpty(resourceNonce)
196-
|| resourceNonce.equals(nonce))) {
197-
if (!request.hasErrorDetail() && response != null) {
198-
Set<String> ackedResourcesForType = response.getResourcesList()
199-
.stream()
200-
.map(Resources::getResourceName)
201-
.collect(Collectors.toSet());
202-
ackedResources.put(typeUrl, ackedResourcesForType);
203-
}
209+
LOGGER.info("[{}] request {}[{}] with nonce {} from version {}",
210+
streamId,
211+
requestTypeUrl,
212+
String.join(", ", request.getResourceNamesList()),
213+
nonce,
214+
request.getVersionInfo());
215+
216+
callbacks.forEach(cb -> cb.onStreamRequest(streamId, request));
217+
218+
for (String typeUrl : Resources.TYPE_URLS) {
219+
DiscoveryResponse response = latestResponse.get(typeUrl);
220+
String resourceNonce = response == null ? null : response.getNonce();
221+
222+
if (requestTypeUrl.equals(typeUrl) && (isNullOrEmpty(resourceNonce)
223+
|| resourceNonce.equals(nonce))) {
224+
if (!request.hasErrorDetail() && response != null) {
225+
Set<String> ackedResourcesForType = response.getResourcesList()
226+
.stream()
227+
.map(Resources::getResourceName)
228+
.collect(Collectors.toSet());
229+
ackedResources.put(typeUrl, ackedResourcesForType);
230+
}
204231

205-
watches.compute(typeUrl, (t, oldWatch) -> {
206-
if (oldWatch != null) {
207-
oldWatch.cancel();
208-
}
232+
watches.compute(typeUrl, (t, oldWatch) -> {
233+
if (oldWatch != null) {
234+
oldWatch.cancel();
235+
}
209236

210-
return configWatcher.createWatch(
211-
ads,
212-
request,
213-
ackedResources.getOrDefault(typeUrl, Collections.emptySet()),
214-
r -> send(r, typeUrl));
215-
});
237+
return configWatcher.createWatch(
238+
ads,
239+
request,
240+
ackedResources.getOrDefault(typeUrl, Collections.emptySet()),
241+
r -> send(r, typeUrl));
242+
});
216243

217-
return;
218-
}
244+
return;
219245
}
220246
}
247+
}
221248

222-
@Override
223-
public void onError(Throwable t) {
224-
if (!Status.fromThrowable(t).getCode().equals(Status.CANCELLED.getCode())) {
225-
LOGGER.error("[{}] stream closed with error", streamId, t);
226-
}
227-
228-
try {
229-
callbacks.forEach(cb -> cb.onStreamCloseWithError(streamId, defaultTypeUrl, t));
230-
responseObserver.onError(Status.fromThrowable(t).asException());
231-
} finally {
232-
cancel();
233-
}
249+
@Override
250+
public void onError(Throwable t) {
251+
if (!Status.fromThrowable(t).getCode().equals(Status.CANCELLED.getCode())) {
252+
LOGGER.error("[{}] stream closed with error", streamId, t);
234253
}
235254

236-
@Override
237-
public void onCompleted() {
238-
LOGGER.info("[{}] stream closed", streamId);
239-
240-
try {
241-
callbacks.forEach(cb -> cb.onStreamClose(streamId, defaultTypeUrl));
242-
responseObserver.onCompleted();
243-
} finally {
244-
cancel();
245-
}
255+
try {
256+
callbacks.forEach(cb -> cb.onStreamCloseWithError(streamId, defaultTypeUrl, t));
257+
responseObserver.onError(Status.fromThrowable(t).asException());
258+
} finally {
259+
cancel();
246260
}
261+
}
262+
263+
@Override
264+
public void onCompleted() {
265+
LOGGER.info("[{}] stream closed", streamId);
247266

248-
private void cancel() {
249-
watches.values().forEach(Watch::cancel);
267+
try {
268+
callbacks.forEach(cb -> cb.onStreamClose(streamId, defaultTypeUrl));
269+
responseObserver.onCompleted();
270+
} finally {
271+
cancel();
250272
}
273+
}
251274

252-
private void send(Response response, String typeUrl) {
253-
String nonce = Long.toString(streamNonce.getAndIncrement());
275+
void onCancelled() {
276+
LOGGER.info("[{}] stream cancelled", streamId);
277+
cancel();
278+
}
254279

255-
DiscoveryResponse discoveryResponse = DiscoveryResponse.newBuilder()
256-
.setVersionInfo(response.version())
257-
.addAllResources(response.resources().stream().map(Any::pack).collect(Collectors.toList()))
258-
.setTypeUrl(typeUrl)
259-
.setNonce(nonce)
260-
.build();
280+
private void cancel() {
281+
watches.values().forEach(Watch::cancel);
282+
}
261283

262-
LOGGER.info("[{}] response {} with nonce {} version {}", streamId, typeUrl, nonce, response.version());
284+
private void send(Response response, String typeUrl) {
285+
String nonce = Long.toString(streamNonce.getAndIncrement());
263286

264-
callbacks.forEach(cb -> cb.onStreamResponse(streamId, response.request(), discoveryResponse));
287+
DiscoveryResponse discoveryResponse = DiscoveryResponse.newBuilder()
288+
.setVersionInfo(response.version())
289+
.addAllResources(response.resources().stream().map(Any::pack).collect(Collectors.toList()))
290+
.setTypeUrl(typeUrl)
291+
.setNonce(nonce)
292+
.build();
265293

266-
// Store the latest response *before* we send the response. This ensures that by the time the request
267-
// is processed the map is guaranteed to be updated. Doing it afterwards leads to a race conditions
268-
// which may see the incoming request arrive before the map is updated, failing the nonce check erroneously.
269-
latestResponse.put(typeUrl, discoveryResponse);
294+
LOGGER.info("[{}] response {} with nonce {} version {}", streamId, typeUrl, nonce, response.version());
295+
296+
callbacks.forEach(cb -> cb.onStreamResponse(streamId, response.request(), discoveryResponse));
297+
298+
// Store the latest response *before* we send the response. This ensures that by the time the request
299+
// is processed the map is guaranteed to be updated. Doing it afterwards leads to a race conditions
300+
// which may see the incoming request arrive before the map is updated, failing the nonce check erroneously.
301+
latestResponse.put(typeUrl, discoveryResponse);
302+
try {
270303
responseObserver.onNext(discoveryResponse);
304+
} catch (StatusRuntimeException e) {
305+
if (!Status.CANCELLED.getCode().equals(e.getStatus().getCode())) {
306+
throw e;
307+
}
271308
}
272-
};
309+
}
273310
}
274311
}

server/src/test/java/io/envoyproxy/controlplane/server/DiscoveryServerTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,67 @@ public void callbackOnError_doesNotLogError_whenCancelled() {
804804
}
805805
}
806806

807+
@Test
808+
public void testCallbacksOnCancelled() throws InterruptedException, ClassNotFoundException {
809+
final CountDownLatch streamCloseWithErrorLatch = new CountDownLatch(1);
810+
final CountDownLatch watchCreated = new CountDownLatch(1);
811+
AtomicReference<Watch> watch = new AtomicReference<>();
812+
813+
MockDiscoveryServerCallbacks callbacks = new MockDiscoveryServerCallbacks() {
814+
@Override
815+
public void onStreamCloseWithError(long streamId, String typeUrl, Throwable error) {
816+
// watch should already be closed by the time we report a stream close error
817+
assertThat(watch.get().isCancelled()).isTrue();
818+
super.onStreamCloseWithError(streamId, typeUrl, error);
819+
streamCloseWithErrorLatch.countDown();
820+
}
821+
};
822+
823+
MockConfigWatcher configWatcher = new MockConfigWatcher(false, createResponses()) {
824+
@Override
825+
public Watch createWatch(boolean ads, DiscoveryRequest request, Set<String> knownResources,
826+
Consumer<Response> responseConsumer) {
827+
watchCreated.countDown();
828+
watch.set(super.createWatch(ads, request, knownResources, responseConsumer));
829+
return watch.get();
830+
}
831+
};
832+
DiscoveryServer server = new DiscoveryServer(callbacks, configWatcher);
833+
834+
grpcServer.getServiceRegistry().addService(server.getClusterDiscoveryServiceImpl());
835+
836+
ClusterDiscoveryServiceStub stub = ClusterDiscoveryServiceGrpc.newStub(grpcServer.getChannel());
837+
838+
MockDiscoveryResponseObserver responseObserver = new MockDiscoveryResponseObserver();
839+
840+
StreamObserver<DiscoveryRequest> requestObserver = stub.streamClusters(responseObserver);
841+
842+
requestObserver.onNext(DiscoveryRequest.newBuilder()
843+
.setNode(NODE)
844+
.setResponseNonce("1")
845+
.setTypeUrl(Resources.CLUSTER_TYPE_URL)
846+
.setVersionInfo(VERSION)
847+
.build());
848+
849+
if (!watchCreated.await(1, TimeUnit.SECONDS)) {
850+
fail("failed to execute watchCreated callback before timeout");
851+
}
852+
853+
requestObserver.onError(Status.CANCELLED.asException());
854+
855+
if (!streamCloseWithErrorLatch.await(1, TimeUnit.SECONDS)) {
856+
fail("failed to execute onStreamCloseWithError callback before timeout");
857+
}
858+
859+
callbacks.assertThatNoErrors();
860+
861+
assertThat(callbacks.streamCloseCount).hasValue(0);
862+
assertThat(callbacks.streamCloseWithErrorCount).hasValue(1);
863+
assertThat(callbacks.streamOpenCount).hasValue(1);
864+
assertThat(callbacks.streamRequestCount).hasValue(1);
865+
assertThat(callbacks.streamResponseCount).hasValue(1);
866+
}
867+
807868
private static Table<String, String, Collection<? extends Message>> createResponses() {
808869
return ImmutableTable.<String, String, Collection<? extends Message>>builder()
809870
.put(Resources.CLUSTER_TYPE_URL, VERSION, ImmutableList.of(CLUSTER))

0 commit comments

Comments
 (0)