Skip to content

Commit 5106ebf

Browse files
authored
Call CLIENT SETINFO on initialisation of connection (#170)
* Fire and forget CLIENT.SETINFO on connection initialization Signed-off-by: Adam Fowler <[email protected]> * ignore -> forget Signed-off-by: Adam Fowler <[email protected]> * Add version.swift with library version/name strings Signed-off-by: Adam Fowler <[email protected]> * Add test to check library name and version are being set Signed-off-by: Adam Fowler <[email protected]> * Add support for returning dummy value from CLIENT SETINFO command Moved non-custom command handling to channel handler Signed-off-by: Adam Fowler <[email protected]> * Add BenchmarkCommandHandler parameter to makeLocalServer Signed-off-by: Adam Fowler <[email protected]> --------- Signed-off-by: Adam Fowler <[email protected]>
1 parent 079bf84 commit 5106ebf

File tree

7 files changed

+105
-22
lines changed

7 files changed

+105
-22
lines changed

Benchmarks/ValkeyBenchmarks/Shared.swift

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,29 +33,27 @@ let defaultMetrics: [BenchmarkMetric] =
3333
.throughput,
3434
]
3535

36-
func makeLocalServer() async throws -> Channel {
37-
struct GetHandler: BenchmarkCommandHandler {
38-
static let expectedCommand = RESPToken.Value.bulkString(ByteBuffer(string: "GET"))
39-
static let response = ByteBuffer(string: "$3\r\nBar\r\n")
40-
func handle(command: RESPToken.Value, parameters: RESPToken.Array.Iterator, write: (ByteBuffer) -> Void) {
41-
switch command {
42-
case Self.expectedCommand:
43-
write(Self.response)
44-
case .bulkString(ByteBuffer(string: "PING")):
45-
write(ByteBuffer(string: "$4\r\nPONG\r\n"))
46-
case .bulkString(let string):
47-
fatalError("Unexpected command: \(String(buffer: string))")
48-
default:
49-
fatalError("Unexpected value: \(command)")
50-
}
36+
struct BenchmarkGetHandler: BenchmarkCommandHandler {
37+
static let expectedCommand = RESPToken.Value.bulkString(ByteBuffer(string: "GET"))
38+
static let response = ByteBuffer(string: "$3\r\nBar\r\n")
39+
func handle(command: RESPToken.Value, parameters: RESPToken.Array.Iterator, write: (ByteBuffer) -> Void) {
40+
switch command {
41+
case Self.expectedCommand:
42+
write(Self.response)
43+
case .bulkString(let string):
44+
fatalError("Unexpected command: \(String(buffer: string))")
45+
default:
46+
fatalError("Unexpected value: \(command)")
5147
}
5248
}
53-
return try await ServerBootstrap(group: NIOSingletons.posixEventLoopGroup)
49+
}
50+
func makeLocalServer(commandHandler: some BenchmarkCommandHandler = BenchmarkGetHandler()) async throws -> Channel {
51+
try await ServerBootstrap(group: NIOSingletons.posixEventLoopGroup)
5452
.serverChannelOption(.socketOption(.so_reuseaddr), value: 1)
5553
.childChannelInitializer { channel in
5654
do {
5755
try channel.pipeline.syncOperations.addHandler(
58-
ValkeyServerChannelHandler(commandHandler: GetHandler())
56+
ValkeyServerChannelHandler(commandHandler: commandHandler)
5957
)
6058
return channel.eventLoop.makeSucceededVoidFuture()
6159
} catch {
@@ -66,7 +64,7 @@ func makeLocalServer() async throws -> Channel {
6664
.get()
6765
}
6866

69-
protocol BenchmarkCommandHandler {
67+
protocol BenchmarkCommandHandler: Sendable {
7068
func handle(command: RESPToken.Value, parameters: RESPToken.Array.Iterator, write: (ByteBuffer) -> Void)
7169
}
7270

@@ -78,6 +76,11 @@ final class ValkeyServerChannelHandler<Handler: BenchmarkCommandHandler>: Channe
7876
private var decoder = NIOSingleStepByteToMessageProcessor(RESPTokenDecoder())
7977
private let helloCommand = RESPToken.Value.bulkString(ByteBuffer(string: "HELLO"))
8078
private let helloResponse = ByteBuffer(string: "%1\r\n+server\r\n+fake\r\n")
79+
private let pingCommand = RESPToken.Value.bulkString(ByteBuffer(string: "PING"))
80+
private let pongResponse = ByteBuffer(string: "$4\r\nPONG\r\n")
81+
private let clientCommand = RESPToken.Value.bulkString(ByteBuffer(string: "CLIENT"))
82+
private let setInfoSubCommand = RESPToken.Value.bulkString(ByteBuffer(string: "SETINFO"))
83+
private let okResponse = ByteBuffer(string: "+2OK\r\n")
8184
private let commandHandler: Handler
8285

8386
init(commandHandler: Handler) {
@@ -102,6 +105,20 @@ final class ValkeyServerChannelHandler<Handler: BenchmarkCommandHandler>: Channe
102105
case helloCommand:
103106
context.writeAndFlush(self.wrapOutboundOut(helloResponse), promise: nil)
104107

108+
case pingCommand:
109+
context.writeAndFlush(self.wrapOutboundOut(pongResponse), promise: nil)
110+
111+
case clientCommand:
112+
var subCommandIterator = iterator
113+
switch subCommandIterator.next()?.value {
114+
case setInfoSubCommand:
115+
context.writeAndFlush(self.wrapOutboundOut(okResponse), promise: nil)
116+
default:
117+
commandHandler.handle(command: command, parameters: iterator) {
118+
context.writeAndFlush(self.wrapOutboundOut($0), promise: nil)
119+
}
120+
}
121+
105122
default:
106123
commandHandler.handle(command: command, parameters: iterator) {
107124
context.writeAndFlush(self.wrapOutboundOut($0), promise: nil)

Sources/Valkey/Connection/ValkeyChannelHandler+stateMachine.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ extension ValkeyChannelHandler {
225225
case .nio(let promise):
226226
self = .connected(state)
227227
return .waitForPromise(promise)
228-
case .swift:
228+
case .swift, .forget:
229229
preconditionFailure("Connected state cannot be setup with a Swift continuation")
230230
}
231231
case .active(let state):

Sources/Valkey/Connection/ValkeyChannelHandler.swift

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@ import NIOCore
2020
enum ValkeyPromise<T: Sendable>: Sendable {
2121
case nio(EventLoopPromise<T>)
2222
case swift(CheckedContinuation<T, any Error>)
23+
case forget
2324

2425
func succeed(_ t: T) {
2526
switch self {
2627
case .nio(let eventLoopPromise):
2728
eventLoopPromise.succeed(t)
2829
case .swift(let checkedContinuation):
2930
checkedContinuation.resume(returning: t)
31+
case .forget:
32+
break
3033
}
3134
}
3235

@@ -36,6 +39,8 @@ enum ValkeyPromise<T: Sendable>: Sendable {
3639
eventLoopPromise.fail(e)
3740
case .swift(let checkedContinuation):
3841
checkedContinuation.resume(throwing: e)
42+
case .forget:
43+
break
3944
}
4045
}
4146
}
@@ -154,6 +159,33 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
154159
}
155160
}
156161

162+
/// Write valkey command/commands to channel
163+
/// - Parameters:
164+
/// - request: Valkey command request
165+
/// - promise: Promise to fulfill when command is complete
166+
@inlinable
167+
func writeAndForget<Command: ValkeyCommand>(command: Command, requestID: Int) {
168+
self.eventLoop.assertInEventLoop()
169+
let pendingCommand = PendingCommand(
170+
promise: .forget,
171+
requestID: requestID,
172+
deadline: .now() + self.configuration.commandTimeout
173+
)
174+
switch self.stateMachine.sendCommand(pendingCommand) {
175+
case .sendCommand(let context):
176+
self.encoder.reset()
177+
command.encode(into: &self.encoder)
178+
let buffer = self.encoder.buffer
179+
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
180+
if self.deadlineCallback == nil {
181+
self.scheduleDeadlineCallback(deadline: .now() + self.configuration.commandTimeout)
182+
}
183+
184+
case .throwError:
185+
break
186+
}
187+
}
188+
157189
@usableFromInline
158190
func write(request: ValkeyRequest) {
159191
self.eventLoop.assertInEventLoop()

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
132132
}
133133
}
134134
let connection = try await future.get()
135-
try await connection.waitOnActive()
135+
try await connection.initialHandshake()
136136
return connection
137137
}
138138

@@ -144,8 +144,10 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
144144
self.channel.close(mode: .all, promise: nil)
145145
}
146146

147-
func waitOnActive() async throws {
147+
func initialHandshake() async throws {
148148
try await self.channelHandler.waitOnActive().get()
149+
self.executeAndForget(command: CLIENT.SETINFO(attr: .libname(valkeySwiftLibraryName)))
150+
self.executeAndForget(command: CLIENT.SETINFO(attr: .libver(valkeySwiftLibraryVersion)))
149151
}
150152

151153
/// Send RESP command to Valkey connection
@@ -172,6 +174,10 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
172174
}
173175
}
174176

177+
func executeAndForget<Command: ValkeyCommand>(command: Command) {
178+
self.channelHandler.writeAndForget(command: command, requestID: Self.requestIDGenerator.next())
179+
}
180+
175181
/// Pipeline a series of commands to Valkey connection
176182
///
177183
/// Once all the responses for the commands have been received the function returns

Sources/Valkey/ValkeyConnectionFactory.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ package final class ValkeyConnectionFactory: Sendable {
9494
logger: logger
9595
)
9696
}.get()
97-
try await connection.waitOnActive()
97+
try await connection.initialHandshake()
9898
return connection
9999
}
100100
}

Sources/Valkey/Version.swift

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the valkey-swift open source project
4+
//
5+
// Copyright (c) 2025 the valkey-swift project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of valkey-swift project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
package let valkeySwiftLibraryName = "valkey-swift"
16+
package let valkeySwiftLibraryVersion = "0.1.0"

Tests/IntegrationTests/ValkeyTests.swift

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -859,4 +859,16 @@ struct GeneratedCommands {
859859
}
860860
}
861861
}
862+
863+
@available(valkeySwift 1.0, *)
864+
@Test
865+
func testClientInfo() async throws {
866+
var logger = Logger(label: "Valkey")
867+
logger.logLevel = .trace
868+
try await withValkeyClient(.hostname(valkeyHostname, port: 6379), logger: logger) { client in
869+
let clients = try await String(buffer: client.clientList())
870+
#expect(clients.firstRange(of: "lib-name=\(valkeySwiftLibraryName)") != nil)
871+
#expect(clients.firstRange(of: "lib-ver=\(valkeySwiftLibraryVersion)") != nil)
872+
}
873+
}
862874
}

0 commit comments

Comments
 (0)