Skip to content

Commit 424e966

Browse files
craig[bot]tbg
andcommitted
Merge #143124
143124: kvserver: go through DB for GC r=tbg a=tbg Instead of doing a bespoke thing that hands requests to the replica directly and bypasses a lot of plumbing, go through the DB. This means GCRequest enters the Replica like every other request, after passing through the DistSender stack and being routed through the Node. This avoids surprises such as discovered in #143122. Epic: none Release note: None Co-authored-by: Tobias Grieger <[email protected]>
2 parents 62c9d63 + 367caec commit 424e966

File tree

5 files changed

+42
-60
lines changed

5 files changed

+42
-60
lines changed

pkg/kv/kvserver/kvserverbase/base.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,14 +132,15 @@ var _ redact.SafeFormatter = CmdIDKey("")
132132

133133
// FilterArgs groups the arguments to a ReplicaCommandFilter.
134134
type FilterArgs struct {
135-
Ctx context.Context
136-
CmdID CmdIDKey
137-
Index int
138-
Sid roachpb.StoreID
139-
Req kvpb.Request
140-
Hdr kvpb.Header
141-
Version roachpb.Version
142-
Err error // only used for TestingPostEvalFilter
135+
Ctx context.Context
136+
CmdID CmdIDKey
137+
Index int
138+
Sid roachpb.StoreID
139+
Req kvpb.Request
140+
Hdr kvpb.Header
141+
AdmissionHdr kvpb.AdmissionHeader
142+
Version roachpb.Version
143+
Err error // only used for TestingPostEvalFilter
143144
}
144145

145146
// ProposalFilterArgs groups the arguments to ReplicaProposalFilter.

pkg/kv/kvserver/mvcc_gc_queue.go

Lines changed: 13 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"sync/atomic"
1414
"time"
1515

16+
"github.com/cockroachdb/cockroach/pkg/keys"
1617
"github.com/cockroachdb/cockroach/pkg/kv"
1718
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1819
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc"
@@ -123,16 +124,6 @@ var EnqueueInMvccGCQueueOnSpanConfigUpdateEnabled = settings.RegisterBoolSetting
123124
false,
124125
)
125126

