diff --git a/internal/xds/balancer/cdsbalancer/cluster_watcher.go b/internal/xds/balancer/cdsbalancer/cluster_watcher.go index dd702b125b32..917748c035a1 100644 --- a/internal/xds/balancer/cdsbalancer/cluster_watcher.go +++ b/internal/xds/balancer/cdsbalancer/cluster_watcher.go @@ -19,6 +19,7 @@ package cdsbalancer import ( "context" + "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" ) @@ -32,8 +33,9 @@ type clusterWatcher struct { parent *cdsBalancer } -func (cw *clusterWatcher) ResourceChanged(u *xdsresource.ClusterResourceData, onDone func()) { - handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() } +func (cw *clusterWatcher) ResourceChanged(rd xdsclient.ResourceData, onDone func()) { + clusterData := rd.(*xdsresource.ClusterResourceData) + handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, clusterData.Resource); onDone() } cw.parent.serializer.ScheduleOr(handleUpdate, onDone) } diff --git a/internal/xds/balancer/clusterimpl/clusterimpl.go b/internal/xds/balancer/clusterimpl/clusterimpl.go index ffe0a3db55a8..06cf091edce1 100644 --- a/internal/xds/balancer/clusterimpl/clusterimpl.go +++ b/internal/xds/balancer/clusterimpl/clusterimpl.go @@ -42,7 +42,6 @@ import ( "google.golang.org/grpc/internal/xds/balancer/loadstore" "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/internal/xds/clients" - "google.golang.org/grpc/internal/xds/clients/lrsclient" "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" @@ -236,7 +235,7 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error { } } if startNewLoadReport { - var loadStore *lrsclient.LoadStore + var loadStore xdsclient.LoadStore if b.xdsClient != nil { loadStore, b.cancelLoadReport = b.xdsClient.ReportLoad(b.lrsServer) } diff --git a/internal/xds/balancer/clusterresolver/resource_resolver_eds.go b/internal/xds/balancer/clusterresolver/resource_resolver_eds.go index 18b517f111d9..836b09e5ae39 100644 --- a/internal/xds/balancer/clusterresolver/resource_resolver_eds.go +++ b/internal/xds/balancer/clusterresolver/resource_resolver_eds.go @@ -23,6 +23,7 @@ import ( "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" + clientimpl "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" ) @@ -76,12 +77,13 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR } // ResourceChanged is invoked to report an update for the resource being watched. -func (er *edsDiscoveryMechanism) ResourceChanged(update *xdsresource.EndpointsResourceData, onDone func()) { +func (er *edsDiscoveryMechanism) ResourceChanged(rd clientimpl.ResourceData, onDone func()) { if er.stopped.HasFired() { onDone() return } + update := rd.(*xdsresource.EndpointsResourceData) er.mu.Lock() er.update = &update.Resource er.mu.Unlock() diff --git a/internal/xds/balancer/loadstore/load_store_wrapper.go b/internal/xds/balancer/loadstore/load_store_wrapper.go index 89d5ad8751a1..38395605240b 100644 --- a/internal/xds/balancer/loadstore/load_store_wrapper.go +++ b/internal/xds/balancer/loadstore/load_store_wrapper.go @@ -23,7 +23,7 @@ import ( "sync" "google.golang.org/grpc/internal/xds/clients" - "google.golang.org/grpc/internal/xds/clients/lrsclient" + "google.golang.org/grpc/internal/xds/xdsclient" ) // NewWrapper creates a Wrapper. @@ -54,8 +54,8 @@ type Wrapper struct { // store and perCluster are initialized as nil. They are only set by the // balancer when LRS is enabled. Before that, all functions to record loads // are no-op. - store *lrsclient.LoadStore - perCluster *lrsclient.PerClusterReporter + store xdsclient.LoadStore + perCluster xdsclient.PerClusterReporter } // UpdateClusterAndService updates the cluster name and eds service for this @@ -77,7 +77,7 @@ func (lsw *Wrapper) UpdateClusterAndService(cluster, edsService string) { // UpdateLoadStore updates the load store for this wrapper. If it is changed // from before, the perCluster store in this wrapper will also be updated. -func (lsw *Wrapper) UpdateLoadStore(store *lrsclient.LoadStore) { +func (lsw *Wrapper) UpdateLoadStore(store xdsclient.LoadStore) { lsw.mu.Lock() defer lsw.mu.Unlock() if store == lsw.store { diff --git a/internal/xds/resolver/watch_service.go b/internal/xds/resolver/watch_service.go index 44b885c44055..23f1409eeb7e 100644 --- a/internal/xds/resolver/watch_service.go +++ b/internal/xds/resolver/watch_service.go @@ -21,6 +21,7 @@ package resolver import ( "context" + "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" ) @@ -36,8 +37,9 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch return lw } -func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) { - handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() } +func (l *listenerWatcher) ResourceChanged(rd xdsclient.ResourceData, onDone func()) { + listenerData := rd.(*xdsresource.ListenerResourceData) + handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(listenerData.Resource); onDone() } l.parent.serializer.ScheduleOr(handleUpdate, onDone) } @@ -68,9 +70,10 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi return rw } -func (r *routeConfigWatcher) ResourceChanged(u *xdsresource.RouteConfigResourceData, onDone func()) { +func (r *routeConfigWatcher) ResourceChanged(rd xdsclient.ResourceData, onDone func()) { + rcData := rd.(*xdsresource.RouteConfigResourceData) handleUpdate := func(context.Context) { - r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource) + r.parent.onRouteConfigResourceUpdate(r.resourceName, rcData.Resource) onDone() } r.parent.serializer.ScheduleOr(handleUpdate, onDone) diff --git a/internal/xds/server/listener_wrapper.go b/internal/xds/server/listener_wrapper.go index 1f7da61175e6..cea7d746facb 100644 --- a/internal/xds/server/listener_wrapper.go +++ b/internal/xds/server/listener_wrapper.go @@ -33,6 +33,7 @@ import ( internalgrpclog "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/xds/bootstrap" + "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" ) @@ -58,7 +59,7 @@ type ServingModeCallback func(addr net.Addr, mode connectivity.ServingMode, err // XDSClient wraps the methods on the XDSClient which are required by // the listenerWrapper. type XDSClient interface { - WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func()) + WatchResource(rType xdsclient.ResourceType, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func()) BootstrapConfig() *bootstrap.Config } @@ -386,17 +387,18 @@ type ldsWatcher struct { name string } -func (lw *ldsWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) { +func (lw *ldsWatcher) ResourceChanged(rd xdsclient.ResourceData, onDone func()) { defer onDone() if lw.parent.closed.HasFired() { - lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update) + lw.logger.Warningf("Resource %q received update after listener was closed", lw.name) return } + listenerData := rd.(*xdsresource.ListenerResourceData) if lw.logger.V(2) { - lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource) + lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, listenerData.Resource) } l := lw.parent - ilc := update.Resource.InboundListenerCfg + ilc := listenerData.Resource.InboundListenerCfg // Make sure that the socket address on the received Listener resource // matches the address of the net.Listener passed to us by the user. This // check is done here instead of at the XDSClient layer because of the diff --git a/internal/xds/server/rds_handler.go b/internal/xds/server/rds_handler.go index bf78c37c8292..19edf38093fb 100644 --- a/internal/xds/server/rds_handler.go +++ b/internal/xds/server/rds_handler.go @@ -22,6 +22,7 @@ import ( "sync" igrpclog "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" ) @@ -135,7 +136,7 @@ type rdsWatcher struct { canceled bool // eats callbacks if true } -func (rw *rdsWatcher) ResourceChanged(update *xdsresource.RouteConfigResourceData, onDone func()) { +func (rw *rdsWatcher) ResourceChanged(rd xdsclient.ResourceData, onDone func()) { defer onDone() rw.mu.Lock() if rw.canceled { @@ -143,12 +144,13 @@ func (rw *rdsWatcher) ResourceChanged(update *xdsresource.RouteConfigResourceDat return } rw.mu.Unlock() + rcData := rd.(*xdsresource.RouteConfigResourceData) if rw.logger.V(2) { - rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource) + rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, rcData.Resource) } routeName := rw.routeName - rwu := rdsWatcherUpdate{data: &update.Resource} + rwu := rdsWatcherUpdate{data: &rcData.Resource} rw.parent.updates[routeName] = rwu rw.parent.callback(routeName, rwu) } diff --git a/internal/xds/testutils/fakeclient/client.go b/internal/xds/testutils/fakeclient/client.go index fa465139ee94..21021c79b5df 100644 --- a/internal/xds/testutils/fakeclient/client.go +++ b/internal/xds/testutils/fakeclient/client.go @@ -40,7 +40,7 @@ type Client struct { name string loadReportCh *testutils.Channel lrsCancelCh *testutils.Channel - loadStore *lrsclient.LoadStore + loadStore xdsclient.LoadStore bootstrapCfg *bootstrap.Config } @@ -80,7 +80,7 @@ func (*stream) Recv() ([]byte, error) { } // ReportLoad starts reporting load about clusterName to server. -func (xdsC *Client) ReportLoad(server *bootstrap.ServerConfig) (loadStore *lrsclient.LoadStore, cancel func(context.Context)) { +func (xdsC *Client) ReportLoad(server *bootstrap.ServerConfig) (loadStore xdsclient.LoadStore, cancel func(context.Context)) { lrsClient, _ := lrsclient.New(lrsclient.Config{Node: clients.Node{ID: "fake-node-id"}, TransportBuilder: &transportBuilder{}}) xdsC.loadStore, _ = lrsClient.ReportLoad(clients.ServerIdentifier{ServerURI: server.ServerURI()}) @@ -100,7 +100,7 @@ func (xdsC *Client) WaitForCancelReportLoad(ctx context.Context) error { } // LoadStore returns the underlying load data store. -func (xdsC *Client) LoadStore() *lrsclient.LoadStore { +func (xdsC *Client) LoadStore() xdsclient.LoadStore { return xdsC.loadStore } diff --git a/internal/xds/xdsclient/client.go b/internal/xds/xdsclient/client.go index 514273164402..b7a44f0de2f7 100644 --- a/internal/xds/xdsclient/client.go +++ b/internal/xds/xdsclient/client.go @@ -25,8 +25,9 @@ import ( v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" "google.golang.org/grpc/internal/xds/bootstrap" + "google.golang.org/grpc/internal/xds/clients" "google.golang.org/grpc/internal/xds/clients/lrsclient" - "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" + "google.golang.org/grpc/internal/xds/clients/xdsclient" ) // XDSClient is a full fledged gRPC client which queries a set of discovery APIs @@ -47,13 +48,39 @@ type XDSClient interface { // During a race (e.g. an xDS response is received while the user is calling // cancel()), there's a small window where the callback can be called after // the watcher is canceled. Callers need to handle this case. - WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func()) + WatchResource(rType xdsclient.ResourceType, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func()) - ReportLoad(*bootstrap.ServerConfig) (*lrsclient.LoadStore, func(context.Context)) + ReportLoad(*bootstrap.ServerConfig) (LoadStore, func(context.Context)) BootstrapConfig() *bootstrap.Config } +// PerClusterReporter is the minimal interface callers use to record per-cluster +// load and drops. It mirrors the PerClusterReporter methods from the concrete +// lrsclient.PerClusterReporter but is intentionally tiny. +type PerClusterReporter interface { + // CallStarted records that a call has started for the given locality. + CallStarted(locality clients.Locality) + + // CallFinished records that a call has finished for the given locality. + // Pass the error (nil if success) so implementations can update success/failure counters. + CallFinished(locality clients.Locality, err error) + + // CallServerLoad reports a numeric load metric for the given locality and cluster. + // (If your code uses a different name/parameters for this, match the concrete type.) + CallServerLoad(locality clients.Locality, clusterName string, value float64) + + // CallDropped records a dropped request of the given category. + CallDropped(category string) +} + +// LoadStore is the minimal interface returned by ReportLoad. It provides a +// way to get per-cluster reporters and to stop the store. +type LoadStore interface { + ReporterForCluster(clusterName, serviceName string) *lrsclient.PerClusterReporter + Stop(ctx context.Context) +} + // DumpResources returns the status and contents of all xDS resources. It uses // xDS clients from the default pool. func DumpResources() *v3statuspb.ClientStatusResponse { diff --git a/internal/xds/xdsclient/clientimpl.go b/internal/xds/xdsclient/clientimpl.go index b1f797993fd7..bb67bc92f1c1 100644 --- a/internal/xds/xdsclient/clientimpl.go +++ b/internal/xds/xdsclient/clientimpl.go @@ -81,7 +81,7 @@ var ( // interface with ref counting so that it can be shared by the xds resolver and // balancer implementations, across multiple ClientConns and Servers. type clientImpl struct { - *xdsclient.XDSClient // TODO: #8313 - get rid of embedding, if possible. + xdsClient *xdsclient.XDSClient // The following fields are initialized at creation time and are read-only // after that. @@ -137,7 +137,7 @@ func newClientImpl(config *bootstrap.Config, metricsRecorder estats.MetricsRecor return nil, err } c := &clientImpl{ - XDSClient: client, + xdsClient: client, xdsClientConfig: gConfig, bootstrapConfig: config, target: target, diff --git a/internal/xds/xdsclient/clientimpl_loadreport.go b/internal/xds/xdsclient/clientimpl_loadreport.go index ffd0c90b8f54..1e95ddba22e5 100644 --- a/internal/xds/xdsclient/clientimpl_loadreport.go +++ b/internal/xds/xdsclient/clientimpl_loadreport.go @@ -24,14 +24,13 @@ import ( "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/internal/xds/clients" "google.golang.org/grpc/internal/xds/clients/grpctransport" - "google.golang.org/grpc/internal/xds/clients/lrsclient" ) // ReportLoad starts a load reporting stream to the given server. All load // reports to the same server share the LRS stream. // -// It returns a lrsclient.LoadStore for the user to report loads. -func (c *clientImpl) ReportLoad(server *bootstrap.ServerConfig) (*lrsclient.LoadStore, func(context.Context)) { +// It returns a LoadStore for the user to report loads. +func (c *clientImpl) ReportLoad(server *bootstrap.ServerConfig) (LoadStore, func(context.Context)) { load, err := c.lrsClient.ReportLoad(clients.ServerIdentifier{ ServerURI: server.ServerURI(), Extensions: grpctransport.ServerIdentifierExtension{ diff --git a/internal/xds/xdsclient/clientimpl_test.go b/internal/xds/xdsclient/clientimpl_test.go index d2fc8f7f9332..acc2b27dbe59 100644 --- a/internal/xds/xdsclient/clientimpl_test.go +++ b/internal/xds/xdsclient/clientimpl_test.go @@ -81,10 +81,10 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) { Node: clients.Node{ID: node.GetId(), Cluster: node.GetCluster(), Metadata: node.Metadata, Locality: clients.Locality{Region: node.Locality.Region, Zone: node.Locality.Zone, SubZone: node.Locality.SubZone}, UserAgentName: node.UserAgentName, UserAgentVersion: node.GetUserAgentVersion()}, Authorities: map[string]xdsclient.Authority{}, ResourceTypes: map[string]xdsclient.ResourceType{ - version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(c)}, - version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()}, - version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gServerCfgMap)}, - version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()}, + version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c, gServerCfgMap)}, + version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewRouteConfigResourceTypeDecoder()}, + version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewClusterResourceTypeDecoder(c, gServerCfgMap)}, + version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewEndpointsResourceTypeDecoder()}, }, MetricsReporter: &metricsReporter{recorder: stats.NewTestMetricsRecorder(), target: testTargetName}, TransportBuilder: grpctransport.NewBuilder(map[string]grpctransport.Config{ @@ -119,10 +119,10 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) { Node: clients.Node{ID: node.GetId(), Cluster: node.GetCluster(), Metadata: node.Metadata, UserAgentName: node.UserAgentName, UserAgentVersion: node.GetUserAgentVersion()}, Authorities: map[string]xdsclient.Authority{"auth1": {XDSServers: []xdsclient.ServerConfig{expTopLevelS}}, "auth2": {XDSServers: []xdsclient.ServerConfig{expAuth2S}}}, ResourceTypes: map[string]xdsclient.ResourceType{ - version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(c)}, - version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()}, - version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gSCfgMap)}, - version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()}, + version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c, gSCfgMap)}, + version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewRouteConfigResourceTypeDecoder()}, + version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewClusterResourceTypeDecoder(c, gSCfgMap)}, + version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewEndpointsResourceTypeDecoder()}, }, MetricsReporter: &metricsReporter{recorder: stats.NewTestMetricsRecorder(), target: testTargetName}, TransportBuilder: grpctransport.NewBuilder(map[string]grpctransport.Config{ @@ -151,10 +151,10 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) { Node: clients.Node{ID: node.GetId(), Cluster: node.GetCluster(), Metadata: node.Metadata, UserAgentName: node.UserAgentName, UserAgentVersion: node.GetUserAgentVersion()}, Authorities: map[string]xdsclient.Authority{}, ResourceTypes: map[string]xdsclient.ResourceType{ - version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(c)}, - version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()}, - version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gServerCfgMap)}, - version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()}, + version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c, gServerCfgMap)}, + version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewRouteConfigResourceTypeDecoder()}, + version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewClusterResourceTypeDecoder(c, gServerCfgMap)}, + version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewEndpointsResourceTypeDecoder()}, }, MetricsReporter: &metricsReporter{recorder: stats.NewTestMetricsRecorder(), target: testTargetName}, TransportBuilder: grpctransport.NewBuilder(map[string]grpctransport.Config{ @@ -183,10 +183,10 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) { Node: clients.Node{ID: node.GetId(), Cluster: node.GetCluster(), Metadata: node.Metadata, UserAgentName: node.UserAgentName, UserAgentVersion: node.GetUserAgentVersion()}, Authorities: map[string]xdsclient.Authority{}, ResourceTypes: map[string]xdsclient.ResourceType{ - version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(c)}, - version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()}, - version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gServerCfgMap)}, - version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()}, + version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c, gServerCfgMap)}, + version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewRouteConfigResourceTypeDecoder()}, + version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewClusterResourceTypeDecoder(c, gServerCfgMap)}, + version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewEndpointsResourceTypeDecoder()}, }, MetricsReporter: &metricsReporter{recorder: stats.NewTestMetricsRecorder(), target: testTargetName}, TransportBuilder: grpctransport.NewBuilder(map[string]grpctransport.Config{ diff --git a/internal/xds/xdsclient/clientimpl_watchers.go b/internal/xds/xdsclient/clientimpl_watchers.go index 398de1ed73b6..37a8d545b842 100644 --- a/internal/xds/xdsclient/clientimpl_watchers.go +++ b/internal/xds/xdsclient/clientimpl_watchers.go @@ -17,15 +17,13 @@ package xdsclient -import ( - "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" -) +import "google.golang.org/grpc/internal/xds/clients/xdsclient" // WatchResource uses xDS to discover the resource associated with the provided // resource name. The resource type implementation determines how xDS responses // are are deserialized and validated, as received from the xDS management // server. Upon receipt of a response from the management server, an // appropriate callback on the watcher is invoked. -func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func()) { - return c.XDSClient.WatchResource(rType.TypeURL(), resourceName, xdsresource.GenericResourceWatcher(watcher)) +func (c *clientImpl) WatchResource(rType xdsclient.ResourceType, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func()) { + return c.xdsClient.WatchResource(rType.TypeURL, resourceName, watcher) } diff --git a/internal/xds/xdsclient/metrics_test.go b/internal/xds/xdsclient/metrics_test.go index 26a38aea381e..37ba1c707bc0 100644 --- a/internal/xds/xdsclient/metrics_test.go +++ b/internal/xds/xdsclient/metrics_test.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/internal/testutils/stats" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/xds/bootstrap" + "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" @@ -40,7 +41,7 @@ import ( type noopListenerWatcher struct{} -func (noopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) { +func (noopListenerWatcher) ResourceChanged(_ xdsclient.ResourceData, onDone func()) { onDone() } diff --git a/internal/xds/xdsclient/pool.go b/internal/xds/xdsclient/pool.go index eb0197e09a7f..f3376951115c 100644 --- a/internal/xds/xdsclient/pool.go +++ b/internal/xds/xdsclient/pool.go @@ -172,7 +172,7 @@ func (p *Pool) DumpResources() *v3statuspb.ClientStatusResponse { resp := &v3statuspb.ClientStatusResponse{} for key, client := range p.clients { - b, err := client.DumpResources() + b, err := client.xdsClient.DumpResources() if err != nil { return nil } @@ -243,7 +243,7 @@ func (p *Pool) clientRefCountedClose(name string) { // This attempts to close the transport to the management server and could // theoretically call back into the xdsclient package again and deadlock. // Hence, this needs to be called without holding the lock. - client.Close() + client.xdsClient.Close() xdsClientImplCloseHook(name) } diff --git a/internal/xds/xdsclient/resource_types.go b/internal/xds/xdsclient/resource_types.go index 88451ab8257e..d007666da182 100644 --- a/internal/xds/xdsclient/resource_types.go +++ b/internal/xds/xdsclient/resource_types.go @@ -30,25 +30,25 @@ func supportedResourceTypes(config *bootstrap.Config, gServerCfgMap map[xdsclien TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, - Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(config), + Decoder: xdsresource.NewListenerResourceTypeDecoder(config, gServerCfgMap), }, version.V3RouteConfigURL: { TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, - Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder(), + Decoder: xdsresource.NewRouteConfigResourceTypeDecoder(), }, version.V3ClusterURL: { TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, - Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(config, gServerCfgMap), + Decoder: xdsresource.NewClusterResourceTypeDecoder(config, gServerCfgMap), }, version.V3EndpointsURL: { TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, - Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder(), + Decoder: xdsresource.NewEndpointsResourceTypeDecoder(), }, } } diff --git a/internal/xds/xdsclient/tests/authority_test.go b/internal/xds/xdsclient/tests/authority_test.go index fbea7cb8ec3c..3275393114a2 100644 --- a/internal/xds/xdsclient/tests/authority_test.go +++ b/internal/xds/xdsclient/tests/authority_test.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/xds/bootstrap" + clientimpl "google.golang.org/grpc/internal/xds/clients/xdsclient" xdstestutils "google.golang.org/grpc/internal/xds/testutils" "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" @@ -316,7 +317,7 @@ func (s) TestAuthority_Fallback(t *testing.T) { if err != nil { t.Fatalf("Error when waiting for a resource update callback: %v", err) } - gotUpdate := v.(xdsresource.ClusterUpdate) + gotUpdate := v.(clusterUpdateErrTuple).update wantUpdate := xdsresource.ClusterUpdate{ ClusterName: clusterName, EDSServiceName: edsSecondaryName, @@ -351,8 +352,9 @@ func newClusterWatcherV2() *clusterWatcherV2 { } } -func (cw *clusterWatcherV2) ResourceChanged(update *xdsresource.ClusterResourceData, onDone func()) { - cw.updateCh.Send(update.Resource) +func (cw *clusterWatcherV2) ResourceChanged(rd clientimpl.ResourceData, onDone func()) { + clusterData := rd.(*xdsresource.ClusterResourceData) + cw.updateCh.Send(clusterUpdateErrTuple{update: clusterData.Resource}) onDone() } diff --git a/internal/xds/xdsclient/tests/cds_watchers_test.go b/internal/xds/xdsclient/tests/cds_watchers_test.go index e6368340294f..ba6069bb2976 100644 --- a/internal/xds/xdsclient/tests/cds_watchers_test.go +++ b/internal/xds/xdsclient/tests/cds_watchers_test.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/xds/bootstrap" + clientimpl "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" @@ -44,7 +45,7 @@ import ( type noopClusterWatcher struct{} -func (noopClusterWatcher) ResourceChanged(_ *xdsresource.ClusterResourceData, onDone func()) { +func (noopClusterWatcher) ResourceChanged(_ clientimpl.ResourceData, onDone func()) { onDone() } func (noopClusterWatcher) ResourceError(_ error, onDone func()) { @@ -67,8 +68,9 @@ func newClusterWatcher() *clusterWatcher { return &clusterWatcher{updateCh: testutils.NewChannel()} } -func (cw *clusterWatcher) ResourceChanged(update *xdsresource.ClusterResourceData, onDone func()) { - cw.updateCh.Send(clusterUpdateErrTuple{update: update.Resource}) +func (cw *clusterWatcher) ResourceChanged(rd clientimpl.ResourceData, onDone func()) { + clusterData := rd.(*xdsresource.ClusterResourceData) + cw.updateCh.Send(clusterUpdateErrTuple{update: clusterData.Resource}) onDone() } diff --git a/internal/xds/xdsclient/tests/eds_watchers_test.go b/internal/xds/xdsclient/tests/eds_watchers_test.go index a76c58641439..98a507a5505c 100644 --- a/internal/xds/xdsclient/tests/eds_watchers_test.go +++ b/internal/xds/xdsclient/tests/eds_watchers_test.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/internal/xds/clients" + clientimpl "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" "google.golang.org/protobuf/types/known/wrapperspb" @@ -53,7 +54,7 @@ const ( type noopEndpointsWatcher struct{} -func (noopEndpointsWatcher) ResourceChanged(_ *xdsresource.EndpointsResourceData, onDone func()) { +func (noopEndpointsWatcher) ResourceChanged(_ clientimpl.ResourceData, onDone func()) { onDone() } func (noopEndpointsWatcher) ResourceError(_ error, onDone func()) { @@ -76,11 +77,11 @@ func newEndpointsWatcher() *endpointsWatcher { return &endpointsWatcher{updateCh: testutils.NewChannel()} } -func (ew *endpointsWatcher) ResourceChanged(update *xdsresource.EndpointsResourceData, onDone func()) { - ew.updateCh.Send(endpointsUpdateErrTuple{update: update.Resource}) +func (ew *endpointsWatcher) ResourceChanged(rd clientimpl.ResourceData, onDone func()) { + endpointsData := rd.(*xdsresource.EndpointsResourceData) + ew.updateCh.Send(endpointsUpdateErrTuple{update: endpointsData.Resource}) onDone() } - func (ew *endpointsWatcher) ResourceError(err error, onDone func()) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` diff --git a/internal/xds/xdsclient/tests/lds_watchers_test.go b/internal/xds/xdsclient/tests/lds_watchers_test.go index 877148639b2c..7b4f2f6aa941 100644 --- a/internal/xds/xdsclient/tests/lds_watchers_test.go +++ b/internal/xds/xdsclient/tests/lds_watchers_test.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/xds/bootstrap" + xdsClient "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" @@ -48,7 +49,7 @@ import ( type noopListenerWatcher struct{} -func (noopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) { +func (noopListenerWatcher) ResourceChanged(_ xdsClient.ResourceData, onDone func()) { onDone() } func (noopListenerWatcher) ResourceError(_ error, onDone func()) { @@ -71,8 +72,9 @@ func newListenerWatcher() *listenerWatcher { return &listenerWatcher{updateCh: testutils.NewChannel()} } -func (lw *listenerWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) { - lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) +func (lw *listenerWatcher) ResourceChanged(rd xdsClient.ResourceData, onDone func()) { + clusterData := rd.(*xdsresource.ListenerResourceData) + lw.updateCh.Send(listenerUpdateErrTuple{update: clusterData.Resource}) onDone() } @@ -100,8 +102,9 @@ func newListenerWatcherMultiple(size int) *listenerWatcherMultiple { return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)} } -func (lw *listenerWatcherMultiple) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) { - lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) +func (lw *listenerWatcherMultiple) ResourceChanged(rd xdsClient.ResourceData, onDone func()) { + listenerData := rd.(*xdsresource.ListenerResourceData) + lw.updateCh.Send(listenerUpdateErrTuple{update: listenerData.Resource}) onDone() } diff --git a/internal/xds/xdsclient/tests/rds_watchers_test.go b/internal/xds/xdsclient/tests/rds_watchers_test.go index 3ec333ad8a24..fa9dc447eb78 100644 --- a/internal/xds/xdsclient/tests/rds_watchers_test.go +++ b/internal/xds/xdsclient/tests/rds_watchers_test.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/xds/bootstrap" + clientimpl "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" "google.golang.org/protobuf/types/known/wrapperspb" @@ -43,7 +44,7 @@ import ( type noopRouteConfigWatcher struct{} -func (noopRouteConfigWatcher) ResourceChanged(_ *xdsresource.RouteConfigResourceData, onDone func()) { +func (noopRouteConfigWatcher) ResourceChanged(_ clientimpl.ResourceData, onDone func()) { onDone() } func (noopRouteConfigWatcher) ResourceError(_ error, onDone func()) { @@ -66,8 +67,9 @@ func newRouteConfigWatcher() *routeConfigWatcher { return &routeConfigWatcher{updateCh: testutils.NewChannel()} } -func (rw *routeConfigWatcher) ResourceChanged(update *xdsresource.RouteConfigResourceData, onDone func()) { - rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource}) +func (rw *routeConfigWatcher) ResourceChanged(rd clientimpl.ResourceData, onDone func()) { + rcData := rd.(*xdsresource.RouteConfigResourceData) + rw.updateCh.Send(routeConfigUpdateErrTuple{update: rcData.Resource}) onDone() } diff --git a/internal/xds/xdsclient/xdsresource/cluster_resource_type.go b/internal/xds/xdsclient/xdsresource/cluster_resource_type.go index 2a6a08f90647..91fd28499983 100644 --- a/internal/xds/xdsclient/xdsresource/cluster_resource_type.go +++ b/internal/xds/xdsclient/xdsresource/cluster_resource_type.go @@ -18,6 +18,8 @@ package xdsresource import ( + "bytes" + "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/internal/xds/bootstrap" xdsclient "google.golang.org/grpc/internal/xds/clients/xdsclient" @@ -34,15 +36,15 @@ const ( var ( // Compile time interface checks. - _ Type = clusterResourceType{} - - // Singleton instantiation of the resource type implementation. - clusterType = clusterResourceType{ - resourceTypeState: resourceTypeState{ - typeURL: version.V3ClusterURL, - typeName: ClusterResourceTypeName, - allResourcesRequiredInSotW: true, - }, + _ xdsclient.Decoder = clusterResourceType{} + _ xdsclient.ResourceData = (*ClusterResourceData)(nil) + + // ClusterResource is a singleton instance of xdsclient.ResourceType + // that defines the configuration for the Cluster resource. + ClusterResource = xdsclient.ResourceType{ + TypeURL: version.V3ClusterURL, + TypeName: ClusterResourceTypeName, + AllResourcesRequiredInSotW: true, } ) @@ -52,28 +54,45 @@ var ( // Implements the Type interface. type clusterResourceType struct { resourceTypeState + BootstrapConfig *bootstrap.Config + ServerConfigMap map[xdsclient.ServerConfig]*bootstrap.ServerConfig } // Decode deserializes and validates an xDS resource serialized inside the // provided `Any` proto, as received from the xDS management server. -func (clusterResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) { - name, cluster, err := unmarshalClusterResource(resource, opts.ServerConfig) +func (ct clusterResourceType) Decode(resource xdsclient.AnyProto, gOpts xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) { + // Convert generic AnyProto -> anypb.Any + a := &anypb.Any{ + TypeUrl: resource.TypeURL, + Value: resource.Value, + } + + // Map generic decode options to internal options + internalOpts := &DecodeOptions{BootstrapConfig: ct.BootstrapConfig} + if gOpts.ServerConfig != nil && ct.ServerConfigMap != nil { + if sc, ok := ct.ServerConfigMap[*gOpts.ServerConfig]; ok { + internalOpts.ServerConfig = sc + } + } + + name, cluster, err := unmarshalClusterResource(a, internalOpts.ServerConfig) switch { case name == "": // Name is unset only when protobuf deserialization fails. return nil, err case err != nil: // Protobuf deserialization succeeded, but resource validation failed. - return &DecodeResult{Name: name, Resource: &ClusterResourceData{Resource: ClusterUpdate{}}}, err + return &xdsclient.DecodeResult{ + Name: name, + Resource: &ClusterResourceData{Resource: ClusterUpdate{}}, + }, err } - // Perform extra validation here. - if err := securityConfigValidator(opts.BootstrapConfig, cluster.SecurityCfg); err != nil { - return &DecodeResult{Name: name, Resource: &ClusterResourceData{Resource: ClusterUpdate{}}}, err + if err := securityConfigValidator(internalOpts.BootstrapConfig, cluster.SecurityCfg); err != nil { + return &xdsclient.DecodeResult{Name: name, Resource: &ClusterResourceData{Resource: ClusterUpdate{}}}, err } - return &DecodeResult{Name: name, Resource: &ClusterResourceData{Resource: cluster}}, nil - + return &xdsclient.DecodeResult{Name: name, Resource: &ClusterResourceData{Resource: cluster}}, nil } // ClusterResourceData wraps the configuration of a Cluster resource as received @@ -109,52 +128,45 @@ func (c *ClusterResourceData) Raw() *anypb.Any { return c.Resource.Raw } -// ClusterWatcher wraps the callbacks to be invoked for different events -// corresponding to the cluster resource being watched. gRFC A88 contains an -// exhaustive list of what method is invoked under what conditions. -type ClusterWatcher interface { - // ResourceChanged indicates a new version of the resource is available. - ResourceChanged(resource *ClusterResourceData, done func()) - - // ResourceError indicates an error occurred while trying to fetch or - // decode the associated resource. The previous version of the resource - // should be considered invalid. - ResourceError(err error, done func()) - - // AmbientError indicates an error occurred after a resource has been - // received that should not modify the use of that resource but may provide - // useful information about the state of the XDSClient for debugging - // purposes. The previous version of the resource should still be - // considered valid. - AmbientError(err error, done func()) -} - -type delegatingClusterWatcher struct { - watcher ClusterWatcher -} - -func (d *delegatingClusterWatcher) ResourceChanged(data ResourceData, onDone func()) { - c := data.(*ClusterResourceData) - d.watcher.ResourceChanged(c, onDone) -} - -func (d *delegatingClusterWatcher) ResourceError(err error, onDone func()) { - d.watcher.ResourceError(err, onDone) +// Equal returns true if other is equal to c +func (c *ClusterResourceData) Equal(other xdsclient.ResourceData) bool { + if c == nil && other == nil { + return true + } + if (c == nil) != (other == nil) { + return false + } + if otherCRD, ok := other.(*ClusterResourceData); ok { + return c.RawEqual(otherCRD) + } + return bytes.Equal(c.Bytes(), other.Bytes()) } -func (d *delegatingClusterWatcher) AmbientError(err error, onDone func()) { - d.watcher.AmbientError(err, onDone) +// Bytes returns the underlying raw bytes of the Cluster resource. +func (c *ClusterResourceData) Bytes() []byte { + raw := c.Raw() + if raw == nil { + return nil + } + return raw.Value } // WatchCluster uses xDS to discover the configuration associated with the // provided cluster resource name. -func WatchCluster(p Producer, name string, w ClusterWatcher) (cancel func()) { - delegator := &delegatingClusterWatcher{watcher: w} - return p.WatchResource(clusterType, name, delegator) +func WatchCluster(p Producer, name string, w xdsclient.ResourceWatcher) (cancel func()) { + return p.WatchResource(ClusterResource, name, w) } -// NewGenericClusterResourceTypeDecoder returns a xdsclient.Decoder that -// wraps the xdsresource.clusterType. -func NewGenericClusterResourceTypeDecoder(bc *bootstrap.Config, gServerCfgMap map[xdsclient.ServerConfig]*bootstrap.ServerConfig) xdsclient.Decoder { - return &GenericResourceTypeDecoder{ResourceType: clusterType, BootstrapConfig: bc, ServerConfigMap: gServerCfgMap} +// NewClusterResourceTypeDecoder returns an xdsclient.Decoder that has access to +// bootstrap config and server config mapping for decoding. +func NewClusterResourceTypeDecoder(bc *bootstrap.Config, gServerCfgMap map[xdsclient.ServerConfig]*bootstrap.ServerConfig) xdsclient.Decoder { + return &clusterResourceType{ + resourceTypeState: resourceTypeState{ + typeURL: version.V3ClusterURL, + typeName: ClusterResourceTypeName, + allResourcesRequiredInSotW: true, + }, + BootstrapConfig: bc, + ServerConfigMap: gServerCfgMap, + } } diff --git a/internal/xds/xdsclient/xdsresource/endpoints_resource_type.go b/internal/xds/xdsclient/xdsresource/endpoints_resource_type.go index 7ca45ec6ad0c..65906102e651 100644 --- a/internal/xds/xdsclient/xdsresource/endpoints_resource_type.go +++ b/internal/xds/xdsclient/xdsresource/endpoints_resource_type.go @@ -18,6 +18,8 @@ package xdsresource import ( + "bytes" + "google.golang.org/grpc/internal/pretty" xdsclient "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version" @@ -33,15 +35,15 @@ const ( var ( // Compile time interface checks. - _ Type = endpointsResourceType{} - - // Singleton instantiation of the resource type implementation. - endpointsType = endpointsResourceType{ - resourceTypeState: resourceTypeState{ - typeURL: version.V3EndpointsURL, - typeName: "EndpointsResource", - allResourcesRequiredInSotW: false, - }, + _ xdsclient.Decoder = endpointsResourceType{} + _ xdsclient.ResourceData = (*EndpointsResourceData)(nil) + + // EndpointsResource is a singleton instance of xdsclient.ResourceType that + // defines the configuration for the Endpoints resource. + EndpointsResource = xdsclient.ResourceType{ + TypeURL: version.V3EndpointsURL, + TypeName: EndpointsResourceTypeName, + AllResourcesRequiredInSotW: false, } ) @@ -55,19 +57,22 @@ type endpointsResourceType struct { // Decode deserializes and validates an xDS resource serialized inside the // provided `Any` proto, as received from the xDS management server. -func (endpointsResourceType) Decode(_ *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) { - name, rc, err := unmarshalEndpointsResource(resource) +func (et endpointsResourceType) Decode(resource xdsclient.AnyProto, _ xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) { + a := &anypb.Any{ + TypeUrl: resource.TypeURL, + Value: resource.Value, + } + + name, rc, err := unmarshalEndpointsResource(a) switch { case name == "": // Name is unset only when protobuf deserialization fails. return nil, err case err != nil: // Protobuf deserialization succeeded, but resource validation failed. - return &DecodeResult{Name: name, Resource: &EndpointsResourceData{Resource: EndpointsUpdate{}}}, err + return &xdsclient.DecodeResult{Name: name, Resource: &EndpointsResourceData{Resource: EndpointsUpdate{}}}, err } - - return &DecodeResult{Name: name, Resource: &EndpointsResourceData{Resource: rc}}, nil - + return &xdsclient.DecodeResult{Name: name, Resource: &EndpointsResourceData{Resource: rc}}, nil } // EndpointsResourceData wraps the configuration of an Endpoints resource as @@ -104,52 +109,40 @@ func (e *EndpointsResourceData) Raw() *anypb.Any { return e.Resource.Raw } -// EndpointsWatcher wraps the callbacks to be invoked for different -// events corresponding to the endpoints resource being watched. gRFC A88 -// contains an exhaustive list of what method is invoked under what conditions. -type EndpointsWatcher interface { - // ResourceChanged indicates a new version of the resource is available. - ResourceChanged(resource *EndpointsResourceData, done func()) - - // ResourceError indicates an error occurred while trying to fetch or - // decode the associated resource. The previous version of the resource - // should be considered invalid. - ResourceError(err error, done func()) - - // AmbientError indicates an error occurred after a resource has been - // received that should not modify the use of that resource but may provide - // useful information about the state of the XDSClient for debugging - // purposes. The previous version of the resource should still be - // considered valid. - AmbientError(err error, done func()) -} - -type delegatingEndpointsWatcher struct { - watcher EndpointsWatcher -} - -func (d *delegatingEndpointsWatcher) ResourceChanged(data ResourceData, onDone func()) { - e := data.(*EndpointsResourceData) - d.watcher.ResourceChanged(e, onDone) -} - -func (d *delegatingEndpointsWatcher) ResourceError(err error, onDone func()) { - d.watcher.ResourceError(err, onDone) +// Equal returns true if other xdsclient.ResourceData is equal to e. +func (e *EndpointsResourceData) Equal(other xdsclient.ResourceData) bool { + if e == nil && other == nil { + return true + } + if (e == nil) != (other == nil) { + return false + } + if otherERD, ok := other.(*EndpointsResourceData); ok { + return e.RawEqual(otherERD) + } + return bytes.Equal(e.Bytes(), other.Bytes()) } -func (d *delegatingEndpointsWatcher) AmbientError(err error, onDone func()) { - d.watcher.AmbientError(err, onDone) +// Bytes returns the underlying raw bytes of the Endpoints resource. +func (e *EndpointsResourceData) Bytes() []byte { + raw := e.Raw() + if raw == nil { + return nil + } + return raw.Value } // WatchEndpoints uses xDS to discover the configuration associated with the // provided endpoints resource name. -func WatchEndpoints(p Producer, name string, w EndpointsWatcher) (cancel func()) { - delegator := &delegatingEndpointsWatcher{watcher: w} - return p.WatchResource(endpointsType, name, delegator) +func WatchEndpoints(p Producer, name string, w xdsclient.ResourceWatcher) (cancel func()) { + return p.WatchResource(EndpointsResource, name, w) } -// NewGenericEndpointsResourceTypeDecoder returns a xdsclient.Decoder that -// wraps the xdsresource.endpointsType. -func NewGenericEndpointsResourceTypeDecoder() xdsclient.Decoder { - return &GenericResourceTypeDecoder{ResourceType: endpointsType} +// NewEndpointsResourceTypeDecoder returns a decoder for Endpoints resources. +func NewEndpointsResourceTypeDecoder() xdsclient.Decoder { + return endpointsResourceType{resourceTypeState: resourceTypeState{ + typeURL: version.V3EndpointsURL, + typeName: EndpointsResourceTypeName, + allResourcesRequiredInSotW: false, + }} } diff --git a/internal/xds/xdsclient/xdsresource/listener_resource_type.go b/internal/xds/xdsclient/xdsresource/listener_resource_type.go index 100a06f97b67..48c8ab1b979b 100644 --- a/internal/xds/xdsclient/xdsresource/listener_resource_type.go +++ b/internal/xds/xdsclient/xdsresource/listener_resource_type.go @@ -18,6 +18,7 @@ package xdsresource import ( + "bytes" "fmt" "google.golang.org/grpc/internal/pretty" @@ -36,15 +37,15 @@ const ( var ( // Compile time interface checks. - _ Type = listenerResourceType{} - - // Singleton instantiation of the resource type implementation. - listenerType = listenerResourceType{ - resourceTypeState: resourceTypeState{ - typeURL: version.V3ListenerURL, - typeName: ListenerResourceTypeName, - allResourcesRequiredInSotW: true, - }, + _ xdsclient.Decoder = listenerResourceType{} + _ xdsclient.ResourceData = (*ListenerResourceData)(nil) + + // ListenerResource is a singleton instance of xdsclient.ResourceType that + // defines the configuration for the Listener resource. + ListenerResource = xdsclient.ResourceType{ + TypeURL: version.V3ListenerURL, + TypeName: ListenerResourceTypeName, + AllResourcesRequiredInSotW: true, } ) @@ -54,6 +55,8 @@ var ( // Implements the Type interface. type listenerResourceType struct { resourceTypeState + BootstrapConfig *bootstrap.Config + ServerConfigMap map[xdsclient.ServerConfig]*bootstrap.ServerConfig } func securityConfigValidator(bc *bootstrap.Config, sc *SecurityConfig) error { @@ -87,23 +90,38 @@ func listenerValidator(bc *bootstrap.Config, lis ListenerUpdate) error { // Decode deserializes and validates an xDS resource serialized inside the // provided `Any` proto, as received from the xDS management server. -func (listenerResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) { - name, listener, err := unmarshalListenerResource(resource) +func (lt listenerResourceType) Decode(resource xdsclient.AnyProto, gOpts xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) { + a := &anypb.Any{ + TypeUrl: resource.TypeURL, + Value: resource.Value, + } + + // Map generic decode options to internal options: + internalOpts := &DecodeOptions{BootstrapConfig: lt.BootstrapConfig} + if gOpts.ServerConfig != nil && lt.ServerConfigMap != nil { + if sc, ok := lt.ServerConfigMap[*gOpts.ServerConfig]; ok { + internalOpts.ServerConfig = sc + } + } + name, listener, err := unmarshalListenerResource(a) switch { case name == "": // Name is unset only when protobuf deserialization fails. return nil, err case err != nil: // Protobuf deserialization succeeded, but resource validation failed. - return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: ListenerUpdate{}}}, err + return &xdsclient.DecodeResult{ + Name: name, + Resource: &ListenerResourceData{Resource: ListenerUpdate{}}, + }, err } // Perform extra validation here. - if err := listenerValidator(opts.BootstrapConfig, listener); err != nil { - return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: ListenerUpdate{}}}, err + if err := listenerValidator(internalOpts.BootstrapConfig, listener); err != nil { + return &xdsclient.DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: ListenerUpdate{}}}, err } - return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: listener}}, nil + return &xdsclient.DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: listener}}, nil } // ListenerResourceData wraps the configuration of a Listener resource as @@ -139,51 +157,45 @@ func (l *ListenerResourceData) Raw() *anypb.Any { return l.Resource.Raw } -// ListenerWatcher wraps the callbacks to be invoked for different -// events corresponding to the listener resource being watched. gRFC A88 -// contains an exhaustive list of what method is invoked under what conditions. -type ListenerWatcher interface { - // ResourceChanged indicates a new version of the resource is available. - ResourceChanged(resource *ListenerResourceData, done func()) - - // ResourceError indicates an error occurred while trying to fetch or - // decode the associated resource. The previous version of the resource - // should be considered invalid. - ResourceError(err error, done func()) - - // AmbientError indicates an error occurred after a resource has been - // received that should not modify the use of that resource but may provide - // useful information about the state of the XDSClient for debugging - // purposes. The previous version of the resource should still be - // considered valid. - AmbientError(err error, done func()) -} - -type delegatingListenerWatcher struct { - watcher ListenerWatcher -} - -func (d *delegatingListenerWatcher) ResourceChanged(data ResourceData, onDone func()) { - l := data.(*ListenerResourceData) - d.watcher.ResourceChanged(l, onDone) -} -func (d *delegatingListenerWatcher) ResourceError(err error, onDone func()) { - d.watcher.ResourceError(err, onDone) +// Equal returns true if other xdsclient.ResourceData is equal to l. +func (l *ListenerResourceData) Equal(other xdsclient.ResourceData) bool { + if l == nil && other == nil { + return true + } + if (l == nil) != (other == nil) { + return false + } + if otherLRD, ok := other.(*ListenerResourceData); ok { + return l.RawEqual(otherLRD) + } + return bytes.Equal(l.Bytes(), other.Bytes()) } -func (d *delegatingListenerWatcher) AmbientError(err error, onDone func()) { - d.watcher.AmbientError(err, onDone) +// Bytes returns the underlying raw bytes of the listener resource. +func (l *ListenerResourceData) Bytes() []byte { + raw := l.Raw() + if raw == nil { + return nil + } + return raw.Value } // WatchListener uses xDS to discover the configuration associated with the // provided listener resource name. -func WatchListener(p Producer, name string, w ListenerWatcher) (cancel func()) { - delegator := &delegatingListenerWatcher{watcher: w} - return p.WatchResource(listenerType, name, delegator) +func WatchListener(p Producer, name string, w xdsclient.ResourceWatcher) (cancel func()) { + return p.WatchResource(ListenerResource, name, w) } -// NewGenericListenerResourceTypeDecoder returns a xdsclient.Decoder that wraps -// the xdsresource.listenerType. -func NewGenericListenerResourceTypeDecoder(bc *bootstrap.Config) xdsclient.Decoder { - return &GenericResourceTypeDecoder{ResourceType: listenerType, BootstrapConfig: bc} +// NewListenerResourceTypeDecoder returns an xdsclient.Decoder that has access to +// bootstrap config and server config mapping for decoding. +func NewListenerResourceTypeDecoder(bc *bootstrap.Config, serverConfigMap map[xdsclient.ServerConfig]*bootstrap.ServerConfig) xdsclient.Decoder { + return &listenerResourceType{ + resourceTypeState: resourceTypeState{ + typeURL: version.V3ListenerURL, + typeName: ListenerResourceTypeName, + allResourcesRequiredInSotW: true, + }, + BootstrapConfig: bc, + ServerConfigMap: serverConfigMap, + } } diff --git a/internal/xds/xdsclient/xdsresource/resource_type.go b/internal/xds/xdsclient/xdsresource/resource_type.go index 2c591312f1be..ca48ae9bad0b 100644 --- a/internal/xds/xdsclient/xdsresource/resource_type.go +++ b/internal/xds/xdsclient/xdsresource/resource_type.go @@ -25,8 +25,6 @@ package xdsresource import ( - "fmt" - xdsinternal "google.golang.org/grpc/internal/xds" "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/internal/xds/clients/xdsclient" @@ -36,10 +34,10 @@ import ( func init() { xdsinternal.ResourceTypeMapForTesting = make(map[string]any) - xdsinternal.ResourceTypeMapForTesting[version.V3ListenerURL] = listenerType - xdsinternal.ResourceTypeMapForTesting[version.V3RouteConfigURL] = routeConfigType - xdsinternal.ResourceTypeMapForTesting[version.V3ClusterURL] = clusterType - xdsinternal.ResourceTypeMapForTesting[version.V3EndpointsURL] = endpointsType + xdsinternal.ResourceTypeMapForTesting[version.V3ListenerURL] = ListenerResource + xdsinternal.ResourceTypeMapForTesting[version.V3RouteConfigURL] = RouteConfigResource + xdsinternal.ResourceTypeMapForTesting[version.V3ClusterURL] = ClusterResource + xdsinternal.ResourceTypeMapForTesting[version.V3EndpointsURL] = EndpointsResource } // Producer contains a single method to discover resource configuration from a @@ -52,7 +50,7 @@ type Producer interface { // xDS responses are are deserialized and validated, as received from the // xDS management server. Upon receipt of a response from the management // server, an appropriate callback on the watcher is invoked. - WatchResource(rType Type, resourceName string, watcher ResourceWatcher) (cancel func()) + WatchResource(rType xdsclient.ResourceType, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func()) } // ResourceWatcher is notified of the resource updates and errors that are @@ -81,12 +79,9 @@ type ResourceWatcher interface { AmbientError(err error, done func()) } -// TODO: Once the implementation is complete, rename this interface as -// ResourceType and get rid of the existing ResourceType enum. - -// Type wraps all resource-type specific functionality. Each supported resource -// type will provide an implementation of this interface. -type Type interface { +// ResourceType wraps all resource-type specific functionality. Each supported +// resource type will provide an implementation of this interface. +type ResourceType interface { // TypeURL is the xDS type URL of this resource type for v3 transport. TypeURL() string @@ -94,10 +89,7 @@ type Type interface { // can be used for logging/debugging purposes, as well in cases where the // resource type name is to be uniquely identified but the actual // functionality provided by the resource type is not required. - // - // TODO: once Type is renamed to ResourceType, rename TypeName to - // ResourceTypeName. - TypeName() string + ResourceTypeName() string // AllResourcesRequiredInSotW indicates whether this resource type requires // that all resources be present in every SotW response from the server. If @@ -111,7 +103,7 @@ type Type interface { // If protobuf deserialization fails or resource validation fails, // returns a non-nil error. Otherwise, returns a fully populated // DecodeResult. - Decode(*DecodeOptions, *anypb.Any) (*DecodeResult, error) + Decode(*xdsclient.AnyProto, xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) } // ResourceData contains the configuration data sent by the xDS management @@ -170,119 +162,3 @@ func (r resourceTypeState) TypeName() string { func (r resourceTypeState) AllResourcesRequiredInSotW() bool { return r.allResourcesRequiredInSotW } - -// GenericResourceTypeDecoder wraps an xdsresource.Type and implements -// xdsclient.Decoder. -// -// TODO: #8313 - Delete this once the internal xdsclient usages are updated -// to use the generic xdsclient.ResourceType interface directly. -type GenericResourceTypeDecoder struct { - ResourceType Type - BootstrapConfig *bootstrap.Config - ServerConfigMap map[xdsclient.ServerConfig]*bootstrap.ServerConfig -} - -// Decode deserialize and validate resource bytes of an xDS resource received -// from the xDS management server. -func (gd *GenericResourceTypeDecoder) Decode(resource xdsclient.AnyProto, gOpts xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) { - rProto := &anypb.Any{ - TypeUrl: resource.TypeURL, - Value: resource.Value, - } - opts := &DecodeOptions{BootstrapConfig: gd.BootstrapConfig} - if gOpts.ServerConfig != nil { - opts.ServerConfig = gd.ServerConfigMap[*gOpts.ServerConfig] - } - - result, err := gd.ResourceType.Decode(opts, rProto) - if result == nil { - return nil, err - } - if err != nil { - return &xdsclient.DecodeResult{Name: result.Name}, err - } - - return &xdsclient.DecodeResult{Name: result.Name, Resource: &genericResourceData{resourceData: result.Resource}}, nil -} - -// genericResourceData embed an xdsresource.ResourceData and implements -// xdsclient.ResourceData. -// -// TODO: #8313 - Delete this once the internal xdsclient usages are updated -// to use the generic xdsclient.ResourceData interface directly. -type genericResourceData struct { - resourceData ResourceData -} - -// Equal returns true if the passed in xdsclient.ResourceData -// is equal to that of the receiver. -func (grd *genericResourceData) Equal(other xdsclient.ResourceData) bool { - if other == nil { - return false - } - otherResourceData, ok := other.(*genericResourceData) - if !ok { - return false - } - return grd.resourceData.RawEqual(otherResourceData.resourceData) -} - -// Bytes returns the underlying raw bytes of the wrapped resource. -func (grd *genericResourceData) Bytes() []byte { - rawAny := grd.resourceData.Raw() - if rawAny == nil { - return nil - } - return rawAny.Value -} - -// genericResourceWatcher wraps xdsresource.ResourceWatcher and implements -// xdsclient.ResourceWatcher. -// -// TODO: #8313 - Delete this once the internal xdsclient usages are updated -// to use the generic xdsclient.ResourceWatcher interface directly. -type genericResourceWatcher struct { - xdsResourceWatcher ResourceWatcher -} - -// ResourceChanged indicates a new version of the wrapped resource is -// available. -func (gw *genericResourceWatcher) ResourceChanged(gData xdsclient.ResourceData, done func()) { - if gData == nil { - gw.xdsResourceWatcher.ResourceChanged(nil, done) - return - } - - grd, ok := gData.(*genericResourceData) - if !ok { - err := fmt.Errorf("genericResourceWatcher received unexpected xdsclient.ResourceData type %T, want *genericResourceData", gData) - gw.xdsResourceWatcher.ResourceError(err, done) - return - } - gw.xdsResourceWatcher.ResourceChanged(grd.resourceData, done) -} - -// ResourceError indicates an error occurred while trying to fetch or -// decode the associated wrapped resource. The previous version of the -// wrapped resource should be considered invalid. -func (gw *genericResourceWatcher) ResourceError(err error, done func()) { - gw.xdsResourceWatcher.ResourceError(err, done) -} - -// AmbientError indicates an error occurred after a resource has been -// received that should not modify the use of that wrapped resource but may -// provide useful information about the state of the XDSClient for debugging -// purposes. The previous version of the wrapped resource should still be -// considered valid. -func (gw *genericResourceWatcher) AmbientError(err error, done func()) { - gw.xdsResourceWatcher.AmbientError(err, done) -} - -// GenericResourceWatcher returns a xdsclient.ResourceWatcher that wraps an -// xdsresource.ResourceWatcher to make it compatible with xdsclient.ResourceWatcher. -func GenericResourceWatcher(xdsResourceWatcher ResourceWatcher) xdsclient.ResourceWatcher { - if xdsResourceWatcher == nil { - return nil - } - return &genericResourceWatcher{xdsResourceWatcher: xdsResourceWatcher} -} diff --git a/internal/xds/xdsclient/xdsresource/route_config_resource_type.go b/internal/xds/xdsclient/xdsresource/route_config_resource_type.go index 912dc1b762b4..847594aeaaee 100644 --- a/internal/xds/xdsclient/xdsresource/route_config_resource_type.go +++ b/internal/xds/xdsclient/xdsresource/route_config_resource_type.go @@ -18,6 +18,8 @@ package xdsresource import ( + "bytes" + "google.golang.org/grpc/internal/pretty" xdsclient "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version" @@ -33,15 +35,15 @@ const ( var ( // Compile time interface checks. - _ Type = routeConfigResourceType{} - - // Singleton instantiation of the resource type implementation. - routeConfigType = routeConfigResourceType{ - resourceTypeState: resourceTypeState{ - typeURL: version.V3RouteConfigURL, - typeName: "RouteConfigResource", - allResourcesRequiredInSotW: false, - }, + _ xdsclient.Decoder = routeConfigResourceType{} + _ xdsclient.ResourceData = (*RouteConfigResourceData)(nil) + + // RouteConfigResource is a singleton instance of xdsclient.ResourceType + // that defines the configuration for the RouteConfig resource. + RouteConfigResource = xdsclient.ResourceType{ + TypeURL: version.V3RouteConfigURL, + TypeName: RouteConfigTypeName, + AllResourcesRequiredInSotW: false, } ) @@ -55,19 +57,22 @@ type routeConfigResourceType struct { // Decode deserializes and validates an xDS resource serialized inside the // provided `Any` proto, as received from the xDS management server. -func (routeConfigResourceType) Decode(_ *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) { - name, rc, err := unmarshalRouteConfigResource(resource) +func (rt routeConfigResourceType) Decode(resource xdsclient.AnyProto, _ xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) { + a := &anypb.Any{ + TypeUrl: resource.TypeURL, + Value: resource.Value, + } + + name, rc, err := unmarshalRouteConfigResource(a) switch { case name == "": // Name is unset only when protobuf deserialization fails. return nil, err case err != nil: // Protobuf deserialization succeeded, but resource validation failed. - return &DecodeResult{Name: name, Resource: &RouteConfigResourceData{Resource: RouteConfigUpdate{}}}, err + return &xdsclient.DecodeResult{Name: name, Resource: &RouteConfigResourceData{Resource: RouteConfigUpdate{}}}, err } - - return &DecodeResult{Name: name, Resource: &RouteConfigResourceData{Resource: rc}}, nil - + return &xdsclient.DecodeResult{Name: name, Resource: &RouteConfigResourceData{Resource: rc}}, nil } // RouteConfigResourceData wraps the configuration of a RouteConfiguration @@ -105,53 +110,40 @@ func (r *RouteConfigResourceData) Raw() *anypb.Any { return r.Resource.Raw } -// RouteConfigWatcher wraps the callbacks to be invoked for different -// events corresponding to the route configuration resource being watched. gRFC -// A88 contains an exhaustive list of what method is invoked under what -// conditions. -type RouteConfigWatcher interface { - // ResourceChanged indicates a new version of the resource is available. - ResourceChanged(resource *RouteConfigResourceData, done func()) - - // ResourceError indicates an error occurred while trying to fetch or - // decode the associated resource. The previous version of the resource - // should be considered invalid. - ResourceError(err error, done func()) - - // AmbientError indicates an error occurred after a resource has been - // received that should not modify the use of that resource but may provide - // useful information about the state of the XDSClient for debugging - // purposes. The previous version of the resource should still be - // considered valid. - AmbientError(err error, done func()) -} - -type delegatingRouteConfigWatcher struct { - watcher RouteConfigWatcher -} - -func (d *delegatingRouteConfigWatcher) ResourceChanged(data ResourceData, onDone func()) { - rc := data.(*RouteConfigResourceData) - d.watcher.ResourceChanged(rc, onDone) -} - -func (d *delegatingRouteConfigWatcher) ResourceError(err error, onDone func()) { - d.watcher.ResourceError(err, onDone) +// Equal returns true if other xdsclient.ResourceData is equal to r. +func (r *RouteConfigResourceData) Equal(other xdsclient.ResourceData) bool { + if r == nil && other == nil { + return true + } + if (r == nil) != (other == nil) { + return false + } + if otherRCRD, ok := other.(*RouteConfigResourceData); ok { + return r.RawEqual(otherRCRD) + } + return bytes.Equal(r.Bytes(), other.Bytes()) } -func (d *delegatingRouteConfigWatcher) AmbientError(err error, onDone func()) { - d.watcher.AmbientError(err, onDone) +// Bytes returns the underlying raw bytes of the RouteConfig resource. +func (r *RouteConfigResourceData) Bytes() []byte { + raw := r.Raw() + if raw == nil { + return nil + } + return raw.Value } // WatchRouteConfig uses xDS to discover the configuration associated with the // provided route configuration resource name. -func WatchRouteConfig(p Producer, name string, w RouteConfigWatcher) (cancel func()) { - delegator := &delegatingRouteConfigWatcher{watcher: w} - return p.WatchResource(routeConfigType, name, delegator) +func WatchRouteConfig(p Producer, name string, w xdsclient.ResourceWatcher) (cancel func()) { + return p.WatchResource(RouteConfigResource, name, w) } -// NewGenericRouteConfigResourceTypeDecoder returns a xdsclient.Decoder that -// wraps the xdsresource.routeConfigType. -func NewGenericRouteConfigResourceTypeDecoder() xdsclient.Decoder { - return &GenericResourceTypeDecoder{ResourceType: routeConfigType} +// NewRouteConfigResourceTypeDecoder returns a decoder for RouteConfig resources. +func NewRouteConfigResourceTypeDecoder() xdsclient.Decoder { + return routeConfigResourceType{resourceTypeState: resourceTypeState{ + typeURL: version.V3RouteConfigURL, + typeName: RouteConfigTypeName, + allResourcesRequiredInSotW: false, + }} } diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index d09deeb2682d..3f9f84c50b8b 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -51,6 +51,7 @@ import ( v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" v3statuspbgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" + clientimpl "google.golang.org/grpc/internal/xds/clients/xdsclient" _ "google.golang.org/grpc/internal/xds/httpfilter/router" // Register the router filter ) @@ -71,7 +72,7 @@ func Test(t *testing.T) { type nopListenerWatcher struct{} -func (nopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) { +func (nopListenerWatcher) ResourceChanged(_ clientimpl.ResourceData, onDone func()) { onDone() } func (nopListenerWatcher) ResourceError(_ error, onDone func()) { @@ -83,7 +84,7 @@ func (nopListenerWatcher) AmbientError(_ error, onDone func()) { type nopRouteConfigWatcher struct{} -func (nopRouteConfigWatcher) ResourceChanged(_ *xdsresource.RouteConfigResourceData, onDone func()) { +func (nopRouteConfigWatcher) ResourceChanged(_ clientimpl.ResourceData, onDone func()) { onDone() } func (nopRouteConfigWatcher) ResourceError(_ error, onDone func()) { @@ -95,7 +96,7 @@ func (nopRouteConfigWatcher) AmbientError(_ error, onDone func()) { type nopClusterWatcher struct{} -func (nopClusterWatcher) ResourceChanged(_ *xdsresource.ClusterResourceData, onDone func()) { +func (nopClusterWatcher) ResourceChanged(_ clientimpl.ResourceData, onDone func()) { onDone() } func (nopClusterWatcher) ResourceError(_ error, onDone func()) { @@ -107,7 +108,7 @@ func (nopClusterWatcher) AmbientError(_ error, onDone func()) { type nopEndpointsWatcher struct{} -func (nopEndpointsWatcher) ResourceChanged(_ *xdsresource.EndpointsResourceData, onDone func()) { +func (nopEndpointsWatcher) ResourceChanged(_ clientimpl.ResourceData, onDone func()) { onDone() } func (nopEndpointsWatcher) ResourceError(_ error, onDone func()) { @@ -138,7 +139,7 @@ func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWa } } -func (w *blockingListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) { +func (w *blockingListenerWatcher) ResourceChanged(_ clientimpl.ResourceData, onDone func()) { writeOnDone(w.testCtxDone, w.onDoneCh, onDone) } func (w *blockingListenerWatcher) ResourceError(_ error, onDone func()) {