Skip to content

Commit bf647bd

Browse files
committed
--wip-- [skip ci]
1 parent 2f10632 commit bf647bd

File tree

3 files changed

+47
-4
lines changed

3 files changed

+47
-4
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ export default class RedisCommandsQueue {
6565
}
6666

6767
#invalidateCallback?: (key: RedisArgument | null) => unknown;
68+
#movingCallback?: (afterMs: number, host: string, port: number) => void;
6869

6970
constructor(
7071
respVersion: RespVersions,
@@ -134,6 +135,19 @@ export default class RedisCommandsQueue {
134135
}
135136
break;
136137
}
138+
case 'MOVING': {
139+
if (this.#movingCallback) {
140+
console.log('received moving', push)
141+
const [_, afterMs, url] = push;
142+
let [host, port] = url.toString().split(':');
143+
//['18.200.246.58'] - for some reason the server sends the host this way
144+
if(host.includes('[')) {
145+
host = host.slice(2, -2);
146+
}
147+
this.#movingCallback(afterMs, host, Number(port));
148+
}
149+
break;
150+
}
137151
}
138152
}
139153
},
@@ -145,6 +159,10 @@ export default class RedisCommandsQueue {
145159
this.#invalidateCallback = callback;
146160
}
147161

162+
setMovingCallback(callback?: (afterMs: number, host: string, port: number) => void) {
163+
this.#movingCallback = callback;
164+
}
165+
148166
addCommand<T>(
149167
args: ReadonlyArray<RedisArgument>,
150168
options?: CommandOptions

packages/client/lib/client/index.ts

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ export default class RedisClient<
390390
}
391391

392392
readonly #options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>;
393-
readonly #socket: RedisSocket;
393+
#socket: RedisSocket;
394394
readonly #queue: RedisCommandsQueue;
395395
#selectedDB = 0;
396396
#monitorCallback?: MonitorCallback<TYPE_MAPPING>;
@@ -455,7 +455,26 @@ export default class RedisClient<
455455
this.#validateOptions(options)
456456
this.#options = this.#initiateOptions(options);
457457
this.#queue = this.#initiateQueue();
458-
this.#socket = this.#initiateSocket();
458+
this.#socket = this.#initiateSocket(this.#options);
459+
460+
this.#queue.setMovingCallback(async (afterMs: number, host: string, port: number) => {
461+
console.log(`Moving to ${host}:${port} before ${afterMs}ms`);
462+
const oldSocket = this.#socket;
463+
const newSocket = this.#initiateSocket({
464+
...this.#options,
465+
socket: {
466+
...this.#options?.socket,
467+
host,
468+
port
469+
}
470+
});
471+
newSocket.on('ready', () => {
472+
console.log(`Connected to ${host}:${port}, destroying old socket`);
473+
oldSocket.destroy()
474+
this.#socket = newSocket
475+
});
476+
await newSocket.connect()
477+
});
459478

460479
if (options?.clientSideCache) {
461480
if (options.clientSideCache instanceof ClientSideCacheProvider) {
@@ -676,8 +695,9 @@ export default class RedisClient<
676695
return commands;
677696
}
678697

679-
#initiateSocket(): RedisSocket {
698+
#initiateSocket(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisSocket {
680699
const socketInitiator = async () => {
700+
console.log('Initiator...');
681701
const promises = [],
682702
chainId = Symbol('Socket Initiator');
683703

@@ -707,8 +727,9 @@ export default class RedisClient<
707727
}
708728
};
709729

710-
return new RedisSocket(socketInitiator, this.#options?.socket)
730+
return new RedisSocket(socketInitiator, options?.socket)
711731
.on('data', chunk => {
732+
console.log('Data received', chunk);
712733
try {
713734
this.#queue.decoder.write(chunk);
714735
} catch (err) {
@@ -717,6 +738,7 @@ export default class RedisClient<
717738
}
718739
})
719740
.on('error', err => {
741+
console.error('Socket error', err);
720742
this.emit('error', err);
721743
this.#clientSideCache?.onError();
722744
if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
@@ -727,6 +749,7 @@ export default class RedisClient<
727749
})
728750
.on('connect', () => this.emit('connect'))
729751
.on('ready', () => {
752+
console.log('Socket ready');
730753
this.emit('ready');
731754
this.#setPingTimer();
732755
this.#maybeScheduleWrite();

packages/client/lib/client/socket.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ export default class RedisSocket extends EventEmitter {
209209
}
210210

211211
async #connect(): Promise<void> {
212+
console.log('Connecting...');
212213
let retries = 0;
213214
do {
214215
try {
@@ -224,6 +225,7 @@ export default class RedisSocket extends EventEmitter {
224225
}
225226
this.#isReady = true;
226227
this.#socketEpoch++;
228+
console.log('Socket connected, emit ready');
227229
this.emit('ready');
228230
} catch (err) {
229231
const retryIn = this.#shouldReconnect(retries++, err as Error);

0 commit comments

Comments
 (0)