@@ -24,6 +24,98 @@ import { XdsServer } from "./xds-server";
24
24
import * as assert from 'assert' ;
25
25
import { WrrLocality } from "../src/generated/envoy/extensions/load_balancing_policies/wrr_locality/v3/WrrLocality" ;
26
26
import { TypedStruct } from "../src/generated/xds/type/v3/TypedStruct" ;
27
+ import { connectivityState , experimental , logVerbosity } from "@grpc/grpc-js" ;
28
+
29
+ import TypedLoadBalancingConfig = experimental . TypedLoadBalancingConfig ;
30
+ import LoadBalancer = experimental . LoadBalancer ;
31
+ import ChannelControlHelper = experimental . ChannelControlHelper ;
32
+ import ChildLoadBalancerHandler = experimental . ChildLoadBalancerHandler ;
33
+ import SubchannelAddress = experimental . SubchannelAddress ;
34
+ import Picker = experimental . Picker ;
35
+ import PickArgs = experimental . PickArgs ;
36
+ import PickResult = experimental . PickResult ;
37
+ import PickResultType = experimental . PickResultType ;
38
+ import createChildChannelControlHelper = experimental . createChildChannelControlHelper ;
39
+ import parseLoadBalancingConfig = experimental . parseLoadBalancingConfig ;
40
+ import registerLoadBalancerType = experimental . registerLoadBalancerType ;
41
+
42
+ const LB_POLICY_NAME = 'test.RpcBehaviorLoadBalancer' ;
43
+
44
+ class RpcBehaviorLoadBalancingConfig implements TypedLoadBalancingConfig {
45
+ constructor ( private rpcBehavior : string ) { }
46
+ getLoadBalancerName ( ) : string {
47
+ return LB_POLICY_NAME ;
48
+ }
49
+ toJsonObject ( ) : object {
50
+ return {
51
+ [ LB_POLICY_NAME ] : {
52
+ 'rpcBehavior' : this . rpcBehavior
53
+ }
54
+ } ;
55
+ }
56
+ getRpcBehavior ( ) {
57
+ return this . rpcBehavior ;
58
+ }
59
+ static createFromJson ( obj : any ) : RpcBehaviorLoadBalancingConfig {
60
+ if ( ! ( 'rpcBehavior' in obj && typeof obj . rpcBehavior === 'string' ) ) {
61
+ throw new Error ( `${ LB_POLICY_NAME } parsing error: expected string field rpcBehavior` ) ;
62
+ }
63
+ return new RpcBehaviorLoadBalancingConfig ( obj . rpcBehavior ) ;
64
+ }
65
+ }
66
+
67
+ class RpcBehaviorPicker implements Picker {
68
+ constructor ( private wrappedPicker : Picker , private rpcBehavior : string ) { }
69
+ pick ( pickArgs : PickArgs ) : PickResult {
70
+ const wrappedPick = this . wrappedPicker . pick ( pickArgs ) ;
71
+ if ( wrappedPick . pickResultType === PickResultType . COMPLETE ) {
72
+ pickArgs . metadata . add ( 'rpc-behavior' , this . rpcBehavior ) ;
73
+ }
74
+ return wrappedPick ;
75
+ }
76
+ }
77
+
78
+ const RPC_BEHAVIOR_CHILD_CONFIG = parseLoadBalancingConfig ( { round_robin : { } } ) ;
79
+
80
+ /**
81
+ * Load balancer implementation for Custom LB policy test
82
+ */
83
+ class RpcBehaviorLoadBalancer implements LoadBalancer {
84
+ private child : ChildLoadBalancerHandler ;
85
+ private latestConfig : RpcBehaviorLoadBalancingConfig | null = null ;
86
+ constructor ( channelControlHelper : ChannelControlHelper ) {
87
+ const childChannelControlHelper = createChildChannelControlHelper ( channelControlHelper , {
88
+ updateState : ( state , picker ) => {
89
+ if ( state === connectivityState . READY && this . latestConfig ) {
90
+ picker = new RpcBehaviorPicker ( picker , this . latestConfig . getRpcBehavior ( ) ) ;
91
+ }
92
+ channelControlHelper . updateState ( state , picker ) ;
93
+ }
94
+ } ) ;
95
+ this . child = new ChildLoadBalancerHandler ( childChannelControlHelper ) ;
96
+ }
97
+ updateAddressList ( addressList : SubchannelAddress [ ] , lbConfig : TypedLoadBalancingConfig , attributes : { [ key : string ] : unknown ; } ) : void {
98
+ if ( ! ( lbConfig instanceof RpcBehaviorLoadBalancingConfig ) ) {
99
+ return ;
100
+ }
101
+ this . latestConfig = lbConfig ;
102
+ this . child . updateAddressList ( addressList , RPC_BEHAVIOR_CHILD_CONFIG , attributes ) ;
103
+ }
104
+ exitIdle ( ) : void {
105
+ this . child . exitIdle ( ) ;
106
+ }
107
+ resetBackoff ( ) : void {
108
+ this . child . resetBackoff ( ) ;
109
+ }
110
+ destroy ( ) : void {
111
+ this . child . destroy ( ) ;
112
+ }
113
+ getTypeName ( ) : string {
114
+ return LB_POLICY_NAME ;
115
+ }
116
+ }
117
+
118
+ registerLoadBalancerType ( LB_POLICY_NAME , RpcBehaviorLoadBalancer , RpcBehaviorLoadBalancingConfig ) ;
27
119
28
120
describe ( 'Custom LB policies' , ( ) => {
29
121
let xdsServer : XdsServer ;
@@ -163,4 +255,47 @@ describe('Custom LB policies', () => {
163
255
client . sendOneCall ( done ) ;
164
256
} , reason => done ( reason ) ) ;
165
257
} ) ;
258
+ it ( 'Should handle a custom LB policy' , done => {
259
+ const childPolicy : TypedStruct & AnyExtension = {
260
+ '@type' : 'type.googleapis.com/xds.type.v3.TypedStruct' ,
261
+ type_url : 'test.RpcBehaviorLoadBalancer' ,
262
+ value : {
263
+ fields : {
264
+ rpcBehavior : { stringValue : 'error-code-15' }
265
+ }
266
+ }
267
+ } ;
268
+ const lbPolicy : WrrLocality & AnyExtension = {
269
+ '@type' : 'type.googleapis.com/envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality' ,
270
+ endpoint_picking_policy : {
271
+ policies : [
272
+ {
273
+ typed_extension_config : {
274
+ name : 'child' ,
275
+ typed_config : childPolicy
276
+ }
277
+ }
278
+ ]
279
+ }
280
+ } ;
281
+ const cluster = new FakeEdsCluster ( 'cluster1' , 'endpoint1' , [ { backends : [ new Backend ( ) ] , locality :{ region : 'region1' } } ] , lbPolicy ) ;
282
+ const routeGroup = new FakeRouteGroup ( 'listener1' , 'route1' , [ { cluster : cluster } ] ) ;
283
+ routeGroup . startAllBackends ( ) . then ( ( ) => {
284
+ xdsServer . setEdsResource ( cluster . getEndpointConfig ( ) ) ;
285
+ xdsServer . setCdsResource ( cluster . getClusterConfig ( ) ) ;
286
+ xdsServer . setRdsResource ( routeGroup . getRouteConfiguration ( ) ) ;
287
+ xdsServer . setLdsResource ( routeGroup . getListener ( ) ) ;
288
+ xdsServer . addResponseListener ( ( typeUrl , responseState ) => {
289
+ if ( responseState . state === 'NACKED' ) {
290
+ client . stopCalls ( ) ;
291
+ assert . fail ( `Client NACKED ${ typeUrl } resource with message ${ responseState . errorMessage } ` ) ;
292
+ }
293
+ } )
294
+ client = XdsTestClient . createFromServer ( 'listener1' , xdsServer ) ;
295
+ client . sendOneCall ( error => {
296
+ assert . strictEqual ( error ?. code , 15 ) ;
297
+ done ( ) ;
298
+ } ) ;
299
+ } , reason => done ( reason ) ) ;
300
+ } )
166
301
} ) ;
0 commit comments