Skip to content

Commit 6fe3653

Browse files
committed
refactor: rework timeouts
1 parent cc79250 commit 6fe3653

File tree

5 files changed

+76
-59
lines changed

5 files changed

+76
-59
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@ import encodeCommand from '../RESP/encoder';
33
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
44
import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
55
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
6-
import { AbortError, ErrorReply, TimeoutDuringMaintanance, TimeoutError } from '../errors';
6+
import { AbortError, ErrorReply, CommandTimeoutDuringMaintananceError, TimeoutError } from '../errors';
77
import { MonitorCallback } from '.';
8-
import assert from 'assert';
98

109
export interface CommandOptions<T = TypeMapping> {
1110
chainId?: symbol;
@@ -71,10 +70,12 @@ export default class RedisCommandsQueue {
7170

7271
#pushHandlers: PushHandler[] = [this.#onPush.bind(this)];
7372

74-
// If this value is set, we are in a maintenance mode.
75-
// This means any existing commands should have their timeout
76-
// overwritten to the new timeout. And all new commands should
77-
// have their timeout set as the new timeout.
73+
#inMaintenance = false;
74+
75+
set inMaintenance(value: boolean) {
76+
this.#inMaintenance = value;
77+
}
78+
7879
#maintenanceCommandTimeout: number | undefined
7980

8081
setMaintenanceCommandTimeout(ms: number | undefined) {
@@ -87,29 +88,24 @@ export default class RedisCommandsQueue {
8788
this.#toWrite.forEachNode(node => {
8889
const command = node.value;
8990

90-
// If the command didnt have a timeout, skip it
91-
if (!command.timeout) return;
92-
93-
// Remove existing timeout listener
91+
// Remove timeout listener if it exists
9492
RedisCommandsQueue.#removeTimeoutListener(command)
9593

96-
//TODO see if this is needed
97-
// // Keep a flag to know if we were in maintenance at this point in time.
98-
// // To be used in the timeout listener, which needs to know which exact error to use.
99-
// const wasMaintenance = !!this.#maintenanceCommandTimeout
100-
10194
// Determine newTimeout
10295
const newTimeout = this.#maintenanceCommandTimeout ?? command.timeout?.originalTimeout;
103-
assert(newTimeout !== undefined, 'Trying to reset timeout to `undefined`')
96+
// if no timeout is given and the command didnt have any timeout before, skip
97+
if (!newTimeout) return;
98+
10499

100+
// Overwrite the command's timeout
105101
const signal = AbortSignal.timeout(newTimeout);
106102
command.timeout = {
107103
signal,
108104
listener: () => {
109105
this.#toWrite.remove(node);
110-
command.reject(this.#maintenanceCommandTimeout ? new TimeoutDuringMaintanance(newTimeout) : new TimeoutError());
106+
command.reject(this.#inMaintenance ? new CommandTimeoutDuringMaintananceError(newTimeout) : new TimeoutError());
111107
},
112-
originalTimeout: command.timeout.originalTimeout
108+
originalTimeout: command.timeout?.originalTimeout
113109
};
114110
signal.addEventListener('abort', command.timeout.listener, { once: true });
115111
});
@@ -219,23 +215,17 @@ export default class RedisCommandsQueue {
219215
typeMapping: options?.typeMapping
220216
};
221217

222-
// If #commandTimeout was explicitly set, this
223-
// means we are in maintenance mode and should
218+
// If #maintenanceCommandTimeout was explicitly set, we should
224219
// use it instead of the timeout provided by the command
225220
const timeout = this.#maintenanceCommandTimeout || options?.timeout
226221
if (timeout) {
227222

228-
//TODO see if this is needed
229-
// // Keep a flag to know if we were in maintenance at this point in time.
230-
// // To be used in the timeout listener, which needs to know which exact error to use.
231-
// const wasMaintenance = !!this.#maintenanceCommandTimeout
232-
233223
const signal = AbortSignal.timeout(timeout);
234224
value.timeout = {
235225
signal,
236226
listener: () => {
237227
this.#toWrite.remove(node);
238-
value.reject(this.#maintenanceCommandTimeout ? new TimeoutDuringMaintanance(timeout) : new TimeoutError());
228+
value.reject(this.#inMaintenance ? new CommandTimeoutDuringMaintananceError(timeout) : new TimeoutError());
239229
},
240230
originalTimeout: options?.timeout
241231
};

packages/client/lib/client/enterprise-maintenance-manager.ts

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,16 @@ export const MAINTENANCE_EVENTS = {
1010
} as const;
1111

1212
const PN = {
13-
MOVING: "MOVING",
14-
MIGRATING: "MIGRATING",
15-
MIGRATED: "MIGRATED",
16-
FAILING_OVER: "FAILING_OVER",
17-
FAILED_OVER: "FAILED_OVER",
13+
MOVING: "MOVING",
14+
MIGRATING: "MIGRATING",
15+
MIGRATED: "MIGRATED",
16+
FAILING_OVER: "FAILING_OVER",
17+
FAILED_OVER: "FAILED_OVER",
18+
};
19+
20+
export interface SocketTimeoutUpdate {
21+
inMaintenance: boolean,
22+
timeout?: number
1823
}
1924

2025
export default class EnterpriseMaintenanceManager extends EventEmitter {
@@ -91,27 +96,25 @@ export default class EnterpriseMaintenanceManager extends EventEmitter {
9196
};
9297

9398
#onMigrating = async () => {
94-
this.#commandsQueue.setMaintenanceCommandTimeout(this.#getCommandTimeout());
95-
this.emit(
96-
MAINTENANCE_EVENTS.TIMEOUTS_UPDATE,
97-
this.#getSocketTimeout(),
99+
this.#commandsQueue.inMaintenance = true;
100+
this.#commandsQueue.setMaintenanceCommandTimeout(
101+
this.#options.gracefulMaintenance?.relaxedCommandTimeout,
98102
);
103+
104+
this.emit(MAINTENANCE_EVENTS.TIMEOUTS_UPDATE, {
105+
inMaintenance: true,
106+
timeout: this.#options.gracefulMaintenance?.relaxedSocketTimeout
107+
} satisfies SocketTimeoutUpdate);
99108
};
100109

101110
#onMigrated = async () => {
111+
this.#commandsQueue.inMaintenance = false;
102112
this.#commandsQueue.setMaintenanceCommandTimeout(undefined);
103-
this.emit(MAINTENANCE_EVENTS.TIMEOUTS_UPDATE, undefined);
104-
};
105113

106-
#getSocketTimeout(): number | undefined {
107-
return this.#options.gracefulMaintenance?.handleTimeouts === "error"
108-
? this.#options.socket?.socketTimeout
109-
: this.#options.gracefulMaintenance?.handleTimeouts;
110-
}
114+
this.emit(MAINTENANCE_EVENTS.TIMEOUTS_UPDATE, {
115+
inMaintenance: false,
116+
timeout: undefined
117+
} satisfies SocketTimeoutUpdate);
118+
};
111119

112-
#getCommandTimeout(): number | undefined {
113-
return this.#options.gracefulMaintenance?.handleTimeouts === "error"
114-
? this.#options.commandOptions?.timeout
115-
: this.#options.gracefulMaintenance?.handleTimeouts;
116-
}
117-
}
120+
};

packages/client/lib/client/index.ts

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider }
2020
import { BasicCommandParser, CommandParser } from './parser';
2121
import SingleEntryCache from '../single-entry-cache';
2222
import { version } from '../../package.json'
23-
import EnterpriseMaintenanceManager, { MAINTENANCE_EVENTS } from './enterprise-maintenance-manager';
23+
import EnterpriseMaintenanceManager, { MAINTENANCE_EVENTS, SocketTimeoutUpdate } from './enterprise-maintenance-manager';
2424

