@@ -44,18 +44,18 @@ const RESP2_PUSH_TYPE_MAPPING = {
44
44
} ;
45
45
46
46
export default class RedisCommandsQueue {
47
- private readonly _maxLength : number | null | undefined ;
48
- private readonly _toWrite = new DoublyLinkedList < CommandToWrite > ( ) ;
49
- private readonly _waitingForReply = new SinglyLinkedList < CommandWaitingForReply > ( ) ;
50
- private readonly _onShardedChannelMoved : OnShardedChannelMoved ;
47
+ readonly #maxLength : number | null | undefined ;
48
+ readonly #toWrite = new DoublyLinkedList < CommandToWrite > ( ) ;
49
+ readonly #waitingForReply = new SinglyLinkedList < CommandWaitingForReply > ( ) ;
50
+ readonly #onShardedChannelMoved : OnShardedChannelMoved ;
51
51
52
- private readonly _pubSub = new PubSub ( ) ;
52
+ readonly #pubSub = new PubSub ( ) ;
53
53
54
54
get isPubSubActive ( ) {
55
- return this . _pubSub . isActive ;
55
+ return this . #pubSub . isActive ;
56
56
}
57
57
58
- private _chainInExecution : symbol | undefined ;
58
+ #chainInExecution : symbol | undefined ;
59
59
60
60
decoder : Decoder ;
61
61
@@ -64,92 +64,92 @@ export default class RedisCommandsQueue {
64
64
maxLength : number | null | undefined ,
65
65
onShardedChannelMoved : EventEmitter [ 'emit' ]
66
66
) {
67
- this . decoder = this . _initiateDecoder ( respVersion ) ;
68
- this . _maxLength = maxLength ;
69
- this . _onShardedChannelMoved = onShardedChannelMoved ;
67
+ this . decoder = this . #initiateDecoder ( respVersion ) ;
68
+ this . #maxLength = maxLength ;
69
+ this . #onShardedChannelMoved = onShardedChannelMoved ;
70
70
}
71
71
72
- private _initiateDecoder ( respVersion : RespVersions | null | undefined ) {
72
+ #initiateDecoder ( respVersion : RespVersions | null | undefined ) {
73
73
return respVersion === 3 ?
74
- this . _initiateResp3Decoder ( ) :
75
- this . _initiateResp2Decoder ( ) ;
74
+ this . #initiateResp3Decoder ( ) :
75
+ this . #initiateResp2Decoder ( ) ;
76
76
}
77
77
78
- private _onReply ( reply : ReplyUnion ) {
79
- this . _waitingForReply . shift ( ) ! . resolve ( reply ) ;
78
+ #onReply ( reply : ReplyUnion ) {
79
+ this . #waitingForReply . shift ( ) ! . resolve ( reply ) ;
80
80
}
81
81
82
- private _onErrorReply ( err : ErrorReply ) {
83
- this . _waitingForReply . shift ( ) ! . reject ( err ) ;
82
+ #onErrorReply ( err : ErrorReply ) {
83
+ this . #waitingForReply . shift ( ) ! . reject ( err ) ;
84
84
}
85
85
86
- private _onPush ( push : Array < any > ) {
86
+ #onPush ( push : Array < any > ) {
87
87
// TODO: type
88
- if ( this . _pubSub . handleMessageReply ( push ) ) return true ;
88
+ if ( this . #pubSub . handleMessageReply ( push ) ) return true ;
89
89
90
90
const isShardedUnsubscribe = PubSub . isShardedUnsubscribe ( push ) ;
91
- if ( isShardedUnsubscribe && ! this . _waitingForReply . length ) {
91
+ if ( isShardedUnsubscribe && ! this . #waitingForReply . length ) {
92
92
const channel = push [ 1 ] . toString ( ) ;
93
- this . _onShardedChannelMoved (
93
+ this . #onShardedChannelMoved (
94
94
channel ,
95
- this . _pubSub . removeShardedListeners ( channel )
95
+ this . #pubSub . removeShardedListeners ( channel )
96
96
) ;
97
97
return true ;
98
98
} else if ( isShardedUnsubscribe || PubSub . isStatusReply ( push ) ) {
99
- const head = this . _waitingForReply . head ! . value ;
99
+ const head = this . #waitingForReply . head ! . value ;
100
100
if (
101
101
( Number . isNaN ( head . channelsCounter ! ) && push [ 2 ] === 0 ) ||
102
102
-- head . channelsCounter ! === 0
103
103
) {
104
- this . _waitingForReply . shift ( ) ! . resolve ( ) ;
104
+ this . #waitingForReply . shift ( ) ! . resolve ( ) ;
105
105
}
106
106
return true ;
107
107
}
108
108
}
109
109
110
- private _getTypeMapping ( ) {
111
- return this . _waitingForReply . head ! . value . typeMapping ?? { } ;
110
+ #getTypeMapping ( ) {
111
+ return this . #waitingForReply . head ! . value . typeMapping ?? { } ;
112
112
}
113
113
114
- private _initiateResp3Decoder ( ) {
114
+ #initiateResp3Decoder ( ) {
115
115
return new Decoder ( {
116
- onReply : reply => this . _onReply ( reply ) ,
117
- onErrorReply : err => this . _onErrorReply ( err ) ,
116
+ onReply : reply => this . #onReply ( reply ) ,
117
+ onErrorReply : err => this . #onErrorReply ( err ) ,
118
118
onPush : push => {
119
- if ( ! this . _onPush ( push ) ) {
119
+ if ( ! this . #onPush ( push ) ) {
120
120
121
121
}
122
122
} ,
123
- getTypeMapping : ( ) => this . _getTypeMapping ( )
123
+ getTypeMapping : ( ) => this . #getTypeMapping ( )
124
124
} ) ;
125
125
}
126
126
127
- private _initiateResp2Decoder ( ) {
127
+ #initiateResp2Decoder ( ) {
128
128
return new Decoder ( {
129
129
onReply : reply => {
130
- if ( this . _pubSub . isActive && Array . isArray ( reply ) ) {
131
- if ( this . _onPush ( reply ) ) return ;
130
+ if ( this . #pubSub . isActive && Array . isArray ( reply ) ) {
131
+ if ( this . #onPush ( reply ) ) return ;
132
132
133
133
if ( PONG . equals ( reply [ 0 ] as Buffer ) ) {
134
- const { resolve, typeMapping } = this . _waitingForReply . shift ( ) ! ,
134
+ const { resolve, typeMapping } = this . #waitingForReply . shift ( ) ! ,
135
135
buffer = ( ( reply [ 1 ] as Buffer ) . length === 0 ? reply [ 0 ] : reply [ 1 ] ) as Buffer ;
136
136
resolve ( typeMapping ?. [ RESP_TYPES . SIMPLE_STRING ] === Buffer ? buffer : buffer . toString ( ) ) ;
137
137
return ;
138
138
}
139
139
}
140
140
141
- this . _onReply ( reply ) ;
141
+ this . #onReply ( reply ) ;
142
142
} ,
143
- onErrorReply : err => this . _onErrorReply ( err ) ,
143
+ onErrorReply : err => this . #onErrorReply ( err ) ,
144
144
// PUSH type does not exist in RESP2
145
145
// PubSub is handled in onReply
146
146
// @ts -expect-error
147
147
onPush : undefined ,
148
148
getTypeMapping : ( ) => {
149
149
// PubSub push is an Array in RESP2
150
- return this . _pubSub . isActive ?
150
+ return this . #pubSub . isActive ?
151
151
RESP2_PUSH_TYPE_MAPPING :
152
- this . _getTypeMapping ( ) ;
152
+ this . #getTypeMapping ( ) ;
153
153
}
154
154
} ) ;
155
155
}
@@ -180,7 +180,7 @@ export default class RedisCommandsQueue {
180
180
options ?: CommandOptions ,
181
181
resolveOnWrite ?: boolean
182
182
) : Promise < T > {
183
- if ( this . _maxLength && this . _toWrite . length + this . _waitingForReply . length >= this . _maxLength ) {
183
+ if ( this . #maxLength && this . #toWrite . length + this . #waitingForReply . length >= this . #maxLength ) {
184
184
return Promise . reject ( new Error ( 'The queue is full' ) ) ;
185
185
} else if ( options ?. abortSignal ?. aborted ) {
186
186
return Promise . reject ( new AbortError ( ) ) ;
@@ -204,14 +204,14 @@ export default class RedisCommandsQueue {
204
204
value . abort = {
205
205
signal,
206
206
listener : ( ) => {
207
- this . _toWrite . remove ( node ) ;
207
+ this . #toWrite . remove ( node ) ;
208
208
value . reject ( new AbortError ( ) ) ;
209
209
}
210
210
} ;
211
211
signal . addEventListener ( 'abort' , value . abort . listener , { once : true } ) ;
212
212
}
213
213
214
- node = this . _toWrite . add ( value , options ?. asap ) ;
214
+ node = this . #toWrite . add ( value , options ?. asap ) ;
215
215
} ) ;
216
216
}
217
217
@@ -221,8 +221,8 @@ export default class RedisCommandsQueue {
221
221
listener : PubSubListener < T > ,
222
222
returnBuffers ?: T
223
223
) {
224
- return this . _addPubSubCommand (
225
- this . _pubSub . subscribe ( type , channels , listener , returnBuffers )
224
+ return this . #addPubSubCommand (
225
+ this . #pubSub . subscribe ( type , channels , listener , returnBuffers )
226
226
) ;
227
227
}
228
228
@@ -232,17 +232,17 @@ export default class RedisCommandsQueue {
232
232
listener ?: PubSubListener < T > ,
233
233
returnBuffers ?: T
234
234
) {
235
- return this . _addPubSubCommand (
236
- this . _pubSub . unsubscribe ( type , channels , listener , returnBuffers )
235
+ return this . #addPubSubCommand (
236
+ this . #pubSub . unsubscribe ( type , channels , listener , returnBuffers )
237
237
) ;
238
238
}
239
239
240
240
resubscribe ( ) : Promise < any > | undefined {
241
- const commands = this . _pubSub . resubscribe ( ) ;
241
+ const commands = this . #pubSub . resubscribe ( ) ;
242
242
if ( ! commands . length ) return ;
243
243
244
244
return Promise . all (
245
- commands . map ( command => this . _addPubSubCommand ( command , true ) )
245
+ commands . map ( command => this . #addPubSubCommand ( command , true ) )
246
246
) ;
247
247
}
248
248
@@ -251,26 +251,26 @@ export default class RedisCommandsQueue {
251
251
channel : string ,
252
252
listeners : ChannelListeners
253
253
) {
254
- return this . _addPubSubCommand (
255
- this . _pubSub . extendChannelListeners ( type , channel , listeners )
254
+ return this . #addPubSubCommand (
255
+ this . #pubSub . extendChannelListeners ( type , channel , listeners )
256
256
) ;
257
257
}
258
258
259
259
extendPubSubListeners ( type : PubSubType , listeners : PubSubTypeListeners ) {
260
- return this . _addPubSubCommand (
261
- this . _pubSub . extendTypeListeners ( type , listeners )
260
+ return this . #addPubSubCommand (
261
+ this . #pubSub . extendTypeListeners ( type , listeners )
262
262
) ;
263
263
}
264
264
265
265
getPubSubListeners ( type : PubSubType ) {
266
- return this . _pubSub . getTypeListeners ( type ) ;
266
+ return this . #pubSub . getTypeListeners ( type ) ;
267
267
}
268
268
269
- private _addPubSubCommand ( command : PubSubCommand , asap = false ) {
269
+ #addPubSubCommand ( command : PubSubCommand , asap = false ) {
270
270
if ( command === undefined ) return ;
271
271
272
272
return new Promise < void > ( ( resolve , reject ) => {
273
- this . _toWrite . add ( {
273
+ this . #toWrite . add ( {
274
274
args : command . args ,
275
275
chainId : undefined ,
276
276
abort : undefined ,
@@ -290,23 +290,23 @@ export default class RedisCommandsQueue {
290
290
}
291
291
292
292
isWaitingToWrite ( ) {
293
- return this . _toWrite . length > 0 ;
293
+ return this . #toWrite . length > 0 ;
294
294
}
295
295
296
296
* commandsToWrite ( ) {
297
- let toSend = this . _toWrite . shift ( ) ;
297
+ let toSend = this . #toWrite . shift ( ) ;
298
298
while ( toSend ) {
299
299
let encoded : CommandArguments ;
300
300
try {
301
301
encoded = encodeCommand ( toSend . args ) ;
302
302
} catch ( err ) {
303
303
toSend . reject ( err ) ;
304
- toSend = this . _toWrite . shift ( ) ;
304
+ toSend = this . #toWrite . shift ( ) ;
305
305
continue ;
306
306
}
307
307
308
308
if ( toSend . abort ) {
309
- RedisCommandsQueue . _removeAbortListener ( toSend ) ;
309
+ RedisCommandsQueue . #removeAbortListener ( toSend ) ;
310
310
toSend . abort = undefined ;
311
311
}
312
312
@@ -316,68 +316,68 @@ export default class RedisCommandsQueue {
316
316
// TODO reuse `toSend` or create new object?
317
317
( toSend as any ) . args = undefined ;
318
318
319
- this . _chainInExecution = toSend . chainId ;
319
+ this . #chainInExecution = toSend . chainId ;
320
320
toSend . chainId = undefined ;
321
321
322
- this . _waitingForReply . push ( toSend ) ;
322
+ this . #waitingForReply . push ( toSend ) ;
323
323
}
324
324
325
325
yield encoded ;
326
- toSend = this . _toWrite . shift ( ) ;
326
+ toSend = this . #toWrite . shift ( ) ;
327
327
}
328
328
}
329
329
330
- private _flushWaitingForReply ( err : Error ) : void {
331
- for ( const node of this . _waitingForReply ) {
330
+ #flushWaitingForReply ( err : Error ) : void {
331
+ for ( const node of this . #waitingForReply ) {
332
332
node . reject ( err ) ;
333
333
}
334
- this . _waitingForReply . reset ( ) ;
334
+ this . #waitingForReply . reset ( ) ;
335
335
}
336
336
337
- private static _removeAbortListener ( command : CommandToWrite ) {
337
+ static #removeAbortListener ( command : CommandToWrite ) {
338
338
command . abort ! . signal . removeEventListener ( 'abort' , command . abort ! . listener ) ;
339
339
}
340
340
341
- private static _flushToWrite ( toBeSent : CommandToWrite , err : Error ) {
341
+ static #flushToWrite ( toBeSent : CommandToWrite , err : Error ) {
342
342
if ( toBeSent . abort ) {
343
- RedisCommandsQueue . _removeAbortListener ( toBeSent ) ;
343
+ RedisCommandsQueue . #removeAbortListener ( toBeSent ) ;
344
344
}
345
345
346
346
toBeSent . reject ( err ) ;
347
347
}
348
348
349
349
flushWaitingForReply ( err : Error ) : void {
350
350
this . decoder . reset ( ) ;
351
- this . _pubSub . reset ( ) ;
351
+ this . #pubSub . reset ( ) ;
352
352
353
- this . _flushWaitingForReply ( err ) ;
353
+ this . #flushWaitingForReply ( err ) ;
354
354
355
- if ( ! this . _chainInExecution ) return ;
355
+ if ( ! this . #chainInExecution ) return ;
356
356
357
- while ( this . _toWrite . head ?. value . chainId === this . _chainInExecution ) {
358
- RedisCommandsQueue . _flushToWrite (
359
- this . _toWrite . shift ( ) ! ,
357
+ while ( this . #toWrite . head ?. value . chainId === this . #chainInExecution ) {
358
+ RedisCommandsQueue . #flushToWrite (
359
+ this . #toWrite . shift ( ) ! ,
360
360
err
361
361
) ;
362
362
}
363
363
364
- this . _chainInExecution = undefined ;
364
+ this . #chainInExecution = undefined ;
365
365
}
366
366
367
367
flushAll ( err : Error ) : void {
368
368
this . decoder . reset ( ) ;
369
- this . _pubSub . reset ( ) ;
370
- this . _flushWaitingForReply ( err ) ;
371
- for ( const node of this . _toWrite ) {
372
- RedisCommandsQueue . _flushToWrite ( node , err ) ;
369
+ this . #pubSub . reset ( ) ;
370
+ this . #flushWaitingForReply ( err ) ;
371
+ for ( const node of this . #toWrite ) {
372
+ RedisCommandsQueue . #flushToWrite ( node , err ) ;
373
373
}
374
- this . _toWrite . reset ( ) ;
374
+ this . #toWrite . reset ( ) ;
375
375
}
376
376
377
377
isEmpty ( ) {
378
378
return (
379
- this . _toWrite . length === 0 &&
380
- this . _waitingForReply . length === 0
379
+ this . #toWrite . length === 0 &&
380
+ this . #waitingForReply . length === 0
381
381
) ;
382
382
}
383
383
}
0 commit comments