Skip to content

Commit 5c2a22f

Browse files
committed
asim: use roachpb.ReplicationTarget for AllocationTransferLeaseOp
Previously, AllocationTransferLeaseOp used store ID to represent the source and target. This commit updates it to use roachpb.ReplicationTarget, which includes both store id and node id. We made this change because future commits will pass fields from AllocationTransferLeaseOp directly into NonMMAPreTransferLease, which requires ReplicationTarget. Note that production behavior remains unchanged, as only the store ID is accessed for them. Epic: none Release note: none
1 parent f6e9d3f commit 5c2a22f

File tree

9 files changed

+31
-9
lines changed

9 files changed

+31
-9
lines changed

pkg/kv/kvserver/allocator/plan/lease.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,12 @@ func (lp LeasePlanner) PlanOneChange(
172172
}
173173

174174
change.Op = AllocationTransferLeaseOp{
175-
Source: repl.StoreID(),
176-
Target: target.StoreID,
175+
Source: roachpb.ReplicationTarget{
176+
StoreID: repl.StoreID(), NodeID: repl.NodeID(),
177+
},
178+
Target: roachpb.ReplicationTarget{
179+
StoreID: target.StoreID, NodeID: target.NodeID,
180+
},
177181
Usage: usage,
178182
bypassSafetyChecks: false,
179183
}

pkg/kv/kvserver/allocator/plan/op.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type AllocationOp interface {
2929
// AllocationTransferLeaseOp represents an operation to transfer a range lease to another
3030
// store, from the current one.
3131
type AllocationTransferLeaseOp struct {
32-
Target, Source roachpb.StoreID
32+
Target, Source roachpb.ReplicationTarget
3333
Usage allocator.RangeUsageInfo
3434
bypassSafetyChecks bool
3535
}

pkg/kv/kvserver/allocator/plan/replicate.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ type AllocatorReplica interface {
100100
GetCompactedIndex() kvpb.RaftIndex
101101
LastReplicaAdded() (roachpb.ReplicaID, time.Time)
102102
StoreID() roachpb.StoreID
103+
NodeID() roachpb.NodeID
103104
GetRangeID() roachpb.RangeID
104105
SendStreamStats(*rac2.RangeSendStreamStats)
105106
}
@@ -908,8 +909,14 @@ func (rp ReplicaPlanner) maybeTransferLeaseAwayTarget(
908909
log.KvDistribution.Infof(ctx, "transferring away lease to s%d", target.StoreID)
909910

910911
op = AllocationTransferLeaseOp{
911-
Source: repl.StoreID(),
912-
Target: target.StoreID,
912+
Source: roachpb.ReplicationTarget{
913+
NodeID: repl.NodeID(),
914+
StoreID: repl.StoreID(),
915+
},
916+
Target: roachpb.ReplicationTarget{
917+
NodeID: target.NodeID,
918+
StoreID: target.StoreID,
919+
},
913920
Usage: usageInfo,
914921
bypassSafetyChecks: false,
915922
}

pkg/kv/kvserver/asim/queue/allocator_replica.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,3 +160,8 @@ func (sr *SimulatorReplica) RangeUsageInfo() allocator.RangeUsageInfo {
160160

161161
func (sr *SimulatorReplica) SendStreamStats(stats *rac2.RangeSendStreamStats) {
162162
}
163+
164+
// NodeID returns the Replica's NodeID.
165+
func (sr *SimulatorReplica) NodeID() roachpb.NodeID {
166+
return sr.repl.Descriptor().NodeID
167+
}

pkg/kv/kvserver/asim/queue/lease_queue.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ func (lq *leaseQueue) MaybeAdd(ctx context.Context, replica state.Replica, s sta
113113
func (lq *leaseQueue) Tick(ctx context.Context, tick time.Time, s state.State) {
114114
lq.AddLogTag("tick", tick)
115115
ctx = lq.ResetAndAnnotateCtx(ctx)
116+
// TODO(wenyihu6): it is unclear why next tick is forwarded to last tick
117+
// here (see #149904 for more details).
116118
if lq.lastTick.After(lq.next) {
117119
lq.next = lq.lastTick
118120
}

pkg/kv/kvserver/asim/queue/replicate_queue.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ func (rq *replicateQueue) MaybeAdd(ctx context.Context, replica state.Replica, s
112112
func (rq *replicateQueue) Tick(ctx context.Context, tick time.Time, s state.State) {
113113
rq.AddLogTag("tick", tick)
114114
ctx = rq.ResetAndAnnotateCtx(ctx)
115+
// TODO(wenyihu6): it is unclear why next tick is forwarded to last tick
116+
// here (see #149904 for more details).
115117
if rq.lastTick.After(rq.next) {
116118
rq.next = rq.lastTick
117119
}
@@ -175,8 +177,8 @@ func pushReplicateChange(
175177
case plan.AllocationTransferLeaseOp:
176178
stateChange = &state.LeaseTransferChange{
177179
RangeID: state.RangeID(change.Replica.GetRangeID()),
178-
TransferTarget: state.StoreID(op.Target),
179-
Author: state.StoreID(op.Source),
180+
TransferTarget: state.StoreID(op.Target.StoreID),
181+
Author: state.StoreID(op.Source.StoreID),
180182
Wait: delayFn(rng.Size(), true),
181183
}
182184
case plan.AllocationChangeReplicasOp:

pkg/kv/kvserver/asim/queue/split_queue.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ func (sq *splitQueue) MaybeAdd(ctx context.Context, replica state.Replica, state
6868
// FIFO order on ties. The tick currently only considers size based range
6969
// splitting.
7070
func (sq *splitQueue) Tick(ctx context.Context, tick time.Time, s state.State) {
71+
// TODO(wenyihu6): it is unclear why next tick is forwarded to last tick
72+
// here (see #149904 for more details).
7173
if sq.lastTick.After(sq.next) {
7274
sq.next = sq.lastTick
7375
}

pkg/kv/kvserver/lease_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func (lq *leaseQueue) process(
136136
lease, _ := repl.GetLease()
137137
log.KvDistribution.Infof(ctx, "transferring lease to s%d usage=%v, lease=[%v type=%v]", transferOp.Target, transferOp.Usage, lease, lease.Type())
138138
lq.lastLeaseTransfer.Store(timeutil.Now())
139-
if err := repl.AdminTransferLease(ctx, transferOp.Target, false /* bypassSafetyChecks */); err != nil {
139+
if err := repl.AdminTransferLease(ctx, transferOp.Target.StoreID, false /* bypassSafetyChecks */); err != nil {
140140
return false, errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, transferOp.Target)
141141
}
142142
change.Op.ApplyImpact(lq.storePool)

pkg/kv/kvserver/replicate_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -809,7 +809,7 @@ func (rq *replicateQueue) applyChange(
809809
case plan.AllocationFinalizeAtomicReplicationOp:
810810
err = rq.finalizeAtomicReplication(ctx, replica)
811811
case plan.AllocationTransferLeaseOp:
812-
err = rq.TransferLease(ctx, replica, op.Source, op.Target, op.Usage)
812+
err = rq.TransferLease(ctx, replica, op.Source.StoreID, op.Target.StoreID, op.Usage)
813813
case plan.AllocationChangeReplicasOp:
814814
err = rq.changeReplicas(
815815
ctx,

0 commit comments

Comments
 (0)