Skip to content

Commit 0ff3106

Browse files
committed
Review comments.
1 parent 519dcce commit 0ff3106

File tree

2 files changed

+75
-58
lines changed

2 files changed

+75
-58
lines changed

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
5454
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
5555
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
56-
import io.grpc.rls.LbPolicyConfiguration.ChildLbStatusListener;
5756
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
5857
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
5958
import io.grpc.rls.LruCache.EvictionListener;
@@ -133,9 +132,6 @@ final class CachingRlsLbClient {
133132
private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
134133
private final ChannelLogger logger;
135134
private final ChildPolicyWrapper fallbackChildPolicyWrapper;
136-
private ConnectivityState lastRlsServerConnectivityState;
137-
private boolean wasReady;
138-
private boolean wasInTransientFailure;
139135

140136
static {
141137
MetricInstrumentRegistry metricInstrumentRegistry
@@ -219,9 +215,28 @@ private CachingRlsLbClient(Builder builder) {
219215
rlsChannelBuilder.disableServiceConfigLookUp();
220216
}
221217
rlsChannel = rlsChannelBuilder.build();
222-
lastRlsServerConnectivityState = rlsChannel.getState(false);
218+
Runnable rlsServerConnectivityStateChangeHandler = new Runnable() {
219+
private boolean wasInTransientFailure;
220+
@Override
221+
public void run() {
222+
ConnectivityState currentState = rlsChannel.getState(false);
223+
if (currentState == ConnectivityState.TRANSIENT_FAILURE) {
224+
wasInTransientFailure = true;
225+
} else if (wasInTransientFailure && currentState == ConnectivityState.READY) {
226+
wasInTransientFailure = false;
227+
synchronized (lock) {
228+
for (CacheEntry value : linkedHashLruCache.values()) {
229+
if (value instanceof BackoffCacheEntry) {
230+
refreshBackoffEntry((BackoffCacheEntry) value);
231+
}
232+
}
233+
}
234+
}
235+
rlsChannel.notifyWhenStateChanged(currentState, this);
236+
}
237+
};
223238
rlsChannel.notifyWhenStateChanged(
224-
lastRlsServerConnectivityState, () -> rlsServerConnectionStateChanged());
239+
ConnectivityState.IDLE, rlsServerConnectivityStateChangeHandler);
225240
rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
226241
childLbResolvedAddressFactory =
227242
checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
@@ -262,26 +277,6 @@ public void accept(BatchRecorder recorder) {
262277
logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
263278
}
264279

265-
private void rlsServerConnectionStateChanged() {
266-
ConnectivityState currentState = rlsChannel.getState(false);
267-
if (!wasInTransientFailure && currentState == ConnectivityState.READY) {
268-
wasReady = true;
269-
} else if (wasReady && currentState == ConnectivityState.TRANSIENT_FAILURE) {
270-
wasInTransientFailure = true;
271-
} else if (wasInTransientFailure && currentState == ConnectivityState.READY) {
272-
wasInTransientFailure = false;
273-
synchronized (lock) {
274-
for (CacheEntry value : linkedHashLruCache.values()) {
275-
if (value instanceof BackoffCacheEntry) {
276-
refreshBackoffEntry((BackoffCacheEntry) value);
277-
}
278-
}
279-
}
280-
}
281-
rlsChannel.notifyWhenStateChanged(currentState, () -> rlsServerConnectionStateChanged());
282-
lastRlsServerConnectivityState = currentState;
283-
}
284-
285280
void init() {
286281
synchronized (lock) {
287282
refCountedChildPolicyWrapperFactory.init();
@@ -367,7 +362,9 @@ final CachedRouteLookupResponse get(final RouteLookupRequest request) {
367362
synchronized (lock) {
368363
final CacheEntry cacheEntry;
369364
cacheEntry = linkedHashLruCache.read(request);
370-
if (cacheEntry == null) {
365+
if (cacheEntry == null
366+
|| cacheEntry instanceof BackoffCacheEntry
367+
&& (((BackoffCacheEntry) cacheEntry).isBackoffTimeEnded)) {
371368
PendingCacheEntry pendingEntry = pendingCallCache.get(request);
372369
if (pendingEntry != null) {
373370
return CachedRouteLookupResponse.pendingResponse(pendingEntry);
@@ -464,7 +461,7 @@ private BackoffCacheEntry createBackOffEntry(
464461
ChannelLogLevel.DEBUG,
465462
"[RLS Entry {0}] Transition to back off: status={1}, delayNanos={2}",
466463
request, status, delayNanos);
467-
BackoffCacheEntry entry = new BackoffCacheEntry(request, status);
464+
BackoffCacheEntry entry = new BackoffCacheEntry(request, status, backoffPolicy);
468465
// Lock is held, so the task can't execute before the assignment
469466
entry.scheduledFuture = scheduledExecutorService.schedule(
470467
() -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS);
@@ -481,7 +478,7 @@ private void refreshBackoffEntry(BackoffCacheEntry entry) {
481478
}
482479
logger.log(ChannelLogLevel.DEBUG,
483480
"[RLS Entry {0}] Calling RLS for transition to pending", entry.request);
484-
linkedHashLruCache.invalidate(entry.request);
481+
entry.isBackoffTimeEnded = true;
485482
// Cache updated. updateBalancingState() to reattempt picks
486483
helper.triggerPendingRpcProcessing();
487484
}
@@ -787,11 +784,14 @@ public String toString() {
787784
private static final class BackoffCacheEntry extends CacheEntry {
788785

789786
private final Status status;
787+
private final BackoffPolicy backoffPolicy;
790788
private Future<?> scheduledFuture;
789+
private boolean isBackoffTimeEnded;
791790

792-
BackoffCacheEntry(RouteLookupRequest request, Status status) {
791+
BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) {
793792
super(request);
794793
this.status = checkNotNull(status, "status");
794+
this.backoffPolicy = backoffPolicy;
795795
}
796796

797797
Status getStatus() {

rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java

Lines changed: 45 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -336,11 +336,47 @@ public void get_throttledAndRecover() throws Exception {
336336
// let it pass throttler
337337
fakeThrottler.nextResult = false;
338338
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
339-
// Backoff entry evicted from cache.
340-
verify(evictionListener)
341-
.onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.EXPLICIT));
342339
// Assert that Rls LB policy picker was updated.
343340
assertThat(fakeHelper.lastPicker.toString()).isEqualTo("RlsPicker{target=service1}");
341+
// Backoff entry present but marked as not active anymore in cache, so next rpc should not be
342+
// backed off.
343+
resp = getInSyncContext(routeLookupRequest);
344+
assertThat(resp.isPending()).isTrue();
345+
}
346+
347+
@Test
348+
public void get_throttled_backoffBehavior() throws Exception {
349+
setUpRlsLbClient();
350+
RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of(
351+
"server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar"));
352+
rlsServerImpl.setLookupTable(
353+
ImmutableMap.of(
354+
routeLookupRequest,
355+
RouteLookupResponse.create(ImmutableList.of("target"), "header")));
356+
357+
fakeThrottler.nextResult = true;
358+
fakeBackoffProvider.nextPolicy = createBackoffPolicy(10, TimeUnit.MILLISECONDS);
359+
360+
CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest);
361+
assertThat(resp.hasError()).isTrue();
362+
363+
// let it be throttled again
364+
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
365+
resp = getInSyncContext(routeLookupRequest);
366+
assertThat(resp.hasError()).isTrue();
367+
368+
// Assert that the backoff policy is still in effect for the cache entry.
369+
fakeClock.forwardTime(9, TimeUnit.MILLISECONDS);
370+
resp = getInSyncContext(routeLookupRequest);
371+
assertThat(resp.hasError()).isTrue();
372+
373+
fakeClock.forwardTime(1, TimeUnit.MILLISECONDS);
374+
// Assert that Rls LB policy picker was updated.
375+
assertThat(fakeHelper.lastPicker.toString()).isEqualTo("RlsPicker{target=service1}");
376+
// Backoff entry marked as not active anymore in cache, so next rpc should not be backed off.
377+
fakeThrottler.nextResult = false;
378+
resp = getInSyncContext(routeLookupRequest);
379+
assertThat(resp.isPending()).isTrue();
344380
}
345381

346382
@Test
@@ -360,19 +396,6 @@ public void run() {
360396
};
361397
fakeHelper.oobChannel.notifyWhenStateChanged(fakeHelper.oobChannel.getState(false),
362398
channelStateListener);
363-
RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of(
364-
"server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar"));
365-
rlsServerImpl.setLookupTable(
366-
ImmutableMap.of(
367-
routeLookupRequest,
368-
RouteLookupResponse.create(ImmutableList.of("target"), "header")));
369-
370-
CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest);
371-
assertThat(resp.isPending()).isTrue();
372-
// server response
373-
fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
374-
resp = getInSyncContext(routeLookupRequest);
375-
assertThat(resp.hasData()).isTrue();
376399

377400
fakeHelper.server.shutdown();
378401
// Channel goes to IDLE state from the shutdown listener handling.
@@ -383,22 +406,19 @@ public void run() {
383406
} catch (InterruptedException e) {
384407
fakeHelper.server.shutdownNow();
385408
}
386-
// Use a different key to cause a cache miss and trigger a RPC.
387-
RouteLookupRequest routeLookupRequest2 = RouteLookupRequest.create(ImmutableMap.of(
388-
"server", "bigtable.googleapis.com", "service-key", "foo2", "method-key", "bar"));
389-
// Rls channel will go to TRANSIENT_FAILURE (back-off) because the picker notices the
390-
// subchannel state IDLE and the server transport listener is null.
391-
resp = getInSyncContext(routeLookupRequest2);
409+
RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of(
410+
"server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar"));
411+
// Rls channel will go to TRANSIENT_FAILURE (connection back-off).
412+
CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest);
392413
assertThat(resp.isPending()).isTrue();
393414
assertThat(rlsChannelState[0]).isEqualTo(ConnectivityState.TRANSIENT_FAILURE);
394415
// Throttle the next rpc call.
395416
fakeThrottler.nextResult = true;
396417
fakeBackoffProvider.nextPolicy = createBackoffPolicy(10, TimeUnit.MILLISECONDS);
397418

398-
// Cause another cache miss by using a new request key. This will create a back-off Rls
399-
// cache entry.
419+
// Cause a cache miss by using a new request key. This will create a back-off Rls cache entry.
400420
RouteLookupRequest routeLookupRequest3 = RouteLookupRequest.create(ImmutableMap.of(
401-
"server", "bigtable.googleapis.com", "service-key", "foo3", "method-key", "bar"));
421+
"server", "bigtable.googleapis.com", "service-key", "foo2", "method-key", "bar"));
402422
resp = getInSyncContext(routeLookupRequest3);
403423

404424
assertThat(resp.hasError()).isTrue();
@@ -413,9 +433,6 @@ public void run() {
413433
SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
414434
AtomicBoolean isSuccess = new AtomicBoolean(false);
415435
((ExecutorService) defaultExecutorPool.getObject()).submit(() -> {
416-
// Backoff entry evicted from cache.
417-
verify(evictionListener)
418-
.onEviction(eq(routeLookupRequest3), any(CacheEntry.class), eq(EvictionType.EXPLICIT));
419436
// Assert that Rls LB policy picker was updated.
420437
assertThat(fakeHelper.lastPicker.toString()).isEqualTo("RlsPicker{target=service1}");
421438
isSuccess.set(true);

0 commit comments

Comments
 (0)