Skip to content

Commit 2e9c3e1

Browse files
authored
xds: Update error handling for ADS stream close and failure scenarios (grpc#11596)
When an ADS stream is closed with a non-OK status after receiving a response, the status is converted to OK before it is reported. This makes the failure behavior consistent with gRFC A57.
1 parent e59ae5f commit 2e9c3e1

File tree

3 files changed

+81
-31
lines changed

3 files changed

+81
-31
lines changed

xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package io.grpc.xds.client;
1818

19-
import static com.google.common.base.Preconditions.checkArgument;
2019
import static com.google.common.base.Preconditions.checkNotNull;
2120
import static com.google.common.base.Preconditions.checkState;
2221

@@ -60,7 +59,6 @@
6059
*/
6160
final class ControlPlaneClient {
6261

63-
public static final String CLOSED_BY_SERVER = "Closed by server";
6462
private final SynchronizationContext syncContext;
6563
private final InternalLogId logId;
6664
private final XdsLogger logger;
@@ -358,11 +356,7 @@ public void run() {
358356
@Override
359357
public void onStatusReceived(final Status status) {
360358
syncContext.execute(() -> {
361-
if (status.isOk()) {
362-
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
363-
} else {
364-
handleRpcStreamClosed(status);
365-
}
359+
handleRpcStreamClosed(status);
366360
});
367361
}
368362

@@ -381,7 +375,7 @@ final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<A
381375
processingTracker.onComplete();
382376
}
383377

384-
private void handleRpcStreamClosed(Status error) {
378+
private void handleRpcStreamClosed(Status status) {
385379
if (closed) {
386380
return;
387381
}
@@ -399,15 +393,34 @@ private void handleRpcStreamClosed(Status error) {
399393
rpcRetryTimer = syncContext.schedule(
400394
new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
401395

402-
checkArgument(!error.isOk(), "unexpected OK status");
403-
String errorMsg = error.getDescription() != null
404-
&& error.getDescription().equals(CLOSED_BY_SERVER)
405-
? "ADS stream closed with status {0}: {1}. Cause: {2}"
406-
: "ADS stream failed with status {0}: {1}. Cause: {2}";
407-
logger.log(
408-
XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause());
396+
Status newStatus = status;
397+
if (responseReceived) {
398+
// A closed ADS stream after a successful response is not considered an error. Servers may
399+
// close streams for various reasons during normal operation, such as load balancing or
400+
// underlying connection hitting its max connection age limit (see gRFC A9).
401+
if (!status.isOk()) {
402+
newStatus = Status.OK;
403+
logger.log( XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
404+
+ "response was received, so this will not be treated as an error. Cause: {2}",
405+
status.getCode(), status.getDescription(), status.getCause());
406+
} else {
407+
logger.log(XdsLogLevel.DEBUG,
408+
"ADS stream closed by server after a response was received");
409+
}
410+
} else {
411+
// If the ADS stream is closed without ever having received a response from the server, then
412+
// the XdsClient should consider that a connectivity error (see gRFC A57).
413+
if (status.isOk()) {
414+
newStatus = Status.UNAVAILABLE.withDescription(
415+
"ADS stream closed with OK before receiving a response");
416+
}
417+
logger.log(
418+
XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
419+
newStatus.getCode(), newStatus.getDescription(), newStatus.getCause());
420+
}
421+
409422
closed = true;
410-
xdsResponseHandler.handleStreamClosed(error);
423+
xdsResponseHandler.handleStreamClosed(newStatus);
411424
cleanUp();
412425

413426
logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,13 @@ public void handleResourceResponse(
142142
public void handleStreamClosed(Status error) {
143143
syncContext.throwIfNotInThisSynchronizationContext();
144144
cleanUpResourceTimers();
145-
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
146-
resourceSubscribers.values()) {
147-
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
148-
if (!subscriber.hasResult()) {
149-
subscriber.onError(error, null);
145+
if (!error.isOk()) {
146+
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
147+
resourceSubscribers.values()) {
148+
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
149+
if (!subscriber.hasResult()) {
150+
subscriber.onError(error, null);
151+
}
150152
}
151153
}
152154
}

xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3331,6 +3331,43 @@ public void useIndependentRpcContext() {
33313331
}
33323332
}
33333333

3334+
@Test
3335+
public void streamClosedWithNoResponse() {
3336+
xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, ldsResourceWatcher);
3337+
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE,
3338+
rdsResourceWatcher);
3339+
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
3340+
// Management server closes the RPC stream before sending any response.
3341+
call.sendCompleted();
3342+
verify(ldsResourceWatcher, Mockito.timeout(1000).times(1))
3343+
.onError(errorCaptor.capture());
3344+
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE,
3345+
"ADS stream closed with OK before receiving a response");
3346+
verify(rdsResourceWatcher).onError(errorCaptor.capture());
3347+
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE,
3348+
"ADS stream closed with OK before receiving a response");
3349+
}
3350+
3351+
@Test
3352+
public void streamClosedAfterSendingResponses() {
3353+
xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, ldsResourceWatcher);
3354+
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE,
3355+
rdsResourceWatcher);
3356+
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
3357+
ScheduledTask ldsResourceTimeout =
3358+
Iterables.getOnlyElement(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));
3359+
ScheduledTask rdsResourceTimeout =
3360+
Iterables.getOnlyElement(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));
3361+
call.sendResponse(LDS, testListenerRds, VERSION_1, "0000");
3362+
assertThat(ldsResourceTimeout.isCancelled()).isTrue();
3363+
call.sendResponse(RDS, testRouteConfig, VERSION_1, "0000");
3364+
assertThat(rdsResourceTimeout.isCancelled()).isTrue();
3365+
// Management server closes the RPC stream after sending responses.
3366+
call.sendCompleted();
3367+
verify(ldsResourceWatcher, never()).onError(errorCaptor.capture());
3368+
verify(rdsResourceWatcher, never()).onError(errorCaptor.capture());
3369+
}
3370+
33343371
@Test
33353372
public void streamClosedAndRetryWithBackoff() {
33363373
InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
@@ -3408,10 +3445,10 @@ public void streamClosedAndRetryWithBackoff() {
34083445
call.sendError(Status.DEADLINE_EXCEEDED.asException());
34093446
verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture());
34103447
verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture());
3411-
verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture());
3412-
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
3413-
verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture());
3414-
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
3448+
verify(cdsResourceWatcher, times(2)).onError(errorCaptor.capture());
3449+
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
3450+
verify(edsResourceWatcher, times(2)).onError(errorCaptor.capture());
3451+
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
34153452

