Skip to content

Commit 8f552a0

Browse files
authored
refactor(WebSocketShard): identify throttling (#8888)
* refactor(WebSocketShard): identify throttling * chore: add worker handling * refactor: worker handling * chore: update tests * chore: use satisfies where applicable * chore: add informative comment * chore: apply suggestions * refactor(SimpleContextFetchingStrategy): support multiple managers
1 parent 3fca638 commit 8f552a0

File tree

9 files changed

+113
-38
lines changed

9 files changed

+113
-38
lines changed

packages/ws/__tests__/strategy/WorkerShardingStrategy.test.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,22 @@ const mockConstructor = vi.fn();
2727
const mockSend = vi.fn();
2828
const mockTerminate = vi.fn();
2929

30-
const memberChunkData: GatewayDispatchPayload = {
30+
const memberChunkData = {
3131
op: GatewayOpcodes.Dispatch,
3232
s: 123,
3333
t: GatewayDispatchEvents.GuildMembersChunk,
3434
d: {
3535
guild_id: '123',
3636
members: [],
3737
},
38-
};
38+
} as unknown as GatewayDispatchPayload;
3939

4040
const sessionInfo: SessionInfo = {
4141
shardId: 0,
4242
shardCount: 2,
4343
sequence: 123,
4444
sessionId: 'abc',
45+
resumeURL: 'wss://ehehe.gg',
4546
};
4647

4748
vi.mock('node:worker_threads', async () => {
@@ -109,6 +110,10 @@ vi.mock('node:worker_threads', async () => {
109110
this.emit('message', session);
110111
break;
111112
}
113+
114+
case WorkerSendPayloadOp.ShardCanIdentify: {
115+
break;
116+
}
112117
}
113118
}
114119

@@ -181,7 +186,10 @@ test('spawn, connect, send a message, session info, and destroy', async () => {
181186
expect.objectContaining({ workerData: expect.objectContaining({ shardIds: [0, 1] }) }),
182187
);
183188

184-
const payload: GatewaySendPayload = { op: GatewayOpcodes.RequestGuildMembers, d: { guild_id: '123', limit: 0 } };
189+
const payload = {
190+
op: GatewayOpcodes.RequestGuildMembers,
191+
d: { guild_id: '123', limit: 0, query: '' },
192+
} satisfies GatewaySendPayload;
185193
await manager.send(0, payload);
186194
expect(mockSend).toHaveBeenCalledWith(0, payload);
187195
expect(managerEmitSpy).toHaveBeenCalledWith(WebSocketShardEvents.Dispatch, {

packages/ws/src/strategies/context/IContextFetchingStrategy.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ export interface IContextFetchingStrategy {
1818
readonly options: FetchingStrategyOptions;
1919
retrieveSessionInfo(shardId: number): Awaitable<SessionInfo | null>;
2020
updateSessionInfo(shardId: number, sessionInfo: SessionInfo | null): Awaitable<void>;
21+
waitForIdentify(): Promise<void>;
2122
}
2223

2324
export async function managerToFetchingStrategyOptions(manager: WebSocketManager): Promise<FetchingStrategyOptions> {
Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,28 @@
1+
import { IdentifyThrottler } from '../../utils/IdentifyThrottler.js';
12
import type { SessionInfo, WebSocketManager } from '../../ws/WebSocketManager.js';
23
import type { FetchingStrategyOptions, IContextFetchingStrategy } from './IContextFetchingStrategy.js';
34

45
export class SimpleContextFetchingStrategy implements IContextFetchingStrategy {
5-
public constructor(private readonly manager: WebSocketManager, public readonly options: FetchingStrategyOptions) {}
6+
// This strategy assumes every shard is running under the same process - therefore we need a single
7+
// IdentifyThrottler per manager.
8+
private static throttlerCache = new WeakMap<WebSocketManager, IdentifyThrottler>();
9+
10+
private static ensureThrottler(manager: WebSocketManager): IdentifyThrottler {
11+
const existing = SimpleContextFetchingStrategy.throttlerCache.get(manager);
12+
if (existing) {
13+
return existing;
14+
}
15+
16+
const throttler = new IdentifyThrottler(manager);
17+
SimpleContextFetchingStrategy.throttlerCache.set(manager, throttler);
18+
return throttler;
19+
}
20+
21+
private readonly throttler: IdentifyThrottler;
22+
23+
public constructor(private readonly manager: WebSocketManager, public readonly options: FetchingStrategyOptions) {
24+
this.throttler = SimpleContextFetchingStrategy.ensureThrottler(manager);
25+
}
626

727
public async retrieveSessionInfo(shardId: number): Promise<SessionInfo | null> {
828
return this.manager.options.retrieveSessionInfo(shardId);
@@ -11,4 +31,8 @@ export class SimpleContextFetchingStrategy implements IContextFetchingStrategy {
1131
public updateSessionInfo(shardId: number, sessionInfo: SessionInfo | null) {
1232
return this.manager.options.updateSessionInfo(shardId, sessionInfo);
1333
}
34+
35+
public async waitForIdentify(): Promise<void> {
36+
await this.throttler.waitForIdentify();
37+
}
1438
}

packages/ws/src/strategies/context/WorkerContextFetchingStrategy.ts

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,39 +12,57 @@ import type { FetchingStrategyOptions, IContextFetchingStrategy } from './IConte
1212
export class WorkerContextFetchingStrategy implements IContextFetchingStrategy {
1313
private readonly sessionPromises = new Collection<number, (session: SessionInfo | null) => void>();
1414

15+
private readonly waitForIdentifyPromises = new Collection<number, () => void>();
16+
1517
public constructor(public readonly options: FetchingStrategyOptions) {
1618
if (isMainThread) {
1719
throw new Error('Cannot instantiate WorkerContextFetchingStrategy on the main thread');
1820
}
1921

2022
parentPort!.on('message', (payload: WorkerSendPayload) => {
2123
if (payload.op === WorkerSendPayloadOp.SessionInfoResponse) {
22-
const resolve = this.sessionPromises.get(payload.nonce);
23-
resolve?.(payload.session);
24+
this.sessionPromises.get(payload.nonce)?.(payload.session);
2425
this.sessionPromises.delete(payload.nonce);
2526
}
27+
28+
if (payload.op === WorkerSendPayloadOp.ShardCanIdentify) {
29+
this.waitForIdentifyPromises.get(payload.nonce)?.();
30+
this.waitForIdentifyPromises.delete(payload.nonce);
31+
}
2632
});
2733
}
2834

2935
public async retrieveSessionInfo(shardId: number): Promise<SessionInfo | null> {
3036
const nonce = Math.random();
31-
const payload: WorkerRecievePayload = {
37+
const payload = {
3238
op: WorkerRecievePayloadOp.RetrieveSessionInfo,
3339
shardId,
3440
nonce,
35-
};
41+
} satisfies WorkerRecievePayload;
3642
// eslint-disable-next-line no-promise-executor-return
3743
const promise = new Promise<SessionInfo | null>((resolve) => this.sessionPromises.set(nonce, resolve));
3844
parentPort!.postMessage(payload);
3945
return promise;
4046
}
4147

4248
public updateSessionInfo(shardId: number, sessionInfo: SessionInfo | null) {
43-
const payload: WorkerRecievePayload = {
49+
const payload = {
4450
op: WorkerRecievePayloadOp.UpdateSessionInfo,
4551
shardId,
4652
session: sessionInfo,
47-
};
53+
} satisfies WorkerRecievePayload;
54+
parentPort!.postMessage(payload);
55+
}
56+
57+
public async waitForIdentify(): Promise<void> {
58+
const nonce = Math.random();
59+
const payload = {
60+
op: WorkerRecievePayloadOp.WaitForIdentify,
61+
nonce,
62+
} satisfies WorkerRecievePayload;
63+
// eslint-disable-next-line no-promise-executor-return
64+
const promise = new Promise<void>((resolve) => this.waitForIdentifyPromises.set(nonce, resolve));
4865
parentPort!.postMessage(payload);
66+
return promise;
4967
}
5068
}

packages/ws/src/strategies/sharding/SimpleShardingStrategy.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { Collection } from '@discordjs/collection';
22
import type { GatewaySendPayload } from 'discord-api-types/v10';
3-
import { IdentifyThrottler } from '../../utils/IdentifyThrottler.js';
43
import type { WebSocketManager } from '../../ws/WebSocketManager';
54
import { WebSocketShard, WebSocketShardEvents, type WebSocketShardDestroyOptions } from '../../ws/WebSocketShard.js';
65
import { managerToFetchingStrategyOptions } from '../context/IContextFetchingStrategy.js';
@@ -15,11 +14,8 @@ export class SimpleShardingStrategy implements IShardingStrategy {
1514

1615
private readonly shards = new Collection<number, WebSocketShard>();
1716

18-
private readonly throttler: IdentifyThrottler;
19-
2017
public constructor(manager: WebSocketManager) {
2118
this.manager = manager;
22-
this.throttler = new IdentifyThrottler(manager);
2319
}
2420

2521
/**
@@ -46,7 +42,6 @@ export class SimpleShardingStrategy implements IShardingStrategy {
4642
const promises = [];
4743

4844
for (const shard of this.shards.values()) {
49-
await this.throttler.waitForIdentify();
5045
promises.push(shard.connect());
5146
}
5247

packages/ws/src/strategies/sharding/WorkerShardingStrategy.ts

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ export enum WorkerSendPayloadOp {
1818
Destroy,
1919
Send,
2020
SessionInfoResponse,
21+
ShardCanIdentify,
2122
}
2223

2324
export type WorkerSendPayload =
2425
| { nonce: number; op: WorkerSendPayloadOp.SessionInfoResponse; session: SessionInfo | null }
26+
| { nonce: number; op: WorkerSendPayloadOp.ShardCanIdentify }
2527
| { op: WorkerSendPayloadOp.Connect; shardId: number }
2628
| { op: WorkerSendPayloadOp.Destroy; options?: WebSocketShardDestroyOptions; shardId: number }
2729
| { op: WorkerSendPayloadOp.Send; payload: GatewaySendPayload; shardId: number };
@@ -32,12 +34,14 @@ export enum WorkerRecievePayloadOp {
3234
Event,
3335
RetrieveSessionInfo,
3436
UpdateSessionInfo,
37+
WaitForIdentify,
3538
}
3639

3740
export type WorkerRecievePayload =
3841
// Can't seem to get a type-safe union based off of the event, so I'm sadly leaving data as any for now
3942
| { data: any; event: WebSocketShardEvents; op: WorkerRecievePayloadOp.Event; shardId: number }
4043
| { nonce: number; op: WorkerRecievePayloadOp.RetrieveSessionInfo; shardId: number }
44+
| { nonce: number; op: WorkerRecievePayloadOp.WaitForIdentify }
4145
| { op: WorkerRecievePayloadOp.Connected; shardId: number }
4246
| { op: WorkerRecievePayloadOp.Destroyed; shardId: number }
4347
| { op: WorkerRecievePayloadOp.UpdateSessionInfo; session: SessionInfo | null; shardId: number };
@@ -118,12 +122,10 @@ export class WorkerShardingStrategy implements IShardingStrategy {
118122
const promises = [];
119123

120124
for (const [shardId, worker] of this.#workerByShardId.entries()) {
121-
await this.throttler.waitForIdentify();
122-
123-
const payload: WorkerSendPayload = {
125+
const payload = {
124126
op: WorkerSendPayloadOp.Connect,
125127
shardId,
126-
};
128+
} satisfies WorkerSendPayload;
127129

128130
// eslint-disable-next-line no-promise-executor-return
129131
const promise = new Promise<void>((resolve) => this.connectPromises.set(shardId, resolve));
@@ -141,11 +143,11 @@ export class WorkerShardingStrategy implements IShardingStrategy {
141143
const promises = [];
142144

143145
for (const [shardId, worker] of this.#workerByShardId.entries()) {
144-
const payload: WorkerSendPayload = {
146+
const payload = {
145147
op: WorkerSendPayloadOp.Destroy,
146148
shardId,
147149
options,
148-
};
150+
} satisfies WorkerSendPayload;
149151

150152
promises.push(
151153
// eslint-disable-next-line no-promise-executor-return, promise/prefer-await-to-then
@@ -169,11 +171,11 @@ export class WorkerShardingStrategy implements IShardingStrategy {
169171
throw new Error(`No worker found for shard ${shardId}`);
170172
}
171173

172-
const payload: WorkerSendPayload = {
174+
const payload = {
173175
op: WorkerSendPayloadOp.Send,
174176
shardId,
175177
payload: data,
176-
};
178+
} satisfies WorkerSendPayload;
177179
worker.postMessage(payload);
178180
}
179181

@@ -213,6 +215,16 @@ export class WorkerShardingStrategy implements IShardingStrategy {
213215
await this.manager.options.updateSessionInfo(payload.shardId, payload.session);
214216
break;
215217
}
218+
219+
case WorkerRecievePayloadOp.WaitForIdentify: {
220+
await this.throttler.waitForIdentify();
221+
const response: WorkerSendPayload = {
222+
op: WorkerSendPayloadOp.ShardCanIdentify,
223+
nonce: payload.nonce,
224+
};
225+
worker.postMessage(response);
226+
break;
227+
}
216228
}
217229
}
218230
}

packages/ws/src/strategies/sharding/worker.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@ for (const shardId of data.shardIds) {
4040
for (const event of Object.values(WebSocketShardEvents)) {
4141
// @ts-expect-error: Event types incompatible
4242
shard.on(event, (data) => {
43-
const payload: WorkerRecievePayload = {
43+
const payload = {
4444
op: WorkerRecievePayloadOp.Event,
4545
event,
4646
data,
4747
shardId,
48-
};
48+
} satisfies WorkerRecievePayload;
4949
parentPort!.postMessage(payload);
5050
});
5151
}
@@ -93,5 +93,9 @@ parentPort!
9393
case WorkerSendPayloadOp.SessionInfoResponse: {
9494
break;
9595
}
96+
97+
case WorkerSendPayloadOp.ShardCanIdentify: {
98+
break;
99+
}
96100
}
97101
});
Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import { setTimeout as sleep } from 'node:timers/promises';
2-
import type { WebSocketManager } from '../ws/WebSocketManager';
2+
import { AsyncQueue } from '@sapphire/async-queue';
3+
import type { WebSocketManager } from '../ws/WebSocketManager.js';
34

45
export class IdentifyThrottler {
6+
private readonly queue = new AsyncQueue();
7+
58
private identifyState = {
69
remaining: 0,
710
resetsAt: Number.POSITIVE_INFINITY,
@@ -10,20 +13,27 @@ export class IdentifyThrottler {
1013
public constructor(private readonly manager: WebSocketManager) {}
1114

1215
public async waitForIdentify(): Promise<void> {
13-
if (this.identifyState.remaining <= 0) {
14-
const diff = this.identifyState.resetsAt - Date.now();
15-
if (diff <= 5_000) {
16-
const time = diff + Math.random() * 1_500;
17-
await sleep(time);
16+
await this.queue.wait();
17+
18+
try {
19+
if (this.identifyState.remaining <= 0) {
20+
const diff = this.identifyState.resetsAt - Date.now();
21+
if (diff <= 5_000) {
22+
// To account for the latency the IDENTIFY payload goes through, we add a bit more wait time
23+
const time = diff + Math.random() * 1_500;
24+
await sleep(time);
25+
}
26+
27+
const info = await this.manager.fetchGatewayInformation();
28+
this.identifyState = {
29+
remaining: info.session_start_limit.max_concurrency,
30+
resetsAt: Date.now() + 5_000,
31+
};
1832
}
1933

20-
const info = await this.manager.fetchGatewayInformation();
21-
this.identifyState = {
22-
remaining: info.session_start_limit.max_concurrency,
23-
resetsAt: Date.now() + 5_000,
24-
};
34+
this.identifyState.remaining--;
35+
} finally {
36+
this.queue.shift();
2537
}
26-
27-
this.identifyState.remaining--;
2838
}
2939
}

packages/ws/src/ws/WebSocketShard.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,9 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
295295
`intents: ${this.strategy.options.intents}`,
296296
`compression: ${this.inflate ? 'zlib-stream' : this.useIdentifyCompress ? 'identify' : 'none'}`,
297297
]);
298+
299+
await this.strategy.waitForIdentify();
300+
298301
const d: GatewayIdentifyData = {
299302
token: this.strategy.options.token,
300303
properties: this.strategy.options.identifyProperties,

0 commit comments

Comments
 (0)