2525
export interface RedisClientOptions<
2626
M extends RedisModules = RedisModules,
@@ -177,11 +177,19 @@ export interface RedisClientOptions<
177177
/**
178178
* Designates how failed commands should be handled. A failed command is when the time isn’t sufficient to deal with the responses on the old connection before the server shuts it down
179179
*/
180-
handleFailedCommands: 'exception' | 'retry',
180+
handleFailedCommands?: 'exception' | 'retry',
181181
/**
182-
* Specify whether we should throw a TimeoutDuringMaintanance exception or provide more relaxed timeout, in order to minimize command timeouts during maintenance.
182+
* Specifies a more relaxed timeout (in milliseconds) for commands during a maintenance window.
183+
* This helps minimize command timeouts during maintenance. If not provided, the `commandOptions.timeout`
184+
* will be used instead. Timeouts during maintenance period result in a `CommandTimeoutDuringMaintanance` error.
183185
*/
184-
handleTimeouts: 'error' | number,
186+
relaxedCommandTimeout?: number,
187+
/**
188+
* Specifies a more relaxed timeout (in milliseconds) for the socket during a maintenance window.
189+
* This helps minimize socket timeouts during maintenance. If not provided, the `socket.timeout`
190+
* will be used instead. Timeouts during maintenance period result in a `SocketTimeoutDuringMaintanance` error.
191+
*/
192+
relaxedSocketTimeout?: number
185193
}
186194
}
187195

