Skip to content

Commit b8ca6a8

Browse files
committed
kvserver/closedts: change lastClosed to a map
This commit changes streamState shared between sender and receiver from a fixed size slice to a map, similar to trackedRange. Currently, it does not change any existing behavior. But future commits use this to make sure only lastClosed will be populated for enums with a corresponding roachpb.RangeClosedTimestampPolicy until the cluster has been fully upgraded to v25.2. Part of: #59680 Release note: none
1 parent 2a2f28a commit b8ca6a8

File tree

6 files changed

+46
-35
lines changed

6 files changed

+46
-35
lines changed

pkg/kv/kvserver/closedts/sidetransport/receiver.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,11 @@ func (r *incomingStream) processUpdate(ctx context.Context, msg *ctpb.Update) {
246246
if !ok {
247247
log.Fatalf(ctx, "attempting to unregister a missing range: r%d", rangeID)
248248
}
249-
r.stores.ForwardSideTransportClosedTimestampForRange(
250-
ctx, rangeID, r.mu.lastClosed[info.policy], info.lai)
249+
ts, ok := r.mu.lastClosed[info.policy]
250+
if !ok {
251+
log.Fatalf(ctx, "missing closed timestamp policy %v for range r%d", info.policy, rangeID)
252+
}
253+
r.stores.ForwardSideTransportClosedTimestampForRange(ctx, rangeID, ts, info.lai)
251254
}
252255
r.mu.RUnlock()
253256
}
@@ -258,9 +261,7 @@ func (r *incomingStream) processUpdate(ctx context.Context, msg *ctpb.Update) {
258261

259262
// Reset all the state on snapshots.
260263
if msg.Snapshot {
261-
for i := range r.mu.lastClosed {
262-
r.mu.lastClosed[i] = hlc.Timestamp{}
263-
}
264+
r.mu.lastClosed = make(map[ctpb.RangeClosedTimestampPolicy]hlc.Timestamp, len(r.mu.lastClosed))
264265
r.mu.tracked = make(map[roachpb.RangeID]trackedRange, len(r.mu.tracked))
265266
} else if msg.SeqNum != r.mu.lastSeqNum+1 {
266267
log.Fatalf(ctx, "expected closed timestamp side-transport message with sequence number "+

pkg/kv/kvserver/closedts/sidetransport/sender.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,12 @@ type Sender struct {
9797
type streamState struct {
9898
// lastSeqNum is the sequence number of the last message published.
9999
lastSeqNum ctpb.SeqNum
100-
// lastClosed is the closed timestamp published for each policy in the
101-
// last message.
102-
lastClosed [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp
100+
// lastClosed is the closed timestamp published for each policy in the last
101+
// message. During mixed version clusters with nodes running both v25.1 and
102+
// v25.2, lastClosed will only be populated for enums with a corresponding
103+
// roachpb.RangeClosedTimestampPolicy (LAG_BY_CLUSTER_SETTING and
104+
// LEAD_FOR_GLOBAL_READS) until the cluster is fully upgraded to v25.2.
105+
lastClosed map[ctpb.RangeClosedTimestampPolicy]hlc.Timestamp
103106
// tracked maintains the information that was communicated to connections in
104107
// the last sent message (implicitly or explicitly). A range enters this
105108
// structure as soon as it's included in a message, and exits it when it's
@@ -151,7 +154,7 @@ type Replica interface {
151154
BumpSideTransportClosed(
152155
ctx context.Context,
153156
now hlc.ClockTimestamp,
154-
targetByPolicy [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp,
157+
targetByPolicy map[ctpb.RangeClosedTimestampPolicy]hlc.Timestamp,
155158
) BumpSideTransportClosedResult
156159
}
157160

@@ -210,6 +213,7 @@ func newSenderWithConnFactory(
210213
buf: newUpdatesBuf(),
211214
}
212215
s.trackedMu.tracked = make(map[roachpb.RangeID]trackedRange)
216+
s.trackedMu.lastClosed = make(map[ctpb.RangeClosedTimestampPolicy]hlc.Timestamp)
213217
s.leaseholdersMu.leaseholders = make(map[roachpb.RangeID]leaseholder)
214218
s.connsMu.conns = make(map[roachpb.NodeID]conn)
215219
return s
@@ -305,10 +309,14 @@ func (s *Sender) publish(ctx context.Context) hlc.ClockTimestamp {
305309
defer s.trackedMu.Unlock()
306310
log.VEventf(ctx, 4, "side-transport generating a new message")
307311
s.trackedMu.closingFailures = [MaxReason]int{}
308-
312+
// TODO(wenyihu6): a cluster version check is needed here once we add new
313+
// policies to ctpb.RangeClosedTimestampPolicies that do not have a
314+
// corresponding roachpb.RangeClosedTimestampPolicy.
315+
numPolicies := int(roachpb.MAX_CLOSED_TIMESTAMP_POLICY)
316+
s.trackedMu.lastClosed = make(map[ctpb.RangeClosedTimestampPolicy]hlc.Timestamp, numPolicies)
309317
msg := &ctpb.Update{
310318
NodeID: s.nodeID,
311-
ClosedTimestamps: make([]ctpb.Update_GroupUpdate, len(s.trackedMu.lastClosed)),
319+
ClosedTimestamps: make([]ctpb.Update_GroupUpdate, numPolicies),
312320
}
313321

314322
// Determine the message's sequence number.
@@ -326,8 +334,7 @@ func (s *Sender) publish(ctx context.Context) hlc.ClockTimestamp {
326334
lagTargetDuration := closedts.TargetDuration.Get(&s.st.SV)
327335
leadTargetOverride := closedts.LeadForGlobalReadsOverride.Get(&s.st.SV)
328336
sideTransportCloseInterval := closedts.SideTransportCloseInterval.Get(&s.st.SV)
329-
for i := range s.trackedMu.lastClosed {
330-
pol := ctpb.RangeClosedTimestampPolicy(i)
337+
for pol := ctpb.RangeClosedTimestampPolicy(0); pol < ctpb.RangeClosedTimestampPolicy(numPolicies); pol++ {
331338
target := closedts.TargetForPolicy(
332339
now,
333340
maxClockOffset,
@@ -482,14 +489,14 @@ func (s *Sender) GetSnapshot() *ctpb.Update {
482489
// of incremental messages.
483490
SeqNum: s.trackedMu.lastSeqNum,
484491
Snapshot: true,
485-
ClosedTimestamps: make([]ctpb.Update_GroupUpdate, len(s.trackedMu.lastClosed)),
492+
ClosedTimestamps: make([]ctpb.Update_GroupUpdate, 0, len(s.trackedMu.lastClosed)),
486493
AddedOrUpdated: make([]ctpb.Update_RangeUpdate, 0, len(s.trackedMu.tracked)),
487494
}
488495
for pol, ts := range s.trackedMu.lastClosed {
489-
msg.ClosedTimestamps[pol] = ctpb.Update_GroupUpdate{
490-
Policy: ctpb.RangeClosedTimestampPolicy(pol),
496+
msg.ClosedTimestamps = append(msg.ClosedTimestamps, ctpb.Update_GroupUpdate{
497+
Policy: pol,
491498
ClosedTimestamp: ts,
492-
}
499+
})
493500
}
494501
for rid, r := range s.trackedMu.tracked {
495502
msg.AddedOrUpdated = append(msg.AddedOrUpdated, ctpb.Update_RangeUpdate{
@@ -886,7 +893,7 @@ func (s streamState) String() string {
886893
} else {
887894
agoMsg = fmt.Sprintf("%s in the future", -ago)
888895
}
889-
fmt.Fprintf(sb, "%s:%s (%s)", roachpb.RangeClosedTimestampPolicy(policy), closedTS, agoMsg)
896+
fmt.Fprintf(sb, "%s:%s (%s)", policy, closedTS, agoMsg)
890897
}
891898

892899
// List the tracked ranges.

pkg/kv/kvserver/closedts/sidetransport/sender_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,11 @@ type mockReplica struct {
4848

4949
var _ Replica = &mockReplica{}
5050

51-
func (m *mockReplica) StoreID() roachpb.StoreID { return m.storeID }
52-
func (m *mockReplica) GetRangeID() roachpb.RangeID { return m.rangeID }
51+
func (m *mockReplica) StoreID() roachpb.StoreID { return m.storeID }
52+
func (m *mockReplica) GetRangeID() roachpb.RangeID { return m.rangeID }
53+
func (m *mockReplica) RefreshLatency(latencyInfo map[roachpb.NodeID]time.Duration) {}
5354
func (m *mockReplica) BumpSideTransportClosed(
54-
_ context.Context, _ hlc.ClockTimestamp, _ [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp,
55+
_ context.Context, _ hlc.ClockTimestamp, _ map[ctpb.RangeClosedTimestampPolicy]hlc.Timestamp,
5556
) BumpSideTransportClosedResult {
5657
m.mu.Lock()
5758
defer m.mu.Unlock()
@@ -120,7 +121,7 @@ func newMockReplica(id roachpb.RangeID, nodes ...roachpb.NodeID) *mockReplica {
120121
rangeID: id,
121122
canBump: true,
122123
lai: 5,
123-
policy: ctpb.LAG_BY_CLUSTER_SETTING,
124+
policy: ctpb.LAG_BY_CLUSTER_SETTING,
124125
}
125126
r.mu.desc = desc
126127
return r
@@ -137,7 +138,7 @@ func newMockReplicaEx(id roachpb.RangeID, replicas ...roachpb.ReplicationTarget)
137138
rangeID: id,
138139
canBump: true,
139140
lai: 5,
140-
policy: ctpb.LAG_BY_CLUSTER_SETTING,
141+
policy: ctpb.LAG_BY_CLUSTER_SETTING,
141142
}
142143
r.mu.desc = desc
143144
return r

pkg/kv/kvserver/replica.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ package kvserver
77

88
import (
99
"context"
10-
fmt "fmt"
10+
"fmt"
1111
"slices"
1212
"sync"
1313
"sync/atomic"

pkg/kv/kvserver/replica_closedts.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1212
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
13+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
1314
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport"
1415
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
1516
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -32,7 +33,7 @@ import (
3233
func (r *Replica) BumpSideTransportClosed(
3334
ctx context.Context,
3435
now hlc.ClockTimestamp,
35-
targetByPolicy [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp,
36+
targetByPolicy map[ctpb.RangeClosedTimestampPolicy]hlc.Timestamp,
3637
) sidetransport.BumpSideTransportClosedResult {
3738
var res sidetransport.BumpSideTransportClosedResult
3839
r.mu.Lock()

pkg/kv/kvserver/replica_closedts_test.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/kv"
1919
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
2020
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
21+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
2122
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
2223
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
2324
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -262,8 +263,8 @@ func TestBumpSideTransportClosed(t *testing.T) {
262263
setup: func(a setupArgs) (chan struct{}, chan error, error) {
263264
// Manually bump the assigned closed timestamp to a time below
264265
// where the test will attempt to bump it to.
265-
var targets [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp
266-
targets[roachpb.LAG_BY_CLUSTER_SETTING] = a.target.Add(-1, 0)
266+
targets := map[ctpb.RangeClosedTimestampPolicy]hlc.Timestamp{}
267+
targets[ctpb.LAG_BY_CLUSTER_SETTING] = a.target.Add(-1, 0)
267268
return nil, nil, testutils.SucceedsSoonError(func() error {
268269
res := a.repl.BumpSideTransportClosed(ctx, a.now, targets)
269270
if !res.OK {
@@ -279,8 +280,8 @@ func TestBumpSideTransportClosed(t *testing.T) {
279280
setup: func(a setupArgs) (chan struct{}, chan error, error) {
280281
// Manually bump the assigned closed timestamp to a time equal
281282
// to where the test will attempt to bump it to.
282-
var targets [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp
283-
targets[roachpb.LAG_BY_CLUSTER_SETTING] = a.target
283+
targets := map[ctpb.RangeClosedTimestampPolicy]hlc.Timestamp{}
284+
targets[ctpb.LAG_BY_CLUSTER_SETTING] = a.target
284285
return nil, nil, testutils.SucceedsSoonError(func() error {
285286
res := a.repl.BumpSideTransportClosed(ctx, a.now, targets)
286287
if !res.OK {
@@ -296,8 +297,8 @@ func TestBumpSideTransportClosed(t *testing.T) {
296297
setup: func(a setupArgs) (chan struct{}, chan error, error) {
297298
// Manually bump the assigned closed timestamp to a time above
298299
// where the test will attempt to bump it to.
299-
var targets [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp
300-
targets[roachpb.LAG_BY_CLUSTER_SETTING] = a.target.Add(1, 0)
300+
targets := map[ctpb.RangeClosedTimestampPolicy]hlc.Timestamp{}
301+
targets[ctpb.LAG_BY_CLUSTER_SETTING] = a.target.Add(1, 0)
301302
return nil, nil, testutils.SucceedsSoonError(func() error {
302303
res := a.repl.BumpSideTransportClosed(ctx, a.now, targets)
303304
if !res.OK {
@@ -373,8 +374,8 @@ func TestBumpSideTransportClosed(t *testing.T) {
373374
} else {
374375
target, exp = test.computeTarget(repl)
375376
}
376-
var targets [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp
377-
targets[roachpb.LAG_BY_CLUSTER_SETTING] = target
377+
targets := map[ctpb.RangeClosedTimestampPolicy]hlc.Timestamp{}
378+
targets[ctpb.LAG_BY_CLUSTER_SETTING] = target
378379

379380
// Run the setup function to get the replica in the desired state.
380381
var unblockFilterC chan struct{}
@@ -716,13 +717,13 @@ func BenchmarkBumpSideTransportClosed(b *testing.B) {
716717

717718
manual.Pause()
718719
now := s.Clock().NowAsClockTimestamp()
719-
var targets [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp
720+
targets := map[ctpb.RangeClosedTimestampPolicy]hlc.Timestamp{}
720721

721722
b.ResetTimer()
722723
for i := 0; i < b.N; i++ {
723724
// Advance time and the closed timestamp target.
724725
now = now.ToTimestamp().Add(1, 0).UnsafeToClockTimestamp()
725-
targets[roachpb.LAG_BY_CLUSTER_SETTING] = now.ToTimestamp()
726+
targets[ctpb.LAG_BY_CLUSTER_SETTING] = now.ToTimestamp()
726727

727728
// Perform the call.
728729
res := r.BumpSideTransportClosed(ctx, now, targets)

0 commit comments

Comments
 (0)