Skip to content

Commit 132211f

Browse files
committed
testserver: reduce wait for nodelocal capability
This adds the ability to restart the rangefeedcache used by the tenant capability watcher and then issues such a restart before waiting for the nodelocal capability. The initial scan the restart forces typically takes less time than waiting out the closed timestamp. Epic: CRDB-18499 Release note: None
1 parent 1e40fd2 commit 132211f

File tree

3 files changed

+38
-11
lines changed

3 files changed

+38
-11
lines changed

pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ type Watcher struct {
7777

7878
lastFrontierTS hlc.Timestamp // used to assert monotonicity across rangefeed attempts
7979

80+
// Used to force a restart during testing.
81+
restartErrCh chan error
82+
8083
knobs TestingKnobs
8184
}
8285

@@ -177,6 +180,7 @@ func NewWatcher(
177180
withPrevValue: withPrevValue,
178181
translateEvent: translateEvent,
179182
onUpdate: onUpdate,
183+
restartErrCh: make(chan error),
180184
}
181185
if knobs != nil {
182186
w.knobs = *knobs
@@ -353,12 +357,24 @@ func (s *Watcher) Run(ctx context.Context) error {
353357

354358
case err := <-errCh:
355359
return err
360+
case err := <-s.restartErrCh:
361+
return err
356362
case err := <-s.knobs.ErrorInjectionCh:
357363
return err
358364
}
359365
}
360366
}
361367

368+
var restartErr = errors.New("testing restart requested")
369+
370+
// TestingRestart injects an error into the rangefeed cache, forcing
371+
// it to restart. This is separate from the testing knob so that we
372+
// can force a restart from test infrastructure without overriding the
373+
// user-provided testing knobs.
374+
func (s *Watcher) TestingRestart() {
375+
// restartErrCh is created unbuffered in NewWatcher, so this send blocks
// until a receiver takes the error; the select loop in Run receives from
// restartErrCh and returns the error it reads.
s.restartErrCh <- restartErr
376+
}
377+
362378
func (s *Watcher) handleUpdate(
363379
ctx context.Context, buffer *rangefeedbuffer.Buffer, ts hlc.Timestamp, updateType UpdateType,
364380
) {

pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher/watcher.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ type Watcher struct {
5454
startCh chan struct{}
5555
startErr error
5656

57+
// rfc provides access to the underlying
58+
// rangefeedcache.Watcher for testing.
59+
rfc *rangefeedcache.Watcher
60+
5761
// initialScan is used to synchronize the Start() method with the
5862
// reception of the initial batch of values from the rangefeed
5963
// (which happens asynchronously).
@@ -212,6 +216,7 @@ func (w *Watcher) startRangeFeed(ctx context.Context) error {
212216
if err := rangefeedcache.Start(ctx, w.stopper, rfc, w.onError); err != nil {
213217
return err
214218
}
219+
w.rfc = rfc
215220

216221
// Wait for the initial scan before returning.
217222
select {
@@ -346,6 +351,12 @@ func (w *Watcher) handleIncrementalUpdate(
346351
}
347352
}
348353

354+
// TestingRestart asks the underlying rangefeedcache.Watcher to restart by
// calling its TestingRestart method. It does nothing when w.rfc is nil;
// w.rfc is assigned in startRangeFeed after rangefeedcache.Start succeeds.
func (w *Watcher) TestingRestart() {
355+
if w.rfc != nil {
356+
w.rfc.TestingRestart()
357+
}
358+
}
359+
349360
// TestingFlushCapabilitiesState flushes the underlying global tenant capability
350361
// state for testing purposes. The returned entries are sorted by tenant ID.
351362
func (w *Watcher) TestingFlushCapabilitiesState() (entries []tenantcapabilities.Entry) {

pkg/server/testserver.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1201,18 +1201,13 @@ func (ts *TestServer) StartTenant(
12011201
baseCfg.DisableTLSForHTTP = params.DisableTLSForHTTP
12021202
baseCfg.EnableDemoLoginEndpoint = params.EnableDemoLoginEndpoint
12031203

1204-
// Waiting for capabilities can take 3+ seconds since the
1205-
// rangefeedcache needs to wait for the closed timestamp
1206-
// before flushing updates. To avoid paying this cost in all
1207-
// cases, we only set the nodelocal storage capability if the
1208-
// caller has configured an ExternalIODir since nodelocal
1209-
// storage only works with that configured.
1204+
// Waiting for capabilities can take time. To avoid paying this cost in all
1205+
// cases, we only set the nodelocal storage capability if the caller has
1206+
// configured an ExternalIODir since nodelocal storage only works with
1207+
// that configured.
12101208
//
1211-
// TODO(ssd): We should do more here. We could have the caller
1212-
// pass in explicitly that they want these capabilities. Or,
1213-
// we could modify the system in some way to avoid waiting on
1214-
// capabilities for so long. Also, note that this doesn't
1215-
// apply to StartSharedProcessTenant.
1209+
// TODO(ssd): We do not set this capability in
1210+
// StartSharedProcessTenant.
12161211
shouldGrantNodelocalCap := ts.params.ExternalIODir != ""
12171212
canGrantNodelocalCap := ts.ClusterSettings().Version.IsActive(ctx, clusterversion.V23_1TenantCapabilities)
12181213
if canGrantNodelocalCap && shouldGrantNodelocalCap {
@@ -1225,6 +1220,11 @@ func (ts *TestServer) StartTenant(
12251220
return nil, err
12261221
}
12271222
} else {
1223+
// Restart the capabilities watcher. Restarting the
1224+
// watcher forces a new initial scan which is faster
1225+
// than waiting out the closed timestamp interval
1226+
// required to see new updates.
1227+
ts.tenantCapabilitiesWatcher.TestingRestart()
12281228
if err := testutils.SucceedsSoonError(func() error {
12291229
capabilities, found := ts.TenantCapabilitiesReader().GetCapabilities(params.TenantID)
12301230
if !found {

0 commit comments

Comments
 (0)