Skip to content

Commit e2e7260

Browse files
craig[bot]wenyihu6
andcommitted
Merge #143010
143010: kvserver: replace roachpb. with ctpb.RangeClosedTimestampPolicy r=arulajmani a=wenyihu6 **kvserver: replace roachpb. with ctpb.RangeClosedTimestampPolicy** This commit replaces roachpb.RangeClosedTimestampPolicy with ctpb.LatencyBasedRangeClosedTimestampPolicy for kvserver sidetransport side. Currently, it maintains the existing behavior. Future commits will introduce additional enum values to add locality information into the side-transport closed timestamp estimation. Epic: none Release note: none --- **kvserver/closedts: change lastClosed to a map** This commit changes streamState shared between sender and receiver from a fixed size slice to a map, similar to trackedRange. Currently, it does not change any existing behavior. But future commits use this to make sure only lastClosed will be populated for enums with a corresponding roachpb.RangeClosedTimestampPolicy until the cluster has been fully upgraded to v25.2. Part of: #59680 Release note: none Co-authored-by: Wenyi Hu <[email protected]> Co-authored-by: wenyihu6 <[email protected]>
2 parents 80ae379 + b8ca6a8 commit e2e7260

File tree

17 files changed

+155
-94
lines changed

17 files changed

+155
-94
lines changed

