@@ -65,11 +65,12 @@ extension ValkeyChannelHandler {
65
65
@usableFromInline
66
66
struct ConnectedState {
67
67
let context : Context
68
+ let pendingHelloCommand : PendingCommand
68
69
var pendingCommands : Deque < PendingCommand >
69
70
70
- func cancel( requestID: Int ) -> Deque < PendingCommand > ? {
71
- if pendingCommands . first ? . requestID == requestID {
72
- return pendingCommands
71
+ func cancel( requestID: Int ) -> [ PendingCommand ] ? {
72
+ if pendingHelloCommand . requestID == requestID {
73
+ return [ pendingHelloCommand ] + pendingCommands
73
74
}
74
75
return nil
75
76
}
@@ -85,11 +86,11 @@ extension ValkeyChannelHandler {
85
86
86
87
/// handler has become active
87
88
@usableFromInline
88
- mutating func setConnected( context: Context , pendingCommands: Deque < PendingCommand > ) {
89
+ mutating func setConnected( context: Context , pendingHelloCommand : PendingCommand , pendingCommands: Deque < PendingCommand > ) {
89
90
switch consume self. state {
90
91
case . initialized:
91
92
self = . connected(
92
- . init( context: context, pendingCommands: pendingCommands)
93
+ . init( context: context, pendingHelloCommand : pendingHelloCommand , pendingCommands: pendingCommands)
93
94
)
94
95
case . connected:
95
96
preconditionFailure ( " Cannot set connected state when state is connected " )
@@ -155,18 +156,15 @@ extension ValkeyChannelHandler {
155
156
switch consume self. state {
156
157
case . initialized:
157
158
preconditionFailure ( " Cannot send command when initialized " )
158
- case . connected( var state) :
159
- guard let helloCommand = state. pendingCommands. popFirst ( ) else {
160
- preconditionFailure ( " Cannot be in connected state with no pending commands " )
161
- }
159
+ case . connected( let state) :
162
160
switch token. value {
163
161
case . bulkError( let message) , . simpleError( let message) :
164
162
let error = ValkeyClientError ( . commandError, message: String ( buffer: message) )
165
163
self = . closed( error)
166
- return . respondAndClose( helloCommand , error)
164
+ return . respondAndClose( state . pendingHelloCommand , error)
167
165
default :
168
166
self = . active( . init( context: state. context, pendingCommands: state. pendingCommands) )
169
- return . respond( helloCommand , . cancel)
167
+ return . respond( state . pendingHelloCommand , . cancel)
170
168
}
171
169
case . active( var state) :
172
170
guard let command = state. pendingCommands. popFirst ( ) else {
@@ -228,14 +226,11 @@ extension ValkeyChannelHandler {
228
226
case . initialized:
229
227
preconditionFailure ( " Cannot wait until connection has succeeded " )
230
228
case . connected( let state) :
231
- guard let helloCommand = state. pendingCommands. first else {
232
- preconditionFailure ( " Cannot be in connected state with no pending commands " )
233
- }
234
- switch helloCommand. promise {
229
+ switch state. pendingHelloCommand. promise {
235
230
case . nio( let promise) :
236
231
self = . connected( state)
237
232
return . waitForPromise( promise)
238
- case . swift, . ignore :
233
+ case . swift, . forget :
239
234
preconditionFailure ( " Connected state cannot be setup with a Swift continuation " )
240
235
}
241
236
case . active( let state) :
@@ -263,15 +258,14 @@ extension ValkeyChannelHandler {
263
258
case . initialized:
264
259
preconditionFailure ( " Cannot cancel when initialized " )
265
260
case . connected( let state) :
266
- guard let helloCommand = state. pendingCommands. first else {
267
- preconditionFailure ( " Cannot be in connected state with no pending commands " )
268
- }
269
- if helloCommand. deadline <= now {
261
+ if state. pendingHelloCommand. deadline <= now {
270
262
self = . closed( ValkeyClientError ( . timeout) )
271
- return . failPendingCommandsAndClose( state. context, state. pendingCommands)
263
+ var pendingCommands = state. pendingCommands
264
+ pendingCommands. prepend ( state. pendingHelloCommand)
265
+ return . failPendingCommandsAndClose( state. context, pendingCommands)
272
266
} else {
273
267
self = . connected( state)
274
- return . reschedule( helloCommand . deadline)
268
+ return . reschedule( state . pendingHelloCommand . deadline)
275
269
}
276
270
case . active( let state) :
277
271
if let firstCommand = state. pendingCommands. first {
@@ -373,7 +367,9 @@ extension ValkeyChannelHandler {
373
367
self = . closed( nil )
374
368
return . doNothing
375
369
case . connected( let state) :
376
- self = . closing( . init( context: state. context, pendingCommands: state. pendingCommands) )
370
+ var pendingCommands = state. pendingCommands
371
+ pendingCommands. prepend ( state. pendingHelloCommand)
372
+ self = . closing( . init( context: state. context, pendingCommands: pendingCommands) )
377
373
return . waitForPendingCommands( state. context)
378
374
case . active( let state) :
379
375
if state. pendingCommands. count > 0 {
@@ -406,6 +402,8 @@ extension ValkeyChannelHandler {
406
402
return . doNothing
407
403
case . connected( let state) :
408
404
self = . closed( nil )
405
+ var pendingCommands = state. pendingCommands
406
+ pendingCommands. prepend ( state. pendingHelloCommand)
409
407
return . failPendingCommandsAndClose( state. context, state. pendingCommands)
410
408
case . active( let state) :
411
409
self = . closed( nil )
@@ -434,6 +432,8 @@ extension ValkeyChannelHandler {
434
432
return . doNothing
435
433
case . connected( let state) :
436
434
self = . closed( nil )
435
+ var pendingCommands = state. pendingCommands
436
+ pendingCommands. prepend ( state. pendingHelloCommand)
437
437
return . failPendingCommandsAndSubscriptions( state. pendingCommands)
438
438
case . active( let state) :
439
439
self = . closed( nil )
0 commit comments