Skip to content

Commit 9ac0ec8

Browse files
xds/cdsbalancer: increase buffer size of requested resource channel in test (#8467)
RELEASE NOTES: N/A Fixes: #8462 The main issue was that the requests were getting dropped since we use a [non-blocking send](https://github.com/grpc/grpc-go/blob/a5e7cd6d4c2c31b1e6649789c2ddc9a82ad6b5fa/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go#L222C5-L227C6) for resources in test along with buffer size of just [one](https://github.com/grpc/grpc-go/blob/a5e7cd6d4c2c31b1e6649789c2ddc9a82ad6b5fa/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go#L210) which was resulting in resource request updates being dropped if the receiver is not executing at the exact moment. Fix: Changed the `setupManagementServer` to take `listener` and `OnStreamReq` function as a parameter and in the `TestWatcher` added a blocking send whenever a cluster resource is requested.
1 parent 0ebea3e commit 9ac0ec8

File tree

2 files changed

+96
-67
lines changed

2 files changed

+96
-67
lines changed

xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@ import (
3535
"google.golang.org/grpc/status"
3636
"google.golang.org/grpc/xds/internal"
3737
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
38+
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
3839

3940
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
4041
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
42+
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
4143
testgrpc "google.golang.org/grpc/interop/grpc_testing"
4244
testpb "google.golang.org/grpc/interop/grpc_testing"
4345
)
@@ -131,7 +133,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) {
131133
for _, test := range tests {
132134
t.Run(test.name, func(t *testing.T) {
133135
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
134-
mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
136+
mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
135137

136138
// Push the first cluster resource through the management server and
137139
// verify the configuration pushed to the child policy.
@@ -174,7 +176,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) {
174176
// contains the expected discovery mechanisms.
175177
func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) {
176178
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
177-
mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
179+
mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
178180

179181
// Configure the management server with the aggregate cluster resource
180182
// pointing to two child clusters, one EDS and one LogicalDNS. Include the
@@ -281,7 +283,7 @@ func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) {
281283
// policy contains a single discovery mechanism.
282284
func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) {
283285
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
284-
mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
286+
mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
285287

286288
// Configure the management server with the aggregate cluster resource
287289
// pointing to two child clusters.
@@ -356,7 +358,7 @@ func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) {
356358
// discovery mechanisms.
357359
func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T) {
358360
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
359-
mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
361+
mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
360362

361363
// Start off with the requested cluster being a leaf EDS cluster.
362364
resources := e2e.UpdateOptions{
@@ -450,7 +452,7 @@ func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T
450452
// longer exceed maximum depth, but be at the maximum allowed depth, and
451453
// verifies that an RPC can be made successfully.
452454
func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) {
453-
mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
455+
mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
454456

455457
resources := e2e.UpdateOptions{
456458
NodeID: nodeID,
@@ -538,7 +540,7 @@ func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) {
538540
// pushed only after all child clusters are resolved.
539541
func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) {
540542
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
541-
mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
543+
mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
542544

543545
// Configure the management server with an aggregate cluster resource having
544546
// a diamond dependency pattern, (A->[B,C]; B->D; C->D). Includes resources
@@ -605,7 +607,7 @@ func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) {
605607
// pushed only after all child clusters are resolved.
606608
func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) {
607609
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
608-
mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
610+
mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
609611

610612
// Configure the management server with an aggregate cluster resource that
611613
// has duplicates in the graph, (A->[B, C]; B->[C, D]). Include resources
@@ -683,7 +685,7 @@ func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) {
683685
// child policy and that an RPC can be successfully made.
684686
func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) {
685687
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
686-
mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
688+
mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
687689

688690
const (
689691
clusterNameA = clusterName // cluster name in cds LB policy config
@@ -768,7 +770,7 @@ func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) {
768770
// that the aggregate cluster graph has no leaf clusters.
769771
func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) {
770772
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
771-
mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
773+
mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
772774

773775
const (
774776
clusterNameA = clusterName // cluster name in cds LB policy config
@@ -816,7 +818,7 @@ func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) {
816818
// child policy and RPCs should get routed to that leaf cluster.
817819
func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) {
818820
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
819-
mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
821+
mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
820822

821823
// Start a test service backend.
822824
server := stubserver.StartTestService(t, nil)
@@ -872,10 +874,21 @@ func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) {
872874
// removed from the tree no longer has a watcher and the new cluster added has a
873875
// new watcher.
874876
func (s) TestWatchers(t *testing.T) {
875-
mgmtServer, nodeID, _, _, _, cdsResourceRequestedCh, _ := setupWithManagementServer(t)
876-
877877
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
878878
defer cancel()
879+
cdsResourceRequestedCh := make(chan []string, 1)
880+
onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
881+
if req.GetTypeUrl() == version.V3ClusterURL {
882+
if len(req.GetResourceNames()) > 0 {
883+
select {
884+
case cdsResourceRequestedCh <- req.GetResourceNames():
885+
case <-ctx.Done():
886+
}
887+
}
888+
}
889+
return nil
890+
}
891+
mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, onStreamReq)
879892

880893
const (
881894
clusterA = clusterName

xds/internal/balancer/cdsbalancer/cdsbalancer_test.go

Lines changed: 71 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,8 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer {
183183
}
184184

185185
// Performs the following setup required for tests:
186-
// - Spins up an xDS management server
186+
// - Spins up an xDS management server and and the provided onStreamRequest
187+
// function is set to be called for every incoming request on the ADS stream.
187188
// - Creates an xDS client talking to this management server
188189
// - Creates a manual resolver that configures the cds LB policy as the
189190
// top-level policy, and pushes an initial configuration to it
@@ -195,39 +196,11 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer {
195196
// - the grpc channel to the test backend service
196197
// - the manual resolver configured on the channel
197198
// - the xDS client used the grpc channel
198-
// - a channel on which requested cluster resource names are sent
199-
// - a channel used to signal that previously requested cluster resources are
200-
// no longer requested
201-
func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) {
202-
return setupWithManagementServerAndListener(t, nil)
203-
}
204-
205-
// Same as setupWithManagementServer, but also allows the caller to specify
206-
// a listener to be used by the management server.
207-
func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) {
199+
func setupWithManagementServer(t *testing.T, lis net.Listener, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) {
208200
t.Helper()
209-
210-
cdsResourceRequestedCh := make(chan []string, 1)
211-
cdsResourceCanceledCh := make(chan struct{}, 1)
212201
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
213-
Listener: lis,
214-
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
215-
if req.GetTypeUrl() == version.V3ClusterURL {
216-
switch len(req.GetResourceNames()) {
217-
case 0:
218-
select {
219-
case cdsResourceCanceledCh <- struct{}{}:
220-
default:
221-
}
222-
default:
223-
select {
224-
case cdsResourceRequestedCh <- req.GetResourceNames():
225-
default:
226-
}
227-
}
228-
}
229-
return nil
230-
},
202+
Listener: lis,
203+
OnStreamRequest: onStreamRequest,
231204
// Required for aggregate clusters as all resources cannot be requested
232205
// at once.
233206
AllowResourceSubset: true,
@@ -268,7 +241,7 @@ func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e.
268241
cc.Connect()
269242
t.Cleanup(func() { cc.Close() })
270243

271-
return mgmtServer, nodeID, cc, r, xdsC, cdsResourceRequestedCh, cdsResourceCanceledCh
244+
return mgmtServer, nodeID, cc, r, xdsC
272245
}
273246

274247
// Helper function to compare the load balancing configuration received on the
@@ -321,11 +294,23 @@ func verifyRPCError(gotErr error, wantCode codes.Code, wantErr, wantNodeID strin
321294
// configuration changes, it stops requesting the old cluster resource and
322295
// starts requesting the new one.
323296
func (s) TestConfigurationUpdate_Success(t *testing.T) {
324-
_, _, _, r, xdsClient, cdsResourceRequestedCh, _ := setupWithManagementServer(t)
325-
326-
// Verify that the specified cluster resource is requested.
327297
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
328298
defer cancel()
299+
cdsResourceRequestedCh := make(chan []string, 1)
300+
onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
301+
if req.GetTypeUrl() == version.V3ClusterURL {
302+
if len(req.GetResourceNames()) > 0 {
303+
select {
304+
case cdsResourceRequestedCh <- req.GetResourceNames():
305+
case <-ctx.Done():
306+
}
307+
}
308+
}
309+
return nil
310+
}
311+
_, _, _, r, xdsClient := setupWithManagementServer(t, nil, onStreamReq)
312+
313+
// Verify that the specified cluster resource is requested.
329314
wantNames := []string{clusterName}
330315
if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
331316
t.Fatal(err)
@@ -616,7 +601,7 @@ func (s) TestClusterUpdate_Success(t *testing.T) {
616601
for _, test := range tests {
617602
t.Run(test.name, func(t *testing.T) {
618603
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
619-
mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
604+
mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
620605

621606
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
622607
defer cancel()
@@ -640,7 +625,7 @@ func (s) TestClusterUpdate_Success(t *testing.T) {
640625
// balancing configuration pushed to the child is as expected.
641626
func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) {
642627
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
643-
mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
628+
mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil)
644629

645630
clusterResource := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
646631
ClusterName: clusterName,
@@ -689,15 +674,21 @@ func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) {
689674
// continue using the previous good update.
690675
func (s) TestClusterUpdate_Failure(t *testing.T) {
691676
_, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t)
692-
mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t)
693-
694-
// Verify that the specified cluster resource is requested.
695677
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
696678
defer cancel()
697-
wantNames := []string{clusterName}
698-
if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
699-
t.Fatal(err)
679+
cdsResourceCanceledCh := make(chan struct{}, 1)
680+
onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
681+
if req.GetTypeUrl() == version.V3ClusterURL {
682+
if len(req.GetResourceNames()) == 0 {
683+
select {
684+
case cdsResourceCanceledCh <- struct{}{}:
685+
case <-ctx.Done():
686+
}
687+
}
688+
}
689+
return nil
700690
}
691+
mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, onStreamReq)
701692

702693
// Configure the management server to return a cluster resource that
703694
// contains a config_source_specifier for the `lrs_server` field which is not
@@ -806,12 +797,31 @@ func (s) TestClusterUpdate_Failure(t *testing.T) {
806797
func (s) TestResolverError(t *testing.T) {
807798
_, resolverErrCh, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t)
808799
lis := testutils.NewListenerWrapper(t, nil)
809-
mgmtServer, nodeID, cc, r, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServerAndListener(t, lis)
800+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
801+
defer cancel()
802+
cdsResourceCanceledCh := make(chan struct{}, 1)
803+
cdsResourceRequestedCh := make(chan []string, 1)
804+
onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
805+
if req.GetTypeUrl() == version.V3ClusterURL {
806+
switch len(req.GetResourceNames()) {
807+
case 0:
808+
select {
809+
case cdsResourceCanceledCh <- struct{}{}:
810+
case <-ctx.Done():
811+
}
812+
default:
813+
select {
814+
case cdsResourceRequestedCh <- req.GetResourceNames():
815+
case <-ctx.Done():
816+
}
817+
}
818+
}
819+
return nil
820+
}
821+
mgmtServer, nodeID, cc, r, _ := setupWithManagementServer(t, lis, onStreamReq)
810822

811823
// Grab the wrapped connection from the listener wrapper. This will be used
812824
// to verify the connection is closed.
813-
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
814-
defer cancel()
815825
val, err := lis.NewConnCh.Receive(ctx)
816826
if err != nil {
817827
t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
@@ -949,15 +959,21 @@ func (s) TestResolverError(t *testing.T) {
949959
// - when the cluster resource is re-sent by the management server, RPCs
950960
// should start succeeding.
951961
func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) {
952-
mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t)
953-
954-
// Verify that the specified cluster resource is requested.
955962
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
956963
defer cancel()
957-
wantNames := []string{clusterName}
958-
if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
959-
t.Fatal(err)
964+
cdsResourceCanceledCh := make(chan struct{}, 1)
965+
onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
966+
if req.GetTypeUrl() == version.V3ClusterURL {
967+
if len(req.GetResourceNames()) == 0 {
968+
select {
969+
case cdsResourceCanceledCh <- struct{}{}:
970+
case <-ctx.Done():
971+
}
972+
}
973+
}
974+
return nil
960975
}
976+
mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, onStreamReq)
961977

962978
// Start a test service backend.
963979
server := stubserver.StartTestService(t, nil)
@@ -1028,7 +1044,7 @@ func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) {
10281044
func (s) TestClose(t *testing.T) {
10291045
cdsBalancerCh := registerWrappedCDSPolicy(t)
10301046
_, _, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t)
1031-
mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
1047+
mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
10321048

10331049
// Start a test service backend.
10341050
server := stubserver.StartTestService(t, nil)
@@ -1075,7 +1091,7 @@ func (s) TestClose(t *testing.T) {
10751091
func (s) TestExitIdle(t *testing.T) {
10761092
cdsBalancerCh := registerWrappedCDSPolicy(t)
10771093
_, _, exitIdleCh, _ := registerWrappedClusterResolverPolicy(t)
1078-
mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
1094+
mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil)
10791095

10801096
// Start a test service backend.
10811097
server := stubserver.StartTestService(t, nil)

0 commit comments

Comments
 (0)