Skip to content

Commit f4ef3d8

Browse files
Connection timeout (#96)
* Add connection timeout * Don't add timeout for blocking commands * Cancel deadlineCallback on close * less than or equal, preconditions on empty commands list in closing state * HitDeadlineAction.doNothing -> HitDeadlineAction.clearCallback * Missed a file * Fix tests * Don't schedule callback for blocking commands * Add blockingCommandTimeout, add deadlineCallbackAction on receivedResponse * Add testBlockingCommandTimeout * Update Sources/Valkey/Connection/ValkeyChannelHandler.swift Co-authored-by: Fabian Fett <[email protected]> --------- Co-authored-by: Fabian Fett <[email protected]>
1 parent ebae84b commit f4ef3d8

File tree

7 files changed

+323
-26
lines changed

7 files changed

+323
-26
lines changed

Sources/Valkey/Connection/ValkeyChannelHandler+stateMachine.swift

Lines changed: 76 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,16 @@ extension ValkeyChannelHandler {
113113
}
114114
}
115115

116+
@usableFromInline
117+
enum DeadlineCallbackAction {
118+
case cancel
119+
case reschedule(NIODeadline)
120+
case doNothing
121+
}
122+
116123
@usableFromInline
117124
enum ReceivedResponseAction {
118-
case respond(PendingCommand)
125+
case respond(PendingCommand, DeadlineCallbackAction)
119126
case respondAndClose(PendingCommand)
120127
case closeWithError(Error)
121128
}
@@ -132,21 +139,83 @@ extension ValkeyChannelHandler {
132139
return .closeWithError(ValkeyClientError(.unsolicitedToken, message: "Received a token without having sent a command"))
133140
}
134141
self = .active(state)
135-
return .respond(command)
142+
let deadlineCallback: DeadlineCallbackAction =
143+
if let nextCommand = state.pendingCommands.first {
144+
if nextCommand.deadline < command.deadline {
145+
// if the next command has an earlier deadline than the current then reschedule the callback
146+
.reschedule(nextCommand.deadline)
147+
} else {
148+
// otherwise do nothing
149+
.doNothing
150+
}
151+
} else {
152+
// if there are no more commands cancel the callback
153+
.cancel
154+
}
155+
return .respond(command, deadlineCallback)
136156
case .closing(var state):
137157
guard let command = state.pendingCommands.popFirst() else {
138-
self = .closed
139-
return .closeWithError(ValkeyClientError(.unsolicitedToken, message: "Received a token without having sent a command"))
158+
preconditionFailure("Cannot be in closing state with no pending commands")
140159
}
141-
if state.pendingCommands.count == 0 {
160+
if let nextCommand = state.pendingCommands.first {
161+
self = .closing(state)
162+
let deadlineCallback: DeadlineCallbackAction =
163+
if nextCommand.deadline < command.deadline {
164+
// if the next command has an earlier deadline than the current then reschedule the callback
165+
.reschedule(nextCommand.deadline)
166+
} else {
167+
// otherwise do nothing
168+
.doNothing
169+
}
170+
return .respond(command, deadlineCallback)
171+
} else {
142172
self = .closed
143173
return .respondAndClose(command)
174+
}
175+
case .closed:
176+
preconditionFailure("Cannot receive command on closed connection")
177+
}
178+
}
179+
180+
@usableFromInline
181+
enum HitDeadlineAction {
182+
case failPendingCommandsAndClose(Context, Deque<PendingCommand>)
183+
case reschedule(NIODeadline)
184+
case clearCallback
185+
}
186+
187+
@usableFromInline
188+
mutating func hitDeadline(now: NIODeadline) -> HitDeadlineAction {
189+
switch consume self.state {
190+
case .initializing:
191+
preconditionFailure("Cannot cancel when initializing")
192+
case .active(let state):
193+
if let firstCommand = state.pendingCommands.first {
194+
if firstCommand.deadline <= now {
195+
self = .closed
196+
return .failPendingCommandsAndClose(state.context, state.pendingCommands)
197+
} else {
198+
self = .active(state)
199+
return .reschedule(firstCommand.deadline)
200+
}
201+
} else {
202+
self = .active(state)
203+
return .clearCallback
204+
}
205+
case .closing(let state):
206+
guard let firstCommand = state.pendingCommands.first else {
207+
preconditionFailure("Cannot be in closing state with no pending commands")
208+
}
209+
if firstCommand.deadline <= now {
210+
self = .closed
211+
return .failPendingCommandsAndClose(state.context, state.pendingCommands)
144212
} else {
145213
self = .closing(state)
146-
return .respond(command)
214+
return .reschedule(firstCommand.deadline)
147215
}
148216
case .closed:
149-
preconditionFailure("Cannot receive command on closed connection")
217+
self = .closed
218+
return .clearCallback
150219
}
151220
}
152221

