Skip to content
6 changes: 4 additions & 2 deletions internal/xds/balancer/cdsbalancer/cluster_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cdsbalancer
import (
"context"

"google.golang.org/grpc/internal/xds/clients/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
)

Expand All @@ -32,8 +33,9 @@ type clusterWatcher struct {
parent *cdsBalancer
}

func (cw *clusterWatcher) ResourceChanged(u *xdsresource.ClusterResourceData, onDone func()) {
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() }
func (cw *clusterWatcher) ResourceChanged(rd xdsclient.ResourceData, onDone func()) {
clusterData := rd.(*xdsresource.ClusterResourceData)
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, clusterData.Resource); onDone() }
cw.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

Expand Down
3 changes: 1 addition & 2 deletions internal/xds/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"google.golang.org/grpc/internal/xds/balancer/loadstore"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/clients/lrsclient"
"google.golang.org/grpc/internal/xds/xdsclient"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
Expand Down Expand Up @@ -236,7 +235,7 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
}
}
if startNewLoadReport {
var loadStore *lrsclient.LoadStore
var loadStore xdsclient.LoadStore
if b.xdsClient != nil {
loadStore, b.cancelLoadReport = b.xdsClient.ReportLoad(b.lrsServer)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
clientimpl "google.golang.org/grpc/internal/xds/clients/xdsclient"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this import renaming here and elsewhere?

Please see: https://google.github.io/styleguide/go/decisions#import-renaming

"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
)

Expand Down Expand Up @@ -76,12 +77,13 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR
}

// ResourceChanged is invoked to report an update for the resource being watched.
func (er *edsDiscoveryMechanism) ResourceChanged(update *xdsresource.EndpointsResourceData, onDone func()) {
func (er *edsDiscoveryMechanism) ResourceChanged(rd clientimpl.ResourceData, onDone func()) {
if er.stopped.HasFired() {
onDone()
return
}

update := rd.(*xdsresource.EndpointsResourceData)
er.mu.Lock()
er.update = &update.Resource
er.mu.Unlock()
Expand Down
8 changes: 4 additions & 4 deletions internal/xds/balancer/loadstore/load_store_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"sync"

"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/clients/lrsclient"
"google.golang.org/grpc/internal/xds/xdsclient"
)

// NewWrapper creates a Wrapper.
Expand Down Expand Up @@ -54,8 +54,8 @@ type Wrapper struct {
// store and perCluster are initialized as nil. They are only set by the
// balancer when LRS is enabled. Before that, all functions to record loads
// are no-op.
store *lrsclient.LoadStore
perCluster *lrsclient.PerClusterReporter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we change it to use the grpc xdsclient interfaces that we created?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, our balancer directly referenced lrsclient types like LoadStore and PerClusterReporter.
But in the new design, all xDS functionality goes through our gRPC xdsclient interfaces.
So I changed it to use those interfaces to decouple balancer code from the internal LRS implementation — it’s part of the #8381 cleanup to remove wrappers and standardize on the generic client.

store xdsclient.LoadStore
perCluster xdsclient.PerClusterReporter
}

// UpdateClusterAndService updates the cluster name and eds service for this
Expand All @@ -77,7 +77,7 @@ func (lsw *Wrapper) UpdateClusterAndService(cluster, edsService string) {

// UpdateLoadStore updates the load store for this wrapper. If it is changed
// from before, the perCluster store in this wrapper will also be updated.
func (lsw *Wrapper) UpdateLoadStore(store *lrsclient.LoadStore) {
func (lsw *Wrapper) UpdateLoadStore(store xdsclient.LoadStore) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
if store == lsw.store {
Expand Down
11 changes: 7 additions & 4 deletions internal/xds/resolver/watch_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package resolver
import (
"context"

"google.golang.org/grpc/internal/xds/clients/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
)

Expand All @@ -36,8 +37,9 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch
return lw
}

func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) {
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() }
func (l *listenerWatcher) ResourceChanged(rd xdsclient.ResourceData, onDone func()) {
listenerData := rd.(*xdsresource.ListenerResourceData)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making every watcher go through these type assertions is not going to scale.

handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(listenerData.Resource); onDone() }
l.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

Expand Down Expand Up @@ -68,9 +70,10 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi
return rw
}

func (r *routeConfigWatcher) ResourceChanged(u *xdsresource.RouteConfigResourceData, onDone func()) {
func (r *routeConfigWatcher) ResourceChanged(rd xdsclient.ResourceData, onDone func()) {
rcData := rd.(*xdsresource.RouteConfigResourceData)
handleUpdate := func(context.Context) {
r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource)
r.parent.onRouteConfigResourceUpdate(r.resourceName, rcData.Resource)
onDone()
}
r.parent.serializer.ScheduleOr(handleUpdate, onDone)
Expand Down
12 changes: 7 additions & 5 deletions internal/xds/server/listener_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/internal/xds/clients/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
)

Expand All @@ -58,7 +59,7 @@ type ServingModeCallback func(addr net.Addr, mode connectivity.ServingMode, err
// XDSClient wraps the methods on the XDSClient which are required by
// the listenerWrapper.
type XDSClient interface {
WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func())
WatchResource(rType xdsclient.ResourceType, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, the watch API in this interface should exactly match the one in the external XDSClient.

BootstrapConfig() *bootstrap.Config
}

Expand Down Expand Up @@ -386,17 +387,18 @@ type ldsWatcher struct {
name string
}

func (lw *ldsWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) {
func (lw *ldsWatcher) ResourceChanged(rd xdsclient.ResourceData, onDone func()) {
defer onDone()
if lw.parent.closed.HasFired() {
lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update)
lw.logger.Warningf("Resource %q received update after listener was closed", lw.name)
return
}
listenerData := rd.(*xdsresource.ListenerResourceData)
if lw.logger.V(2) {
lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource)
lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, listenerData.Resource)
}
l := lw.parent
ilc := update.Resource.InboundListenerCfg
ilc := listenerData.Resource.InboundListenerCfg
// Make sure that the socket address on the received Listener resource
// matches the address of the net.Listener passed to us by the user. This
// check is done here instead of at the XDSClient layer because of the
Expand Down
8 changes: 5 additions & 3 deletions internal/xds/server/rds_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"

igrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/xds/clients/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
)

