Skip to content

Commit 4aab08f

Browse files
committed
--wip-- [skip ci]
1 parent 861e1b7 commit 4aab08f

File tree

2 files changed

+84
-21
lines changed

2 files changed

+84
-21
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import { RedisClientOptions } from ".";
2+
import RedisCommandsQueue from "./commands-queue";
3+
import RedisSocket from "./socket";
4+
5+
export default class EnterpriseMaintenanceManager {
6+
client: any;
7+
commandsQueue: RedisCommandsQueue;
8+
options: RedisClientOptions;
9+
constructor(
10+
client: any,
11+
commandsQueue: RedisCommandsQueue,
12+
options: RedisClientOptions,
13+
) {
14+
this.client = client;
15+
this.commandsQueue = commandsQueue;
16+
this.options = options;
17+
18+
this.commandsQueue.events.on("moving", this.#onMoving);
19+
}
20+
21+
#onMoving = async (_afterMs: number, host: string, port: number) => {
22+
23+
this.client.pause()
24+
25+
const socket = new RedisSocket({
26+
...this.options.socket,
27+
host,
28+
port
29+
});
30+
await socket.connect();
31+
32+
//wait until waitingForReply is empty
33+
await new Promise<void>(resolve => {
34+
if(!this.commandsQueue.isWaitingForReply()) {
35+
resolve()
36+
} else {
37+
this.commandsQueue.events.once('waitingForReplyEmpty', resolve)
38+
}
39+
})
40+
41+
const oldSocket = this.client.socket
42+
oldSocket.removeAllListeners();
43+
oldSocket.destroy();
44+
45+
this.client.socket = socket;
46+
47+
this.client.resume()
48+
};
49+
50+
}

packages/client/lib/client/index.ts

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,15 @@ export default class RedisClient<
460460
return this._self.#dirtyWatch !== undefined
461461
}
462462

463+
get socket() {
464+
return this._self.#socket;
465+
}
466+
467+
set socket(socket: RedisSocket) {
468+
this._self.#socket = socket;
469+
this.#initiateSocket();
470+
}
471+
463472
/**
464473
* Marks the client's WATCH command as invalidated due to a topology change.
465474
* This will cause any subsequent EXEC in a transaction to fail with a WatchError.
@@ -557,6 +566,7 @@ export default class RedisClient<
557566
}
558567

559568
}
569+
560570
#initiateOptions(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> | undefined {
561571

562572
// Convert username/password to credentialsProvider if no credentialsProvider is already in place
@@ -766,6 +776,29 @@ export default class RedisClient<
766776

767777
async #initiateSocket(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): Promise<void> {
768778
await this.#socket.waitForReady();
779+
780+
this.#socket
781+
.on('data', chunk => {
782+
try {
783+
this.#queue.decoder.write(chunk);
784+
} catch (err) {
785+
this.#queue.resetDecoder();
786+
this.emit('error', err);
787+
}
788+
})
789+
.on('error', err => {
790+
this.emit('error', err);
791+
this.#clientSideCache?.onError();
792+
if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
793+
this.#queue.flushWaitingForReply(err);
794+
} else {
795+
this.#queue.flushAll(err);
796+
}
797+
})
798+
.on('reconnecting', () => this.emit('reconnecting'))
799+
.on('drain', () => this.#maybeScheduleWrite())
800+
.on('end', () => this.emit('end'));
801+
769802
console.log('Initiator...');
770803
const promises = [];
771804
const chainId = Symbol('Socket Initiator');
@@ -800,31 +833,11 @@ export default class RedisClient<
800833

801834
#createSocket(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisSocket {
802835
return new RedisSocket(options?.socket)
803-
.on('data', chunk => {
804-
try {
805-
this.#queue.decoder.write(chunk);
806-
} catch (err) {
807-
this.#queue.resetDecoder();
808-
this.emit('error', err);
809-
}
810-
})
811-
.on('error', err => {
812-
this.emit('error', err);
813-
this.#clientSideCache?.onError();
814-
if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
815-
this.#queue.flushWaitingForReply(err);
816-
} else {
817-
this.#queue.flushAll(err);
818-
}
819-
})
820836
.on('connect', () => this.emit('connect'))
821837
.on('ready', () => {
822838
console.log('Socket ready');
823839
this.emit('ready');
824-
})
825-
.on('reconnecting', () => this.emit('reconnecting'))
826-
.on('drain', () => this.#maybeScheduleWrite())
827-
.on('end', () => this.emit('end'));
840+
});
828841
}
829842

830843
#pingTimer?: NodeJS.Timeout;

0 commit comments

Comments
 (0)