Skip to content

Commit 29ba001

Browse files
xdsclient: Fix race in SetWatchExpiryTimeoutForTesting (#8526)
Fixes: #8525 There is a race in [SetWatchExpiryTimeoutForTesting](https://github.com/grpc/grpc-go/blob/fa0d6583208033fe4f69d359f80286736fd121d0/internal/xds/clients/xdsclient/xdsclient.go#L121) which is used to override the watch expiry timeout of XDSClient for testing. Currently it just sets the watchExpiryTimeout of the XDSClient to the provided value without a mutex each time we call [NewClientForTesting](https://github.com/grpc/grpc-go/blob/fa0d6583208033fe4f69d359f80286736fd121d0/internal/xds/xdsclient/pool.go#L116C16-L116C35) which might of might not create a new XDSClient if one is already there. Fix : Add a new field `WatchExpiryTimeout` to the xdsclient [config](https://github.com/grpc/grpc-go/blob/30645d521be375d13fa4cb2baa0d2561ca44c342/internal/xds/clients/xdsclient/xdsconfig.go#L28) which will now be used instead of `internal.WatchExpiryTImeout` RELEASE NOTES: None
1 parent 30645d5 commit 29ba001

File tree

10 files changed

+63
-47
lines changed

10 files changed

+63
-47
lines changed

internal/xds/clients/xdsclient/internal/internal.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ package internal
2121
import "time"
2222

2323
var (
24-
// WatchExpiryTimeout is the watch expiry timeout for xDS client. It can be
25-
// overridden by tests to change the default watch expiry timeout.
26-
WatchExpiryTimeout time.Duration
27-
2824
// StreamBackoff is the stream backoff for xDS client. It can be overridden
2925
// by tests to change the default backoff strategy.
3026
StreamBackoff func(int) time.Duration

internal/xds/clients/xdsclient/test/ads_stream_watch_test.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/google/uuid"
2929
"google.golang.org/grpc/credentials/insecure"
3030
"google.golang.org/grpc/internal/testutils/xds/e2e"
31+
"google.golang.org/grpc/internal/xds/clients"
3132
"google.golang.org/grpc/internal/xds/clients/grpctransport"
3233
"google.golang.org/grpc/internal/xds/clients/internal/testutils"
3334
"google.golang.org/grpc/internal/xds/clients/xdsclient"
@@ -175,9 +176,34 @@ func (s) TestADS_WatchState_TimerFires(t *testing.T) {
175176
// short resource expiry timeout.
176177
nodeID := uuid.New().String()
177178
configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}}
178-
overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout)
179-
client := createXDSClient(t, mgmtServer.Address, nodeID, grpctransport.NewBuilder(configs))
180-
179+
resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType}
180+
si := clients.ServerIdentifier{
181+
ServerURI: mgmtServer.Address,
182+
Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"},
183+
}
184+
185+
xdsClientConfig := xdsclient.Config{
186+
Servers: []xdsclient.ServerConfig{{ServerIdentifier: si}},
187+
Node: clients.Node{ID: nodeID, UserAgentName: "user-agent", UserAgentVersion: "0.0.0.0"},
188+
TransportBuilder: grpctransport.NewBuilder(configs),
189+
ResourceTypes: resourceTypes,
190+
// Xdstp resource names used in this test do not specify an
191+
// authority. These will end up looking up an entry with the
192+
// empty key in the authorities map. Having an entry with an
193+
// empty key and empty configuration, results in these
194+
// resources also using the top-level configuration.
195+
Authorities: map[string]xdsclient.Authority{
196+
"": {XDSServers: []xdsclient.ServerConfig{}},
197+
},
198+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
199+
}
200+
201+
// Create an xDS client with the above config.
202+
client, err := xdsclient.New(xdsClientConfig)
203+
if err != nil {
204+
t.Fatalf("Failed to create xDS client: %v", err)
205+
}
206+
t.Cleanup(func() { client.Close() })
181207
// Create a watch for the first listener resource and verify that the timer
182208
// is running and the watch state is `requested`.
183209
const listenerName = "listener"

