Skip to content

Commit 87aa6de

Browse files
authored
core: Have acceptResolvedAddresses() do a seek when in CONNECTING state and clean up removed subchannels when a seek was successful (grpc#11849)
* Have acceptResolvedAddresses() do a seek when in CONNECTING state and clean up removed subchannels when a seek was successful. Move cleanup of removed subchannels into a method so it can be called from 2 places in acceptResolvedAddresses. Since the seek could mean we never looked at the first address, if we go off the end of the index and haven't looked at all of the addresses, then instead of calling scheduleBackoff() we reset the index and request a connection.
1 parent 67351c0 commit 87aa6de

File tree

2 files changed

+48
-25
lines changed

2 files changed

+48
-25
lines changed

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -137,38 +137,27 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
137137
final ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups =
138138
ImmutableList.<EquivalentAddressGroup>builder().addAll(cleanServers).build();
139139

140-
if (rawConnectivityState == READY) {
141-
// If the previous ready subchannel exists in new address list,
140+
if (rawConnectivityState == READY || rawConnectivityState == CONNECTING) {
141+
// If the previous ready (or connecting) subchannel exists in new address list,
142142
// keep this connection and don't create new subchannels
143143
SocketAddress previousAddress = addressIndex.getCurrentAddress();
144144
addressIndex.updateGroups(newImmutableAddressGroups);
145145
if (addressIndex.seekTo(previousAddress)) {
146146
SubchannelData subchannelData = subchannels.get(previousAddress);
147147
subchannelData.getSubchannel().updateAddresses(addressIndex.getCurrentEagAsList());
148+
shutdownRemovedAddresses(newImmutableAddressGroups);
148149
return Status.OK;
149150
}
150151
// Previous ready subchannel not in the new list of addresses
151152
} else {
152153
addressIndex.updateGroups(newImmutableAddressGroups);
153154
}
154155

155-
// remove old subchannels that were not in new address list
156-
Set<SocketAddress> oldAddrs = new HashSet<>(subchannels.keySet());
157-
158-
// Flatten the new EAGs addresses
159-
Set<SocketAddress> newAddrs = new HashSet<>();
160-
for (EquivalentAddressGroup endpoint : newImmutableAddressGroups) {
161-
newAddrs.addAll(endpoint.getAddresses());
162-
}
163-
164-
// Shut them down and remove them
165-
for (SocketAddress oldAddr : oldAddrs) {
166-
if (!newAddrs.contains(oldAddr)) {
167-
subchannels.remove(oldAddr).getSubchannel().shutdown();
168-
}
169-
}
156+
// No old addresses means first time through, so we will do an explicit move to CONNECTING
157+
// which is what we implicitly started with
158+
boolean noOldAddrs = shutdownRemovedAddresses(newImmutableAddressGroups);
170159

171-
if (oldAddrs.size() == 0) {
160+
if (noOldAddrs) {
172161
// Make tests happy; they don't properly assume starting in CONNECTING
173162
rawConnectivityState = CONNECTING;
174163
updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
@@ -188,6 +177,31 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
188177
return Status.OK;
189178
}
190179

180+
/**
181+
* Compute the difference between the flattened new addresses and the old addresses that had been
182+
* made into subchannels, and then shut down the subchannels whose addresses are no longer present.
183+
* @return true if there were no old addresses
184+
*/
185+
private boolean shutdownRemovedAddresses(
186+
ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups) {
187+
188+
Set<SocketAddress> oldAddrs = new HashSet<>(subchannels.keySet());
189+
190+
// Flatten the new EAGs addresses
191+
Set<SocketAddress> newAddrs = new HashSet<>();
192+
for (EquivalentAddressGroup endpoint : newImmutableAddressGroups) {
193+
newAddrs.addAll(endpoint.getAddresses());
194+
}
195+
196+
// Shut them down and remove them
197+
for (SocketAddress oldAddr : oldAddrs) {
198+
if (!newAddrs.contains(oldAddr)) {
199+
subchannels.remove(oldAddr).getSubchannel().shutdown();
200+
}
201+
}
202+
return oldAddrs.isEmpty();
203+
}
204+
191205
private static List<EquivalentAddressGroup> deDupAddresses(List<EquivalentAddressGroup> groups) {
192206
Set<SocketAddress> seenAddresses = new HashSet<>();
193207
List<EquivalentAddressGroup> newGroups = new ArrayList<>();
@@ -290,7 +304,14 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
290304
cancelScheduleTask();
291305
requestConnection(); // is recursive so might hit the end of the addresses
292306
} else {
293-
scheduleBackoff();
307+
if (subchannels.size() >= addressIndex.size()) {
308+
scheduleBackoff();
309+
} else {
310+
// We must have done a seek to the middle of the list, so let's start over from the
311+
// beginning
312+
addressIndex.reset();
313+
requestConnection();
314+
}
294315
}
295316
}
296317

core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2133,18 +2133,20 @@ public void lastAddressFailingNotTransientFailure() {
21332133
loadBalancer.acceptResolvedAddresses(
21342134
ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build());
21352135

2136-
// Verify that no new subchannels were created or started
2136+
// Subchannel 2 should be reused since it was trying to connect and is present.
21372137
inOrder.verify(mockSubchannel1).shutdown();
2138-
inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture());
2139-
SubchannelStateListener stateListener3 = stateListenerCaptor.getValue();
2140-
inOrder.verify(mockSubchannel3).requestConnection();
2138+
inOrder.verify(mockSubchannel3, never()).start(stateListenerCaptor.capture());
21412139
assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState());
21422140

2143-
// Second address connection attempt is unsuccessful, but should not go into transient failure
2141+
// Second address connection attempt is unsuccessful; since we are at the end but don't have all
2142+
// subchannels, schedule a backoff for the first address
21442143
stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR));
2144+
fakeClock.forwardTime(1, TimeUnit.SECONDS);
2145+
inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture());
2146+
SubchannelStateListener stateListener3 = stateListenerCaptor.getValue();
21452147
assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState());
21462148

2147-
// Third address connection attempt is unsuccessful, now we enter transient failure
2149+
// Third address connection attempt is unsuccessful, now we enter TF, do name resolution
21482150
stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR));
21492151
assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState());
21502152

0 commit comments

Comments
 (0)