@@ -20,21 +20,27 @@ package clusterimpl_test
20
20
21
21
import (
22
22
"context"
23
+ "encoding/json"
24
+ "errors"
23
25
"fmt"
24
26
"math"
25
27
"net"
26
28
"strconv"
27
29
"strings"
30
+ "sync/atomic"
28
31
"testing"
29
32
"time"
30
33
31
34
"github.com/google/go-cmp/cmp"
32
35
"github.com/google/uuid"
33
36
"google.golang.org/grpc"
37
+ "google.golang.org/grpc/balancer"
38
+ "google.golang.org/grpc/balancer/pickfirst"
34
39
"google.golang.org/grpc/codes"
35
40
"google.golang.org/grpc/connectivity"
36
41
"google.golang.org/grpc/credentials/insecure"
37
42
"google.golang.org/grpc/internal"
43
+ "google.golang.org/grpc/internal/balancer/stub"
38
44
"google.golang.org/grpc/internal/grpctest"
39
45
"google.golang.org/grpc/internal/stubserver"
40
46
"google.golang.org/grpc/internal/testutils"
@@ -43,6 +49,7 @@ import (
43
49
"google.golang.org/grpc/peer"
44
50
"google.golang.org/grpc/resolver"
45
51
"google.golang.org/grpc/resolver/manual"
52
+ "google.golang.org/grpc/serviceconfig"
46
53
"google.golang.org/grpc/status"
47
54
"google.golang.org/protobuf/testing/protocmp"
48
55
"google.golang.org/protobuf/types/known/durationpb"
@@ -1113,3 +1120,186 @@ func (s) TestUpdateLRSServerToNil(t *testing.T) {
1113
1120
t .Fatalf ("Expected no LRS reports after disable, got %v want %v" , err , context .DeadlineExceeded )
1114
1121
}
1115
1122
}
1123
+
1124
+ // Test verifies that child policy was updated on receipt of
1125
+ // configuration update.
1126
+ func (s ) TestChildPolicyChangeOnConfigUpdate (t * testing.T ) {
1127
+ // Create an xDS management server.
1128
+ mgmtServer := e2e .StartManagementServer (t , e2e.ManagementServerOptions {AllowResourceSubset : true })
1129
+ defer mgmtServer .Stop ()
1130
+
1131
+ // Create bootstrap configuration pointing to the above management server.
1132
+ nodeID := uuid .New ().String ()
1133
+ bc := e2e .DefaultBootstrapContents (t , nodeID , mgmtServer .Address )
1134
+ testutils .CreateBootstrapFileForTesting (t , bc )
1135
+
1136
+ // Create an xDS resolver with the above bootstrap configuration.
1137
+ if internal .NewXDSResolverWithConfigForTesting == nil {
1138
+ t .Fatalf ("internal.NewXDSResolverWithConfigForTesting is nil" )
1139
+ }
1140
+ resolverBuilder , err := internal .NewXDSResolverWithConfigForTesting .(func ([]byte ) (resolver.Builder , error ))(bc )
1141
+ if err != nil {
1142
+ t .Fatalf ("Failed to create xDS resolver for testing: %v" , err )
1143
+ }
1144
+
1145
+ // Start a server backend exposing the test service.
1146
+ server := stubserver .StartTestService (t , nil )
1147
+ defer server .Stop ()
1148
+
1149
+ const serviceName = "test-child-policy"
1150
+
1151
+ // Configure the xDS management server with default resources. Cluster
1152
+ // corresponding to this resource will be configured with "round_robin"
1153
+ // as the endpoint picking policy
1154
+ resources := e2e .DefaultClientResources (e2e.ResourceParams {
1155
+ DialTarget : serviceName ,
1156
+ NodeID : nodeID ,
1157
+ Host : "localhost" ,
1158
+ Port : testutils .ParsePort (t , server .Address ),
1159
+ })
1160
+
1161
+ ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
1162
+ defer cancel ()
1163
+ if err := mgmtServer .Update (ctx , resources ); err != nil {
1164
+ t .Fatalf ("Failed to update xDS resources: %v" , err )
1165
+ }
1166
+
1167
+ cc , err := grpc .NewClient (fmt .Sprintf ("xds:///%s" , serviceName ), grpc .WithResolvers (resolverBuilder ), grpc .WithTransportCredentials (insecure .NewCredentials ()))
1168
+ if err != nil {
1169
+ t .Fatalf ("Failed to create client: %v" , err )
1170
+ }
1171
+ defer cc .Close ()
1172
+
1173
+ client := testgrpc .NewTestServiceClient (cc )
1174
+ if _ , err := client .EmptyCall (ctx , & testpb.Empty {}); err != nil {
1175
+ t .Fatalf ("client.EmptyCall() failed: %v" , err )
1176
+ }
1177
+
1178
+ // Register stub pickfirst LB policy so that we can catch config changes.
1179
+ pfBuilder := balancer .Get (pickfirst .Name )
1180
+ internal .BalancerUnregister (pfBuilder .Name ())
1181
+ lbCfgCh := make (chan serviceconfig.LoadBalancingConfig , 1 )
1182
+ var updatedChildPolicy atomic.Pointer [string ]
1183
+ stub .Register (pfBuilder .Name (), stub.BalancerFuncs {
1184
+ ParseConfig : func (lbCfg json.RawMessage ) (serviceconfig.LoadBalancingConfig , error ) {
1185
+ return pfBuilder .(balancer.ConfigParser ).ParseConfig (lbCfg )
1186
+ },
1187
+ Init : func (bd * stub.BalancerData ) {
1188
+ bd .ChildBalancer = pfBuilder .Build (bd .ClientConn , bd .BuildOptions )
1189
+ },
1190
+ UpdateClientConnState : func (bd * stub.BalancerData , ccs balancer.ClientConnState ) error {
1191
+ name := pfBuilder .Name ()
1192
+ updatedChildPolicy .Store (& name )
1193
+ select {
1194
+ case lbCfgCh <- ccs .BalancerConfig :
1195
+ case <- ctx .Done ():
1196
+ t .Error ("Timed out while waiting for BalancerConfig, context deadline exceeded" )
1197
+ }
1198
+ return bd .ChildBalancer .UpdateClientConnState (ccs )
1199
+ },
1200
+ Close : func (bd * stub.BalancerData ) {
1201
+ bd .ChildBalancer .Close ()
1202
+ },
1203
+ })
1204
+ defer balancer .Register (pfBuilder )
1205
+
1206
+ // Now update the cluster to use "pick_first" as the endpoint picking policy.
1207
+ resources .Clusters [0 ].LoadBalancingPolicy = & v3clusterpb.LoadBalancingPolicy {
1208
+ Policies : []* v3clusterpb.LoadBalancingPolicy_Policy {{
1209
+ TypedExtensionConfig : & v3corepb.TypedExtensionConfig {
1210
+ TypedConfig : testutils .MarshalAny (t , & v3pickfirstpb.PickFirst {}),
1211
+ },
1212
+ }},
1213
+ }
1214
+ if err := mgmtServer .Update (ctx , resources ); err != nil {
1215
+ t .Fatal (err )
1216
+ }
1217
+
1218
+ select {
1219
+ case <- ctx .Done ():
1220
+ t .Fatalf ("Timeout waiting for pickfirst child policy config" )
1221
+ case <- lbCfgCh :
1222
+ }
1223
+
1224
+ if p := updatedChildPolicy .Load (); p == nil || * p != pfBuilder .Name () {
1225
+ var got string
1226
+ if p != nil {
1227
+ got = * p
1228
+ }
1229
+ t .Fatalf ("Unexpected child policy after config update, got %q, want %q" , got , pfBuilder .Name ())
1230
+ }
1231
+
1232
+ // New RPC should still be routed successfully
1233
+ if _ , err := client .EmptyCall (ctx , & testpb.Empty {}); err != nil {
1234
+ t .Errorf ("EmptyCall() failed after policy update: %v" , err )
1235
+ }
1236
+ }
1237
+
1238
+ // Test verifies that config update fails if child policy config
1239
+ // failed to parse.
1240
+ func (s ) TestFailedToParseChildPolicyConfig (t * testing.T ) {
1241
+ // Create an xDS management server.
1242
+ mgmtServer := e2e .StartManagementServer (t , e2e.ManagementServerOptions {AllowResourceSubset : true })
1243
+ defer mgmtServer .Stop ()
1244
+
1245
+ // Create bootstrap configuration pointing to the above management server.
1246
+ nodeID := uuid .New ().String ()
1247
+ bc := e2e .DefaultBootstrapContents (t , nodeID , mgmtServer .Address )
1248
+ testutils .CreateBootstrapFileForTesting (t , bc )
1249
+
1250
+ // Create an xDS resolver with the above bootstrap configuration.
1251
+ if internal .NewXDSResolverWithConfigForTesting == nil {
1252
+ t .Fatalf ("internal.NewXDSResolverWithConfigForTesting is nil" )
1253
+ }
1254
+ resolverBuilder , err := internal .NewXDSResolverWithConfigForTesting .(func ([]byte ) (resolver.Builder , error ))(bc )
1255
+ if err != nil {
1256
+ t .Fatalf ("Failed to create xDS resolver for testing: %v" , err )
1257
+ }
1258
+
1259
+ // Start a server backend exposing the test service.
1260
+ server := stubserver .StartTestService (t , nil )
1261
+ defer server .Stop ()
1262
+
1263
+ const serviceName = "test-child-policy"
1264
+ resources := e2e .DefaultClientResources (e2e.ResourceParams {
1265
+ DialTarget : serviceName ,
1266
+ NodeID : nodeID ,
1267
+ Host : "localhost" ,
1268
+ Port : testutils .ParsePort (t , server .Address ),
1269
+ })
1270
+ resources .Clusters [0 ].LoadBalancingPolicy = & v3clusterpb.LoadBalancingPolicy {
1271
+ Policies : []* v3clusterpb.LoadBalancingPolicy_Policy {{
1272
+ TypedExtensionConfig : & v3corepb.TypedExtensionConfig {
1273
+ TypedConfig : testutils .MarshalAny (t , & v3pickfirstpb.PickFirst {}),
1274
+ },
1275
+ }},
1276
+ }
1277
+
1278
+ ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
1279
+ defer cancel ()
1280
+ if err := mgmtServer .Update (ctx , resources ); err != nil {
1281
+ t .Fatalf ("Failed to update xDS resources: %v" , err )
1282
+ }
1283
+
1284
+ // Register stub pickfirst LB policy so that we can catch parsing errors.
1285
+ pfBuilder := balancer .Get (pickfirst .Name )
1286
+ internal .BalancerUnregister (pfBuilder .Name ())
1287
+ const parseConfigError = "failed to parse config"
1288
+ stub .Register (pfBuilder .Name (), stub.BalancerFuncs {
1289
+ ParseConfig : func (_ json.RawMessage ) (serviceconfig.LoadBalancingConfig , error ) {
1290
+ return nil , errors .New (parseConfigError )
1291
+ },
1292
+ })
1293
+ defer balancer .Register (pfBuilder )
1294
+
1295
+ cc , err := grpc .NewClient (fmt .Sprintf ("xds:///%s" , serviceName ), grpc .WithResolvers (resolverBuilder ), grpc .WithTransportCredentials (insecure .NewCredentials ()))
1296
+ if err != nil {
1297
+ t .Fatalf ("Failed to create client: %v" , err )
1298
+ }
1299
+ defer cc .Close ()
1300
+
1301
+ client := testgrpc .NewTestServiceClient (cc )
1302
+ if _ , err := client .EmptyCall (ctx , & testpb.Empty {}); err == nil || ! strings .Contains (err .Error (), parseConfigError ) {
1303
+ t .Fatal ("EmptyCall RPC succeeded when expected to fail" )
1304
+ }
1305
+ }
0 commit comments