Skip to content

Commit b7cc5c7

Browse files
authored
simplify and increase rebalance rate for ShardManager (#5044)
1 parent 93687dd commit b7cc5c7

File tree

5 files changed

+64
-144
lines changed

5 files changed

+64
-144
lines changed

.changeset/short-owls-crash.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/cluster": patch
3+
---
4+
5+
simplify and increase rebalance rate for ShardManager

packages/cluster/src/ShardManager.ts

Lines changed: 36 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,7 @@ import * as Schema from "effect/Schema"
3333
import type { Scope } from "effect/Scope"
3434
import { RunnerNotRegistered } from "./ClusterError.js"
3535
import * as ClusterMetrics from "./ClusterMetrics.js"
36-
import {
37-
addAllNested,
38-
decideAssignmentsForUnassignedShards,
39-
decideAssignmentsForUnbalancedShards,
40-
State
41-
} from "./internal/shardManager.js"
36+
import { addAllNested, decideAssignmentsForShards, State } from "./internal/shardManager.js"
4237
import * as MachineId from "./MachineId.js"
4338
import { Runner } from "./Runner.js"
4439
import { RunnerAddress } from "./RunnerAddress.js"
@@ -74,7 +69,7 @@ export class ShardManager extends Context.Tag("@effect/cluster/ShardManager")<Sh
7469
/**
7570
* Rebalance shards assigned to runners within the cluster.
7671
*/
77-
readonly rebalance: (immediate: boolean) => Effect.Effect<void>
72+
readonly rebalance: Effect.Effect<void>
7873
/**
7974
* Notify the cluster of an unhealthy runner.
8075
*/
@@ -130,7 +125,7 @@ export class Config extends Context.Tag("@effect/cluster/ShardManager/Config")<C
130125
* @since 1.0.0
131126
*/
132127
static readonly defaults: Config["Type"] = {
133-
rebalanceDebounce: Duration.millis(500),
128+
rebalanceDebounce: Duration.seconds(3),
134129
rebalanceInterval: Duration.seconds(20),
135130
rebalanceRetryInterval: Duration.seconds(10),
136131
rebalanceRate: 2 / 100,
@@ -505,11 +500,8 @@ export const make = Effect.gen(function*() {
505500
state.addRunner(runner, clock.unsafeCurrentTimeMillis())
506501
updateRunnerMetrics()
507502
yield* PubSub.publish(events, ShardingEvent.RunnerRegistered({ address: runner.address }))
508-
509-
if (state.allUnassignedShards.length > 0) {
510-
yield* rebalance(false)
511-
}
512503
yield* Effect.forkIn(persistRunners, scope)
504+
yield* Effect.forkIn(rebalance, scope)
513505
return MachineId.make(++machineId)
514506
})
515507

@@ -532,52 +524,40 @@ export const make = Effect.gen(function*() {
532524
}
533525

534526
yield* Effect.forkIn(persistRunners, scope)
535-
yield* Effect.forkIn(rebalance(true), scope)
527+
yield* Effect.forkIn(rebalance, scope)
536528
})
537529

538530
let rebalancing = false
539-
let nextRebalanceImmediate = false
540531
let rebalanceDeferred: Deferred.Deferred<void> | undefined
541532
const rebalanceFibers = yield* FiberSet.make()
542533

543-
const rebalance = (immmediate: boolean): Effect.Effect<void> =>
544-
Effect.withFiberRuntime<void>((fiber) => {
545-
if (!rebalancing) {
546-
rebalancing = true
547-
return rebalanceLoop(immmediate)
548-
}
549-
if (immmediate) {
550-
nextRebalanceImmediate = true
551-
}
552-
if (!rebalanceDeferred) {
553-
rebalanceDeferred = Deferred.unsafeMake(fiber.id())
554-
}
555-
return Deferred.await(rebalanceDeferred)
556-
})
557-
558-
const rebalanceLoop = (immediate?: boolean): Effect.Effect<void> =>
559-
Effect.suspend(() => {
560-
const deferred = rebalanceDeferred
561-
rebalanceDeferred = undefined
562-
if (!immediate) {
563-
immediate = nextRebalanceImmediate
564-
nextRebalanceImmediate = false
565-
}
566-
return runRebalance(immediate).pipe(
567-
deferred ? Effect.intoDeferred(deferred) : identity,
568-
Effect.onExit(() => {
569-
if (!rebalanceDeferred) {
570-
rebalancing = false
571-
return Effect.void
572-
}
573-
return Effect.forkIn(rebalanceLoop(), scope)
574-
})
575-
)
576-
})
534+
const rebalance = Effect.withFiberRuntime<void>((fiber) => {
535+
if (!rebalancing) {
536+
rebalancing = true
537+
return rebalanceLoop
538+
}
539+
if (!rebalanceDeferred) {
540+
rebalanceDeferred = Deferred.unsafeMake(fiber.id())
541+
}
542+
return Deferred.await(rebalanceDeferred)
543+
})
577544

578-
const runRebalance = Effect.fn("ShardManager.rebalance")(function*(immediate: boolean) {
579-
yield* Effect.annotateCurrentSpan("immmediate", immediate)
545+
const rebalanceLoop: Effect.Effect<void> = Effect.suspend(() => {
546+
const deferred = rebalanceDeferred
547+
rebalanceDeferred = undefined
548+
return runRebalance.pipe(
549+
deferred ? Effect.intoDeferred(deferred) : identity,
550+
Effect.onExit(() => {
551+
if (!rebalanceDeferred) {
552+
rebalancing = false
553+
return Effect.void
554+
}
555+
return Effect.forkIn(rebalanceLoop, scope)
556+
})
557+
)
558+
})
580559

560+
const runRebalance = Effect.gen(function*() {
581561
yield* Effect.sleep(config.rebalanceDebounce)
582562

583563
if (state.shards.size === 0) {
@@ -590,10 +570,7 @@ export const make = Effect.gen(function*() {
590570
const unassignments = MutableHashMap.empty<RunnerAddress, MutableHashSet.MutableHashSet<ShardId>>()
591571
const changes = MutableHashSet.empty<RunnerAddress>()
592572
for (const group of state.shards.keys()) {
593-
const [groupAssignments, groupUnassignments, groupChanges] =
594-
immediate || (state.unassignedShards(group).length > 0)
595-
? decideAssignmentsForUnassignedShards(state, group)
596-
: decideAssignmentsForUnbalancedShards(state, group, config.rebalanceRate)
573+
const [groupAssignments, groupUnassignments, groupChanges] = decideAssignmentsForShards(state, group)
597574
for (const [address, shards] of groupAssignments) {
598575
addAllNested(assignments, address, Array.from(shards, (id) => makeShardId(group, id)))
599576
}
@@ -605,7 +582,7 @@ export const make = Effect.gen(function*() {
605582
}
606583
}
607584

608-
yield* Effect.logDebug(`Rebalancing shards (immediate = ${immediate})`)
585+
yield* Effect.logDebug(`Rebalancing shards`)
609586

610587
if (MutableHashSet.size(changes) === 0) return
611588

@@ -691,16 +668,16 @@ export const make = Effect.gen(function*() {
691668
yield* Effect.logWarning("Failed to rebalance runners: ", failedRunners)
692669
}
693670

694-
if (wereFailures && immediate) {
671+
if (wereFailures) {
695672
// Try rebalancing again later if there were any failures
696673
yield* Clock.sleep(config.rebalanceRetryInterval).pipe(
697-
Effect.zipRight(rebalance(immediate)),
674+
Effect.zipRight(rebalance),
698675
Effect.forkIn(scope)
699676
)
700677
}
701678

702679
yield* persistAssignments
703-
})
680+
}).pipe(Effect.withSpan("ShardManager.rebalance", { captureStackTrace: false }))
704681

705682
const checkRunnerHealth: Effect.Effect<void> = Effect.suspend(() =>
706683
Effect.forEach(MutableHashMap.keys(state.allRunners), notifyUnhealthyRunner, {
@@ -720,14 +697,8 @@ export const make = Effect.gen(function*() {
720697

721698
yield* Effect.forkIn(persistRunners, scope)
722699

723-
// Rebalance immediately if there are unassigned shards
724-
yield* Effect.forkIn(
725-
rebalance(state.allUnassignedShards.length > 0),
726-
scope
727-
)
728-
729700
// Start a regular cluster rebalance at the configured interval
730-
yield* rebalance(false).pipe(
701+
yield* rebalance.pipe(
731702
Effect.andThen(Effect.sleep(config.rebalanceInterval)),
732703
Effect.forever,
733704
Effect.forkIn(scope)

packages/cluster/src/internal/shardManager.ts

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -244,34 +244,26 @@ export interface RunnerWithMetadata {
244244
/** @internal */
245245
export const RunnerWithMetadata = (runner: RunnerWithMetadata): RunnerWithMetadata => runner
246246

247-
/** @internal */
248-
export function decideAssignmentsForUnassignedShards(state: State, group: string): readonly [
249-
assignments: MutableHashMap.MutableHashMap<RunnerAddress, Set<number>>,
250-
unassignments: MutableHashMap.MutableHashMap<RunnerAddress, Set<number>>,
251-
changes: MutableHashSet.MutableHashSet<RunnerAddress>
252-
] {
253-
return pickNewRunners(state.unassignedShards(group), state, group, true, 1)
254-
}
255-
256247
const allocationOrder: Order.Order<[number, number, number]> = Order.combine(
257248
Order.mapInput(Order.number, ([, shards]) => shards),
258249
Order.mapInput(Order.number, ([, , registeredAt]) => registeredAt)
259250
)
260251

261252
/** @internal */
262-
export function decideAssignmentsForUnbalancedShards(state: State, group: string, rate: number): readonly [
253+
export function decideAssignmentsForShards(state: State, group: string): readonly [
263254
assignments: MutableHashMap.MutableHashMap<RunnerAddress, Set<number>>,
264255
unassignments: MutableHashMap.MutableHashMap<RunnerAddress, Set<number>>,
265256
changes: MutableHashSet.MutableHashSet<RunnerAddress>
266257
] {
267258
const shardsPerRunner = state.shardsPerRunner(group)
268259
const maxVersion = state.maxVersion
269-
const extraShardsToAllocate = Arr.empty<[number, shardsInverse: number, registeredAt: number]>()
260+
const shardsToRebalance = state.unassignedShards(group)
270261

271262
const runnerGroup = state.runners.get(group)!
272263
const shardsGroup = state.shards.get(group)!
273264

274265
if (state.allRunnersHaveVersion(maxVersion)) {
266+
const extraShardsToAllocate = Arr.empty<[number, shardsInverse: number, registeredAt: number]>()
275267
const averageShardsPerRunner = state.averageShardsPerRunner(group)
276268
MutableHashMap.forEach(shardsPerRunner, (shards) => {
277269
// Count how many extra shards there are compared to the average
@@ -296,19 +288,19 @@ export function decideAssignmentsForUnbalancedShards(state: State, group: string
296288
])
297289
}
298290
})
291+
extraShardsToAllocate.sort(allocationOrder)
292+
for (let i = 0; i < extraShardsToAllocate.length; i++) {
293+
shardsToRebalance.push(extraShardsToAllocate[i][0])
294+
}
299295
}
300296

301-
const sortedShardsToRebalance = extraShardsToAllocate.sort(allocationOrder).map(([shard]) => shard)
302-
303-
return pickNewRunners(sortedShardsToRebalance, state, group, false, rate, shardsPerRunner, maxVersion)
297+
return pickNewRunners(shardsToRebalance, state, group, shardsPerRunner, maxVersion)
304298
}
305299

306300
function pickNewRunners(
307301
shardsToRebalance: ReadonlyArray<number>,
308302
state: State,
309303
group: string,
310-
immediate: boolean,
311-
rate: number,
312304
shardsPerRunner = state.shardsPerRunner(group),
313305
maybeMaxVersion = state.maxVersion
314306
): readonly [
@@ -343,13 +335,6 @@ function pickNewRunners(
343335
// Do not assign to a runner that has unassignments in the same rebalance
344336
if (MutableHashMap.has(unassignments, address)) continue
345337

346-
// Do not assign too many shards to each runner unless rebalancing must
347-
// occur immediately
348-
if (!immediate) {
349-
const assignmentCount = Option.getOrUndefined(MutableHashMap.get(addressAssignments, address))?.size ?? 0
350-
if (assignmentCount >= shardsGroup.size * rate) continue
351-
}
352-
353338
if (candidate === undefined || shards.size < candidateShards!.size) {
354339
candidate = address
355340
candidateShards = shards

packages/cluster/test/ShardManager.bench.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
} from "@effect/cluster"
1111
import { bench, describe, expect } from "@effect/vitest"
1212
import { Array, Data, Effect, Layer, Logger, MutableHashMap, Option, TestClock, TestContext } from "effect"
13-
import { decideAssignmentsForUnassignedShards, RunnerWithMetadata, State } from "../src/internal/shardManager.js"
13+
import { decideAssignmentsForShards, RunnerWithMetadata, State } from "../src/internal/shardManager.js"
1414

1515
describe("ShardManager", () => {
1616
const shards300 = Array.makeBy(
@@ -45,12 +45,12 @@ describe("ShardManager", () => {
4545
new Map(shards1000)
4646
)
4747

48-
bench("decideAssignmentsForUnassignedShards - 30 runners 300 shards", () => {
49-
decideAssignmentsForUnassignedShards(state30, "default")
48+
bench("decideAssignmentsForShards - 30 runners 300 shards", () => {
49+
decideAssignmentsForShards(state30, "default")
5050
})
5151

52-
bench("decideAssignmentsForUnassignedShards - 100 runners 1000 shards", () => {
53-
decideAssignmentsForUnassignedShards(state100, "default")
52+
bench("decideAssignmentsForShards - 100 runners 1000 shards", () => {
53+
decideAssignmentsForShards(state100, "default")
5454
})
5555

5656
const ShardManagerLive = ShardManager.layer.pipe(

0 commit comments

Comments
 (0)