Skip to content

Commit c7ab601

Browse files
authored
Add ValkeyConnection.withConnection() (#155)
* Add ValkeyConnection.withConnection() Replacing ValkeyConnection.connection() Signed-off-by: Adam Fowler <[email protected]> * Remove client name parameter Signed-off-by: Adam Fowler <[email protected]> --------- Signed-off-by: Adam Fowler <[email protected]>
1 parent f276565 commit c7ab601

File tree

4 files changed

+65
-22
lines changed

4 files changed

+65
-22
lines changed

Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,21 +32,18 @@ func makeConnectionGETBenchmark() -> Benchmark? {
3232
return Benchmark("Connection: GET benchmark", configuration: .init(metrics: defaultMetrics, scalingFactor: .kilo)) { benchmark in
3333
let port = serverMutex.withLock { $0 }!.localAddress!.port!
3434
let logger = Logger(label: "test")
35-
let connection = try await ValkeyConnection.connect(
35+
try await ValkeyConnection.withConnection(
3636
address: .hostname("127.0.0.1", port: port),
37-
connectionID: 1,
3837
configuration: .init(),
3938
logger: logger
40-
)
41-
42-
benchmark.startMeasurement()
43-
for _ in benchmark.scaledIterations {
44-
let foo = try await connection.get("foo")
45-
precondition(foo.map { String(buffer: $0) } == "Bar")
39+
) { connection in
40+
benchmark.startMeasurement()
41+
for _ in benchmark.scaledIterations {
42+
let foo = try await connection.get("foo")
43+
precondition(foo.map { String(buffer: $0) } == "Bar")
44+
}
45+
benchmark.stopMeasurement()
4646
}
47-
benchmark.stopMeasurement()
48-
49-
connection.close()
5047
} setup: {
5148
let server = try await makeLocalServer()
5249
serverMutex.withLock { $0 = server }

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,42 +59,74 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
5959
self.isClosed = .init(false)
6060
}
6161

62+
/// Connect to Valkey database and run operation using connection and then close
63+
/// connection.
64+
///
65+
/// To avoid the cost of acquiring the connection and then closing it, it is always
66+
/// preferable to use ``ValkeyClient/withConnection(isolation:operation:)`` which
67+
/// uses a persistent connection pool to provide connections to your Valkey database.
68+
///
69+
/// - Parameters:
70+
/// - address: Internet address of database
71+
/// - configuration: Configuration of Valkey connection
72+
/// - eventLoop: EventLoop to run connection on
73+
/// - logger: Logger for connection
74+
/// - isolation: Actor isolation
75+
/// - operation: Closure handling Valkey connection
76+
/// - Returns: Return value of operation closure
77+
public static func withConnection<Value>(
78+
address: ValkeyServerAddress,
79+
configuration: ValkeyConnectionConfiguration = .init(),
80+
eventLoop: EventLoop = MultiThreadedEventLoopGroup.singleton.any(),
81+
logger: Logger,
82+
isolation: isolated (any Actor)? = #isolation,
83+
operation: (ValkeyConnection) async throws -> sending Value
84+
) async throws -> sending Value {
85+
let connection = try await connect(
86+
address: address,
87+
connectionID: 0,
88+
configuration: configuration,
89+
eventLoop: eventLoop,
90+
logger: logger
91+
)
92+
defer {
93+
connection.close()
94+
}
95+
return try await operation(connection)
96+
}
97+
6298
/// Connect to Valkey database and return connection
6399
///
64100
/// - Parameters:
65101
/// - address: Internet address of database
66102
/// - connectionID: Connection identifier
67-
/// - name: Name of connection. The connection name is used in many of the `CLIENT` commands.
68103
/// - configuration: Configuration of Valkey connection
69104
/// - eventLoop: EventLoop to run connection on
70105
/// - logger: Logger for connection
71106
/// - Returns: ValkeyConnection
72-
public static func connect(
107+
static func connect(
73108
address: ValkeyServerAddress,
74109
connectionID: ID,
75-
name: String? = nil,
76110
configuration: ValkeyConnectionConfiguration,
77111
eventLoop: EventLoop = MultiThreadedEventLoopGroup.singleton.any(),
78112
logger: Logger
79113
) async throws -> ValkeyConnection {
80114
let future =
81115
if eventLoop.inEventLoop {
82-
self._makeClient(
116+
self._makeConnection(
83117
address: address,
84118
connectionID: connectionID,
85119
eventLoop: eventLoop,
86120
configuration: configuration,
87-
clientName: name,
88121
logger: logger
89122
)
90123
} else {
91124
eventLoop.flatSubmit {
92-
self._makeClient(
125+
self._makeConnection(
93126
address: address,
94127
connectionID: connectionID,
95128
eventLoop: eventLoop,
96129
configuration: configuration,
97-
clientName: name,
98130
logger: logger
99131
)
100132
}
@@ -195,12 +227,11 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
195227
}
196228

197229
/// Create Valkey connection and return channel connection is running on and the Valkey channel handler
198-
private static func _makeClient(
230+
private static func _makeConnection(
199231
address: ValkeyServerAddress,
200232
connectionID: ID,
201233
eventLoop: EventLoop,
202234
configuration: ValkeyConnectionConfiguration,
203-
clientName: String?,
204235
logger: Logger
205236
) -> EventLoopFuture<ValkeyConnection> {
206237
eventLoop.assertInEventLoop()

Sources/Valkey/Documentation.docc/Pipelining.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
Sending multiple commands at once without waiting for the response of each command
44

5-
## Overview
6-
75
Valkey pipelining is a technique for improving performance by issuing multiple commands at once without waiting for the response to each individual command. Pipelining not only reduces the latency cost of waiting for the result of each command it also reduces the cost to the server as it reduces I/O costs. Multiple commands can be read with a single syscall, and multiple results are delivered with a single syscall.
86

97
## Implementation

Tests/IntegrationTests/ValkeyTests.swift

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ struct GeneratedCommands {
6868
}
6969
}
7070
}
71+
7172
@Test
7273
@available(valkeySwift 1.0, *)
7374
func testValkeyCommand() async throws {
@@ -95,6 +96,22 @@ struct GeneratedCommands {
9596
}
9697
}
9798

99+
@Test("Test ValkeyConnection.withConnection()")
100+
@available(valkeySwift 1.0, *)
101+
func testWithConnectionSetGet() async throws {
102+
var logger = Logger(label: "Valkey")
103+
logger.logLevel = .debug
104+
try await ValkeyConnection.withConnection(address: .hostname(valkeyHostname, port: 6379), logger: logger) { connection in
105+
try await withKey(connection: connection) { key in
106+
try await connection.set(key, value: "Hello")
107+
let response = try await connection.get(key).map { String(buffer: $0) }
108+
#expect(response == "Hello")
109+
let response2 = try await connection.get("sdf65fsdf").map { String(buffer: $0) }
110+
#expect(response2 == nil)
111+
}
112+
}
113+
}
114+
98115
@Test
99116
@available(valkeySwift 1.0, *)
100117
func testSetGet() async throws {

0 commit comments

Comments
 (0)