@@ -137,38 +137,27 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
137137 final ImmutableList <EquivalentAddressGroup > newImmutableAddressGroups =
138138 ImmutableList .<EquivalentAddressGroup >builder ().addAll (cleanServers ).build ();
139139
140- if (rawConnectivityState == READY ) {
141- // If the previous ready subchannel exists in new address list,
140+ if (rawConnectivityState == READY || rawConnectivityState == CONNECTING ) {
141+ // If the previous ready (or connecting) subchannel exists in new address list,
142142 // keep this connection and don't create new subchannels
143143 SocketAddress previousAddress = addressIndex .getCurrentAddress ();
144144 addressIndex .updateGroups (newImmutableAddressGroups );
145145 if (addressIndex .seekTo (previousAddress )) {
146146 SubchannelData subchannelData = subchannels .get (previousAddress );
147147 subchannelData .getSubchannel ().updateAddresses (addressIndex .getCurrentEagAsList ());
148+ shutdownRemovedAddresses (newImmutableAddressGroups );
148149 return Status .OK ;
149150 }
150151 // Previous ready subchannel not in the new list of addresses
151152 } else {
152153 addressIndex .updateGroups (newImmutableAddressGroups );
153154 }
154155
155- // remove old subchannels that were not in new address list
156- Set <SocketAddress > oldAddrs = new HashSet <>(subchannels .keySet ());
157-
158- // Flatten the new EAGs addresses
159- Set <SocketAddress > newAddrs = new HashSet <>();
160- for (EquivalentAddressGroup endpoint : newImmutableAddressGroups ) {
161- newAddrs .addAll (endpoint .getAddresses ());
162- }
163-
164- // Shut them down and remove them
165- for (SocketAddress oldAddr : oldAddrs ) {
166- if (!newAddrs .contains (oldAddr )) {
167- subchannels .remove (oldAddr ).getSubchannel ().shutdown ();
168- }
169- }
156+ // No old addresses means first time through, so we will do an explicit move to CONNECTING
157+ // which is what we implicitly started with
158+ boolean noOldAddrs = shutdownRemovedAddresses (newImmutableAddressGroups );
170159
171- if (oldAddrs . size () == 0 ) {
160+ if (noOldAddrs ) {
172161 // Make tests happy; they don't properly assume starting in CONNECTING
173162 rawConnectivityState = CONNECTING ;
174163 updateBalancingState (CONNECTING , new Picker (PickResult .withNoResult ()));
@@ -188,6 +177,31 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
188177 return Status .OK ;
189178 }
190179
180+ /**
181+ * Compute the difference between the flattened new addresses and the old addresses that had been
182+ * made into subchannels and then shutdown the matching subchannels.
183+ * @return true if there were no old addresses
184+ */
185+ private boolean shutdownRemovedAddresses (
186+ ImmutableList <EquivalentAddressGroup > newImmutableAddressGroups ) {
187+
188+ Set <SocketAddress > oldAddrs = new HashSet <>(subchannels .keySet ());
189+
190+ // Flatten the new EAGs addresses
191+ Set <SocketAddress > newAddrs = new HashSet <>();
192+ for (EquivalentAddressGroup endpoint : newImmutableAddressGroups ) {
193+ newAddrs .addAll (endpoint .getAddresses ());
194+ }
195+
196+ // Shut them down and remove them
197+ for (SocketAddress oldAddr : oldAddrs ) {
198+ if (!newAddrs .contains (oldAddr )) {
199+ subchannels .remove (oldAddr ).getSubchannel ().shutdown ();
200+ }
201+ }
202+ return oldAddrs .isEmpty ();
203+ }
204+
191205 private static List <EquivalentAddressGroup > deDupAddresses (List <EquivalentAddressGroup > groups ) {
192206 Set <SocketAddress > seenAddresses = new HashSet <>();
193207 List <EquivalentAddressGroup > newGroups = new ArrayList <>();
@@ -290,7 +304,14 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
290304 cancelScheduleTask ();
291305 requestConnection (); // is recursive so might hit the end of the addresses
292306 } else {
293- scheduleBackoff ();
307+ if (subchannels .size () >= addressIndex .size ()) {
308+ scheduleBackoff ();
309+ } else {
310+ // We must have done a seek to the middle of the list lets start over from the
311+ // beginning
312+ addressIndex .reset ();
313+ requestConnection ();
314+ }
294315 }
295316 }
296317
0 commit comments