diff --git a/internal/xds/resolver/helpers_test.go b/internal/xds/resolver/helpers_test.go index 243bbfd99f45..174b03883445 100644 --- a/internal/xds/resolver/helpers_test.go +++ b/internal/xds/resolver/helpers_test.go @@ -191,26 +191,6 @@ func verifyNoUpdateFromResolver(ctx context.Context, t *testing.T, stateCh chan } } -// waitForErrorFromResolver waits for the resolver to push an error and verifies -// that it matches the expected error and contains the expected node ID. -func waitForErrorFromResolver(ctx context.Context, errCh chan error, wantErr, wantNodeID string) error { - select { - case <-ctx.Done(): - return fmt.Errorf("timeout when waiting for error to be propagated to the ClientConn") - case gotErr := <-errCh: - if gotErr == nil { - return fmt.Errorf("got nil error from resolver, want %q", wantErr) - } - if !strings.Contains(gotErr.Error(), wantErr) { - return fmt.Errorf("got error from resolver %q, want %q", gotErr, wantErr) - } - if !strings.Contains(gotErr.Error(), wantNodeID) { - return fmt.Errorf("got error from resolver %q, want nodeID %q", gotErr, wantNodeID) - } - } - return nil -} - func verifyResolverError(gotErr error, wantCode codes.Code, wantErr, wantNodeID string) error { if gotErr == nil { return fmt.Errorf("got nil error from resolver, want error with code %v", wantCode) diff --git a/internal/xds/resolver/watch_service.go b/internal/xds/resolver/watch_service.go deleted file mode 100644 index 3fb122b001c0..000000000000 --- a/internal/xds/resolver/watch_service.go +++ /dev/null @@ -1,92 +0,0 @@ -/* - * - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package resolver - -import ( - "context" - - "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" -) - -type listenerWatcher struct { - resourceName string - cancel func() - parent *xdsResolver -} - -func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatcher { - lw := &listenerWatcher{resourceName: resourceName, parent: parent} - lw.cancel = xdsresource.WatchListener(parent.xdsClient, resourceName, lw) - return lw -} - -func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) { - handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update); onDone() } - l.parent.serializer.ScheduleOr(handleUpdate, onDone) -} - -func (l *listenerWatcher) ResourceError(err error, onDone func()) { - handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone() } - l.parent.serializer.ScheduleOr(handleError, onDone) -} - -func (l *listenerWatcher) AmbientError(err error, onDone func()) { - handleError := func(context.Context) { l.parent.onListenerResourceAmbientError(err); onDone() } - l.parent.serializer.ScheduleOr(handleError, onDone) -} - -func (l *listenerWatcher) stop() { - l.cancel() - l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName) -} - -type routeConfigWatcher struct { - resourceName string - cancel func() - parent *xdsResolver -} - -func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfigWatcher { - rw := &routeConfigWatcher{resourceName: resourceName, parent: parent} - rw.cancel = xdsresource.WatchRouteConfig(parent.xdsClient, resourceName, rw) - return rw -} - -func (r *routeConfigWatcher) ResourceChanged(u *xdsresource.RouteConfigUpdate, onDone func()) { - handleUpdate := func(context.Context) { - r.parent.onRouteConfigResourceUpdate(r.resourceName, u) - onDone() - } - r.parent.serializer.ScheduleOr(handleUpdate, onDone) -} - -func (r *routeConfigWatcher) ResourceError(err error, onDone func()) { - handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() } - r.parent.serializer.ScheduleOr(handleError, onDone) -} - -func (r *routeConfigWatcher) AmbientError(err error, onDone func()) { - handleError := func(context.Context) { r.parent.onRouteConfigResourceAmbientError(r.resourceName, err); onDone() } - r.parent.serializer.ScheduleOr(handleError, onDone) -} - -func (r *routeConfigWatcher) stop() { - r.cancel() - r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName) -} diff --git a/internal/xds/resolver/xds_resolver.go b/internal/xds/resolver/xds_resolver.go index c28892a6f28c..c985cb42cf16 100644 --- a/internal/xds/resolver/xds_resolver.go +++ b/internal/xds/resolver/xds_resolver.go @@ -29,13 +29,13 @@ import ( "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" - "google.golang.org/grpc/internal/pretty" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/internal/wrr" "google.golang.org/grpc/internal/xds/bootstrap" rinternal "google.golang.org/grpc/internal/xds/resolver/internal" "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" + "google.golang.org/grpc/internal/xds/xdsdepmgr" "google.golang.org/grpc/resolver" ) @@ -111,22 +111,37 @@ type xdsResolverBuilder struct { // The xds bootstrap process is performed (and a new xDS client is built) every // time an xds resolver is built. func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (_ resolver.Resolver, retErr error) { + // Initialize the xDS client. + newXDSClient := rinternal.NewXDSClient.(func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error)) + if b.newXDSClient != nil { + newXDSClient = b.newXDSClient + } + client, xdsClientClose, err := newXDSClient(target.String(), opts.MetricsRecorder) + if err != nil { + return nil, fmt.Errorf("xds: failed to create xds-client: %v", err) + } + + template, err := sanityChecksOnBootstrapConfig(target, opts, client) + if err != nil { + xdsClientClose() + return nil, err + } + ldsResourceName := bootstrap.PopulateResourceTemplate(template, target.Endpoint()) + r := &xdsResolver{ - cc: cc, - activeClusters: make(map[string]*clusterInfo), - channelID: rand.Uint64(), + cc: cc, + xdsClient: client, + xdsClientClose: xdsClientClose, + activeClusters: make(map[string]*clusterInfo), + channelID: rand.Uint64(), + ldsResourceName: ldsResourceName, } - defer func() { - if retErr != nil { - r.Close() - } - }() r.logger = prefixLogger(r) r.logger.Infof("Creating resolver for target: %+v", target) // Initialize the serializer used to synchronize the following: - // - updates from the xDS client. This could lead to generation of new - // service config if resolution is complete. + // - updates from the dependency manager. This could lead to generation of + // new service config if resolution is complete. // - completion of an RPC to a removed cluster causing the associated ref // count to become zero, resulting in generation of new service config. // - stopping of a config selector that results in generation of new service @@ -134,27 +149,7 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon ctx, cancel := context.WithCancel(context.Background()) r.serializer = grpcsync.NewCallbackSerializer(ctx) r.serializerCancel = cancel - - // Initialize the xDS client. - newXDSClient := rinternal.NewXDSClient.(func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error)) - if b.newXDSClient != nil { - newXDSClient = b.newXDSClient - } - client, closeFn, err := newXDSClient(target.String(), opts.MetricsRecorder) - if err != nil { - return nil, fmt.Errorf("xds: failed to create xds-client: %v", err) - } - r.xdsClient = client - r.xdsClientClose = closeFn - - // Determine the listener resource name and start a watcher for it. - template, err := r.sanityChecksOnBootstrapConfig(target, opts, r.xdsClient) - if err != nil { - return nil, err - } - r.dataplaneAuthority = opts.Authority - r.ldsResourceName = bootstrap.PopulateResourceTemplate(template, target.Endpoint()) - r.listenerWatcher = newListenerWatcher(r.ldsResourceName, r) + r.dm = xdsdepmgr.New(r.ldsResourceName, opts.Authority, r.xdsClient, r) return r, nil } @@ -167,7 +162,7 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon // // Returns the listener resource name template to use. If any of the above // validations fail, a non-nil error is returned. -func (r *xdsResolver) sanityChecksOnBootstrapConfig(target resolver.Target, _ resolver.BuildOptions, client xdsclient.XDSClient) (string, error) { +func sanityChecksOnBootstrapConfig(target resolver.Target, _ resolver.BuildOptions, client xdsclient.XDSClient) (string, error) { bootstrapConfig := client.BootstrapConfig() if bootstrapConfig == nil { // This is never expected to happen after a successful xDS client @@ -205,50 +200,35 @@ func (*xdsResolverBuilder) Scheme() string { // xdsResolver implements the resolver.Resolver interface. // -// It registers a watcher for ServiceConfig updates with the xdsClient object -// (which performs LDS/RDS queries for the same), and passes the received -// updates to the ClientConn. +// It manages the dependency manager which in turn manages all xDS resource +// watches. It receives the xDS resource config and passes them to ClientConn. type xdsResolver struct { - cc resolver.ClientConn - logger *grpclog.PrefixLogger + // The following fields are initialized at creation time and are read-only + // after that. + cc resolver.ClientConn + logger *grpclog.PrefixLogger + ldsResourceName string + dm *xdsdepmgr.DependencyManager // The underlying xdsClient which performs all xDS requests and responses. xdsClient xdsclient.XDSClient xdsClientClose func() // A random number which uniquely identifies the channel which owns this // resolver. channelID uint64 - // All methods on the xdsResolver type except for the ones invoked by gRPC, // i.e ResolveNow() and Close(), are guaranteed to execute in the context of - // this serializer's callback. And since the serializer guarantees mutual - // exclusion among these callbacks, we can get by without any mutexes to - // access all of the below defined state. The only exception is Close(), - // which does access some of this shared state, but it does so after - // cancelling the context passed to the serializer. + // this serializer's callback. We use the serializer because these shared + // states are accessed by each RPC when it is committed,, and so + // serializeris preffered over a mutex. serializer *grpcsync.CallbackSerializer serializerCancel context.CancelFunc - // dataplaneAuthority is the authority used for the data plane connections, - // which is also used to select the VirtualHost within the xDS - // RouteConfiguration. This is %-encoded to match with VirtualHost Domain - // in xDS RouteConfiguration. - dataplaneAuthority string - - ldsResourceName string - listenerWatcher *listenerWatcher - listenerUpdateRecvd bool - currentListener *xdsresource.ListenerUpdate - - rdsResourceName string - routeConfigWatcher *routeConfigWatcher - routeConfigUpdateRecvd bool - currentRouteConfig *xdsresource.RouteConfigUpdate - currentVirtualHost *xdsresource.VirtualHost // Matched virtual host for quick access. - + // The following fields are accessed only from within the serializer + // callbacks. + xdsConfig *xdsresource.XDSConfig // activeClusters is a map from cluster name to information about the // cluster that includes a ref count and load balancing configuration. - activeClusters map[string]*clusterInfo - + activeClusters map[string]*clusterInfo curConfigSelector stoppableConfigSelector } @@ -262,22 +242,53 @@ func (r *xdsResolver) Close() { r.serializerCancel() <-r.serializer.Done() - // Note that Close needs to check for nils even if some of them are always - // set in the constructor. This is because the constructor defers Close() in - // error cases, and the fields might not be set when the error happens. - - if r.listenerWatcher != nil { - r.listenerWatcher.stop() - } - if r.routeConfigWatcher != nil { - r.routeConfigWatcher.stop() + if r.dm != nil { + r.dm.Close() } + if r.xdsClientClose != nil { r.xdsClientClose() } r.logger.Infof("Shutdown") } +// Update is called when there is a new xDS config available from the dependency +// manager and does the following: +// - creates a new config selector (this involves incrementing references to +// clusters owned by this config selector). +// - stops the old config selector (this involves decrementing references to +// clusters owned by this config selector). +// - prunes active clusters and pushes a new service config to the channel. +// - updates the current config selector used by the resolver. +func (r *xdsResolver) Update(config *xdsresource.XDSConfig) { + r.serializer.TrySchedule(func(context.Context) { + r.xdsConfig = config + cs, err := r.newConfigSelector() + if err != nil { + r.onResourceError(err) + return + } + if !r.sendNewServiceConfig(cs) { + // Channel didn't like the update we provided (unexpected); erase + // this config selector and ignore this update, continuing with + // the previous config selector. + cs.stop() + return + } + + if r.curConfigSelector != nil { + r.curConfigSelector.stop() + } + r.curConfigSelector = cs + }) +} + +func (r *xdsResolver) Error(err error) { + r.serializer.TrySchedule(func(context.Context) { + r.onResourceError(err) + }) +} + // sendNewServiceConfig prunes active clusters, generates a new service config // based on the current set of active clusters, and sends an update to the // channel with that service config and the provided config selector. Returns @@ -308,7 +319,9 @@ func (r *xdsResolver) sendNewServiceConfig(cs stoppableConfigSelector) bool { } sc := serviceConfigJSON(r.activeClusters) - r.logger.Infof("For Listener resource %q and RouteConfiguration resource %q, generated service config: %v", r.ldsResourceName, r.rdsResourceName, pretty.FormatJSON(sc)) + if r.logger.V(2) { + r.logger.Infof("For Listener resource %q and RouteConfiguration resource %q, generated service config: %+v", r.ldsResourceName, r.xdsConfig.Listener.RouteConfigName, sc) + } // Send the update to the ClientConn. state := iresolver.SetConfigSelector(resolver.State{ @@ -338,25 +351,25 @@ func (r *xdsResolver) newConfigSelector() (*configSelector, error) { }) }, virtualHost: virtualHost{ - retryConfig: r.currentVirtualHost.RetryConfig, + retryConfig: r.xdsConfig.VirtualHost.RetryConfig, }, - routes: make([]route, len(r.currentVirtualHost.Routes)), + routes: make([]route, len(r.xdsConfig.VirtualHost.Routes)), clusters: make(map[string]*clusterInfo), - httpFilterConfig: r.currentListener.HTTPFilters, + httpFilterConfig: r.xdsConfig.Listener.HTTPFilters, } - for i, rt := range r.currentVirtualHost.Routes { + for i, rt := range r.xdsConfig.VirtualHost.Routes { clusters := rinternal.NewWRR.(func() wrr.WRR)() if rt.ClusterSpecifierPlugin != "" { clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin clusters.Add(&routeCluster{name: clusterName}, 1) ci := r.addOrGetActiveClusterInfo(clusterName) - ci.cfg = xdsChildConfig{ChildPolicy: balancerConfig(r.currentRouteConfig.ClusterSpecifierPlugins[rt.ClusterSpecifierPlugin])} + ci.cfg = xdsChildConfig{ChildPolicy: balancerConfig(r.xdsConfig.RouteConfig.ClusterSpecifierPlugins[rt.ClusterSpecifierPlugin])} cs.clusters[clusterName] = ci } else { for _, wc := range rt.WeightedClusters { clusterName := clusterPrefix + wc.Name - interceptor, err := newInterceptor(r.currentListener.HTTPFilters, wc.HTTPFilterConfigOverride, rt.HTTPFilterConfigOverride, r.currentVirtualHost.HTTPFilterConfigOverride) + interceptor, err := newInterceptor(r.xdsConfig.Listener.HTTPFilters, wc.HTTPFilterConfigOverride, rt.HTTPFilterConfigOverride, r.xdsConfig.VirtualHost.HTTPFilterConfigOverride) if err != nil { return nil, err } @@ -374,7 +387,7 @@ func (r *xdsResolver) newConfigSelector() (*configSelector, error) { cs.routes[i].m = xdsresource.RouteToMatcher(rt) cs.routes[i].actionType = rt.ActionType if rt.MaxStreamDuration == nil { - cs.routes[i].maxStreamDuration = r.currentListener.MaxStreamDuration + cs.routes[i].maxStreamDuration = r.xdsConfig.Listener.MaxStreamDuration } else { cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration } @@ -422,75 +435,6 @@ type clusterInfo struct { cfg xdsChildConfig } -// Determines if the xdsResolver has received all required configuration, i.e -// Listener and RouteConfiguration resources, from the management server, and -// whether a matching virtual host was found in the RouteConfiguration resource. -func (r *xdsResolver) resolutionComplete() bool { - return r.listenerUpdateRecvd && r.routeConfigUpdateRecvd && r.currentVirtualHost != nil -} - -// onResolutionComplete performs the following actions when resolution is -// complete, i.e Listener and RouteConfiguration resources have been received -// from the management server and a matching virtual host is found in the -// latter. -// - creates a new config selector (this involves incrementing references to -// clusters owned by this config selector). -// - stops the old config selector (this involves decrementing references to -// clusters owned by this config selector). -// - prunes active clusters and pushes a new service config to the channel. -// - updates the current config selector used by the resolver. -// -// Only executed in the context of a serializer callback. -func (r *xdsResolver) onResolutionComplete() { - if !r.resolutionComplete() { - return - } - - cs, err := r.newConfigSelector() - if err != nil { - // Send an erroring config selector in this case that fails RPCs. - r.onResourceError(fmt.Errorf("xds: failed to create config selector: %v", err)) - return - } - if !r.sendNewServiceConfig(cs) { - // Channel didn't like the update we provided (unexpected); erase - // this config selector and ignore this update, continuing with - // the previous config selector. - cs.stop() - return - } - - if r.curConfigSelector != nil { - r.curConfigSelector.stop() - } - r.curConfigSelector = cs -} - -func (r *xdsResolver) applyRouteConfigUpdate(update *xdsresource.RouteConfigUpdate) { - matchVh := xdsresource.FindBestMatchingVirtualHost(r.dataplaneAuthority, update.VirtualHosts) - if matchVh == nil { - // TODO(purnesh42h): Should this be a resource or ambient error? Note - // that its being called only from resource update methods when we have - // finished removing the previous update. - r.onAmbientError(fmt.Errorf("no matching virtual host found for %q", r.dataplaneAuthority)) - return - } - r.currentRouteConfig = update - r.currentVirtualHost = matchVh - r.routeConfigUpdateRecvd = true - - r.onResolutionComplete() -} - -// onAmbientError propagates the error up to the channel. And since this is -// invoked only for non resource errors, we don't have to update resolver -// state and we can keep using the old config. -// -// Only executed in the context of a serializer callback. -func (r *xdsResolver) onAmbientError(err error) { - r.cc.ReportError(err) -} - // Contains common functionality to be executed when resources of either type // are removed. // @@ -513,107 +457,3 @@ func (r *xdsResolver) onResourceError(err error) { } r.curConfigSelector = cs } - -// Only executed in the context of a serializer callback. -func (r *xdsResolver) onListenerResourceUpdate(update *xdsresource.ListenerUpdate) { - if r.logger.V(2) { - r.logger.Infof("Received update for Listener resource %q: %v", r.ldsResourceName, pretty.ToJSON(update)) - } - - r.currentListener = update - r.listenerUpdateRecvd = true - - if update.InlineRouteConfig != nil { - // If there was a previous route config watcher because of a non-inline - // route configuration, cancel it. - r.rdsResourceName = "" - if r.routeConfigWatcher != nil { - r.routeConfigWatcher.stop() - r.routeConfigWatcher = nil - } - - r.applyRouteConfigUpdate(update.InlineRouteConfig) - return - } - - // We get here only if there was no inline route configuration. - - // If the route config name has not changed, send an update with existing - // route configuration and the newly received listener configuration. - if r.rdsResourceName == update.RouteConfigName { - r.onResolutionComplete() - return - } - - // If the route config name has changed, cancel the old watcher and start a - // new one. At this point, since we have not yet resolved the new route - // config name, we don't send an update to the channel, and therefore - // continue using the old route configuration (if received) until the new - // one is received. - r.rdsResourceName = update.RouteConfigName - if r.routeConfigWatcher != nil { - r.routeConfigWatcher.stop() - r.currentVirtualHost = nil - r.routeConfigUpdateRecvd = false - } - r.routeConfigWatcher = newRouteConfigWatcher(r.rdsResourceName, r) -} - -func (r *xdsResolver) onListenerResourceAmbientError(err error) { - if r.logger.V(2) { - r.logger.Infof("Received ambient error for Listener resource %q: %v", r.ldsResourceName, err) - } - r.onAmbientError(err) -} - -// Only executed in the context of a serializer callback. -func (r *xdsResolver) onListenerResourceError(err error) { - if r.logger.V(2) { - r.logger.Infof("Received resource error for Listener resource %q: %v", r.ldsResourceName, err) - } - - r.listenerUpdateRecvd = false - if r.routeConfigWatcher != nil { - r.routeConfigWatcher.stop() - } - r.rdsResourceName = "" - r.currentVirtualHost = nil - r.routeConfigUpdateRecvd = false - r.routeConfigWatcher = nil - - r.onResourceError(err) -} - -// Only executed in the context of a serializer callback. -func (r *xdsResolver) onRouteConfigResourceUpdate(name string, update *xdsresource.RouteConfigUpdate) { - if r.logger.V(2) { - r.logger.Infof("Received update for RouteConfiguration resource %q: %v", name, pretty.ToJSON(update)) - } - - if r.rdsResourceName != name { - // Drop updates from canceled watchers. - return - } - - r.applyRouteConfigUpdate(update) -} - -// Only executed in the context of a serializer callback. -func (r *xdsResolver) onRouteConfigResourceAmbientError(name string, err error) { - if r.logger.V(2) { - r.logger.Infof("Received ambient error for RouteConfiguration resource %q: %v", name, err) - } - r.onAmbientError(err) -} - -// Only executed in the context of a serializer callback. -func (r *xdsResolver) onRouteConfigResourceError(name string, err error) { - if r.logger.V(2) { - r.logger.Infof("Received resource error for RouteConfiguration resource %q: %v", name, err) - } - - if r.rdsResourceName != name { - return - } - r.onResourceError(err) -} diff --git a/internal/xds/resolver/xds_resolver_test.go b/internal/xds/resolver/xds_resolver_test.go index eaf2252fd65b..bc0b80ff0650 100644 --- a/internal/xds/resolver/xds_resolver_test.go +++ b/internal/xds/resolver/xds_resolver_test.go @@ -374,7 +374,7 @@ func (s) TestResolverBadServiceUpdate_NACKedWithCache(t *testing.T) { nodeID := uuid.New().String() mgmtServer, _, _, bc := setupManagementServerForTest(t, nodeID) - stateCh, errCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc) + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc) // Configure good listener and route configuration resources on the // management server. @@ -411,9 +411,6 @@ func (s) TestResolverBadServiceUpdate_NACKedWithCache(t *testing.T) { // Expect an error update from the resolver. Since the resource is cached, // it should be received as an ambient error. configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{lis}, nil) - if err := waitForErrorFromResolver(ctx, errCh, "no RouteSpecifier", nodeID); err != nil { - t.Fatal(err) - } // "Make an RPC" by invoking the config selector which should succeed by // continuing to use the previously cached resource.