
Commit b1839bf

craig[bot] and wenyihu6 committed
Merge #143795
143795: kvserver: use policy refresher for closed timestamp policy r=arulajmani a=wenyihu6

**kvserver: use policy refresher for closed timestamp policy**

Previously, closed timestamp policies were computed on demand in `r.closedTimestampPolicyRLocked` while holding the RLock. This commit changes that so the policy is refreshed asynchronously in response to specific events instead of being computed on demand in `r.closedTimestampPolicyRLocked`. This reduces the time spent under `r.mu.RLock()` in `closedTimestampPolicyRLocked` and lays the foundation for future changes that use latency-based closed timestamp policies.

Policies are now refreshed during specific events:
- when there is a leaseholder change (leasePostApplyLocked),
- when there is a config change (r.SetSpanConfig),
- or periodically at the interval of kv.closed_timestamp.policy_refresh_interval.

Part of: #59680

Release note: none

---

**kvserver: rename registerLeaseholder to registerLeaseholderAndRefreshPolicy**

This commit renames registerLeaseholder to registerLeaseholderAndRefreshPolicy to reflect its changes. In addition to registering the replica as a leaseholder, it now also triggers a closed timestamp policy refresh on the replica.

Epic: none

Release note: none

Co-authored-by: wenyihu6 <[email protected]>
2 parents 75f4731 + c90ef6e commit b1839bf
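
The description above boils down to one periodic loop plus two event-driven triggers. The sketch below illustrates only the periodic path: a refresher that walks the node's current leaseholders at the refresh interval and asks each replica to recompute its cached policy. The Replica interface, getLeaseholders callback, and plain int64 node-ID key are simplified placeholders for this illustration, not the actual policyrefresher API.

package policyrefreshsketch

import (
	"context"
	"time"
)

// Replica is a stand-in for the narrow capability the refresher needs from a
// replica: recompute and cache its own closed timestamp policy. The real code
// passes per-node latency information; nil means "no latency info yet".
type Replica interface {
	RefreshPolicy(latencies map[int64]time.Duration)
}

// run covers only the timer-based refresh path. Lease transfers and span
// config changes would enqueue replicas for refresh separately.
func run(
	ctx context.Context,
	getLeaseholders func() []Replica, // e.g. backed by the side transport's leaseholder registry
	refreshInterval time.Duration, // e.g. kv.closed_timestamp.policy_refresh_interval
) {
	ticker := time.NewTicker(refreshInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			for _, r := range getLeaseholders() {
				r.RefreshPolicy(nil /* latencies: not wired up in this sketch */)
			}
		case <-ctx.Done():
			return
		}
	}
}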

File tree

13 files changed: +261 -15 lines

pkg/kv/kvserver/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -141,6 +141,7 @@ go_library(
         "//pkg/kv/kvserver/benignerror",
         "//pkg/kv/kvserver/closedts",
         "//pkg/kv/kvserver/closedts/ctpb",
+        "//pkg/kv/kvserver/closedts/policyrefresher",
         "//pkg/kv/kvserver/closedts/sidetransport",
         "//pkg/kv/kvserver/closedts/tracker",
         "//pkg/kv/kvserver/concurrency",

pkg/kv/kvserver/client_split_test.go

Lines changed: 6 additions & 1 deletion
@@ -4140,7 +4140,12 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
 
 	// Verify that the closed timestamp policy is set up.
 	repl := store.LookupReplica(roachpb.RKey(descKey))
-	require.Equal(t, repl.ClosedTimestampPolicy(), roachpb.LEAD_FOR_GLOBAL_READS)
+	testutils.SucceedsSoon(t, func() error {
+		if actual := repl.ClosedTimestampPolicy(); actual != roachpb.LEAD_FOR_GLOBAL_READS {
+			return errors.Newf("expected LEAD_FOR_GLOBAL_READS, got %s", actual)
+		}
+		return nil
+	})
 
 	// Write to the range, which has the effect of bumping the closed timestamp.
 	pArgs := putArgs(descKey, []byte("foo"))

pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -16,6 +16,7 @@ go_library(
         "//pkg/kv/kvpb",
         "//pkg/kv/kvserver/closedts",
         "//pkg/kv/kvserver/closedts/ctpb",
+        "//pkg/kv/kvserver/closedts/policyrefresher",
         "//pkg/roachpb",
         "//pkg/rpc",
         "//pkg/rpc/nodedialer",

pkg/kv/kvserver/closedts/sidetransport/sender.go

Lines changed: 14 additions & 0 deletions
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/policyrefresher"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc"
 	"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
@@ -132,6 +133,7 @@ type leaseholder struct {
 // Replica represents a *Replica object, but with only the capabilities needed
 // by the closed timestamp side transport to accomplish its job.
 type Replica interface {
+	policyrefresher.Replica
 	// Accessors.
 	StoreID() roachpb.StoreID
 	GetRangeID() roachpb.RangeID
@@ -474,6 +476,18 @@ func (s *Sender) publish(ctx context.Context) hlc.ClockTimestamp {
 	return now
 }
 
+// GetLeaseholders returns a slice of all replicas that are currently
+// leaseholders on this node.
+func (s *Sender) GetLeaseholders() []policyrefresher.Replica {
+	s.leaseholdersMu.Lock()
+	defer s.leaseholdersMu.Unlock()
+	leaseholders := make([]policyrefresher.Replica, 0, len(s.leaseholdersMu.leaseholders))
+	for _, lh := range s.leaseholdersMu.leaseholders {
+		leaseholders = append(leaseholders, lh.Replica)
+	}
+	return leaseholders
+}
+
 // GetSnapshot generates an update that contains all the sender's state (as
 // opposed to being an incremental delta since a previous message). The returned
 // msg will have the `snapshot` field set, and a sequence number indicating
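
GetLeaseholders hands the policy refresher a point-in-time copy of the leaseholders registered with the side transport, taken under leaseholdersMu and released before the caller iterates. A minimal sketch of how a caller might combine it with the RefreshPolicy method shown in this commit; the refreshAll helper and the source of the latency map are assumptions, not code from this change.

package kvserversketch

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// refreshAll is a hypothetical helper: it snapshots the current leaseholders
// from the side transport sender and asks each one to recompute its cached
// closed timestamp policy. The latency map may be nil, matching the TODO on
// Replica.RefreshPolicy in this commit.
func refreshAll(s *sidetransport.Sender, latencies map[roachpb.NodeID]time.Duration) {
	for _, r := range s.GetLeaseholders() {
		r.RefreshPolicy(latencies)
	}
}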

pkg/kv/kvserver/closedts/sidetransport/sender_test.go

Lines changed: 3 additions & 3 deletions
@@ -48,9 +48,9 @@ type mockReplica struct {
 
 var _ Replica = &mockReplica{}
 
-func (m *mockReplica) StoreID() roachpb.StoreID { return m.storeID }
-func (m *mockReplica) GetRangeID() roachpb.RangeID { return m.rangeID }
-func (m *mockReplica) RefreshLatency(latencyInfo map[roachpb.NodeID]time.Duration) {}
+func (m *mockReplica) StoreID() roachpb.StoreID { return m.storeID }
+func (m *mockReplica) GetRangeID() roachpb.RangeID { return m.rangeID }
+func (m *mockReplica) RefreshPolicy(latencyInfo map[roachpb.NodeID]time.Duration) {}
 func (m *mockReplica) BumpSideTransportClosed(
 	_ context.Context, _ hlc.ClockTimestamp, _ map[ctpb.RangeClosedTimestampPolicy]hlc.Timestamp,
 ) BumpSideTransportClosedResult {

pkg/kv/kvserver/replica.go

Lines changed: 34 additions & 7 deletions
@@ -1083,6 +1083,12 @@ type Replica struct {
 		// GC threshold for the range.
 		pendingGCThreshold hlc.Timestamp
 	}
+
+	// cachedClosedTimestampPolicy is the cached closed timestamp policy of the
+	// range. It is updated asynchronously by listening on span configuration
+	// changes, leaseholder changes, and periodically at the interval of
+	// kv.closed_timestamp.policy_refresh_interval by PolicyRefresher.
+	cachedClosedTimestampPolicy atomic.Int32
 }
 
 // String returns the string representation of the replica using an
@@ -1174,6 +1180,7 @@ func (r *Replica) SetSpanConfig(conf roachpb.SpanConfig, sp roachpb.Span) bool {
 	r.mu.conf = conf
 	r.mu.spanConfigExplicitlySet = true
 	r.mu.confSpan = sp
+	r.store.policyRefresher.EnqueueReplicaForRefresh(r)
 	return oldConf.HasConfigurationChange(conf)
 }
 
@@ -1314,23 +1321,43 @@ func toClientClosedTsPolicy(
 }
 
 // closedTimestampPolicyRLocked returns the closed timestamp policy of the
-// range, which is updated asynchronously by listening in on span configuration
-// changes.
+// range, which is updated asynchronously by listening on span configuration
+// changes, leaseholder changes, and periodically at the interval of
+// kv.closed_timestamp.policy_refresh_interval.
 //
 // NOTE: an exported version of this method which does not require the replica
 // lock exists in helpers_test.go. Move here if needed.
 func (r *Replica) closedTimestampPolicyRLocked() ctpb.RangeClosedTimestampPolicy {
-	if r.mu.conf.GlobalReads {
-		if !r.shMu.state.Desc.ContainsKey(roachpb.RKey(keys.NodeLivenessPrefix)) {
-			return ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO
-		}
+	// TODO(wenyi): try to remove the need for this key comparison under RLock.
+	// See more in #143648.
+	if r.shMu.state.Desc.ContainsKey(roachpb.RKey(keys.NodeLivenessPrefix)) {
+		return ctpb.LAG_BY_CLUSTER_SETTING
+	}
+	return ctpb.RangeClosedTimestampPolicy(r.cachedClosedTimestampPolicy.Load())
+}
+
+// RefreshPolicy updates the replica's cached closed timestamp policy based on
+// its span configuration. Note that the given map can be nil.
+//
+// TODO(wenyihu6): update this function to consult the supplied map and
+// clarify what happens when the map is nil
+func (r *Replica) RefreshPolicy(_ map[roachpb.NodeID]time.Duration) {
+	policy := func() ctpb.RangeClosedTimestampPolicy {
+		desc, conf := r.DescAndSpanConfig()
 		// The node liveness range ignores zone configs and always uses a
 		// LAG_BY_CLUSTER_SETTING closed timestamp policy. If it was to begin
 		// closing timestamps in the future, it would break liveness updates,
 		// which perform a 1PC transaction with a commit trigger and can not
 		// tolerate being pushed into the future.
+		if desc.ContainsKey(roachpb.RKey(keys.NodeLivenessPrefix)) {
+			return ctpb.LAG_BY_CLUSTER_SETTING
+		}
+		if !conf.GlobalReads {
+			return ctpb.LAG_BY_CLUSTER_SETTING
+		}
+		return ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO
 	}
-	return ctpb.LAG_BY_CLUSTER_SETTING
+	r.cachedClosedTimestampPolicy.Store(int32(policy()))
 }
 
 // NodeID returns the ID of the node this replica belongs to.
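
The net effect of the replica.go change is a read/write split: closedTimestampPolicyRLocked now does a lock-free atomic load of a cached value, while RefreshPolicy recomputes and stores it whenever a trigger fires. A generic sketch of that pattern follows, with illustrative names rather than the CockroachDB types.

package cachedpolicysketch

import "sync/atomic"

// policy mimics a small enum like ctpb.RangeClosedTimestampPolicy, stored as
// an int32 so it fits in an atomic.
type policy int32

const (
	lagByClusterSetting policy = iota
	leadForGlobalReadsWithNoLatencyInfo
)

// cachedPolicy separates cheap readers from an asynchronous writer: readers
// never take a mutex, and the refresher overwrites the value when span
// configs, leases, or the periodic timer say it might have changed.
type cachedPolicy struct {
	v atomic.Int32
}

// get is what a hot read path (analogous to closedTimestampPolicyRLocked)
// would call: a single atomic load.
func (c *cachedPolicy) get() policy { return policy(c.v.Load()) }

// refresh recomputes the policy from configuration (analogous to
// Replica.RefreshPolicy) and publishes it with a single atomic store.
func (c *cachedPolicy) refresh(globalReads, isLivenessRange bool) {
	p := lagByClusterSetting
	if globalReads && !isLivenessRange {
		p = leadForGlobalReadsWithNoLatencyInfo
	}
	c.v.Store(int32(p))
}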

pkg/kv/kvserver/replica_closedts_internal_test.go

Lines changed: 7 additions & 0 deletions
@@ -16,6 +16,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -30,6 +31,12 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
+// TestingSetCachedClosedTimestampPolicy sets the closed timestamp policy on r
+// to be the given policy. It is a test-only helper method.
+func (r *Replica) TestingSetCachedClosedTimestampPolicy(policy ctpb.RangeClosedTimestampPolicy) {
+	r.cachedClosedTimestampPolicy.Store(int32(policy))
+}
+
 func TestSideTransportClosed(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)

pkg/kv/kvserver/replica_closedts_test.go

Lines changed: 170 additions & 0 deletions
@@ -18,11 +18,13 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
@@ -937,3 +939,171 @@ func testNonBlockingReadsWithReaderFn(
 	atomic.StoreInt32(&done, 1)
 	require.NoError(t, g.Wait())
 }
+
+// TestClosedTimestampPolicyRefreshOnSetSpanConfig tests that SetSpanConfig
+// correctly triggers the closed timestamp policy refresh.
+func TestClosedTimestampPolicyRefreshOnSetSpanConfig(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{})
+	defer tc.Stopper().Stop(ctx)
+
+	scratchKey := tc.ScratchRange(t)
+
+	repl := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(scratchKey))
+	require.Equal(t, roachpb.LAG_BY_CLUSTER_SETTING, repl.GetRangeInfo(ctx).ClosedTimestampPolicy)
+
+	spanConfig, err := repl.LoadSpanConfig(ctx)
+	spanConfig.GlobalReads = true
+	require.NoError(t, err)
+	require.NotNil(t, spanConfig)
+	repl.SetSpanConfig(*spanConfig, roachpb.Span{Key: scratchKey})
+
+	// Trigger policy refresh.
+	testutils.SucceedsSoon(t, func() error {
+		if repl.GetRangeInfo(ctx).ClosedTimestampPolicy != roachpb.LEAD_FOR_GLOBAL_READS {
+			return errors.New("expected LEAD_FOR_GLOBAL_READS")
+		}
+		return nil
+	})
+}
+
+// TestClosedTimestampPolicyRefreshIntervalOnLivenessRanges tests that the
+// closed timestamp policy is correctly applied to the node liveness range. That
+// is, even if we try to set the node liveness range to have global reads, the
+// closed timestamp policy should still be LAG_BY_CLUSTER_SETTING. Read more in
+// replica.closedTimestampPolicyRLocked.
+func TestClosedTimestampPolicyRefreshIntervalOnLivenessRanges(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{})
+	defer tc.Stopper().Stop(ctx)
+
+	// Get the node liveness range descriptor.
+	livenessRangeDesc, err := tc.LookupRange(keys.NodeLivenessPrefix)
+	require.NoError(t, err)
+
+	// Check liveness range policy.
+	livenessRepl := tc.GetFirstStoreFromServer(t, 0).LookupReplica(livenessRangeDesc.StartKey)
+	require.Equal(t, roachpb.LAG_BY_CLUSTER_SETTING, livenessRepl.GetRangeInfo(ctx).ClosedTimestampPolicy)
+
+	spanConfig, err := livenessRepl.LoadSpanConfig(ctx)
+	spanConfig.GlobalReads = true
+	require.NoError(t, err)
+	require.NotNil(t, spanConfig)
+	livenessRepl.SetSpanConfig(*spanConfig, roachpb.Span{Key: keys.NodeLivenessPrefix})
+
+	require.Never(t, func() bool {
+		expectedState := livenessRepl.GetRangeInfo(ctx).ClosedTimestampPolicy == roachpb.LAG_BY_CLUSTER_SETTING
+		return !expectedState
+	}, 3*time.Second, 500*time.Millisecond)
+}
+
+// TestSideTransportLeaseholder verifies that a range's leaseholder is properly
+// tracked by the closed timestamp side transport, even when the range is
+// receiving writes and the side transport interval is disabled.
+func TestSideTransportLeaseholder(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+	st := cluster.MakeTestingClusterSettings()
+	// Disable side transport interval to verify tracking works even without
+	// active transport.
+	closedts.SideTransportCloseInterval.Override(ctx, &st.SV, 0)
+	tc := serverutils.StartCluster(t, 3, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			Settings: st,
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	// Get store and create test range.
+	store, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore(tc.Server(0).GetFirstStoreID())
+	require.NoError(t, err)
+	scratchKey := tc.ScratchRange(t)
+	tc.AddVotersOrFatal(t, scratchKey, tc.Target(1))
+	tc.AddNonVotersOrFatal(t, scratchKey, tc.Target(2))
+	repl := store.LookupReplica(roachpb.RKey(scratchKey))
+	require.NotNil(t, repl)
+
+	// Start goroutine that continuously writes to the range to create write load.
+	go func() {
+		for {
+			select {
+			case <-time.After(10 * time.Millisecond):
+				pArgs := putArgs(scratchKey, []byte("value"))
+				if _, pErr := kv.SendWrapped(ctx, store.DB().NonTransactionalSender(), pArgs); pErr != nil {
+					log.Errorf(ctx, "failed to put value: %s", pErr)
+				}
+			case <-tc.Stopper().ShouldQuiesce():
+				return
+			}
+		}
+	}()
+
+	// Verify that the range appears in the closed timestamp sender's leaseholders
+	// list despite write load and disabled side transport.
+	testutils.SucceedsSoon(t, func() error {
+		closedTsSender := store.GetStoreConfig().ClosedTimestampSender
+		leaseholders := closedTsSender.GetLeaseholders()
+		for _, lh := range leaseholders {
+			if lh.(*kvserver.Replica).RangeID == repl.RangeID {
+				return nil
+			}
+		}
+		return errors.Errorf("range %d not found in leaseholders slice", repl.RangeID)
+	})
+}
+
+// TestClosedTimestampPolicyRefreshIntervalOnLeaseTransfers tests that the
+// closed timestamp policy is correctly refreshed on a range after a lease
+// transfer.
+func TestClosedTimestampPolicyRefreshIntervalOnLeaseTransfers(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	scratchKey := tc.ScratchRange(t)
+	desc := tc.AddVotersOrFatal(t, scratchKey, tc.Target(1), tc.Target(2))
+
+	repl1 := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(scratchKey))
+	require.Equal(t, roachpb.LAG_BY_CLUSTER_SETTING, repl1.GetRangeInfo(ctx).ClosedTimestampPolicy)
+
+	repl2 := tc.GetFirstStoreFromServer(t, 1).LookupReplica(roachpb.RKey(scratchKey))
+	require.Equal(t, roachpb.LAG_BY_CLUSTER_SETTING, repl2.GetRangeInfo(ctx).ClosedTimestampPolicy)
+
+	spanConfig, err := repl2.LoadSpanConfig(ctx)
+	spanConfig.GlobalReads = true
+	require.NoError(t, err)
+	require.NotNil(t, spanConfig)
+	repl2.SetSpanConfig(*spanConfig, roachpb.Span{Key: scratchKey})
+	testutils.SucceedsSoon(t, func() error {
+		if repl2.GetRangeInfo(ctx).ClosedTimestampPolicy != roachpb.LEAD_FOR_GLOBAL_READS {
+			return errors.New("expected LEAD_FOR_GLOBAL_READS")
+		}
+		return nil
+	})
+
+	// Force repl2 policy to be LAG_BY_CLUSTER_SETTING.
+	repl2.TestingSetCachedClosedTimestampPolicy(ctpb.LAG_BY_CLUSTER_SETTING)
+	require.Equal(t, roachpb.LAG_BY_CLUSTER_SETTING, repl2.GetRangeInfo(ctx).ClosedTimestampPolicy)
+
+	// Ensure that transferring the lease to repl2 does trigger a lease refresh.
+	require.NoError(t, tc.TransferRangeLease(desc, tc.Target(1)))
+	testutils.SucceedsSoon(t, func() error {
+		if actual := repl2.GetRangeInfo(ctx).ClosedTimestampPolicy; actual != roachpb.LEAD_FOR_GLOBAL_READS {
+			return errors.Newf("expected LEAD_FOR_GLOBAL_READS but got %v", actual)
+		}
+		return nil
+	})
+}

pkg/kv/kvserver/replica_init.go

Lines changed: 1 addition & 0 deletions
@@ -277,6 +277,7 @@ func newUninitializedReplicaWithoutRaftGroup(
 		EnabledWhenLeaderLevel: r.raftMu.flowControlLevel,
 		Knobs: r.store.TestingKnobs().FlowControlTestingKnobs,
 	})
+	r.RefreshPolicy(nil)
 	return r
 }

pkg/kv/kvserver/replica_proposal.go

Lines changed: 1 addition & 1 deletion
@@ -581,7 +581,7 @@ func (r *Replica) leasePostApplyLocked(
 
 	// Inform the store of this lease.
 	if iAmTheLeaseHolder {
-		r.store.registerLeaseholder(ctx, r, newLease.Sequence)
+		r.store.registerLeaseholderAndRefreshPolicy(ctx, r, newLease.Sequence)
 	} else {
 		r.store.unregisterLeaseholder(ctx, r)
 	}
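
The renamed helper's body is not part of this diff. Based only on the commit message, a plausible shape is "register the leaseholder with the side transport, then enqueue the replica for a policy refresh"; everything below, including the stand-in types, is an assumption for illustration rather than the store's actual implementation.

package storesketch

import "context"

// Stand-ins for the store's collaborators; the real types live in
// pkg/kv/kvserver and pkg/kv/kvserver/closedts.
type replica struct{}

type sideTransportSender struct{}

func (s *sideTransportSender) RegisterLeaseholder(ctx context.Context, r *replica, seq int64) {
	// Track r as a leaseholder so the side transport closes timestamps for it.
}

type policyRefresher struct{}

func (p *policyRefresher) EnqueueReplicaForRefresh(r *replica) {
	// Ask the refresher to recompute r's cached closed timestamp policy soon.
}

type store struct {
	sender    *sideTransportSender
	refresher *policyRefresher
}

// registerLeaseholderAndRefreshPolicy sketches what the new name advertises:
// the old registration step plus a closed timestamp policy refresh.
func (s *store) registerLeaseholderAndRefreshPolicy(ctx context.Context, r *replica, seq int64) {
	s.sender.RegisterLeaseholder(ctx, r, seq)
	s.refresher.EnqueueReplicaForRefresh(r)
}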
