Skip to content

Commit 36fe276

Browse files
authored
xds: add "resource_timer_is_transient_failure" server feature (#12249)
1 parent ba0a732 commit 36fe276

File tree

10 files changed

+178
-41
lines changed

10 files changed

+178
-41
lines changed

xds/src/main/java/io/grpc/xds/CsdsService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,8 @@ static ClientResourceStatus metadataStatusToClientStatus(ResourceMetadataStatus
249249
return ClientResourceStatus.ACKED;
250250
case NACKED:
251251
return ClientResourceStatus.NACKED;
252+
case TIMEOUT:
253+
return ClientResourceStatus.TIMEOUT;
252254
default:
253255
throw new AssertionError("Unexpected ResourceMetadataStatus: " + status);
254256
}

xds/src/main/java/io/grpc/xds/client/Bootstrapper.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,17 +63,20 @@ public abstract static class ServerInfo {
6363

6464
public abstract boolean isTrustedXdsServer();
6565

66+
public abstract boolean resourceTimerIsTransientError();
67+
6668
@VisibleForTesting
6769
public static ServerInfo create(String target, @Nullable Object implSpecificConfig) {
68-
return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig, false, false);
70+
return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig,
71+
false, false, false);
6972
}
7073

7174
@VisibleForTesting
7275
public static ServerInfo create(
7376
String target, Object implSpecificConfig, boolean ignoreResourceDeletion,
74-
boolean isTrustedXdsServer) {
77+
boolean isTrustedXdsServer, boolean resourceTimerIsTransientError) {
7578
return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig,
76-
ignoreResourceDeletion, isTrustedXdsServer);
79+
ignoreResourceDeletion, isTrustedXdsServer, resourceTimerIsTransientError);
7780
}
7881
}
7982

xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public abstract class BootstrapperImpl extends Bootstrapper {
4343

4444
public static final String GRPC_EXPERIMENTAL_XDS_FALLBACK =
4545
"GRPC_EXPERIMENTAL_XDS_FALLBACK";
46+
public static final String GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING =
47+
"GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING";
4648

4749
// Client features.
4850
@VisibleForTesting
@@ -54,10 +56,16 @@ public abstract class BootstrapperImpl extends Bootstrapper {
5456
// Server features.
5557
private static final String SERVER_FEATURE_IGNORE_RESOURCE_DELETION = "ignore_resource_deletion";
5658
private static final String SERVER_FEATURE_TRUSTED_XDS_SERVER = "trusted_xds_server";
59+
private static final String
60+
SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR = "resource_timer_is_transient_error";
5761

5862
@VisibleForTesting
5963
static boolean enableXdsFallback = GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_FALLBACK, true);
6064

65+
@VisibleForTesting
66+
public static boolean xdsDataErrorHandlingEnabled
67+
= GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING, false);
68+
6169
protected final XdsLogger logger;
6270

6371
protected FileReader reader = LocalFileReader.INSTANCE;
@@ -247,18 +255,22 @@ private List<ServerInfo> parseServerInfos(List<?> rawServerConfigs, XdsLogger lo
247255

248256
Object implSpecificConfig = getImplSpecificConfig(serverConfig, serverUri);
249257

258+
boolean resourceTimerIsTransientError = false;
250259
boolean ignoreResourceDeletion = false;
251260
// "For forward compatibility reasons, the client will ignore any entry in the list that it
252261
// does not understand, regardless of type."
253262
List<?> serverFeatures = JsonUtil.getList(serverConfig, "server_features");
254263
if (serverFeatures != null) {
255264
logger.log(XdsLogLevel.INFO, "Server features: {0}", serverFeatures);
256265
ignoreResourceDeletion = serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION);
266+
resourceTimerIsTransientError = xdsDataErrorHandlingEnabled
267+
&& serverFeatures.contains(SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR);
257268
}
258269
servers.add(
259270
ServerInfo.create(serverUri, implSpecificConfig, ignoreResourceDeletion,
260271
serverFeatures != null
261-
&& serverFeatures.contains(SERVER_FEATURE_TRUSTED_XDS_SERVER)));
272+
&& serverFeatures.contains(SERVER_FEATURE_TRUSTED_XDS_SERVER),
273+
resourceTimerIsTransientError));
262274
}
263275
return servers.build();
264276
}