@@ -490,7 +498,10 @@ export default class RedisClient<
490498
new EnterpriseMaintenanceManager(this.#queue, this.#options!)
491499
.on(MAINTENANCE_EVENTS.PAUSE_WRITING, () => this._self.#pausedForMaintenance = true )
492500
.on(MAINTENANCE_EVENTS.RESUME_WRITING, this.#resumeFromMaintenance.bind(this))
493-
.on(MAINTENANCE_EVENTS.TIMEOUTS_UPDATE, (mtm: number | undefined) => this._self.#socket.setMaintenanceTimeout(mtm))
501+
.on(MAINTENANCE_EVENTS.TIMEOUTS_UPDATE, (value: SocketTimeoutUpdate) => {
502+
this._self.#socket.inMaintenance = value.inMaintenance;
503+
this._self.#socket.setMaintenanceTimeout(value.timeout);
504+
})
494505
}
495506

496507
if (options?.clientSideCache) {

packages/client/lib/client/socket.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { EventEmitter, once } from 'node:events';
22
import net from 'node:net';
33
import tls from 'node:tls';
4-
import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, ReconnectStrategyError, SocketTimeoutError, TimeoutDuringMaintanance } from '../errors';
4+
import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, ReconnectStrategyError, SocketTimeoutError, SocketTimeoutDuringMaintananceError } from '../errors';
55
import { setTimeout } from 'node:timers/promises';
66
import { RedisArgument } from '../RESP/types';
77

@@ -81,6 +81,12 @@ export default class RedisSocket extends EventEmitter {
8181
return this.#socketEpoch;
8282
}
8383

84+
#inMaintenance = false;
85+
86+
set inMaintenance(value: boolean) {
87+
this.#inMaintenance = value;
88+
}
89+
8490
constructor(options?: RedisSocketOptions) {
8591
super();
8692

@@ -237,12 +243,14 @@ export default class RedisSocket extends EventEmitter {
237243
}
238244

239245
setMaintenanceTimeout(ms?: number) {
246+
if (this.#maintenanceTimeout === ms) return;
247+
240248
this.#maintenanceTimeout = ms;
241249

242250
if(ms !== undefined) {
243251
this.#socket?.setTimeout(ms);
244-
} else if (this.#socketTimeout !== undefined) {
245-
this.#socket?.setTimeout(this.#socketTimeout);
252+
} else {
253+
this.#socket?.setTimeout(this.#socketTimeout ?? 0);
246254
}
247255
}
248256

@@ -268,8 +276,8 @@ export default class RedisSocket extends EventEmitter {
268276

269277
if (this.#socketTimeout) {
270278
socket.once('timeout', () => {
271-
const error = this.#maintenanceTimeout
272-
? new TimeoutDuringMaintanance(this.#socketTimeout!)
279+
const error = this.#inMaintenance
280+
? new SocketTimeoutDuringMaintananceError(this.#socketTimeout!)
273281
: new SocketTimeoutError(this.#socketTimeout!)
274282
socket.destroy(error);
275283
});

packages/client/lib/errors.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,17 @@ export class BlobError extends ErrorReply {}
7676

7777
export class TimeoutError extends Error {}
7878

79-
export class TimeoutDuringMaintanance extends Error {
79+
export class SocketTimeoutDuringMaintananceError extends TimeoutError {
8080
constructor(timeout: number) {
8181
super(`Socket timeout during maintenance. Expecting data, but didn't receive any in ${timeout}ms.`);
8282
}
8383
}
8484

85+
export class CommandTimeoutDuringMaintananceError extends TimeoutError {
86+
constructor(timeout: number) {
87+
super(`Command timeout during maintenance. Waited to write command for more than ${timeout}ms.`);
88+
}
89+
}
8590

8691
export class MultiErrorReply extends ErrorReply {
8792
replies: Array<ErrorReply>;

0 commit comments

Comments
 (0)