Skip to content

Commit 763d8eb

Browse files
authored
Move pendingCommands inside state machine (#92)
1 parent 4361a02 commit 763d8eb

File tree

5 files changed

+433
-146
lines changed

5 files changed

+433
-146
lines changed

Sources/Valkey/Connection/ValkeyChannelHandler+stateMachine.swift

Lines changed: 169 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -12,43 +12,73 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import DequeModule
1516
import NIOCore
1617

1718
extension ValkeyChannelHandler {
1819
@usableFromInline
19-
struct StateMachine<Context> {
20+
struct StateMachine<Context>: ~Copyable {
2021
@usableFromInline
21-
enum State {
22+
enum State: ~Copyable {
2223
case initializing
2324
case active(ActiveState)
24-
case closing(ClosingState)
25+
case closing(ActiveState)
2526
case closed
27+
28+
@usableFromInline
29+
var description: String {
30+
borrowing get {
31+
switch self {
32+
case .initializing: "initializing"
33+
case .active: "active"
34+
case .closing: "closing"
35+
case .closed: "closed"
36+
}
37+
}
38+
}
2639
}
2740
@usableFromInline
2841
var state: State
2942

3043
@usableFromInline
3144
struct ActiveState {
3245
let context: Context
33-
}
46+
var pendingCommands: Deque<PendingCommand>
3447

35-
@usableFromInline
36-
struct ClosingState {
37-
let context: Context
48+
func cancel(requestID: Int) -> (cancel: [PendingCommand], connectionClosedDueToCancellation: [PendingCommand]) {
49+
var withRequestID = [PendingCommand]()
50+
var withoutRequestID = [PendingCommand]()
51+
for command in pendingCommands {
52+
if command.requestID == requestID {
53+
withRequestID.append(command)
54+
} else {
55+
withoutRequestID.append(command)
56+
}
57+
}
58+
return (withRequestID, withoutRequestID)
59+
}
3860
}
3961

4062
init() {
4163
self.state = .initializing
4264
}
4365

66+
private init(_ state: consuming State) {
67+
self.state = state
68+
}
69+
4470
/// handler has become active
4571
@usableFromInline
4672
mutating func setActive(context: Context) {
47-
switch self.state {
73+
switch consume self.state {
4874
case .initializing:
49-
self.state = .active(.init(context: context))
50-
case .active, .closing, .closed:
51-
preconditionFailure("Cannot set active state when state is \(self.state)")
75+
self = .active(.init(context: context, pendingCommands: []))
76+
case .active:
77+
preconditionFailure("Cannot set active state when state is active")
78+
case .closing:
79+
preconditionFailure("Cannot set active state when state is closing")
80+
case .closed:
81+
preconditionFailure("Cannot set active state when state is closed")
5282
}
5383
}
5484

@@ -60,104 +90,203 @@ extension ValkeyChannelHandler {
6090

6191
/// handler wants to send a command
6292
@usableFromInline
63-
func sendCommand() -> SendCommandAction {
64-
switch self.state {
93+
mutating func sendCommand(_ pendingCommand: PendingCommand) -> SendCommandAction {
94+
self.sendCommands(CollectionOfOne(pendingCommand))
95+
}
96+
97+
/// handler wants to send pipelined commands
98+
@usableFromInline
99+
mutating func sendCommands(_ pendingCommands: some Collection<PendingCommand>) -> SendCommandAction {
100+
switch consume self.state {
65101
case .initializing:
66102
preconditionFailure("Cannot send command when initializing")
67-
case .active(let state):
103+
case .active(var state):
104+
state.pendingCommands.append(contentsOf: pendingCommands)
105+
self = .active(state)
68106
return .sendCommand(state.context)
69-
case .closing:
107+
case .closing(let state):
108+
self = .closing(state)
70109
return .throwError(ValkeyClientError(.connectionClosing))
71110
case .closed:
111+
self = .closed
72112
return .throwError(ValkeyClientError(.connectionClosed))
73113
}
74114
}
75115

116+
@usableFromInline
117+
enum ReceivedResponseAction {
118+
case respond(PendingCommand)
119+
case respondAndClose(PendingCommand)
120+
case closeWithError(Error)
121+
}
122+
123+
/// handler wants to send a command
124+
@usableFromInline
125+
mutating func receivedResponse() -> ReceivedResponseAction {
126+
switch consume self.state {
127+
case .initializing:
128+
preconditionFailure("Cannot send command when initializing")
129+
case .active(var state):
130+
guard let command = state.pendingCommands.popFirst() else {
131+
self = .closed
132+
return .closeWithError(ValkeyClientError(.unsolicitedToken, message: "Received a token without having sent a command"))
133+
}
134+
self = .active(state)
135+
return .respond(command)
136+
case .closing(var state):
137+
guard let command = state.pendingCommands.popFirst() else {
138+
self = .closed
139+
return .closeWithError(ValkeyClientError(.unsolicitedToken, message: "Received a token without having sent a command"))
140+
}
141+
if state.pendingCommands.count == 0 {
142+
self = .closed
143+
return .respondAndClose(command)
144+
} else {
145+
self = .closing(state)
146+
return .respond(command)
147+
}
148+
case .closed:
149+
preconditionFailure("Cannot receive command on closed connection")
150+
}
151+
}
152+
76153
@usableFromInline
77154
enum CancelAction {
78-
case cancelAndCloseConnection(Context)
155+
case failPendingCommandsAndClose(Context, cancel: [PendingCommand], closeConnectionDueToCancel: [PendingCommand])
79156
case doNothing
80157
}
81158

82159
/// handler wants to send a command
83160
@usableFromInline
84-
mutating func cancel() -> CancelAction {
85-
switch self.state {
161+
mutating func cancel(requestID: Int) -> CancelAction {
162+
switch consume self.state {
86163
case .initializing:
87164
preconditionFailure("Cannot cancel when initializing")
88165
case .active(let state):
89-
self.state = .closed
90-
return .cancelAndCloseConnection(state.context)
166+
let (cancel, closeConnectionDueToCancel) = state.cancel(requestID: requestID)
167+
if cancel.count > 0 {
168+
self = .closed
169+
return .failPendingCommandsAndClose(
170+
state.context,
171+
cancel: cancel,
172+
closeConnectionDueToCancel: closeConnectionDueToCancel
173+
)
174+
} else {
175+
self = .active(state)
176+
return .doNothing
177+
}
91178
case .closing(let state):
92-
self.state = .closed
93-
return .cancelAndCloseConnection(state.context)
179+
let (cancel, closeConnectionDueToCancel) = state.cancel(requestID: requestID)
180+
if cancel.count > 0 {
181+
self = .closed
182+
return .failPendingCommandsAndClose(
183+
state.context,
184+
cancel: cancel,
185+
closeConnectionDueToCancel: closeConnectionDueToCancel
186+
)
187+
} else {
188+
self = .closing(state)
189+
return .doNothing
190+
}
94191
case .closed:
192+
self = .closed
95193
return .doNothing
96194
}
97195
}
98196

99197
@usableFromInline
100198
enum GracefulShutdownAction {
101199
case waitForPendingCommands(Context)
200+
case closeConnection(Context)
102201
case doNothing
103202
}
104203
/// Want to gracefully shutdown the handler
105204
@usableFromInline
106205
mutating func gracefulShutdown() -> GracefulShutdownAction {
107-
switch self.state {
206+
switch consume self.state {
108207
case .initializing:
109-
self.state = .closed
208+
self = .closed
110209
return .doNothing
111210
case .active(let state):
112-
self.state = .closing(ClosingState(context: state.context))
113-
return .waitForPendingCommands(state.context)
114-
case .closed, .closing:
211+
if state.pendingCommands.count > 0 {
212+
self = .closing(.init(context: state.context, pendingCommands: state.pendingCommands))
213+
return .waitForPendingCommands(state.context)
214+
} else {
215+
self = .closed
216+
return .closeConnection(state.context)
217+
}
218+
case .closing(let state):
219+
self = .closing(state)
220+
return .doNothing
221+
case .closed:
222+
self = .closed
115223
return .doNothing
116224
}
117225
}
118226

119227
@usableFromInline
120228
enum CloseAction {
121-
case close(Context)
229+
case failPendingCommandsAndClose(Context, Deque<PendingCommand>)
122230
case doNothing
123231
}
124232
/// Want to close the connection
125233
@usableFromInline
126234
mutating func close() -> CloseAction {
127-
switch self.state {
235+
switch consume self.state {
128236
case .initializing:
129-
self.state = .closed
237+
self = .closed
130238
return .doNothing
131239
case .active(let state):
132-
self.state = .closed
133-
return .close(state.context)
240+
self = .closed
241+
return .failPendingCommandsAndClose(state.context, state.pendingCommands)
134242
case .closing(let state):
135-
self.state = .closed
136-
return .close(state.context)
243+
self = .closed
244+
return .failPendingCommandsAndClose(state.context, state.pendingCommands)
137245
case .closed:
246+
self = .closed
138247
return .doNothing
139248
}
140249
}
141250

142251
@usableFromInline
143252
enum SetClosedAction {
144-
case failPendingCommands
253+
case failPendingCommandsAndSubscriptions(Deque<PendingCommand>)
145254
case doNothing
146255
}
147256

148257
/// The connection has been closed
149258
@usableFromInline
150259
mutating func setClosed() -> SetClosedAction {
151-
switch self.state {
260+
switch consume self.state {
152261
case .initializing:
153-
self.state = .closed
262+
self = .closed
154263
return .doNothing
155-
case .active, .closing:
156-
self.state = .closed
157-
return .failPendingCommands
264+
case .active(let state):
265+
self = .closed
266+
return .failPendingCommandsAndSubscriptions(state.pendingCommands)
267+
case .closing(let state):
268+
self = .closed
269+
return .failPendingCommandsAndSubscriptions(state.pendingCommands)
158270
case .closed:
271+
self = .closed
159272
return .doNothing
160273
}
161274
}
275+
276+
private static var initializing: Self {
277+
StateMachine(.initializing)
278+
}
279+
280+
private static func active(_ state: ActiveState) -> Self {
281+
StateMachine(.active(state))
282+
}
283+
284+
private static func closing(_ state: ActiveState) -> Self {
285+
StateMachine(.closing(state))
286+
}
287+
288+
private static var closed: Self {
289+
StateMachine(.closed)
290+
}
162291
}
163292
}

0 commit comments

Comments
 (0)