Skip to content

Commit 9b72230

Browse files
committed
mmaintegration: improve logging around replace lease changes
This commit improves logging around allocator sync to make it easier to observe replica lease changes and understand thrashing issues.
1 parent fb3523e commit 9b72230

File tree

16 files changed

+62
-23
lines changed

16 files changed

+62
-23
lines changed

pkg/kv/kvserver/asim/asim.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ func (s *Simulator) tickWorkload(ctx context.Context, tick time.Time) {
309309
// each store ticks the pending operations such as relocate range and lease
310310
// transfers.
311311
func (s *Simulator) tickStateChanges(ctx context.Context, tick time.Time) {
312-
s.changer.Tick(tick, s.state)
312+
s.changer.Tick(ctx, tick, s.state)
313313
stores := s.state.Stores()
314314
s.shuffler(len(stores), func(i, j int) { stores[i], stores[j] = stores[j], stores[i] })
315315
for _, store := range stores {
@@ -371,7 +371,7 @@ func (s *Simulator) tickQueues(ctx context.Context, tick time.Time, state state.
371371

372372
// Tick changes that may have been enqueued with a lower completion
373373
// than the current tick, from the queues.
374-
s.changer.Tick(tick, state)
374+
s.changer.Tick(ctx, tick, state)
375375

376376
// Try adding suggested load splits that are pending for this store.
377377
for _, rangeID := range state.LoadSplitterFor(storeID).ClearSplitKeys() {

pkg/kv/kvserver/asim/metrics/metrics_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func Example_leaseTransfer() {
9090
Author: 1,
9191
Wait: 0,
9292
})
93-
changer.Tick(state.TestingStartTime(), s)
93+
changer.Tick(ctx, state.TestingStartTime(), s)
9494
m.Tick(ctx, start, s)
9595
// Output:
9696
//tick,c_ranges,c_write,c_write_b,c_read,c_read_b,s_ranges,s_write,s_write_b,s_read,s_read_b,c_lease_moves,c_replica_moves,c_replica_b_moves
@@ -120,7 +120,7 @@ func Example_rebalance() {
120120
})...),
121121
Wait: 0,
122122
}
123-
c.Apply(s)
123+
c.Apply(ctx, s)
124124

125125
m.Tick(ctx, start, s)
126126
// Output:

pkg/kv/kvserver/asim/mmaintegration/mma_store_rebalancer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ func (msr *MMAStoreRebalancer) Tick(ctx context.Context, tick time.Time, s state
204204
}
205205
log.KvDistribution.VInfof(ctx, 1, "dispatching operation for pendingChange=%v", curChange)
206206
msr.pendingChanges[msr.pendingChangeIdx].syncChangeID =
207-
msr.as.MMAPreApply(curChange.usage, curChange.change)
207+
msr.as.MMAPreApply(ctx, curChange.usage, curChange.change)
208208
msr.pendingTicket = msr.controller.Dispatch(ctx, tick, s, curOp)
209209
}
210210
}

