Skip to content

Commit 61382e0

Browse files
sschepensjakubdyszkiewicz
authored andcommitted
synchronize access to responseObserver
Signed-off-by: Sebastian Schepens <[email protected]>
1 parent b937b37 commit 61382e0

File tree

1 file changed

+49
-27
lines changed

1 file changed

+49
-27
lines changed

server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServer.java

Lines changed: 49 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,15 @@
3030
import java.util.Set;
3131
import java.util.concurrent.ConcurrentHashMap;
3232
import java.util.concurrent.Executor;
33-
import java.util.concurrent.atomic.AtomicBoolean;
3433
import java.util.concurrent.atomic.AtomicLong;
3534
import java.util.stream.Collectors;
3635
import org.slf4j.Logger;
3736
import org.slf4j.LoggerFactory;
3837

3938
public class DiscoveryServer {
4039

41-
private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryServer.class);
42-
4340
static final String ANY_TYPE_URL = "";
44-
41+
private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryServer.class);
4542
private final List<DiscoveryServerCallbacks> callbacks;
4643
private final ConfigWatcher configWatcher;
4744
private final ExecutorGroup executorGroup;
@@ -58,7 +55,8 @@ public DiscoveryServer(DiscoveryServerCallbacks callbacks, ConfigWatcher configW
5855

5956
/**
6057
* Creates the server.
61-
* @param callbacks server callbacks
58+
*
59+
* @param callbacks server callbacks
6260
* @param configWatcher source of configuration updates
6361
*/
6462
public DiscoveryServer(List<DiscoveryServerCallbacks> callbacks, ConfigWatcher configWatcher) {
@@ -67,9 +65,10 @@ public DiscoveryServer(List<DiscoveryServerCallbacks> callbacks, ConfigWatcher c
6765

6866
/**
6967
* Creates the server.
70-
* @param callbacks server callbacks
71-
* @param configWatcher source of configuration updates
72-
* @param executorGroup executor group to use for responding stream requests
68+
*
69+
* @param callbacks server callbacks
70+
* @param configWatcher source of configuration updates
71+
* @param executorGroup executor group to use for responding stream requests
7372
* @param protoResourcesSerializer serializer of proto buffer messages
7473
*/
7574
public DiscoveryServer(List<DiscoveryServerCallbacks> callbacks,
@@ -162,7 +161,8 @@ public StreamObserver<DiscoveryRequest> streamRoutes(
162161
*/
163162
public SecretDiscoveryServiceGrpc.SecretDiscoveryServiceImplBase getSecretDiscoveryServiceImpl() {
164163
return new SecretDiscoveryServiceGrpc.SecretDiscoveryServiceImplBase() {
165-
@Override public StreamObserver<DiscoveryRequest> streamSecrets(
164+
@Override
165+
public StreamObserver<DiscoveryRequest> streamSecrets(
166166
StreamObserver<DiscoveryResponse> responseObserver) {
167167
return createRequestHandler(responseObserver, false, Resources.SECRET_TYPE_URL);
168168
}
@@ -201,7 +201,7 @@ private class DiscoveryRequestStreamObserver implements StreamObserver<Discovery
201201
private final long streamId;
202202
private final boolean ads;
203203
private final Executor executor;
204-
private final AtomicBoolean isClosing = new AtomicBoolean();
204+
private boolean isClosing;
205205

206206
private AtomicLong streamNonce;
207207

@@ -214,10 +214,10 @@ public DiscoveryRequestStreamObserver(String defaultTypeUrl,
214214
this.responseObserver = responseObserver;
215215
this.streamId = streamId;
216216
this.ads = ads;
217-
watches = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
218-
latestResponse = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
219-
ackedResources = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
220-
streamNonce = new AtomicLong();
217+
this.watches = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
218+
this.latestResponse = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
219+
this.ackedResources = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
220+
this.streamNonce = new AtomicLong();
221221
this.executor = executor;
222222
}
223223

@@ -228,10 +228,15 @@ public void onNext(DiscoveryRequest request) {
228228

229229
if (defaultTypeUrl.equals(ANY_TYPE_URL)) {
230230
if (requestTypeUrl.isEmpty()) {
231-
responseObserver.onError(
232-
Status.UNKNOWN
233-
.withDescription(String.format("[%d] type URL is required for ADS", streamId))
234-
.asRuntimeException());
231+
synchronized (responseObserver) {
232+
if (!isClosing) {
233+
isClosing = true;
234+
responseObserver.onError(
235+
Status.UNKNOWN
236+
.withDescription(String.format("[%d] type URL is required for ADS", streamId))
237+
.asRuntimeException());
238+
}
239+
}
235240

236241
return;
237242
}
@@ -292,7 +297,12 @@ public void onError(Throwable t) {
292297

293298
try {
294299
callbacks.forEach(cb -> cb.onStreamCloseWithError(streamId, defaultTypeUrl, t));
295-
responseObserver.onError(Status.fromThrowable(t).asException());
300+
synchronized (responseObserver) {
301+
if (!isClosing) {
302+
isClosing = true;
303+
responseObserver.onError(Status.fromThrowable(t).asException());
304+
}
305+
}
296306
} finally {
297307
cancel();
298308
}
@@ -304,7 +314,12 @@ public void onCompleted() {
304314

305315
try {
306316
callbacks.forEach(cb -> cb.onStreamClose(streamId, defaultTypeUrl));
307-
responseObserver.onCompleted();
317+
synchronized (responseObserver) {
318+
if (!isClosing) {
319+
isClosing = true;
320+
responseObserver.onCompleted();
321+
}
322+
}
308323
} finally {
309324
cancel();
310325
}
@@ -316,8 +331,11 @@ void onCancelled() {
316331
}
317332

318333
private void closeWithError(Throwable exception) {
319-
if (isClosing.compareAndSet(false, true)) {
320-
responseObserver.onError(exception);
334+
synchronized (responseObserver) {
335+
if (!isClosing) {
336+
isClosing = true;
337+
responseObserver.onError(exception);
338+
}
321339
}
322340
cancel();
323341
}
@@ -345,11 +363,15 @@ private void send(Response response, String typeUrl) {
345363
// is processed the map is guaranteed to be updated. Doing it afterwards leads to a race conditions
346364
// which may see the incoming request arrive before the map is updated, failing the nonce check erroneously.
347365
latestResponse.put(typeUrl, discoveryResponse);
348-
try {
349-
responseObserver.onNext(discoveryResponse);
350-
} catch (StatusRuntimeException e) {
351-
if (!Status.CANCELLED.getCode().equals(e.getStatus().getCode())) {
352-
throw e;
366+
synchronized (responseObserver) {
367+
if (!isClosing) {
368+
try {
369+
responseObserver.onNext(discoveryResponse);
370+
} catch (StatusRuntimeException e) {
371+
if (!Status.CANCELLED.getCode().equals(e.getStatus().getCode())) {
372+
throw e;
373+
}
374+
}
353375
}
354376
}
355377
}

0 commit comments

Comments
 (0)