Skip to content

Commit c679726

Browse files
committed
Add custom LB interop test support
1 parent 13a6e6d commit c679726

File tree

3 files changed

+151
-7
lines changed

3 files changed

+151
-7
lines changed

packages/grpc-js-xds/interop/xds-interop-client.ts

Lines changed: 95 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,98 @@ import { XdsUpdateClientConfigureServiceHandlers } from './generated/grpc/testin
3030
import { Empty__Output } from './generated/grpc/testing/Empty';
3131
import { LoadBalancerAccumulatedStatsResponse } from './generated/grpc/testing/LoadBalancerAccumulatedStatsResponse';
3232

33+
import TypedLoadBalancingConfig = grpc.experimental.TypedLoadBalancingConfig;
34+
import LoadBalancer = grpc.experimental.LoadBalancer;
35+
import ChannelControlHelper = grpc.experimental.ChannelControlHelper;
36+
import ChildLoadBalancerHandler = grpc.experimental.ChildLoadBalancerHandler;
37+
import SubchannelAddress = grpc.experimental.SubchannelAddress;
38+
import Picker = grpc.experimental.Picker;
39+
import PickArgs = grpc.experimental.PickArgs;
40+
import PickResult = grpc.experimental.PickResult;
41+
import PickResultType = grpc.experimental.PickResultType;
42+
import createChildChannelControlHelper = grpc.experimental.createChildChannelControlHelper;
43+
import parseLoadBalancingConfig = grpc.experimental.parseLoadBalancingConfig;
44+
3345
grpc_xds.register();
3446

