Skip to content

Commit 411e724

Browse files
Rework connection creation (#121)
Co-authored-by: Fabian Fett <[email protected]>
1 parent 0e3e17f commit 411e724

11 files changed

+517
-49
lines changed

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,14 @@ public final class ValkeyClusterClient: Sendable {
9898
/// - nodeDiscovery: A ``ValkeyNodeDiscovery`` service that discovers Valkey nodes for the client in the cluster.
9999
/// - eventLoopGroup: The event loop group used for handling connections. Defaults to the global singleton.
100100
/// - logger: A logger for recording internal events and diagnostic information.
101+
/// - connectionFactory: An overwrite to provide create your own underlying `Channel`s. Use this to wrap connections
102+
/// in other NIO protocols (like SSH).
101103
public init(
102104
clientConfiguration: ValkeyClientConfiguration,
103105
nodeDiscovery: some ValkeyNodeDiscovery,
104106
eventLoopGroup: EventLoopGroup = MultiThreadedEventLoopGroup.singleton,
105-
logger: Logger
107+
logger: Logger,
108+
connectionFactory: (@Sendable (ValkeyServerAddress, any EventLoop) async throws -> any Channel)? = nil
106109
) {
107110
self.logger = logger
108111

@@ -113,6 +116,10 @@ public final class ValkeyClusterClient: Sendable {
113116
let factory = ValkeyClientFactory(
114117
logger: logger,
115118
configuration: clientConfiguration,
119+
connectionFactory: ValkeyConnectionFactory(
120+
configuration: clientConfiguration,
121+
customHandler: connectionFactory
122+
),
116123
eventLoopGroup: eventLoopGroup
117124
)
118125

@@ -128,6 +135,8 @@ public final class ValkeyClusterClient: Sendable {
128135
self.nodeDiscovery = nodeDiscovery
129136
}
130137

138+
139+
131140
// MARK: - Public methods -
132141

133142
/// Sends a command to the appropriate node in the Valkey cluster and returns the response.
@@ -675,6 +684,7 @@ package struct ValkeyClientFactory: ValkeyNodeConnectionPoolFactory {
675684
var configuration: ValkeyClientConfiguration
676685
var eventLoopGroup: any EventLoopGroup
677686
let connectionIDGenerator = ConnectionIDGenerator()
687+
let connectionFactory: ValkeyConnectionFactory
678688

679689
/// Creates a new `ValkeyClientFactory` instance.
680690
///
@@ -685,10 +695,12 @@ package struct ValkeyClientFactory: ValkeyNodeConnectionPoolFactory {
685695
package init(
686696
logger: Logger,
687697
configuration: ValkeyClientConfiguration,
698+
connectionFactory: ValkeyConnectionFactory,
688699
eventLoopGroup: any EventLoopGroup
689700
) {
690701
self.logger = logger
691702
self.configuration = configuration
703+
self.connectionFactory = connectionFactory
692704
self.eventLoopGroup = eventLoopGroup
693705
}
694706

@@ -711,8 +723,8 @@ package struct ValkeyClientFactory: ValkeyNodeConnectionPoolFactory {
711723

712724
return ValkeyClient(
713725
serverAddress,
714-
configuration: self.configuration,
715726
connectionIDGenerator: self.connectionIDGenerator,
727+
connectionFactory: self.connectionFactory,
716728
eventLoopGroup: self.eventLoopGroup,
717729
logger: self.logger
718730
)

Sources/Valkey/Connection/ValkeyChannelHandler.swift

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ enum ValkeyRequest: Sendable {
5151
final class ValkeyChannelHandler: ChannelInboundHandler {
5252
@usableFromInline
5353
struct Configuration {
54-
let authentication: ValkeyClientConfiguration.Authentication?
54+
let authentication: ValkeyConnectionConfiguration.Authentication?
5555
@usableFromInline
56-
let connectionTimeout: TimeAmount
56+
let commandTimeout: TimeAmount
5757
@usableFromInline
5858
let blockingCommandTimeout: TimeAmount
5959
let clientName: String?
@@ -134,7 +134,7 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
134134
func write<Command: ValkeyCommand>(command: Command, continuation: CheckedContinuation<RESPToken, any Error>, requestID: Int) {
135135
self.eventLoop.assertInEventLoop()
136136
let deadline: NIODeadline =
137-
command.isBlocking ? .now() + self.configuration.blockingCommandTimeout : .now() + self.configuration.connectionTimeout
137+
command.isBlocking ? .now() + self.configuration.blockingCommandTimeout : .now() + self.configuration.commandTimeout
138138
let pendingCommand = PendingCommand(
139139
promise: .swift(continuation),
140140
requestID: requestID,
@@ -158,7 +158,7 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
158158
@usableFromInline
159159
func write(request: ValkeyRequest) {
160160
self.eventLoop.assertInEventLoop()
161-
let deadline = .now() + self.configuration.connectionTimeout
161+
let deadline = .now() + self.configuration.commandTimeout
162162
switch request {
163163
case .single(let buffer, let tokenPromise, let requestID):
164164
let pendingCommand = PendingCommand(promise: tokenPromise, requestID: requestID, deadline: deadline)
@@ -493,3 +493,15 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
493493
}
494494
}
495495
}
496+
497+
@available(valkeySwift 1.0, *)
498+
extension ValkeyChannelHandler.Configuration {
499+
init(_ other: ValkeyConnectionConfiguration) {
500+
self.init(
501+
authentication: other.authentication,
502+
commandTimeout: .init(other.commandTimeout),
503+
blockingCommandTimeout: .init(other.blockingCommandTimeout),
504+
clientName: other.clientName
505+
)
506+
}
507+
}

Sources/Valkey/Connection/ValkeyConnection+ConnectionPool.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ struct ValkeyKeepAliveBehavior: ConnectionKeepAliveBehavior {
4747

4848
/// Connection id generator for Valkey connection pool
4949
@available(valkeySwift 1.0, *)
50-
final class ConnectionIDGenerator: ConnectionIDGeneratorProtocol {
50+
package final class ConnectionIDGenerator: ConnectionIDGeneratorProtocol {
5151
static let globalGenerator = ConnectionIDGenerator()
5252

5353
private let atomic: Atomic<Int>
@@ -56,7 +56,7 @@ final class ConnectionIDGenerator: ConnectionIDGeneratorProtocol {
5656
self.atomic = .init(0)
5757
}
5858

59-
func next() -> Int {
59+
package func next() -> Int {
6060
self.atomic.wrappingAdd(1, ordering: .relaxed).oldValue
6161
}
6262
}

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,15 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
3939
let channel: any Channel
4040
@usableFromInline
4141
let channelHandler: ValkeyChannelHandler
42-
let configuration: ValkeyClientConfiguration
42+
let configuration: ValkeyConnectionConfiguration
4343
let isClosed: Atomic<Bool>
4444

4545
/// Initialize connection
46-
private init(
46+
init(
4747
channel: any Channel,
4848
connectionID: ID,
4949
channelHandler: ValkeyChannelHandler,
50-
configuration: ValkeyClientConfiguration,
50+
configuration: ValkeyConnectionConfiguration,
5151
logger: Logger
5252
) {
5353
self.unownedExecutor = channel.eventLoop.executor.asUnownedSerialExecutor()
@@ -60,7 +60,7 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
6060
}
6161

6262
/// Connect to Valkey database and return connection
63-
///
63+
///
6464
/// - Parameters:
6565
/// - address: Internet address of database
6666
/// - connectionID: Connection identifier
@@ -73,7 +73,7 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
7373
address: ValkeyServerAddress,
7474
connectionID: ID,
7575
name: String? = nil,
76-
configuration: ValkeyClientConfiguration,
76+
configuration: ValkeyConnectionConfiguration,
7777
eventLoop: EventLoop = MultiThreadedEventLoopGroup.singleton.any(),
7878
logger: Logger
7979
) async throws -> ValkeyConnection {
@@ -194,7 +194,7 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
194194
address: ValkeyServerAddress,
195195
connectionID: ID,
196196
eventLoop: EventLoop,
197-
configuration: ValkeyClientConfiguration,
197+
configuration: ValkeyConnectionConfiguration,
198198
clientName: String?,
199199
logger: Logger
200200
) -> EventLoopFuture<ValkeyConnection> {
@@ -218,7 +218,7 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
218218

219219
let connect = bootstrap.channelInitializer { channel in
220220
do {
221-
try self._setupChannel(channel, configuration: configuration, clientName: clientName, logger: logger)
221+
try self._setupChannel(channel, configuration: configuration, logger: logger)
222222
return eventLoop.makeSucceededVoidFuture()
223223
} catch {
224224
return eventLoop.makeFailedFuture(error)
@@ -253,26 +253,29 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
253253

254254
package static func setupChannelAndConnect(
255255
_ channel: any Channel,
256-
configuration: ValkeyClientConfiguration = .init(),
257-
clientName: String? = nil,
256+
configuration: ValkeyConnectionConfiguration = .init(),
258257
logger: Logger
259258
) async throws -> ValkeyConnection {
260259
if !channel.eventLoop.inEventLoop {
261260
return try await channel.eventLoop.flatSubmit {
262-
self._setupChannelAndConnect(channel, configuration: configuration, clientName: clientName, logger: logger)
261+
self._setupChannelAndConnect(channel, configuration: configuration, logger: logger)
263262
}.get()
264263
}
265-
return try await self._setupChannelAndConnect(channel, configuration: configuration, clientName: clientName, logger: logger).get()
264+
return try await self._setupChannelAndConnect(channel, configuration: configuration, logger: logger).get()
266265
}
267266

268267
private static func _setupChannelAndConnect(
269268
_ channel: any Channel,
270-
configuration: ValkeyClientConfiguration,
271-
clientName: String? = nil,
269+
tlsSetting: TLSSetting = .disable,
270+
configuration: ValkeyConnectionConfiguration,
272271
logger: Logger
273272
) -> EventLoopFuture<ValkeyConnection> {
274273
do {
275-
let handler = try self._setupChannel(channel, configuration: configuration, clientName: clientName, logger: logger)
274+
let handler = try self._setupChannel(
275+
channel,
276+
configuration: configuration,
277+
logger: logger
278+
)
276279
let connection = ValkeyConnection(
277280
channel: channel,
278281
connectionID: 0,
@@ -288,11 +291,16 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
288291
}
289292
}
290293

294+
@usableFromInline
295+
enum TLSSetting {
296+
case enable(NIOSSLContext, serverName: String?)
297+
case disable
298+
}
299+
291300
@discardableResult
292-
private static func _setupChannel(
301+
static func _setupChannel(
293302
_ channel: any Channel,
294-
configuration: ValkeyClientConfiguration,
295-
clientName: String?,
303+
configuration: ValkeyConnectionConfiguration,
296304
logger: Logger
297305
) throws -> ValkeyChannelHandler {
298306
channel.eventLoop.assertInEventLoop()
@@ -304,12 +312,7 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
304312
break
305313
}
306314
let valkeyChannelHandler = ValkeyChannelHandler(
307-
configuration: .init(
308-
authentication: configuration.authentication,
309-
connectionTimeout: .init(configuration.connectionTimeout),
310-
blockingCommandTimeout: .init(configuration.blockingCommandTimeout),
311-
clientName: clientName
312-
),
315+
configuration: .init(configuration),
313316
eventLoop: channel.eventLoop,
314317
logger: logger
315318
)
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the valkey-swift project
4+
//
5+
// Copyright (c) 2025 the valkey-swift authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See valkey-swift/CONTRIBUTORS.txt for the list of valkey-swift authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIOSSL
16+
17+
/// A configuration object that defines how to connect to a Valkey server.
18+
///
19+
/// `ValkeyConnectionConfiguration` allows you to customize various aspects of the connection,
20+
/// including authentication credentials, timeouts, and TLS security settings.
21+
///
22+
/// Example usage:
23+
/// ```swift
24+
/// // Basic configuration
25+
/// let config = ValkeyConnectionConfiguration()
26+
///
27+
/// // Configuration with authentication
28+
/// let authConfig = ValkeyConnectionConfiguration(
29+
/// authentication: .init(username: "user", password: "pass"),
30+
/// commandTimeout: .seconds(60)
31+
/// )
32+
///
33+
/// // Configuration with TLS
34+
/// let sslContext = try NIOSSLContext(configuration: .makeClientConfiguration())
35+
/// let secureConfig = ValkeyConnectionConfiguration(
36+
/// authentication: .init(username: "user", password: "pass"),
37+
/// tls: .enable(sslContext, tlsServerName: "your-valkey-server.com")
38+
/// )
39+
/// ```
40+
public struct ValkeyConnectionConfiguration: Sendable {
41+
/// Configuration for TLS (Transport Layer Security) encryption.
42+
///
43+
/// This structure allows you to enable or disable encrypted connections to the Valkey server.
44+
/// When enabled, it requires an `NIOSSLContext` and optionally a server name for SNI (Server Name Indication).
45+
public struct TLS: Sendable {
46+
enum Base {
47+
case disable
48+
case enable(NIOSSLContext, String?)
49+
}
50+
let base: Base
51+
52+
/// Disables TLS for the connection.
53+
///
54+
/// Use this option when connecting to a Valkey server that doesn't require encryption.
55+
public static var disable: Self { .init(base: .disable) }
56+
57+
/// Enables TLS for the connection.
58+
///
59+
/// - Parameters:
60+
/// - sslContext: The SSL context used to establish the secure connection
61+
/// - tlsServerName: Optional server name for SNI (Server Name Indication)
62+
/// - Returns: A configured TLS instance
63+
public static func enable(_ sslContext: NIOSSLContext, tlsServerName: String?) throws -> Self {
64+
.init(base: .enable(sslContext, tlsServerName))
65+
}
66+
}
67+
68+
/// Authentication credentials for accessing a Valkey server.
69+
///
70+
/// Use this structure to provide username and password credentials when the server
71+
/// requires authentication for access.
72+
public struct Authentication: Sendable {
73+
/// The username for authentication
74+
public var username: String
75+
/// The password for authentication
76+
public var password: String
77+
78+
/// Creates a new authentication configuration.
79+
///
80+
/// - Parameters:
81+
/// - username: The username for server authentication
82+
/// - password: The password for server authentication
83+
public init(username: String, password: String) {
84+
self.username = username
85+
self.password = password
86+
}
87+
}
88+
89+
/// Optional authentication credentials for accessing the Valkey server.
90+
/// Set this property when connecting to a server that requires authentication.
91+
public var authentication: Authentication?
92+
93+
/// TLS configuration for the connection.
94+
/// Use `.disable` for unencrypted connections or `.enable(...)` for secure connections.
95+
public var tls: TLS
96+
97+
/// The maximum time to wait for a response to a command before considering the connection dead.
98+
///
99+
/// This timeout applies to all standard commands sent to the Valkey server.
100+
/// Default value is 30 seconds.
101+
public var commandTimeout: Duration
102+
103+
/// The maximum time to wait for a response to blocking commands.
104+
///
105+
/// This timeout applies specifically to blocking commands (like BLPOP, BRPOP, etc.)
106+
/// that may wait for conditions to be met before returning.
107+
/// Default value is 120 seconds.
108+
public var blockingCommandTimeout: Duration
109+
110+
/// The client name to identify this connection to the Valkey server.
111+
///
112+
/// When specified, this name will be sent to the server using the `HELLO` command
113+
/// during connection initialization. This can be useful for debugging and monitoring purposes,
114+
/// allowing you to identify different clients connected to the server.
115+
/// Default value is `nil` (no client name is set).
116+
public var clientName: String?
117+
118+
/// Creates a new Valkey connection configuration.
119+
///
120+
/// Use this initializer to create a configuration object that can be used to establish
121+
/// a connection to a Valkey server with the specified parameters.
122+
///
123+
/// - Parameters:
124+
/// - authentication: Optional credentials for accessing the Valkey server. Set to `nil` for unauthenticated access.
125+
/// - commandTimeout: Maximum time to wait for a response to standard commands. Defaults to 30 seconds.
126+
/// - blockingCommandTimeout: Maximum time to wait for a response to blocking commands. Defaults to 120 seconds.
127+
/// - tls: TLS configuration for secure connections. Defaults to `.disable` for unencrypted connections.
128+
/// - clientName: Optional name to identify this client connection on the server. Defaults to `nil`.
129+
public init(
130+
authentication: Authentication? = nil,
131+
commandTimeout: Duration = .seconds(30),
132+
blockingCommandTimeout: Duration = .seconds(120),
133+
tls: TLS = .disable,
134+
clientName: String? = nil
135+
) {
136+
self.authentication = authentication
137+
self.commandTimeout = commandTimeout
138+
self.blockingCommandTimeout = blockingCommandTimeout
139+
self.tls = tls
140+
self.clientName = clientName
141+
}
142+
}

0 commit comments

Comments
 (0)