Skip to content

Commit 861e1b7

Browse files
committed
--wip-- [skip ci]
1 parent 6f1cd29 commit 861e1b7

File tree

2 files changed

+98
-92
lines changed

2 files changed

+98
-92
lines changed

packages/client/lib/client/index.ts

Lines changed: 81 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ export default class RedisClient<
474474
this.#validateOptions(options)
475475
this.#options = this.#initiateOptions(options);
476476
this.#queue = this.#initiateQueue();
477-
this.#socket = this.#initiateSocket(this.#options);
477+
this.#socket = this.#createSocket(this.#options);
478478
// Queue
479479
// toWrite [ C D E ]
480480
// waitingForReply [ A B ]
@@ -487,54 +487,54 @@ export default class RedisClient<
487487
// 4. [EVENT] In-flight commands completed
488488
// 5. [ACTION] Unpause writing -> we are going to write to the new socket from now on
489489
// 6. [ACTION] Destroy old socket
490-
this.options?.gracefulMaintenance && this.#queue.events.on('moving', async (afterMs: number, host: string, port: number) => {
491-
// 1
492-
console.log(`Moving to ${host}:${port} before ${afterMs}ms`);
493-
494-
// 2
495-
console.log(`Pausing writing until new socket is ready and all in-flight commands are completed`);
496-
// this.#paused = true;
497-
498-
const oldSocket = this.#socket;
499-
this.#socket = this.#initiateSocket({
500-
...this.#options,
501-
socket: {
502-
...this.#options?.socket,
503-
host,
504-
port
505-
}
506-
});
507-
508-
// 3
509-
this.#socket.once('ready', () => {
510-
//TODO handshake...???
511-
console.log(`Connected to ${host}:${port}`);
512-
513-
// 4
514-
if(!this.#queue.isWaitingForReply()) {
515-
// 5 and 6
516-
console.log(`All in-flight commands completed`);
517-
console.log(`Resume writing`)
518-
oldSocket.destroy();
519-
this.#paused = false;
520-
}
521-
});
522-
523-
// 4
524-
this.#queue.events.once('waitingForReplyEmpty', () => {
525-
console.log(`All in-flight commands completed`);
526-
// 3
527-
if(this.#socket.isReady) {
528-
// 5 and 6
529-
console.log(`Connected to ${host}:${port}`);
530-
console.log(`Resume writing`)
531-
oldSocket.destroy();
532-
this.#paused = false;
533-
}
534-
});
535-
536-
await this.#socket.connect()
537-
});
490+
// this.options?.gracefulMaintenance && this.#queue.events.on('moving', async (afterMs: number, host: string, port: number) => {
491+
// // 1
492+
// console.log(`Moving to ${host}:${port} before ${afterMs}ms`);
493+
494+
// // 2
495+
// console.log(`Pausing writing until new socket is ready and all in-flight commands are completed`);
496+
// // this.#paused = true;
497+
498+
// const oldSocket = this.#socket;
499+
// this.#socket = this.#initiateSocket({
500+
// ...this.#options,
501+
// socket: {
502+
// ...this.#options?.socket,
503+
// host,
504+
// port
505+
// }
506+
// });
507+
508+
// // 3
509+
// this.#socket.once('ready', () => {
510+
// //TODO handshake...???
511+
// console.log(`Connected to ${host}:${port}`);
512+
513+
// // 4
514+
// if(!this.#queue.isWaitingForReply()) {
515+
// // 5 and 6
516+
// console.log(`All in-flight commands completed`);
517+
// console.log(`Resume writing`)
518+
// oldSocket.destroy();
519+
// this.#paused = false;
520+
// }
521+
// });
522+
523+
// // 4
524+
// this.#queue.events.once('waitingForReplyEmpty', () => {
525+
// console.log(`All in-flight commands completed`);
526+
// // 3
527+
// if(this.#socket.isReady) {
528+
// // 5 and 6
529+
// console.log(`Connected to ${host}:${port}`);
530+
// console.log(`Resume writing`)
531+
// oldSocket.destroy();
532+
// this.#paused = false;
533+
// }
534+
// });
535+
536+
// await this.#socket.connect()
537+
// });
538538

