Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 61 additions & 115 deletions internal/xds/xdsclient/tests/ads_stream_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,23 @@ package xdsclient_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/internal/xds/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version"
"google.golang.org/protobuf/testing/protocmp"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)

// waitForResourceNames waits for the wantNames to be received on namesCh.
// Returns a non-nil error if the context expires before that.
func waitForResourceNames(ctx context.Context, t *testing.T, namesCh chan []string, wantNames []string) error {
t.Helper()

var lastRequestedNames []string
for ; ; <-time.After(defaultTestShortTimeout) {
select {
case <-ctx.Done():
return fmt.Errorf("timeout waiting for resources %v to be requested from the management server. Last requested resources: %v", wantNames, lastRequestedNames)
case gotNames := <-namesCh:
if cmp.Equal(gotNames, wantNames, cmpopts.EquateEmpty(), cmpopts.SortSlices(func(s1, s2 string) bool { return s1 < s2 })) {
return nil
}
lastRequestedNames = gotNames
}
}
}

// Tests that an ADS stream is restarted after a connection failure. Also
// verifies that if there were any watches registered before the connection
// failed, those resources are re-requested after the stream is restarted.
Expand All @@ -74,47 +52,26 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) {
defer cancel()

// Start an xDS management server that uses a couple of channels to inform
// the test about the specific LDS and CDS resource names being requested.
ldsResourcesCh := make(chan []string, 1)
cdsResourcesCh := make(chan []string, 1)
streamOpened := make(chan struct{}, 1)
streamClosed := make(chan struct{}, 1)
// the test about the request and response messages being exchanged.
streamRequestCh := testutils.NewChannel()
streamResponseCh := testutils.NewChannel()
streamOpened := testutils.NewChannel()
streamClosed := testutils.NewChannel()
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
Listener: lis,
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
t.Logf("Received request for resources: %v of type %s", req.GetResourceNames(), req.GetTypeUrl())

// Drain the resource name channels before writing to them to ensure
// that the most recently requested names are made available to the
// test.
switch req.GetTypeUrl() {
case version.V3ClusterURL:
select {
case <-cdsResourcesCh:
default:
}
cdsResourcesCh <- req.GetResourceNames()
case version.V3ListenerURL:
select {
case <-ldsResourcesCh:
default:
}
ldsResourcesCh <- req.GetResourceNames()
}
streamRequestCh.SendContext(ctx, req)
return nil
},
OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) {
streamResponseCh.SendContext(ctx, resp)
},
OnStreamClosed: func(int64, *v3corepb.Node) {
select {
case streamClosed <- struct{}{}:
default:
}

streamClosed.SendContext(ctx, nil)
},
OnStreamOpen: func(context.Context, int64, string) error {
select {
case streamOpened <- struct{}{}:
default:
}
streamOpened.SendContext(ctx, nil)
return nil
},
})
Expand Down Expand Up @@ -156,93 +113,82 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) {

// Verify that an ADS stream is opened and an LDS request with the above
// resource name is sent.
select {
case <-streamOpened:
case <-ctx.Done():
if _, err = streamOpened.Receive(ctx); err != nil {
t.Fatal("Timeout when waiting for ADS stream to open")
}
if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil {
t.Fatal(err)
}

// Verify the update received by the watcher.
wantListenerUpdate := listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: routeConfigName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
// Verify that the initial discovery request matches expectation.
r, err := streamRequestCh.Receive(ctx)
if err != nil {
t.Fatal("Timeout when waiting for the initial discovery request")
}
gotReq := r.(*v3discoverypb.DiscoveryRequest)
wantReq := &v3discoverypb.DiscoveryRequest{
VersionInfo: "",
Node: &v3corepb.Node{
Id: nodeID,
UserAgentName: "gRPC Go",
UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version},
ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"},
},
ResourceNames: []string{listenerName},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
ResponseNonce: "",
}
if err := verifyListenerUpdate(ctx, lw.updateCh, wantListenerUpdate); err != nil {
t.Fatal(err)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
}

// Create a cluster resource on the management server, in addition to the
// existing listener resource.
const clusterName = "cluster"
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerName, routeConfigName)},
Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, clusterName, e2e.SecurityLevelNone)},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
// Capture the version and nonce from the response.
r, err = streamResponseCh.Receive(ctx)
if err != nil {
t.Fatal("Timeout when waiting for a discovery response from the server")
}
gotResp := r.(*v3discoverypb.DiscoveryResponse)

// Register a watch for a cluster resource, and verify that a CDS request
// with the above resource name is sent.
cw := newClusterWatcher()
cdsCancel := xdsresource.WatchCluster(client, clusterName, cw)
if err := waitForResourceNames(ctx, t, cdsResourcesCh, []string{clusterName}); err != nil {
t.Fatal(err)
// Verify that the ACK contains the appropriate version and nonce.
r, err = streamRequestCh.Receive(ctx)
if err != nil {
t.Fatal("Timeout when waiting for ACK")
}
gotReq = r.(*v3discoverypb.DiscoveryRequest)
wantReq.VersionInfo = gotResp.GetVersionInfo()
wantReq.ResponseNonce = gotResp.GetNonce()
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
}

// Verify the update received by the watcher.
wantClusterUpdate := clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: clusterName,
EDSServiceName: clusterName,
wantListenerUpdate := listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: routeConfigName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
}
if err := verifyClusterUpdate(ctx, cw.updateCh, wantClusterUpdate); err != nil {
t.Fatal(err)
}

// Cancel the watch for the above cluster resource, and verify that a CDS
// request with no resource names is sent.
cdsCancel()
if err := waitForResourceNames(ctx, t, cdsResourcesCh, []string{}); err != nil {
if err := verifyListenerUpdate(ctx, lw.updateCh, wantListenerUpdate); err != nil {
t.Fatal(err)
}

// Stop the restartable listener and wait for the stream to close.
lis.Stop()
select {
case <-streamClosed:
case <-ctx.Done():
if _, err = streamClosed.Receive(ctx); err != nil {
t.Fatal("Timeout when waiting for ADS stream to close")
}

// Restart the restartable listener and wait for the stream to open.
lis.Restart()
select {
case <-streamOpened:
case <-ctx.Done():
if _, err = streamOpened.Receive(ctx); err != nil {
t.Fatal("Timeout when waiting for ADS stream to open")
}

// Verify that the listener resource is requested again.
if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil {
t.Fatal(err)
r, err = streamRequestCh.Receive(ctx)
if err != nil {
t.Fatal("Timeout when waiting for the initial discovery request")
}

// Wait for a short duration and verify that no CDS request is sent, since
// there are no resources being watched.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
select {
case <-sCtx.Done():
case names := <-cdsResourcesCh:
t.Fatalf("CDS request sent for resource names %v, when expecting no request", names)
gotReq = r.(*v3discoverypb.DiscoveryRequest)
wantReq.ResponseNonce = ""
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
}
}
Loading