2323import static io .grpc .ConnectivityState .READY ;
2424import static io .grpc .ConnectivityState .SHUTDOWN ;
2525import static io .grpc .ConnectivityState .TRANSIENT_FAILURE ;
26- import static io .grpc .util .MultiChildLoadBalancer .IS_PETIOLE_POLICY ;
2726import static io .grpc .xds .RingHashLoadBalancerTest .InitializationFlags .DO_NOT_RESET_HELPER ;
2827import static io .grpc .xds .RingHashLoadBalancerTest .InitializationFlags .DO_NOT_VERIFY ;
2928import static io .grpc .xds .RingHashLoadBalancerTest .InitializationFlags .RESET_SUBCHANNEL_MOCKS ;
4847import io .grpc .ConnectivityState ;
4948import io .grpc .ConnectivityStateInfo ;
5049import io .grpc .EquivalentAddressGroup ;
50+ import io .grpc .LoadBalancer ;
5151import io .grpc .LoadBalancer .CreateSubchannelArgs ;
5252import io .grpc .LoadBalancer .Helper ;
5353import io .grpc .LoadBalancer .PickDetailsConsumer ;
6262import io .grpc .SynchronizationContext ;
6363import io .grpc .internal .FakeClock ;
6464import io .grpc .internal .PickFirstLoadBalancerProvider ;
65+ import io .grpc .internal .PickFirstLoadBalancerProviderAccessor ;
6566import io .grpc .internal .PickSubchannelArgsImpl ;
6667import io .grpc .testing .TestMethodDescriptors ;
6768import io .grpc .util .AbstractTestHelper ;
69+ import io .grpc .util .ForwardingLoadBalancerHelper ;
6870import io .grpc .util .MultiChildLoadBalancer .ChildLbState ;
6971import io .grpc .xds .RingHashLoadBalancer .RingHashConfig ;
7072import java .lang .Thread .UncaughtExceptionHandler ;
7476import java .util .Collections ;
7577import java .util .Deque ;
7678import java .util .HashMap ;
79+ import java .util .HashSet ;
7780import java .util .List ;
7881import java .util .Map ;
82+ import java .util .Random ;
83+ import java .util .Set ;
7984import org .junit .After ;
8085import org .junit .Before ;
8186import org .junit .Rule ;
@@ -115,6 +120,7 @@ public void uncaughtException(Thread t, Throwable e) {
115120 @ Captor
116121 private ArgumentCaptor <SubchannelPicker > pickerCaptor ;
117122 private RingHashLoadBalancer loadBalancer ;
123+ private boolean defaultNewPickFirst = PickFirstLoadBalancerProvider .isEnabledNewPickFirst ();
118124
119125 @ Before
120126 public void setUp () {
@@ -126,6 +132,7 @@ public void setUp() {
126132
127133 @ After
128134 public void tearDown () {
135+ PickFirstLoadBalancerProviderAccessor .setEnableNewPickFirst (defaultNewPickFirst );
129136 loadBalancer .shutdown ();
130137 for (Subchannel subchannel : subchannels .values ()) {
131138 verify (subchannel ).shutdown ();
@@ -906,6 +913,70 @@ public void duplicateAddresses() {
906913 assertThat (description ).contains ("Address: FakeSocketAddress-server2, count: 3" );
907914 }
908915
916+ @ Test
917+ public void subchannelHealthObserved () throws Exception {
918+ // Only the new PF policy observes the new separate listener for health
919+ PickFirstLoadBalancerProviderAccessor .setEnableNewPickFirst (true );
920+ // PickFirst does most of this work. If the test fails, check IS_PETIOLE_POLICY
921+ Map <Subchannel , LoadBalancer .SubchannelStateListener > healthListeners = new HashMap <>();
922+ loadBalancer = new RingHashLoadBalancer (new ForwardingLoadBalancerHelper () {
923+ @ Override
924+ public Subchannel createSubchannel (CreateSubchannelArgs args ) {
925+ Subchannel subchannel = super .createSubchannel (args .toBuilder ()
926+ .setAttributes (args .getAttributes ().toBuilder ()
927+ .set (LoadBalancer .HAS_HEALTH_PRODUCER_LISTENER_KEY , true )
928+ .build ())
929+ .build ());
930+ healthListeners .put (
931+ subchannel , args .getOption (LoadBalancer .HEALTH_CONSUMER_LISTENER_ARG_KEY ));
932+ return subchannel ;
933+ }
934+
935+ @ Override
936+ protected Helper delegate () {
937+ return helper ;
938+ }
939+ });
940+
941+ InOrder inOrder = Mockito .inOrder (helper );
942+ List <EquivalentAddressGroup > servers = createWeightedServerAddrs (1 , 1 );
943+ initializeLbSubchannels (new RingHashConfig (10 , 100 ), servers );
944+ Subchannel subchannel0 = subchannels .get (Collections .singletonList (servers .get (0 )));
945+ Subchannel subchannel1 = subchannels .get (Collections .singletonList (servers .get (1 )));
946+
947+ // Subchannels go READY, but the LB waits for health
948+ for (Subchannel subchannel : subchannels .values ()) {
949+ deliverSubchannelState (subchannel , ConnectivityStateInfo .forNonError (READY ));
950+ }
951+ inOrder .verify (helper , times (0 )).updateBalancingState (eq (READY ), any (SubchannelPicker .class ));
952+
953+ // Health results lets subchannels go READY
954+ healthListeners .get (subchannel0 ).onSubchannelState (ConnectivityStateInfo .forNonError (READY ));
955+ healthListeners .get (subchannel1 ).onSubchannelState (ConnectivityStateInfo .forNonError (READY ));
956+ inOrder .verify (helper , times (2 )).updateBalancingState (eq (READY ), pickerCaptor .capture ());
957+ SubchannelPicker picker = pickerCaptor .getValue ();
958+ Random random = new Random (1 );
959+ Set <Subchannel > picks = new HashSet <>();
960+ for (int i = 0 ; i < 10 ; i ++) {
961+ picks .add (
962+ picker .pickSubchannel (getDefaultPickSubchannelArgs (random .nextLong ())).getSubchannel ());
963+ }
964+ assertThat (picks ).containsExactly (subchannel0 , subchannel1 );
965+
966+ // Unhealthy subchannel skipped
967+ healthListeners .get (subchannel0 ).onSubchannelState (
968+ ConnectivityStateInfo .forTransientFailure (Status .UNAVAILABLE .withDescription ("oh no" )));
969+ inOrder .verify (helper ).updateBalancingState (eq (READY ), pickerCaptor .capture ());
970+ picker = pickerCaptor .getValue ();
971+ random .setSeed (1 );
972+ picks .clear ();
973+ for (int i = 0 ; i < 10 ; i ++) {
974+ picks .add (
975+ picker .pickSubchannel (getDefaultPickSubchannelArgs (random .nextLong ())).getSubchannel ());
976+ }
977+ assertThat (picks ).containsExactly (subchannel1 );
978+ }
979+
909980 private List <Subchannel > initializeLbSubchannels (RingHashConfig config ,
910981 List <EquivalentAddressGroup > servers , InitializationFlags ... initFlags ) {
911982
@@ -950,8 +1021,6 @@ private List<Subchannel> initializeLbSubchannels(RingHashConfig config,
9501021 for (ChildLbState childLbState : loadBalancer .getChildLbStates ()) {
9511022 childLbState .getCurrentPicker ()
9521023 .pickSubchannel (getDefaultPickSubchannelArgs (hashFunc .hashVoid ()));
953- assertThat (childLbState .getResolvedAddresses ().getAttributes ().get (IS_PETIOLE_POLICY ))
954- .isTrue ();
9551024 }
9561025
9571026 if (doVerifies ) {
0 commit comments