Skip to content

Commit 5e7b8c6

Browse files
authored
xds: implement WeightedTargetLoadBalancer
1 parent 3b8e363 commit 5e7b8c6

9 files changed

+959
-100
lines changed

xds/src/main/java/io/grpc/xds/LocalityStore.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,12 @@
4848
import io.grpc.xds.EnvoyProtoData.LbEndpoint;
4949
import io.grpc.xds.EnvoyProtoData.Locality;
5050
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
51-
import io.grpc.xds.InterLocalityPicker.WeightedChildPicker;
5251
import io.grpc.xds.OrcaOobUtil.OrcaReportingConfig;
5352
import io.grpc.xds.OrcaOobUtil.OrcaReportingHelperWrapper;
53+
import io.grpc.xds.WeightedRandomPicker.WeightedChildPicker;
5454
import io.grpc.xds.XdsLogger.XdsLogLevel;
5555
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
5656
import java.util.ArrayList;
57-
import java.util.Collections;
5857
import java.util.HashMap;
5958
import java.util.HashSet;
6059
import java.util.List;
@@ -109,7 +108,6 @@ final class LocalityStoreImpl implements LocalityStore {
109108

110109
private final XdsLogger logger;
111110
private final Helper helper;
112-
private final PickerFactory pickerFactory;
113111
private final LoadBalancerProvider loadBalancerProvider;
114112
private final ThreadSafeRandom random;
115113
private final LoadStatsStore loadStatsStore;
@@ -130,7 +128,6 @@ final class LocalityStoreImpl implements LocalityStore {
130128
this(
131129
logId,
132130
helper,
133-
pickerFactoryImpl,
134131
lbRegistry,
135132
ThreadSafeRandom.ThreadSafeRandomImpl.instance,
136133
loadStatsStore,
@@ -142,14 +139,12 @@ final class LocalityStoreImpl implements LocalityStore {
142139
LocalityStoreImpl(
143140
InternalLogId logId,
144141
Helper helper,
145-
PickerFactory pickerFactory,
146142
LoadBalancerRegistry lbRegistry,
147143
ThreadSafeRandom random,
148144
LoadStatsStore loadStatsStore,
149145
OrcaPerRequestUtil orcaPerRequestUtil,
150146
OrcaOobUtil orcaOobUtil) {
151147
this.helper = checkNotNull(helper, "helper");
152-
this.pickerFactory = checkNotNull(pickerFactory, "pickerFactory");
153148
loadBalancerProvider = checkNotNull(
154149
lbRegistry.getProvider(ROUND_ROBIN),
155150
"Unable to find '%s' LoadBalancer", ROUND_ROBIN);
@@ -160,11 +155,6 @@ final class LocalityStoreImpl implements LocalityStore {
160155
logger = XdsLogger.withLogId(checkNotNull(logId, "logId"));
161156
}
162157

163-
@VisibleForTesting // Introduced for testing only.
164-
interface PickerFactory {
165-
SubchannelPicker picker(List<WeightedChildPicker> childPickers);
166-
}
167-
168158
private final class DroppablePicker extends SubchannelPicker {
169159

170160
final List<DropOverload> dropOverloads;
@@ -206,14 +196,6 @@ public String toString() {
206196
}
207197
}
208198

209-
private static final PickerFactory pickerFactoryImpl =
210-
new PickerFactory() {
211-
@Override
212-
public SubchannelPicker picker(List<WeightedChildPicker> childPickers) {
213-
return new InterLocalityPicker(childPickers);
214-
}
215-
};
216-
217199
@Override
218200
public void reset() {
219201
for (Locality locality : localityMap.keySet()) {
@@ -335,7 +317,6 @@ private static ConnectivityState aggregateState(
335317

336318
private void updatePicker(
337319
@Nullable ConnectivityState state, List<WeightedChildPicker> childPickers) {
338-
childPickers = Collections.unmodifiableList(childPickers);
339320
SubchannelPicker picker;
340321
if (childPickers.isEmpty()) {
341322
if (state == TRANSIENT_FAILURE) {
@@ -344,7 +325,7 @@ private void updatePicker(
344325
picker = XdsSubchannelPickers.BUFFER_PICKER;
345326
}
346327
} else {
347-
picker = pickerFactory.picker(childPickers);
328+
picker = new WeightedRandomPicker(childPickers);
348329
}
349330

350331
if (!dropOverloads.isEmpty()) {

xds/src/main/java/io/grpc/xds/InterLocalityPicker.java renamed to xds/src/main/java/io/grpc/xds/WeightedRandomPicker.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,24 @@
2121

2222
import com.google.common.annotations.VisibleForTesting;
2323
import com.google.common.base.MoreObjects;
24-
import com.google.common.collect.ImmutableList;
2524
import io.grpc.LoadBalancer.PickResult;
2625
import io.grpc.LoadBalancer.PickSubchannelArgs;
2726
import io.grpc.LoadBalancer.SubchannelPicker;
27+
import java.util.Collections;
2828
import java.util.List;
29+
import java.util.Objects;
2930

30-
final class InterLocalityPicker extends SubchannelPicker {
31+
final class WeightedRandomPicker extends SubchannelPicker {
32+
33+
@VisibleForTesting
34+
final List<WeightedChildPicker> weightedChildPickers;
3135

32-
private final List<WeightedChildPicker> weightedChildPickers;
3336
private final ThreadSafeRandom random;
3437
private final int totalWeight;
3538

3639
static final class WeightedChildPicker {
37-
final int weight;
38-
final SubchannelPicker childPicker;
40+
private final int weight;
41+
private final SubchannelPicker childPicker;
3942

4043
WeightedChildPicker(int weight, SubchannelPicker childPicker) {
4144
checkArgument(weight >= 0, "weight is negative");
@@ -53,6 +56,23 @@ SubchannelPicker getPicker() {
5356
return childPicker;
5457
}
5558

59+
@Override
60+
public boolean equals(Object o) {
61+
if (this == o) {
62+
return true;
63+
}
64+
if (o == null || getClass() != o.getClass()) {
65+
return false;
66+
}
67+
WeightedChildPicker that = (WeightedChildPicker) o;
68+
return weight == that.weight && Objects.equals(childPicker, that.childPicker);
69+
}
70+
71+
@Override
72+
public int hashCode() {
73+
return Objects.hash(weight, childPicker);
74+
}
75+
5676
@Override
5777
public String toString() {
5878
return MoreObjects.toStringHelper(this)
@@ -62,16 +82,16 @@ public String toString() {
6282
}
6383
}
6484

65-
InterLocalityPicker(List<WeightedChildPicker> weightedChildPickers) {
85+
WeightedRandomPicker(List<WeightedChildPicker> weightedChildPickers) {
6686
this(weightedChildPickers, ThreadSafeRandom.ThreadSafeRandomImpl.instance);
6787
}
6888

6989
@VisibleForTesting
70-
InterLocalityPicker(List<WeightedChildPicker> weightedChildPickers, ThreadSafeRandom random) {
90+
WeightedRandomPicker(List<WeightedChildPicker> weightedChildPickers, ThreadSafeRandom random) {
7191
checkNotNull(weightedChildPickers, "weightedChildPickers in null");
7292
checkArgument(!weightedChildPickers.isEmpty(), "weightedChildPickers is empty");
7393

74-
this.weightedChildPickers = ImmutableList.copyOf(weightedChildPickers);
94+
this.weightedChildPickers = Collections.unmodifiableList(weightedChildPickers);
7595

7696
int totalWeight = 0;
7797
for (WeightedChildPicker weightedChildPicker : weightedChildPickers) {
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
* Copyright 2020 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.xds;
18+
19+
import static com.google.common.base.Preconditions.checkNotNull;
20+
import static io.grpc.ConnectivityState.CONNECTING;
21+
import static io.grpc.ConnectivityState.IDLE;
22+
import static io.grpc.ConnectivityState.READY;
23+
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
24+
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
25+
26+
import com.google.common.collect.ImmutableMap;
27+
import io.grpc.ConnectivityState;
28+
import io.grpc.InternalLogId;
29+
import io.grpc.LoadBalancer;
30+
import io.grpc.Status;
31+
import io.grpc.util.ForwardingLoadBalancerHelper;
32+
import io.grpc.util.GracefulSwitchLoadBalancer;
33+
import io.grpc.xds.WeightedRandomPicker.WeightedChildPicker;
34+
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection;
35+
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig;
36+
import io.grpc.xds.XdsLogger.XdsLogLevel;
37+
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
38+
import java.util.ArrayList;
39+
import java.util.HashMap;
40+
import java.util.List;
41+
import java.util.Map;
42+
import javax.annotation.Nullable;
43+
44+
/** Load balancer for weighted_target policy. */
45+
final class WeightedTargetLoadBalancer extends LoadBalancer {
46+
47+
private final XdsLogger logger;
48+
private final Map<String, GracefulSwitchLoadBalancer> childBalancers = new HashMap<>();
49+
private final Map<String, ChildHelper> childHelpers = new HashMap<>();
50+
private final Helper helper;
51+
52+
private Map<String, WeightedPolicySelection> targets = ImmutableMap.of();
53+
54+
WeightedTargetLoadBalancer(Helper helper) {
55+
this.helper = helper;
56+
logger = XdsLogger.withLogId(
57+
InternalLogId.allocate("weighted-target-lb", helper.getAuthority()));
58+
logger.log(XdsLogLevel.INFO, "Created");
59+
}
60+
61+
@Override
62+
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
63+
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
64+
Object lbConfig = resolvedAddresses.getLoadBalancingPolicyConfig();
65+
checkNotNull(lbConfig, "missing weighted_target lb config");
66+
67+
WeightedTargetConfig weightedTargetConfig = (WeightedTargetConfig) lbConfig;
68+
Map<String, WeightedPolicySelection> newTargets = weightedTargetConfig.targets;
69+
70+
for (String targetName : newTargets.keySet()) {
71+
WeightedPolicySelection weightedChildLbConfig = newTargets.get(targetName);
72+
if (!targets.containsKey(targetName)) {
73+
ChildHelper childHelper = new ChildHelper();
74+
GracefulSwitchLoadBalancer childBalancer = new GracefulSwitchLoadBalancer(childHelper);
75+
childBalancer.switchTo(weightedChildLbConfig.policySelection.getProvider());
76+
childHelpers.put(targetName, childHelper);
77+
childBalancers.put(targetName, childBalancer);
78+
} else if (!weightedChildLbConfig.policySelection.getProvider().equals(
79+
targets.get(targetName).policySelection.getProvider())) {
80+
childBalancers.get(targetName)
81+
.switchTo(weightedChildLbConfig.policySelection.getProvider());
82+
}
83+
}
84+
85+
targets = newTargets;
86+
87+
for (String targetName : targets.keySet()) {
88+
childBalancers.get(targetName).handleResolvedAddresses(
89+
resolvedAddresses.toBuilder()
90+
.setLoadBalancingPolicyConfig(targets.get(targetName).policySelection.getConfig())
91+
.build());
92+
}
93+
94+
// Cleanup removed targets.
95+
// TODO(zdapeng): cache removed target for 15 minutes.
96+
for (String targetName : childBalancers.keySet()) {
97+
if (!targets.containsKey(targetName)) {
98+
childBalancers.get(targetName).shutdown();
99+
}
100+
}
101+
childBalancers.keySet().retainAll(targets.keySet());
102+
childHelpers.keySet().retainAll(targets.keySet());
103+
}
104+
105+
@Override
106+
public void handleNameResolutionError(Status error) {
107+
logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
108+
if (childBalancers.isEmpty()) {
109+
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
110+
}
111+
for (LoadBalancer childBalancer : childBalancers.values()) {
112+
childBalancer.handleNameResolutionError(error);
113+
}
114+
}
115+
116+
@Override
117+
public boolean canHandleEmptyAddressListFromNameResolution() {
118+
return true;
119+
}
120+
121+
@Override
122+
public void shutdown() {
123+
logger.log(XdsLogLevel.INFO, "Shutdown");
124+
for (LoadBalancer childBalancer : childBalancers.values()) {
125+
childBalancer.shutdown();
126+
}
127+
}
128+
129+
private void updateOverallBalancingState() {
130+
List<WeightedChildPicker> childPickers = new ArrayList<>();
131+
132+
ConnectivityState overallState = null;
133+
for (String name : targets.keySet()) {
134+
ChildHelper childHelper = childHelpers.get(name);
135+
ConnectivityState childState = childHelper.currentState;
136+
overallState = aggregateState(overallState, childState);
137+
if (READY == childState) {
138+
int weight = targets.get(name).weight;
139+
childPickers.add(new WeightedChildPicker(weight, childHelper.currentPicker));
140+
}
141+
}
142+
143+
SubchannelPicker picker;
144+
if (childPickers.isEmpty()) {
145+
if (overallState == TRANSIENT_FAILURE) {
146+
picker = new ErrorPicker(Status.UNAVAILABLE); // TODO: more details in status
147+
} else {
148+
picker = XdsSubchannelPickers.BUFFER_PICKER;
149+
}
150+
} else {
151+
picker = new WeightedRandomPicker(childPickers);
152+
}
153+
154+
if (overallState != null) {
155+
helper.updateBalancingState(overallState, picker);
156+
}
157+
}
158+
159+
@Nullable
160+
private ConnectivityState aggregateState(
161+
@Nullable ConnectivityState overallState, ConnectivityState childState) {
162+
if (overallState == null) {
163+
return childState;
164+
}
165+
if (overallState == READY || childState == READY) {
166+
return READY;
167+
}
168+
if (overallState == CONNECTING || childState == CONNECTING) {
169+
return CONNECTING;
170+
}
171+
if (overallState == IDLE || childState == IDLE) {
172+
return IDLE;
173+
}
174+
return overallState;
175+
}
176+
177+
private final class ChildHelper extends ForwardingLoadBalancerHelper {
178+
ConnectivityState currentState = CONNECTING;
179+
SubchannelPicker currentPicker = BUFFER_PICKER;
180+
181+
@Override
182+
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
183+
currentState = newState;
184+
currentPicker = newPicker;
185+
updateOverallBalancingState();
186+
}
187+
188+
@Override
189+
protected Helper delegate() {
190+
return helper;
191+
}
192+
}
193+
}

0 commit comments

Comments
 (0)