Skip to content

Commit 9ac1f12

Browse files
committed
feat(client): Add maintenance pause mechanism for socket operations
- Introduced `#pausedForMaintenance` flag to temporarily halt writing commands to the socket during maintenance windows. - Added `#resumeFromMaintenance` method to handle socket replacement and resume operations after maintenance. - Updated `#write` method to respect the `#pausedForMaintenance` flag, preventing new commands from being written during maintenance.
1 parent 724f0e0 commit 9ac1f12

File tree

1 file changed

+18
-0
lines changed

1 file changed

+18
-0
lines changed

packages/client/lib/client/index.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,12 @@ export default class RedisClient<
442442
#watchEpoch?: number;
443443
#clientSideCache?: ClientSideCacheProvider;
444444
#credentialsSubscription: Disposable | null = null;
445+
// Flag used to pause writing to the socket during maintenance windows.
446+
// When true, prevents new commands from being written while waiting for:
447+
// 1. New socket to be ready after maintenance redirect
448+
// 2. In-flight commands on the old socket to complete
449+
#pausedForMaintenance = false;
450+
445451
get clientSideCache() {
446452
return this._self.#clientSideCache;
447453
}
@@ -480,6 +486,15 @@ export default class RedisClient<
480486
return this._self.#dirtyWatch !== undefined
481487
}
482488

489+
async #resumeFromMaintenance(newSocket: RedisSocket) {
490+
this._self.#socket.removeAllListeners();
491+
this._self.#socket.destroy();
492+
this._self.#socket = newSocket;
493+
this._self.#pausedForMaintenance = false;
494+
await this._self.#initiateSocket();
495+
this._self.#maybeScheduleWrite();
496+
}
497+
483498
/**
484499
* Marks the client's WATCH command as invalidated due to a topology change.
485500
* This will cause any subsequent EXEC in a transaction to fail with a WatchError.
@@ -1135,6 +1150,9 @@ export default class RedisClient<
11351150
}
11361151

11371152
#write() {
1153+
if(this.#pausedForMaintenance) {
1154+
return
1155+
}
11381156
this.#socket.write(this.#queue.commandsToWrite());
11391157
}
11401158

0 commit comments

Comments
 (0)