Skip to content

Commit 42e1829

Browse files
xds: Do RLS fallback policy eager start (#12211)
The resource subscription to the fallback target was made only at the time of falling back, which could cause RPCs to fail. This change subscribes to and caches the fallback target earlier, similar to the C++ and Go gRPC implementations.
1 parent c4256ad commit 42e1829

File tree

3 files changed

+49
-27
lines changed

3 files changed

+49
-27
lines changed

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ final class CachingRlsLbClient {
132132
@GuardedBy("lock")
133133
private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
134134
private final ChannelLogger logger;
135+
private final ChildPolicyWrapper fallbackChildPolicyWrapper;
135136

136137
static {
137138
MetricInstrumentRegistry metricInstrumentRegistry
@@ -226,6 +227,13 @@ private CachingRlsLbClient(Builder builder) {
226227
lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
227228
childLbHelperProvider,
228229
new BackoffRefreshListener());
230+
// TODO(creamsoup) wait until lb is ready
231+
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
232+
if (defaultTarget != null && !defaultTarget.isEmpty()) {
233+
fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
234+
} else {
235+
fallbackChildPolicyWrapper = null;
236+
}
229237

230238
gaugeRegistration = helper.getMetricRecorder()
231239
.registerBatchCallback(new BatchCallback() {
@@ -1022,12 +1030,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
10221030
}
10231031
}
10241032

1025-
private ChildPolicyWrapper fallbackChildPolicyWrapper;
1026-
10271033
/** Uses Subchannel connected to default target. */
10281034
private PickResult useFallback(PickSubchannelArgs args) {
1029-
// TODO(creamsoup) wait until lb is ready
1030-
startFallbackChildPolicy();
10311035
SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
10321036
if (picker == null) {
10331037
return PickResult.withNoResult();
@@ -1052,17 +1056,6 @@ private String determineMetricsPickResult(PickResult pickResult) {
10521056
}
10531057
}
10541058

1055-
private void startFallbackChildPolicy() {
1056-
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1057-
synchronized (lock) {
1058-
if (fallbackChildPolicyWrapper != null) {
1059-
return;
1060-
}
1061-
logger.log(ChannelLogLevel.DEBUG, "starting fallback to {0}", defaultTarget);
1062-
fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
1063-
}
1064-
}
1065-
10661059
// GuardedBy CachingRlsLbClient.lock
10671060
void close() {
10681061
synchronized (lock) { // Lock is already held, but ErrorProne can't tell

rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,9 @@ public void setUpMockMetricRecorder() {
191191

192192
@After
193193
public void tearDown() throws Exception {
194-
rlsLbClient.close();
194+
if (rlsLbClient != null) {
195+
rlsLbClient.close();
196+
}
195197
assertWithMessage(
196198
"On client shut down, RlsLoadBalancer must shut down with all its child loadbalancers.")
197199
.that(lbProvider.loadBalancers).isEmpty();
@@ -372,12 +374,14 @@ public void get_updatesLbState() throws Exception {
372374
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class);
373375
ArgumentCaptor<ConnectivityState> stateCaptor =
374376
ArgumentCaptor.forClass(ConnectivityState.class);
375-
inOrder.verify(helper, times(2))
377+
inOrder.verify(helper, times(3))
376378
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
377379

378380
assertThat(new HashSet<>(pickerCaptor.getAllValues())).hasSize(1);
381+
// TRANSIENT_FAILURE is because the test setup pretends fallback is not available.
379382
assertThat(stateCaptor.getAllValues())
380-
.containsExactly(ConnectivityState.CONNECTING, ConnectivityState.READY);
383+
.containsExactly(ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.CONNECTING,
384+
ConnectivityState.READY);
381385
Metadata headers = new Metadata();
382386
PickResult pickResult = getPickResultForCreate(pickerCaptor, headers);
383387
assertThat(pickResult.getStatus().isOk()).isTrue();
@@ -439,7 +443,7 @@ public void timeout_not_changing_picked_subchannel() throws Exception {
439443
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class);
440444
ArgumentCaptor<ConnectivityState> stateCaptor =
441445
ArgumentCaptor.forClass(ConnectivityState.class);
442-
verify(helper, times(4)).updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
446+
verify(helper, times(5)).updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
443447

444448
Metadata headers = new Metadata();
445449
PickResult pickResult = getPickResultForCreate(pickerCaptor, headers);
@@ -509,7 +513,7 @@ public void get_withAdaptiveThrottler() throws Exception {
509513
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class);
510514
ArgumentCaptor<ConnectivityState> stateCaptor =
511515
ArgumentCaptor.forClass(ConnectivityState.class);
512-
inOrder.verify(helper, times(2))
516+
inOrder.verify(helper, times(3))
513517
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
514518

515519
Metadata headers = new Metadata();
@@ -699,6 +703,7 @@ public void metricGauges() throws ExecutionException, InterruptedException, Time
699703

700704
// Shutdown
701705
rlsLbClient.close();
706+
rlsLbClient = null;
702707
verify(mockGaugeRegistration).close();
703708
}
704709

rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,13 @@ public void tearDown() {
201201

202202
@Test
203203
public void lb_serverStatusCodeConversion() throws Exception {
204-
deliverResolvedAddresses();
204+
helper.getSynchronizationContext().execute(() -> {
205+
try {
206+
deliverResolvedAddresses();
207+
} catch (Exception e) {
208+
throw new RuntimeException(e);
209+
}
210+
});
205211
InOrder inOrder = inOrder(helper);
206212
inOrder.verify(helper)
207213
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
@@ -236,7 +242,13 @@ public void lb_serverStatusCodeConversion() throws Exception {
236242

237243
@Test
238244
public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
239-
deliverResolvedAddresses();
245+
helper.getSynchronizationContext().execute(() -> {
246+
try {
247+
deliverResolvedAddresses();
248+
} catch (Exception e) {
249+
throw new RuntimeException(e);
250+
}
251+
});
240252
InOrder inOrder = inOrder(helper);
241253
inOrder.verify(helper)
242254
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
@@ -257,7 +269,7 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
257269
inOrder.verifyNoMoreInteractions();
258270

259271
assertThat(res.getStatus().isOk()).isTrue();
260-
assertThat(subchannels).hasSize(1);
272+
assertThat(subchannels).hasSize(2); // includes fallback sub-channel
261273
FakeSubchannel searchSubchannel = subchannels.getLast();
262274
assertThat(subchannelIsReady(searchSubchannel)).isFalse();
263275

@@ -277,7 +289,7 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
277289
// other rls picker itself is ready due to first channel.
278290
assertThat(res.getStatus().isOk()).isTrue();
279291
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
280-
assertThat(subchannels).hasSize(2);
292+
assertThat(subchannels).hasSize(3); // includes fallback sub-channel
281293
FakeSubchannel rescueSubchannel = subchannels.getLast();
282294

283295
// search subchannel is down, rescue subchannel is connecting
@@ -393,7 +405,13 @@ public void lb_working_withoutDefaultTarget_noRlsResponse() throws Exception {
393405
public void lb_working_withDefaultTarget_noRlsResponse() throws Exception {
394406
fakeThrottler.nextResult = true;
395407

396-
deliverResolvedAddresses();
408+
helper.getSynchronizationContext().execute(() -> {
409+
try {
410+
deliverResolvedAddresses();
411+
} catch (Exception e) {
412+
throw new RuntimeException(e);
413+
}
414+
});
397415
InOrder inOrder = inOrder(helper);
398416
inOrder.verify(helper)
399417
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
@@ -535,7 +553,13 @@ public void lb_working_withoutDefaultTarget() throws Exception {
535553

536554
@Test
537555
public void lb_nameResolutionFailed() throws Exception {
538-
deliverResolvedAddresses();
556+
helper.getSynchronizationContext().execute(() -> {
557+
try {
558+
deliverResolvedAddresses();
559+
} catch (Exception e) {
560+
throw new RuntimeException(e);
561+
}
562+
});
539563
InOrder inOrder = inOrder(helper);
540564
inOrder.verify(helper)
541565
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
@@ -545,7 +569,7 @@ public void lb_nameResolutionFailed() throws Exception {
545569
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
546570

547571
inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class));
548-
assertThat(subchannels).hasSize(1);
572+
assertThat(subchannels).hasSize(2); // includes fallback sub-channel
549573

550574
FakeSubchannel searchSubchannel = subchannels.getLast();
551575
searchSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));

0 commit comments

Comments
 (0)