Skip to content

Commit 05c9bb3

Browse files
committed
--wip-- [skip ci]
1 parent 8cda228 commit 05c9bb3

File tree

2 files changed

+98
-92
lines changed

2 files changed

+98
-92
lines changed

packages/client/lib/client/index.ts

Lines changed: 81 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,7 @@ export default class RedisClient<
498498
this.#validateOptions(options)
499499
this.#options = this.#initiateOptions(options);
500500
this.#queue = this.#initiateQueue();
501-
this.#socket = this.#initiateSocket(this.#options);
501+
this.#socket = this.#createSocket(this.#options);
502502
// Queue
503503
// toWrite [ C D E ]
504504
// waitingForReply [ A B ]
@@ -511,54 +511,54 @@ export default class RedisClient<
511511
// 4. [EVENT] In-flight commands completed
512512
// 5. [ACTION] Unpause writing -> we are going to write to the new socket from now on
513513
// 6. [ACTION] Destroy old socket
514-
this.options?.gracefulMaintenance && this.#queue.events.on('moving', async (afterMs: number, host: string, port: number) => {
515-
// 1
516-
console.log(`Moving to ${host}:${port} before ${afterMs}ms`);
517-
518-
// 2
519-
console.log(`Pausing writing until new socket is ready and all in-flight commands are completed`);
520-
// this.#paused = true;
521-
522-
const oldSocket = this.#socket;
523-
this.#socket = this.#initiateSocket({
524-
...this.#options,
525-
socket: {
526-
...this.#options?.socket,
527-
host,
528-
port
529-
}
530-
});
531-
532-
// 3
533-
this.#socket.once('ready', () => {
534-
//TODO handshake...???
535-
console.log(`Connected to ${host}:${port}`);
536-
537-
// 4
538-
if(!this.#queue.isWaitingForReply()) {
539-
// 5 and 6
540-
console.log(`All in-flight commands completed`);
541-
console.log(`Resume writing`)
542-
oldSocket.destroy();
543-
this.#paused = false;
544-
}
545-
});
546-
547-
// 4
548-
this.#queue.events.once('waitingForReplyEmpty', () => {
549-
console.log(`All in-flight commands completed`);
550-
// 3
551-
if(this.#socket.isReady) {
552-
// 5 and 6
553-
console.log(`Connected to ${host}:${port}`);
554-
console.log(`Resume writing`)
555-
oldSocket.destroy();
556-
this.#paused = false;
557-
}
558-
});
559-
560-
await this.#socket.connect()
561-
});
514+
// this.options?.gracefulMaintenance && this.#queue.events.on('moving', async (afterMs: number, host: string, port: number) => {
515+
// // 1
516+
// console.log(`Moving to ${host}:${port} before ${afterMs}ms`);
517+
518+
// // 2
519+
// console.log(`Pausing writing until new socket is ready and all in-flight commands are completed`);
520+
// // this.#paused = true;
521+
522+
// const oldSocket = this.#socket;
523+
// this.#socket = this.#initiateSocket({
524+
// ...this.#options,
525+
// socket: {
526+
// ...this.#options?.socket,
527+
// host,
528+
// port
529+
// }
530+
// });
531+
532+
// // 3
533+
// this.#socket.once('ready', () => {
534+
// //TODO handshake...???
535+
// console.log(`Connected to ${host}:${port}`);
536+
537+
// // 4
538+
// if(!this.#queue.isWaitingForReply()) {
539+
// // 5 and 6
540+
// console.log(`All in-flight commands completed`);
541+
// console.log(`Resume writing`)
542+
// oldSocket.destroy();
543+
// this.#paused = false;
544+
// }
545+
// });
546+
547+
// // 4
548+
// this.#queue.events.once('waitingForReplyEmpty', () => {
549+
// console.log(`All in-flight commands completed`);
550+
// // 3
551+
// if(this.#socket.isReady) {
552+
// // 5 and 6
553+
// console.log(`Connected to ${host}:${port}`);
554+
// console.log(`Resume writing`)
555+
// oldSocket.destroy();
556+
// this.#paused = false;
557+
// }
558+
// });
559+
560+
// await this.#socket.connect()
561+
// });
562562

