@@ -30,8 +30,98 @@ import { XdsUpdateClientConfigureServiceHandlers } from './generated/grpc/testin
30
30
import { Empty__Output } from './generated/grpc/testing/Empty' ;
31
31
import { LoadBalancerAccumulatedStatsResponse } from './generated/grpc/testing/LoadBalancerAccumulatedStatsResponse' ;
32
32
33
+ import TypedLoadBalancingConfig = grpc . experimental . TypedLoadBalancingConfig ;
34
+ import LoadBalancer = grpc . experimental . LoadBalancer ;
35
+ import ChannelControlHelper = grpc . experimental . ChannelControlHelper ;
36
+ import ChildLoadBalancerHandler = grpc . experimental . ChildLoadBalancerHandler ;
37
+ import SubchannelAddress = grpc . experimental . SubchannelAddress ;
38
+ import Picker = grpc . experimental . Picker ;
39
+ import PickArgs = grpc . experimental . PickArgs ;
40
+ import PickResult = grpc . experimental . PickResult ;
41
+ import PickResultType = grpc . experimental . PickResultType ;
42
+ import createChildChannelControlHelper = grpc . experimental . createChildChannelControlHelper ;
43
+ import parseLoadBalancingConfig = grpc . experimental . parseLoadBalancingConfig ;
44
+
33
45
grpc_xds . register ( ) ;
34
46
47
+ const LB_POLICY_NAME = 'test.RpcBehaviorLoadBalancer' ;
48
+
49
+ class RpcBehaviorLoadBalancingConfig implements TypedLoadBalancingConfig {
50
+ constructor ( private rpcBehavior : string ) { }
51
+ getLoadBalancerName ( ) : string {
52
+ return LB_POLICY_NAME ;
53
+ }
54
+ toJsonObject ( ) : object {
55
+ return {
56
+ [ LB_POLICY_NAME ] : {
57
+ 'rpcBehavior' : this . rpcBehavior
58
+ }
59
+ } ;
60
+ }
61
+ getRpcBehavior ( ) {
62
+ return this . rpcBehavior ;
63
+ }
64
+ static createFromJson ( obj : any ) : RpcBehaviorLoadBalancingConfig {
65
+ if ( ! ( 'rpcBehavior' in obj && typeof obj . rpcBehavior === 'string' ) ) {
66
+ throw new Error ( `${ LB_POLICY_NAME } parsing error: expected string field rpcBehavior` ) ;
67
+ }
68
+ return new RpcBehaviorLoadBalancingConfig ( obj . rpcBehavior ) ;
69
+ }
70
+ }
71
+
72
+ class RpcBehaviorPicker implements Picker {
73
+ constructor ( private wrappedPicker : Picker , private rpcBehavior : string ) { }
74
+ pick ( pickArgs : PickArgs ) : PickResult {
75
+ const wrappedPick = this . wrappedPicker . pick ( pickArgs ) ;
76
+ if ( wrappedPick . pickResultType === PickResultType . COMPLETE ) {
77
+ pickArgs . metadata . add ( 'rpc-behavior' , this . rpcBehavior ) ;
78
+ }
79
+ return wrappedPick ;
80
+ }
81
+ }
82
+
83
+ const RPC_BEHAVIOR_CHILD_CONFIG = parseLoadBalancingConfig ( { round_robin : { } } ) ;
84
+
85
+ /**
86
+ * Load balancer implementation for Custom LB policy test
87
+ */
88
+ class RpcBehaviorLoadBalancer implements LoadBalancer {
89
+ private child : ChildLoadBalancerHandler ;
90
+ private latestConfig : RpcBehaviorLoadBalancingConfig | null = null ;
91
+ constructor ( channelControlHelper : ChannelControlHelper ) {
92
+ const childChannelControlHelper = createChildChannelControlHelper ( channelControlHelper , {
93
+ updateState : ( connectivityState , picker ) => {
94
+ if ( connectivityState === grpc . connectivityState . READY && this . latestConfig ) {
95
+ picker = new RpcBehaviorPicker ( picker , this . latestConfig . getLoadBalancerName ( ) ) ;
96
+ }
97
+ channelControlHelper . updateState ( connectivityState , picker ) ;
98
+ }
99
+ } ) ;
100
+ this . child = new ChildLoadBalancerHandler ( childChannelControlHelper ) ;
101
+ }
102
+ updateAddressList ( addressList : SubchannelAddress [ ] , lbConfig : TypedLoadBalancingConfig , attributes : { [ key : string ] : unknown ; } ) : void {
103
+ if ( ! ( lbConfig instanceof RpcBehaviorLoadBalancingConfig ) ) {
104
+ return ;
105
+ }
106
+ this . latestConfig = lbConfig ;
107
+ this . child . updateAddressList ( addressList , RPC_BEHAVIOR_CHILD_CONFIG , attributes ) ;
108
+ }
109
+ exitIdle ( ) : void {
110
+ this . child . exitIdle ( ) ;
111
+ }
112
+ resetBackoff ( ) : void {
113
+ this . child . resetBackoff ( ) ;
114
+ }
115
+ destroy ( ) : void {
116
+ this . child . destroy ( ) ;
117
+ }
118
+ getTypeName ( ) : string {
119
+ return LB_POLICY_NAME ;
120
+ }
121
+ }
122
+
123
+ grpc . experimental . registerLoadBalancerType ( LB_POLICY_NAME , RpcBehaviorLoadBalancer , RpcBehaviorLoadBalancingConfig ) ;
124
+
35
125
const packageDefinition = protoLoader . loadSync ( 'grpc/testing/test.proto' , {
36
126
keepCase : true ,
37
127
defaults : true ,
@@ -91,7 +181,7 @@ class CallSubscriber {
91
181
}
92
182
if ( peerName in this . callsSucceededByPeer ) {
93
183
this . callsSucceededByPeer [ peerName ] += 1 ;
94
- } else {
184
+ } else {
95
185
this . callsSucceededByPeer [ peerName ] = 1 ;
96
186
}
97
187
this . callsSucceeded += 1 ;
@@ -426,9 +516,9 @@ function main() {
426
516
* channels do not share any subchannels. It does not have any
427
517
* inherent function. */
428
518
console . log ( `Interop client channel ${ i } starting sending ${ argv . qps } QPS to ${ argv . server } ` ) ;
429
- sendConstantQps ( new loadedProto . grpc . testing . TestService ( argv . server , grpc . credentials . createInsecure ( ) , { 'unique' : i } ) ,
430
- argv . qps ,
431
- argv . fail_on_failed_rpcs === 'true' ,
519
+ sendConstantQps ( new loadedProto . grpc . testing . TestService ( argv . server , grpc . credentials . createInsecure ( ) , { 'unique' : i } ) ,
520
+ argv . qps ,
521
+ argv . fail_on_failed_rpcs === 'true' ,
432
522
callStatsTracker ) ;
433
523
}
434
524
@@ -486,4 +576,4 @@ function main() {
486
576
487
577
if ( require . main === module ) {
488
578
main ( ) ;
489
- }
579
+ }
0 commit comments