internal/xds/clients/xdsclient/test/authority_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,10 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T) (*testutils.Liste
105105
testAuthority2: {XDSServers: []xdsclient.ServerConfig{}},
106106
testAuthority3: {XDSServers: []xdsclient.ServerConfig{{ServerIdentifier: siNonDefault}}},
107107
},
108+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
108109
}
109110

110111
// Create an xDS client with the above config.
111-
overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout)
112112
client, err := xdsclient.New(xdsClientConfig)
113113
if err != nil {
114114
t.Fatalf("Failed to create xDS client: %v", err)

internal/xds/clients/xdsclient/test/lds_watchers_test.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
"google.golang.org/grpc/internal/xds/clients/internal/testutils"
3636
"google.golang.org/grpc/internal/xds/clients/internal/testutils/e2e"
3737
"google.golang.org/grpc/internal/xds/clients/xdsclient"
38-
xdsclientinternal "google.golang.org/grpc/internal/xds/clients/xdsclient/internal"
3938
"google.golang.org/grpc/internal/xds/clients/xdsclient/internal/xdsresource"
4039

4140
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
@@ -115,12 +114,6 @@ func badListenerResource(t *testing.T, name string) *v3listenerpb.Listener {
115114
}
116115
}
117116

118-
func overrideWatchExpiryTimeout(t *testing.T, watchExpiryTimeout time.Duration) {
119-
originalWatchExpiryTimeout := xdsclientinternal.WatchExpiryTimeout
120-
xdsclientinternal.WatchExpiryTimeout = watchExpiryTimeout
121-
t.Cleanup(func() { xdsclientinternal.WatchExpiryTimeout = originalWatchExpiryTimeout })
122-
}
123-
124117
// verifyNoListenerUpdate verifies that no listener update is received on the
125118
// provided update channel, and returns an error if an update is received.
126119
//
@@ -726,11 +719,10 @@ func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
726719
Node: clients.Node{ID: nodeID},
727720
TransportBuilder: grpctransport.NewBuilder(configs),
728721
ResourceTypes: resourceTypes,
722+
// Override the default watch expiry timeout.
723+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
729724
}
730725

731-
// Create an xDS client with the above config and override the default
732-
// watch expiry timeout.
733-
overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout)
734726
client, err := xdsclient.New(xdsClientConfig)
735727
if err != nil {
736728
t.Fatalf("Failed to create xDS client: %v", err)
@@ -777,11 +769,11 @@ func (s) TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
777769
Node: clients.Node{ID: nodeID},
778770
TransportBuilder: grpctransport.NewBuilder(configs),
779771
ResourceTypes: resourceTypes,
772+
// Override the default watch expiry timeout.
773+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
780774
}
781775

782-
// Create an xDS client with the above config and override the default
783-
// watch expiry timeout.
784-
overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout)
776+
// Create an xDS client with the above config.
785777
client, err := xdsclient.New(xdsClientConfig)
786778
if err != nil {
787779
t.Fatalf("Failed to create xDS client: %v", err)

internal/xds/clients/xdsclient/xdsclient.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ var (
6161
)
6262

6363
func init() {
64-
xdsclientinternal.WatchExpiryTimeout = defaultWatchExpiryTimeout
6564
xdsclientinternal.StreamBackoff = defaultExponentialBackoff
6665
xdsclientinternal.ResourceWatchStateForTesting = resourceWatchStateForTesting
6766
}
@@ -108,20 +107,16 @@ func New(config Config) (*XDSClient, error) {
108107
case config.Authorities == nil && config.Servers == nil:
109108
return nil, errors.New("xdsclient: no servers or authorities specified")
110109
}
111-
110+
if config.WatchExpiryTimeout == 0 {
111+
config.WatchExpiryTimeout = defaultWatchExpiryTimeout
112+
}
112113
client, err := newClient(&config, name)
113114
if err != nil {
114115
return nil, err
115116
}
116117
return client, nil
117118
}
118119