539539
if (options?.clientSideCache) {
540540
if (options.clientSideCache instanceof ClientSideCacheProvider) {
@@ -764,39 +764,42 @@ export default class RedisClient<
764764
return commands;
765765
}
766766

767-
#initiateSocket(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisSocket {
768-
const socketInitiator = async () => {
769-
console.log('Initiator...');
770-
const promises = [],
771-
chainId = Symbol('Socket Initiator');
767+
async #initiateSocket(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): Promise<void> {
768+
await this.#socket.waitForReady();
769+
console.log('Initiator...');
770+
const promises = [];
771+
const chainId = Symbol('Socket Initiator');
772+
773+
const resubscribePromise = this.#queue.resubscribe(chainId);
774+
if (resubscribePromise) {
775+
promises.push(resubscribePromise);
776+
}
772777

773-
const resubscribePromise = this.#queue.resubscribe(chainId);
774-
if (resubscribePromise) {
775-
promises.push(resubscribePromise);
776-
}
778+
if (this.#monitorCallback) {
779+
promises.push(
780+
this.#queue.monitor(
781+
this.#monitorCallback,
782+
{
783+
typeMapping: this._commandOptions?.typeMapping,
784+
chainId,
785+
asap: true
786+
}
787+
)
788+
);
789+
}
777790

778-
if (this.#monitorCallback) {
779-
promises.push(
780-
this.#queue.monitor(
781-
this.#monitorCallback,
782-
{
783-
typeMapping: this._commandOptions?.typeMapping,
784-
chainId,
785-
asap: true
786-
}
787-
)
788-
);
789-
}
791+
promises.push(...(await this.#handshake(chainId, true)));
790792

791-
promises.push(...(await this.#handshake(chainId, true)));
793+
this.#setPingTimer();
792794

793-
if (promises.length) {
794-
this.#write();
795-
return Promise.all(promises);
796-
}
797-
};
795+
if (promises.length) {
796+
this.#write();
797+
await Promise.all(promises);
798+
}
799+
}
798800

799-
return new RedisSocket(socketInitiator, options?.socket)
801+
#createSocket(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisSocket {
802+
return new RedisSocket(options?.socket)
800803
.on('data', chunk => {
801804
try {
802805
this.#queue.decoder.write(chunk);
@@ -818,8 +821,6 @@ export default class RedisClient<
818821
.on('ready', () => {
819822
console.log('Socket ready');
820823
this.emit('ready');
821-
this.#setPingTimer();
822-
this.#maybeScheduleWrite();
823824
})
824825
.on('reconnecting', () => this.emit('reconnecting'))
825826
.on('drain', () => this.#maybeScheduleWrite())
@@ -932,6 +933,7 @@ export default class RedisClient<
932933

933934
async connect() {
934935
await this._self.#socket.connect();
936+
await this._self.#initiateSocket(this._self.#options);
935937
return this as unknown as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
936938
}
937939

packages/client/lib/client/socket.ts

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,7 @@ export type RedisTcpSocketOptions = RedisTcpOptions | RedisTlsOptions;
5151

5252
export type RedisSocketOptions = RedisTcpSocketOptions | RedisIpcOptions;
5353

54-
export type RedisSocketInitiator = () => void | Promise<unknown>;
55-
5654
export default class RedisSocket extends EventEmitter {
57-
readonly #initiator;
5855
readonly #connectTimeout;
5956
readonly #reconnectStrategy;
6057
readonly #socketFactory;
@@ -82,16 +79,23 @@ export default class RedisSocket extends EventEmitter {
8279
return this.#socketEpoch;
8380
}
8481

85-
constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) {
82+
constructor(options?: RedisSocketOptions) {
8683
super();
8784

88-
this.#initiator = initiator;
8985
this.#connectTimeout = options?.connectTimeout ?? 5000;
9086
this.#reconnectStrategy = this.#createReconnectStrategy(options);
9187
this.#socketFactory = this.#createSocketFactory(options);
9288
this.#socketTimeout = options?.socketTimeout;
9389
}
9490

91+
async waitForReady(): Promise<void> {
92+
if (this.#isReady) return
93+
return new Promise((resolve, reject) => {
94+
this.once('ready', resolve);
95+
this.once('error', reject);
96+
});
97+
}
98+
9599
#createReconnectStrategy(options?: RedisSocketOptions): ReconnectStrategyFunction {
96100
const strategy = options?.reconnectStrategy;
97101
if (strategy === false || typeof strategy === 'number') {
@@ -216,14 +220,14 @@ export default class RedisSocket extends EventEmitter {
216220
this.#socket = await this.#createSocket();
217221
this.emit('connect');
218222

219-
try {
220-
await this.#initiator();
221-
} catch (err) {
222-
console.log('Initiator failed', err);
223-
this.#socket.destroy();
224-
this.#socket = undefined;
225-
throw err;
226-
}
223+
// try {
224+
// await this.#initiator();
225+
// } catch (err) {
226+
// console.log('Initiator failed', err);
227+
// this.#socket.destroy();
228+
// this.#socket = undefined;
229+
// throw err;
230+
// }
227231
this.#isReady = true;
228232
this.#socketEpoch++;
229233
console.log('Socket connected, emit ready');

0 commit comments

Comments
 (0)