Commit 0b875de

craig[bot], fqazi, dodeca12, and stevendanna committed
157131: catalog/lease: fix error handling for purgeOldVersions r=fqazi a=fqazi

Previously, when purging old versions we would acquire a lease on the latest version; this behavior was introduced when we added logic for acquiring leases on the previous version. The logic to acquire the previous version would clear the error, preventing errors from surfacing correctly and causing issues with dropped / offline descriptors. To address this, ensure the acquisition logic for the previous version is only executed if a cleanup will occur.

Informs: #156176

Release note: None

157792: storeliveness: update `storeliveness/doc.go` with heartbeat smearing info r=miraradeva a=dodeca12

`storeliveness/doc.go` is outdated with respect to the heartbeat smearing changes. In particular, the `Transport` section has no information about heartbeat smearing, and the `Configuration` section doesn't mention the heartbeat smearing cluster setting `kv.store_liveness.heartbeat_smearing.enabled`.

Added a dedicated bullet point in the `Transport` section (`5.2`) that explicitly describes heartbeat smearing as a feature to avoid goroutine spikes. Also updated the `Configuration` section (`5.3`) to note that `kv.store_liveness.heartbeat_smearing.enabled` is available and to describe the behaviour of heartbeat sends when the cluster setting is enabled or disabled.

Informs: #156830

Release note: None

157915: kvfollowerreadsccl: maybe deflake TestBoundedStalenessDataDriven r=stevendanna a=stevendanna

This test determines what events occur by parsing the trace. In some cases, the parsing it used to detect a "local read followed by remote leaseholder read" didn't account for changes in the potential trace messages encountered when leader leases are enabled. Here, I widen the scope of the trace parsing. Locally under stress this eliminated the previously encountered failure:

```
datadriven.go:357: ... SNIP ...
boundedstaleness/single_row:24: still running after 10.000889738s
... SNIP ...
boundedstaleness_test.go:405: condition failed to evaluate within 45s: from boundedstaleness_test.go:436: not yet a match, output:
1 events (1 found):
 * event 1: colbatchscan trace on node_idx 2: local read
datadriven.go:343:
```

Fixes #154710

Release note: None

Co-authored-by: Faizan Qazi <[email protected]>
Co-authored-by: Swapneeth Gorantla <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
4 parents e837db7 + 6abab18 + 3bb81f3 + 9e41a32 commit 0b875de
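The error-handling fix in 157131 comes down to a common Go pitfall: reassigning a shared `err` for a secondary operation silently discards the error the caller needed to observe. Below is a minimal, standalone sketch of the failure mode and the fix, using made-up helper names rather than the actual lease.go code.

```go
package main

import "fmt"

var errDescriptorDropped = fmt.Errorf("descriptor is dropped")

// lookupLatest stands in for the lookup that can report a dropped or offline
// descriptor; acquirePrevious stands in for the optional secondary acquisition.
func lookupLatest() error    { return errDescriptorDropped }
func acquirePrevious() error { return nil }

// buggy reuses err for the secondary call, so the dropped-descriptor error
// from lookupLatest is clobbered and never surfaces to the caller.
func buggy() error {
	err := lookupLatest()
	err = acquirePrevious() // overwrites the earlier error
	return err
}

// fixed keeps the secondary error in its own variable and only attempts the
// acquisition on the path where the original lookup succeeded.
func fixed() error {
	err := lookupLatest()
	if err == nil {
		if acquireErr := acquirePrevious(); acquireErr != nil {
			fmt.Println("unable to acquire lease on previous version:", acquireErr)
		}
	}
	return err
}

func main() {
	fmt.Println("buggy:", buggy()) // <nil>: the real error is lost
	fmt.Println("fixed:", fixed()) // descriptor is dropped
}
```

In the real change, the acquisition is additionally gated so it only runs on the path where a cleanup will actually occur; see the lease.go hunks below.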

File tree

4 files changed (+84 / -23 lines)


pkg/ccl/kvccl/kvfollowerreadsccl/boundedstaleness_test.go

Lines changed: 38 additions & 14 deletions
```diff
@@ -45,6 +45,10 @@ var (
 	)
 )
 
+// fullTraceDebug is a flag that controls whether full traces are printed in the
+// case of some errors.
+const fullTraceDebug = false
+
 func TestBoundedStalenessEnterpriseLicense(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -148,8 +152,10 @@ type boundedStalenessEvents struct {
 	// A mutex is needed as the event handlers (onStmtTrace) can race.
 	mu struct {
 		syncutil.Mutex
-		stmt   string
-		events []boundedStalenessDataDrivenEvent
+		stmt string
+		// Only populated if fullTraceDebug constant is true.
+		traceForDebugging string
+		events            []boundedStalenessDataDrivenEvent
 	}
 }
 
@@ -167,6 +173,13 @@ func (bse *boundedStalenessEvents) clearEvents() {
 	bse.mu.events = nil
 }
 
+func (bse *boundedStalenessEvents) fullTrace() string {
+	bse.mu.Lock()
+	defer bse.mu.Unlock()
+
+	return bse.mu.traceForDebugging
+}
+
 func (bse *boundedStalenessEvents) setStmt(s string) {
 	bse.mu.Lock()
 	defer bse.mu.Unlock()
@@ -240,17 +253,23 @@ func (bse *boundedStalenessEvents) onStmtTrace(nodeIdx int, rec tracingpb.Record
 	defer bse.mu.Unlock()
 
 	if bse.mu.stmt != "" && bse.mu.stmt == stmt {
+		if fullTraceDebug {
+			bse.mu.traceForDebugging = rec.String()
+		}
 		spans := make(map[tracingpb.SpanID]tracingpb.RecordedSpan)
 		for _, sp := range rec {
 			spans[sp.SpanID] = sp
+			notLeaseHolderError := tracing.LogsContainMsg(sp, "[NotLeaseHolderError] lease held by different store;")
+			notLeaseHolderError = notLeaseHolderError || tracing.LogsContainMsg(sp, "[NotLeaseHolderError] leader lease is not held locally, cannot determine validity;")
 			if sp.Operation == "dist sender send" && spans[sp.ParentSpanID].Operation == "colbatchscan" {
 				bse.mu.events = append(bse.mu.events, &boundedStalenessTraceEvent{
-					operation:    spans[sp.ParentSpanID].Operation,
-					nodeIdx:      nodeIdx,
-					localRead:    tracing.LogsContainMsg(sp, kvbase.RoutingRequestLocallyMsg),
-					followerRead: kvtestutils.OnlyFollowerReads(rec),
-					remoteLeaseholderRead: tracing.LogsContainMsg(sp, "[NotLeaseHolderError] lease held by different store;") &&
-						tracing.LogsContainMsg(sp, "trying next peer"),
+					operation:             spans[sp.ParentSpanID].Operation,
+					nodeIdx:               nodeIdx,
+					localRead:             tracing.LogsContainMsg(sp, kvbase.RoutingRequestLocallyMsg),
+					followerRead:          kvtestutils.OnlyFollowerReads(rec),
+					remoteLeaseholderRead: notLeaseHolderError && tracing.LogsContainMsg(sp, "trying next peer"),
 				})
 			}
 		}
@@ -261,10 +280,7 @@ func TestBoundedStalenessDataDriven(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	const msg = "1μs staleness reads may actually succeed due to the slow environment"
-	skip.UnderStress(t, msg)
-	skip.UnderRace(t, msg)
-	skip.UnderDeadlock(t, msg)
+	skip.UnderDuress(t, "1μs staleness reads may actually succeed due to the slow environment")
 	defer ccl.TestingEnableEnterprise()()
 
 	ctx := context.Background()
@@ -426,14 +442,22 @@ func TestBoundedStalenessDataDriven(t *testing.T) {
 			}
 		}()
 		if !followerRead {
+			var trace string
+			if fullTraceDebug {
+				trace = fmt.Sprintf("\nfull_trace:\n%s", bse.fullTrace())
+			}
 			bse.clearEvents()
-			return errors.AssertionFailedf("not follower reads found:\n%s", bse.String())
+			return errors.AssertionFailedf("not follower reads found:\n%s%s", bse.String(), trace)
 		}
 	}
 	if waitUntilMatch {
 		if d.Expected != ret {
+			var trace string
+			if fullTraceDebug {
+				trace = fmt.Sprintf("\nfull_trace:\n%s", bse.fullTrace())
+			}
 			bse.clearEvents()
-			return errors.AssertionFailedf("not yet a match, output:\n%s\n", ret)
+			return errors.AssertionFailedf("not yet a match, output:\n%s%s", ret, trace)
 		}
 	}
 	return nil
```
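The deflake in 157915 amounts to accepting either of two `NotLeaseHolderError` phrasings when classifying a span. Below is a hypothetical, self-contained helper showing that "match any of several messages" shape; the real test uses `tracing.LogsContainMsg` on recorded spans, and the helper name here is invented.

```go
package main

import (
	"fmt"
	"strings"
)

// logsContainAnyMsg reports whether any log line contains at least one of the
// candidate substrings, mirroring how the test now accepts either
// NotLeaseHolderError phrasing instead of exactly one.
func logsContainAnyMsg(logs []string, msgs ...string) bool {
	for _, line := range logs {
		for _, msg := range msgs {
			if strings.Contains(line, msg) {
				return true
			}
		}
	}
	return false
}

func main() {
	logs := []string{
		"[NotLeaseHolderError] leader lease is not held locally, cannot determine validity;",
		"trying next peer",
	}
	// Matching either phrasing covers clusters running with leader leases enabled.
	fmt.Println(logsContainAnyMsg(logs,
		"[NotLeaseHolderError] lease held by different store;",
		"[NotLeaseHolderError] leader lease is not held locally, cannot determine validity;",
	)) // true
}
```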

pkg/kv/kvserver/storeliveness/doc.go

Lines changed: 16 additions & 0 deletions
```diff
@@ -205,6 +205,10 @@
 // process is also aided by synchronizing the sending of heartbeats across all
 // stores on a given node.
 //
+// - Heartbeat smearing (enabled by default). The sending of heartbeats across
+// all nodes is synchronized and paced to avoid goroutine spikes. See
+// Configuration section for more details.
+//
 // 5.3. Configuration
 //
 // Store liveness can be enabled/disabled using the `kv.store_liveness.enabled`
@@ -213,6 +217,18 @@
 // well as calls to `SupportFor` and `SupportFrom`. This is required for
 // correctness.
 //
+// The behaviour of heartbeat sends is governed by the
+// `kv.store_liveness.heartbeat_smearing.enabled` cluster setting (enabled by
+// default). When enabled, heartbeat sends are distributed over a certain
+// duration (configured via the
+// `kv.store_liveness.heartbeat_smearing.refresh` cluster setting), with
+// messages being sent at a certain interval (configured via the
+// `kv.store_liveness.heartbeat_smearing.smear` cluster setting) to avoid
+// spiking the number of runnable goroutines (the per-node send queue
+// processors, and the per-node gRPC connections). When disabled, heartbeat
+// sends are sent immediately upon enqueueing, bypassing the smearing mechanism.
+// Note: the smearing applies to both heartbeat responses and requests.
+//
 // Additionally, `config.go` defines tunable configuration parameters for the
 // various timeouts and intervals that store liveness uses. Other intervals
 // (like support duration and heartbeat interval) are defined in
```
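As a rough picture of the smearing behaviour the new doc text describes, the sender paces queued heartbeats over the refresh duration instead of firing them all at once. Below is a simplified, standalone sketch with invented names and hard-coded durations, not the storeliveness implementation itself.

```go
package main

import (
	"fmt"
	"time"
)

// smearSends paces heartbeat sends to the given targets over the refresh
// duration, sending one small batch per smear interval so the number of
// runnable goroutines (send-queue processors, gRPC streams) doesn't spike.
// When smearing is disabled, everything is sent immediately upon enqueueing.
func smearSends(targets []string, refresh, smear time.Duration, enabled bool) {
	if !enabled || len(targets) == 0 {
		for _, t := range targets {
			fmt.Println("send heartbeat to", t)
		}
		return
	}
	batches := int(refresh / smear)
	if batches < 1 {
		batches = 1
	}
	perBatch := (len(targets) + batches - 1) / batches
	ticker := time.NewTicker(smear)
	defer ticker.Stop()
	for i := 0; i < len(targets); i += perBatch {
		end := i + perBatch
		if end > len(targets) {
			end = len(targets)
		}
		for _, t := range targets[i:end] {
			fmt.Println("send heartbeat to", t)
		}
		if end < len(targets) {
			<-ticker.C // wait for the next smear tick before the next batch
		}
	}
}

func main() {
	stores := []string{"s1", "s2", "s3", "s4", "s5", "s6"}
	smearSends(stores, 100*time.Millisecond, 25*time.Millisecond, true)
}
```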

pkg/sql/catalog/lease/descriptor_state.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -138,7 +138,7 @@ func (t *descriptorState) findForTimestampImpl(
 
 	// If we have the initial version of the descriptor, and it satisfies the read
 	// timestamp, then the object was just created. We can confirm it satisfies
-	// the request, by executing findForTimestampImpl with the readTimestamp instead.
+	// the request by executing findForTimestampImpl with the readTimestamp instead.
 	if oldest := t.mu.active.findOldest(); hasDifferentReadTimeStamp &&
 		oldest != nil &&
 		oldest.GetVersion() == 1 &&
```

pkg/sql/catalog/lease/lease.go

Lines changed: 29 additions & 8 deletions
```diff
@@ -1274,7 +1274,7 @@ func (m *Manager) purgeOldVersions(
 		retry.Options{
 			MaxDuration: time.Second * 30}); r.Next(); {
 		// Acquire a refcount on the descriptor on the latest version to maintain an
-		// active lease, so that it doesn't get released when removeInactives()
+		// active lease so that it doesn't get released when removeInactives()
 		// is called below. Release this lease after calling removeInactives().
 		desc, _, err = t.findForTimestamp(ctx, TimestampToReadTimestamp(m.storage.clock.Now()))
 		if err == nil || !errors.Is(err, errRenewLease) {
@@ -1290,7 +1290,7 @@ func (m *Manager) purgeOldVersions(
 		// Assert this should never happen due to a fixed expiration, since the range
 		// feed is responsible for purging old versions and acquiring new versions.
 		if newest.hasFixedExpiration() {
-			return errors.AssertionFailedf("the latest version of the descriptor has" +
+			return errors.AssertionFailedf("the latest version of the descriptor has " +
 				"a fixed expiration, this should never happen")
 		}
 		// Otherwise, we ran into some type of transient issue, where the sqllivness
@@ -1307,9 +1307,10 @@ func (m *Manager) purgeOldVersions(
 		err = nil
 	}
 
-	// Optionally, acquire the refcount on the previous version.
+	// Optionally, acquire the refcount on the previous version for the locked
+	// leasing mode.
 	acquireLeaseOnPrevious := func() error {
-		if dropped || !LockedLeaseTimestamp.Get(&m.storage.settings.SV) {
+		if !LockedLeaseTimestamp.Get(&m.storage.settings.SV) {
 			return nil
 		}
 		var handles []*closeTimeStampHandle
@@ -1376,11 +1377,12 @@ func (m *Manager) purgeOldVersions(
 		return gatheredErrors
 	}
 
-	if err = acquireLeaseOnPrevious(); err != nil {
-		log.Dev.Errorf(ctx, "unable to acquire lease on previous version of descriptor: %s", err)
-	}
-
 	if isInactive := catalog.HasInactiveDescriptorError(err); err == nil || isInactive {
+		// If previous versions are released, then acquire a lease on the previous
+		// version for the locked leasing mode.
+		if acquirePreviousErr := acquireLeaseOnPrevious(); acquirePreviousErr != nil {
+			log.Dev.Errorf(ctx, "unable to acquire lease on previous version of descriptor: %s", acquirePreviousErr)
+		}
 		removeInactives(isInactive)
 		if desc != nil {
 			t.release(ctx, desc)
@@ -2325,12 +2327,25 @@ func (m *Manager) StartRefreshLeasesTask(ctx context.Context, s *stop.Stopper, d
 	defer m.initComplete.Swap(true)
 	m.watchForUpdates(ctx)
 	_ = s.RunAsyncTask(ctx, "refresh-leases", func(ctx context.Context) {
+		defer func() {
+			if err := recover(); err != nil {
+				log.Dev.Warningf(ctx, "panic in refresh-leases: %v", err)
+				panic(err)
+			}
+		}()
+
 		for {
 			select {
 			case id := <-m.descDelCh:
 				// Descriptor is marked as deleted, so mark it for deletion or
 				// remove it if it's no longer in use.
 				_ = s.RunAsyncTask(ctx, "purgeOldVersionsOrAcquireInitialVersion deleted descriptor", func(ctx context.Context) {
+					defer func() {
+						if err := recover(); err != nil {
+							log.Dev.Warningf(ctx, "panic in purgeOldVersionsOrAcquireInitialVersion deleted descriptor: %v", err)
+							panic(err)
+						}
+					}()
 					// Once the descriptor is purged notify that some change has occurred.
 					defer m.leaseGeneration.Add(1)
 					state := m.findNewest(id)
@@ -2412,6 +2427,12 @@ func (m *Manager) StartRefreshLeasesTask(ctx context.Context, s *stop.Stopper, d
 			// of increased latency right as the descriptor has been committed.
 			if now := db.Clock().Now(); now.Less(desc.GetModificationTime()) {
 				_ = s.RunAsyncTask(ctx, "wait to purgeOldVersionsOrAcquireInitialVersion", func(ctx context.Context) {
+					defer func() {
+						if err := recover(); err != nil {
+							log.Dev.Warningf(ctx, "panic in wait to purgeOldVersionsOrAcquireInitialVersion: %v", err)
+							panic(err)
+						}
+					}()
 					toWait := time.Duration(desc.GetModificationTime().WallTime - now.WallTime)
 					select {
 					case <-time.After(toWait):
```
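The `StartRefreshLeasesTask` hunks above also wrap each async task body in a recover that logs before re-panicking, so a crash can be attributed to the task that raised it. Below is a generic, standalone sketch of that pattern; the real code uses `log.Dev.Warningf` inside the stopper's async tasks.

```go
package main

import "log"

// logPanics runs fn, and if it panics, logs the panic with the task name
// before re-raising it, so the crash can be attributed to a named task rather
// than an anonymous goroutine.
func logPanics(task string, fn func()) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("panic in %s: %v", task, r)
			panic(r) // preserve the original crash behaviour
		}
	}()
	fn()
}

func main() {
	defer func() {
		// Swallow the re-raised panic here only so the example exits cleanly.
		_ = recover()
	}()
	logPanics("refresh-leases", func() {
		panic("boom")
	})
}
```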
