Skip to content

Commit d803ec1

Browse files
committed
refactor(client): Simplify socket initialization and improve readiness handling
- Changed `#initiateSocket` to a streamlined asynchronous method to handle socket readiness and initialization logic. - Added `waitForReady` method to `RedisSocket` to await socket readiness. - Introduced `#createSocket` method in `RedisClient` to decouple socket creation from initialization logic. - Refactored `connect` method to ensure proper socket readiness and initialization before proceeding.
1 parent 7689ddb commit d803ec1

File tree

2 files changed

+51
-50
lines changed

2 files changed

+51
-50
lines changed

packages/client/lib/client/index.ts

Lines changed: 43 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,7 @@ export default class RedisClient<
455455
this.#validateOptions(options)
456456
this.#options = this.#initiateOptions(options);
457457
this.#queue = this.#initiateQueue();
458-
this.#socket = this.#initiateSocket();
458+
this.#socket = this.#createSocket(this.#options);
459459

460460
if (options?.clientSideCache) {
461461
if (options.clientSideCache instanceof ClientSideCacheProvider) {
@@ -688,38 +688,10 @@ export default class RedisClient<
688688
return commands;
689689
}
690690

691-
#initiateSocket(): RedisSocket {
692-
const socketInitiator = async () => {
693-
const promises = [],
694-
chainId = Symbol('Socket Initiator');
691+
async #initiateSocket(): Promise<void> {
692+
await this.#socket.waitForReady();
695693

696-
const resubscribePromise = this.#queue.resubscribe(chainId);
697-
if (resubscribePromise) {
698-
promises.push(resubscribePromise);
699-
}
700-
701-
if (this.#monitorCallback) {
702-
promises.push(
703-
this.#queue.monitor(
704-
this.#monitorCallback,
705-
{
706-
typeMapping: this._commandOptions?.typeMapping,
707-
chainId,
708-
asap: true
709-
}
710-
)
711-
);
712-
}
713-
714-
promises.push(...(await this.#handshake(chainId, true)));
715-
716-
if (promises.length) {
717-
this.#write();
718-
return Promise.all(promises);
719-
}
720-
};
721-
722-
return new RedisSocket(socketInitiator, this.#options?.socket)
694+
this.#socket
723695
.on('data', chunk => {
724696
try {
725697
this.#queue.decoder.write(chunk);
@@ -737,15 +709,47 @@ export default class RedisClient<
737709
this.#queue.flushAll(err);
738710
}
739711
})
740-
.on('connect', () => this.emit('connect'))
741-
.on('ready', () => {
742-
this.emit('ready');
743-
this.#setPingTimer();
744-
this.#maybeScheduleWrite();
745-
})
746712
.on('reconnecting', () => this.emit('reconnecting'))
747713
.on('drain', () => this.#maybeScheduleWrite())
748714
.on('end', () => this.emit('end'));
715+
716+
const promises = [];
717+
const chainId = Symbol('Socket Initiator');
718+
719+
const resubscribePromise = this.#queue.resubscribe(chainId);
720+
if (resubscribePromise) {
721+
promises.push(resubscribePromise);
722+
}
723+
724+
if (this.#monitorCallback) {
725+
promises.push(
726+
this.#queue.monitor(
727+
this.#monitorCallback,
728+
{
729+
typeMapping: this._commandOptions?.typeMapping,
730+
chainId,
731+
asap: true
732+
}
733+
)
734+
);
735+
}
736+
737+
promises.push(...(await this.#handshake(chainId, true)));
738+
739+
this.#setPingTimer();
740+
741+
if (promises.length) {
742+
this.#write();
743+
await Promise.all(promises);
744+
}
745+
}
746+
747+
#createSocket(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisSocket {
748+
return new RedisSocket(options?.socket)
749+
.on('connect', () => this.emit('connect'))
750+
.on('ready', () => {
751+
this.emit('ready');
752+
});
749753
}
750754

751755
#pingTimer?: NodeJS.Timeout;
@@ -854,6 +858,7 @@ export default class RedisClient<
854858

855859
async connect() {
856860
await this._self.#socket.connect();
861+
await this._self.#initiateSocket();
857862
return this as unknown as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
858863
}
859864

packages/client/lib/client/socket.ts

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,7 @@ export type RedisTcpSocketOptions = RedisTcpOptions | RedisTlsOptions;
5151

5252
export type RedisSocketOptions = RedisTcpSocketOptions | RedisIpcOptions;
5353

54-
export type RedisSocketInitiator = () => void | Promise<unknown>;
55-
5654
export default class RedisSocket extends EventEmitter {
57-
readonly #initiator;
5855
readonly #connectTimeout;
5956
readonly #reconnectStrategy;
6057
readonly #socketFactory;
@@ -85,13 +82,20 @@ export default class RedisSocket extends EventEmitter {
8582
constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) {
8683
super();
8784

88-
this.#initiator = initiator;
8985
this.#connectTimeout = options?.connectTimeout ?? 5000;
9086
this.#reconnectStrategy = this.#createReconnectStrategy(options);
9187
this.#socketFactory = this.#createSocketFactory(options);
9288
this.#socketTimeout = options?.socketTimeout;
9389
}
9490

91+
async waitForReady(): Promise<void> {
92+
if (this.#isReady) return
93+
return new Promise((resolve, reject) => {
94+
this.once('ready', resolve);
95+
this.once('error', reject);
96+
});
97+
}
98+
9599
#createReconnectStrategy(options?: RedisSocketOptions): ReconnectStrategyFunction {
96100
const strategy = options?.reconnectStrategy;
97101
if (strategy === false || typeof strategy === 'number') {
@@ -214,14 +218,6 @@ export default class RedisSocket extends EventEmitter {
214218
try {
215219
this.#socket = await this.#createSocket();
216220
this.emit('connect');
217-
218-
try {
219-
await this.#initiator();
220-
} catch (err) {
221-
this.#socket.destroy();
222-
this.#socket = undefined;
223-
throw err;
224-
}
225221
this.#isReady = true;
226222
this.#socketEpoch++;
227223
this.emit('ready');

0 commit comments

Comments
 (0)