34163453
// Reset backoff sequence and retry after backoff.
34173454
inOrder.verify(backoffPolicyProvider).get();
@@ -3430,9 +3467,9 @@ public void streamClosedAndRetryWithBackoff() {
34303467
call.sendError(Status.UNAVAILABLE.asException());
34313468
verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture());
34323469
verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture());
3433-
verify(cdsResourceWatcher, times(4)).onError(errorCaptor.capture());
3470+
verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture());
34343471
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
3435-
verify(edsResourceWatcher, times(4)).onError(errorCaptor.capture());
3472+
verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture());
34363473
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
34373474

34383475
// Retry after backoff.
@@ -3516,10 +3553,8 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe
35163553
assertThat(edsResourceTimeout.isCancelled()).isTrue();
35173554
verify(ldsResourceWatcher, never()).onError(errorCaptor.capture());
35183555
verify(rdsResourceWatcher, never()).onError(errorCaptor.capture());
3519-
verify(cdsResourceWatcher).onError(errorCaptor.capture());
3520-
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
3521-
verify(edsResourceWatcher).onError(errorCaptor.capture());
3522-
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
3556+
verify(cdsResourceWatcher, never()).onError(errorCaptor.capture());
3557+
verify(edsResourceWatcher, never()).onError(errorCaptor.capture());
35233558

35243559
fakeClock.forwardNanos(10L);
35253560
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(0);

0 commit comments

Comments
 (0)