Skip to content

Commit c3b126c

Browse files
authored
Connected state v2 (#125)
* Alternative version of wait on HELLO * Close connection with correct error * Remove isClosed variable * Add error to closed state to record why connection was closed * Add channel handler connected state tests * Add tests for closed error * log trace output
1 parent 80e6518 commit c3b126c

File tree

6 files changed

+408
-123
lines changed

6 files changed

+408
-123
lines changed

Sources/Valkey/Connection/ValkeyChannelHandler+stateMachine.swift

Lines changed: 149 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,18 @@ extension ValkeyChannelHandler {
2121
struct StateMachine<Context>: ~Copyable {
2222
@usableFromInline
2323
enum State: ~Copyable {
24-
case initializing
24+
case initialized
25+
case connected(ConnectedState)
2526
case active(ActiveState)
2627
case closing(ActiveState)
27-
case closed
28+
case closed(Error?)
2829

2930
@usableFromInline
3031
var description: String {
3132
borrowing get {
3233
switch self {
33-
case .initializing: "initializing"
34+
case .initialized: "initialized"
35+
case .connected: "connected"
3436
case .active: "active"
3537
case .closing: "closing"
3638
case .closed: "closed"
@@ -60,8 +62,21 @@ extension ValkeyChannelHandler {
6062
}
6163
}
6264

65+
@usableFromInline
66+
struct ConnectedState {
67+
let context: Context
68+
var pendingHelloCommand: PendingCommand
69+
70+
func cancel(requestID: Int) -> PendingCommand? {
71+
if pendingHelloCommand.requestID == requestID {
72+
return pendingHelloCommand
73+
}
74+
return nil
75+
}
76+
}
77+
6378
init() {
64-
self.state = .initializing
79+
self.state = .initialized
6580
}
6681

6782
private init(_ state: consuming State) {
@@ -70,16 +85,20 @@ extension ValkeyChannelHandler {
7085

7186
/// handler has become active
7287
@usableFromInline
73-
mutating func setActive(context: Context) {
88+
mutating func setConnected(context: Context, pendingHelloCommand: PendingCommand) {
7489
switch consume self.state {
75-
case .initializing:
76-
self = .active(.init(context: context, pendingCommands: []))
90+
case .initialized:
91+
self = .connected(
92+
.init(context: context, pendingHelloCommand: pendingHelloCommand)
93+
)
94+
case .connected:
95+
preconditionFailure("Cannot set connected state when state is connected")
7796
case .active:
78-
preconditionFailure("Cannot set active state when state is active")
97+
preconditionFailure("Cannot set connected state when state is active")
7998
case .closing:
80-
preconditionFailure("Cannot set active state when state is closing")
99+
preconditionFailure("Cannot set connected state when state is closing")
81100
case .closed:
82-
preconditionFailure("Cannot set active state when state is closed")
101+
preconditionFailure("Cannot set connected state when state is closed")
83102
}
84103
}
85104

@@ -99,17 +118,19 @@ extension ValkeyChannelHandler {
99118
@usableFromInline
100119
mutating func sendCommands(_ pendingCommands: some Collection<PendingCommand>) -> SendCommandAction {
101120
switch consume self.state {
102-
case .initializing:
103-
preconditionFailure("Cannot send command when initializing")
121+
case .initialized:
122+
preconditionFailure("Cannot send command when initialized")
123+
case .connected:
124+
preconditionFailure("Cannot send command when in connected state")
104125
case .active(var state):
105126
state.pendingCommands.append(contentsOf: pendingCommands)
106127
self = .active(state)
107128
return .sendCommand(state.context)
108129
case .closing(let state):
109130
self = .closing(state)
110131
return .throwError(ValkeyClientError(.connectionClosing))
111-
case .closed:
112-
self = .closed
132+
case .closed(let error):
133+
self = .closed(error)
113134
return .throwError(ValkeyClientError(.connectionClosed))
114135
}
115136
}
@@ -124,19 +145,29 @@ extension ValkeyChannelHandler {
124145
@usableFromInline
125146
enum ReceivedResponseAction {
126147
case respond(PendingCommand, DeadlineCallbackAction)
127-
case respondAndClose(PendingCommand)
148+
case respondAndClose(PendingCommand, Error?)
128149
case closeWithError(Error)
129150
}
130151

131152
/// handler wants to send a command
132153
@usableFromInline
133-
mutating func receivedResponse() -> ReceivedResponseAction {
154+
mutating func receivedResponse(token: RESPToken) -> ReceivedResponseAction {
134155
switch consume self.state {
135-
case .initializing:
136-
preconditionFailure("Cannot send command when initializing")
156+
case .initialized:
157+
preconditionFailure("Cannot send command when initialized")
158+
case .connected(let state):
159+
switch token.value {
160+
case .bulkError(let message), .simpleError(let message):
161+
let error = ValkeyClientError(.commandError, message: String(buffer: message))
162+
self = .closed(error)
163+
return .respondAndClose(state.pendingHelloCommand, error)
164+
default:
165+
self = .active(.init(context: state.context, pendingCommands: .init()))
166+
return .respond(state.pendingHelloCommand, .cancel)
167+
}
137168
case .active(var state):
138169
guard let command = state.pendingCommands.popFirst() else {
139-
self = .closed
170+
self = .closed(nil)
140171
return .closeWithError(ValkeyClientError(.unsolicitedToken, message: "Received a token without having sent a command"))
141172
}
142173
self = .active(state)
@@ -170,14 +201,45 @@ extension ValkeyChannelHandler {
170201
}
171202
return .respond(command, deadlineCallback)
172203
} else {
173-
self = .closed
174-
return .respondAndClose(command)
204+
self = .closed(nil)
205+
return .respondAndClose(command, nil)
175206
}
176207
case .closed:
177208
preconditionFailure("Cannot receive command on closed connection")
178209
}
179210
}
180211

212+
@usableFromInline
213+
enum WaitOnActiveAction {
214+
case waitForPromise(EventLoopPromise<RESPToken>)
215+
case reportedClosed(Error?)
216+
case done
217+
}
218+
219+
mutating func waitOnActive() -> WaitOnActiveAction {
220+
switch consume self.state {
221+
case .initialized:
222+
preconditionFailure("Cannot wait until connection has succeeded")
223+
case .connected(let state):
224+
switch state.pendingHelloCommand.promise {
225+
case .nio(let promise):
226+
self = .connected(state)
227+
return .waitForPromise(promise)
228+
case .swift:
229+
preconditionFailure("Connected state cannot be setup with a Swift continuation")
230+
}
231+
case .active(let state):
232+
self = .active(state)
233+
return .done
234+
case .closing(let state):
235+
self = .closing(state)
236+
return .done
237+
case .closed(let error):
238+
self = .closed(error)
239+
return .reportedClosed(error)
240+
}
241+
}
242+
181243
@usableFromInline
182244
enum HitDeadlineAction {
183245
case failPendingCommandsAndClose(Context, Deque<PendingCommand>)
@@ -188,12 +250,20 @@ extension ValkeyChannelHandler {
188250
@usableFromInline
189251
mutating func hitDeadline(now: NIODeadline) -> HitDeadlineAction {
190252
switch consume self.state {
191-
case .initializing:
192-
preconditionFailure("Cannot cancel when initializing")
253+
case .initialized:
254+
preconditionFailure("Cannot cancel when initialized")
255+
case .connected(let state):
256+
if state.pendingHelloCommand.deadline <= now {
257+
self = .closed(ValkeyClientError(.timeout))
258+
return .failPendingCommandsAndClose(state.context, [state.pendingHelloCommand])
259+
} else {
260+
self = .connected(state)
261+
return .reschedule(state.pendingHelloCommand.deadline)
262+
}
193263
case .active(let state):
194264
if let firstCommand = state.pendingCommands.first {
195265
if firstCommand.deadline <= now {
196-
self = .closed
266+
self = .closed(ValkeyClientError(.timeout))
197267
return .failPendingCommandsAndClose(state.context, state.pendingCommands)
198268
} else {
199269
self = .active(state)
@@ -208,14 +278,14 @@ extension ValkeyChannelHandler {
208278
preconditionFailure("Cannot be in closing state with no pending commands")
209279
}
210280
if firstCommand.deadline <= now {
211-
self = .closed
281+
self = .closed(ValkeyClientError(.timeout))
212282
return .failPendingCommandsAndClose(state.context, state.pendingCommands)
213283
} else {
214284
self = .closing(state)
215285
return .reschedule(firstCommand.deadline)
216286
}
217-
case .closed:
218-
self = .closed
287+
case .closed(let error):
288+
self = .closed(error)
219289
return .clearCallback
220290
}
221291
}
@@ -230,12 +300,24 @@ extension ValkeyChannelHandler {
230300
@usableFromInline
231301
mutating func cancel(requestID: Int) -> CancelAction {
232302
switch consume self.state {
233-
case .initializing:
234-
preconditionFailure("Cannot cancel when initializing")
303+
case .initialized:
304+
preconditionFailure("Cannot cancel when initialized")
305+
case .connected(let state):
306+
if let command = state.cancel(requestID: requestID) {
307+
self = .closed(CancellationError())
308+
return .failPendingCommandsAndClose(
309+
state.context,
310+
cancel: [command],
311+
closeConnectionDueToCancel: []
312+
)
313+
} else {
314+
self = .connected(state)
315+
return .doNothing
316+
}
235317
case .active(let state):
236318
let (cancel, closeConnectionDueToCancel) = state.cancel(requestID: requestID)
237319
if cancel.count > 0 {
238-
self = .closed
320+
self = .closed(CancellationError())
239321
return .failPendingCommandsAndClose(
240322
state.context,
241323
cancel: cancel,
@@ -248,7 +330,7 @@ extension ValkeyChannelHandler {
248330
case .closing(let state):
249331
let (cancel, closeConnectionDueToCancel) = state.cancel(requestID: requestID)
250332
if cancel.count > 0 {
251-
self = .closed
333+
self = .closed(CancellationError())
252334
return .failPendingCommandsAndClose(
253335
state.context,
254336
cancel: cancel,
@@ -258,8 +340,8 @@ extension ValkeyChannelHandler {
258340
self = .closing(state)
259341
return .doNothing
260342
}
261-
case .closed:
262-
self = .closed
343+
case .closed(let error):
344+
self = .closed(error)
263345
return .doNothing
264346
}
265347
}
@@ -274,22 +356,25 @@ extension ValkeyChannelHandler {
274356
@usableFromInline
275357
mutating func gracefulShutdown() -> GracefulShutdownAction {
276358
switch consume self.state {
277-
case .initializing:
278-
self = .closed
359+
case .initialized:
360+
self = .closed(nil)
279361
return .doNothing
362+
case .connected(let state):
363+
self = .closing(.init(context: state.context, pendingCommands: [state.pendingHelloCommand]))
364+
return .waitForPendingCommands(state.context)
280365
case .active(let state):
281366
if state.pendingCommands.count > 0 {
282367
self = .closing(.init(context: state.context, pendingCommands: state.pendingCommands))
283368
return .waitForPendingCommands(state.context)
284369
} else {
285-
self = .closed
370+
self = .closed(nil)
286371
return .closeConnection(state.context)
287372
}
288373
case .closing(let state):
289374
self = .closing(state)
290375
return .doNothing
291-
case .closed:
292-
self = .closed
376+
case .closed(let error):
377+
self = .closed(error)
293378
return .doNothing
294379
}
295380
}
@@ -303,17 +388,20 @@ extension ValkeyChannelHandler {
303388
@usableFromInline
304389
mutating func close() -> CloseAction {
305390
switch consume self.state {
306-
case .initializing:
307-
self = .closed
391+
case .initialized:
392+
self = .closed(nil)
308393
return .doNothing
394+
case .connected(let state):
395+
self = .closed(nil)
396+
return .failPendingCommandsAndClose(state.context, [state.pendingHelloCommand])
309397
case .active(let state):
310-
self = .closed
398+
self = .closed(nil)
311399
return .failPendingCommandsAndClose(state.context, state.pendingCommands)
312400
case .closing(let state):
313-
self = .closed
401+
self = .closed(nil)
314402
return .failPendingCommandsAndClose(state.context, state.pendingCommands)
315-
case .closed:
316-
self = .closed
403+
case .closed(let error):
404+
self = .closed(error)
317405
return .doNothing
318406
}
319407
}
@@ -328,23 +416,30 @@ extension ValkeyChannelHandler {
328416
@usableFromInline
329417
mutating func setClosed() -> SetClosedAction {
330418
switch consume self.state {
331-
case .initializing:
332-
self = .closed
419+
case .initialized:
420+
self = .closed(nil)
333421
return .doNothing
422+
case .connected(let state):
423+
self = .closed(nil)
424+
return .failPendingCommandsAndSubscriptions([state.pendingHelloCommand])
334425
case .active(let state):
335-
self = .closed
426+
self = .closed(nil)
336427
return .failPendingCommandsAndSubscriptions(state.pendingCommands)
337428
case .closing(let state):
338-
self = .closed
429+
self = .closed(nil)
339430
return .failPendingCommandsAndSubscriptions(state.pendingCommands)
340-
case .closed:
341-
self = .closed
431+
case .closed(let error):
432+
self = .closed(error)
342433
return .doNothing
343434
}
344435
}
345436

346-
private static var initializing: Self {
347-
StateMachine(.initializing)
437+
private static var initialized: Self {
438+
StateMachine(.initialized)
439+
}
440+
441+
private static func connected(_ state: ConnectedState) -> Self {
442+
StateMachine(.connected(state))
348443
}
349444

350445
private static func active(_ state: ActiveState) -> Self {
@@ -355,8 +450,8 @@ extension ValkeyChannelHandler {
355450
StateMachine(.closing(state))
356451
}
357452

358-
private static var closed: Self {
359-
StateMachine(.closed)
453+
private static func closed(_ error: Error?) -> Self {
454+
StateMachine(.closed(error))
360455
}
361456
}
362457
}

0 commit comments

Comments
 (0)