
Commit 78d8d53

Merge pull request #159864 from arulajmani/blathers/backport-release-26.1-159820
release-26.1: kvserver: make ensureLeaderStepsDown more resolute
2 parents: 83db224 + c472396

13 files changed: +364 −306 lines
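
The hunks below replace the test-local `unreliableRaftHandler`/`unreliableRaftHandlerFuncs` helpers with exported equivalents from the new `pkg/kv/kvtestutils` package. For orientation, here is a rough sketch of what those exported helpers presumably look like, inferred only from the call sites in this diff; the commit does not show the actual definitions, so field types, embedding, and imports may differ in the real package.

// Hypothetical shape of the kvtestutils helpers, reconstructed from the call
// sites in this commit; the real definitions may differ (e.g. to avoid import
// cycles), and the handler's filtering methods are omitted here.
package kvtestutils

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// UnreliableRaftHandlerFuncs holds optional predicates deciding whether a
// given Raft request, heartbeat, or response should be dropped.
type UnreliableRaftHandlerFuncs struct {
	DropReq  func(*kvserverpb.RaftMessageRequest) bool
	DropHB   func(*kvserverpb.RaftHeartbeat) bool
	DropResp func(*kvserverpb.RaftMessageResponse) bool
}

// NoopRaftHandlerFuncs returns predicates that drop nothing; tests override
// only the fields they care about.
func NoopRaftHandlerFuncs() UnreliableRaftHandlerFuncs {
	return UnreliableRaftHandlerFuncs{
		DropReq:  func(*kvserverpb.RaftMessageRequest) bool { return false },
		DropHB:   func(*kvserverpb.RaftHeartbeat) bool { return false },
		DropResp: func(*kvserverpb.RaftMessageResponse) bool { return false },
	}
}

// UnreliableRaftHandler wraps an IncomingRaftMessageHandler and filters
// traffic for a single range according to the embedded funcs.
type UnreliableRaftHandler struct {
	Name    string
	RangeID roachpb.RangeID
	kvserver.IncomingRaftMessageHandler
	UnreliableRaftHandlerFuncs
}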

pkg/kv/kvserver/client_merge_test.go

Lines changed: 36 additions & 36 deletions
@@ -647,29 +647,29 @@ func mergeCheckingTimestampCaches(
 	}
 
 	// Applied to the leaseholder's raft transport during the partition.
-	partitionedLeaseholderFuncs := noopRaftHandlerFuncs()
-	partitionedLeaseholderFuncs.dropReq = func(*kvserverpb.RaftMessageRequest) bool {
+	partitionedLeaseholderFuncs := kvtestutils.NoopRaftHandlerFuncs()
+	partitionedLeaseholderFuncs.DropReq = func(*kvserverpb.RaftMessageRequest) bool {
 		// Ignore everything from new leader.
 		return true
 	}
 
 	// Applied to the leader and other follower's raft transport during the
 	// partition.
-	partitionedLeaderFuncs := noopRaftHandlerFuncs()
-	partitionedLeaderFuncs.dropReq = func(req *kvserverpb.RaftMessageRequest) bool {
+	partitionedLeaderFuncs := kvtestutils.NoopRaftHandlerFuncs()
+	partitionedLeaderFuncs.DropReq = func(req *kvserverpb.RaftMessageRequest) bool {
 		// Ignore everything from leaseholder, except forwarded proposals.
 		return req.FromReplica.StoreID == lhsStore.StoreID() &&
 			req.Message.Type != raftpb.MsgProp
 	}
-	partitionedLeaderFuncs.dropHB = func(hb *kvserverpb.RaftHeartbeat) bool {
+	partitionedLeaderFuncs.DropHB = func(hb *kvserverpb.RaftHeartbeat) bool {
 		// Ignore heartbeats from leaseholder, results in campaign.
 		return hb.FromReplicaID == roachpb.ReplicaID(lhsRepls[0].RaftStatus().ID)
 	}
 
 	// Applied to leaseholder after the partition heals.
 	var truncIndex kvpb.RaftIndex
-	restoredLeaseholderFuncs := noopRaftHandlerFuncs()
-	restoredLeaseholderFuncs.dropReq = func(req *kvserverpb.RaftMessageRequest) bool {
+	restoredLeaseholderFuncs := kvtestutils.NoopRaftHandlerFuncs()
+	restoredLeaseholderFuncs.DropReq = func(req *kvserverpb.RaftMessageRequest) bool {
 		// Make sure that even going forward no MsgApp for what we just
 		// truncated can make it through. The Raft transport is asynchronous
 		// so this is necessary to make the test pass reliably - otherwise
@@ -695,16 +695,16 @@ func mergeCheckingTimestampCaches(
 	filterMu.mergeCommitFilter = func() {
 		// Install leader-leaseholder partition.
 		for i, s := range lhsStores {
-			var funcs unreliableRaftHandlerFuncs
+			var funcs kvtestutils.UnreliableRaftHandlerFuncs
 			if i == 0 {
 				funcs = partitionedLeaseholderFuncs
 			} else {
 				funcs = partitionedLeaderFuncs
 			}
-			tc.Servers[i].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(s.StoreID(), &unreliableRaftHandler{
-				rangeID:                    lhsDesc.GetRangeID(),
+			tc.Servers[i].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(s.StoreID(), &kvtestutils.UnreliableRaftHandler{
+				RangeID:                    lhsDesc.GetRangeID(),
 				IncomingRaftMessageHandler: s,
-				unreliableRaftHandlerFuncs: funcs,
+				UnreliableRaftHandlerFuncs: funcs,
 			})
 		}
 
@@ -804,10 +804,10 @@ func mergeCheckingTimestampCaches(
 	for i, s := range lhsStores {
 		var h kvserver.IncomingRaftMessageHandler
 		if i == 0 {
-			h = &unreliableRaftHandler{
-				rangeID:                    lhsDesc.GetRangeID(),
+			h = &kvtestutils.UnreliableRaftHandler{
+				RangeID:                    lhsDesc.GetRangeID(),
 				IncomingRaftMessageHandler: s,
-				unreliableRaftHandlerFuncs: restoredLeaseholderFuncs,
+				UnreliableRaftHandlerFuncs: restoredLeaseholderFuncs,
 			}
 		} else {
 			h = s
@@ -2734,11 +2734,11 @@ func TestStoreRangeMergeSlowUnabandonedFollower_WithSplit(t *testing.T) {
 
 	// Start dropping all Raft traffic to the LHS on store2 so that it won't be
 	// aware that there is a merge in progress.
-	tc.Servers[2].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(store2.Ident.StoreID, &unreliableRaftHandler{
-		rangeID:                    lhsDesc.RangeID,
+	tc.Servers[2].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(store2.Ident.StoreID, &kvtestutils.UnreliableRaftHandler{
+		RangeID:                    lhsDesc.RangeID,
 		IncomingRaftMessageHandler: store2,
-		unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
-			dropReq: func(req *kvserverpb.RaftMessageRequest) bool {
+		UnreliableRaftHandlerFuncs: kvtestutils.UnreliableRaftHandlerFuncs{
+			DropReq: func(req *kvserverpb.RaftMessageRequest) bool {
 				return true
 			},
 		},
@@ -3022,11 +3022,11 @@ func TestStoreRangeMergeAbandonedFollowersAutomaticallyGarbageCollected(t *testi
 
 	// Start dropping all Raft traffic to the LHS replica on store2 so that it
 	// won't be aware that there is a merge in progress.
-	tc.Servers[2].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(store2.Ident.StoreID, &unreliableRaftHandler{
-		rangeID:                    lhsDesc.RangeID,
+	tc.Servers[2].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(store2.Ident.StoreID, &kvtestutils.UnreliableRaftHandler{
+		RangeID:                    lhsDesc.RangeID,
 		IncomingRaftMessageHandler: store2,
-		unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
-			dropReq: func(*kvserverpb.RaftMessageRequest) bool {
+		UnreliableRaftHandlerFuncs: kvtestutils.UnreliableRaftHandlerFuncs{
+			DropReq: func(*kvserverpb.RaftMessageRequest) bool {
 				return true
 			},
 		},
@@ -3420,11 +3420,11 @@ func TestStoreRangeMergeUninitializedLHSFollower(t *testing.T) {
 	// from range 1, which will be the LHS of the split, so that store2's replica
 	// of range 1 never processes the split trigger, which would create an
 	// initialized replica of A.
-	unreliableHandler := &unreliableRaftHandler{
-		rangeID:                    desc.RangeID,
+	unreliableHandler := &kvtestutils.UnreliableRaftHandler{
+		RangeID:                    desc.RangeID,
 		IncomingRaftMessageHandler: store2,
-		unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
-			dropReq: func(request *kvserverpb.RaftMessageRequest) bool {
+		UnreliableRaftHandlerFuncs: kvtestutils.UnreliableRaftHandlerFuncs{
+			DropReq: func(request *kvserverpb.RaftMessageRequest) bool {
 				return true
 			},
 		},
@@ -4096,11 +4096,11 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 	aRepl0 := store0.LookupReplica(roachpb.RKey(keyA))
 
 	// Start dropping all Raft traffic to the first range on store2.
-	tc.Servers[2].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(store2.Ident.StoreID, &unreliableRaftHandler{
-		rangeID:                    aRepl0.RangeID,
+	tc.Servers[2].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(store2.Ident.StoreID, &kvtestutils.UnreliableRaftHandler{
+		RangeID:                    aRepl0.RangeID,
 		IncomingRaftMessageHandler: store2,
-		unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
-			dropReq: func(request *kvserverpb.RaftMessageRequest) bool {
+		UnreliableRaftHandlerFuncs: kvtestutils.UnreliableRaftHandlerFuncs{
+			DropReq: func(request *kvserverpb.RaftMessageRequest) bool {
 				return true
 			},
 		},
@@ -4151,11 +4151,11 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 
 	// Restore Raft traffic to the LHS on store2.
 	log.KvDistribution.Infof(ctx, "restored traffic to store 2")
-	tc.Servers[2].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(store2.Ident.StoreID, &unreliableRaftHandler{
-		rangeID:                    aRepl0.RangeID,
+	tc.Servers[2].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(store2.Ident.StoreID, &kvtestutils.UnreliableRaftHandler{
+		RangeID:                    aRepl0.RangeID,
 		IncomingRaftMessageHandler: store2,
-		unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
-			dropReq: func(req *kvserverpb.RaftMessageRequest) bool {
+		UnreliableRaftHandlerFuncs: kvtestutils.UnreliableRaftHandlerFuncs{
+			DropReq: func(req *kvserverpb.RaftMessageRequest) bool {
 				// Make sure that even going forward no MsgApp for what we
 				// just truncated can make it through. The Raft transport is
 				// asynchronous so this is necessary to make the test pass
@@ -4168,8 +4168,8 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 				return req.Message.Type == raftpb.MsgApp && kvpb.RaftIndex(req.Message.Index) < index
 			},
 			// Don't drop heartbeats or responses.
-			dropHB:   func(*kvserverpb.RaftHeartbeat) bool { return false },
-			dropResp: func(*kvserverpb.RaftMessageResponse) bool { return false },
+			DropHB:   func(*kvserverpb.RaftHeartbeat) bool { return false },
+			DropResp: func(*kvserverpb.RaftMessageResponse) bool { return false },
 		},
 	})
 
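The tests in this file repeatedly follow the same pattern: register a handler that drops all Raft traffic for one range on a store to simulate a partition, and later register a second handler that only drops MsgApps below the truncated log index so the follower must catch up via snapshot. The sketch below condenses that pattern into two hypothetical helpers; the function names and signatures are illustrative only and do not appear in this commit, which inlines the logic at each call site.

// Hypothetical helpers (not part of this commit) condensing the partition
// pattern used by the merge tests above; all names here are illustrative.
package kvtestutilsexample

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
	"github.com/cockroachdb/cockroach/pkg/kv/kvtestutils"
	"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// partitionRange drops every incoming Raft request for rangeID on the given
// store, so the store never learns about an in-flight merge.
func partitionRange(
	transport *kvserver.RaftTransport,
	storeID roachpb.StoreID,
	store kvserver.IncomingRaftMessageHandler,
	rangeID roachpb.RangeID,
) {
	transport.ListenIncomingRaftMessages(storeID, &kvtestutils.UnreliableRaftHandler{
		RangeID:                    rangeID,
		IncomingRaftMessageHandler: store,
		UnreliableRaftHandlerFuncs: kvtestutils.UnreliableRaftHandlerFuncs{
			DropReq: func(*kvserverpb.RaftMessageRequest) bool { return true },
		},
	})
}

// healPartition restores traffic but keeps filtering MsgApps that precede the
// truncated index, forcing the lagging follower to catch up via snapshot.
func healPartition(
	transport *kvserver.RaftTransport,
	storeID roachpb.StoreID,
	store kvserver.IncomingRaftMessageHandler,
	rangeID roachpb.RangeID,
	truncIndex kvpb.RaftIndex,
) {
	transport.ListenIncomingRaftMessages(storeID, &kvtestutils.UnreliableRaftHandler{
		RangeID:                    rangeID,
		IncomingRaftMessageHandler: store,
		UnreliableRaftHandlerFuncs: kvtestutils.UnreliableRaftHandlerFuncs{
			DropReq: func(req *kvserverpb.RaftMessageRequest) bool {
				return req.Message.Type == raftpb.MsgApp && kvpb.RaftIndex(req.Message.Index) < truncIndex
			},
		},
	})
}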

pkg/kv/kvserver/client_raft_epoch_leases_test.go

Lines changed: 16 additions & 15 deletions
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvtestutils"
 	"github.com/cockroachdb/cockroach/pkg/raft"
 	"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -306,18 +307,18 @@ func TestRequestsOnLaggingReplicaEpochLeases(t *testing.T) {
 
 	for _, i := range []int{0, 1, 2} {
 		store := tc.GetFirstStoreFromServer(t, i)
-		h := &unreliableRaftHandler{
-			name:                       fmt.Sprintf("store %d", i),
-			rangeID:                    rngDesc.RangeID,
+		h := &kvtestutils.UnreliableRaftHandler{
+			Name:                       fmt.Sprintf("store %d", i),
+			RangeID:                    rngDesc.RangeID,
 			IncomingRaftMessageHandler: store,
 		}
 		if i != partitionNodeIdx {
 			// Only filter messages from the partitioned store on the other two
 			// stores.
-			h.dropReq = func(req *kvserverpb.RaftMessageRequest) bool {
+			h.DropReq = func(req *kvserverpb.RaftMessageRequest) bool {
 				return req.FromReplica.StoreID == partRepl.StoreID()
 			}
-			h.dropHB = func(hb *kvserverpb.RaftHeartbeat) bool {
+			h.DropHB = func(hb *kvserverpb.RaftHeartbeat) bool {
 				return hb.FromReplicaID == partReplDesc.ReplicaID
 			}
 		}
@@ -551,17 +552,17 @@ func TestSnapshotAfterTruncationWithUncommittedTailEpochLeases(t *testing.T) {
 	//
 	log.KvExec.Infof(ctx, "test: installing unreliable Raft transports")
 	for _, s := range []int{0, 1, 2} {
-		h := &unreliableRaftHandler{
-			rangeID:                    partRepl.RangeID,
+		h := &kvtestutils.UnreliableRaftHandler{
+			RangeID:                    partRepl.RangeID,
 			IncomingRaftMessageHandler: tc.GetFirstStoreFromServer(t, s),
 		}
 		if s != partStore {
 			// Only filter messages from the partitioned store on the other
 			// two stores.
-			h.dropReq = func(req *kvserverpb.RaftMessageRequest) bool {
+			h.DropReq = func(req *kvserverpb.RaftMessageRequest) bool {
 				return req.FromReplica.StoreID == partRepl.StoreID()
 			}
-			h.dropHB = func(hb *kvserverpb.RaftHeartbeat) bool {
+			h.DropHB = func(hb *kvserverpb.RaftHeartbeat) bool {
 				return hb.FromReplicaID == partReplDesc.ReplicaID
 			}
 		}
@@ -661,20 +662,20 @@
 	// Remove the partition. Snapshot should follow.
 	log.KvExec.Infof(ctx, "test: removing the partition")
 	for _, s := range []int{0, 1, 2} {
-		tc.Servers[s].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(tc.Target(s).StoreID, &unreliableRaftHandler{
-			rangeID:                    partRepl.RangeID,
+		tc.Servers[s].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(tc.Target(s).StoreID, &kvtestutils.UnreliableRaftHandler{
+			RangeID:                    partRepl.RangeID,
 			IncomingRaftMessageHandler: tc.GetFirstStoreFromServer(t, s),
-			unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
-				dropReq: func(req *kvserverpb.RaftMessageRequest) bool {
+			UnreliableRaftHandlerFuncs: kvtestutils.UnreliableRaftHandlerFuncs{
+				DropReq: func(req *kvserverpb.RaftMessageRequest) bool {
 					// Make sure that even going forward no MsgApp for what we just truncated can
 					// make it through. The Raft transport is asynchronous so this is necessary
 					// to make the test pass reliably.
 					// NB: the Index on the message is the log index that _precedes_ any of the
 					// entries in the MsgApp, so filter where msg.Index < index, not <= index.
 					return req.Message.Type == raftpb.MsgApp && kvpb.RaftIndex(req.Message.Index) < index
 				},
-				dropHB:   func(*kvserverpb.RaftHeartbeat) bool { return false },
-				dropResp: func(*kvserverpb.RaftMessageResponse) bool { return false },
+				DropHB:   func(*kvserverpb.RaftHeartbeat) bool { return false },
+				DropResp: func(*kvserverpb.RaftMessageResponse) bool { return false },
 			},
 		})
 	}
