diff --git a/internal/xds/xdsclient/tests/ads_stream_restart_test.go b/internal/xds/xdsclient/tests/ads_stream_restart_test.go index 58fad8028f80..81f3a3c4535b 100644 --- a/internal/xds/xdsclient/tests/ads_stream_restart_test.go +++ b/internal/xds/xdsclient/tests/ads_stream_restart_test.go @@ -20,45 +20,23 @@ package xdsclient_test import ( "context" - "fmt" "testing" - "time" "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/uuid" + "google.golang.org/grpc" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" - "google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version" + "google.golang.org/protobuf/testing/protocmp" - v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" ) -// waitForResourceNames waits for the wantNames to be received on namesCh. -// Returns a non-nil error if the context expires before that. -func waitForResourceNames(ctx context.Context, t *testing.T, namesCh chan []string, wantNames []string) error { - t.Helper() - - var lastRequestedNames []string - for ; ; <-time.After(defaultTestShortTimeout) { - select { - case <-ctx.Done(): - return fmt.Errorf("timeout waiting for resources %v to be requested from the management server. Last requested resources: %v", wantNames, lastRequestedNames) - case gotNames := <-namesCh: - if cmp.Equal(gotNames, wantNames, cmpopts.EquateEmpty(), cmpopts.SortSlices(func(s1, s2 string) bool { return s1 < s2 })) { - return nil - } - lastRequestedNames = gotNames - } - } -} - // Tests that an ADS stream is restarted after a connection failure. Also // verifies that if there were any watches registered before the connection // failed, those resources are re-requested after the stream is restarted. @@ -74,47 +52,26 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) { defer cancel() // Start an xDS management server that uses a couple of channels to inform - // the test about the specific LDS and CDS resource names being requested. - ldsResourcesCh := make(chan []string, 1) - cdsResourcesCh := make(chan []string, 1) - streamOpened := make(chan struct{}, 1) - streamClosed := make(chan struct{}, 1) + // the test about the request and response messages being exchanged. + streamRequestCh := testutils.NewChannel() + streamResponseCh := testutils.NewChannel() + streamOpened := testutils.NewChannel() + streamClosed := testutils.NewChannel() mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ Listener: lis, OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { t.Logf("Received request for resources: %v of type %s", req.GetResourceNames(), req.GetTypeUrl()) - - // Drain the resource name channels before writing to them to ensure - // that the most recently requested names are made available to the - // test. - switch req.GetTypeUrl() { - case version.V3ClusterURL: - select { - case <-cdsResourcesCh: - default: - } - cdsResourcesCh <- req.GetResourceNames() - case version.V3ListenerURL: - select { - case <-ldsResourcesCh: - default: - } - ldsResourcesCh <- req.GetResourceNames() - } + streamRequestCh.SendContext(ctx, req) return nil }, + OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { + streamResponseCh.SendContext(ctx, resp) + }, OnStreamClosed: func(int64, *v3corepb.Node) { - select { - case streamClosed <- struct{}{}: - default: - } - + streamClosed.SendContext(ctx, nil) }, OnStreamOpen: func(context.Context, int64, string) error { - select { - case streamOpened <- struct{}{}: - default: - } + streamOpened.SendContext(ctx, nil) return nil }, }) @@ -156,93 +113,82 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) { // Verify that an ADS stream is opened and an LDS request with the above // resource name is sent. - select { - case <-streamOpened: - case <-ctx.Done(): + if _, err = streamOpened.Receive(ctx); err != nil { t.Fatal("Timeout when waiting for ADS stream to open") } - if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil { - t.Fatal(err) - } - // Verify the update received by the watcher. - wantListenerUpdate := listenerUpdateErrTuple{ - update: xdsresource.ListenerUpdate{ - RouteConfigName: routeConfigName, - HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + // Verify that the initial discovery request matches expectation. + r, err := streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for the initial discovery request") + } + gotReq := r.(*v3discoverypb.DiscoveryRequest) + wantReq := &v3discoverypb.DiscoveryRequest{ + VersionInfo: "", + Node: &v3corepb.Node{ + Id: nodeID, + UserAgentName: "gRPC Go", + UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version}, + ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"}, }, + ResourceNames: []string{listenerName}, + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + ResponseNonce: "", } - if err := verifyListenerUpdate(ctx, lw.updateCh, wantListenerUpdate); err != nil { - t.Fatal(err) + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) } - // Create a cluster resource on the management server, in addition to the - // existing listener resource. - const clusterName = "cluster" - resources = e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerName, routeConfigName)}, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, clusterName, e2e.SecurityLevelNone)}, - SkipValidation: true, - } - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) + // Capture the version and nonce from the response. + r, err = streamResponseCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for a discovery response from the server") } + gotResp := r.(*v3discoverypb.DiscoveryResponse) - // Register a watch for a cluster resource, and verify that a CDS request - // with the above resource name is sent. - cw := newClusterWatcher() - cdsCancel := xdsresource.WatchCluster(client, clusterName, cw) - if err := waitForResourceNames(ctx, t, cdsResourcesCh, []string{clusterName}); err != nil { - t.Fatal(err) + // Verify that the ACK contains the appropriate version and nonce. + r, err = streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for ACK") + } + gotReq = r.(*v3discoverypb.DiscoveryRequest) + wantReq.VersionInfo = gotResp.GetVersionInfo() + wantReq.ResponseNonce = gotResp.GetNonce() + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) } // Verify the update received by the watcher. - wantClusterUpdate := clusterUpdateErrTuple{ - update: xdsresource.ClusterUpdate{ - ClusterName: clusterName, - EDSServiceName: clusterName, + wantListenerUpdate := listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ + RouteConfigName: routeConfigName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, } - if err := verifyClusterUpdate(ctx, cw.updateCh, wantClusterUpdate); err != nil { - t.Fatal(err) - } - - // Cancel the watch for the above cluster resource, and verify that a CDS - // request with no resource names is sent. - cdsCancel() - if err := waitForResourceNames(ctx, t, cdsResourcesCh, []string{}); err != nil { + if err := verifyListenerUpdate(ctx, lw.updateCh, wantListenerUpdate); err != nil { t.Fatal(err) } // Stop the restartable listener and wait for the stream to close. lis.Stop() - select { - case <-streamClosed: - case <-ctx.Done(): + if _, err = streamClosed.Receive(ctx); err != nil { t.Fatal("Timeout when waiting for ADS stream to close") } // Restart the restartable listener and wait for the stream to open. lis.Restart() - select { - case <-streamOpened: - case <-ctx.Done(): + if _, err = streamOpened.Receive(ctx); err != nil { t.Fatal("Timeout when waiting for ADS stream to open") } // Verify that the listener resource is requested again. - if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil { - t.Fatal(err) + r, err = streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for the initial discovery request") } - - // Wait for a short duration and verify that no CDS request is sent, since - // there are no resources being watched. - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - select { - case <-sCtx.Done(): - case names := <-cdsResourcesCh: - t.Fatalf("CDS request sent for resource names %v, when expecting no request", names) + gotReq = r.(*v3discoverypb.DiscoveryRequest) + wantReq.ResponseNonce = "" + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) } }