pkg/kv/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ go_library(
2020
"//pkg/keys",
2121
"//pkg/kv/kvpb",
2222
"//pkg/kv/kvserver/closedts",
23+
"//pkg/kv/kvserver/closedts/ctpb",
2324
"//pkg/kv/kvserver/concurrency/isolation",
2425
"//pkg/roachpb",
2526
"//pkg/settings",

pkg/kv/kvserver/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ go_library(
140140
"//pkg/kv/kvserver/batcheval/result",
141141
"//pkg/kv/kvserver/benignerror",
142142
"//pkg/kv/kvserver/closedts",
143+
"//pkg/kv/kvserver/closedts/ctpb",
143144
"//pkg/kv/kvserver/closedts/sidetransport",
144145
"//pkg/kv/kvserver/closedts/tracker",
145146
"//pkg/kv/kvserver/concurrency",
@@ -426,6 +427,7 @@ go_test(
426427
"//pkg/kv/kvserver/batcheval/result",
427428
"//pkg/kv/kvserver/benignerror",
428429
"//pkg/kv/kvserver/closedts",
430+
"//pkg/kv/kvserver/closedts/ctpb",
429431
"//pkg/kv/kvserver/closedts/tracker",
430432
"//pkg/kv/kvserver/concurrency",
431433
"//pkg/kv/kvserver/concurrency/isolation",

pkg/kv/kvserver/closedts/BUILD.bazel

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ go_library(
99
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts",
1010
visibility = ["//visibility:public"],
1111
deps = [
12-
"//pkg/roachpb",
12+
"//pkg/kv/kvserver/closedts/ctpb",
1313
"//pkg/settings",
1414
"//pkg/util/hlc",
1515
],
@@ -20,7 +20,7 @@ go_test(
2020
srcs = ["policy_test.go"],
2121
embed = [":closedts"],
2222
deps = [
23-
"//pkg/roachpb",
23+
"//pkg/kv/kvserver/closedts/ctpb",
2424
"//pkg/util/hlc",
2525
"//pkg/util/leaktest",
2626
"//pkg/util/log",

pkg/kv/kvserver/closedts/ctpb/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ proto_library(
2121
strip_import_prefix = "/pkg",
2222
visibility = ["//visibility:public"],
2323
deps = [
24-
"//pkg/roachpb:roachpb_proto",
2524
"//pkg/util/hlc:hlc_proto",
2625
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
2726
],
@@ -34,7 +33,6 @@ go_proto_library(
3433
proto = ":ctpb_proto",
3534
visibility = ["//visibility:public"],
3635
deps = [
37-
"//pkg/roachpb",
3836
"//pkg/util/hlc",
3937
"@com_github_gogo_protobuf//gogoproto",
4038
],

pkg/kv/kvserver/closedts/ctpb/service.proto

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,41 @@ syntax = "proto3";
77
package cockroach.kv.kvserver.ctupdate;
88
option go_package = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb";
99

10-
import "roachpb/data.proto";
1110
import "util/hlc/timestamp.proto";
1211

1312
import "gogoproto/gogo.proto";
1413

14+
// RangeClosedTimestampPolicy defines a range's closed timestamp policy. All
15+
// ranges in a cluster are placed in a closed timestamp policy bucket, which is
16+
// decided by the leaseholder replica of the range. The side transport sender
17+
// periodically transmits closed timestamp updates for ranges it's responsible
18+
// for, closing timestamps on a per-bucket basis.
19+
//
20+
// NB: The policy is only used inside kvserver, and shouldn't leak to the
21+
// client. See roachpb.ClosedTimestampPolicy and toClientClosedTsPolicy for the
22+
// client side equivalent.
23+
//
24+
// Closed timestamp policies on the server can be dynamically adjusted based on
25+
// observed network latencies between the leaseholder and its farthest follower.
26+
// Each range is bucketed based on observed network latencies, and buckets may
27+
// change as network latencies change, or the range's SpanConfig changes.
28+
enum RangeClosedTimestampPolicy {
29+
option (gogoproto.goproto_enum_prefix) = false;
30+
// LAG_BY_CLUSTER_SETTING indicates that the range's closed timestamp is
31+
// configured to lag behind present time by the value configured for the
32+
// `kv.closed_timestamp.target_duration` cluster setting.
33+
LAG_BY_CLUSTER_SETTING = 0;
34+
// The following policies correspond to roachpb.LEAD_FOR_GLOBAL_READS policy.
35+
// LEAD_FOR_GLOBAL_READS indicates that the range's closed timestamp is
36+
// configured to lead present time such that all followers of the range are
37+
// able to serve consistent, present time reads.
38+
//
39+
// Lead policy for global reads with no locality information.
40+
LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO = 1;
41+
// Sentinel value for slice sizing.
42+
MAX_CLOSED_TIMESTAMP_POLICY = 2;
43+
}
44+
1545
// Update contains information about (the advancement of) closed timestamps for
1646
// ranges with leases on the sender node. Updates are of two types: snapshots
1747
// and incrementals. Snapshots are stand-alone messages, explicitly containing
@@ -57,7 +87,7 @@ message Update {
5787
// and the regular Raft transport are possible, as are races between two
5888
// side-transport streams for an outgoing and incoming leaseholder.
5989
message GroupUpdate {
60-
roachpb.RangeClosedTimestampPolicy policy = 1;
90+
RangeClosedTimestampPolicy policy = 1;
6191
util.hlc.Timestamp closed_timestamp = 2 [(gogoproto.nullable) = false];
6292
}
6393
repeated GroupUpdate closed_timestamps = 4 [(gogoproto.nullable) = false];
@@ -80,7 +110,7 @@ message Update {
80110
message RangeUpdate {
81111
uint64 range_id = 1 [(gogoproto.customname) = "RangeID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"];
82112
uint64 lai = 2 [(gogoproto.customname) = "LAI", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/kv/kvpb.LeaseAppliedIndex"];
83-
roachpb.RangeClosedTimestampPolicy policy = 3;
113+
RangeClosedTimestampPolicy policy = 3;
84114
}
85115
repeated RangeUpdate added_or_updated = 6 [(gogoproto.nullable) = false];
86116
}

pkg/kv/kvserver/closedts/policy.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ package closedts
88
import (
99
"time"
1010

11-
"github.com/cockroachdb/cockroach/pkg/roachpb"
11+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
1212
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1313
)
1414

@@ -105,14 +105,14 @@ func TargetForPolicy(
105105
lagTargetDuration time.Duration,
106106
leadTargetOverride time.Duration,
107107
sideTransportCloseInterval time.Duration,
108-
policy roachpb.RangeClosedTimestampPolicy,
108+
policy ctpb.RangeClosedTimestampPolicy,
109109
) hlc.Timestamp {
110110
var targetOffsetTime time.Duration
111111
switch policy {
112-
case roachpb.LAG_BY_CLUSTER_SETTING:
112+
case ctpb.LAG_BY_CLUSTER_SETTING:
113113
// Simple calculation: lag now by desired duration.
114114
targetOffsetTime = -lagTargetDuration
115-
case roachpb.LEAD_FOR_GLOBAL_READS:
115+
case ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO:
116116
// Override entirely with cluster setting, if necessary.
117117
if leadTargetOverride != 0 {
118118
targetOffsetTime = leadTargetOverride

pkg/kv/kvserver/closedts/policy_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"testing"
1010
"time"
1111

12-
"github.com/cockroachdb/cockroach/pkg/roachpb"
12+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
1313
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1414
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
1515
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -31,30 +31,30 @@ func TestTargetForPolicy(t *testing.T) {
3131
lagTargetNanos time.Duration
3232
leadTargetOverride time.Duration
3333
sideTransportCloseInterval time.Duration
34-
rangePolicy roachpb.RangeClosedTimestampPolicy
34+
rangePolicy ctpb.RangeClosedTimestampPolicy
3535
expClosedTSTarget hlc.Timestamp
3636
}{
3737
{
3838
lagTargetNanos: secs(3),
39-
rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING,
39+
rangePolicy: ctpb.LAG_BY_CLUSTER_SETTING,
4040
expClosedTSTarget: now.Add(-secs(3).Nanoseconds(), 0),
4141
},
4242
{
4343
lagTargetNanos: secs(1),
44-
rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING,
44+
rangePolicy: ctpb.LAG_BY_CLUSTER_SETTING,
4545
expClosedTSTarget: now.Add(-secs(1).Nanoseconds(), 0),
4646
},
4747
{
4848
sideTransportCloseInterval: millis(200),
49-
rangePolicy: roachpb.LEAD_FOR_GLOBAL_READS,
49+
rangePolicy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO,
5050
expClosedTSTarget: now.
5151
Add((maxClockOffset +
5252
millis(275) /* sideTransportPropTime */ +
5353
millis(25) /* bufferTime */).Nanoseconds(), 0),
5454
},
5555
{
5656
sideTransportCloseInterval: millis(50),
57-
rangePolicy: roachpb.LEAD_FOR_GLOBAL_READS,
57+
rangePolicy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO,
5858
expClosedTSTarget: now.
5959
Add((maxClockOffset +
6060
millis(245) /* raftTransportPropTime */ +
@@ -63,7 +63,7 @@ func TestTargetForPolicy(t *testing.T) {
6363
{
6464
leadTargetOverride: millis(1234),
6565
sideTransportCloseInterval: millis(200),
66-
rangePolicy: roachpb.LEAD_FOR_GLOBAL_READS,
66+
rangePolicy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO,
6767
expClosedTSTarget: now.Add(millis(1234).Nanoseconds(), 0),
6868
},
6969
} {

pkg/kv/kvserver/closedts/sidetransport/receiver.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,11 @@ func (r *incomingStream) processUpdate(ctx context.Context, msg *ctpb.Update) {
246246
if !ok {
247247
log.Fatalf(ctx, "attempting to unregister a missing range: r%d", rangeID)
248248
}
249-
r.stores.ForwardSideTransportClosedTimestampForRange(
250-
ctx, rangeID, r.mu.lastClosed[info.policy], info.lai)
249+
ts, ok := r.mu.lastClosed[info.policy]
250+
if !ok {
251+
log.Fatalf(ctx, "missing closed timestamp policy %v for range r%d", info.policy, rangeID)
252+
}
253+
r.stores.ForwardSideTransportClosedTimestampForRange(ctx, rangeID, ts, info.lai)
251254
}
252255
r.mu.RUnlock()
253256
}
@@ -258,9 +261,7 @@ func (r *incomingStream) processUpdate(ctx context.Context, msg *ctpb.Update) {
258261

259262
// Reset all the state on snapshots.
260263
if msg.Snapshot {
261-
for i := range r.mu.lastClosed {
262-
r.mu.lastClosed[i] = hlc.Timestamp{}
263-
}
264+
r.mu.lastClosed = make(map[ctpb.RangeClosedTimestampPolicy]hlc.Timestamp, len(r.mu.lastClosed))
264265
r.mu.tracked = make(map[roachpb.RangeID]trackedRange, len(r.mu.tracked))
265266
} else if msg.SeqNum != r.mu.lastSeqNum+1 {
266267
log.Fatalf(ctx, "expected closed timestamp side-transport message with sequence number "+

pkg/kv/kvserver/closedts/sidetransport/receiver_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,13 @@ func TestIncomingStreamProcessUpdateBasic(t *testing.T) {
8686
SeqNum: 1,
8787
Snapshot: true,
8888
ClosedTimestamps: []ctpb.Update_GroupUpdate{
89-
{Policy: roachpb.LAG_BY_CLUSTER_SETTING, ClosedTimestamp: ts10},
90-
{Policy: roachpb.LEAD_FOR_GLOBAL_READS, ClosedTimestamp: ts20},
89+
{Policy: ctpb.LAG_BY_CLUSTER_SETTING, ClosedTimestamp: ts10},
90+
{Policy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO, ClosedTimestamp: ts20},
9191
},
9292
AddedOrUpdated: []ctpb.Update_RangeUpdate{
93-
{RangeID: 1, LAI: lai100, Policy: roachpb.LAG_BY_CLUSTER_SETTING},
94-
{RangeID: 2, LAI: lai101, Policy: roachpb.LAG_BY_CLUSTER_SETTING},
95-
{RangeID: 3, LAI: lai102, Policy: roachpb.LEAD_FOR_GLOBAL_READS},
93+
{RangeID: 1, LAI: lai100, Policy: ctpb.LAG_BY_CLUSTER_SETTING},
94+
{RangeID: 2, LAI: lai101, Policy: ctpb.LAG_BY_CLUSTER_SETTING},
95+
{RangeID: 3, LAI: lai102, Policy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO},
9696
},
9797
Removed: nil,
9898
}
@@ -114,11 +114,11 @@ func TestIncomingStreamProcessUpdateBasic(t *testing.T) {
114114
SeqNum: 2,
115115
Snapshot: false,
116116
ClosedTimestamps: []ctpb.Update_GroupUpdate{
117-
{Policy: roachpb.LAG_BY_CLUSTER_SETTING, ClosedTimestamp: ts11},
118-
{Policy: roachpb.LEAD_FOR_GLOBAL_READS, ClosedTimestamp: ts21},
117+
{Policy: ctpb.LAG_BY_CLUSTER_SETTING, ClosedTimestamp: ts11},
118+
{Policy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO, ClosedTimestamp: ts21},
119119
},
120120
AddedOrUpdated: []ctpb.Update_RangeUpdate{
121-
{RangeID: 3, LAI: lai103, Policy: roachpb.LEAD_FOR_GLOBAL_READS},
121+
{RangeID: 3, LAI: lai103, Policy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO},
122122
},
123123
Removed: []roachpb.RangeID{1},
124124
}
@@ -140,12 +140,12 @@ func TestIncomingStreamProcessUpdateBasic(t *testing.T) {
140140
SeqNum: 3,
141141
Snapshot: true,
142142
ClosedTimestamps: []ctpb.Update_GroupUpdate{
143-
{Policy: roachpb.LAG_BY_CLUSTER_SETTING, ClosedTimestamp: ts12},
144-
{Policy: roachpb.LEAD_FOR_GLOBAL_READS, ClosedTimestamp: ts22},
143+
{Policy: ctpb.LAG_BY_CLUSTER_SETTING, ClosedTimestamp: ts12},
144+
{Policy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO, ClosedTimestamp: ts22},
145145
},
146146
AddedOrUpdated: []ctpb.Update_RangeUpdate{
147-
{RangeID: 3, LAI: lai102, Policy: roachpb.LEAD_FOR_GLOBAL_READS},
148-
{RangeID: 4, LAI: lai100, Policy: roachpb.LAG_BY_CLUSTER_SETTING},
147+
{RangeID: 3, LAI: lai102, Policy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO},
148+
{RangeID: 4, LAI: lai100, Policy: ctpb.LAG_BY_CLUSTER_SETTING},
149149
},
150150
Removed: nil,
151151
}
@@ -186,10 +186,10 @@ func TestIncomingStreamCallsIntoStoresDontHoldLock(t *testing.T) {
186186
msg := &ctpb.Update{
187187
NodeID: 1, SeqNum: 1, Snapshot: true,
188188
ClosedTimestamps: []ctpb.Update_GroupUpdate{
189-
{Policy: roachpb.LEAD_FOR_GLOBAL_READS, ClosedTimestamp: ts10},
189+
{Policy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO, ClosedTimestamp: ts10},
190190
},
191191
AddedOrUpdated: []ctpb.Update_RangeUpdate{
192-
{RangeID: 1, LAI: lai100, Policy: roachpb.LEAD_FOR_GLOBAL_READS},
192+
{RangeID: 1, LAI: lai100, Policy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO},
193193
},
194194
Removed: nil,
195195
}

0 commit comments

Comments
 (0)