Expand Down Expand Up @@ -135,20 +136,21 @@ type rdsWatcher struct {
canceled bool // eats callbacks if true
}

func (rw *rdsWatcher) ResourceChanged(update *xdsresource.RouteConfigResourceData, onDone func()) {
func (rw *rdsWatcher) ResourceChanged(rd xdsclient.ResourceData, onDone func()) {
defer onDone()
rw.mu.Lock()
if rw.canceled {
rw.mu.Unlock()
return
}
rw.mu.Unlock()
rcData := rd.(*xdsresource.RouteConfigResourceData)
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource)
rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, rcData.Resource)
}

routeName := rw.routeName
rwu := rdsWatcherUpdate{data: &update.Resource}
rwu := rdsWatcherUpdate{data: &rcData.Resource}
rw.parent.updates[routeName] = rwu
rw.parent.callback(routeName, rwu)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/xds/testutils/fakeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Client struct {
name string
loadReportCh *testutils.Channel
lrsCancelCh *testutils.Channel
loadStore *lrsclient.LoadStore
loadStore xdsclient.LoadStore
bootstrapCfg *bootstrap.Config
}

Expand Down Expand Up @@ -80,7 +80,7 @@ func (*stream) Recv() ([]byte, error) {
}

// ReportLoad starts reporting load about clusterName to server.
func (xdsC *Client) ReportLoad(server *bootstrap.ServerConfig) (loadStore *lrsclient.LoadStore, cancel func(context.Context)) {
func (xdsC *Client) ReportLoad(server *bootstrap.ServerConfig) (loadStore xdsclient.LoadStore, cancel func(context.Context)) {
lrsClient, _ := lrsclient.New(lrsclient.Config{Node: clients.Node{ID: "fake-node-id"}, TransportBuilder: &transportBuilder{}})
xdsC.loadStore, _ = lrsClient.ReportLoad(clients.ServerIdentifier{ServerURI: server.ServerURI()})

Expand All @@ -100,7 +100,7 @@ func (xdsC *Client) WaitForCancelReportLoad(ctx context.Context) error {
}

// LoadStore returns the underlying load data store.
func (xdsC *Client) LoadStore() *lrsclient.LoadStore {
func (xdsC *Client) LoadStore() xdsclient.LoadStore {
return xdsC.loadStore
}

Expand Down
33 changes: 30 additions & 3 deletions internal/xds/xdsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (

v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/clients/lrsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
"google.golang.org/grpc/internal/xds/clients/xdsclient"
)

// XDSClient is a full fledged gRPC client which queries a set of discovery APIs
Expand All @@ -47,13 +48,39 @@ type XDSClient interface {
// During a race (e.g. an xDS response is received while the user is calling
// cancel()), there's a small window where the callback can be called after
// the watcher is canceled. Callers need to handle this case.
WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func())
WatchResource(rType xdsclient.ResourceType, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API should match the one in the external XDSClient.


ReportLoad(*bootstrap.ServerConfig) (*lrsclient.LoadStore, func(context.Context))
ReportLoad(*bootstrap.ServerConfig) (LoadStore, func(context.Context))

BootstrapConfig() *bootstrap.Config
}

// PerClusterReporter is the minimal interface callers use to record per-cluster
// load and drops. It mirrors the PerClusterReporter methods from the concrete
// lrsclient.PerClusterReporter but is intentionally tiny.
type PerClusterReporter interface {
// CallStarted records that a call has started for the given locality.
CallStarted(locality clients.Locality)

// CallFinished records that a call has finished for the given locality.
// Pass the error (nil if success) so implementations can update success/failure counters.
CallFinished(locality clients.Locality, err error)

// CallServerLoad reports a numeric load metric for the given locality and cluster.
// (If your code uses a different name/parameters for this, match the concrete type.)
CallServerLoad(locality clients.Locality, clusterName string, value float64)

// CallDropped records a dropped request of the given category.
CallDropped(category string)
}

// LoadStore is the minimal interface returned by ReportLoad. It provides a
// way to get per-cluster reporters and to stop the store.
type LoadStore interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we add this interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As part of task #8381, I added the LoadStore interface to replace the previous wrapper and embedded logic used for load reporting. Earlier, the balancers and xDS components used a concrete wrapper (load_store_wrapper.go) to talk to the LRS client.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's please the scope of this PR to only take care of the APIs involving resource type implementations and watchers etc. Smaller PRs ensure that they can be reviewed in time and reviewed more thoroughly.

So, let's please get rid of all load reporting changes and have them be made as a follow-up PR. Thanks.

ReporterForCluster(clusterName, serviceName string) *lrsclient.PerClusterReporter
Stop(ctx context.Context)
}

// DumpResources returns the status and contents of all xDS resources. It uses
// xDS clients from the default pool.
func DumpResources() *v3statuspb.ClientStatusResponse {
Expand Down
4 changes: 2 additions & 2 deletions internal/xds/xdsclient/clientimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ var (
// interface with ref counting so that it can be shared by the xds resolver and
// balancer implementations, across multiple ClientConns and Servers.
type clientImpl struct {
*xdsclient.XDSClient // TODO: #8313 - get rid of embedding, if possible.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we make this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We embedded xdsclient.XDSClient here as a temporary delegation layer to the generic client while we migrate call sites for #8381.

xdsClient *xdsclient.XDSClient

// The following fields are initialized at creation time and are read-only
// after that.
Expand Down Expand Up @@ -137,7 +137,7 @@ func newClientImpl(config *bootstrap.Config, metricsRecorder estats.MetricsRecor
return nil, err
}
c := &clientImpl{
XDSClient: client,
xdsClient: client,
xdsClientConfig: gConfig,
bootstrapConfig: config,
target: target,
Expand Down
5 changes: 2 additions & 3 deletions internal/xds/xdsclient/clientimpl_loadreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ import (
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/clients/grpctransport"
"google.golang.org/grpc/internal/xds/clients/lrsclient"
)

// ReportLoad starts a load reporting stream to the given server. All load
// reports to the same server share the LRS stream.
//
// It returns a lrsclient.LoadStore for the user to report loads.
func (c *clientImpl) ReportLoad(server *bootstrap.ServerConfig) (*lrsclient.LoadStore, func(context.Context)) {
// It returns a LoadStore for the user to report loads.
func (c *clientImpl) ReportLoad(server *bootstrap.ServerConfig) (LoadStore, func(context.Context)) {
load, err := c.lrsClient.ReportLoad(clients.ServerIdentifier{
ServerURI: server.ServerURI(),
Extensions: grpctransport.ServerIdentifierExtension{
Expand Down
Loading
Loading