119-
// SetWatchExpiryTimeoutForTesting override the default watch expiry timeout
120-
// with provided timeout value.
121-
func (c *XDSClient) SetWatchExpiryTimeoutForTesting(watchExpiryTimeout time.Duration) {
122-
c.watchExpiryTimeout = watchExpiryTimeout
123-
}
124-
125120
// newClient returns a new XDSClient with the given config.
126121
func newClient(config *Config, target string) (*XDSClient, error) {
127122
ctx, cancel := context.WithCancel(context.Background())
@@ -130,7 +125,7 @@ func newClient(config *Config, target string) (*XDSClient, error) {
130125
done: syncutil.NewEvent(),
131126
authorities: make(map[string]*authority),
132127
config: config,
133-
watchExpiryTimeout: xdsclientinternal.WatchExpiryTimeout,
128+
watchExpiryTimeout: config.WatchExpiryTimeout,
134129
backoff: xdsclientinternal.StreamBackoff,
135130
serializer: syncutil.NewCallbackSerializer(ctx),
136131
serializerClose: cancel,

internal/xds/clients/xdsclient/xdsconfig.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package xdsclient
2020

2121
import (
22+
"time"
23+
2224
"google.golang.org/grpc/internal/xds/clients"
2325
)
2426

@@ -60,6 +62,13 @@ type Config struct {
6062
// MetricsReporter is used to report registered metrics. If unset, no
6163
// metrics will be reported.
6264
MetricsReporter clients.MetricsReporter
65+
66+
// WatchExpiryTimeout is the duration after which a resource watch expires
67+
// if the requested resource is not received from the management server.
68+
// Most users will not need to set this. If zero, a default value of 15
69+
// seconds is used as specified here:
70+
// envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#knowing-when-a-requested-resource-does-not-exist
71+
WatchExpiryTimeout time.Duration
6372
}
6473

6574
// ServerConfig contains configuration for an xDS management server.

internal/xds/xdsclient/clientimpl.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ func (mr *metricsReporter) ReportMetric(metric any) {
120120
}
121121
}
122122

123-
func newClientImpl(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, target string) (*clientImpl, error) {
124-
gConfig, err := buildXDSClientConfig(config, metricsRecorder, target)
123+
func newClientImpl(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, target string, watchExpiryTimeout time.Duration) (*clientImpl, error) {
124+
gConfig, err := buildXDSClientConfig(config, metricsRecorder, target, watchExpiryTimeout)
125125
if err != nil {
126126
return nil, err
127127
}
@@ -163,7 +163,7 @@ func (c *clientImpl) decrRef() int32 {
163163
}
164164

165165
// buildXDSClientConfig builds the xdsclient.Config from the bootstrap.Config.
166-
func buildXDSClientConfig(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, target string) (xdsclient.Config, error) {
166+
func buildXDSClientConfig(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, target string, watchExpiryTimeout time.Duration) (xdsclient.Config, error) {
167167
grpcTransportConfigs := make(map[string]grpctransport.Config)
168168
gServerCfgMap := make(map[xdsclient.ServerConfig]*bootstrap.ServerConfig)
169169

@@ -218,12 +218,13 @@ func buildXDSClientConfig(config *bootstrap.Config, metricsRecorder estats.Metri
218218
}
219219

220220
return xdsclient.Config{
221-
Authorities: gAuthorities,
222-
Servers: gServerCfgs,
223-
Node: gNode,
224-
TransportBuilder: grpctransport.NewBuilder(grpcTransportConfigs),
225-
ResourceTypes: supportedResourceTypes(config, gServerCfgMap),
226-
MetricsReporter: &metricsReporter{recorder: metricsRecorder, target: target},
221+
Authorities: gAuthorities,
222+
Servers: gServerCfgs,
223+
Node: gNode,
224+
TransportBuilder: grpctransport.NewBuilder(grpcTransportConfigs),
225+
ResourceTypes: supportedResourceTypes(config, gServerCfgMap),
226+
MetricsReporter: &metricsReporter{recorder: metricsRecorder, target: target},
227+
WatchExpiryTimeout: watchExpiryTimeout,
227228
}, nil
228229
}
229230

internal/xds/xdsclient/clientimpl_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) {
208208
if err != nil {
209209
t.Fatalf("Failed to create bootstrap config: %v", err)
210210
}
211-
gotCfg, err := buildXDSClientConfig(bootstrapConfig, stats.NewTestMetricsRecorder(), testTargetName)
211+
gotCfg, err := buildXDSClientConfig(bootstrapConfig, stats.NewTestMetricsRecorder(), testTargetName, 0)
212212
if err != nil {
213213
t.Fatalf("Failed to build XDSClientConfig: %v", err)
214214
}

internal/xds/xdsclient/pool.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func NewPool(config *bootstrap.Config) *Pool {
9999
// expected to invoke once they are done using the client. It is safe for the
100100
// caller to invoke this close function multiple times.
101101
func (p *Pool) NewClient(name string, metricsRecorder estats.MetricsRecorder) (XDSClient, func(), error) {
102-
return p.newRefCounted(name, metricsRecorder)
102+
return p.newRefCounted(name, metricsRecorder, defaultWatchExpiryTimeout)
103103
}
104104

105105
// NewClientForTesting returns an xDS client configured with the provided
@@ -126,11 +126,10 @@ func (p *Pool) NewClientForTesting(opts OptionsForTesting) (XDSClient, func(), e
126126
if opts.MetricsRecorder == nil {
127127
opts.MetricsRecorder = istats.NewMetricsRecorderList(nil)
128128
}
129-
c, cancel, err := p.newRefCounted(opts.Name, opts.MetricsRecorder)
129+
c, cancel, err := p.newRefCounted(opts.Name, opts.MetricsRecorder, opts.WatchExpiryTimeout)
130130
if err != nil {
131131
return nil, nil, err
132132
}
133-
c.SetWatchExpiryTimeoutForTesting(opts.WatchExpiryTimeout)
134133
return c, cancel, nil
135134
}
136135

@@ -252,7 +251,7 @@ func (p *Pool) clientRefCountedClose(name string) {
252251
// newRefCounted creates a new reference counted xDS client implementation for
253252
// name, if one does not exist already. If an xDS client for the given name
254253
// exists, it gets a reference to it and returns it.
255-
func (p *Pool) newRefCounted(name string, metricsRecorder estats.MetricsRecorder) (*clientImpl, func(), error) {
254+
func (p *Pool) newRefCounted(name string, metricsRecorder estats.MetricsRecorder, watchExpiryTimeout time.Duration) (*clientImpl, func(), error) {
256255
p.mu.Lock()
257256
defer p.mu.Unlock()
258257

@@ -276,7 +275,7 @@ func (p *Pool) newRefCounted(name string, metricsRecorder estats.MetricsRecorder
276275
return c, sync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil
277276
}
278277

279-
c, err := newClientImpl(config, metricsRecorder, name)
278+
c, err := newClientImpl(config, metricsRecorder, name, watchExpiryTimeout)
280279
if err != nil {
281280
return nil, nil, err
282281
}

internal/xds/xdsclient/tests/loadreport_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -481,8 +481,6 @@ func (s) TestConcurrentReportLoad(t *testing.T) {
481481
// concurrently with a shared XDSClient, each of which will create a new LRS
482482
// stream without any race.
483483
func (s) TestConcurrentChannels(t *testing.T) {
484-
// TODO(emchandwani) : Unskip after https://github.com/grpc/grpc-go/pull/8526 gets merged.
485-
t.Skip()
486484
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
487485
defer cancel()
488486

0 commit comments

Comments
 (0)