pkg/kv/kvserver/asim/op/controller_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func TestLeaseTransferOp(t *testing.T) {
9595
results := make([]map[state.RangeID]state.StoreID, len(tc.ticks))
9696
pending := []DispatchedTicket{}
9797
for i, tick := range tc.ticks {
98-
changer.Tick(state.OffsetTick(start, tick), s)
98+
changer.Tick(ctx, state.OffsetTick(start, tick), s)
9999
controller.Tick(ctx, state.OffsetTick(start, tick), s)
100100

101101
for _, transfers := range tc.transfers[tick] {
@@ -305,7 +305,7 @@ func TestRelocateRangeOp(t *testing.T) {
305305
// range rebalancer will fail if any pending changes that were
306306
// set to complete at tick t, still exist at tick t. So we tick
307307
// it first here.
308-
changer.Tick(state.OffsetTick(start, tick), s)
308+
changer.Tick(ctx, state.OffsetTick(start, tick), s)
309309
controller.Tick(ctx, state.OffsetTick(start, tick), s)
310310

311311
relocations := tc.relocations[tick]

pkg/kv/kvserver/asim/queue/lease_queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func TestLeaseQueue(t *testing.T) {
172172
s.TickClock(state.OffsetTick(start, tick))
173173

174174
// Tick state updates that are queued for completion.
175-
changer.Tick(state.OffsetTick(start, tick), s)
175+
changer.Tick(ctx, state.OffsetTick(start, tick), s)
176176

177177
// Update the store's view of the cluster, we update all stores
178178
// but only care about s1's view.

pkg/kv/kvserver/asim/queue/replicate_queue.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ func pushReplicateChange(
188188
if as != nil {
189189
// as may be nil in some tests.
190190
changeID = as.NonMMAPreTransferLease(
191+
ctx,
191192
repl.Desc(),
192193
repl.RangeUsageInfo(),
193194
op.Source,
@@ -204,6 +205,7 @@ func pushReplicateChange(
204205
if as != nil {
205206
// as may be nil in some tests.
206207
changeID = as.NonMMAPreChangeReplicas(
208+
ctx,
207209
repl.Desc(),
208210
repl.RangeUsageInfo(),
209211
op.Chgs,

pkg/kv/kvserver/asim/queue/replicate_queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ func TestReplicateQueue(t *testing.T) {
321321
s.TickClock(state.OffsetTick(start, tick))
322322

323323
// Tick state updates that are queued for completion.
324-
changer.Tick(state.OffsetTick(start, tick), s)
324+
changer.Tick(ctx, state.OffsetTick(start, tick), s)
325325

326326
// Update the store's view of the cluster, we update all stores
327327
// but only care about s1's view.

pkg/kv/kvserver/asim/queue/split_queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func TestSplitQueue(t *testing.T) {
162162
sq.Tick(ctx, state.OffsetTick(start, tick), s)
163163

164164
// Tick state updates that are queued for completion.
165-
changer.Tick(state.OffsetTick(start, tick), s)
165+
changer.Tick(ctx, state.OffsetTick(start, tick), s)
166166

167167
// Check every replica on the leaseholder store for enqueuing.
168168
for _, repl := range s.Replicas(store.StoreID()) {

pkg/kv/kvserver/asim/state/change.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,21 @@
66
package state
77

88
import (
9+
"context"
910
"fmt"
1011
"time"
1112

1213
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1314
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
1415
"github.com/cockroachdb/cockroach/pkg/roachpb"
16+
"github.com/cockroachdb/cockroach/pkg/util/log"
1517
"github.com/google/btree"
1618
)
1719

1820
// Change is a state change for a range, to a target store that has some delay.
1921
type Change interface {
2022
// Apply applies a change to the state.
21-
Apply(s State)
23+
Apply(ctx context.Context, s State)
2224
// Target returns the recipient store of the change.
2325
Target() StoreID
2426
// Range returns the range id the change is for.
@@ -40,7 +42,7 @@ type Changer interface {
4042
Push(tick time.Time, sc Change) (time.Time, bool)
4143
// Tick updates state changer to apply any changes that have occurred
4244
// between the last tick and this one.
43-
Tick(tick time.Time, state State)
45+
Tick(ctx context.Context, tick time.Time, state State)
4446
}
4547

4648
// ReplicaChange contains information necessary to add, remove or move (both) a
@@ -70,9 +72,10 @@ type LeaseTransferChange struct {
7072
}
7173

7274
// Apply applies a change to the state.
73-
func (lt *LeaseTransferChange) Apply(s State) {
75+
func (lt *LeaseTransferChange) Apply(ctx context.Context, s State) {
7476
if s.TransferLease(lt.RangeID, lt.TransferTarget) {
7577
s.ClusterUsageInfo().storeRef(lt.Author).LeaseTransfers++
78+
log.KvDistribution.VEventf(ctx, 3, "r%d: s%d transferred lease to s%d (took %s)", lt.RangeID, lt.Author, lt.TransferTarget, lt.Wait)
7679
}
7780
}
7881

@@ -99,9 +102,10 @@ func (lt *LeaseTransferChange) Blocking() bool {
99102
}
100103

101104
// Apply applies a change to the state.
102-
func (rsc *RangeSplitChange) Apply(s State) {
105+
func (rsc *RangeSplitChange) Apply(ctx context.Context, s State) {
103106
if _, _, ok := s.SplitRange(rsc.SplitKey); ok {
104107
s.ClusterUsageInfo().storeRef(rsc.Author).RangeSplits++
108+
log.KvDistribution.VEventf(ctx, 3, "r%d: s%d split range at key=%d (took %s)", rsc.RangeID, rsc.Author, rsc.SplitKey, rsc.Wait)
105109
}
106110
}
107111

@@ -183,7 +187,7 @@ func replChangeHasStoreID(storeID StoreID, changes []roachpb.ReplicationTarget)
183187
// Apply applies a replica change for a range. This is an implementation of the
184188
// Change interface. It requires that a replica being removed, must not hold
185189
// the lease unless a replica is also being added in the same change.
186-
func (rc *ReplicaChange) Apply(s State) {
190+
func (rc *ReplicaChange) Apply(ctx context.Context, s State) {
187191
if len(rc.Changes) == 0 {
188192
// Nothing to do.
189193
return
@@ -197,6 +201,11 @@ func (rc *ReplicaChange) Apply(s State) {
197201
rangeID := rc.RangeID
198202

199203
defer func() {
204+
if rollback != nil {
205+
log.KvDistribution.VEventf(ctx, 5, "r%d: s%d failed to apply replica change (after %s)", rangeID, rc.Author, rc.Wait)
206+
} else {
207+
log.KvDistribution.VEventf(ctx, 5, "r%d: s%d successfully applied replica change (after %s)", rangeID, rc.Author, rc.Wait)
208+
}
200209
n := len(rollback)
201210
for i := n - 1; i > -1; i-- {
202211
if rollback[i] != nil {
@@ -275,6 +284,7 @@ func (rc *ReplicaChange) Apply(s State) {
275284
for _, nonVoterPromotion := range targets.NonVoterPromotions {
276285
ok, revert := promoDemo(
277286
s, rangeID, StoreID(nonVoterPromotion.StoreID), roachpb.NON_VOTER, roachpb.VOTER_FULL)
287+
log.KvDistribution.VEventf(ctx, 5, "r%d: author s%d promoted non-voter s%s to voter", rangeID, rc.Author, nonVoterPromotion.StoreID)
278288
rollback = append(rollback, revert...)
279289
if !ok {
280290
return
@@ -283,6 +293,7 @@ func (rc *ReplicaChange) Apply(s State) {
283293
for _, voterAddition := range targets.VoterAdditions {
284294
ok, revert := addReplica(
285295
s, rangeID, StoreID(voterAddition.StoreID), roachpb.VOTER_FULL)
296+
log.KvDistribution.VEventf(ctx, 5, "r%d: author s%d added voter s%s", rangeID, rc.Author, voterAddition.StoreID)
286297
rollback = append(rollback, revert)
287298
if !ok {
288299
return
@@ -296,6 +307,7 @@ func (rc *ReplicaChange) Apply(s State) {
296307
if !s.TransferLease(rangeID, nextLH) {
297308
return
298309
}
310+
log.KvDistribution.VEventf(ctx, 5, "r%d: author s%d transferred lease to s%d", rangeID, rc.Author, nextLH)
299311
rollback = append(rollback, func() {
300312
if !s.TransferLease(rangeID, lhStore.StoreID()) {
301313
panic("unable to rollback lease transfer")
@@ -306,6 +318,7 @@ func (rc *ReplicaChange) Apply(s State) {
306318
for _, voterDemotion := range targets.VoterDemotions {
307319
ok, revert := promoDemo(
308320
s, rangeID, StoreID(voterDemotion.StoreID), roachpb.VOTER_FULL, roachpb.NON_VOTER)
321+
log.KvDistribution.VEventf(ctx, 5, "r%d: author s%d demoted voter s%s to non-voter", rangeID, rc.Author, voterDemotion.StoreID)
309322
rollback = append(rollback, revert...)
310323
if !ok {
311324
return
@@ -314,6 +327,7 @@ func (rc *ReplicaChange) Apply(s State) {
314327
for _, voterRemoval := range targets.VoterRemovals {
315328
ok, revert := removeReplica(
316329
s, rangeID, StoreID(voterRemoval.StoreID), roachpb.VOTER_FULL)
330+
log.KvDistribution.VEventf(ctx, 5, "r%d: author s%d removed voter s%s", rangeID, rc.Author, voterRemoval.StoreID)
317331
rollback = append(rollback, revert)
318332
if !ok {
319333
return
@@ -322,6 +336,7 @@ func (rc *ReplicaChange) Apply(s State) {
322336
for _, nonVoterAddition := range targets.NonVoterAdditions {
323337
ok, revert := addReplica(
324338
s, rangeID, StoreID(nonVoterAddition.StoreID), roachpb.NON_VOTER)
339+
log.KvDistribution.VEventf(ctx, 5, "r%d: author s%d added non-voter s%s", rangeID, rc.Author, nonVoterAddition.StoreID)
325340
rollback = append(rollback, revert)
326341
if !ok {
327342
return
@@ -330,6 +345,7 @@ func (rc *ReplicaChange) Apply(s State) {
330345
for _, nonVoterRemoval := range targets.NonVoterRemovals {
331346
ok, revert := removeReplica(
332347
s, rangeID, StoreID(nonVoterRemoval.StoreID), roachpb.NON_VOTER)
348+
log.KvDistribution.VEventf(ctx, 5, "r%d: author s%d removed non-voter s%s", rangeID, rc.Author, nonVoterRemoval.StoreID)
333349
rollback = append(rollback, revert)
334350
if !ok {
335351
return
@@ -345,6 +361,7 @@ func (rc *ReplicaChange) Apply(s State) {
345361

346362
authorUsageInfo := s.ClusterUsageInfo().storeRef(rc.Author)
347363
authorUsageInfo.Rebalances++
364+
log.KvDistribution.VEventf(ctx, 5, "r%d: author s%d rebalancing", rangeID, rc.Author)
348365
if requiresUpReplication {
349366
authorUsageInfo.RebalanceSentBytes += r.Size()
350367
s.ClusterUsageInfo().storeRef(storeNeedingSnapshot).RebalanceRcvdBytes += r.Size()
@@ -470,7 +487,7 @@ func (rc *replicaChanger) Push(tick time.Time, change Change) (time.Time, bool)
470487

471488
// Tick updates state changer to apply any changes that have occurred
472489
// between the last tick and this one.
473-
func (rc *replicaChanger) Tick(tick time.Time, state State) {
490+
func (rc *replicaChanger) Tick(ctx context.Context, tick time.Time, state State) {
474491
var changeList []*pendingChange
475492

476493
// NB: Add the smallest unit of time, in order to find all items in
@@ -483,7 +500,7 @@ func (rc *replicaChanger) Tick(tick time.Time, state State) {
483500

484501
for _, nextChange := range changeList {
485502
change := rc.pendingTickets[nextChange.ticket]
486-
change.Apply(state)
503+
change.Apply(ctx, state)
487504

488505
// Cleanup the pending trackers for this ticket. This allows another
489506
// change to be pushed for Range().

pkg/kv/kvserver/asim/state/change_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package state
77

88
import (
9+
"context"
910
"sort"
1011
"testing"
1112
"time"
@@ -321,11 +322,12 @@ func TestReplicaChange(t *testing.T) {
321322

322323
for _, tc := range testCases {
323324
t.Run(tc.desc, func(t *testing.T) {
325+
ctx := context.Background()
324326
state := testMakeRangeState(tc.stores, tc.initVoters, tc.initNonVoters)
325327
r, _ := state.Range(1)
326328
state.TransferLease(r.RangeID(), StoreID(1))
327329
change := tc.change(state)
328-
change.Apply(state)
330+
change.Apply(ctx, state)
329331
voters := testGetReplLocations(state, r, roachpb.VOTER_FULL)
330332
nonVoters := testGetReplLocations(state, r, roachpb.NON_VOTER)
331333
leaseholder := StoreID(-1)
@@ -686,8 +688,9 @@ func TestReplicaStateChanger(t *testing.T) {
686688
tsResults := make([]int64, 0, 1)
687689
resultLeaseholders := make(map[int64]map[int64]StoreID)
688690

691+
ctx := context.Background()
689692
for _, tick := range tc.ticks {
690-
changer.Tick(OffsetTick(start, tick), state)
693+
changer.Tick(ctx, OffsetTick(start, tick), state)
691694
if change, ok := tc.pushes[tick]; ok {
692695
if ts, ok := changer.Push(OffsetTick(start, tick), change(state)); ok {
693696
tsResults = append(tsResults, ReverseOffsetTick(start, ts))

0 commit comments

Comments
 (0)