Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,32 @@ protected void updateOverallBalancingState() {
overallState = TRANSIENT_FAILURE;
}

// gRFC A61: if the aggregated connectivity state is TRANSIENT_FAILURE or CONNECTING and
// there are no endpoints in CONNECTING state, the ring_hash policy will choose one of
// the endpoints in IDLE state (if any) to trigger a connection attempt on
if (numReady == 0 && numTF > 0 && numConnecting == 0 && numIdle > 0) {
triggerIdleChildConnection();
}

RingHashPicker picker =
new RingHashPicker(syncContext, ring, getChildLbStates(), requestHashHeaderKey, random);
getHelper().updateBalancingState(overallState, picker);
this.currentConnectivityState = overallState;
}


/**
 * Requests a connection on at most one child load balancer: the first entry returned by
 * {@code getChildLbStates()} whose current state is {@code IDLE}. Children in any other state
 * are skipped, and nothing happens when no child is {@code IDLE}.
 */
private void triggerIdleChildConnection() {
  for (ChildLbState candidate : getChildLbStates()) {
    if (candidate.getCurrentState() != ConnectivityState.IDLE) {
      continue;
    }
    // Only a single IDLE child is kicked per invocation, so stop after the first match.
    candidate.getLb().requestConnection();
    break;
  }
}

@Override
protected ChildLbState createChildLbState(Object key) {
return new ChildLbState(key, lazyLbFactory);
Expand Down
60 changes: 47 additions & 13 deletions xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void aggregateSubchannelStates_connectingReadyIdleFailure() {
inOrder.verify(helper).refreshNameResolution();
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any());
}
verifyConnection(0);
verifyConnection(1);
}

private void verifyConnection(int times) {
Expand Down Expand Up @@ -537,7 +537,7 @@ public void pickWithRandomHash_firstSubchannelInTransientFailure_remainingSubcha
// Bring one subchannel to TRANSIENT_FAILURE.
deliverSubchannelUnreachable(getSubChannel(servers.get(0)));
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verifyConnection(0);
verifyConnection(1);

// Picking a subchannel with a random hash does trigger a connection by walking the ring
// and choosing the first (at most one) IDLE subchannel along the way.
Expand Down Expand Up @@ -583,7 +583,7 @@ public void skipFailingHosts_pickNextNonFailingHost() {
getSubChannel(servers.get(0)),
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("unreachable")));
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(helper, atLeastOnce()).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());

PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isTrue();
Expand Down Expand Up @@ -649,7 +649,7 @@ public void skipFailingHosts_firstTwoHostsFailed_pickNextFirstReady() {
ConnectivityStateInfo.forTransientFailure(
Status.PERMISSION_DENIED.withDescription("permission denied")));
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
verifyConnection(0);
verifyConnection(2);
PickResult result = pickerCaptor.getValue().pickSubchannel(args); // activate last subchannel
assertThat(result.getStatus().isOk()).isTrue();
int expectedCount = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 0 : 1;
Expand Down Expand Up @@ -721,7 +721,7 @@ public void allSubchannelsInTransientFailure() {
}
verify(helper, atLeastOnce())
.updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
verifyConnection(0);
verifyConnection(2);

// Picking subchannel triggers connection. RPC hash hits server0.
PickSubchannelArgs args = getDefaultPickSubchannelArgsForServer(0);
Expand All @@ -740,12 +740,13 @@ public void firstSubchannelIdle() {
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
initializeLbSubchannels(config, servers);

// Go to TF does nothing, though PF will try to reconnect after backoff
// As per gRFC A61, entering TF triggers a proactive connection attempt
// on an IDLE subchannel because no other subchannel is currently CONNECTING.
deliverSubchannelState(getSubchannel(servers, 1),
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("unreachable")));
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verifyConnection(0);
verifyConnection(1);

// Picking subchannel triggers connection. RPC hash hits server0.
PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid());
Expand Down Expand Up @@ -796,7 +797,7 @@ public void firstSubchannelFailure() {
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("unreachable")));
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verifyConnection(0);
verifyConnection(1);

