Skip to content

Commit 6a64951

Browse files
authored
xds: implement XdsRoutingLoadBalancer
1 parent 8d4240f commit 6a64951

File tree

4 files changed

+568
-9
lines changed

4 files changed

+568
-9
lines changed

core/src/test/java/io/grpc/internal/TestUtils.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@
2424
import io.grpc.CallOptions;
2525
import io.grpc.ChannelLogger;
2626
import io.grpc.InternalLogId;
27+
import io.grpc.LoadBalancer.PickResult;
28+
import io.grpc.LoadBalancer.PickSubchannelArgs;
29+
import io.grpc.LoadBalancer.Subchannel;
30+
import io.grpc.LoadBalancer.SubchannelPicker;
31+
import io.grpc.LoadBalancerProvider;
2732
import io.grpc.Metadata;
2833
import io.grpc.MethodDescriptor;
2934
import java.net.SocketAddress;
@@ -36,7 +41,41 @@
3641
/**
3742
* Common utility methods for tests.
3843
*/
39-
final class TestUtils {
44+
public final class TestUtils {
45+
46+
/** Base class for a standard LoadBalancerProvider implementation. */
47+
public abstract static class StandardLoadBalancerProvider extends LoadBalancerProvider {
48+
private final String policyName;
49+
50+
public StandardLoadBalancerProvider(String policyName) {
51+
this.policyName = policyName;
52+
}
53+
54+
@Override
55+
public boolean isAvailable() {
56+
return true;
57+
}
58+
59+
@Override
60+
public int getPriority() {
61+
return 5;
62+
}
63+
64+
@Override
65+
public final String getPolicyName() {
66+
return policyName;
67+
}
68+
}
69+
70+
/** Creates a {@link SubchannelPicker} that returns the given {@link Subchannel} on every pick. */
71+
public static SubchannelPicker pickerOf(final Subchannel subchannel) {
72+
return new SubchannelPicker() {
73+
@Override
74+
public PickResult pickSubchannel(PickSubchannelArgs args) {
75+
return PickResult.withSubchannel(subchannel);
76+
}
77+
};
78+
}
4079

4180
static class MockClientTransportInfo {
4281
/**

xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancer.java

Lines changed: 188 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,205 @@
1616

1717
package io.grpc.xds;
1818

19+
import static com.google.common.base.Preconditions.checkNotNull;
20+
import static io.grpc.ConnectivityState.CONNECTING;
21+
import static io.grpc.ConnectivityState.IDLE;
22+
import static io.grpc.ConnectivityState.READY;
23+
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
24+
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
25+
26+
import com.google.common.collect.ImmutableList;
27+
import com.google.common.collect.ImmutableMap;
28+
import io.grpc.ConnectivityState;
29+
import io.grpc.InternalLogId;
1930
import io.grpc.LoadBalancer;
31+
import io.grpc.MethodDescriptor;
2032
import io.grpc.Status;
33+
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
34+
import io.grpc.util.ForwardingLoadBalancerHelper;
35+
import io.grpc.util.GracefulSwitchLoadBalancer;
36+
import io.grpc.xds.XdsLogger.XdsLogLevel;
37+
import io.grpc.xds.XdsRoutingLoadBalancerProvider.MethodName;
38+
import io.grpc.xds.XdsRoutingLoadBalancerProvider.Route;
39+
import io.grpc.xds.XdsRoutingLoadBalancerProvider.XdsRoutingConfig;
40+
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
41+
import java.util.HashMap;
42+
import java.util.LinkedHashMap;
43+
import java.util.List;
44+
import java.util.Map;
45+
import javax.annotation.Nullable;
2146

22-
// TODO(zdapeng): Implementation.
2347
/** Load balancer for xds_routing policy. */
2448
final class XdsRoutingLoadBalancer extends LoadBalancer {
2549

50+
private final XdsLogger logger;
51+
private final Helper helper;
52+
private final Map<String, GracefulSwitchLoadBalancer> routeBalancers = new HashMap<>();
53+
private final Map<String, RouteHelper> routeHelpers = new HashMap<>();
54+
55+
private Map<String, PolicySelection> actions = ImmutableMap.of();
56+
private List<Route> routes = ImmutableList.of();
57+
58+
XdsRoutingLoadBalancer(Helper helper) {
59+
this.helper = checkNotNull(helper, "helper");
60+
logger = XdsLogger.withLogId(
61+
InternalLogId.allocate("xds-routing-lb", helper.getAuthority()));
62+
logger.log(XdsLogLevel.INFO, "Created");
63+
}
64+
65+
@Override
66+
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
67+
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
68+
XdsRoutingConfig xdsRoutingConfig =
69+
(XdsRoutingConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
70+
checkNotNull(xdsRoutingConfig, "Missing xds_routing lb config");
71+
72+
Map<String, PolicySelection> newActions = xdsRoutingConfig.actions;
73+
for (String actionName : newActions.keySet()) {
74+
PolicySelection action = newActions.get(actionName);
75+
if (!actions.containsKey(actionName)) {
76+
RouteHelper routeHelper = new RouteHelper();
77+
GracefulSwitchLoadBalancer routeBalancer = new GracefulSwitchLoadBalancer(routeHelper);
78+
routeBalancer.switchTo(action.getProvider());
79+
routeHelpers.put(actionName, routeHelper);
80+
routeBalancers.put(actionName, routeBalancer);
81+
} else if (!action.getProvider().equals(actions.get(actionName).getProvider())) {
82+
routeBalancers.get(actionName).switchTo(action.getProvider());
83+
}
84+
}
85+
86+
this.routes = xdsRoutingConfig.routes;
87+
this.actions = newActions;
88+
89+
for (String actionName : actions.keySet()) {
90+
routeBalancers.get(actionName).handleResolvedAddresses(
91+
resolvedAddresses.toBuilder()
92+
.setLoadBalancingPolicyConfig(actions.get(actionName).getConfig())
93+
.build());
94+
}
95+
96+
// Cleanup removed actions.
97+
// TODO(zdapeng): cache removed actions for 15 minutes.
98+
for (String actionName : routeBalancers.keySet()) {
99+
if (!actions.containsKey(actionName)) {
100+
routeBalancers.get(actionName).shutdown();
101+
}
102+
}
103+
routeBalancers.keySet().retainAll(actions.keySet());
104+
routeHelpers.keySet().retainAll(actions.keySet());
105+
}
106+
26107
@Override
27108
public void handleNameResolutionError(Status error) {
109+
logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
110+
if (routeBalancers.isEmpty()) {
111+
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
112+
}
113+
for (LoadBalancer routeBalancer : routeBalancers.values()) {
114+
routeBalancer.handleNameResolutionError(error);
115+
}
28116
}
29117

30118
@Override
31119
public void shutdown() {
120+
logger.log(XdsLogLevel.INFO, "Shutdown");
121+
for (LoadBalancer routeBalancer : routeBalancers.values()) {
122+
routeBalancer.shutdown();
123+
}
124+
}
125+
126+
@Override
127+
public boolean canHandleEmptyAddressListFromNameResolution() {
128+
return true;
129+
}
130+
131+
private void updateOverallBalancingState() {
132+
ConnectivityState overallState = null;
133+
// Use LinkedHashMap to preserve the order of routes.
134+
Map<MethodName, SubchannelPicker> routePickers = new LinkedHashMap<>();
135+
for (Route route : routes) {
136+
RouteHelper routeHelper = routeHelpers.get(route.actionName);
137+
routePickers.put(route.methodName, routeHelper.currentPicker);
138+
ConnectivityState routeState = routeHelper.currentState;
139+
overallState = aggregateState(overallState, routeState);
140+
}
141+
if (overallState != null) {
142+
SubchannelPicker picker = new PathMatchingSubchannelPicker(routePickers);
143+
helper.updateBalancingState(overallState, picker);
144+
}
145+
}
146+
147+
@Nullable
148+
private static ConnectivityState aggregateState(
149+
@Nullable ConnectivityState overallState, ConnectivityState childState) {
150+
if (overallState == null) {
151+
return childState;
152+
}
153+
if (overallState == READY || childState == READY) {
154+
return READY;
155+
}
156+
if (overallState == CONNECTING || childState == CONNECTING) {
157+
return CONNECTING;
158+
}
159+
if (overallState == IDLE || childState == IDLE) {
160+
return IDLE;
161+
}
162+
return overallState;
163+
}
164+
165+
/**
166+
* The lb helper for a single route balancer.
167+
*/
168+
private final class RouteHelper extends ForwardingLoadBalancerHelper {
169+
ConnectivityState currentState = CONNECTING;
170+
SubchannelPicker currentPicker = BUFFER_PICKER;
171+
172+
@Override
173+
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
174+
currentState = newState;
175+
currentPicker = newPicker;
176+
updateOverallBalancingState();
177+
}
178+
179+
@Override
180+
protected Helper delegate() {
181+
return helper;
182+
}
183+
}
184+
185+
private static final class PathMatchingSubchannelPicker extends SubchannelPicker {
186+
187+
final Map<MethodName, SubchannelPicker> routePickers;
188+
189+
/**
190+
* Constructs a picker that will match the path of PickSubchannelArgs with the given map.
191+
* The order of the map entries matters. First match will be picked even if second match is an
192+
* exact (service + method) path match.
193+
*/
194+
PathMatchingSubchannelPicker(Map<MethodName, SubchannelPicker> routePickers) {
195+
this.routePickers = routePickers;
196+
}
197+
198+
@Override
199+
public PickResult pickSubchannel(PickSubchannelArgs args) {
200+
for (MethodName methodName : routePickers.keySet()) {
201+
if (match(args.getMethodDescriptor(), methodName)) {
202+
return routePickers.get(methodName).pickSubchannel(args);
203+
}
204+
}
205+
// At least the default route should match, otherwise there is a bug.
206+
throw new IllegalStateException("PathMatchingSubchannelPicker: error in matching path");
207+
}
208+
209+
boolean match(MethodDescriptor<?, ?> methodDescriptor, MethodName methodName) {
210+
if (methodName.service.isEmpty() && methodName.method.isEmpty()) {
211+
return true;
212+
}
213+
if (methodName.method.isEmpty()) {
214+
return methodName.service.equals(methodDescriptor.getServiceName());
215+
}
216+
return (methodName.service + '/' + methodName.method)
217+
.equals(methodDescriptor.getFullMethodName());
218+
}
32219
}
33220
}

xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancerProvider.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,6 @@ public XdsRoutingLoadBalancerProvider() {
6464
this.lbRegistry = lbRegistry;
6565
}
6666

67-
private LoadBalancerRegistry loadBalancerRegistry() {
68-
return lbRegistry == null ? LoadBalancerRegistry.getDefaultRegistry() : lbRegistry;
69-
}
70-
7167
@Override
7268
public boolean isAvailable() {
7369
return true;
@@ -85,8 +81,7 @@ public String getPolicyName() {
8581

8682
@Override
8783
public LoadBalancer newLoadBalancer(Helper helper) {
88-
// TODO(zdapeng): pass helper and loadBalancerRegistry() to constructor args.
89-
return new XdsRoutingLoadBalancer();
84+
return new XdsRoutingLoadBalancer(helper);
9085
}
9186

9287
@Override
@@ -112,8 +107,10 @@ public ConfigOrError parseLoadBalancingPolicyConfig(Map<String, ?> rawConfig) {
112107
+ rawConfig));
113108
}
114109

110+
LoadBalancerRegistry lbRegistry =
111+
this.lbRegistry == null ? LoadBalancerRegistry.getDefaultRegistry() : this.lbRegistry;
115112
ConfigOrError selectedConfigOrError =
116-
ServiceConfigUtil.selectLbPolicyFromList(childConfigCandidates, loadBalancerRegistry());
113+
ServiceConfigUtil.selectLbPolicyFromList(childConfigCandidates, lbRegistry);
117114
if (selectedConfigOrError.getError() != null) {
118115
return selectedConfigOrError;
119116
}

0 commit comments

Comments
 (0)