xds/src/main/java/io/grpc/xds/client/XdsClient.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,10 @@ public static ResourceMetadata newResourceMetadataDoesNotExist() {
199199
return new ResourceMetadata(ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, false, null, null);
200200
}
201201

202+
public static ResourceMetadata newResourceMetadataTimeout() {
203+
return new ResourceMetadata(ResourceMetadataStatus.TIMEOUT, "", 0, false, null, null);
204+
}
205+
202206
public static ResourceMetadata newResourceMetadataAcked(
203207
Any rawResource, String version, long updateTimeNanos) {
204208
checkNotNull(rawResource, "rawResource");
@@ -256,7 +260,7 @@ public UpdateFailureState getErrorState() {
256260
* config_dump.proto</a>
257261
*/
258262
public enum ResourceMetadataStatus {
259-
UNKNOWN, REQUESTED, DOES_NOT_EXIST, ACKED, NACKED
263+
UNKNOWN, REQUESTED, DOES_NOT_EXIST, ACKED, NACKED, TIMEOUT
260264
}
261265

262266
/**

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public final class XdsClientImpl extends XdsClient implements ResourceStore {
6767
// Longest time to wait, since the subscription to some resource, for concluding its absence.
6868
@VisibleForTesting
6969
public static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
70+
public static final int EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC = 30;
7071

7172
private final SynchronizationContext syncContext = new SynchronizationContext(
7273
new Thread.UncaughtExceptionHandler() {
@@ -738,6 +739,9 @@ void restartTimer() {
738739
// When client becomes ready, it triggers a restartTimer for all relevant subscribers.
739740
return;
740741
}
742+
ServerInfo serverInfo = activeCpc.getServerInfo();
743+
int timeoutSec = serverInfo.resourceTimerIsTransientError()
744+
? EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC : INITIAL_RESOURCE_FETCH_TIMEOUT_SEC;
741745

742746
class ResourceNotFound implements Runnable {
743747
@Override
@@ -761,8 +765,7 @@ public String toString() {
761765
respTimer.cancel();
762766
}
763767
respTimer = syncContext.schedule(
764-
new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS,
765-
timeService);
768+
new ResourceNotFound(), timeoutSec, TimeUnit.SECONDS, timeService);
766769
}
767770

768771
void stopTimer() {
@@ -854,14 +857,21 @@ void onAbsent(@Nullable ProcessingTracker processingTracker, ServerInfo serverIn
854857
if (!absent) {
855858
data = null;
856859
absent = true;
857-
metadata = ResourceMetadata.newResourceMetadataDoesNotExist();
860+
metadata = serverInfo.resourceTimerIsTransientError()
861+
? ResourceMetadata.newResourceMetadataTimeout()
862+
: ResourceMetadata.newResourceMetadataDoesNotExist();
858863
for (ResourceWatcher<T> watcher : watchers.keySet()) {
859864
if (processingTracker != null) {
860865
processingTracker.startTask();
861866
}
862867
watchers.get(watcher).execute(() -> {
863868
try {
864-
watcher.onResourceDoesNotExist(resource);
869+
if (serverInfo.resourceTimerIsTransientError()) {
870+
watcher.onError(Status.UNAVAILABLE.withDescription(
871+
"Timed out waiting for resource " + resource + " from xDS server"));
872+
} else {
873+
watcher.onResourceDoesNotExist(resource);
874+
}
865875
} finally {
866876
if (processingTracker != null) {
867877
processingTracker.onComplete();

xds/src/test/java/io/grpc/xds/CsdsServiceTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,8 @@ public void metadataStatusToClientStatus() {
366366
.isEqualTo(ClientResourceStatus.ACKED);
367367
assertThat(CsdsService.metadataStatusToClientStatus(ResourceMetadataStatus.NACKED))
368368
.isEqualTo(ClientResourceStatus.NACKED);
369+
assertThat(CsdsService.metadataStatusToClientStatus(ResourceMetadataStatus.TIMEOUT))
370+
.isEqualTo(ClientResourceStatus.TIMEOUT);
369371
}
370372

371373
@Test

xds/src/test/java/io/grpc/xds/GrpcXdsClientImplDataTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3549,7 +3549,7 @@ private static Filter buildHttpConnectionManagerFilter(HttpFilter... httpFilters
35493549

35503550
private XdsResourceType.Args getXdsResourceTypeArgs(boolean isTrustedServer) {
35513551
return new XdsResourceType.Args(
3552-
ServerInfo.create("http://td", "", false, isTrustedServer), "1.0", null, null, null, null
3552+
ServerInfo.create("http://td", "", false, isTrustedServer, false), "1.0", null, null, null, null
35533553
);
35543554
}
35553555
}

xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java

Lines changed: 132 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
8686
import io.grpc.xds.client.Bootstrapper.CertificateProviderInfo;
8787
import io.grpc.xds.client.Bootstrapper.ServerInfo;
88+
import io.grpc.xds.client.BootstrapperImpl;
8889
import io.grpc.xds.client.EnvoyProtoData.Node;
8990
import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats;
9091
import io.grpc.xds.client.Locality;
@@ -145,7 +146,7 @@
145146
public abstract class GrpcXdsClientImplTestBase {
146147

147148
private static final String SERVER_URI = "trafficdirector.googleapis.com";
148-
private static final String SERVER_URI_CUSTOME_AUTHORITY = "trafficdirector2.googleapis.com";
149+
private static final String SERVER_URI_CUSTOM_AUTHORITY = "trafficdirector2.googleapis.com";
149150
private static final String SERVER_URI_EMPTY_AUTHORITY = "trafficdirector3.googleapis.com";
150151
private static final String LDS_RESOURCE = "listener.googleapis.com";
151152
private static final String RDS_RESOURCE = "route-configuration.googleapis.com";
@@ -304,6 +305,30 @@ public long currentTimeNanos() {
304305
private final BindableService adsService = createAdsService();
305306
private final BindableService lrsService = createLrsService();
306307

308+
private XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() {
309+
@Override
310+
public XdsTransport create(ServerInfo serverInfo) {
311+
if (serverInfo.target().equals(SERVER_URI)) {
312+
return new GrpcXdsTransport(channel);
313+
}
314+
if (serverInfo.target().equals(SERVER_URI_CUSTOM_AUTHORITY)) {
315+
if (channelForCustomAuthority == null) {
316+
channelForCustomAuthority = cleanupRule.register(
317+
InProcessChannelBuilder.forName(serverName).directExecutor().build());
318+
}
319+
return new GrpcXdsTransport(channelForCustomAuthority);
320+
}
321+
if (serverInfo.target().equals(SERVER_URI_EMPTY_AUTHORITY)) {
322+
if (channelForEmptyAuthority == null) {
323+
channelForEmptyAuthority = cleanupRule.register(
324+
InProcessChannelBuilder.forName(serverName).directExecutor().build());
325+
}
326+
return new GrpcXdsTransport(channelForEmptyAuthority);
327+
}
328+
throw new IllegalArgumentException("Can not create channel for " + serverInfo);
329+
}
330+
};
331+
307332
@Before
308333
public void setUp() throws IOException {
309334
when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
@@ -322,32 +347,9 @@ public void setUp() throws IOException {
322347
.start());
323348
channel =
324349
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
325-
XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() {
326-
@Override
327-
public XdsTransport create(ServerInfo serverInfo) {
328-
if (serverInfo.target().equals(SERVER_URI)) {
329-
return new GrpcXdsTransport(channel);
330-
}
331-
if (serverInfo.target().equals(SERVER_URI_CUSTOME_AUTHORITY)) {
332-
if (channelForCustomAuthority == null) {
333-
channelForCustomAuthority = cleanupRule.register(
334-
InProcessChannelBuilder.forName(serverName).directExecutor().build());
335-
}
336-
return new GrpcXdsTransport(channelForCustomAuthority);
337-
}
338-
if (serverInfo.target().equals(SERVER_URI_EMPTY_AUTHORITY)) {
339-
if (channelForEmptyAuthority == null) {
340-
channelForEmptyAuthority = cleanupRule.register(
341-
InProcessChannelBuilder.forName(serverName).directExecutor().build());
342-
}
343-
return new GrpcXdsTransport(channelForEmptyAuthority);
344-
}
345-
throw new IllegalArgumentException("Can not create channel for " + serverInfo);
346-
}
347-
};
348350

349351
xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, ignoreResourceDeletion(),
350-
true);
352+
true, false);
351353
BootstrapInfo bootstrapInfo =
352354
Bootstrapper.BootstrapInfo.builder()
353355
.servers(Collections.singletonList(xdsServerInfo))
@@ -357,7 +359,7 @@ public XdsTransport create(ServerInfo serverInfo) {
357359
AuthorityInfo.create(
358360
"xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s",
359361
ImmutableList.of(Bootstrapper.ServerInfo.create(
360-
SERVER_URI_CUSTOME_AUTHORITY, CHANNEL_CREDENTIALS))),
362+
SERVER_URI_CUSTOM_AUTHORITY, CHANNEL_CREDENTIALS))),
361363
"",
362364
AuthorityInfo.create(
363365
"xdstp:///envoy.config.listener.v3.Listener/%s",
@@ -3155,6 +3157,108 @@ public void flowControlAbsent() throws Exception {
31553157
verify(anotherWatcher).onError(any());
31563158
}
31573159

3160+
@Test
3161+
public void resourceTimerIsTransientError_schedulesExtendedTimeout() {
3162+
BootstrapperImpl.xdsDataErrorHandlingEnabled = true;
3163+
ServerInfo serverInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS,
3164+
false, true, true);
3165+
BootstrapInfo bootstrapInfo =
3166+
Bootstrapper.BootstrapInfo.builder()
3167+
.servers(Collections.singletonList(serverInfo))
3168+
.node(NODE)
3169+
.authorities(ImmutableMap.of(
3170+
"",
3171+
AuthorityInfo.create(
3172+
"xdstp:///envoy.config.listener.v3.Listener/%s",
3173+
ImmutableList.of(Bootstrapper.ServerInfo.create(
3174+
SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS)))))
3175+
.certProviders(ImmutableMap.of())
3176+
.build();
3177+
xdsClient = new XdsClientImpl(
3178+
xdsTransportFactory,
3179+
bootstrapInfo,
3180+
fakeClock.getScheduledExecutorService(),
3181+
backoffPolicyProvider,
3182+
fakeClock.getStopwatchSupplier(),
3183+
timeProvider,
3184+
MessagePrinter.INSTANCE,
3185+
new TlsContextManagerImpl(bootstrapInfo),
3186+
xdsClientMetricReporter);
3187+
@SuppressWarnings("unchecked")
3188+
ResourceWatcher<CdsUpdate> watcher = mock(ResourceWatcher.class);
3189+
String resourceName = "cluster.googleapis.com";
3190+
3191+
xdsClient.watchXdsResource(
3192+
XdsClusterResource.getInstance(),
3193+
resourceName,
3194+
watcher,
3195+
fakeClock.getScheduledExecutorService());
3196+
3197+
ScheduledTask task = Iterables.getOnlyElement(
3198+
fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));
3199+
assertThat(task.getDelay(TimeUnit.SECONDS))
3200+
.isEqualTo(XdsClientImpl.EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC);
3201+
fakeClock.runDueTasks();
3202+
BootstrapperImpl.xdsDataErrorHandlingEnabled = false;
3203+
}
3204+
3205+
@Test
3206+
public void resourceTimerIsTransientError_callsOnErrorUnavailable() {
3207+
BootstrapperImpl.xdsDataErrorHandlingEnabled = true;
3208+
xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, ignoreResourceDeletion(),
3209+
true, true);
3210+
BootstrapInfo bootstrapInfo =
3211+
Bootstrapper.BootstrapInfo.builder()
3212+
.servers(Collections.singletonList(xdsServerInfo))
3213+
.node(NODE)
3214+
.authorities(ImmutableMap.of(
3215+
"authority.xds.com",
3216+
AuthorityInfo.create(
3217+
"xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s",
3218+
ImmutableList.of(Bootstrapper.ServerInfo.create(
3219+
SERVER_URI_CUSTOM_AUTHORITY, CHANNEL_CREDENTIALS))),
3220+
"",
3221+
AuthorityInfo.create(
3222+
"xdstp:///envoy.config.listener.v3.Listener/%s",
3223+
ImmutableList.of(Bootstrapper.ServerInfo.create(
3224+
SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS)))))
3225+
.certProviders(ImmutableMap.of("cert-instance-name",
3226+
CertificateProviderInfo.create("file-watcher", ImmutableMap.of())))
3227+
.build();
3228+
xdsClient = new XdsClientImpl(
3229+
xdsTransportFactory,
3230+
bootstrapInfo,
3231+
fakeClock.getScheduledExecutorService(),
3232+
backoffPolicyProvider,
3233+
fakeClock.getStopwatchSupplier(),
3234+
timeProvider,
3235+
MessagePrinter.INSTANCE,
3236+
new TlsContextManagerImpl(bootstrapInfo),
3237+
xdsClientMetricReporter);
3238+
String timeoutResource = CDS_RESOURCE + "_timeout";
3239+
@SuppressWarnings("unchecked")
3240+
ResourceWatcher<CdsUpdate> timeoutWatcher = mock(ResourceWatcher.class);
3241+
3242+
xdsClient.watchXdsResource(
3243+
XdsClusterResource.getInstance(),
3244+
timeoutResource,
3245+
timeoutWatcher,
3246+
fakeClock.getScheduledExecutorService());
3247+
3248+
assertThat(resourceDiscoveryCalls).hasSize(1);
3249+
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
3250+
call.verifyRequest(CDS, ImmutableList.of(timeoutResource), "", "", NODE);
3251+
fakeClock.forwardTime(XdsClientImpl.EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
3252+
fakeClock.runDueTasks();
3253+
ArgumentCaptor<Status> errorCaptor = ArgumentCaptor.forClass(Status.class);
3254+
verify(timeoutWatcher).onError(errorCaptor.capture());
3255+
Status error = errorCaptor.getValue();
3256+
assertThat(error.getCode()).isEqualTo(Status.Code.UNAVAILABLE);
3257+
assertThat(error.getDescription()).isEqualTo(
3258+
"Timed out waiting for resource " + timeoutResource + " from xDS server");
3259+
BootstrapperImpl.xdsDataErrorHandlingEnabled = false;
3260+
}
3261+
31583262
private Answer<Void> blockUpdate(CyclicBarrier barrier) {
31593263
return new Answer<Void>() {
31603264
@Override
@@ -4220,7 +4324,7 @@ private XdsClientImpl createXdsClient(String serverUri) {
42204324
private BootstrapInfo buildBootStrap(String serverUri) {
42214325

42224326
ServerInfo xdsServerInfo = ServerInfo.create(serverUri, CHANNEL_CREDENTIALS,
4223-
ignoreResourceDeletion(), true);
4327+
ignoreResourceDeletion(), true, false);
42244328

42254329
return Bootstrapper.BootstrapInfo.builder()
42264330
.servers(Collections.singletonList(xdsServerInfo))
@@ -4230,7 +4334,7 @@ private BootstrapInfo buildBootStrap(String serverUri) {
42304334
AuthorityInfo.create(
42314335
"xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s",
42324336
ImmutableList.of(Bootstrapper.ServerInfo.create(
4233-
SERVER_URI_CUSTOME_AUTHORITY, CHANNEL_CREDENTIALS))),
4337+
SERVER_URI_CUSTOM_AUTHORITY, CHANNEL_CREDENTIALS))),
42344338
"",
42354339
AuthorityInfo.create(
42364340
"xdstp:///envoy.config.listener.v3.Listener/%s",

0 commit comments

Comments
 (0)