Skip to content

Commit 7e0edad

Browse files
wenyihu6tbg
authored andcommitted
asim: add ChangeReplicasOp as an operation
This commit adds ChangeReplicasOp, enabling the simulator’s store rebalancer to enqueue replica change operations. It is currently unused; future commits will integrate it with mma to enqueue replica changes. Epic: none Release note: none
1 parent f6ce9da commit 7e0edad

File tree

3 files changed

+91
-0
lines changed

3 files changed

+91
-0
lines changed

pkg/kv/kvserver/asim/op/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
33
go_library(
44
name = "op",
55
srcs = [
6+
"change_replicas.go",
67
"controller.go",
78
"operation.go",
89
"pq.go",
@@ -12,6 +13,7 @@ go_library(
1213
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/op",
1314
visibility = ["//visibility:public"],
1415
deps = [
16+
"//pkg/kv/kvpb",
1517
"//pkg/kv/kvserver",
1618
"//pkg/kv/kvserver/allocator",
1719
"//pkg/kv/kvserver/allocator/allocatorimpl",
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package op
7+
8+
import (
9+
"time"
10+
11+
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
12+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state"
13+
"github.com/cockroachdb/cockroach/pkg/roachpb"
14+
"github.com/cockroachdb/errors"
15+
)
16+
17+
// ChangeReplicasOp contains the information for a change replicas operation.
18+
type ChangeReplicasOp struct {
19+
baseOp
20+
rangeID state.RangeID
21+
changes kvpb.ReplicationChanges
22+
}
23+
24+
// NewChangeReplicasOp returns a new ChangeReplicasOp.
25+
// TODO(wenyihu6): unused for now - will be integrated with mma simulation
26+
func NewChangeReplicasOp(
27+
tick time.Time, rangeID roachpb.RangeID, changes kvpb.ReplicationChanges,
28+
) *ChangeReplicasOp {
29+
return &ChangeReplicasOp{
30+
baseOp: newBaseOp(tick),
31+
rangeID: state.RangeID(rangeID),
32+
changes: changes,
33+
}
34+
}
35+
36+
func (cro *ChangeReplicasOp) error(err error) {
37+
augmentedErr := errors.Wrapf(err, "Unable to change replicas for range_id=%d, changes=%v",
38+
cro.rangeID, cro.changes)
39+
cro.errs = append(cro.errs, augmentedErr)
40+
}

pkg/kv/kvserver/asim/op/controller.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,12 @@ func (c *controller) process(
134134
op.done = true
135135
op.complete = tick
136136
}
137+
case *ChangeReplicasOp:
138+
if err := c.processChangeReplicas(tick, state, op); err != nil {
139+
op.error(err)
140+
op.done = true
141+
op.complete = tick
142+
}
137143
default:
138144
return
139145
}
@@ -228,3 +234,46 @@ func (c *controller) processTransferLease(
228234
ro.next = tick.Add(delay)
229235
return nil
230236
}
237+
238+
func (c *controller) processChangeReplicas(
239+
tick time.Time, s state.State, cro *ChangeReplicasOp,
240+
) error {
241+
rng, ok := s.Range(cro.rangeID)
242+
if !ok {
243+
panic(errors.Newf("programming error: range %d not found", cro.rangeID))
244+
}
245+
// We need to check if the change is already complete. If it is, we can
246+
// skip the operation. This is the case where the change didn't apply
247+
// instantly and processChangeReplicas is called over multiple ticks.
248+
if (cro.complete != time.Time{}) && !tick.Before(cro.complete) {
249+
cro.done = true
250+
return nil
251+
}
252+
253+
change := state.ReplicaChange{
254+
RangeID: cro.rangeID,
255+
Author: c.storeID,
256+
Changes: cro.changes,
257+
}
258+
259+
if len(cro.changes) == 0 {
260+
panic("unexpected empty changes in ChangeReplicasOp")
261+
}
262+
263+
targets := kvserver.SynthesizeTargetsByChangeType(cro.changes)
264+
change.Wait = c.settings.ReplicaChangeDelayFn()(rng.Size(),
265+
len(targets.VoterAdditions) > 0 || len(targets.NonVoterAdditions) > 0)
266+
completeAt, ok := c.changer.Push(tick, &change)
267+
if !ok {
268+
return errors.Newf("tick %d: Changer did not accept change for range %d", tick, cro.rangeID)
269+
}
270+
cro.complete = completeAt
271+
272+
if !tick.Before(completeAt) {
273+
// If the change was applied instantly (only promotion/demotion).
274+
cro.done = true
275+
} else {
276+
cro.next = completeAt
277+
}
278+
return nil
279+
}

0 commit comments

Comments
 (0)