Skip to content

Commit 1925449

Browse files
craig[bot]Andrew BaptistYevgeniy Miretskiy
committed
108197: kvserver: pass Desc and SpanConf through allocator r=kvoli a=andrewbaptist Previously the different layers of the allocator would load the Desc and SpanConf as needed, this had a risk of them changing between various loads and could cause strange and hard to track down races. Now they are loaded once and passed through all the layers. Epic: none Release note: None 108399: roachtest: Treat job query error as retryable r=miretskiy a=miretskiy Retry attempts to retrieve job status (in cdc test) as long as the parent context is active. Attempts to query jobs table may fail if the cluster is under significant load. Epic: None Fixes cockroachdb#108399 Release note: None Co-authored-by: Andrew Baptist <[email protected]> Co-authored-by: Yevgeniy Miretskiy <[email protected]>
3 parents b91c919 + 8478b85 + 8fc7974 commit 1925449

File tree

14 files changed

+169
-108
lines changed

14 files changed

+169
-108
lines changed

pkg/cmd/roachtest/tests/cdc_bench.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
2929
"github.com/cockroachdb/cockroach/pkg/jobs"
3030
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
31+
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
3132
"github.com/cockroachdb/cockroach/pkg/util/randutil"
3233
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
3334
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
@@ -293,7 +294,7 @@ func runCDCBenchScan(
293294
// Wait for the changefeed to complete, and compute throughput.
294295
m.Go(func(ctx context.Context) error {
295296
t.L().Printf("waiting for changefeed to finish")
296-
info, err := waitForChangefeed(ctx, conn, jobID, func(info changefeedInfo) (bool, error) {
297+
info, err := waitForChangefeed(ctx, conn, jobID, t.L(), func(info changefeedInfo) (bool, error) {
297298
switch jobs.Status(info.status) {
298299
case jobs.StatusSucceeded:
299300
return true, nil
@@ -444,7 +445,7 @@ func runCDCBenchWorkload(
444445
// the changefeed wasn't lagging by more than 1-2 minutes, but with 100k
445446
// ranges it was found to sometimes lag by over 8 minutes.
446447
m.Go(func(ctx context.Context) error {
447-
info, err := waitForChangefeed(ctx, conn, jobID, func(info changefeedInfo) (bool, error) {
448+
info, err := waitForChangefeed(ctx, conn, jobID, t.L(), func(info changefeedInfo) (bool, error) {
448449
switch jobs.Status(info.status) {
449450
case jobs.StatusPending, jobs.StatusRunning:
450451
doneValue := done.Load()
@@ -465,7 +466,7 @@ func runCDCBenchWorkload(
465466
now := timeutil.Now()
466467
t.L().Printf("waiting for changefeed watermark to reach current time (%s)",
467468
now.Format(time.RFC3339))
468-
info, err := waitForChangefeed(ctx, conn, jobID, func(info changefeedInfo) (bool, error) {
469+
info, err := waitForChangefeed(ctx, conn, jobID, t.L(), func(info changefeedInfo) (bool, error) {
469470
switch jobs.Status(info.status) {
470471
case jobs.StatusPending, jobs.StatusRunning:
471472
return info.highwaterTime.After(now), nil
@@ -539,11 +540,15 @@ func getAllZoneTargets(ctx context.Context, t test.Test, conn *gosql.DB) []strin
539540

540541
// waitForChangefeed waits until the changefeed satisfies the given closure.
541542
func waitForChangefeed(
542-
ctx context.Context, conn *gosql.DB, jobID int, f func(changefeedInfo) (bool, error),
543+
ctx context.Context,
544+
conn *gosql.DB,
545+
jobID int,
546+
logger *logger.Logger,
547+
f func(changefeedInfo) (bool, error),
543548
) (changefeedInfo, error) {
544549
ticker := time.NewTicker(5 * time.Second)
545550
defer ticker.Stop()
546-
for {
551+
for attempt := 0; ; attempt++ {
547552
select {
548553
case <-ticker.C:
549554
case <-ctx.Done():
@@ -552,7 +557,11 @@ func waitForChangefeed(
552557

553558
info, err := getChangefeedInfo(conn, jobID)
554559
if err != nil {
555-
return changefeedInfo{}, err
560+
logger.Errorf("error getting changefeed info: %v (attempt %d)", err, attempt+1)
561+
if attempt > 5 {
562+
return changefeedInfo{}, errors.Wrap(err, "failed 5 attempts to get changefeed info")
563+
}
564+
continue
556565
} else if info.errMsg != "" {
557566
return changefeedInfo{}, errors.Errorf("changefeed error: %s", info.errMsg)
558567
}

pkg/kv/kvserver/allocator/allocatorimpl/allocator.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1961,13 +1961,13 @@ func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerO
19611961
func (a *Allocator) ValidLeaseTargets(
19621962
ctx context.Context,
19631963
storePool storepool.AllocatorStorePool,
1964+
desc *roachpb.RangeDescriptor,
19641965
conf roachpb.SpanConfig,
19651966
existing []roachpb.ReplicaDescriptor,
19661967
leaseRepl interface {
19671968
StoreID() roachpb.StoreID
19681969
RaftStatus() *raft.Status
19691970
GetFirstIndex() kvpb.RaftIndex
1970-
Desc() *roachpb.RangeDescriptor
19711971
},
19721972
opts allocator.TransferLeaseOptions,
19731973
) []roachpb.ReplicaDescriptor {
@@ -2014,9 +2014,8 @@ func (a *Allocator) ValidLeaseTargets(
20142014
// replica set, however are in the candidate list. Uninitialized
20152015
// replicas will always need a snapshot.
20162016
existingCandidates := []roachpb.ReplicaDescriptor{}
2017-
rangeDesc := leaseRepl.Desc()
20182017
for _, candidate := range candidates {
2019-
if _, ok := rangeDesc.GetReplicaDescriptor(candidate.StoreID); ok {
2018+
if _, ok := desc.GetReplicaDescriptor(candidate.StoreID); ok {
20202019
existingCandidates = append(existingCandidates, candidate)
20212020
} else {
20222021
validSnapshotCandidates = append(validSnapshotCandidates, candidate)
@@ -2027,7 +2026,7 @@ func (a *Allocator) ValidLeaseTargets(
20272026

20282027
status := leaseRepl.RaftStatus()
20292028
if a.knobs != nil && a.knobs.RaftStatusFn != nil {
2030-
status = a.knobs.RaftStatusFn(leaseRepl)
2029+
status = a.knobs.RaftStatusFn(desc, leaseRepl.StoreID())
20312030
}
20322031

20332032
candidates = append(validSnapshotCandidates, excludeReplicasInNeedOfSnapshots(
@@ -2220,14 +2219,14 @@ func (a *Allocator) IOOverloadOptions() IOOverloadOptions {
22202219
func (a *Allocator) TransferLeaseTarget(
22212220
ctx context.Context,
22222221
storePool storepool.AllocatorStorePool,
2222+
desc *roachpb.RangeDescriptor,
22232223
conf roachpb.SpanConfig,
22242224
existing []roachpb.ReplicaDescriptor,
22252225
leaseRepl interface {
22262226
StoreID() roachpb.StoreID
22272227
GetRangeID() roachpb.RangeID
22282228
RaftStatus() *raft.Status
22292229
GetFirstIndex() kvpb.RaftIndex
2230-
Desc() *roachpb.RangeDescriptor
22312230
},
22322231
usageInfo allocator.RangeUsageInfo,
22332232
forceDecisionWithoutStats bool,
@@ -2259,7 +2258,7 @@ func (a *Allocator) TransferLeaseTarget(
22592258
return roachpb.ReplicaDescriptor{}
22602259
}
22612260

2262-
validTargets := a.ValidLeaseTargets(ctx, storePool, conf, existing, leaseRepl, opts)
2261+
validTargets := a.ValidLeaseTargets(ctx, storePool, desc, conf, existing, leaseRepl, opts)
22632262

22642263
// Short-circuit if there are no valid targets out there.
22652264
if len(validTargets) == 0 || (len(validTargets) == 1 && validTargets[0].StoreID == leaseRepl.StoreID()) {
@@ -2494,13 +2493,13 @@ func getLoadDelta(
24942493
func (a *Allocator) ShouldTransferLease(
24952494
ctx context.Context,
24962495
storePool storepool.AllocatorStorePool,
2496+
desc *roachpb.RangeDescriptor,
24972497
conf roachpb.SpanConfig,
24982498
existing []roachpb.ReplicaDescriptor,
24992499
leaseRepl interface {
25002500
StoreID() roachpb.StoreID
25012501
RaftStatus() *raft.Status
25022502
GetFirstIndex() kvpb.RaftIndex
2503-
Desc() *roachpb.RangeDescriptor
25042503
},
25052504
usageInfo allocator.RangeUsageInfo,
25062505
) bool {
@@ -2510,6 +2509,7 @@ func (a *Allocator) ShouldTransferLease(
25102509
existing = a.ValidLeaseTargets(
25112510
ctx,
25122511
storePool,
2512+
desc,
25132513
conf,
25142514
existing,
25152515
leaseRepl,

pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1967,6 +1967,7 @@ func TestAllocatorTransferLeaseTarget(t *testing.T) {
19671967
target := a.TransferLeaseTarget(
19681968
ctx,
19691969
sp,
1970+
&roachpb.RangeDescriptor{},
19701971
emptySpanConfig(),
19711972
c.existing,
19721973
&mockRepl{
@@ -2096,6 +2097,7 @@ func TestAllocatorTransferLeaseTargetIOOverloadCheck(t *testing.T) {
20962097
target := a.TransferLeaseTarget(
20972098
ctx,
20982099
sp,
2100+
&roachpb.RangeDescriptor{},
20992101
emptySpanConfig(),
21002102
existing,
21012103
&mockRepl{
@@ -2211,6 +2213,7 @@ func TestAllocatorTransferLeaseToReplicasNeedingSnapshot(t *testing.T) {
22112213
target := a.TransferLeaseTarget(
22122214
ctx,
22132215
sp,
2216+
&roachpb.RangeDescriptor{},
22142217
emptySpanConfig(),
22152218
c.existing,
22162219
repl,
@@ -2303,6 +2306,7 @@ func TestAllocatorTransferLeaseTargetConstraints(t *testing.T) {
23032306
target := a.TransferLeaseTarget(
23042307
context.Background(),
23052308
sp,
2309+
&roachpb.RangeDescriptor{},
23062310
c.conf,
23072311
c.existing,
23082312
&mockRepl{
@@ -2415,6 +2419,7 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) {
24152419
target := a.TransferLeaseTarget(
24162420
ctx,
24172421
storePool,
2422+
&roachpb.RangeDescriptor{},
24182423
c.conf,
24192424
c.existing,
24202425
&mockRepl{
@@ -2699,6 +2704,7 @@ func TestAllocatorShouldTransferLease(t *testing.T) {
26992704
result := a.ShouldTransferLease(
27002705
ctx,
27012706
sp,
2707+
&roachpb.RangeDescriptor{},
27022708
emptySpanConfig(),
27032709
c.existing,
27042710
&mockRepl{
@@ -2767,6 +2773,7 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) {
27672773
result := a.ShouldTransferLease(
27682774
ctx,
27692775
storePool,
2776+
&roachpb.RangeDescriptor{},
27702777
emptySpanConfig(),
27712778
c.existing,
27722779
&mockRepl{
@@ -2814,6 +2821,7 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) {
28142821
result := a.ShouldTransferLease(
28152822
ctx,
28162823
storePool,
2824+
&roachpb.RangeDescriptor{},
28172825
emptySpanConfig(),
28182826
replicas(1, 2, 3),
28192827
&mockRepl{storeID: 2, replicationFactor: 3},
@@ -2955,6 +2963,7 @@ func TestAllocatorLeasePreferences(t *testing.T) {
29552963
result := a.ShouldTransferLease(
29562964
ctx,
29572965
sp,
2966+
&roachpb.RangeDescriptor{},
29582967
conf,
29592968
c.existing,
29602969
&mockRepl{
@@ -2970,6 +2979,7 @@ func TestAllocatorLeasePreferences(t *testing.T) {
29702979
target := a.TransferLeaseTarget(
29712980
ctx,
29722981
sp,
2982+
&roachpb.RangeDescriptor{},
29732983
conf,
29742984
c.existing,
29752985
&mockRepl{
@@ -2989,6 +2999,7 @@ func TestAllocatorLeasePreferences(t *testing.T) {
29892999
target = a.TransferLeaseTarget(
29903000
ctx,
29913001
sp,
3002+
&roachpb.RangeDescriptor{},
29923003
conf,
29933004
c.existing,
29943005
&mockRepl{
@@ -3082,6 +3093,7 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) {
30823093
target := a.TransferLeaseTarget(
30833094
ctx,
30843095
sp,
3096+
&roachpb.RangeDescriptor{},
30853097
conf,
30863098
c.existing,
30873099
&mockRepl{
@@ -3102,6 +3114,7 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) {
31023114
target = a.TransferLeaseTarget(
31033115
ctx,
31043116
sp,
3117+
&roachpb.RangeDescriptor{},
31053118
conf,
31063119
c.existing,
31073120
&mockRepl{
@@ -5732,6 +5745,7 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {
57325745
target := a.TransferLeaseTarget(
57335746
ctx,
57345747
storePool,
5748+
&roachpb.RangeDescriptor{},
57355749
emptySpanConfig(),
57365750
existing,
57375751
&mockRepl{

pkg/kv/kvserver/allocator/base.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,10 @@ type TestingKnobs struct {
9191
// targets produced by the Allocator to include replicas that may be waiting
9292
// for snapshots.
9393
AllowLeaseTransfersToReplicasNeedingSnapshots bool
94-
RaftStatusFn func(r interface {
95-
Desc() *roachpb.RangeDescriptor
96-
StoreID() roachpb.StoreID
97-
}) *raft.Status
94+
RaftStatusFn func(
95+
desc *roachpb.RangeDescriptor,
96+
storeID roachpb.StoreID,
97+
) *raft.Status
9898
// BlockTransferTarget can be used to block returning any transfer targets
9999
// from TransferLeaseTarget.
100100
BlockTransferTarget func() bool

0 commit comments

Comments
 (0)