diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 21ee914ff8f..513f4d643ea 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -214,12 +214,32 @@ protected void updateOverallBalancingState() { overallState = TRANSIENT_FAILURE; } + // gRFC A61: if the aggregated connectivity state is TRANSIENT_FAILURE or CONNECTING and + // there are no endpoints in CONNECTING state, the ring_hash policy will choose one of + // the endpoints in IDLE state (if any) to trigger a connection attempt on + if (numReady == 0 && numTF > 0 && numConnecting == 0 && numIdle > 0) { + triggerIdleChildConnection(); + } + RingHashPicker picker = new RingHashPicker(syncContext, ring, getChildLbStates(), requestHashHeaderKey, random); getHelper().updateBalancingState(overallState, picker); this.currentConnectivityState = overallState; } + + /** + * Triggers a connection attempt for the first IDLE child load balancer. + */ + private void triggerIdleChildConnection() { + for (ChildLbState child : getChildLbStates()) { + if (child.getCurrentState() == ConnectivityState.IDLE) { + child.getLb().requestConnection(); + return; + } + } + } + @Override protected ChildLbState createChildLbState(Object key) { return new ChildLbState(key, lazyLbFactory); diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index d65cf96c00d..b515ed81158 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -255,7 +255,7 @@ public void aggregateSubchannelStates_connectingReadyIdleFailure() { inOrder.verify(helper).refreshNameResolution(); inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any()); } - verifyConnection(0); + verifyConnection(1); } private void verifyConnection(int times) { @@ -537,7 +537,7 @@ public void pickWithRandomHash_firstSubchannelInTransientFailure_remainingSubcha // Bring one subchannel to TRANSIENT_FAILURE. deliverSubchannelUnreachable(getSubChannel(servers.get(0))); verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - verifyConnection(0); + verifyConnection(1); // Pick subchannel with random hash does trigger connection by walking the ring // and choosing the first (at most one) IDLE subchannel along the way. @@ -583,7 +583,7 @@ public void skipFailingHosts_pickNextNonFailingHost() { getSubChannel(servers.get(0)), ConnectivityStateInfo.forTransientFailure( Status.UNAVAILABLE.withDescription("unreachable"))); - verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(helper, atLeastOnce()).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); @@ -649,7 +649,7 @@ public void skipFailingHosts_firstTwoHostsFailed_pickNextFirstReady() { ConnectivityStateInfo.forTransientFailure( Status.PERMISSION_DENIED.withDescription("permission denied"))); verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - verifyConnection(0); + verifyConnection(2); PickResult result = pickerCaptor.getValue().pickSubchannel(args); // activate last subchannel assertThat(result.getStatus().isOk()).isTrue(); int expectedCount = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 0 : 1; @@ -721,7 +721,7 @@ public void allSubchannelsInTransientFailure() { } verify(helper, atLeastOnce()) .updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - verifyConnection(0); + verifyConnection(2); // Picking subchannel triggers connection. RPC hash hits server0. PickSubchannelArgs args = getDefaultPickSubchannelArgsForServer(0); @@ -740,12 +740,13 @@ public void firstSubchannelIdle() { List servers = createWeightedServerAddrs(1, 1, 1); initializeLbSubchannels(config, servers); - // Go to TF does nothing, though PF will try to reconnect after backoff + // As per gRFC A61, entering TF triggers a proactive connection attempt + // on an IDLE subchannel because no other subchannel is currently CONNECTING. deliverSubchannelState(getSubchannel(servers, 1), ConnectivityStateInfo.forTransientFailure( Status.UNAVAILABLE.withDescription("unreachable"))); verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - verifyConnection(0); + verifyConnection(1); // Picking subchannel triggers connection. RPC hash hits server0. PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid()); @@ -796,7 +797,7 @@ public void firstSubchannelFailure() { ConnectivityStateInfo.forTransientFailure( Status.UNAVAILABLE.withDescription("unreachable"))); verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - verifyConnection(0); + verifyConnection(1); // Per GRFC A61 Picking subchannel should no longer request connections that were failing PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid()); @@ -824,7 +825,7 @@ public void secondSubchannelConnecting() { Subchannel firstSubchannel = getSubchannel(servers, 0); deliverSubchannelUnreachable(firstSubchannel); - verifyConnection(0); + verifyConnection(1); deliverSubchannelState(getSubchannel(servers, 2), CSI_CONNECTING); verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); @@ -833,7 +834,7 @@ public void secondSubchannelConnecting() { // Picking subchannel when idle triggers connection. deliverSubchannelState(getSubchannel(servers, 2), ConnectivityStateInfo.forNonError(IDLE)); - verifyConnection(0); + verifyConnection(1); PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid()); PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); @@ -857,7 +858,7 @@ public void secondSubchannelFailure() { deliverSubchannelUnreachable(firstSubchannel); deliverSubchannelUnreachable(getSubchannel(servers, 2)); verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - verifyConnection(0); + verifyConnection(2); // Picking subchannel triggers connection. PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid()); @@ -887,7 +888,7 @@ public void thirdSubchannelConnecting() { deliverSubchannelState(getSubchannel(servers, 1), CSI_CONNECTING); verify(helper, atLeastOnce()) .updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - verifyConnection(0); + verifyConnection(2); // Picking subchannel should not trigger connection per gRFC A61. PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid()); @@ -909,7 +910,7 @@ public void stickyTransientFailure() { deliverSubchannelUnreachable(firstSubchannel); verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - verifyConnection(0); + verifyConnection(1); reset(helper); deliverSubchannelState(firstSubchannel, ConnectivityStateInfo.forNonError(IDLE)); @@ -1127,6 +1128,39 @@ public void config_equalsTester() { .testEquals(); } + @Test + public void tfWithoutConnectingChild_triggersIdleChildConnection() { + RingHashConfig config = new RingHashConfig(10, 100, ""); + List servers = createWeightedServerAddrs(1, 1); + + initializeLbSubchannels(config, servers); + + Subchannel tfSubchannel = getSubchannel(servers, 0); + Subchannel idleSubchannel = getSubchannel(servers, 1); + + deliverSubchannelUnreachable(tfSubchannel); + + Subchannel requested = connectionRequestedQueue.poll(); + assertThat(requested).isSameInstanceAs(idleSubchannel); + assertThat(connectionRequestedQueue.poll()).isNull(); + } + + @Test + public void tfWithReadyChild_doesNotTriggerIdleChildConnection() { + RingHashConfig config = new RingHashConfig(10, 100, ""); + List servers = createWeightedServerAddrs(1, 1, 1); + + initializeLbSubchannels(config, servers); + + Subchannel tfSubchannel = getSubchannel(servers, 0); + Subchannel readySubchannel = getSubchannel(servers, 1); + + deliverSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelUnreachable(tfSubchannel); + + assertThat(connectionRequestedQueue.poll()).isNull(); + } + private List initializeLbSubchannels(RingHashConfig config, List servers, InitializationFlags... initFlags) {