Skip to content

Commit ff90206

Browse files
authored
use FIFO queue for shard assignment, to reduce churn (#5070)
1 parent f325caa commit ff90206

File tree

2 files changed

+58
-69
lines changed

2 files changed

+58
-69
lines changed

.changeset/orange-tips-slide.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/cluster": patch
3+
---
4+
5+
use FIFO queue for shard assignment, to reduce churn

packages/cluster/src/internal/shardManager.ts

Lines changed: 53 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import { constFalse } from "effect/Function"
55
import * as MutableHashMap from "effect/MutableHashMap"
66
import * as MutableHashSet from "effect/MutableHashSet"
77
import * as Option from "effect/Option"
8-
import * as Order from "effect/Order"
98
import type { Runner } from "../Runner.js"
109
import type { RunnerAddress } from "../RunnerAddress.js"
1110
import { RunnerHealth } from "../RunnerHealth.js"
@@ -60,9 +59,6 @@ export class State {
6059
const now = yield* Clock.currentTimeMillis
6160
const allRunners = MutableHashMap.empty<RunnerAddress, RunnerWithMetadata>()
6261
const runnerState = new Map<string, MutableHashMap.MutableHashMap<RunnerAddress, RunnerWithMetadata>>()
63-
// for (const group of groups) {
64-
// runnerState.set(group, MutableHashMap.empty<RunnerAddress, RunnerWithMetadata>())
65-
// }
6662
for (const [address, runner] of aliveRunners) {
6763
const withMetadata = RunnerWithMetadata({ runner, registeredAt: now })
6864
MutableHashMap.set(allRunners, address, withMetadata)
@@ -96,32 +92,66 @@ export class State {
9692
readonly shardsPerGroup: number
9793
) {
9894
this.assignments = MutableHashMap.empty<ShardId, Option.Option<RunnerAddress>>()
95+
this.perRunner = new Map<string, MutableHashMap.MutableHashMap<RunnerAddress, Set<number>>>()
96+
97+
for (const [address, meta] of this.allRunners) {
98+
for (const group of meta.runner.groups) {
99+
let runnerMap = this.perRunner.get(group)
100+
if (!runnerMap) {
101+
runnerMap = MutableHashMap.empty<RunnerAddress, Set<number>>()
102+
this.perRunner.set(group, runnerMap)
103+
}
104+
MutableHashMap.set(runnerMap, address, new Set())
105+
}
106+
}
107+
99108
for (const [group, groupMap] of this.shards) {
100-
for (const [id, address] of groupMap) {
109+
const perRunnerMap = this.perRunner.get(group)!
110+
for (const [id, address_] of groupMap) {
111+
const address = Option.filter(address_, (addr) => MutableHashMap.has(this.allRunners, addr))
101112
MutableHashMap.set(this.assignments, new ShardId({ group, id }), address)
113+
if (Option.isSome(address)) {
114+
Option.getOrUndefined(MutableHashMap.get(perRunnerMap, address.value))?.add(id)
115+
}
102116
}
103117
}
104118
}
105119

106120
readonly assignments: MutableHashMap.MutableHashMap<ShardId, Option.Option<RunnerAddress>>
121+
readonly perRunner: Map<string, MutableHashMap.MutableHashMap<RunnerAddress, Set<number>>>
107122

108123
addGroup(group: string): void {
109124
this.runners.set(group, MutableHashMap.empty<RunnerAddress, RunnerWithMetadata>())
110125
const shardMap = new Map<number, Option.Option<RunnerAddress>>()
126+
this.shards.set(group, shardMap)
111127
for (let n = 1; n <= this.shardsPerGroup; n++) {
112128
shardMap.set(n, Option.none())
113129
MutableHashMap.set(this.assignments, new ShardId({ group, id: n }), Option.none())
114130
}
115-
this.shards.set(group, shardMap)
131+
132+
const perRunnerMap = MutableHashMap.empty<RunnerAddress, Set<number>>()
133+
this.perRunner.set(group, perRunnerMap)
134+
for (const [address] of this.allRunners) {
135+
MutableHashMap.set(perRunnerMap, address, new Set())
136+
}
116137
}
117138

118139
addAssignments(
119140
shards: Iterable<ShardId>,
120141
address: Option.Option<RunnerAddress>
121142
) {
122143
for (const shardId of shards) {
144+
const currentAddress = Option.flatten(MutableHashMap.get(this.assignments, shardId))
123145
MutableHashMap.set(this.assignments, shardId, address)
124146
this.shards.get(shardId.group)?.set(shardId.id, address)
147+
148+
const perRunner = this.perRunner.get(shardId.group)!
149+
if (Option.isSome(currentAddress)) {
150+
Option.getOrUndefined(MutableHashMap.get(perRunner, currentAddress.value))?.delete(shardId.id)
151+
}
152+
if (Option.isSome(address)) {
153+
Option.getOrUndefined(MutableHashMap.get(perRunner, address.value))?.add(shardId.id)
154+
}
125155
}
126156
}
127157

@@ -134,13 +164,19 @@ export class State {
134164
}
135165
const groupMap = this.runners.get(group)!
136166
MutableHashMap.set(groupMap, runner.address, withMetadata)
167+
const perRunner = this.perRunner.get(group)!
168+
MutableHashMap.set(perRunner, runner.address, new Set())
137169
}
138170
}
139171

140172
removeRunner(address: RunnerAddress): void {
141173
MutableHashMap.remove(this.allRunners, address)
142-
for (const groupMap of this.runners.values()) {
174+
for (const group of this.runners.keys()) {
175+
const groupMap = this.runners.get(group)!
143176
MutableHashMap.remove(groupMap, address)
177+
178+
const perRunner = this.perRunner.get(group)!
179+
MutableHashMap.remove(perRunner, address)
144180
}
145181
}
146182

@@ -182,19 +218,12 @@ export class State {
182218
}
183219

184220
shardsPerRunner(group: string): MutableHashMap.MutableHashMap<RunnerAddress, Set<number>> {
185-
const groupRunners = this.runners.get(group)
186221
const shards = MutableHashMap.empty<RunnerAddress, Set<number>>()
222+
const perRunner = this.perRunner.get(group)
223+
if (!perRunner || MutableHashMap.isEmpty(perRunner)) return shards
187224

188-
if (!groupRunners || MutableHashMap.isEmpty(groupRunners)) return shards
189-
MutableHashMap.forEach(groupRunners, (_, address) => {
190-
MutableHashMap.set(shards, address, new Set())
191-
})
192-
193-
const assignments = this.shards.get(group)!
194-
for (const [id, address] of assignments) {
195-
if (Option.isNone(address)) continue
196-
const shardIds = Option.getOrUndefined(MutableHashMap.get(shards, address.value))!
197-
shardIds.add(id)
225+
for (const [address, shardSet] of perRunner) {
226+
MutableHashMap.set(shards, address, new Set(shardSet))
198227
}
199228

200229
return shards
@@ -244,11 +273,6 @@ export interface RunnerWithMetadata {
244273
/** @internal */
245274
export const RunnerWithMetadata = (runner: RunnerWithMetadata): RunnerWithMetadata => runner
246275

247-
const allocationOrder: Order.Order<[number, number, number]> = Order.combine(
248-
Order.mapInput(Order.number, ([, shards]) => shards),
249-
Order.mapInput(Order.number, ([, , registeredAt]) => registeredAt)
250-
)
251-
252276
/** @internal */
253277
export function decideAssignmentsForShards(state: State, group: string): readonly [
254278
assignments: MutableHashMap.MutableHashMap<RunnerAddress, Set<number>>,
@@ -259,39 +283,17 @@ export function decideAssignmentsForShards(state: State, group: string): readonl
259283
const maxVersion = state.maxVersion
260284
const shardsToRebalance = state.unassignedShards(group)
261285

262-
const runnerGroup = state.runners.get(group)!
263-
const shardsGroup = state.shards.get(group)!
264-
265286
if (state.allRunnersHaveVersion(maxVersion)) {
266-
const extraShardsToAllocate = Arr.empty<[number, shardsInverse: number, registeredAt: number]>()
267287
const averageShardsPerRunner = state.averageShardsPerRunner(group)
268288
MutableHashMap.forEach(shardsPerRunner, (shards) => {
269-
// Count how many extra shards there are compared to the average
270289
const extraShards = Math.max(0, shards.size - averageShardsPerRunner)
271-
for (const shard of takeRandom(shards, extraShards)) {
272-
const maybeAddress = shardsGroup.get(shard) ?? Option.none()
273-
if (Option.isNone(maybeAddress)) {
274-
extraShardsToAllocate.push([shard, Number.MIN_SAFE_INTEGER, Number.MIN_SAFE_INTEGER])
275-
continue
276-
}
277-
const address = maybeAddress.value
278-
extraShardsToAllocate.push([
279-
shard,
280-
Option.match(MutableHashMap.get(shardsPerRunner, address), {
281-
onNone: () => Number.MIN_SAFE_INTEGER,
282-
onSome: (shards) => -shards.size
283-
}),
284-
Option.match(MutableHashMap.get(runnerGroup, address), {
285-
onNone: () => Number.MIN_SAFE_INTEGER,
286-
onSome: (meta) => meta.registeredAt
287-
})
288-
])
290+
const iter = shards.values()
291+
for (let i = 0; i < extraShards; i++) {
292+
const shard = iter.next()
293+
if (shard.done) break
294+
shardsToRebalance.push(shard.value)
289295
}
290296
})
291-
extraShardsToAllocate.sort(allocationOrder)
292-
for (let i = 0; i < extraShardsToAllocate.length; i++) {
293-
shardsToRebalance.push(extraShardsToAllocate[i][0])
294-
}
295297
}
296298

297299
return pickNewRunners(shardsToRebalance, state, group, shardsPerRunner, maxVersion)
@@ -301,7 +303,7 @@ function pickNewRunners(
301303
shardsToRebalance: ReadonlyArray<number>,
302304
state: State,
303305
group: string,
304-
shardsPerRunner = state.shardsPerRunner(group),
306+
shardsPerRunner: MutableHashMap.MutableHashMap<RunnerAddress, Set<number>>,
305307
maybeMaxVersion = state.maxVersion
306308
): readonly [
307309
assignments: MutableHashMap.MutableHashMap<RunnerAddress, Set<number>>,
@@ -393,24 +395,6 @@ function pickNewRunners(
393395
return [addressAssignments, unassignments, changes]
394396
}
395397

396-
function takeRandom<A>(self: Iterable<A>, n: number): ReadonlyArray<A> {
397-
const array = Array.from(self)
398-
let currentIndex = array.length
399-
while (currentIndex != 0) {
400-
const randomIndex = Math.floor(Math.random() * currentIndex)
401-
currentIndex = currentIndex - 1
402-
swap(array, currentIndex, randomIndex)
403-
}
404-
return n < array.length ? array.slice(0, n) : array
405-
}
406-
407-
function swap<A>(array: Array<A>, i: number, j: number): ReadonlyArray<A> {
408-
const tmp = array[i]
409-
array[i] = array[j]
410-
array[j] = tmp
411-
return array
412-
}
413-
414398
/** @internal */
415399
export const addAllNested = <K, V>(
416400
self: MutableHashMap.MutableHashMap<K, MutableHashSet.MutableHashSet<V>>,

0 commit comments

Comments
 (0)