Skip to content

Commit f6e9d3f

Browse files
craig[bot]wenyihu6
andcommitted
Merge #149948
149948: asim: port over mma integration and asim store rebalancer r=tbg a=wenyihu6 **Stacked on top of #149930 --- **asim: pass node id to queues for logging** This commit passes in the node id to different queues for logging purposes. Epic: none Release note: none --- **asim: add AllocatorSync to asim node** This commit adds the AllocatorSync struct to the asim node. Each MMAllocator will have a corresponding AllocatorSync responsible for coordinating changes between the various queues and the MMA logic. This struct is currently unused and will be integrated in future commits. Epic: none Release note: none --- **asim: integrate mma store rebalancer** This commit integrates mma store rebalancers to asim. --- **asim: add AllocatorSync to asim queues** This commit populates the AllocatorSync field in asim queues. It will be used in future commits to coordinate changes with mma. Epic: none Release note: none Co-authored-by: wenyihu6 <[email protected]>
2 parents 7b39ad9 + bd66dc4 commit f6e9d3f

File tree

12 files changed

+118
-21
lines changed

12 files changed

+118
-21
lines changed

pkg/kv/kvserver/asim/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ go_library(
1010
"//pkg/kv/kvserver/asim/gossip",
1111
"//pkg/kv/kvserver/asim/history",
1212
"//pkg/kv/kvserver/asim/metrics",
13+
"//pkg/kv/kvserver/asim/mmaintegration",
1314
"//pkg/kv/kvserver/asim/op",
1415
"//pkg/kv/kvserver/asim/queue",
1516
"//pkg/kv/kvserver/asim/scheduled",

pkg/kv/kvserver/asim/asim.go

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gossip"
1414
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/history"
1515
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/metrics"
16+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/mmaintegration"
1617
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/op"
1718
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/queue"
1819
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/scheduled"
@@ -29,8 +30,8 @@ type Simulator struct {
2930
curr time.Time
3031
end time.Time
3132
// interval is the step between ticks for active simulaton components, such
32-
// as the queues, store rebalancer and state changers. It should be set
33-
// lower than the bgInterval, as updated occur more frequently.
33+
// as the queues, store rebalancer and state changers. It should be set lower
34+
// than the bgInterval, as updates occur more frequently.
3435
interval time.Duration
3536

3637
// The simulator can run multiple workload Generators in parallel.
@@ -47,6 +48,8 @@ type Simulator struct {
4748
sqs map[state.StoreID]queue.RangeQueue
4849
// Store rebalancers.
4950
srs map[state.StoreID]storerebalancer.StoreRebalancer
51+
// Multi-metric store rebalancers.
52+
mmSRs map[state.StoreID]*mmaintegration.MMAStoreRebalancer
5053
// Store operation controllers.
5154
controllers map[state.StoreID]op.Controller
5255

@@ -87,6 +90,7 @@ func NewSimulator(
8790
lqs := make(map[state.StoreID]queue.RangeQueue)
8891
sqs := make(map[state.StoreID]queue.RangeQueue)
8992
srs := make(map[state.StoreID]storerebalancer.StoreRebalancer)
93+
mmSRs := make(map[state.StoreID]*mmaintegration.MMAStoreRebalancer)
9094
changer := state.NewReplicaChanger()
9195
controllers := make(map[state.StoreID]op.Controller)
9296

@@ -103,6 +107,7 @@ func NewSimulator(
103107
sqs: sqs,
104108
controllers: controllers,
105109
srs: srs,
110+
mmSRs: mmSRs,
106111
pacers: pacers,
107112
gossip: gossip.NewGossip(initialState, settings),
108113
metrics: m,
@@ -133,19 +138,24 @@ func (s *Simulator) StoreAddNotify(storeID state.StoreID, _ state.State) {
133138
func (s *Simulator) addStore(storeID state.StoreID, tick time.Time) {
134139
allocator := s.state.Allocator(storeID)
135140
storePool := s.state.StorePool(storeID)
141+
store, _ := s.state.Store(storeID)
136142
s.rqs[storeID] = queue.NewReplicateQueue(
137143
storeID,
144+
store.NodeID(),
138145
s.changer,
139146
s.settings,
140147
allocator,
148+
s.state.Node(store.NodeID()).AllocatorSync(),
141149
storePool,
142150
tick,
143151
)
144152
s.lqs[storeID] = queue.NewLeaseQueue(
145153
storeID,
154+
store.NodeID(),
146155
s.changer,
147156
s.settings,
148157
allocator,
158+
s.state.Node(store.NodeID()).AllocatorSync(),
149159
storePool,
150160
tick,
151161
)
@@ -175,6 +185,25 @@ func (s *Simulator) addStore(storeID state.StoreID, tick time.Time) {
175185
s.settings,
176186
storerebalancer.GetStateRaftStatusFn(s.state),
177187
)
188+
// TODO: We add the store to every node's allocator in the cluster
189+
// immediately. This is also updated via gossip, however there is a delay
190+
// after startup. When calling `mma.ProcessStoreLeaseholderMsg` via
191+
// tickMMStoreRebalancers, there may be not be a store state for some
192+
// replicas. Setting it here ensures that the store is always present and
193+
// initiated in each node's allocator. We should instead handle this in mma,
194+
// or integration component.
195+
for _, node := range s.state.Nodes() {
196+
node.MMAllocator().SetStore(state.StoreAttrAndLocFromDesc(
197+
s.state.StoreDescriptors(false, storeID)[0]))
198+
}
199+
s.mmSRs[storeID] = mmaintegration.NewMMAStoreRebalancer(
200+
storeID,
201+
store.NodeID(),
202+
s.state.Node(store.NodeID()).MMAllocator(),
203+
s.state.Node(store.NodeID()).AllocatorSync(),
204+
s.controllers[storeID],
205+
s.settings,
206+
)
178207
}
179208

180209
// GetNextTickTime returns a simulated tick time, or an indication that the
@@ -242,6 +271,9 @@ func (s *Simulator) RunSim(ctx context.Context) {
242271
// Simulate the store rebalancer logic.
243272
s.tickStoreRebalancers(ctx, tick, stateForAlloc)
244273

274+
// Simulate the multi-metric store rebalancer logic.
275+
s.tickMMStoreRebalancers(ctx, tick, stateForAlloc)
276+
245277
// Print tick metrics.
246278
s.tickMetrics(ctx, tick)
247279
}
@@ -342,6 +374,16 @@ func (s *Simulator) tickStoreRebalancers(ctx context.Context, tick time.Time, st
342374
}
343375
}
344376

377+
// tickStoreRebalancers iterates over the multi-metric store rebalancers in the
378+
// cluster and ticks their control loop.
379+
func (s *Simulator) tickMMStoreRebalancers(ctx context.Context, tick time.Time, state state.State) {
380+
stores := s.state.Stores()
381+
s.shuffler(len(stores), func(i, j int) { stores[i], stores[j] = stores[j], stores[i] })
382+
for _, store := range stores {
383+
s.mmSRs[store.StoreID()].Tick(ctx, tick, state)
384+
}
385+
}
386+
345387
// tickMetrics prints the metrics up to the given tick.
346388
func (s *Simulator) tickMetrics(ctx context.Context, tick time.Time) {
347389
s.metrics.Tick(ctx, tick, s.state)

pkg/kv/kvserver/asim/queue/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ go_library(
1616
"//pkg/kv/kvpb",
1717
"//pkg/kv/kvserver/allocator",
1818
"//pkg/kv/kvserver/allocator/allocatorimpl",
19+
"//pkg/kv/kvserver/allocator/mmaprototypehelpers",
1920
"//pkg/kv/kvserver/allocator/plan",
2021
"//pkg/kv/kvserver/allocator/storepool",
2122
"//pkg/kv/kvserver/asim/config",

pkg/kv/kvserver/asim/queue/lease_queue.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ package queue
88
import (
99
"container/heap"
1010
"context"
11+
"fmt"
1112
"time"
1213

1314
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
15+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototypehelpers"
1416
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan"
1517
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
1618
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
@@ -27,14 +29,17 @@ type leaseQueue struct {
2729
planner plan.ReplicationPlanner
2830
clock *hlc.Clock
2931
settings *config.SimulationSettings
32+
as *mmaprototypehelpers.AllocatorSync
3033
}
3134

3235
// NewLeaseQueue returns a new lease queue.
3336
func NewLeaseQueue(
3437
storeID state.StoreID,
38+
nodeID state.NodeID,
3539
stateChanger state.Changer,
3640
settings *config.SimulationSettings,
3741
allocator allocatorimpl.Allocator,
42+
allocatorSync *mmaprototypehelpers.AllocatorSync,
3843
storePool storepool.AllocatorStorePool,
3944
start time.Time,
4045
) RangeQueue {
@@ -50,8 +55,10 @@ func NewLeaseQueue(
5055
planner: plan.NewLeasePlanner(allocator, storePool),
5156
storePool: storePool,
5257
clock: storePool.Clock(),
58+
as: allocatorSync,
5359
}
5460
lq.AddLogTag("lease", nil)
61+
lq.AddLogTag(fmt.Sprintf("n%ds%d", nodeID, storeID), "")
5562
return &lq
5663
}
5764

pkg/kv/kvserver/asim/queue/lease_queue_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,11 @@ func TestLeaseQueue(t *testing.T) {
140140
store, _ := s.Store(testingStore)
141141
lq := NewLeaseQueue(
142142
store.StoreID(),
143+
store.NodeID(),
143144
changer,
144145
testSettings,
145146
s.Allocator(store.StoreID()),
147+
s.Node(store.NodeID()).AllocatorSync(),
146148
s.StorePool(store.StoreID()),
147149
start,
148150
)

pkg/kv/kvserver/asim/queue/replicate_queue.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"time"
1313

1414
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
15+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototypehelpers"
1516
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan"
1617
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
1718
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
@@ -26,14 +27,17 @@ type replicateQueue struct {
2627
planner plan.ReplicationPlanner
2728
clock *hlc.Clock
2829
settings *config.SimulationSettings
30+
as *mmaprototypehelpers.AllocatorSync
2931
}
3032

3133
// NewReplicateQueue returns a new replicate queue.
3234
func NewReplicateQueue(
3335
storeID state.StoreID,
36+
nodeID state.NodeID,
3437
stateChanger state.Changer,
3538
settings *config.SimulationSettings,
3639
allocator allocatorimpl.Allocator,
40+
allocatorSync *mmaprototypehelpers.AllocatorSync,
3741
storePool storepool.AllocatorStorePool,
3842
start time.Time,
3943
) RangeQueue {
@@ -49,8 +53,10 @@ func NewReplicateQueue(
4953
planner: plan.NewReplicaPlanner(
5054
allocator, storePool, plan.ReplicaPlannerTestingKnobs{}),
5155
clock: storePool.Clock(),
56+
as: allocatorSync,
5257
}
5358
rq.AddLogTag("replica", nil)
59+
rq.AddLogTag(fmt.Sprintf("n%ds%d", nodeID, storeID), "")
5460
return &rq
5561
}
5662

@@ -64,7 +70,7 @@ func (rq *replicateQueue) MaybeAdd(ctx context.Context, replica state.Replica, s
6470
}
6571

6672
repl := NewSimulatorReplica(replica, s)
67-
rq.AddLogTag("r", repl.repl.Descriptor())
73+
rq.AddLogTag("r", repl.Desc().RangeID)
6874
rq.AnnotateCtx(ctx)
6975

7076
desc := repl.Desc()

pkg/kv/kvserver/asim/queue/replicate_queue_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,9 +289,11 @@ func TestReplicateQueue(t *testing.T) {
289289
store, _ := s.Store(testingStore)
290290
rq := NewReplicateQueue(
291291
store.StoreID(),
292+
store.NodeID(),
292293
changer,
293294
testSettings,
294295
s.Allocator(store.StoreID()),
296+
s.Node(store.NodeID()).AllocatorSync(),
295297
s.StorePool(store.StoreID()),
296298
start,
297299
)

pkg/kv/kvserver/asim/state/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ go_library(
2626
"//pkg/kv/kvserver/allocator",
2727
"//pkg/kv/kvserver/allocator/allocatorimpl",
2828
"//pkg/kv/kvserver/allocator/mmaprototype",
29+
"//pkg/kv/kvserver/allocator/mmaprototypehelpers",
2930
"//pkg/kv/kvserver/allocator/storepool",
3031
"//pkg/kv/kvserver/asim/config",
3132
"//pkg/kv/kvserver/asim/workload",
@@ -45,6 +46,7 @@ go_library(
4546
"//pkg/util/metric",
4647
"//pkg/util/stop",
4748
"//pkg/util/timeutil",
49+
"@com_github_cockroachdb_logtags//:logtags",
4850
"@com_github_google_btree//:btree",
4951
"@org_golang_google_protobuf//proto",
5052
],

pkg/kv/kvserver/asim/state/impl.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
2424
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
2525
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype"
26+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototypehelpers"
2627
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
2728
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
2829
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload"
@@ -36,6 +37,7 @@ import (
3637
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigreporter"
3738
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3839
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
40+
"github.com/cockroachdb/logtags"
3941
"github.com/google/btree"
4042
)
4143

@@ -324,6 +326,11 @@ func (s *state) Nodes() []Node {
324326
return nodes
325327
}
326328

329+
func (s *state) Node(nodeID NodeID) Node {
330+
node := s.nodes[nodeID]
331+
return node
332+
}
333+
327334
// RangeFor returns the range containing Key in [StartKey, EndKey). This
328335
// cannot fail.
329336
func (s *state) RangeFor(key Key) Range {
@@ -426,6 +433,7 @@ func (s *state) AddNode() Node {
426433
stores: []StoreID{},
427434
storepool: sp,
428435
mmAllocator: mmAllocator,
436+
as: mmaprototypehelpers.NewAllocatorSync(sp, mmAllocator),
429437
}
430438
s.nodes[nodeID] = node
431439
s.SetNodeLiveness(nodeID, livenesspb.NodeLivenessStatus_LIVE)
@@ -1136,7 +1144,24 @@ func (s *state) UpdateStorePool(
11361144
detail := storeDescriptors[gossipStoreID]
11371145
copiedDetail := detail.Copy()
11381146
node.storepool.Details.StoreDetails.Store(gossipStoreID, copiedDetail)
1147+
copiedDesc := *copiedDetail.Desc
11391148
// TODO(mma): Support origin timestamps.
1149+
ts := s.clock.Now()
1150+
storeLoadMsg := mmaprototypehelpers.MakeStoreLoadMsg(copiedDesc, ts.UnixNano())
1151+
node.mmAllocator.SetStore(StoreAttrAndLocFromDesc(copiedDesc))
1152+
ctx := logtags.AddTag(context.Background(), fmt.Sprintf("n%d", nodeID), "")
1153+
ctx = logtags.AddTag(ctx, "t", ts.Sub(s.settings.StartTime))
1154+
node.mmAllocator.ProcessStoreLoadMsg(ctx, &storeLoadMsg)
1155+
}
1156+
}
1157+
1158+
func StoreAttrAndLocFromDesc(desc roachpb.StoreDescriptor) mmaprototype.StoreAttributesAndLocality {
1159+
return mmaprototype.StoreAttributesAndLocality{
1160+
StoreID: desc.StoreID,
1161+
NodeID: desc.Node.NodeID,
1162+
NodeAttrs: desc.Node.Attrs,
1163+
NodeLocality: desc.Node.Locality,
1164+
StoreAttrs: desc.Attrs,
11401165
}
11411166
}
11421167

@@ -1400,6 +1425,7 @@ type node struct {
14001425
stores []StoreID
14011426
storepool *storepool.StorePool
14021427
mmAllocator mmaprototype.Allocator
1428+
as *mmaprototypehelpers.AllocatorSync
14031429
}
14041430

14051431
// NodeID returns the ID of this node.
@@ -1421,6 +1447,10 @@ func (n *node) MMAllocator() mmaprototype.Allocator {
14211447
return n.mmAllocator
14221448
}
14231449

1450+
func (n *node) AllocatorSync() *mmaprototypehelpers.AllocatorSync {
1451+
return n.as
1452+
}
1453+
14241454
// store is an implementation of the Store interface.
14251455
type store struct {
14261456
storeID StoreID

pkg/kv/kvserver/asim/state/state.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
1414
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
1515
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype"
16+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototypehelpers"
1617
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
1718
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload"
1819
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
@@ -59,6 +60,7 @@ type State interface {
5960
// first flag is false, then the capacity is generated from scratch,
6061
// otherwise the last calculated capacity values are used for each store.
6162
StoreDescriptors(bool, ...StoreID) []roachpb.StoreDescriptor
63+
Node(NodeID) Node
6264
// Nodes returns all nodes that exist in this state.
6365
Nodes() []Node
6466
// RangeFor returns the range containing Key in [StartKey, EndKey). This
@@ -217,6 +219,8 @@ type Node interface {
217219
Descriptor() roachpb.NodeDescriptor
218220
// TODO(wenyihu6): use this in mma store rebalancer
219221
MMAllocator() mmaprototype.Allocator
222+
// AllocatorSync returns the AllocatorSync for this node.
223+
AllocatorSync() *mmaprototypehelpers.AllocatorSync
220224
}
221225

222226
// Store is a container for replicas.

0 commit comments

Comments
 (0)