563563
if (options?.clientSideCache) {
564564
if (options.clientSideCache instanceof ClientSideCacheProvider) {
@@ -783,39 +783,42 @@ export default class RedisClient<
783783
return commands;
784784
}
785785

786-
#initiateSocket(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisSocket {
787-
const socketInitiator = async () => {
788-
console.log('Initiator...');
789-
const promises = [],
790-
chainId = Symbol('Socket Initiator');
786+
async #initiateSocket(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): Promise<void> {
787+
await this.#socket.waitForReady();
788+
console.log('Initiator...');
789+
const promises = [];
790+
const chainId = Symbol('Socket Initiator');
791+
792+
const resubscribePromise = this.#queue.resubscribe(chainId);
793+
if (resubscribePromise) {
794+
promises.push(resubscribePromise);
795+
}
791796

792-
const resubscribePromise = this.#queue.resubscribe(chainId);
793-
if (resubscribePromise) {
794-
promises.push(resubscribePromise);
795-
}
797+
if (this.#monitorCallback) {
798+
promises.push(
799+
this.#queue.monitor(
800+
this.#monitorCallback,
801+
{
802+
typeMapping: this._commandOptions?.typeMapping,
803+
chainId,
804+
asap: true
805+
}
806+
)
807+
);
808+
}
796809

797-
if (this.#monitorCallback) {
798-
promises.push(
799-
this.#queue.monitor(
800-
this.#monitorCallback,
801-
{
802-
typeMapping: this._commandOptions?.typeMapping,
803-
chainId,
804-
asap: true
805-
}
806-
)
807-
);
808-
}
810+
promises.push(...(await this.#handshake(chainId, true)));
809811

810-
promises.push(...(await this.#handshake(chainId, true)));
812+
this.#setPingTimer();
811813

812-
if (promises.length) {
813-
this.#write();
814-
return Promise.all(promises);
815-
}
816-
};
814+
if (promises.length) {
815+
this.#write();
816+
await Promise.all(promises);
817+
}
818+
}
817819

818-
return new RedisSocket(socketInitiator, options?.socket)
820+
#createSocket(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisSocket {
821+
return new RedisSocket(options?.socket)
819822
.on('data', chunk => {
820823
try {
821824
this.#queue.decoder.write(chunk);
@@ -837,8 +840,6 @@ export default class RedisClient<
837840
.on('ready', () => {
838841
console.log('Socket ready');
839842
this.emit('ready');
840-
this.#setPingTimer();
841-
this.#maybeScheduleWrite();
842843
})
843844
.on('reconnecting', () => this.emit('reconnecting'))
844845
.on('drain', () => this.#maybeScheduleWrite())
@@ -951,6 +952,7 @@ export default class RedisClient<
951952

952953
async connect() {
953954
await this._self.#socket.connect();
955+
await this._self.#initiateSocket(this._self.#options);
954956
return this as unknown as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
955957
}
956958

packages/client/lib/client/socket.ts

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,7 @@ export type RedisTcpSocketOptions = RedisTcpOptions | RedisTlsOptions;
5151

5252
export type RedisSocketOptions = RedisTcpSocketOptions | RedisIpcOptions;
5353

54-
export type RedisSocketInitiator = () => void | Promise<unknown>;
55-
5654
export default class RedisSocket extends EventEmitter {
57-
readonly #initiator;
5855
readonly #connectTimeout;
5956
readonly #reconnectStrategy;
6057
readonly #socketFactory;
@@ -82,16 +79,23 @@ export default class RedisSocket extends EventEmitter {
8279
return this.#socketEpoch;
8380
}
8481

85-
constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) {
82+
constructor(options?: RedisSocketOptions) {
8683
super();
8784

88-
this.#initiator = initiator;
8985
this.#connectTimeout = options?.connectTimeout ?? 5000;
9086
this.#reconnectStrategy = this.#createReconnectStrategy(options);
9187
this.#socketFactory = this.#createSocketFactory(options);
9288
this.#socketTimeout = options?.socketTimeout;
9389
}
9490

91+
async waitForReady(): Promise<void> {
92+
if (this.#isReady) return
93+
return new Promise((resolve, reject) => {
94+
this.once('ready', resolve);
95+
this.once('error', reject);
96+
});
97+
}
98+
9599
#createReconnectStrategy(options?: RedisSocketOptions): ReconnectStrategyFunction {
96100
const strategy = options?.reconnectStrategy;
97101
if (strategy === false || typeof strategy === 'number') {
@@ -216,14 +220,14 @@ export default class RedisSocket extends EventEmitter {
216220
this.#socket = await this.#createSocket();
217221
this.emit('connect');
218222

219-
try {
220-
await this.#initiator();
221-
} catch (err) {
222-
console.log('Initiator failed', err);
223-
this.#socket.destroy();
224-
this.#socket = undefined;
225-
throw err;
226-
}
223+
// try {
224+
// await this.#initiator();
225+
// } catch (err) {
226+
// console.log('Initiator failed', err);
227+
// this.#socket.destroy();
228+
// this.#socket = undefined;
229+
// throw err;
230+
// }
227231
this.#isReady = true;
228232
this.#socketEpoch++;
229233
console.log('Socket connected, emit ready');

0 commit comments

Comments
 (0)