Skip to content

Commit eb6f133

Browse files
committed
grpc-js-xds: Implement custom LB policies
1 parent 089a8a8 commit eb6f133

11 files changed

+421
-69
lines changed

packages/grpc-js-xds/src/environment.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ export const EXPERIMENTAL_FAULT_INJECTION = (process.env.GRPC_XDS_EXPERIMENTAL_F
1919
export const EXPERIMENTAL_OUTLIER_DETECTION = (process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION ?? 'true') === 'true';
2020
export const EXPERIMENTAL_RETRY = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY ?? 'true') === 'true';
2121
export const EXPERIMENTAL_FEDERATION = (process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION ?? 'false') === 'true';
22+
export const EXPERIMENTAL_CUSTOM_LB_CONFIG = (process.env.GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG ?? 'false') === 'true';

packages/grpc-js-xds/src/index.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ import * as resolver_xds from './resolver-xds';
1919
import * as load_balancer_cds from './load-balancer-cds';
2020
import * as xds_cluster_resolver from './load-balancer-xds-cluster-resolver';
2121
import * as xds_cluster_impl from './load-balancer-xds-cluster-impl';
22-
import * as load_balancer_lrs from './load-balancer-lrs';
2322
import * as load_balancer_priority from './load-balancer-priority';
2423
import * as load_balancer_weighted_target from './load-balancer-weighted-target';
2524
import * as load_balancer_xds_cluster_manager from './load-balancer-xds-cluster-manager';
25+
import * as xds_wrr_locality from './load-balancer-xds-wrr-locality';
2626
import * as router_filter from './http-filter/router-filter';
2727
import * as fault_injection_filter from './http-filter/fault-injection-filter';
2828
import * as csds from './csds';
@@ -35,11 +35,11 @@ export function register() {
3535
load_balancer_cds.setup();
3636
xds_cluster_resolver.setup();
3737
xds_cluster_impl.setup();
38-
load_balancer_lrs.setup();
3938
load_balancer_priority.setup();
4039
load_balancer_weighted_target.setup();
4140
load_balancer_xds_cluster_manager.setup();
41+
xds_wrr_locality.setup();
4242
router_filter.setup();
4343
fault_injection_filter.setup();
4444
csds.setup();
45-
}
45+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2023 gRPC authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
import { LoadBalancingConfig, experimental, logVerbosity } from "@grpc/grpc-js";
19+
import { LoadBalancingPolicy__Output } from "./generated/envoy/config/cluster/v3/LoadBalancingPolicy";
20+
import { TypedExtensionConfig__Output } from "./generated/envoy/config/core/v3/TypedExtensionConfig";
21+
22+
const TRACER_NAME = 'lb_policy_registry';
23+
function trace(text: string) {
24+
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
25+
}
26+
27+
const MAX_RECURSION_DEPTH = 16;
28+
29+
interface ProtoLbPolicyConverter {
30+
(protoPolicy: TypedExtensionConfig__Output, selectChildPolicy: (childPolicy: LoadBalancingPolicy__Output) => LoadBalancingConfig): LoadBalancingConfig
31+
}
32+
33+
interface RegisteredLbPolicy {
34+
convertToLoadBalancingPolicy: ProtoLbPolicyConverter;
35+
}
36+
37+
const registry: {[typeUrl: string]: RegisteredLbPolicy} = {}
38+
39+
export function registerLbPolicy(typeUrl: string, converter: ProtoLbPolicyConverter) {
40+
registry[typeUrl] = {convertToLoadBalancingPolicy: converter};
41+
}
42+
43+
export function convertToLoadBalancingConfig(protoPolicy: LoadBalancingPolicy__Output, recursionDepth = 0): LoadBalancingConfig {
44+
if (recursionDepth > MAX_RECURSION_DEPTH) {
45+
throw new Error(`convertToLoadBalancingConfig: Max recursion depth ${MAX_RECURSION_DEPTH} reached`);
46+
}
47+
for (const policyCandidate of protoPolicy.policies) {
48+
const extensionConfig = policyCandidate.typed_extension_config;
49+
if (!extensionConfig?.typed_config) {
50+
continue;
51+
}
52+
const typeUrl = extensionConfig.typed_config.type_url;
53+
if (typeUrl in registry) {
54+
try {
55+
return registry[typeUrl].convertToLoadBalancingPolicy(extensionConfig, childPolicy => convertToLoadBalancingConfig(childPolicy, recursionDepth + 1));
56+
} catch (e) {
57+
throw new Error(`Error parsing ${typeUrl} LoadBalancingPolicy: ${(e as Error).message}`);
58+
}
59+
}
60+
}
61+
throw new Error('No registered LB policy found in list');
62+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2023 gRPC authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
import { LoadBalancingConfig } from "@grpc/grpc-js";
19+
import { LoadBalancingPolicy__Output } from "../generated/envoy/config/cluster/v3/LoadBalancingPolicy";
20+
import { TypedExtensionConfig__Output } from "../generated/envoy/config/core/v3/TypedExtensionConfig";
21+
import { registerLbPolicy } from "../lb-policy-registry";
22+
23+
const ROUND_ROBIN_TYPE_URL = 'envoy.extensions.load_balancing_policies.round_robin.v3.RoundRobin';
24+
25+
function convertToLoadBalancingPolicy(protoPolicy: TypedExtensionConfig__Output, selectChildPolicy: (childPolicy: LoadBalancingPolicy__Output) => LoadBalancingConfig): LoadBalancingConfig {
26+
if (protoPolicy.typed_config?.type_url !== ROUND_ROBIN_TYPE_URL) {
27+
throw new Error(`Round robin LB policy parsing error: unexpected type URL ${protoPolicy.typed_config?.type_url}`);
28+
}
29+
return {
30+
round_robin: {}
31+
};
32+
}
33+
34+
export function setup() {
35+
registerLbPolicy(ROUND_ROBIN_TYPE_URL, convertToLoadBalancingPolicy);
36+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright 2023 gRPC authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
import { LoadBalancingConfig } from "@grpc/grpc-js";
19+
import { LoadBalancingPolicy__Output } from "../generated/envoy/config/cluster/v3/LoadBalancingPolicy";
20+
import { TypedExtensionConfig__Output } from "../generated/envoy/config/core/v3/TypedExtensionConfig";
21+
import { registerLbPolicy } from "../lb-policy-registry";
22+
import { loadProtosWithOptionsSync } from "@grpc/proto-loader/build/src/util";
23+
import { Any__Output } from "../generated/google/protobuf/Any";
24+
import { Struct__Output } from "../generated/google/protobuf/Struct";
25+
import { Value__Output } from "../generated/google/protobuf/Value";
26+
import { TypedStruct__Output } from "../generated/xds/type/v3/TypedStruct";
27+
28+
const XDS_TYPED_STRUCT_TYPE_URL = 'xds.type.v3.TypedStruct';
29+
const UDPA_TYPED_STRUCT_TYPE_URL = 'udpa.type.v1.TypedStruct';
30+
31+
const resourceRoot = loadProtosWithOptionsSync([
32+
'xds/type/v3/typed_struct.proto',
33+
'udpa/type/v1/typed_struct.proto'], {
34+
keepCase: true,
35+
includeDirs: [
36+
// Paths are relative to src/build
37+
__dirname + '/../../deps/xds/',
38+
],
39+
}
40+
);
41+
42+
const toObjectOptions = {
43+
longs: String,
44+
enums: String,
45+
defaults: true,
46+
oneofs: true
47+
}
48+
49+
/* xds.type.v3.TypedStruct and udpa.type.v1.TypedStruct have identical interfaces */
50+
function decodeTypedStruct(message: Any__Output): TypedStruct__Output {
51+
const name = message.type_url.substring(message.type_url.lastIndexOf('/') + 1);
52+
const type = resourceRoot.lookup(name);
53+
if (type) {
54+
const decodedMessage = (type as any).decode(message.value);
55+
return decodedMessage.$type.toObject(decodedMessage, toObjectOptions) as TypedStruct__Output;
56+
} else {
57+
throw new Error(`TypedStruct parsing error: unexpected type URL ${message.type_url}`);
58+
}
59+
}
60+
61+
type FlatValue = boolean | null | number | string | FlatValue[] | FlatStruct;
62+
interface FlatStruct {
63+
[key: string]: FlatValue;
64+
}
65+
66+
function flattenValue(value: Value__Output): FlatValue {
67+
switch (value.kind) {
68+
case 'boolValue':
69+
return value.boolValue!;
70+
case 'listValue':
71+
return value.listValue!.values.map(flattenValue);
72+
case 'nullValue':
73+
return null;
74+
case 'numberValue':
75+
return value.numberValue!;
76+
case 'stringValue':
77+
return value.stringValue!;
78+
case 'structValue':
79+
return flattenStruct(value.structValue!);
80+
default:
81+
throw new Error(`Struct parsing error: unexpected value kind ${value.kind}`);
82+
}
83+
}
84+
85+
function flattenStruct(struct: Struct__Output): FlatStruct {
86+
const result: FlatStruct = {};
87+
for (const [key, value] of Object.entries(struct.fields)) {
88+
result[key] = flattenValue(value);
89+
}
90+
return result;
91+
}
92+
93+
function convertToLoadBalancingPolicy(protoPolicy: TypedExtensionConfig__Output, selectChildPolicy: (childPolicy: LoadBalancingPolicy__Output) => LoadBalancingConfig): LoadBalancingConfig {
94+
if (protoPolicy.typed_config?.type_url !== XDS_TYPED_STRUCT_TYPE_URL && protoPolicy.typed_config?.type_url !== UDPA_TYPED_STRUCT_TYPE_URL) {
95+
throw new Error(`Typed struct LB policy parsing error: unexpected type URL ${protoPolicy.typed_config?.type_url}`);
96+
}
97+
const typedStruct = decodeTypedStruct(protoPolicy.typed_config);
98+
if (!typedStruct.value) {
99+
throw new Error(`Typed struct LB parsing error: unexpected value ${typedStruct.value}`);
100+
}
101+
const policyName = typedStruct.type_url.substring(typedStruct.type_url.lastIndexOf('/') + 1);
102+
return {
103+
[policyName]: flattenStruct(typedStruct.value)
104+
};
105+
}
106+
107+
export function setup() {
108+
registerLbPolicy(XDS_TYPED_STRUCT_TYPE_URL, convertToLoadBalancingPolicy);
109+
registerLbPolicy(UDPA_TYPED_STRUCT_TYPE_URL, convertToLoadBalancingPolicy);
110+
}

packages/grpc-js-xds/src/load-balancer-cds.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,11 @@ export class CdsLoadBalancer implements LoadBalancer {
193193
this.reportError((e as Error).message);
194194
return;
195195
}
196+
const rootClusterUpdate = this.clusterTree[this.latestConfig!.getCluster()].latestUpdate!;
196197
const clusterResolverConfig: LoadBalancingConfig = {
197198
xds_cluster_resolver: {
198199
discovery_mechanisms: discoveryMechanismList,
199-
locality_picking_policy: [],
200-
endpoint_picking_policy: []
200+
xds_lb_policy: rootClusterUpdate.lbPolicyConfig
201201
}
202202
};
203203
let parsedClusterResolverConfig: TypedLoadBalancingConfig;

packages/grpc-js-xds/src/load-balancer-priority.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import QueuePicker = experimental.QueuePicker;
2727
import UnavailablePicker = experimental.UnavailablePicker;
2828
import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
2929
import selectLbConfigFromList = experimental.selectLbConfigFromList;
30+
import { Locality__Output } from './generated/envoy/config/core/v3/Locality';
3031

3132
const TRACER_NAME = 'priority';
3233

@@ -41,6 +42,8 @@ const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000;
4142

4243
export type LocalitySubchannelAddress = SubchannelAddress & {
4344
localityPath: string[];
45+
locality: Locality__Output;
46+
weight: number;
4447
};
4548

4649
export function isLocalitySubchannelAddress(

packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
import { experimental, logVerbosity, status as Status, Metadata, connectivityState } from "@grpc/grpc-js";
1919
import { validateXdsServerConfig, XdsServerConfig } from "./xds-bootstrap";
20-
import { getSingletonXdsClient, XdsClient, XdsClusterDropStats } from "./xds-client";
20+
import { getSingletonXdsClient, XdsClient, XdsClusterDropStats, XdsClusterLocalityStats } from "./xds-client";
21+
import { LocalitySubchannelAddress } from "./load-balancer-priority";
2122

2223
import LoadBalancer = experimental.LoadBalancer;
2324
import registerLoadBalancerType = experimental.registerLoadBalancerType;
@@ -31,6 +32,8 @@ import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
3132
import createChildChannelControlHelper = experimental.createChildChannelControlHelper;
3233
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
3334
import selectLbConfigFromList = experimental.selectLbConfigFromList;
35+
import SubchannelInterface = experimental.SubchannelInterface;
36+
import BaseSubchannelWrapper = experimental.BaseSubchannelWrapper;
3437

3538
const TRACER_NAME = 'xds_cluster_impl';
3639

@@ -80,7 +83,7 @@ class XdsClusterImplLoadBalancingConfig implements TypedLoadBalancingConfig {
8083
};
8184
}
8285

83-
constructor(private cluster: string, private dropCategories: DropCategory[], private childPolicy: TypedLoadBalancingConfig, private edsServiceName?: string, private lrsLoadReportingServer?: XdsServerConfig, maxConcurrentRequests?: number) {
86+
constructor(private cluster: string, private dropCategories: DropCategory[], private childPolicy: TypedLoadBalancingConfig, private edsServiceName: string, private lrsLoadReportingServer: XdsServerConfig, maxConcurrentRequests?: number) {
8487
this.maxConcurrentRequests = maxConcurrentRequests ?? DEFAULT_MAX_CONCURRENT_REQUESTS;
8588
}
8689

@@ -112,8 +115,8 @@ class XdsClusterImplLoadBalancingConfig implements TypedLoadBalancingConfig {
112115
if (!('cluster' in obj && typeof obj.cluster === 'string')) {
113116
throw new Error('xds_cluster_impl config must have a string field cluster');
114117
}
115-
if ('eds_service_name' in obj && !(obj.eds_service_name === undefined || typeof obj.eds_service_name === 'string')) {
116-
throw new Error('xds_cluster_impl config eds_service_name field must be a string if provided');
118+
if (!('eds_service_name' in obj && typeof obj.eds_service_name === 'string')) {
119+
throw new Error('xds_cluster_impl config must have a string field eds_service_name');
117120
}
118121
if ('max_concurrent_requests' in obj && !(obj.max_concurrent_requests === undefined || typeof obj.max_concurrent_requests === 'number')) {
119122
throw new Error('xds_cluster_impl config max_concurrent_requests must be a number if provided');
@@ -128,7 +131,7 @@ class XdsClusterImplLoadBalancingConfig implements TypedLoadBalancingConfig {
128131
if (!childConfig) {
129132
throw new Error('xds_cluster_impl config child_policy parsing failed');
130133
}
131-
return new XdsClusterImplLoadBalancingConfig(obj.cluster, obj.drop_categories.map(validateDropCategory), childConfig, obj.eds_service_name, obj.lrs_load_reporting_server ? validateXdsServerConfig(obj.lrs_load_reporting_server) : undefined, obj.max_concurrent_requests);
134+
return new XdsClusterImplLoadBalancingConfig(obj.cluster, obj.drop_categories.map(validateDropCategory), childConfig, obj.eds_service_name, validateXdsServerConfig(obj.lrs_load_reporting_server), obj.max_concurrent_requests);
132135
}
133136
}
134137

@@ -156,7 +159,25 @@ class CallCounterMap {
156159

157160
const callCounterMap = new CallCounterMap();
158161

159-
class DropPicker implements Picker {
162+
class LocalitySubchannelWrapper extends BaseSubchannelWrapper implements SubchannelInterface {
163+
constructor(child: SubchannelInterface, private statsObject: XdsClusterLocalityStats) {
164+
super(child);
165+
}
166+
167+
getStatsObject() {
168+
return this.statsObject;
169+
}
170+
171+
getWrappedSubchannel(): SubchannelInterface {
172+
return this.child;
173+
}
174+
}
175+
176+
/**
177+
* This picker is responsible for implementing the drop configuration, and for
178+
* recording drop stats and per-locality stats.
179+
*/
180+
class XdsClusterImplPicker implements Picker {
160181
constructor(private originalPicker: Picker, private callCounterMapKey: string, private maxConcurrentRequests: number, private dropCategories: DropCategory[], private clusterDropStats: XdsClusterDropStats | null) {}
161182

162183
private checkForMaxConcurrentRequestsDrop(): boolean {
@@ -186,16 +207,19 @@ class DropPicker implements Picker {
186207
}
187208
if (details === null) {
188209
const originalPick = this.originalPicker.pick(pickArgs);
210+
const pickSubchannel = originalPick.subchannel ? (originalPick.subchannel as LocalitySubchannelWrapper) : null;
189211
return {
190212
pickResultType: originalPick.pickResultType,
191213
status: originalPick.status,
192-
subchannel: originalPick.subchannel,
214+
subchannel: pickSubchannel?.getWrappedSubchannel() ?? null,
193215
onCallStarted: () => {
194216
originalPick.onCallStarted?.();
217+
pickSubchannel?.getStatsObject().addCallStarted();
195218
callCounterMap.startCall(this.callCounterMapKey);
196219
},
197220
onCallEnded: status => {
198221
originalPick.onCallEnded?.(status);
222+
pickSubchannel?.getStatsObject().addCallFinished(status !== Status.OK)
199223
callCounterMap.endCall(this.callCounterMapKey);
200224
}
201225
};
@@ -227,11 +251,25 @@ class XdsClusterImplBalancer implements LoadBalancer {
227251

228252
constructor(private readonly channelControlHelper: ChannelControlHelper) {
229253
this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
254+
createSubchannel: (subchannelAddress, subchannelArgs) => {
255+
if (!this.xdsClient || !this.latestConfig) {
256+
throw new Error('xds_cluster_impl: invalid state: createSubchannel called with xdsClient or latestConfig not populated');
257+
}
258+
const locality = (subchannelAddress as LocalitySubchannelAddress).locality ?? '';
259+
const wrapperChild = channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs);
260+
const statsObj = this.xdsClient.addClusterLocalityStats(
261+
this.latestConfig.getLrsLoadReportingServer(),
262+
this.latestConfig.getCluster(),
263+
this.latestConfig.getEdsServiceName(),
264+
locality
265+
);
266+
return new LocalitySubchannelWrapper(wrapperChild, statsObj);
267+
},
230268
updateState: (connectivityState, originalPicker) => {
231269
if (this.latestConfig === null) {
232270
channelControlHelper.updateState(connectivityState, originalPicker);
233271
} else {
234-
const picker = new DropPicker(originalPicker, getCallCounterMapKey(this.latestConfig.getCluster(), this.latestConfig.getEdsServiceName()), this.latestConfig.getMaxConcurrentRequests(), this.latestConfig.getDropCategories(), this.clusterDropStats);
272+
const picker = new XdsClusterImplPicker(originalPicker, getCallCounterMapKey(this.latestConfig.getCluster(), this.latestConfig.getEdsServiceName()), this.latestConfig.getMaxConcurrentRequests(), this.latestConfig.getDropCategories(), this.clusterDropStats);
235273
channelControlHelper.updateState(connectivityState, picker);
236274
}
237275
}

0 commit comments

Comments
 (0)