// Per gRFC A61, picking a subchannel should no longer request connections that were failing
PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid());
Expand Down Expand Up @@ -824,7 +825,7 @@ public void secondSubchannelConnecting() {

Subchannel firstSubchannel = getSubchannel(servers, 0);
deliverSubchannelUnreachable(firstSubchannel);
verifyConnection(0);
verifyConnection(1);

deliverSubchannelState(getSubchannel(servers, 2), CSI_CONNECTING);
verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
Expand All @@ -833,7 +834,7 @@ public void secondSubchannelConnecting() {
// Picking subchannel when idle triggers connection.
deliverSubchannelState(getSubchannel(servers, 2),
ConnectivityStateInfo.forNonError(IDLE));
verifyConnection(0);
verifyConnection(1);
PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid());
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isTrue();
Expand All @@ -857,7 +858,7 @@ public void secondSubchannelFailure() {
deliverSubchannelUnreachable(firstSubchannel);
deliverSubchannelUnreachable(getSubchannel(servers, 2));
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
verifyConnection(0);
verifyConnection(2);

// Picking subchannel triggers connection.
PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid());
Expand Down Expand Up @@ -887,7 +888,7 @@ public void thirdSubchannelConnecting() {
deliverSubchannelState(getSubchannel(servers, 1), CSI_CONNECTING);
verify(helper, atLeastOnce())
.updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
verifyConnection(0);
verifyConnection(2);

// Picking subchannel should not trigger connection per gRFC A61.
PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid());
Expand All @@ -909,7 +910,7 @@ public void stickyTransientFailure() {
deliverSubchannelUnreachable(firstSubchannel);

verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verifyConnection(0);
verifyConnection(1);

reset(helper);
deliverSubchannelState(firstSubchannel, ConnectivityStateInfo.forNonError(IDLE));
Expand Down Expand Up @@ -1127,6 +1128,39 @@ public void config_equalsTester() {
.testEquals();
}

/**
 * Delivers an unreachable state to the subchannel of the first of two servers and checks that
 * exactly one connection request is queued, and that it targets the second server's subchannel.
 */
@Test
public void tfWithoutConnectingChild_triggersIdleChildConnection() {
  RingHashConfig ringConfig = new RingHashConfig(10, 100, "");
  List<EquivalentAddressGroup> addrs = createWeightedServerAddrs(1, 1);
  initializeLbSubchannels(ringConfig, addrs);

  Subchannel failing = getSubchannel(addrs, 0);
  Subchannel expectedTarget = getSubchannel(addrs, 1);

  deliverSubchannelUnreachable(failing);

  // The first queued request must be for the other subchannel ...
  assertThat(connectionRequestedQueue.poll()).isSameInstanceAs(expectedTarget);
  // ... and it must be the only one.
  assertThat(connectionRequestedQueue.poll()).isNull();
}

/**
 * With three servers, reports the second server's subchannel as READY and then the first
 * server's subchannel as unreachable, and checks that no connection request has been queued.
 */
@Test
public void tfWithReadyChild_doesNotTriggerIdleChildConnection() {
  RingHashConfig ringConfig = new RingHashConfig(10, 100, "");
  List<EquivalentAddressGroup> addrs = createWeightedServerAddrs(1, 1, 1);
  initializeLbSubchannels(ringConfig, addrs);

  Subchannel failing = getSubchannel(addrs, 0);
  Subchannel ready = getSubchannel(addrs, 1);

  // READY is delivered before the failure so a READY child already exists when TF arrives.
  deliverSubchannelState(ready, ConnectivityStateInfo.forNonError(READY));
  deliverSubchannelUnreachable(failing);

  Subchannel requested = connectionRequestedQueue.poll();
  assertThat(requested).isNull();
}

private List<Subchannel> initializeLbSubchannels(RingHashConfig config,
List<EquivalentAddressGroup> servers, InitializationFlags... initFlags) {

Expand Down