47+
const LB_POLICY_NAME = 'RpcBehaviorLoadBalancer';
48+
49+
class RpcBehaviorLoadBalancingConfig implements TypedLoadBalancingConfig {
50+
constructor(private rpcBehavior: string) {}
51+
getLoadBalancerName(): string {
52+
return LB_POLICY_NAME;
53+
}
54+
toJsonObject(): object {
55+
return {
56+
[LB_POLICY_NAME]: {
57+
'rpcBehavior': this.rpcBehavior
58+
}
59+
};
60+
}
61+
getRpcBehavior() {
62+
return this.rpcBehavior;
63+
}
64+
static createFromJson(obj: any): RpcBehaviorLoadBalancingConfig {
65+
if (!('rpcBehavior' in obj && typeof obj.rpcBehavior === 'string')) {
66+
throw new Error(`${LB_POLICY_NAME} parsing error: expected string field rpcBehavior`);
67+
}
68+
return new RpcBehaviorLoadBalancingConfig(obj.rpcBehavior);
69+
}
70+
}
71+
72+
class RpcBehaviorPicker implements Picker {
73+
constructor(private wrappedPicker: Picker, private rpcBehavior: string) {}
74+
pick(pickArgs: PickArgs): PickResult {
75+
const wrappedPick = this.wrappedPicker.pick(pickArgs);
76+
if (wrappedPick.pickResultType === PickResultType.COMPLETE) {
77+
pickArgs.metadata.add('rpc-behavior', this.rpcBehavior);
78+
}
79+
return wrappedPick;
80+
}
81+
}
82+
83+
const RPC_BEHAVIOR_CHILD_CONFIG = parseLoadBalancingConfig({round_robin: {}});
84+
85+
/**
86+
* Load balancer implementation for Custom LB policy test
87+
*/
88+
class RpcBehaviorLoadBalancer implements LoadBalancer {
89+
private child: ChildLoadBalancerHandler;
90+
private latestConfig: RpcBehaviorLoadBalancingConfig | null = null;
91+
constructor(channelControlHelper: ChannelControlHelper) {
92+
const childChannelControlHelper = createChildChannelControlHelper(channelControlHelper, {
93+
updateState: (connectivityState, picker) => {
94+
if (connectivityState === grpc.connectivityState.READY && this.latestConfig) {
95+
picker = new RpcBehaviorPicker(picker, this.latestConfig.getLoadBalancerName());
96+
}
97+
channelControlHelper.updateState(connectivityState, picker);
98+
}
99+
});
100+
this.child = new ChildLoadBalancerHandler(childChannelControlHelper);
101+
}
102+
updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
103+
if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) {
104+
return;
105+
}
106+
this.latestConfig = lbConfig;
107+
this.child.updateAddressList(addressList, RPC_BEHAVIOR_CHILD_CONFIG, attributes);
108+
}
109+
exitIdle(): void {
110+
this.child.exitIdle();
111+
}
112+
resetBackoff(): void {
113+
this.child.resetBackoff();
114+
}
115+
destroy(): void {
116+
this.child.destroy();
117+
}
118+
getTypeName(): string {
119+
return LB_POLICY_NAME;
120+
}
121+
}
122+
123+
grpc.experimental.registerLoadBalancerType(LB_POLICY_NAME, RpcBehaviorLoadBalancer, RpcBehaviorLoadBalancingConfig);
124+
35125
const packageDefinition = protoLoader.loadSync('grpc/testing/test.proto', {
36126
keepCase: true,
37127
defaults: true,
@@ -91,7 +181,7 @@ class CallSubscriber {
91181
}
92182
if (peerName in this.callsSucceededByPeer) {
93183
this.callsSucceededByPeer[peerName] += 1;
94-
} else {
184+
} else {
95185
this.callsSucceededByPeer[peerName] = 1;
96186
}
97187
this.callsSucceeded += 1;
@@ -426,9 +516,9 @@ function main() {
426516
* channels do not share any subchannels. It does not have any
427517
* inherent function. */
428518
console.log(`Interop client channel ${i} starting sending ${argv.qps} QPS to ${argv.server}`);
429-
sendConstantQps(new loadedProto.grpc.testing.TestService(argv.server, grpc.credentials.createInsecure(), {'unique': i}),
430-
argv.qps,
431-
argv.fail_on_failed_rpcs === 'true',
519+
sendConstantQps(new loadedProto.grpc.testing.TestService(argv.server, grpc.credentials.createInsecure(), {'unique': i}),
520+
argv.qps,
521+
argv.fail_on_failed_rpcs === 'true',
432522
callStatsTracker);
433523
}
434524

@@ -486,4 +576,4 @@ function main() {
486576

487577
if (require.main === module) {
488578
main();
489-
}
579+
}

packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717

1818
import { LoadBalancingConfig, experimental, logVerbosity } from "@grpc/grpc-js";
19+
import { loadProtosWithOptionsSync } from "@grpc/proto-loader/build/src/util";
1920
import { WeightedTargetRaw } from "./load-balancer-weighted-target";
2021
import { isLocalitySubchannelAddress } from "./load-balancer-priority";
2122
import { localityToName } from "./load-balancer-xds-cluster-resolver";
@@ -26,6 +27,11 @@ import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
2627
import SubchannelAddress = experimental.SubchannelAddress;
2728
import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig;
2829
import registerLoadBalancerType = experimental.registerLoadBalancerType;
30+
import { Any__Output } from "./generated/google/protobuf/Any";
31+
import { WrrLocality__Output } from "./generated/envoy/extensions/load_balancing_policies/wrr_locality/v3/WrrLocality";
32+
import { TypedExtensionConfig__Output } from "./generated/envoy/config/core/v3/TypedExtensionConfig";
33+
import { LoadBalancingPolicy__Output } from "./generated/envoy/config/cluster/v3/LoadBalancingPolicy";
34+
import { registerLbPolicy } from "./lb-policy-registry";
2935

3036
const TRACER_NAME = 'xds_wrr_locality';
3137

@@ -107,6 +113,54 @@ class XdsWrrLocalityLoadBalancer implements LoadBalancer {
107113
}
108114
}
109115

116+
const WRR_LOCALITY_TYPE_URL = 'envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality';
117+
118+
const resourceRoot = loadProtosWithOptionsSync([
119+
'xds/type/v3/typed_struct.proto',
120+
'udpa/type/v1/typed_struct.proto'], {
121+
keepCase: true,
122+
includeDirs: [
123+
// Paths are relative to src/build
124+
__dirname + '/../../deps/xds/',
125+
__dirname + '/../../deps/protoc-gen-validate'
126+
],
127+
}
128+
);
129+
130+
const toObjectOptions = {
131+
longs: String,
132+
enums: String,
133+
defaults: true,
134+
oneofs: true
135+
}
136+
137+
function decodeWrrLocality(message: Any__Output): WrrLocality__Output {
138+
const name = message.type_url.substring(message.type_url.lastIndexOf('/') + 1);
139+
const type = resourceRoot.lookup(name);
140+
if (type) {
141+
const decodedMessage = (type as any).decode(message.value);
142+
return decodedMessage.$type.toObject(decodedMessage, toObjectOptions) as WrrLocality__Output;
143+
} else {
144+
throw new Error(`TypedStruct parsing error: unexpected type URL ${message.type_url}`);
145+
}
146+
}
147+
148+
function convertToLoadBalancingPolicy(protoPolicy: TypedExtensionConfig__Output, selectChildPolicy: (childPolicy: LoadBalancingPolicy__Output) => LoadBalancingConfig): LoadBalancingConfig {
149+
if (protoPolicy.typed_config?.type_url !== WRR_LOCALITY_TYPE_URL) {
150+
throw new Error(`WRR Locality LB policy parsing error: unexpected type URL ${protoPolicy.typed_config?.type_url}`);
151+
}
152+
const wrrLocalityMessage = decodeWrrLocality(protoPolicy.typed_config);
153+
if (!wrrLocalityMessage.endpoint_picking_policy) {
154+
throw new Error('WRR Locality LB parsing error: no endpoint_picking_policy specified');
155+
}
156+
return {
157+
[TYPE_NAME]: {
158+
child_policy: selectChildPolicy(wrrLocalityMessage.endpoint_picking_policy)
159+
}
160+
};
161+
}
162+
110163
export function setup() {
111164
registerLoadBalancerType(TYPE_NAME, XdsWrrLocalityLoadBalancer, XdsWrrLocalityLoadBalancingConfig);
165+
registerLbPolicy(WRR_LOCALITY_TYPE_URL, convertToLoadBalancingPolicy);
112166
}

packages/grpc-js/src/load-balancing-call.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,9 @@ export class LoadBalancingCall implements Call {
114114
throw new Error('doPick called before start');
115115
}
116116
this.trace('Pick called');
117+
const finalMetadata = this.metadata.clone();
117118
const pickResult = this.channel.doPick(
118-
this.metadata,
119+
finalMetadata,
119120
this.callConfig.pickInformation
120121
);
121122
const subchannelString = pickResult.subchannel
@@ -140,7 +141,6 @@ export class LoadBalancingCall implements Call {
140141
.generateMetadata({ service_url: this.serviceUrl })
141142
.then(
142143
credsMetadata => {
143-
const finalMetadata = this.metadata!.clone();
144144
finalMetadata.merge(credsMetadata);
145145
if (finalMetadata.get('authorization').length > 1) {
146146
this.outputStatus(

0 commit comments

Comments
 (0)