Skip to content

Commit 172f650

Browse files
authored
xds: Implement proactive connection in RingHashLoadBalancer
Implement proactive connection logic in RingHashLoadBalancer as outlined in gRFC A61. This address the missing logic where the balancer should initialize the first IDLE child when a child balancer reports TRANSIENT_FAILURE and no other children are connecting. This behavior, which was previously present before #10610, ensures that a backup subchannel starts connecting immediately outside of the picker flow, reducing failover latency. Fixes #12024
1 parent 4bbf8ee commit 172f650

File tree

2 files changed

+67
-13
lines changed

2 files changed

+67
-13
lines changed

xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,12 +214,32 @@ protected void updateOverallBalancingState() {
214214
overallState = TRANSIENT_FAILURE;
215215
}
216216

217+
// gRFC A61: if the aggregated connectivity state is TRANSIENT_FAILURE or CONNECTING and
218+
// there are no endpoints in CONNECTING state, the ring_hash policy will choose one of
219+
// the endpoints in IDLE state (if any) to trigger a connection attempt on
220+
if (numReady == 0 && numTF > 0 && numConnecting == 0 && numIdle > 0) {
221+
triggerIdleChildConnection();
222+
}
223+
217224
RingHashPicker picker =
218225
new RingHashPicker(syncContext, ring, getChildLbStates(), requestHashHeaderKey, random);
219226
getHelper().updateBalancingState(overallState, picker);
220227
this.currentConnectivityState = overallState;
221228
}
222229

230+
231+
/**
232+
* Triggers a connection attempt for the first IDLE child load balancer.
233+
*/
234+
private void triggerIdleChildConnection() {
235+
for (ChildLbState child : getChildLbStates()) {
236+
if (child.getCurrentState() == ConnectivityState.IDLE) {
237+
child.getLb().requestConnection();
238+
return;
239+
}
240+
}
241+
}
242+
223243
@Override
224244
protected ChildLbState createChildLbState(Object key) {
225245
return new ChildLbState(key, lazyLbFactory);

xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ public void aggregateSubchannelStates_connectingReadyIdleFailure() {
255255
inOrder.verify(helper).refreshNameResolution();
256256
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any());
257257
}
258-
verifyConnection(0);
258+
verifyConnection(1);
259259
}
260260

