Skip to content

Commit b3bc321

Browse files
committed
Expiry time for backoff cache entry.
1 parent 26eb88d commit b3bc321

File tree

2 files changed

+80
-26
lines changed

2 files changed

+80
-26
lines changed

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
5454
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
5555
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
56-
import io.grpc.rls.LbPolicyConfiguration.ChildLbStatusListener;
5756
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
5857
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
5958
import io.grpc.rls.LruCache.EvictionListener;
@@ -364,7 +363,8 @@ final CachedRouteLookupResponse get(final RouteLookupRequest request) {
364363
final CacheEntry cacheEntry;
365364
cacheEntry = linkedHashLruCache.read(request);
366365
if (cacheEntry == null
367-
|| cacheEntry instanceof BackoffCacheEntry && cacheEntry.isExpired(ticker.read())) {
366+
|| (cacheEntry instanceof BackoffCacheEntry
367+
&& !((BackoffCacheEntry) cacheEntry).isInBackoffPeriod())) {
368368
PendingCacheEntry pendingEntry = pendingCallCache.get(request);
369369
if (pendingEntry != null) {
370370
return CachedRouteLookupResponse.pendingResponse(pendingEntry);
@@ -804,6 +804,10 @@ int getSizeBytes() {
804804
return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
805805
}
806806

807+
boolean isInBackoffPeriod() {
808+
return !scheduledFuture.isDone();
809+
}
810+
807811
@Override
808812
boolean isExpired(long nowNanos) {
809813
return nowNanos > expiryTimeNanos;

rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java

Lines changed: 74 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -317,8 +317,9 @@ public void rls_withCustomRlsChannelServiceConfig() throws Exception {
317317
}
318318

319319
@Test
320-
public void get_throttledAndRecover() throws Exception {
320+
public void backoffTimerEnd_updatesPicker() throws Exception {
321321
setUpRlsLbClient();
322+
InOrder inOrder = inOrder(helper);
322323
RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of(
323324
"server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar"));
324325
rlsServerImpl.setLookupTable(
@@ -330,22 +331,28 @@ public void get_throttledAndRecover() throws Exception {
330331
fakeBackoffProvider.nextPolicy = createBackoffPolicy(10, TimeUnit.MILLISECONDS);
331332

332333
CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest);
333-
334334
assertThat(resp.hasError()).isTrue();
335335

336-
// let it pass throttler
337-
fakeThrottler.nextResult = false;
338336
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
339-
// Assert that Rls LB policy picker was updated.
340-
assertThat(fakeHelper.lastPicker.toString()).isEqualTo("RlsPicker{target=service1}");
341-
// Backoff entry present but marked as not active anymore in cache, so next rpc should not be
342-
// backed off.
343-
resp = getInSyncContext(routeLookupRequest);
344-
assertThat(resp.isPending()).isTrue();
337+
// Assert that Rls LB policy picker was updated which picks the fallback target
338+
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class);
339+
ArgumentCaptor<ConnectivityState> stateCaptor =
340+
ArgumentCaptor.forClass(ConnectivityState.class);
341+
342+
inOrder.verify(helper, times(3))
343+
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
344+
assertThat(new HashSet<>(pickerCaptor.getAllValues())).hasSize(1);
345+
assertThat(stateCaptor.getAllValues())
346+
.containsExactly(ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.CONNECTING,
347+
ConnectivityState.CONNECTING);
348+
Metadata headers = new Metadata();
349+
PickResult pickResult = getPickResultForCreate(pickerCaptor, headers);
350+
assertThat(pickResult.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
351+
assertThat(pickResult.getStatus().getDescription()).isEqualTo("fallback not available");
345352
}
346353

347354
@Test
348-
public void get_throttled_backoffBehavior() throws Exception {
355+
public void get_throttledTwice_usesSameBackoffpolicy() throws Exception {
349356
setUpRlsLbClient();
350357
RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of(
351358
"server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar"));
@@ -361,30 +368,60 @@ public void get_throttled_backoffBehavior() throws Exception {
361368
assertThat(resp.hasError()).isTrue();
362369

363370
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
371+
364372
// Assert that the same backoff policy is still in effect for the cache entry.
365373
// The below provider should not get used, so the back off time will still be set to 10ms.
366374
fakeBackoffProvider.nextPolicy = createBackoffPolicy(20, TimeUnit.MILLISECONDS);
367375
// let it be throttled again
368376
resp = getInSyncContext(routeLookupRequest);
369377
assertThat(resp.hasError()).isTrue();
370378

371-
fakeClock.forwardTime(9, TimeUnit.MILLISECONDS);
372-
resp = getInSyncContext(routeLookupRequest);
373-
assertThat(resp.hasError()).isTrue();
379+
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
374380

375-
fakeClock.forwardTime(1, TimeUnit.MILLISECONDS);
376-
// Assert that Rls LB policy picker was updated.
377-
assertThat(fakeHelper.lastPicker.toString()).isEqualTo("RlsPicker{target=service1}");
378-
// Backoff entry marked as not active anymore in cache, so next rpc should not be backed off.
381+
// Backoff entry's backoff timer has gone off, so next rpc should not be backed off.
379382
fakeThrottler.nextResult = false;
380383
resp = getInSyncContext(routeLookupRequest);
384+
385+
assertThat(resp.isPending()).isTrue();
386+
}
387+
388+
@Test
389+
public void get_errorResponseTwice_usesSameBackoffPolicy() throws Exception {
390+
setUpRlsLbClient();
391+
RouteLookupRequest invalidRouteLookupRequest =
392+
RouteLookupRequest.create(ImmutableMap.<String, String>of());
393+
CachedRouteLookupResponse resp = getInSyncContext(invalidRouteLookupRequest);
394+
assertThat(resp.isPending()).isTrue();
395+
fakeBackoffProvider.nextPolicy = createBackoffPolicy(10, TimeUnit.MILLISECONDS);
396+
fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
397+
398+
resp = getInSyncContext(invalidRouteLookupRequest);
399+
assertThat(resp.hasError()).isTrue();
400+
401+
// Backoff time expiry
402+
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
403+
resp = getInSyncContext(invalidRouteLookupRequest);
404+
assertThat(resp.isPending()).isTrue();
405+
// Assert that the same backoff policy is still in effect for the cache entry.
406+
// The below provider should not get used, so the back off time will still be set to 10ms.
407+
fakeBackoffProvider.nextPolicy = createBackoffPolicy(20, TimeUnit.MILLISECONDS);
408+
// Gets error again and backed off again
409+
fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
410+
411+
resp = getInSyncContext(invalidRouteLookupRequest);
412+
assertThat(resp.hasError()).isTrue();
413+
414+
// Backoff time expiry
415+
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
416+
resp = getInSyncContext(invalidRouteLookupRequest);
381417
assertThat(resp.isPending()).isTrue();
382418
}
383419

384420
@Test
385421
public void controlPlaneTransientToReady_backOffEntriesRemovedAndPickerUpdated()
386422
throws Exception {
387423
setUpRlsLbClient();
424+
InOrder inOrder = inOrder(helper);
388425
final ConnectivityState[] rlsChannelState = new ConnectivityState[1];
389426
Runnable channelStateListener = new Runnable() {
390427
@Override
@@ -426,17 +463,32 @@ public void run() {
426463
assertThat(resp.hasError()).isTrue();
427464

428465
fakeHelper.createServerAndRegister("service1");
429-
// Wait for Rls subchannel back-off expiry and its moving to READY
466+
// Wait for Rls control plane channel back-off expiry and its moving to READY
430467
synchronized (channelStateListener) {
431-
channelStateListener.wait(5000);
468+
channelStateListener.wait(2000);
432469
}
433470
assertThat(rlsChannelState[0]).isEqualTo(ConnectivityState.READY);
434471
final ObjectPool<? extends Executor> defaultExecutorPool =
435472
SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
436473
AtomicBoolean isSuccess = new AtomicBoolean(false);
437474
((ExecutorService) defaultExecutorPool.getObject()).submit(() -> {
438-
// Assert that Rls LB policy picker was updated.
439-
assertThat(fakeHelper.lastPicker.toString()).isEqualTo("RlsPicker{target=service1}");
475+
// Assert that Rls LB policy picker was updated which picks the fallback target
476+
ArgumentCaptor<SubchannelPicker> pickerCaptor =
477+
ArgumentCaptor.forClass(SubchannelPicker.class);
478+
ArgumentCaptor<ConnectivityState> stateCaptor =
479+
ArgumentCaptor.forClass(ConnectivityState.class);
480+
481+
inOrder.verify(helper, times(5))
482+
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
483+
assertThat(new HashSet<>(pickerCaptor.getAllValues())).hasSize(1);
484+
assertThat(stateCaptor.getAllValues())
485+
.containsExactly(ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.CONNECTING,
486+
ConnectivityState.CONNECTING, ConnectivityState.CONNECTING,
487+
ConnectivityState.CONNECTING);
488+
Metadata headers = new Metadata();
489+
PickResult pickResult = getPickResultForCreate(pickerCaptor, headers);
490+
assertThat(pickResult.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
491+
assertThat(pickResult.getStatus().getDescription()).isEqualTo("fallback not available");
440492
isSuccess.set(true);
441493
}).get();
442494
assertThat(isSuccess.get()).isTrue();
@@ -986,7 +1038,6 @@ public void run() {
9861038

9871039
private final class FakeHelper extends Helper {
9881040

989-
SubchannelPicker lastPicker;
9901041
Server server;
9911042
ManagedChannel oobChannel;
9921043

@@ -1049,7 +1100,6 @@ public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String author
10491100
@Override
10501101
public void updateBalancingState(
10511102
@Nonnull ConnectivityState newState, @Nonnull SubchannelPicker newPicker) {
1052-
lastPicker = newPicker;
10531103
}
10541104

10551105
@Override

0 commit comments

Comments
 (0)