Skip to content

Commit dffb376

Browse files
committed
rls: Backoff timer fix and RLS channel connectivity state behavior
Fix test. Unit test. Save changes. In-progress changes for control plane connectivity state monitoring. Test for making backoff timer expiry not make a RLS rpc directly but update the parent LB policy with picker after deleting backoff cache entry. Save in-progress changes. Save changes.
1 parent 599a0a1 commit dffb376

File tree

4 files changed

+138
-41
lines changed

4 files changed

+138
-41
lines changed

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ final class CachingRlsLbClient {
133133
private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
134134
private final ChannelLogger logger;
135135
private final ChildPolicyWrapper fallbackChildPolicyWrapper;
136+
private ConnectivityState lastRlsServerConnectivityState;
137+
private boolean wasReady;
138+
private boolean wasInTransientFailure;
136139

137140
static {
138141
MetricInstrumentRegistry metricInstrumentRegistry
@@ -216,6 +219,9 @@ private CachingRlsLbClient(Builder builder) {
216219
rlsChannelBuilder.disableServiceConfigLookUp();
217220
}
218221
rlsChannel = rlsChannelBuilder.build();
222+
lastRlsServerConnectivityState = rlsChannel.getState(false);
223+
rlsChannel.notifyWhenStateChanged(
224+
lastRlsServerConnectivityState, () -> rlsServerConnectionStateChanged());
219225
rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
220226
childLbResolvedAddressFactory =
221227
checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
@@ -225,8 +231,7 @@ private CachingRlsLbClient(Builder builder) {
225231
refCountedChildPolicyWrapperFactory =
226232
new RefCountedChildPolicyWrapperFactory(
227233
lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
228-
childLbHelperProvider,
229-
new BackoffRefreshListener());
234+
childLbHelperProvider);
230235
// TODO(creamsoup) wait until lb is ready
231236
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
232237
if (defaultTarget != null && !defaultTarget.isEmpty()) {
@@ -257,6 +262,26 @@ public void accept(BatchRecorder recorder) {
257262
logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
258263
}
259264

265+
private void rlsServerConnectionStateChanged() {
266+
ConnectivityState currentState = rlsChannel.getState(false);
267+
if (!wasInTransientFailure && currentState == ConnectivityState.READY) {
268+
wasReady = true;
269+
} else if (wasReady && currentState == ConnectivityState.TRANSIENT_FAILURE) {
270+
wasInTransientFailure = true;
271+
} else if (wasInTransientFailure && currentState == ConnectivityState.READY) {
272+
wasInTransientFailure = false;
273+
synchronized (lock) {
274+
for (CacheEntry value : linkedHashLruCache.values()) {
275+
if (value instanceof BackoffCacheEntry) {
276+
refreshBackoffEntry((BackoffCacheEntry) value);
277+
}
278+
}
279+
}
280+
}
281+
rlsChannel.notifyWhenStateChanged(currentState, () -> rlsServerConnectionStateChanged());
282+
lastRlsServerConnectivityState = currentState;
283+
}
284+
260285
void init() {
261286
synchronized (lock) {
262287
refCountedChildPolicyWrapperFactory.init();
@@ -457,7 +482,8 @@ private void refreshBackoffEntry(BackoffCacheEntry entry) {
457482
logger.log(ChannelLogLevel.DEBUG,
458483
"[RLS Entry {0}] Calling RLS for transition to pending", entry.request);
459484
linkedHashLruCache.invalidate(entry.request);
460-
asyncRlsCall(entry.request, entry.backoffPolicy);
485+
// Cache updated. updateBalancingState() to reattempt picks
486+
helper.triggerPendingRpcProcessing();
461487
}
462488
}
463489

rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -210,20 +210,17 @@ static final class RefCountedChildPolicyWrapperFactory {
210210
new HashMap<>();
211211

212212
private final ChildLoadBalancerHelperProvider childLbHelperProvider;
213-
private final ChildLbStatusListener childLbStatusListener;
214213
private final ChildLoadBalancingPolicy childPolicy;
215214
private ResolvedAddressFactory childLbResolvedAddressFactory;
216215

217216
public RefCountedChildPolicyWrapperFactory(
218217
ChildLoadBalancingPolicy childPolicy,
219218
ResolvedAddressFactory childLbResolvedAddressFactory,
220-
ChildLoadBalancerHelperProvider childLbHelperProvider,
221-
ChildLbStatusListener childLbStatusListener) {
219+
ChildLoadBalancerHelperProvider childLbHelperProvider) {
222220
this.childPolicy = checkNotNull(childPolicy, "childPolicy");
223221
this.childLbResolvedAddressFactory =
224222
checkNotNull(childLbResolvedAddressFactory, "childLbResolvedAddressFactory");
225223
this.childLbHelperProvider = checkNotNull(childLbHelperProvider, "childLbHelperProvider");
226-
this.childLbStatusListener = checkNotNull(childLbStatusListener, "childLbStatusListener");
227224
}
228225

229226
void init() {
@@ -248,8 +245,7 @@ ChildPolicyWrapper createOrGet(String target) {
248245
RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap.get(target);
249246
if (pooledChildPolicyWrapper == null) {
250247
ChildPolicyWrapper childPolicyWrapper = new ChildPolicyWrapper(
251-
target, childPolicy, childLbResolvedAddressFactory, childLbHelperProvider,
252-
childLbStatusListener);
248+
target, childPolicy, childLbResolvedAddressFactory, childLbHelperProvider);
253249
pooledChildPolicyWrapper = RefCountedChildPolicyWrapper.of(childPolicyWrapper);
254250
childPolicyMap.put(target, pooledChildPolicyWrapper);
255251
return pooledChildPolicyWrapper.getObject();
@@ -299,11 +295,9 @@ public ChildPolicyWrapper(
299295
String target,
300296
ChildLoadBalancingPolicy childPolicy,
301297
final ResolvedAddressFactory childLbResolvedAddressFactory,
302-
ChildLoadBalancerHelperProvider childLbHelperProvider,
303-
ChildLbStatusListener childLbStatusListener) {
298+
ChildLoadBalancerHelperProvider childLbHelperProvider) {
304299
this.target = target;
305-
this.helper =
306-
new ChildPolicyReportingHelper(childLbHelperProvider, childLbStatusListener);
300+
this.helper = new ChildPolicyReportingHelper(childLbHelperProvider);
307301
LoadBalancerProvider lbProvider = childPolicy.getEffectiveLbProvider();
308302
final ConfigOrError lbConfig =
309303
lbProvider
@@ -386,14 +380,11 @@ public String toString() {
386380
final class ChildPolicyReportingHelper extends ForwardingLoadBalancerHelper {
387381

388382
private final ChildLoadBalancerHelper delegate;
389-
private final ChildLbStatusListener listener;
390383

391384
ChildPolicyReportingHelper(
392-
ChildLoadBalancerHelperProvider childHelperProvider,
393-
ChildLbStatusListener listener) {
385+
ChildLoadBalancerHelperProvider childHelperProvider) {
394386
checkNotNull(childHelperProvider, "childHelperProvider");
395387
this.delegate = childHelperProvider.forTarget(getTarget());
396-
this.listener = checkNotNull(listener, "listener");
397388
}
398389

399390
@Override
@@ -406,7 +397,6 @@ public void updateBalancingState(ConnectivityState newState, SubchannelPicker ne
406397
picker = newPicker;
407398
state = newState;
408399
super.updateBalancingState(newState, newPicker);
409-
listener.onStatusChanged(newState);
410400
}
411401
}
412402
}

rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java

Lines changed: 103 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,18 @@
5959
import io.grpc.MetricRecorder.BatchRecorder;
6060
import io.grpc.MetricRecorder.Registration;
6161
import io.grpc.NameResolver.ConfigOrError;
62+
import io.grpc.Server;
6263
import io.grpc.Status;
6364
import io.grpc.Status.Code;
6465
import io.grpc.SynchronizationContext;
6566
import io.grpc.inprocess.InProcessChannelBuilder;
6667
import io.grpc.inprocess.InProcessServerBuilder;
6768
import io.grpc.internal.BackoffPolicy;
6869
import io.grpc.internal.FakeClock;
70+
import io.grpc.internal.GrpcUtil;
71+
import io.grpc.internal.ObjectPool;
6972
import io.grpc.internal.PickSubchannelArgsImpl;
73+
import io.grpc.internal.SharedResourcePool;
7074
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
7175
import io.grpc.rls.CachingRlsLbClient.CacheEntry;
7276
import io.grpc.rls.CachingRlsLbClient.CachedRouteLookupResponse;
@@ -96,10 +100,13 @@
96100
import java.util.Map;
97101
import java.util.Set;
98102
import java.util.concurrent.ExecutionException;
103+
import java.util.concurrent.Executor;
104+
import java.util.concurrent.ExecutorService;
99105
import java.util.concurrent.ScheduledExecutorService;
100106
import java.util.concurrent.ScheduledFuture;
101107
import java.util.concurrent.TimeUnit;
102108
import java.util.concurrent.TimeoutException;
109+
import java.util.concurrent.atomic.AtomicBoolean;
103110
import javax.annotation.Nonnull;
104111
import org.junit.After;
105112
import org.junit.Before;
@@ -160,8 +167,9 @@ public void uncaughtException(Thread t, Throwable e) {
160167
fakeClock.getScheduledExecutorService());
161168
private final ChildLoadBalancingPolicy childLbPolicy =
162169
new ChildLoadBalancingPolicy("target", Collections.<String, Object>emptyMap(), lbProvider);
170+
private final FakeHelper fakeHelper = new FakeHelper();
163171
private final Helper helper =
164-
mock(Helper.class, delegatesTo(new FakeHelper()));
172+
mock(Helper.class, delegatesTo(fakeHelper));
165173
private final FakeThrottler fakeThrottler = new FakeThrottler();
166174
private final LbPolicyConfiguration lbPolicyConfiguration =
167175
new LbPolicyConfiguration(ROUTE_LOOKUP_CONFIG, null, childLbPolicy);
@@ -325,29 +333,94 @@ public void get_throttledAndRecover() throws Exception {
325333

326334
assertThat(resp.hasError()).isTrue();
327335

336+
// let it pass throttler
337+
fakeThrottler.nextResult = false;
328338
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
329-
// initially backed off entry is backed off again
339+
// Backoff entry evicted from cache.
330340
verify(evictionListener)
331341
.onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.EXPLICIT));
342+
// Assert that Rls LB policy picker was updated.
343+
assertThat(fakeHelper.lastPicker.toString()).isEqualTo("RlsPicker{target=service1}");
344+
}
332345

333-
resp = getInSyncContext(routeLookupRequest);
334-
335-
assertThat(resp.hasError()).isTrue();
336-
337-
// let it pass throttler
338-
fakeThrottler.nextResult = false;
339-
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
346+
@Test
347+
public void controlPlaneTransientToReady_backOffEntriesRemovedAndPickerUpdated()
348+
throws Exception {
349+
setUpRlsLbClient();
350+
final ConnectivityState[] rlsChannelState = new ConnectivityState[1];
351+
Runnable channelStateListener = new Runnable() {
352+
@Override
353+
public void run() {
354+
rlsChannelState[0] = fakeHelper.oobChannel.getState(false);
355+
fakeHelper.oobChannel.notifyWhenStateChanged(rlsChannelState[0], this);
356+
synchronized (this) {
357+
notify();
358+
}
359+
}
360+
};
361+
fakeHelper.oobChannel.notifyWhenStateChanged(fakeHelper.oobChannel.getState(false),
362+
channelStateListener);
363+
RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of(
364+
"server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar"));
365+
rlsServerImpl.setLookupTable(
366+
ImmutableMap.of(
367+
routeLookupRequest,
368+
RouteLookupResponse.create(ImmutableList.of("target"), "header")));
340369

370+
CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest);
371+
assertThat(resp.isPending()).isTrue();
372+
// server response
373+
fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
341374
resp = getInSyncContext(routeLookupRequest);
375+
assertThat(resp.hasData()).isTrue();
342376

377+
fakeHelper.server.shutdown();
378+
// Channel goes to IDLE state from the shutdown listener handling.
379+
try {
380+
if (!fakeHelper.server.awaitTermination(10, TimeUnit.SECONDS)) {
381+
fakeHelper.server.shutdownNow(); // Forceful shutdown if graceful timeout expires
382+
}
383+
} catch (InterruptedException e) {
384+
fakeHelper.server.shutdownNow();
385+
}
386+
// Use a different key to cause a cache miss and trigger a RPC.
387+
RouteLookupRequest routeLookupRequest2 = RouteLookupRequest.create(ImmutableMap.of(
388+
"server", "bigtable.googleapis.com", "service-key", "foo2", "method-key", "bar"));
389+
// Rls channel will go to TRANSIENT_FAILURE (back-off) because the picker notices the
390+
// subchannel state IDLE and the server transport listener is null.
391+
resp = getInSyncContext(routeLookupRequest2);
343392
assertThat(resp.isPending()).isTrue();
393+
assertThat(rlsChannelState[0]).isEqualTo(ConnectivityState.TRANSIENT_FAILURE);
394+
// Throttle the next rpc call.
395+
fakeThrottler.nextResult = true;
396+
fakeBackoffProvider.nextPolicy = createBackoffPolicy(10, TimeUnit.MILLISECONDS);
344397

345-
// server responses
346-
fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
398+
// Cause another cache miss by using a new request key. This will create a back-off Rls
399+
// cache entry.
400+
RouteLookupRequest routeLookupRequest3 = RouteLookupRequest.create(ImmutableMap.of(
401+
"server", "bigtable.googleapis.com", "service-key", "foo3", "method-key", "bar"));
402+
resp = getInSyncContext(routeLookupRequest3);
347403

348-
resp = getInSyncContext(routeLookupRequest);
404+
assertThat(resp.hasError()).isTrue();
349405

350-
assertThat(resp.hasData()).isTrue();
406+
fakeHelper.createServerAndRegister("service1");
407+
// Wait for Rls subchannel back-off expiry and its moving to READY
408+
synchronized (channelStateListener) {
409+
channelStateListener.wait(5000);
410+
}
411+
assertThat(rlsChannelState[0]).isEqualTo(ConnectivityState.READY);
412+
final ObjectPool<? extends Executor> defaultExecutorPool =
413+
SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
414+
AtomicBoolean isSuccess = new AtomicBoolean(false);
415+
((ExecutorService) defaultExecutorPool.getObject()).submit(() -> {
416+
// Backoff entry evicted from cache.
417+
verify(evictionListener)
418+
.onEviction(eq(routeLookupRequest3), any(CacheEntry.class), eq(EvictionType.EXPLICIT));
419+
// Assert that Rls LB policy picker was updated.
420+
assertThat(fakeHelper.lastPicker.toString()).isEqualTo("RlsPicker{target=service1}");
421+
isSuccess.set(true);
422+
}).get();
423+
assertThat(isSuccess.get()).isTrue();
351424
}
352425

353426
@Test
@@ -894,16 +967,24 @@ public void run() {
894967

895968
private final class FakeHelper extends Helper {
896969

970+
SubchannelPicker lastPicker;
971+
Server server;
972+
ManagedChannel oobChannel;
973+
974+
void createServerAndRegister(String target) throws IOException {
975+
server = InProcessServerBuilder.forName(target)
976+
.addService(rlsServerImpl)
977+
.directExecutor()
978+
.build()
979+
.start();
980+
grpcCleanupRule.register(server);
981+
}
982+
897983
@Override
898984
public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
899985
String target, ChannelCredentials creds) {
900986
try {
901-
grpcCleanupRule.register(
902-
InProcessServerBuilder.forName(target)
903-
.addService(rlsServerImpl)
904-
.directExecutor()
905-
.build()
906-
.start());
987+
createServerAndRegister(target);
907988
} catch (IOException e) {
908989
throw new RuntimeException("cannot create server: " + target, e);
909990
}
@@ -919,7 +1000,8 @@ protected ManagedChannelBuilder<?> delegate() {
9191000

9201001
@Override
9211002
public ManagedChannel build() {
922-
return grpcCleanupRule.register(super.build());
1003+
oobChannel = super.build();
1004+
return grpcCleanupRule.register(oobChannel);
9231005
}
9241006

9251007
@Override
@@ -948,7 +1030,7 @@ public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String author
9481030
@Override
9491031
public void updateBalancingState(
9501032
@Nonnull ConnectivityState newState, @Nonnull SubchannelPicker newPicker) {
951-
// no-op
1033+
lastPicker = newPicker;
9521034
}
9531035

9541036
@Override

rls/src/test/java/io/grpc/rls/LbPolicyConfigurationTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,7 @@ public ResolvedAddresses create(Object childLbConfig) {
7878
ImmutableMap.<String, Object>of("foo", "bar"),
7979
lbProvider),
8080
resolvedAddressFactory,
81-
new ChildLoadBalancerHelperProvider(helper, subchannelStateManager, picker),
82-
childLbStatusListener);
81+
new ChildLoadBalancerHelperProvider(helper, subchannelStateManager, picker));
8382

8483
@Before
8584
public void setUp() {

0 commit comments

Comments
 (0)