Skip to content

Commit e6de453

Browse files
authored
fix socket error handlers (#2092)
* fix socket error handlers, reset parser on error * fix #2080 - reset pubSubState on socket error * fix pubsub * fix "RedisSocketInitiator"
1 parent 8b5a547 commit e6de453

File tree

6 files changed

+67
-141
lines changed

6 files changed

+67
-141
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 38 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,6 @@ interface PubSubListeners {
5353

5454
type PubSubListenersMap = Map<string, PubSubListeners>;
5555

56-
interface PubSubState {
57-
subscribing: number;
58-
subscribed: number;
59-
unsubscribing: number;
60-
listeners: {
61-
channels: PubSubListenersMap;
62-
patterns: PubSubListenersMap;
63-
};
64-
}
65-
6656
export default class RedisCommandsQueue {
6757
static #flushQueue<T extends CommandWaitingForReply>(queue: LinkedList<T>, err: Error): void {
6858
while (queue.length) {
@@ -98,7 +88,16 @@ export default class RedisCommandsQueue {
9888

9989
readonly #waitingForReply = new LinkedList<CommandWaitingForReply>();
10090

101-
#pubSubState: PubSubState | undefined;
91+
readonly #pubSubState = {
92+
isActive: false,
93+
subscribing: 0,
94+
subscribed: 0,
95+
unsubscribing: 0,
96+
listeners: {
97+
channels: new Map(),
98+
patterns: new Map()
99+
}
100+
};
102101

103102
static readonly #PUB_SUB_MESSAGES = {
104103
message: Buffer.from('message'),
@@ -111,7 +110,7 @@ export default class RedisCommandsQueue {
111110

112111
readonly #parser = new RedisParser({
113112
returnReply: (reply: unknown) => {
114-
if (this.#pubSubState && Array.isArray(reply)) {
113+
if (this.#pubSubState.isActive && Array.isArray(reply)) {
115114
if (RedisCommandsQueue.#PUB_SUB_MESSAGES.message.equals(reply[0])) {
116115
return RedisCommandsQueue.#emitPubSubMessage(
117116
this.#pubSubState.listeners.channels,
@@ -150,7 +149,7 @@ export default class RedisCommandsQueue {
150149
}
151150

152151
addCommand<T = RedisCommandRawReply>(args: RedisCommandArguments, options?: QueueCommandOptions): Promise<T> {
153-
if (this.#pubSubState && !options?.ignorePubSubMode) {
152+
if (this.#pubSubState.isActive && !options?.ignorePubSubMode) {
154153
return Promise.reject(new Error('Cannot send commands in PubSub mode'));
155154
} else if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) {
156155
return Promise.reject(new Error('The queue is full'));
@@ -190,27 +189,16 @@ export default class RedisCommandsQueue {
190189
});
191190
}
192191

193-
#initiatePubSubState(): PubSubState {
194-
return this.#pubSubState ??= {
195-
subscribed: 0,
196-
subscribing: 0,
197-
unsubscribing: 0,
198-
listeners: {
199-
channels: new Map(),
200-
patterns: new Map()
201-
}
202-
};
203-
}
204-
205192
subscribe<T extends boolean>(
206193
command: PubSubSubscribeCommands,
207194
channels: RedisCommandArgument | Array<RedisCommandArgument>,
208195
listener: PubSubListener<T>,
209196
returnBuffers?: T
210197
): Promise<void> {
211-
const pubSubState = this.#initiatePubSubState(),
212-
channelsToSubscribe: Array<RedisCommandArgument> = [],
213-
listenersMap = command === PubSubSubscribeCommands.SUBSCRIBE ? pubSubState.listeners.channels : pubSubState.listeners.patterns;
198+
const channelsToSubscribe: Array<RedisCommandArgument> = [],
199+
listenersMap = command === PubSubSubscribeCommands.SUBSCRIBE ?
200+
this.#pubSubState.listeners.channels :
201+
this.#pubSubState.listeners.patterns;
214202
for (const channel of (Array.isArray(channels) ? channels : [channels])) {
215203
const channelString = typeof channel === 'string' ? channel : channel.toString();
216204
let listeners = listenersMap.get(channelString);
@@ -230,6 +218,7 @@ export default class RedisCommandsQueue {
230218
if (!channelsToSubscribe.length) {
231219
return Promise.resolve();
232220
}
221+
233222
return this.#pushPubSubCommand(command, channelsToSubscribe);
234223
}
235224

@@ -239,10 +228,6 @@ export default class RedisCommandsQueue {
239228
listener?: PubSubListener<T>,
240229
returnBuffers?: T
241230
): Promise<void> {
242-
if (!this.#pubSubState) {
243-
return Promise.resolve();
244-
}
245-
246231
const listeners = command === PubSubUnsubscribeCommands.UNSUBSCRIBE ?
247232
this.#pubSubState.listeners.channels :
248233
this.#pubSubState.listeners.patterns;
@@ -280,8 +265,7 @@ export default class RedisCommandsQueue {
280265

281266
#pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels: number | Array<RedisCommandArgument>): Promise<void> {
282267
return new Promise((resolve, reject) => {
283-
const pubSubState = this.#initiatePubSubState(),
284-
isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE,
268+
const isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE,
285269
inProgressKey = isSubscribe ? 'subscribing' : 'unsubscribing',
286270
commandArgs: Array<RedisCommandArgument> = [command];
287271

@@ -293,38 +277,42 @@ export default class RedisCommandsQueue {
293277
channelsCounter = channels.length;
294278
}
295279

296-
pubSubState[inProgressKey] += channelsCounter;
280+
this.#pubSubState.isActive = true;
281+
this.#pubSubState[inProgressKey] += channelsCounter;
297282

298283
this.#waitingToBeSent.push({
299284
args: commandArgs,
300285
channelsCounter,
301286
returnBuffers: true,
302287
resolve: () => {
303-
pubSubState[inProgressKey] -= channelsCounter;
304-
if (isSubscribe) {
305-
pubSubState.subscribed += channelsCounter;
306-
} else {
307-
pubSubState.subscribed -= channelsCounter;
308-
if (!pubSubState.subscribed && !pubSubState.subscribing && !pubSubState.subscribed) {
309-
this.#pubSubState = undefined;
310-
}
311-
}
288+
this.#pubSubState[inProgressKey] -= channelsCounter;
289+
this.#pubSubState.subscribed += channelsCounter * (isSubscribe ? 1 : -1);
290+
this.#updatePubSubActiveState();
312291
resolve();
313292
},
314293
reject: err => {
315-
pubSubState[inProgressKey] -= channelsCounter * (isSubscribe ? 1 : -1);
294+
this.#pubSubState[inProgressKey] -= channelsCounter * (isSubscribe ? 1 : -1);
295+
this.#updatePubSubActiveState();
316296
reject(err);
317297
}
318298
});
319299
});
320300
}
321301

322-
resubscribe(): Promise<any> | undefined {
323-
if (!this.#pubSubState) {
324-
return;
302+
#updatePubSubActiveState(): void {
303+
if (
304+
!this.#pubSubState.subscribed &&
305+
!this.#pubSubState.subscribing &&
306+
!this.#pubSubState.subscribed
307+
) {
308+
this.#pubSubState.isActive = false;
325309
}
310+
}
326311

312+
resubscribe(): Promise<any> | undefined {
327313
this.#pubSubState.subscribed = 0;
314+
this.#pubSubState.subscribing = 0;
315+
this.#pubSubState.unsubscribing = 0;
328316

329317
const promises = [],
330318
{ channels, patterns } = this.#pubSubState.listeners;
@@ -369,8 +357,7 @@ export default class RedisCommandsQueue {
369357
#setReturnBuffers() {
370358
this.#parser.setReturnBuffers(
371359
!!this.#waitingForReply.head?.value.returnBuffers ||
372-
!!this.#pubSubState?.subscribed ||
373-
!!this.#pubSubState?.subscribing
360+
!!this.#pubSubState.isActive
374361
);
375362
}
376363

@@ -390,6 +377,7 @@ export default class RedisCommandsQueue {
390377
}
391378

392379
flushWaitingForReply(err: Error): void {
380+
this.#parser.reset();
393381
RedisCommandsQueue.#flushQueue(this.#waitingForReply, err);
394382

395383
if (!this.#chainInExecution) return;

packages/client/lib/client/index.spec.ts

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
33
import RedisClient, { RedisClientType } from '.';
44
import { RedisClientMultiCommandType } from './multi-command';
55
import { RedisCommandArguments, RedisCommandRawReply, RedisModules, RedisScripts } from '../commands';
6-
import { AbortError, AuthError, ClientClosedError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors';
6+
import { AbortError, ClientClosedError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors';
77
import { defineScript } from '../lua-script';
88
import { spy } from 'sinon';
99
import { once } from 'events';
@@ -87,30 +87,6 @@ describe('Client', () => {
8787
);
8888
}, GLOBAL.SERVERS.PASSWORD);
8989

90-
testUtils.testWithClient('should not retry connecting if failed due to wrong auth', async client => {
91-
let message;
92-
if (testUtils.isVersionGreaterThan([6, 2])) {
93-
message = 'WRONGPASS invalid username-password pair or user is disabled.';
94-
} else if (testUtils.isVersionGreaterThan([6])) {
95-
message = 'WRONGPASS invalid username-password pair';
96-
} else {
97-
message = 'ERR invalid password';
98-
}
99-
100-
await assert.rejects(
101-
client.connect(),
102-
new AuthError(message)
103-
);
104-
105-
assert.equal(client.isOpen, false);
106-
}, {
107-
...GLOBAL.SERVERS.PASSWORD,
108-
clientOptions: {
109-
password: 'wrongpassword'
110-
},
111-
disableClientSetup: true
112-
});
113-
11490
testUtils.testWithClient('should execute AUTH before SELECT', async client => {
11591
assert.equal(
11692
(await client.clientInfo()).db,
@@ -300,7 +276,8 @@ describe('Client', () => {
300276
await client.multi()
301277
.sAdd('a', ['b', 'c'])
302278
.v4.exec(),
303-
[2])
279+
[2]
280+
);
304281
}, {
305282
...GLOBAL.SERVERS.OPEN,
306283
clientOptions: {
@@ -681,10 +658,6 @@ describe('Client', () => {
681658
const listener = spy();
682659
await subscriber.subscribe('channel', listener);
683660

684-
subscriber.on('error', err => {
685-
console.error('subscriber err', err.message);
686-
});
687-
688661
await Promise.all([
689662
once(subscriber, 'error'),
690663
publisher.clientKill({

packages/client/lib/client/index.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { ScanCommandOptions } from '../commands/SCAN';
1111
import { HScanTuple } from '../commands/HSCAN';
1212
import { extendWithCommands, extendWithModulesAndScripts, transformCommandArguments, transformCommandReply } from '../commander';
1313
import { Pool, Options as PoolOptions, createPool } from 'generic-pool';
14-
import { ClientClosedError, DisconnectsClientError, AuthError } from '../errors';
14+
import { ClientClosedError, DisconnectsClientError } from '../errors';
1515
import { URL } from 'url';
1616
import { TcpSocketConnectOpts } from 'net';
1717

@@ -254,9 +254,7 @@ export default class RedisClient<M extends RedisModules, S extends RedisScripts>
254254
password: this.#options.password ?? ''
255255
}),
256256
{ asap: true }
257-
).catch(err => {
258-
throw new AuthError(err.message);
259-
})
257+
)
260258
);
261259
}
262260

packages/client/lib/client/socket.spec.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@ describe('Socket', () => {
2121
return time;
2222
});
2323

24-
const socket = new RedisSocket(undefined, {
25-
host: 'error',
26-
reconnectStrategy
27-
});
24+
const socket = new RedisSocket(
25+
() => Promise.resolve(),
26+
{
27+
host: 'error',
28+
reconnectStrategy
29+
}
30+
);
2831

2932
socket.on('error', () => {
3033
// ignore errors

0 commit comments

Comments
 (0)