
Commit 7cc372f

craig[bot] and tbg committed
Merge #147607
147607: kv: increase adoption of Stopper Handle API r=tbg a=tbg

Continues #142059.

I opened a recent TPCC execution trace in https://github.com/felixge/sqlprof and worked through the output of the query below, which roughly shows which stopper tasks were active in the execution. I then added more call sites based on further execution traces from the 300-node scale test.

```sql
select *
from (
  select count(*) as hits, list_slice(s.funcs, -3, -2) as root
  from stack_samples ss
  join stacks s on ss.src_stack_id = s.stack_id
  where root[2] LIKE '%Stopper%'
  group by root
)
order by hits desc;
```

I recommend reviewing with whitespace changes suppressed, and watching out for ctx assignments, as the new API is more brittle with respect to them.

Epic: CRDB-42584

Co-authored-by: Tobias Grieger <[email protected]>
2 parents 4f422e8 + 32f7bad commit 7cc372f
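
Every call site below follows the same mechanical rewrite from `Stopper.RunAsyncTask`/`RunAsyncTaskEx` to the Handle API. As a rough sketch of that shape (assuming CockroachDB's `pkg/util/stop` package and the `GetHandle` signature as used in the diffs; the `example` package, the `spawnOld`/`spawnNew` wrappers, the task name, and the `runWork` parameter are hypothetical and not part of this commit):

```go
// Package example sketches the migration pattern applied throughout this
// commit; it is illustrative only and not part of the change itself.
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// spawnOld shows the previous pattern: the stopper both registers the
// async task and spawns the goroutine that runs it.
func spawnOld(ctx context.Context, stopper *stop.Stopper, runWork func(context.Context)) error {
	return stopper.RunAsyncTask(ctx, "example-task", runWork)
}

// spawnNew shows the Handle pattern: GetHandle registers the task up front
// (failing fast if the stopper is quiescing), and the caller owns the
// goroutine, bracketing the body with Activate/Release on the handle.
func spawnNew(ctx context.Context, stopper *stop.Stopper, runWork func(context.Context)) error {
	ctx, hdl, err := stopper.GetHandle(ctx, stop.TaskOpts{
		TaskName: "example-task",
	})
	if err != nil {
		// The stopper refused the task; no goroutine was started.
		return err
	}
	go func(ctx context.Context) {
		defer hdl.Activate(ctx).Release(ctx)
		runWork(ctx)
	}(ctx)
	return nil
}
```

Note the explicit `go func(ctx context.Context) { ... }(ctx)` closures in the diffs: the goroutine has to run with the ctx returned by `GetHandle`, which is why the commit message warns that ctx assignments are easy to get wrong with the new API.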

14 files changed: +576 -495 lines changed


pkg/internal/client/requestbatcher/batcher.go

Lines changed: 20 additions & 3 deletions
```diff
@@ -242,9 +242,16 @@ func New(cfg Config) *RequestBatcher {
 	}
 	b.sendBatchOpName = redact.Sprintf("%s.sendBatch", b.cfg.Name)
 	bgCtx := cfg.AmbientCtx.AnnotateCtx(context.Background())
-	if err := cfg.Stopper.RunAsyncTask(bgCtx, b.cfg.Name.StripMarkers(), b.run); err != nil {
+	bgCtx, hdl, err := cfg.Stopper.GetHandle(bgCtx, stop.TaskOpts{
+		TaskName: b.cfg.Name.StripMarkers(),
+	})
+	if err != nil {
 		panic(err)
 	}
+	go func(ctx context.Context) {
+		defer hdl.Activate(ctx).Release(ctx)
+		b.run(ctx)
+	}(bgCtx)
 	return b
 }

@@ -342,7 +349,7 @@ func (b *RequestBatcher) sendDone(ba *batch) {
 }

 func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) {
-	if err := b.cfg.Stopper.RunAsyncTask(ctx, "send-batch", func(ctx context.Context) {
+	work := func(ctx context.Context) {
 		defer b.sendDone(ba)
 		var batchRequest *kvpb.BatchRequest
 		var br *kvpb.BatchResponse

@@ -426,9 +433,19 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) {
 			}
 			ba.reqs, prevResps = nextReqs, nextPrevResps
 		}
-	}); err != nil {
+	}
+
+	ctx, hdl, err := b.cfg.Stopper.GetHandle(ctx, stop.TaskOpts{
+		TaskName: "send-batch",
+	})
+	if err != nil {
 		b.sendDone(ba)
+		return
 	}
+	go func(ctx context.Context) {
+		defer hdl.Activate(ctx).Release(ctx)
+		work(ctx)
+	}(ctx)
 }

 func (b *RequestBatcher) sendResponse(req *request, resp Response) {
```

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 35 additions & 24 deletions
```diff
@@ -1503,11 +1503,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
 	qiBatchIdx := batchIdx + 1
 	qiResponseCh := make(chan response, 1)

-	runTask := ds.stopper.RunAsyncTask
-	if ds.disableParallelBatches {
-		runTask = ds.stopper.RunTask
-	}
-	if err := runTask(ctx, "kv.DistSender: sending pre-commit query intents", func(ctx context.Context) {
+	sendPreCommit := func(ctx context.Context) {
 		// Map response index to the original un-swapped batch index.
 		// Remember that we moved the last QueryIntent in this batch
 		// from swapIdx to the end.

@@ -1530,8 +1526,24 @@ func (ds *DistSender) divideAndSendParallelCommit(
 		// concurrently with the EndTxn batch below.
 		reply, pErr := ds.divideAndSendBatchToRanges(ctx, qiBa, qiRS, qiIsReverse, true /* withCommit */, qiBatchIdx)
 		qiResponseCh <- response{reply: reply, positions: positions, pErr: pErr}
-	}); err != nil {
-		return nil, kvpb.NewError(err)
+	}
+
+	const taskName = "kv.DistSender: sending pre-commit query intents"
+	if ds.disableParallelBatches {
+		if err := ds.stopper.RunTask(ctx, taskName, sendPreCommit); err != nil {
+			return nil, kvpb.NewError(err)
+		}
+	} else {
+		ctx, hdl, err := ds.stopper.GetHandle(ctx, stop.TaskOpts{
+			TaskName: taskName,
+		})
+		if err != nil {
+			return nil, kvpb.NewError(err)
+		}
+		go func(ctx context.Context) {
+			defer hdl.Activate(ctx).Release(ctx)
+			sendPreCommit(ctx)
+		}(ctx)
 	}

 	// Adjust the original batch request to ignore the pre-commit

@@ -2069,26 +2081,25 @@ func (ds *DistSender) sendPartialBatchAsync(
 	responseCh chan response,
 	positions []int,
 ) bool {
-	if err := ds.stopper.RunAsyncTaskEx(
-		ctx,
-		stop.TaskOpts{
-			TaskName:   "kv.DistSender: sending partial batch",
-			SpanOpt:    stop.ChildSpan,
-			Sem:        ds.asyncSenderSem,
-			WaitForSem: false,
-		},
-		func(ctx context.Context) {
-			ds.metrics.AsyncSentCount.Inc(1)
-			ds.metrics.AsyncInProgress.Inc(1)
-			defer ds.metrics.AsyncInProgress.Dec(1)
-			resp := ds.sendPartialBatch(ctx, ba, rs, isReverse, withCommit, batchIdx, routing)
-			resp.positions = positions
-			responseCh <- resp
-		},
-	); err != nil {
+	ctx, hdl, err := ds.stopper.GetHandle(ctx, stop.TaskOpts{
+		TaskName:   "kv.DistSender: sending partial batch",
+		SpanOpt:    stop.ChildSpan,
+		Sem:        ds.asyncSenderSem,
+		WaitForSem: false,
+	})
+	if err != nil {
 		ds.metrics.AsyncThrottledCount.Inc(1)
 		return false
 	}
+	go func(ctx context.Context) {
+		defer hdl.Activate(ctx).Release(ctx)
+		ds.metrics.AsyncSentCount.Inc(1)
+		ds.metrics.AsyncInProgress.Inc(1)
+		defer ds.metrics.AsyncInProgress.Dec(1)
+		resp := ds.sendPartialBatch(ctx, ba, rs, isReverse, withCommit, batchIdx, routing)
+		resp.positions = positions
+		responseCh <- resp
+	}(ctx)
 	return true
 }
```

pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go

Lines changed: 18 additions & 9 deletions
```diff
@@ -499,17 +499,26 @@ func (tc *txnCommitter) makeTxnCommitExplicitAsync(
 	if multitenant.HasTenantCostControlExemption(ctx) {
 		asyncCtx = multitenant.WithTenantCostControlExemption(asyncCtx)
 	}
-	if err := tc.stopper.RunAsyncTask(
-		asyncCtx, "txnCommitter: making txn commit explicit", func(ctx context.Context) {
-			tc.mu.Lock()
-			defer tc.mu.Unlock()
-			if err := makeTxnCommitExplicitLocked(ctx, tc.wrapped, txn, lockSpans); err != nil {
-				log.Errorf(ctx, "making txn commit explicit failed for %s: %v", txn, err)
-			}
-		},
-	); err != nil {
+
+	work := func(ctx context.Context) {
+		tc.mu.Lock()
+		defer tc.mu.Unlock()
+		if err := makeTxnCommitExplicitLocked(ctx, tc.wrapped, txn, lockSpans); err != nil {
+			log.Errorf(ctx, "making txn commit explicit failed for %s: %v", txn, err)
+		}
+	}
+
+	asyncCtx, hdl, err := tc.stopper.GetHandle(asyncCtx, stop.TaskOpts{
+		TaskName: "txnCommitter: making txn commit explicit",
+	})
+	if err != nil {
 		log.VErrEventf(ctx, 1, "failed to make txn commit explicit: %v", err)
+		return
 	}
+	go func(ctx context.Context) {
+		defer hdl.Activate(ctx).Release(ctx)
+		work(ctx)
+	}(asyncCtx)
 }

 func makeTxnCommitExplicitLocked(
```

pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go

Lines changed: 66 additions & 57 deletions
```diff
@@ -566,67 +566,76 @@ func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {

 	const taskName = "txnHeartbeater: aborting txn"
 	log.VEventf(ctx, 2, "async abort for txn: %s", txn)
-	if err := h.stopper.RunAsyncTask(h.AnnotateCtx(context.Background()), taskName,
-		func(ctx context.Context) {
-			if err := timeutil.RunWithTimeout(ctx, taskName, abortTxnAsyncTimeout,
-				func(ctx context.Context) error {
-					h.mu.Lock()
-					defer h.mu.Unlock()
-
-					// If we find an abortTxnAsyncResultC, that means an async
-					// rollback request is already in flight, so there's no
-					// point in us running another. This can happen because the
-					// TxnCoordSender also calls abortTxnAsyncLocked()
-					// independently of the heartbeat loop.
-					if h.mu.abortTxnAsyncResultC != nil {
-						log.VEventf(ctx, 2,
-							"skipping async abort due to concurrent async abort for %s", txn)
-						return nil
-					}
-
-					// TxnCoordSender allows EndTxn(commit=false) through even
-					// after we set finalObservedStatus, and that request can
-					// race with us for the mutex. Thus, if we find an in-flight
-					// request here, after checking ifReqs=0 before being spawned,
-					// we deduce that it must have been a rollback and there's no
-					// point in sending another rollback.
-					if h.mu.ifReqs > 0 {
-						log.VEventf(ctx, 2,
-							"skipping async abort due to client rollback for %s", txn)
-						return nil
-					}
-
-					// Set up a result channel to signal to an incoming client
-					// rollback that an async rollback is already in progress,
-					// and pass it the result. The buffer allows storing the
-					// result even when no client rollback arrives. Recall that
-					// the SendLocked() call below releases the mutex while
-					// running, allowing concurrent incoming requests.
-					h.mu.abortTxnAsyncResultC = make(chan abortTxnAsyncResult, 1)
-
-					// Send the abort request through the interceptor stack. This is
-					// important because we need the txnPipeliner to append lock spans
-					// to the EndTxn request.
-					br, pErr := h.wrapped.SendLocked(ctx, ba)
-					if pErr != nil {
-						log.VErrEventf(ctx, 1, "async abort failed for %s: %s ", txn, pErr)
-						h.metrics.AsyncRollbacksFailed.Inc(1)
-					}
-
-					// Pass the result to a waiting client rollback, if any, and
-					// remove the channel since we're no longer in flight.
-					h.mu.abortTxnAsyncResultC <- abortTxnAsyncResult{br: br, pErr: pErr}
-					h.mu.abortTxnAsyncResultC = nil
+
+	work := func(ctx context.Context) {
+		if err := timeutil.RunWithTimeout(ctx, taskName, abortTxnAsyncTimeout,
+			func(ctx context.Context) error {
+				h.mu.Lock()
+				defer h.mu.Unlock()
+
+				// If we find an abortTxnAsyncResultC, that means an async
+				// rollback request is already in flight, so there's no
+				// point in us running another. This can happen because the
+				// TxnCoordSender also calls abortTxnAsyncLocked()
+				// independently of the heartbeat loop.
+				if h.mu.abortTxnAsyncResultC != nil {
+					log.VEventf(ctx, 2,
+						"skipping async abort due to concurrent async abort for %s", txn)
 					return nil
-				},
-			); err != nil {
-				log.VEventf(ctx, 1, "async abort failed for %s: %s", txn, err)
-			}
-		},
-	); err != nil {
+				}
+
+				// TxnCoordSender allows EndTxn(commit=false) through even
+				// after we set finalObservedStatus, and that request can
+				// race with us for the mutex. Thus, if we find an in-flight
+				// request here, after checking ifReqs=0 before being spawned,
+				// we deduce that it must have been a rollback and there's no
+				// point in sending another rollback.
+				if h.mu.ifReqs > 0 {
+					log.VEventf(ctx, 2,
+						"skipping async abort due to client rollback for %s", txn)
+					return nil
+				}
+
+				// Set up a result channel to signal to an incoming client
+				// rollback that an async rollback is already in progress,
+				// and pass it the result. The buffer allows storing the
+				// result even when no client rollback arrives. Recall that
+				// the SendLocked() call below releases the mutex while
+				// running, allowing concurrent incoming requests.
+				h.mu.abortTxnAsyncResultC = make(chan abortTxnAsyncResult, 1)
+
+				// Send the abort request through the interceptor stack. This is
+				// important because we need the txnPipeliner to append lock spans
+				// to the EndTxn request.
+				br, pErr := h.wrapped.SendLocked(ctx, ba)
+				if pErr != nil {
+					log.VErrEventf(ctx, 1, "async abort failed for %s: %s ", txn, pErr)
+					h.metrics.AsyncRollbacksFailed.Inc(1)
+				}
+
+				// Pass the result to a waiting client rollback, if any, and
+				// remove the channel since we're no longer in flight.
+				h.mu.abortTxnAsyncResultC <- abortTxnAsyncResult{br: br, pErr: pErr}
+				h.mu.abortTxnAsyncResultC = nil
+				return nil
+			},
+		); err != nil {
+			log.VEventf(ctx, 1, "async abort failed for %s: %s", txn, err)
+		}
+	}
+
+	asyncCtx, hdl, err := h.stopper.GetHandle(h.AnnotateCtx(context.Background()), stop.TaskOpts{
+		TaskName: taskName,
+	})
+	if err != nil {
 		log.Warningf(ctx, "%v", err)
 		h.metrics.AsyncRollbacksFailed.Inc(1)
+		return
 	}
+	go func(ctx context.Context) {
+		defer hdl.Activate(ctx).Release(ctx)
+		work(ctx)
+	}(asyncCtx)
 }

 // randLockingIndex returns the index of the first request that acquires locks
```
