Commit d66e35a

Merge pull request #151956 from cockroachdb/blathers/backport-release-25.3-151923
release-25.3: kvserver: deflake TestTxnReadWithinUncertaintyIntervalAfterLeaseTransfer
2 parents: aa09066 + 44770d8

1 file changed: +44, -10 lines


pkg/kv/kvserver/client_replica_test.go

Lines changed: 44 additions & 10 deletions
@@ -777,6 +777,8 @@ func TestTxnReadWithinUncertaintyIntervalAfterLeaseTransfer(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	// Increase the verbosity of the test to help investigate failures.
+	require.NoError(t, log.SetVModule("replica_range_lease=3,raft=4,txn=3,txn_coord_sender=3"))
 	const numNodes = 2
 	var manuals []*hlc.HybridManualClock
 	var clocks []*hlc.Clock
@@ -804,42 +806,69 @@ func TestTxnReadWithinUncertaintyIntervalAfterLeaseTransfer(t *testing.T) {
 	keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
 	tc.SplitRangeOrFatal(t, keyA)
 	keyADesc, keyBDesc := tc.SplitRangeOrFatal(t, keyB)
+	t.Logf("split range at keyB=%s, got descriptors: keyADesc=%+v, keyBDesc=%+v",
+		keyB, keyADesc, keyBDesc)
+
 	// Place key A's sole replica on node 1 and key B's sole replica on node 2.
 	tc.AddVotersOrFatal(t, keyB, tc.Target(1))
+	t.Logf("added voter for keyB=%s to node 2", keyB)
 	tc.TransferRangeLeaseOrFatal(t, keyBDesc, tc.Target(1))
+	t.Logf("transferred lease for keyB range to node 2")
 	tc.RemoveVotersOrFatal(t, keyB, tc.Target(0))
+	t.Logf("removed voter for keyB=%s from node 1", keyB)
 
 	// Pause the servers' clocks going forward.
-	var maxNanos int64
 	for i, m := range manuals {
 		m.Pause()
-		if cur := m.UnixNano(); cur > maxNanos {
-			maxNanos = cur
-		}
 		clocks = append(clocks, tc.Servers[i].Clock())
 	}
-	// After doing so, perfectly synchronize them.
-	for _, m := range manuals {
-		m.Increment(maxNanos - m.UnixNano())
-	}
+
+	// Synchronize the clocks. We wrap this in a SucceedsSoon to avoid messages
+	// between the nodes to cause them to remain out of sync.
+	var nowN1, nowN2 hlc.Timestamp
+	testutils.SucceedsSoon(t, func() error {
+		// Find the maximum clock value.
+		maxNanos := manuals[0].UnixNano()
+		for _, m := range manuals[1:] {
+			maxNanos = max(maxNanos, m.UnixNano())
+		}
+		// After doing so, perfectly synchronize them.
+		for _, m := range manuals {
+			m.Increment(maxNanos - m.UnixNano())
+		}
+
+		nowN1, nowN2 = clocks[0].Now(), clocks[1].Now()
+		if nowN1.WallTime != nowN2.WallTime {
+			return errors.Errorf("clocks are not synchronized: n1's clock: %+v, n2's clock: %+v",
+				nowN1, nowN2)
+		}
+		return nil
+	})
+
+	t.Logf("all clocks synchronized: n1's clock: %+v, n2's clock: %+v", nowN1, nowN2)
 
 	// Create a new transaction using the second node as the gateway.
-	now := clocks[1].Now()
 	maxOffset := clocks[1].MaxOffset().Nanoseconds()
 	require.NotZero(t, maxOffset)
-	txn := roachpb.MakeTransaction("test", keyB, isolation.Serializable, 1, now, maxOffset, int32(tc.Servers[1].SQLInstanceID()), 0, false /* omitInRangefeeds */)
+	txn := roachpb.MakeTransaction("test", keyB, isolation.Serializable, 1, nowN2, maxOffset, int32(tc.Servers[1].SQLInstanceID()), 0, false /* omitInRangefeeds */)
+	t.Logf("created transaction: %+v", txn)
 	require.True(t, txn.ReadTimestamp.Less(txn.GlobalUncertaintyLimit))
 	require.Len(t, txn.ObservedTimestamps, 0)
 
 	// Collect an observed timestamp in that transaction from node 2.
+	t.Logf("collecting observed timestamp from node 2 for keyB=%s", keyB)
 	getB := getArgs(keyB)
 	resp, pErr := kv.SendWrappedWith(ctx, tc.Servers[1].DistSenderI().(kv.Sender), kvpb.Header{Txn: &txn}, getB)
 	require.Nil(t, pErr)
+	t.Logf("successfully read keyB=%s, response: %+v", keyB, resp)
 	txn.Update(resp.Header().Txn)
 	require.Len(t, txn.ObservedTimestamps, 1)
+	t.Logf("updated transaction with observed timestamp, now has %+v observed timestamps", txn.ObservedTimestamps)
 
 	// Advance the clock on the first node.
 	manuals[0].Increment(100)
+	nowN1, nowN2 = clocks[0].Now(), clocks[1].Now()
+	t.Logf("advanced n1's clock by 100ns: n1's clock: %+v, n2's clock: %+v", nowN1, nowN2)
 
 	// Perform a non-txn write on node 1. This will grab a timestamp from node 1's
 	// clock, which leads the clock on node 2.
@@ -856,15 +885,20 @@ func TestTxnReadWithinUncertaintyIntervalAfterLeaseTransfer(t *testing.T) {
 	// flakiness. For now, we just re-order the operations and assert that we
 	// receive an uncertainty error even though its absence would not be a true
 	// stale read.
+	t.Logf("Performing non-txn write on node 0 for keyA=%s", keyA)
 	ba := &kvpb.BatchRequest{}
 	ba.Add(putArgs(keyA, []byte("val")))
 	br, pErr := tc.Servers[0].DistSenderI().(kv.Sender).Send(ctx, ba)
 	require.Nil(t, pErr)
 	writeTs := br.Timestamp
+	t.Logf("Successfully wrote keyA=%s, write timestamp: %s", keyA, writeTs)
 
 	// The transaction has a read timestamp beneath the write's commit timestamp
 	// but a global uncertainty limit above the write's commit timestamp. The
 	// observed timestamp collected is also beneath the write's commit timestamp.
+	nowN1, nowN2 = clocks[0].Now(), clocks[1].Now()
+	t.Logf("validating the clocks before test assertions: n1's clock: %+v, n2's clock: %+v",
+		nowN1, nowN2)
 	assert.True(t, txn.ReadTimestamp.Less(writeTs))
 	assert.True(t, writeTs.Less(txn.GlobalUncertaintyLimit))
 	assert.True(t, txn.ObservedTimestamps[0].Timestamp.ToTimestamp().Less(writeTs))
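
The core of the deflake, from the hunk above, is retrying clock synchronization until the paused clocks actually agree, rather than synchronizing once and assuming they stay aligned. Below is a minimal, self-contained sketch of that retry-until-synchronized pattern; manualClock and succeedsSoon are hypothetical stand-ins for hlc.HybridManualClock and testutils.SucceedsSoon, so it illustrates the shape of the change rather than the real CockroachDB APIs.

package main

import (
	"errors"
	"fmt"
	"time"
)

// manualClock is a hypothetical stand-in for a paused manual clock such as
// hlc.HybridManualClock: its reading only moves when Increment is called.
type manualClock struct{ nanos int64 }

func (c *manualClock) UnixNano() int64   { return c.nanos }
func (c *manualClock) Increment(d int64) { c.nanos += d }

// syncClocks bumps every clock up to the current maximum reading and reports
// whether all readings now agree. This mirrors the body of the closure that
// the commit wraps in testutils.SucceedsSoon.
func syncClocks(clocks []*manualClock) error {
	// Find the maximum clock value.
	maxNanos := clocks[0].UnixNano()
	for _, c := range clocks[1:] {
		maxNanos = max(maxNanos, c.UnixNano())
	}
	// Bump every clock up to that maximum.
	for _, c := range clocks {
		c.Increment(maxNanos - c.UnixNano())
	}
	// Verify the clocks agree; returning an error makes the caller retry.
	for _, c := range clocks[1:] {
		if c.UnixNano() != clocks[0].UnixNano() {
			return fmt.Errorf("clocks are not synchronized: %d vs %d",
				clocks[0].UnixNano(), c.UnixNano())
		}
	}
	return nil
}

// succeedsSoon is a simplified sketch of what testutils.SucceedsSoon does:
// retry the function until it returns nil or a deadline expires.
func succeedsSoon(fn func() error, deadline time.Duration) error {
	start := time.Now()
	for {
		if err := fn(); err == nil {
			return nil
		} else if time.Since(start) > deadline {
			return errors.Join(errors.New("condition never satisfied"), err)
		}
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	clocks := []*manualClock{{nanos: 100}, {nanos: 250}}
	if err := succeedsSoon(func() error { return syncClocks(clocks) }, time.Second); err != nil {
		panic(err)
	}
	fmt.Println("synchronized at", clocks[0].UnixNano()) // prints: synchronized at 250
}

In the test itself, the agreement check compares clocks[0].Now().WallTime with clocks[1].Now().WallTime and returns an error on mismatch, so SucceedsSoon keeps re-bumping the clocks if an inter-node message has ratcheted one of them forward in the meantime.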
