1
- import { SinglyLinkedList , DoublyLinkedNode , DoublyLinkedList } from './linked-list' ;
1
+ import { DoublyLinkedNode , DoublyLinkedList , EmptyAwareSinglyLinkedList } from './linked-list' ;
2
2
import encodeCommand from '../RESP/encoder' ;
3
3
import { Decoder , PUSH_TYPE_MAPPING , RESP_TYPES } from '../RESP/decoder' ;
4
4
import { TypeMapping , ReplyUnion , RespVersions , RedisArgument } from '../RESP/types' ;
5
5
import { ChannelListeners , PubSub , PubSubCommand , PubSubListener , PubSubType , PubSubTypeListeners } from './pub-sub' ;
6
- import { AbortError , ErrorReply , TimeoutError } from '../errors' ;
6
+ import { AbortError , ErrorReply , CommandTimeoutDuringMaintenanceError , TimeoutError } from '../errors' ;
7
7
import { MonitorCallback } from '.' ;
8
+ import { dbgMaintenance } from './enterprise-maintenance-manager' ;
8
9
9
10
export interface CommandOptions < T = TypeMapping > {
10
11
chainId ?: symbol ;
@@ -30,6 +31,7 @@ export interface CommandToWrite extends CommandWaitingForReply {
30
31
timeout : {
31
32
signal : AbortSignal ;
32
33
listener : ( ) => unknown ;
34
+ originalTimeout : number | undefined ;
33
35
} | undefined ;
34
36
}
35
37
@@ -50,22 +52,74 @@ const RESP2_PUSH_TYPE_MAPPING = {
50
52
[ RESP_TYPES . SIMPLE_STRING ] : Buffer
51
53
} ;
52
54
55
+ // Try to handle a push notification. Return whether you
56
+ // successfully consumed the notification or not. This is
57
+ // important in order for the queue to be able to pass the
58
+ // notification to another handler if the current one did not
59
+ // succeed.
60
+ type PushHandler = ( pushItems : Array < any > ) => boolean ;
61
+
53
62
export default class RedisCommandsQueue {
54
63
readonly #respVersion;
55
64
readonly #maxLength;
56
65
readonly #toWrite = new DoublyLinkedList < CommandToWrite > ( ) ;
57
- readonly #waitingForReply = new SinglyLinkedList < CommandWaitingForReply > ( ) ;
66
+ readonly #waitingForReply = new EmptyAwareSinglyLinkedList < CommandWaitingForReply > ( ) ;
58
67
readonly #onShardedChannelMoved;
59
68
#chainInExecution: symbol | undefined ;
60
69
readonly decoder ;
61
70
readonly #pubSub = new PubSub ( ) ;
62
71
72
+ #pushHandlers: PushHandler [ ] = [ this . #onPush. bind ( this ) ] ;
73
+
74
+ #maintenanceCommandTimeout: number | undefined
75
+
76
+ setMaintenanceCommandTimeout ( ms : number | undefined ) {
77
+ // Prevent possible api misuse
78
+ if ( this . #maintenanceCommandTimeout === ms ) {
79
+ dbgMaintenance ( `Queue already set maintenanceCommandTimeout to ${ ms } , skipping` ) ;
80
+ return ;
81
+ } ;
82
+
83
+ dbgMaintenance ( `Setting maintenance command timeout to ${ ms } ` ) ;
84
+ this . #maintenanceCommandTimeout = ms ;
85
+
86
+ if ( this . #maintenanceCommandTimeout === undefined ) {
87
+ dbgMaintenance ( `Queue will keep maintenanceCommandTimeout for exisitng commands, just to be on the safe side. New commands will receive normal timeouts` ) ;
88
+ return ;
89
+ }
90
+
91
+ let counter = 0 ;
92
+ const total = this . #toWrite. length ;
93
+
94
+ // Overwrite timeouts of all eligible toWrite commands
95
+ for ( const node of this . #toWrite. nodes ( ) ) {
96
+ const command = node . value ;
97
+
98
+ // Remove timeout listener if it exists
99
+ RedisCommandsQueue . #removeTimeoutListener( command )
100
+
101
+ counter ++ ;
102
+ const newTimeout = this . #maintenanceCommandTimeout;
103
+
104
+ // Overwrite the command's timeout
105
+ const signal = AbortSignal . timeout ( newTimeout ) ;
106
+ command . timeout = {
107
+ signal,
108
+ listener : ( ) => {
109
+ this . #toWrite. remove ( node ) ;
110
+ command . reject ( new CommandTimeoutDuringMaintenanceError ( newTimeout ) ) ;
111
+ } ,
112
+ originalTimeout : command . timeout ?. originalTimeout
113
+ } ;
114
+ signal . addEventListener ( 'abort' , command . timeout . listener , { once : true } ) ;
115
+ } ;
116
+ dbgMaintenance ( `Total of ${ counter } of ${ total } timeouts reset to ${ ms } ` ) ;
117
+ }
118
+
63
119
get isPubSubActive ( ) {
64
120
return this . #pubSub. isActive ;
65
121
}
66
122
67
- #invalidateCallback?: ( key : RedisArgument | null ) => unknown ;
68
-
69
123
constructor (
70
124
respVersion : RespVersions ,
71
125
maxLength : number | null | undefined ,
@@ -107,6 +161,7 @@ export default class RedisCommandsQueue {
107
161
}
108
162
return true ;
109
163
}
164
+ return false
110
165
}
111
166
112
167
#getTypeMapping( ) {
@@ -119,30 +174,27 @@ export default class RedisCommandsQueue {
119
174
onErrorReply : err => this . #onErrorReply( err ) ,
120
175
//TODO: we can shave off a few cycles by not adding onPush handler at all if CSC is not used
121
176
onPush : push => {
122
- if ( ! this . #onPush( push ) ) {
123
- // currently only supporting "invalidate" over RESP3 push messages
124
- switch ( push [ 0 ] . toString ( ) ) {
125
- case "invalidate" : {
126
- if ( this . #invalidateCallback) {
127
- if ( push [ 1 ] !== null ) {
128
- for ( const key of push [ 1 ] ) {
129
- this . #invalidateCallback( key ) ;
130
- }
131
- } else {
132
- this . #invalidateCallback( null ) ;
133
- }
134
- }
135
- break ;
136
- }
137
- }
177
+ for ( const pushHandler of this . #pushHandlers) {
178
+ if ( pushHandler ( push ) ) return
138
179
}
139
180
} ,
140
181
getTypeMapping : ( ) => this . #getTypeMapping( )
141
182
} ) ;
142
183
}
143
184
144
- setInvalidateCallback ( callback ?: ( key : RedisArgument | null ) => unknown ) {
145
- this . #invalidateCallback = callback ;
185
+ addPushHandler ( handler : PushHandler ) : void {
186
+ this . #pushHandlers. push ( handler ) ;
187
+ }
188
+
189
+ async waitForInflightCommandsToComplete ( ) : Promise < void > {
190
+ // In-flight commands already completed
191
+ if ( this . #waitingForReply. length === 0 ) {
192
+ return
193
+ } ;
194
+ // Otherwise wait for in-flight commands to fire `empty` event
195
+ return new Promise ( resolve => {
196
+ this . #waitingForReply. events . on ( 'empty' , resolve )
197
+ } ) ;
146
198
}
147
199
148
200
addCommand < T > (
@@ -168,15 +220,20 @@ export default class RedisCommandsQueue {
168
220
typeMapping : options ?. typeMapping
169
221
} ;
170
222
171
- const timeout = options ?. timeout ;
223
+ // If #maintenanceCommandTimeout was explicitly set, we should
224
+ // use it instead of the timeout provided by the command
225
+ const timeout = this . #maintenanceCommandTimeout ?? options ?. timeout ;
226
+ const wasInMaintenance = this . #maintenanceCommandTimeout !== undefined ;
172
227
if ( timeout ) {
228
+
173
229
const signal = AbortSignal . timeout ( timeout ) ;
174
230
value . timeout = {
175
231
signal,
176
232
listener : ( ) => {
177
233
this . #toWrite. remove ( node ) ;
178
- value . reject ( new TimeoutError ( ) ) ;
179
- }
234
+ value . reject ( wasInMaintenance ? new CommandTimeoutDuringMaintenanceError ( timeout ) : new TimeoutError ( ) ) ;
235
+ } ,
236
+ originalTimeout : options ?. timeout
180
237
} ;
181
238
signal . addEventListener ( 'abort' , value . timeout . listener , { once : true } ) ;
182
239
}
@@ -432,7 +489,7 @@ export default class RedisCommandsQueue {
432
489
}
433
490
434
491
static #removeTimeoutListener( command : CommandToWrite ) {
435
- command . timeout ! . signal . removeEventListener ( 'abort' , command . timeout ! . listener ) ;
492
+ command . timeout ? .signal . removeEventListener ( 'abort' , command . timeout ! . listener ) ;
436
493
}
437
494
438
495
static #flushToWrite( toBeSent : CommandToWrite , err : Error ) {
0 commit comments