Skip to content

Commit aad8e62

Browse files
committed
--wip-- [skip ci]
1 parent 05c9bb3 commit aad8e62

File tree

2 files changed

+84
-21
lines changed

2 files changed

+84
-21
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import { RedisClientOptions } from ".";
2+
import RedisCommandsQueue from "./commands-queue";
3+
import RedisSocket from "./socket";
4+
5+
export default class EnterpriseMaintenanceManager {
6+
client: any;
7+
commandsQueue: RedisCommandsQueue;
8+
options: RedisClientOptions;
9+
constructor(
10+
client: any,
11+
commandsQueue: RedisCommandsQueue,
12+
options: RedisClientOptions,
13+
) {
14+
this.client = client;
15+
this.commandsQueue = commandsQueue;
16+
this.options = options;
17+
18+
this.commandsQueue.events.on("moving", this.#onMoving);
19+
}
20+
21+
#onMoving = async (_afterMs: number, host: string, port: number) => {
22+
23+
this.client.pause()
24+
25+
const socket = new RedisSocket({
26+
...this.options.socket,
27+
host,
28+
port
29+
});
30+
await socket.connect();
31+
32+
//wait until waitingForReply is empty
33+
await new Promise<void>(resolve => {
34+
if(!this.commandsQueue.isWaitingForReply()) {
35+
resolve()
36+
} else {
37+
this.commandsQueue.events.once('waitingForReplyEmpty', resolve)
38+
}
39+
})
40+
41+
const oldSocket = this.client.socket
42+
oldSocket.removeAllListeners();
43+
oldSocket.destroy();
44+
45+
this.client.socket = socket;
46+
47+
this.client.resume()
48+
};
49+
50+
}

packages/client/lib/client/index.ts

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,15 @@ export default class RedisClient<
484484
return this._self.#dirtyWatch !== undefined
485485
}
486486

487+
get socket() {
488+
return this._self.#socket;
489+
}
490+
491+
set socket(socket: RedisSocket) {
492+
this._self.#socket = socket;
493+
this.#initiateSocket();
494+
}
495+
487496
/**
488497
* Marks the client's WATCH command as invalidated due to a topology change.
489498
* This will cause any subsequent EXEC in a transaction to fail with a WatchError.
@@ -581,6 +590,7 @@ export default class RedisClient<
581590
}
582591

583592
}
593+
584594
#initiateOptions(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> | undefined {
585595

586596
// Convert username/password to credentialsProvider if no credentialsProvider is already in place
@@ -785,6 +795,29 @@ export default class RedisClient<
785795

786796
async #initiateSocket(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): Promise<void> {
787797
await this.#socket.waitForReady();
798+
799+
this.#socket
800+
.on('data', chunk => {
801+
try {
802+
this.#queue.decoder.write(chunk);
803+
} catch (err) {
804+
this.#queue.resetDecoder();
805+
this.emit('error', err);
806+
}
807+
})
808+
.on('error', err => {
809+
this.emit('error', err);
810+
this.#clientSideCache?.onError();
811+
if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
812+
this.#queue.flushWaitingForReply(err);
813+
} else {
814+
this.#queue.flushAll(err);
815+
}
816+
})
817+
.on('reconnecting', () => this.emit('reconnecting'))
818+
.on('drain', () => this.#maybeScheduleWrite())
819+
.on('end', () => this.emit('end'));
820+
788821
console.log('Initiator...');
789822
const promises = [];
790823
const chainId = Symbol('Socket Initiator');
@@ -819,31 +852,11 @@ export default class RedisClient<
819852

820853
#createSocket(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisSocket {
821854
return new RedisSocket(options?.socket)
822-
.on('data', chunk => {
823-
try {
824-
this.#queue.decoder.write(chunk);
825-
} catch (err) {
826-
this.#queue.resetDecoder();
827-
this.emit('error', err);
828-
}
829-
})
830-
.on('error', err => {
831-
this.emit('error', err);
832-
this.#clientSideCache?.onError();
833-
if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
834-
this.#queue.flushWaitingForReply(err);
835-
} else {
836-
this.#queue.flushAll(err);
837-
}
838-
})
839855
.on('connect', () => this.emit('connect'))
840856
.on('ready', () => {
841857
console.log('Socket ready');
842858
this.emit('ready');
843-
})
844-
.on('reconnecting', () => this.emit('reconnecting'))
845-
.on('drain', () => this.#maybeScheduleWrite())
846-
.on('end', () => this.emit('end'));
859+
});
847860
}
848861

849862
#pingTimer?: NodeJS.Timeout;

0 commit comments

Comments
 (0)