Skip to content

Commit 24053ce

Browse files
committed
--wip-- [skip ci]
1 parent 28d719d commit 24053ce

File tree

3 files changed

+47
-4
lines changed

3 files changed

+47
-4
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ export default class RedisCommandsQueue {
6565
}
6666

6767
#invalidateCallback?: (key: RedisArgument | null) => unknown;
68+
#movingCallback?: (afterMs: number, host: string, port: number) => void;
6869

6970
constructor(
7071
respVersion: RespVersions,
@@ -134,6 +135,19 @@ export default class RedisCommandsQueue {
134135
}
135136
break;
136137
}
138+
case 'MOVING': {
139+
if (this.#movingCallback) {
140+
console.log('received moving', push)
141+
const [_, afterMs, url] = push;
142+
let [host, port] = url.toString().split(':');
143+
//['18.200.246.58'] - for some reason the server sends the host this way
144+
if(host.includes('[')) {
145+
host = host.slice(2, -2);
146+
}
147+
this.#movingCallback(afterMs, host, Number(port));
148+
}
149+
break;
150+
}
137151
}
138152
}
139153
},
@@ -145,6 +159,10 @@ export default class RedisCommandsQueue {
145159
this.#invalidateCallback = callback;
146160
}
147161

162+
setMovingCallback(callback?: (afterMs: number, host: string, port: number) => void) {
163+
this.#movingCallback = callback;
164+
}
165+
148166
addCommand<T>(
149167
args: ReadonlyArray<RedisArgument>,
150168
options?: CommandOptions

packages/client/lib/client/index.ts

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ export default class RedisClient<
366366
}
367367

368368
readonly #options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>;
369-
readonly #socket: RedisSocket;
369+
#socket: RedisSocket;
370370
readonly #queue: RedisCommandsQueue;
371371
#selectedDB = 0;
372372
#monitorCallback?: MonitorCallback<TYPE_MAPPING>;
@@ -431,7 +431,26 @@ export default class RedisClient<
431431
this.#validateOptions(options)
432432
this.#options = this.#initiateOptions(options);
433433
this.#queue = this.#initiateQueue();
434-
this.#socket = this.#initiateSocket();
434+
this.#socket = this.#initiateSocket(this.#options);
435+
436+
this.#queue.setMovingCallback(async (afterMs: number, host: string, port: number) => {
437+
console.log(`Moving to ${host}:${port} before ${afterMs}ms`);
438+
const oldSocket = this.#socket;
439+
const newSocket = this.#initiateSocket({
440+
...this.#options,
441+
socket: {
442+
...this.#options?.socket,
443+
host,
444+
port
445+
}
446+
});
447+
newSocket.on('ready', () => {
448+
console.log(`Connected to ${host}:${port}, destroying old socket`);
449+
oldSocket.destroy()
450+
this.#socket = newSocket
451+
});
452+
await newSocket.connect()
453+
});
435454

436455
if (options?.clientSideCache) {
437456
if (options.clientSideCache instanceof ClientSideCacheProvider) {
@@ -657,8 +676,9 @@ export default class RedisClient<
657676
return commands;
658677
}
659678

660-
#initiateSocket(): RedisSocket {
679+
#initiateSocket(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisSocket {
661680
const socketInitiator = async () => {
681+
console.log('Initiator...');
662682
const promises = [],
663683
chainId = Symbol('Socket Initiator');
664684

@@ -688,8 +708,9 @@ export default class RedisClient<
688708
}
689709
};
690710

691-
return new RedisSocket(socketInitiator, this.#options?.socket)
711+
return new RedisSocket(socketInitiator, options?.socket)
692712
.on('data', chunk => {
713+
console.log('Data received', chunk);
693714
try {
694715
this.#queue.decoder.write(chunk);
695716
} catch (err) {
@@ -698,6 +719,7 @@ export default class RedisClient<
698719
}
699720
})
700721
.on('error', err => {
722+
console.error('Socket error', err);
701723
this.emit('error', err);
702724
this.#clientSideCache?.onError();
703725
if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
@@ -708,6 +730,7 @@ export default class RedisClient<
708730
})
709731
.on('connect', () => this.emit('connect'))
710732
.on('ready', () => {
733+
console.log('Socket ready');
711734
this.emit('ready');
712735
this.#setPingTimer();
713736
this.#maybeScheduleWrite();

packages/client/lib/client/socket.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ export default class RedisSocket extends EventEmitter {
209209
}
210210

211211
async #connect(): Promise<void> {
212+
console.log('Connecting...');
212213
let retries = 0;
213214
do {
214215
try {
@@ -224,6 +225,7 @@ export default class RedisSocket extends EventEmitter {
224225
}
225226
this.#isReady = true;
226227
this.#socketEpoch++;
228+
console.log('Socket connected, emit ready');
227229
this.emit('ready');
228230
} catch (err) {
229231
const retryIn = this.#shouldReconnect(retries++, err as Error);

0 commit comments

Comments
 (0)