126-
// See https://github.com/cockroachdb/cockroach/pull/143122.
127-
var mvccGCQueueFullyEnableAC = settings.RegisterBoolSetting(
128-
settings.SystemOnly,
129-
"kv.mvcc_gc.queue_kv_admission_control.enabled",
130-
"when true, MVCC GC queue operations are subject to store admission control. If set to false, "+
131-
"since store admission control will be disabled, replication flow control will also be effectively disabled. "+
132-
"This setting does not affect CPU admission control.",
133-
true,
134-
)
135-
136127
func largeAbortSpan(ms enginepb.MVCCStats) bool {
137128
// Checks if the size of the abort span exceeds the given threshold.
138129
// The abort span is not supposed to become that large, but it does
@@ -594,6 +585,11 @@ func (r *replicaGCer) template() kvpb.GCRequest {
594585
desc := r.repl.Desc()
595586
var template kvpb.GCRequest
596587
template.Key = desc.StartKey.AsRawKey()
588+
if r.repl.RangeID == 1 {
589+
// r1 should really start at LocalMax but it starts "officially" at KeyMin
590+
// which is not addressable.
591+
template.Key = keys.LocalMax
592+
}
597593
template.EndKey = desc.EndKey.AsRawKey()
598594

599595
return template
@@ -603,34 +599,13 @@ func (r *replicaGCer) send(ctx context.Context, req kvpb.GCRequest) error {
603599
n := atomic.AddInt32(&r.count, 1)
604600
log.Eventf(ctx, "sending batch %d (%d keys, %d rangekeys)", n, len(req.Keys), len(req.RangeKeys))
605601

606-
ba := &kvpb.BatchRequest{}
607-
// Technically not needed since we're talking directly to the Replica.
608-
ba.RangeID = r.repl.Desc().RangeID
609-
ba.Timestamp = r.repl.Clock().Now()
610-
ba.Add(&req)
611-
// Since we are talking directly to the replica, we need to explicitly do
612-
// admission control here, as we are bypassing server.Node.
613-
var admissionHandle kvadmission.Handle
614-
if r.admissionController != nil {
615-
ba.AdmissionHeader = gcAdmissionHeader(r.repl.ClusterSettings())
616-
ba.Replica.StoreID = r.storeID
617-
var err error
618-
admissionHandle, err = r.admissionController.AdmitKVWork(ctx, roachpb.SystemTenantID, ba)
619-
if err != nil {
620-
return err
621-
}
622-
if mvccGCQueueFullyEnableAC.Get(&r.repl.ClusterSettings().SV) {
623-
ctx = admissionHandle.AnnotateCtx(ctx)
624-
}
625-
}
626-
_, writeBytes, pErr := r.repl.SendWithWriteBytes(ctx, ba)
627-
defer writeBytes.Release()
628-
if r.admissionController != nil {
629-
r.admissionController.AdmittedKVWorkDone(admissionHandle, writeBytes)
630-
}
631-
if pErr != nil {
632-
log.VErrEventf(ctx, 2, "%v", pErr.String())
633-
return pErr.GoError()
602+
var b kv.Batch
603+
b.AddRawRequest(&req)
604+
b.AdmissionHeader = gcAdmissionHeader(r.repl.ClusterSettings())
605+
606+
if err := r.repl.store.cfg.DB.Run(ctx, &b); err != nil {
607+
log.Infof(ctx, "%s", err)
608+
return err
634609
}
635610
return nil
636611
}

pkg/kv/kvserver/mvcc_gc_queue_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1397,6 +1397,9 @@ func TestMVCCGCQueueChunkRequests(t *testing.T) {
13971397
func(filterArgs kvserverbase.FilterArgs) *kvpb.Error {
13981398
if _, ok := filterArgs.Req.(*kvpb.GCRequest); ok {
13991399
atomic.AddInt32(&gcRequests, 1)
1400+
// Verify that all MVCC GC requests have their admission control header
1401+
// populated.
1402+
assert.NotZero(t, filterArgs.AdmissionHdr.CreateTime)
14001403
return nil
14011404
}
14021405
return nil

pkg/kv/kvserver/replica_evaluate.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -324,13 +324,14 @@ func evaluateBatch(
324324
// If a unittest filter was installed, check for an injected error; otherwise, continue.
325325
if filter := rec.EvalKnobs().TestingEvalFilter; filter != nil {
326326
filterArgs := kvserverbase.FilterArgs{
327-
Ctx: ctx,
328-
CmdID: idKey,
329-
Index: index,
330-
Sid: rec.StoreID(),
331-
Req: args,
332-
Version: rec.ClusterSettings().Version.ActiveVersionOrEmpty(ctx).Version,
333-
Hdr: baHeader,
327+
Ctx: ctx,
328+
CmdID: idKey,
329+
Index: index,
330+
Sid: rec.StoreID(),
331+
Req: args,
332+
Version: rec.ClusterSettings().Version.ActiveVersionOrEmpty(ctx).Version,
333+
Hdr: baHeader,
334+
AdmissionHdr: ba.AdmissionHeader,
334335
}
335336
if pErr := filter(filterArgs); pErr != nil {
336337
if pErr.GetTxn() == nil {
@@ -356,13 +357,14 @@ func evaluateBatch(
356357

357358
if filter := rec.EvalKnobs().TestingPostEvalFilter; filter != nil {
358359
filterArgs := kvserverbase.FilterArgs{
359-
Ctx: ctx,
360-
CmdID: idKey,
361-
Index: index,
362-
Sid: rec.StoreID(),
363-
Req: args,
364-
Hdr: baHeader,
365-
Err: err,
360+
Ctx: ctx,
361+
CmdID: idKey,
362+
Index: index,
363+
Sid: rec.StoreID(),
364+
Req: args,
365+
Hdr: baHeader,
366+
AdmissionHdr: ba.AdmissionHeader,
367+
Err: err,
366368
}
367369
if pErr := filter(filterArgs); pErr != nil {
368370
if pErr.GetTxn() == nil {

pkg/settings/registry.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ var retiredSettings = map[InternalKey]struct{}{
259259

260260
// removed as of 25.2
261261
"kv.snapshot_receiver.excise.enabled": {},
262+
"kv.mvcc_gc.queue_kv_admission_control.enabled": {},
262263
"sql.catalog.experimental_use_session_based_leasing": {},
263264
"bulkio.backup.merge_file_buffer_size": {},
264265
"changefeed.new_webhook_sink_enabled": {},

0 commit comments

Comments
 (0)