261261
private void verifyConnection(int times) {
@@ -537,7 +537,7 @@ public void pickWithRandomHash_firstSubchannelInTransientFailure_remainingSubcha
537537
// Bring one subchannel to TRANSIENT_FAILURE.
538538
deliverSubchannelUnreachable(getSubChannel(servers.get(0)));
539539
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
540-
verifyConnection(0);
540+
verifyConnection(1);
541541

542542
// Pick subchannel with random hash does trigger connection by walking the ring
543543
// and choosing the first (at most one) IDLE subchannel along the way.
@@ -583,7 +583,7 @@ public void skipFailingHosts_pickNextNonFailingHost() {
583583
getSubChannel(servers.get(0)),
584584
ConnectivityStateInfo.forTransientFailure(
585585
Status.UNAVAILABLE.withDescription("unreachable")));
586-
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
586+
verify(helper, atLeastOnce()).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
587587

588588
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
589589
assertThat(result.getStatus().isOk()).isTrue();
@@ -649,7 +649,7 @@ public void skipFailingHosts_firstTwoHostsFailed_pickNextFirstReady() {
649649
ConnectivityStateInfo.forTransientFailure(
650650
Status.PERMISSION_DENIED.withDescription("permission denied")));
651651
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
652-
verifyConnection(0);
652+
verifyConnection(2);
653653
PickResult result = pickerCaptor.getValue().pickSubchannel(args); // activate last subchannel
654654
assertThat(result.getStatus().isOk()).isTrue();
655655
int expectedCount = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 0 : 1;
@@ -721,7 +721,7 @@ public void allSubchannelsInTransientFailure() {
721721
}
722722
verify(helper, atLeastOnce())
723723
.updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
724-
verifyConnection(0);
724+
verifyConnection(2);
725725

726726
// Picking subchannel triggers connection. RPC hash hits server0.
727727
PickSubchannelArgs args = getDefaultPickSubchannelArgsForServer(0);
@@ -740,12 +740,13 @@ public void firstSubchannelIdle() {
740740
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
741741
initializeLbSubchannels(config, servers);
742742

743-
// Go to TF does nothing, though PF will try to reconnect after backoff
743+
// As per gRFC A61, entering TF triggers a proactive connection attempt
744+
// on an IDLE subchannel because no other subchannel is currently CONNECTING.
744745
deliverSubchannelState(getSubchannel(servers, 1),
745746
ConnectivityStateInfo.forTransientFailure(
746747
Status.UNAVAILABLE.withDescription("unreachable")));
747748
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
748-
verifyConnection(0);
749+
verifyConnection(1);
749750

750751
// Picking subchannel triggers connection. RPC hash hits server0.
751752
PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid());
@@ -796,7 +797,7 @@ public void firstSubchannelFailure() {
796797
ConnectivityStateInfo.forTransientFailure(
797798
Status.UNAVAILABLE.withDescription("unreachable")));
798799
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
799-
verifyConnection(0);
800+
verifyConnection(1);
800801

801802
// Per GRFC A61 Picking subchannel should no longer request connections that were failing
802803
PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid());
@@ -824,7 +825,7 @@ public void secondSubchannelConnecting() {
824825

825826
Subchannel firstSubchannel = getSubchannel(servers, 0);
826827
deliverSubchannelUnreachable(firstSubchannel);
827-
verifyConnection(0);
828+
verifyConnection(1);
828829

829830
deliverSubchannelState(getSubchannel(servers, 2), CSI_CONNECTING);
830831
verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
@@ -833,7 +834,7 @@ public void secondSubchannelConnecting() {
833834
// Picking subchannel when idle triggers connection.
834835
deliverSubchannelState(getSubchannel(servers, 2),
835836
ConnectivityStateInfo.forNonError(IDLE));
836-
verifyConnection(0);
837+
verifyConnection(1);
837838
PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid());
838839
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
839840
assertThat(result.getStatus().isOk()).isTrue();
@@ -857,7 +858,7 @@ public void secondSubchannelFailure() {
857858
deliverSubchannelUnreachable(firstSubchannel);
858859
deliverSubchannelUnreachable(getSubchannel(servers, 2));
859860
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
860-
verifyConnection(0);
861+
verifyConnection(2);
861862

862863
// Picking subchannel triggers connection.
863864
PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid());
@@ -887,7 +888,7 @@ public void thirdSubchannelConnecting() {
887888
deliverSubchannelState(getSubchannel(servers, 1), CSI_CONNECTING);
888889
verify(helper, atLeastOnce())
889890
.updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
890-
verifyConnection(0);
891+
verifyConnection(2);
891892

892893
// Picking subchannel should not trigger connection per gRFC A61.
893894
PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid());
@@ -909,7 +910,7 @@ public void stickyTransientFailure() {
909910
deliverSubchannelUnreachable(firstSubchannel);
910911

911912
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
912-
verifyConnection(0);
913+
verifyConnection(1);
913914

914915
reset(helper);
915916
deliverSubchannelState(firstSubchannel, ConnectivityStateInfo.forNonError(IDLE));
@@ -1127,6 +1128,39 @@ public void config_equalsTester() {
11271128
.testEquals();
11281129
}
11291130

1131+
@Test
1132+
public void tfWithoutConnectingChild_triggersIdleChildConnection() {
1133+
RingHashConfig config = new RingHashConfig(10, 100, "");
1134+
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1);
1135+
1136+
initializeLbSubchannels(config, servers);
1137+
1138+
Subchannel tfSubchannel = getSubchannel(servers, 0);
1139+
Subchannel idleSubchannel = getSubchannel(servers, 1);
1140+
1141+
deliverSubchannelUnreachable(tfSubchannel);
1142+
1143+
Subchannel requested = connectionRequestedQueue.poll();
1144+
assertThat(requested).isSameInstanceAs(idleSubchannel);
1145+
assertThat(connectionRequestedQueue.poll()).isNull();
1146+
}
1147+
1148+
@Test
1149+
public void tfWithReadyChild_doesNotTriggerIdleChildConnection() {
1150+
RingHashConfig config = new RingHashConfig(10, 100, "");
1151+
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
1152+
1153+
initializeLbSubchannels(config, servers);
1154+
1155+
Subchannel tfSubchannel = getSubchannel(servers, 0);
1156+
Subchannel readySubchannel = getSubchannel(servers, 1);
1157+
1158+
deliverSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));
1159+
deliverSubchannelUnreachable(tfSubchannel);
1160+
1161+
assertThat(connectionRequestedQueue.poll()).isNull();
1162+
}
1163+
11301164
private List<Subchannel> initializeLbSubchannels(RingHashConfig config,
11311165
List<EquivalentAddressGroup> servers, InitializationFlags... initFlags) {
11321166

0 commit comments

Comments
 (0)