@@ -5327,9 +5327,13 @@ func TestAckWriteBeforeApplication(t *testing.T) {
 func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
   defer leaktest.AfterTest(t)()
   defer log.Scope(t).Close(t)
+  skip.UnderDuressWithIssue(t, 158295)
   ctx := context.Background()
 
-  testutils.RunValues(t, "lease-type", roachpb.TestingAllLeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
+  // Only run with leader leases for simplicity. The test logic is not
+  // lease-type specific.
+  leaseType := roachpb.LeaseLeader
+  {
     ctx = logtags.AddTag(ctx, "gotest", t.Name())
     noopProposalFilter := kvserverbase.ReplicaProposalFilter(func(args kvserverbase.ProposalFilterArgs) *kvpb.Error {
       return nil
@@ -5340,11 +5344,13 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
       return proposalFilter.Load().(kvserverbase.ReplicaProposalFilter)(args)
     }
 
-    increment := func(t *testing.T, db *kv.DB, key roachpb.Key, by int64) {
+    // increment sends an increment request directly via the provided sender.
+    // We use a direct store sender (via TestSender) rather than going through
+    // DistSender to avoid routing issues when n1 is partitioned. See #158295.
+    increment := func(t *testing.T, sender kv.Sender, key roachpb.Key, by int64) {
       t.Helper()
-      b := &kv.Batch{}
-      b.AddRawRequest(incrementArgs(key, by))
-      require.NoError(t, db.Run(ctx, b))
+      _, pErr := kv.SendWrapped(ctx, sender, incrementArgs(key, by))
+      require.NoError(t, pErr.GoError())
     }
     ensureNoTombstone := func(t *testing.T, store *kvserver.Store, rangeID roachpb.RangeID) {
       t.Helper()
@@ -5403,14 +5409,19 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
     // split to succeed and the RHS to eventually also be on all 3 nodes.
     setup := func(t *testing.T) (
       tc *testcluster.TestCluster,
-      db *kv.DB,
+      leaseholderStore *kvserver.Store,
       keyA, keyB roachpb.Key,
       lhsID roachpb.RangeID,
       lhsPartition *testClusterPartitionedRange,
     ) {
       lisReg := listenerutil.NewListenerRegistry()
       const numServers int = 3
       stickyServerArgs := make(map[int]base.TestServerArgs)
+      // Create a shared PinnedLeasesKnob so we can prevent the partitioned
+      // node (n1) from re-acquiring the lease after we transfer it away. This
+      // closes a race window where n1 could get the lease back between the
+      // transfer and partition activation. See #158295.
+      pinnedLeases := kvserver.NewPinnedLeases()
       for i := 0; i < numServers; i++ {
         st := cluster.MakeTestingClusterSettings()
         kvserver.OverrideDefaultLeaseType(ctx, &st.SV, leaseType)
@@ -5437,6 +5448,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
             // trigger by being caught up by a later snapshot. See #154313.
             DisableRaftLogQueue:   true,
             TestingProposalFilter: testingProposalFilter,
+            PinnedLeases:          pinnedLeases,
           },
         },
         RaftConfig: base.RaftConfig{
@@ -5458,7 +5470,12 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
       })
 
       tc.Stopper().AddCloser(stop.CloserFn(lisReg.Close))
-      db = tc.GetFirstStoreFromServer(t, 1).DB()
+      // Use the leaseholder's store (n3, server index 2) to send requests
+      // directly after the partition is activated, avoiding DistSender
+      // routing issues when n1 is partitioned.
+      leaseholderStore = tc.GetFirstStoreFromServer(t, 2)
+      // Use DB for operations before the partition is activated.
+      db := leaseholderStore.DB()
 
       // Split off a non-system range so we don't have to account for node liveness
       // traffic.
@@ -5473,22 +5490,30 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
       lhsPartition, err := setupPartitionedRange(tc, desc.RangeID,
         0 /* replicaID */, 0 /* partitionedNode */, false /* activated */, kvtestutils.UnreliableRaftHandlerFuncs{})
       require.NoError(t, err)
-      // Wait for all nodes to catch up.
-      increment(t, db, keyA, 5)
+      // Wait for all nodes to catch up. Use DB since the partition is not active yet.
+      _, pErr := kv.SendWrapped(ctx, db.NonTransactionalSender(), incrementArgs(keyA, 5))
+      require.NoError(t, pErr.GoError())
       tc.WaitForValues(t, keyA, []int64{5, 5, 5})
 
+      // Pin the lease to n3 (tc.Target(2)) so that n1 cannot re-acquire it
+      // after we transfer it away. This prevents a race where n1 could get
+      // the lease back before the partition is activated. See #158295.
+      pinnedLeases.PinLease(desc.RangeID, tc.Target(2).StoreID)
       // Transfer the lease off of node 0.
       tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(2))
 
-      // Make sure everybody knows about that transfer.
-      increment(t, db, keyA, 1)
+      // Make sure everybody knows about that transfer. Use DB since the
+      // partition is not active yet.
+      _, pErr = kv.SendWrapped(ctx, db.NonTransactionalSender(), incrementArgs(keyA, 1))
+      require.NoError(t, pErr.GoError())
       tc.WaitForValues(t, keyA, []int64{6, 6, 6})
       log.KvExec.Infof(ctx, "activating LHS partition: %s", lhsPartition)
       lhsPartition.activate()
 
-      increment(t, db, keyA, 1)
+      // After partition activation, use the leaseholder's store directly.
+      increment(t, leaseholderStore.TestSender(), keyA, 1)
       tc.WaitForValues(t, keyA, []int64{6, 7, 7})
-      return tc, db, keyA, keyB, lhsID, lhsPartition
+      return tc, leaseholderStore, keyA, keyB, lhsID, lhsPartition
     }
 
     // In this case we only have the LHS partitioned. The RHS will learn about its
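Condensed view of how the new PinnedLeases knob is meant to interact with the lease transfer and the partition. This is an illustrative sketch assembled from the lines above, not an addition to the patch; it assumes desc, tc, and lhsPartition as defined in setup.

pinnedLeases := kvserver.NewPinnedLeases() // shared by all three servers' StoreTestingKnobs

pinnedLeases.PinLease(desc.RangeID, tc.Target(2).StoreID) // (1) pin the lease to n3
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(2))       // (2) transfer the lease to n3
lhsPartition.activate()                                   // (3) only now partition n1

// Because of (1), n1 cannot re-acquire the lease in the window between (2)
// and (3), which is the race described in #158295.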
@@ -5497,21 +5522,21 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
     // partition the RHS and ensure that the split does not clobber the RHS's hard
     // state.
     t.Run("(1) no RHS partition", func(t *testing.T) {
-      tc, db, keyA, keyB, _, lhsPartition := setup(t)
+      tc, leaseholderStore, keyA, keyB, _, lhsPartition := setup(t)
 
       defer tc.Stopper().Stop(ctx)
       tc.SplitRangeOrFatal(t, keyB)
 
       // Write a value which we can observe to know when the split has been
       // applied by the LHS.
-      increment(t, db, keyA, 1)
+      increment(t, leaseholderStore.TestSender(), keyA, 1)
       tc.WaitForValues(t, keyA, []int64{6, 8, 8})
 
-      increment(t, db, keyB, 6)
+      increment(t, leaseholderStore.TestSender(), keyB, 6)
       // Wait for all non-partitioned nodes to catch up.
       tc.WaitForValues(t, keyB, []int64{0, 6, 6})
 
-      rhsInfo, err := getRangeInfo(ctx, db, keyB)
+      rhsInfo, err := getRangeInfo(ctx, leaseholderStore.DB(), keyB)
       require.NoError(t, err)
       rhsID := rhsInfo.Desc.RangeID
       _, store0Exists := rhsInfo.Desc.GetReplicaDescriptor(1)
@@ -5553,21 +5578,21 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
     // This case is like the previous case except the store crashes after
     // laying down a tombstone.
     t.Run("(2) no RHS partition, with restart", func(t *testing.T) {
-      tc, db, keyA, keyB, _, lhsPartition := setup(t)
+      tc, leaseholderStore, keyA, keyB, _, lhsPartition := setup(t)
       defer tc.Stopper().Stop(ctx)
 
       tc.SplitRangeOrFatal(t, keyB)
 
       // Write a value which we can observe to know when the split has been
       // applied by the LHS.
-      increment(t, db, keyA, 1)
+      increment(t, leaseholderStore.TestSender(), keyA, 1)
       tc.WaitForValues(t, keyA, []int64{6, 8, 8})
 
-      increment(t, db, keyB, 6)
+      increment(t, leaseholderStore.TestSender(), keyB, 6)
       // Wait for all non-partitioned nodes to catch up.
       tc.WaitForValues(t, keyB, []int64{0, 6, 6})
 
-      rhsInfo, err := getRangeInfo(ctx, db, keyB)
+      rhsInfo, err := getRangeInfo(ctx, leaseholderStore.DB(), keyB)
       require.NoError(t, err)
       rhsID := rhsInfo.Desc.RangeID
       _, store0Exists := rhsInfo.Desc.GetReplicaDescriptor(1)
@@ -5602,7 +5627,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
       curB := int64(6)
       for curB < 100 {
         curB++
-        increment(t, db, keyB, 1)
+        increment(t, leaseholderStore.TestSender(), keyB, 1)
         tc.WaitForValues(t, keyB, []int64{0, curB, curB})
       }
 
@@ -5631,22 +5656,22 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
     // the split is processed. We partition the RHS's new replica ID before
     // processing the split to ensure that the RHS doesn't get initialized.
     t.Run("(3) initial replica RHS partition, no restart", func(t *testing.T) {
-      tc, db, keyA, keyB, _, lhsPartition := setup(t)
+      tc, leaseholderStore, keyA, keyB, _, lhsPartition := setup(t)
       defer tc.Stopper().Stop(ctx)
       var rhsPartition *testClusterPartitionedRange
       partitionReplicaOnSplit(t, tc, keyB, lhsPartition, &rhsPartition)
       tc.SplitRangeOrFatal(t, keyB)
 
       // Write a value which we can observe to know when the split has been
       // applied by the LHS.
-      increment(t, db, keyA, 1)
+      increment(t, leaseholderStore.TestSender(), keyA, 1)
       tc.WaitForValues(t, keyA, []int64{6, 8, 8})
 
-      increment(t, db, keyB, 6)
+      increment(t, leaseholderStore.TestSender(), keyB, 6)
       // Wait for all non-partitioned nodes to catch up.
       tc.WaitForValues(t, keyB, []int64{0, 6, 6})
 
-      rhsInfo, err := getRangeInfo(ctx, db, keyB)
+      rhsInfo, err := getRangeInfo(ctx, leaseholderStore.DB(), keyB)
       require.NoError(t, err)
       rhsID := rhsInfo.Desc.RangeID
       _, store0Exists := rhsInfo.Desc.GetReplicaDescriptor(1)
@@ -5698,35 +5723,32 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
     // about its higher replica ID the store crashes. However, it doesn't forget
     // this ID, so the RHS replica is still not initialized by the split.
    t.Run("(4) initial replica RHS partition, with restart", func(t *testing.T) {
-      tc, db, keyA, keyB, _, lhsPartition := setup(t)
+      tc, leaseholderStore, keyA, keyB, _, lhsPartition := setup(t)
       defer tc.Stopper().Stop(ctx)
       var rhsPartition *testClusterPartitionedRange
 
       partitionReplicaOnSplit(t, tc, keyB, lhsPartition, &rhsPartition)
       tc.SplitRangeOrFatal(t, keyB)
 
-      if leaseType == roachpb.LeaseLeader {
-        // Since both LHS and RHS use the same store, let's remove the store
-        // partition from `rhsPartition` and keep it only in `lhsPartition`.
-        // This will help us control the store partition using one object.
-        // TODO(ibrahim): Make the test pass when both LHS and RHS ranges are
-        // recovered at the same time.
-        store, err := tc.Servers[0].GetStores().(*kvserver.Stores).
-          GetStore(tc.Servers[0].GetFirstStoreID())
-        require.NoError(t, err)
-        rhsPartition.removeStore(store.StoreID())
-      }
+      // Since both LHS and RHS use the same store (leader leases), remove
+      // the store partition from `rhsPartition` and keep it only in
+      // `lhsPartition`. This lets us control the store partition using
+      // one object.
+      store, err := tc.Servers[0].GetStores().(*kvserver.Stores).
+        GetStore(tc.Servers[0].GetFirstStoreID())
+      require.NoError(t, err)
+      rhsPartition.removeStore(store.StoreID())
 
       // Write a value which we can observe to know when the split has been
       // applied by the LHS.
-      increment(t, db, keyA, 1)
+      increment(t, leaseholderStore.TestSender(), keyA, 1)
       tc.WaitForValues(t, keyA, []int64{6, 8, 8})
 
-      increment(t, db, keyB, 6)
+      increment(t, leaseholderStore.TestSender(), keyB, 6)
       // Wait for all non-partitioned nodes to catch up.
       tc.WaitForValues(t, keyB, []int64{0, 6, 6})
 
-      rhsInfo, err := getRangeInfo(ctx, db, keyB)
+      rhsInfo, err := getRangeInfo(ctx, leaseholderStore.DB(), keyB)
       require.NoError(t, err)
       rhsID := rhsInfo.Desc.RangeID
       _, store0Exists := rhsInfo.Desc.GetReplicaDescriptor(1)
@@ -5765,7 +5787,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
       curB := int64(6)
       for curB < 100 {
         curB++
-        increment(t, db, keyB, 1)
+        increment(t, leaseholderStore.TestSender(), keyB, 1)
         tc.WaitForValues(t, keyB, []int64{0, curB, curB})
       }
 
@@ -5791,7 +5813,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
       })
       tc.WaitForValues(t, keyB, []int64{curB, curB, curB})
     })
-  })
+  }
 }
 
 type noopRaftMessageResponseStream struct{}