Skip to content

Commit 2cadee6

Browse files
committed
In-progress changes for control plane connectivity state monitoring.
1 parent 254e673 commit 2cadee6

File tree

2 files changed

+41
-11
lines changed

2 files changed

+41
-11
lines changed

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -261,8 +261,8 @@ public void accept(BatchRecorder recorder) {
261261
}
262262

263263
private void rlsServerConnectionStateChanged() {
264-
lastRlsServerConnectivityState = rlsChannel.getState(false);
265-
if (wasInTransientFailure && lastRlsServerConnectivityState == ConnectivityState.READY) {
264+
ConnectivityState currentState = rlsChannel.getState(false);
265+
if (wasInTransientFailure && currentState == ConnectivityState.READY) {
266266
wasInTransientFailure = false;
267267
synchronized (lock) {
268268
for (CacheEntry value : linkedHashLruCache.values()) {
@@ -271,10 +271,11 @@ private void rlsServerConnectionStateChanged() {
271271
}
272272
}
273273
}
274-
} else if (!wasInTransientFailure && lastRlsServerConnectivityState == ConnectivityState.TRANSIENT_FAILURE) {
274+
} else if (lastRlsServerConnectivityState == ConnectivityState.READY && currentState == ConnectivityState.TRANSIENT_FAILURE) {
275275
wasInTransientFailure = true;
276276
}
277-
rlsChannel.notifyWhenStateChanged(lastRlsServerConnectivityState, () -> rlsServerConnectionStateChanged());
277+
rlsChannel.notifyWhenStateChanged(currentState, () -> rlsServerConnectionStateChanged());
278+
lastRlsServerConnectivityState = currentState;
278279
}
279280

280281
void init() {

rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import io.grpc.MetricRecorder.BatchRecorder;
6060
import io.grpc.MetricRecorder.Registration;
6161
import io.grpc.NameResolver.ConfigOrError;
62+
import io.grpc.Server;
6263
import io.grpc.Status;
6364
import io.grpc.Status.Code;
6465
import io.grpc.SynchronizationContext;
@@ -170,6 +171,7 @@ public void uncaughtException(Thread t, Throwable e) {
170171
private CachingRlsLbClient rlsLbClient;
171172
private Map<String, ?> rlsChannelServiceConfig;
172173
private String rlsChannelOverriddenAuthority;
174+
private Server server;
173175

174176
private void setUpRlsLbClient() {
175177
fakeThrottler.resetCounts();
@@ -336,6 +338,33 @@ public void get_throttledAndRecover() throws Exception {
336338
assertThat(fakeHelper.lastPicker.toString()).isEqualTo("RlsPicker{target=service1}");
337339
}
338340

341+
@Test
342+
public void controlPlaneTransientToReady_backOffEntriesRemovedAndPickerUpdated() throws Exception {
343+
setUpRlsLbClient();
344+
RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of(
345+
"server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar"));
346+
rlsServerImpl.setLookupTable(
347+
ImmutableMap.of(
348+
routeLookupRequest,
349+
RouteLookupResponse.create(ImmutableList.of("target"), "header")));
350+
351+
fakeThrottler.nextResult = true;
352+
fakeBackoffProvider.nextPolicy = createBackoffPolicy(10, TimeUnit.MILLISECONDS);
353+
354+
CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest);
355+
356+
assertThat(resp.hasError()).isTrue();
357+
358+
// let it pass throttler
359+
fakeThrottler.nextResult = false;
360+
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
361+
// Backoff entry evicted from cache.
362+
verify(evictionListener)
363+
.onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.EXPLICIT));
364+
// Assert that Rls LB policy picker was updated.
365+
assertThat(fakeHelper.lastPicker.toString()).isEqualTo("RlsPicker{target=service1}");
366+
}
367+
339368
@Test
340369
public void get_updatesLbState() throws Exception {
341370
setUpRlsLbClient();
@@ -884,13 +913,13 @@ private final class FakeHelper extends Helper {
884913
@Override
885914
public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
886915
String target, ChannelCredentials creds) {
887-
try {
888-
grpcCleanupRule.register(
889-
InProcessServerBuilder.forName(target)
890-
.addService(rlsServerImpl)
891-
.directExecutor()
892-
.build()
893-
.start());
916+
try {
917+
server = InProcessServerBuilder.forName(target)
918+
.addService(rlsServerImpl)
919+
.directExecutor()
920+
.build()
921+
.start();
922+
grpcCleanupRule.register(server);
894923
} catch (IOException e) {
895924
throw new RuntimeException("cannot create server: " + target, e);
896925
}

0 commit comments

Comments
 (0)