Sources/Valkey/Connection/ValkeyChannelHandler.swift

Lines changed: 84 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,21 +48,49 @@ enum ValkeyRequest: Sendable {
4848

4949
@usableFromInline
5050
final class ValkeyChannelHandler: ChannelInboundHandler {
51+
@usableFromInline
5152
struct Configuration {
5253
let authentication: ValkeyClientConfiguration.Authentication?
54+
@usableFromInline
55+
let connectionTimeout: TimeAmount
56+
@usableFromInline
57+
let blockingCommandTimeout: TimeAmount
5358
let clientName: String?
5459
}
5560
@usableFromInline
5661
struct PendingCommand {
5762
@usableFromInline
58-
internal init(promise: ValkeyPromise<RESPToken>, requestID: Int) {
63+
internal init(promise: ValkeyPromise<RESPToken>, requestID: Int, deadline: NIODeadline) {
5964
self.promise = promise
6065
self.requestID = requestID
66+
self.deadline = deadline
6167
}
6268

6369
var promise: ValkeyPromise<RESPToken>
6470
let requestID: Int
71+
let deadline: NIODeadline
6572
}
73+
74+
struct ValkeyDeadlineSchedule: NIOScheduledCallbackHandler {
75+
let channelHandler: NIOLoopBound<ValkeyChannelHandler>
76+
77+
func handleScheduledCallback(eventLoop: some NIOCore.EventLoop) {
78+
let channelHandler = self.channelHandler.value
79+
switch channelHandler.stateMachine.hitDeadline(now: .now()) {
80+
case .failPendingCommandsAndClose(let context, let commands):
81+
for command in commands {
82+
command.promise.fail(ValkeyClientError(.timeout))
83+
}
84+
channelHandler.closeSubscriptionsAndConnection(context: context, error: ValkeyClientError(.timeout))
85+
case .reschedule(let deadline):
86+
channelHandler.scheduleDeadlineCallback(deadline: deadline)
87+
case .clearCallback:
88+
channelHandler.deadlineCallback = nil
89+
break
90+
}
91+
}
92+
}
93+
6694
@usableFromInline
6795
typealias OutboundOut = ByteBuffer
6896
@usableFromInline
@@ -78,11 +106,16 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
78106
@usableFromInline
79107
/*private*/ var subscriptions: ValkeySubscriptions
80108

109+
@usableFromInline
110+
private(set) var deadlineCallback: NIOScheduledCallback?
111+
81112
private var decoder: NIOSingleStepByteToMessageProcessor<RESPTokenDecoder>
82113
private let logger: Logger
83114
private var isClosed = false
84-
private let configuration: Configuration
115+
@usableFromInline
116+
/* private*/ let configuration: Configuration
85117

118+
/// Initialize a ValkeyChannelHandler
86119
init(configuration: Configuration, eventLoop: EventLoop, logger: Logger) {
87120
self.configuration = configuration
88121
self.eventLoop = eventLoop
@@ -99,12 +132,22 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
99132
@inlinable
100133
func write<Command: ValkeyCommand>(command: Command, continuation: CheckedContinuation<RESPToken, any Error>, requestID: Int) {
101134
self.eventLoop.assertInEventLoop()
102-
switch self.stateMachine.sendCommand(PendingCommand(promise: .swift(continuation), requestID: requestID)) {
135+
let deadline: NIODeadline =
136+
command.isBlocking ? .now() + self.configuration.blockingCommandTimeout : .now() + self.configuration.connectionTimeout
137+
let pendingCommand = PendingCommand(
138+
promise: .swift(continuation),
139+
requestID: requestID,
140+
deadline: deadline
141+
)
142+
switch self.stateMachine.sendCommand(pendingCommand) {
103143
case .sendCommand(let context):
104144
self.encoder.reset()
105145
command.encode(into: &self.encoder)
106146
let buffer = self.encoder.buffer
107147
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
148+
if self.deadlineCallback == nil {
149+
self.scheduleDeadlineCallback(deadline: deadline)
150+
}
108151

109152
case .throwError(let error):
110153
continuation.resume(throwing: error)
@@ -114,21 +157,30 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
114157
@usableFromInline
115158
func write(request: ValkeyRequest) {
116159
self.eventLoop.assertInEventLoop()
160+
let deadline = .now() + self.configuration.connectionTimeout
117161
switch request {
118162
case .single(let buffer, let tokenPromise, let requestID):
119-
let pendingCommand = PendingCommand(promise: tokenPromise, requestID: requestID)
163+
let pendingCommand = PendingCommand(promise: tokenPromise, requestID: requestID, deadline: deadline)
120164
switch self.stateMachine.sendCommand(pendingCommand) {
121165
case .sendCommand(let context):
122166
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
167+
if self.deadlineCallback == nil {
168+
scheduleDeadlineCallback(deadline: deadline)
169+
}
123170
case .throwError(let error):
124171
tokenPromise.fail(error)
125172
}
126173

127174
case .multiple(let buffer, let tokenPromises, let requestID):
128-
let pendingCommands = tokenPromises.map { PendingCommand(promise: $0, requestID: requestID) }
175+
let pendingCommands = tokenPromises.map {
176+
PendingCommand(promise: $0, requestID: requestID, deadline: deadline)
177+
}
129178
switch self.stateMachine.sendCommands(pendingCommands) {
130179
case .sendCommand(let context):
131180
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
181+
if self.deadlineCallback == nil {
182+
scheduleDeadlineCallback(deadline: deadline)
183+
}
132184
case .throwError(let error):
133185
for promise in tokenPromises {
134186
promise.fail(error)
@@ -315,7 +367,8 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
315367
switch token.identifier {
316368
case .simpleError, .bulkError:
317369
switch self.stateMachine.receivedResponse() {
318-
case .respond(let command):
370+
case .respond(let command, let deadlineAction):
371+
self.processDeadlineCallbackAction(action: deadlineAction)
319372
command.promise.fail(ValkeyClientError(.commandError, message: token.errorString.map { String(buffer: $0) }))
320373
case .respondAndClose(let command):
321374
command.promise.fail(ValkeyClientError(.commandError, message: token.errorString.map { String(buffer: $0) }))
@@ -330,7 +383,8 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
330383
do {
331384
if try self.subscriptions.notify(token) == true {
332385
switch self.stateMachine.receivedResponse() {
333-
case .respond(let command):
386+
case .respond(let command, let deadlineAction):
387+
self.processDeadlineCallbackAction(action: deadlineAction)
334388
command.promise.succeed(Self.simpleOk)
335389
case .respondAndClose(let command):
336390
command.promise.succeed(Self.simpleOk)
@@ -356,7 +410,8 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
356410
.set,
357411
.attribute:
358412
switch self.stateMachine.receivedResponse() {
359-
case .respond(let command):
413+
case .respond(let command, let deadlineAction):
414+
self.processDeadlineCallbackAction(action: deadlineAction)
360415
command.promise.succeed(token)
361416
case .respondAndClose(let command):
362417
command.promise.succeed(token)
@@ -382,6 +437,26 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
382437
}
383438
}
384439

440+
@usableFromInline
441+
func scheduleDeadlineCallback(deadline: NIODeadline) {
442+
self.deadlineCallback = try? self.eventLoop.scheduleCallback(
443+
at: deadline,
444+
handler: ValkeyDeadlineSchedule(channelHandler: .init(self, eventLoop: self.eventLoop))
445+
)
446+
}
447+
448+
func processDeadlineCallbackAction(action: StateMachine<ChannelHandlerContext>.DeadlineCallbackAction) {
449+
switch action {
450+
case .cancel:
451+
self.deadlineCallback?.cancel()
452+
self.deadlineCallback = nil
453+
case .reschedule(let deadline):
454+
self.scheduleDeadlineCallback(deadline: deadline)
455+
case .doNothing:
456+
break
457+
}
458+
}
459+
385460
private func closeSubscriptionsAndConnection(context: ChannelHandlerContext, error: (any Error)? = nil) {
386461
if let error {
387462
context.fireErrorCaught(error)
@@ -411,6 +486,7 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
411486
command.promise.fail(ValkeyClientError.init(.connectionClosed))
412487
}
413488
self.subscriptions.close(error: ValkeyClientError.init(.connectionClosed))
489+
self.deadlineCallback?.cancel()
414490
case .doNothing:
415491
break
416492
}

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,12 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
303303
break
304304
}
305305
let valkeyChannelHandler = ValkeyChannelHandler(
306-
configuration: .init(authentication: configuration.authentication, clientName: clientName),
306+
configuration: .init(
307+
authentication: configuration.authentication,
308+
connectionTimeout: .init(configuration.connectionTimeout),
309+
blockingCommandTimeout: .init(configuration.blockingCommandTimeout),
310+
clientName: clientName
311+
),
307312
eventLoop: channel.eventLoop,
308313
logger: logger
309314
)

Sources/Valkey/ValkeyClientConfiguration.swift

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,23 +62,35 @@ public struct ValkeyClientConfiguration: Sendable {
6262
public var connectionPool: ConnectionPoolConfiguration
6363
/// keep alive behavior
6464
public var keepAliveBehavior: KeepAliveBehavior
65+
/// dead connection timeout
66+
public var connectionTimeout: Duration
67+
/// global timeout for blocking commands
68+
public var blockingCommandTimeout: Duration
6569

6670
/// TLS setup
6771
public var tls: TLS
6872

6973
/// Initialize ValkeyClientConfiguration
7074
/// - Parameters
7175
/// - authentication: Authentication details
76+
/// - connectionPool: Connection pool configuration
77+
/// - keepAliveBehavior: Connection keep alive behavior
78+
/// - connectionTimeout: Timeout for connection response
79+
/// - blockingCommandTimeout: Blocking command response timeout
7280
/// - tlsConfiguration: TLS configuration
7381
public init(
7482
authentication: Authentication? = nil,
7583
connectionPool: ConnectionPoolConfiguration = .init(),
7684
keepAliveBehavior: KeepAliveBehavior = .init(),
85+
connectionTimeout: Duration = .seconds(30),
86+
blockingCommandTimeout: Duration = .seconds(120),
7787
tls: TLS = .disable
7888
) {
7989
self.authentication = authentication
8090
self.connectionPool = connectionPool
8191
self.keepAliveBehavior = keepAliveBehavior
92+
self.connectionTimeout = connectionTimeout
93+
self.blockingCommandTimeout = blockingCommandTimeout
8294
self.tls = tls
8395
}
8496
}

Sources/Valkey/ValkeyClientError.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public struct ValkeyClientError: Error, CustomStringConvertible, Equatable {
2525
case tokenDoesNotExist
2626
case cancelled
2727
case connectionClosedDueToCancellation
28+
case timeout
2829
}
2930

3031
fileprivate let value: _Internal
@@ -50,6 +51,8 @@ public struct ValkeyClientError: Error, CustomStringConvertible, Equatable {
5051
public static var cancelled: Self { .init(.cancelled) }
5152
/// Connection was closed because another command was cancelled
5253
public static var connectionClosedDueToCancellation: Self { .init(.connectionClosedDueToCancellation) }
54+
/// Connection was closed because it timed out
55+
public static var timeout: Self { .init(.timeout) }
5356
}
5457

5558
public let errorCode: ErrorCode
@@ -70,6 +73,7 @@ public struct ValkeyClientError: Error, CustomStringConvertible, Equatable {
7073
case .tokenDoesNotExist: self.message ?? "Expected token does not exist."
7174
case .cancelled: self.message ?? "Task was cancelled."
7275
case .connectionClosedDueToCancellation: self.message ?? "Connection was closed because another command was cancelled."
76+
case .timeout: self.message ?? "Connection was closed because it timed out."
7377
}
7478
}
7579
}

0 commit comments

Comments
 (0)