diff --git a/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift b/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift index 5832e47a..a4be3333 100644 --- a/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift +++ b/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift @@ -21,10 +21,37 @@ import Valkey @available(valkeySwift 1.0, *) func connectionBenchmarks() { + makeConnectionCreateAndDropBenchmark() makeConnectionGETBenchmark() makeConnectionPipelineBenchmark() } +@available(valkeySwift 1.0, *) +@discardableResult +func makeConnectionCreateAndDropBenchmark() -> Benchmark? { + let serverMutex = Mutex<(any Channel)?>(nil) + + return Benchmark("Connection: Create and drop benchmark", configuration: .init(metrics: defaultMetrics, scalingFactor: .kilo)) { benchmark in + let port = serverMutex.withLock { $0 }!.localAddress!.port! + let logger = Logger(label: "test") + for _ in benchmark.scaledIterations { + benchmark.startMeasurement() + try await ValkeyConnection.withConnection( + address: .hostname("127.0.0.1", port: port), + configuration: .init(), + logger: logger + ) { _ in + } + benchmark.stopMeasurement() + } + } setup: { + let server = try await makeLocalServer() + serverMutex.withLock { $0 = server } + } teardown: { + try await serverMutex.withLock { $0 }?.close().get() + } +} + @available(valkeySwift 1.0, *) @discardableResult func makeConnectionGETBenchmark() -> Benchmark? { diff --git a/Sources/Valkey/Connection/ValkeyChannelHandler+stateMachine.swift b/Sources/Valkey/Connection/ValkeyChannelHandler+stateMachine.swift index f8c04b42..7c70e01b 100644 --- a/Sources/Valkey/Connection/ValkeyChannelHandler+stateMachine.swift +++ b/Sources/Valkey/Connection/ValkeyChannelHandler+stateMachine.swift @@ -65,14 +65,8 @@ extension ValkeyChannelHandler { @usableFromInline struct ConnectedState { let context: Context - var pendingHelloCommand: PendingCommand - - func cancel(requestID: Int) -> PendingCommand? { - if pendingHelloCommand.requestID == requestID { - return pendingHelloCommand - } - return nil - } + let pendingHelloCommand: PendingCommand + var pendingCommands: Deque } init() { @@ -85,11 +79,11 @@ extension ValkeyChannelHandler { /// handler has become active @usableFromInline - mutating func setConnected(context: Context, pendingHelloCommand: PendingCommand) { + mutating func setConnected(context: Context, pendingHelloCommand: PendingCommand, pendingCommands: Deque) { switch consume self.state { case .initialized: self = .connected( - .init(context: context, pendingHelloCommand: pendingHelloCommand) + .init(context: context, pendingHelloCommand: pendingHelloCommand, pendingCommands: pendingCommands) ) case .connected: preconditionFailure("Cannot set connected state when state is connected") @@ -162,7 +156,7 @@ extension ValkeyChannelHandler { self = .closed(error) return .respondAndClose(state.pendingHelloCommand, error) default: - self = .active(.init(context: state.context, pendingCommands: .init())) + self = .active(.init(context: state.context, pendingCommands: state.pendingCommands)) return .respond(state.pendingHelloCommand, .cancel) } case .active(var state): @@ -204,8 +198,12 @@ extension ValkeyChannelHandler { self = .closed(nil) return .respondAndClose(command, nil) } - case .closed: - preconditionFailure("Cannot receive command on closed connection") + case .closed(let error): + guard let error else { + preconditionFailure("Cannot receive command on closed connection with no error") + } + self = .closed(error) + return .closeWithError(error) } } @@ -233,7 +231,7 @@ extension ValkeyChannelHandler { return .done case .closing(let state): self = .closing(state) - return .done + return .reportedClosed(nil) case .closed(let error): self = .closed(error) return .reportedClosed(error) @@ -255,7 +253,9 @@ extension ValkeyChannelHandler { case .connected(let state): if state.pendingHelloCommand.deadline <= now { self = .closed(ValkeyClientError(.timeout)) - return .failPendingCommandsAndClose(state.context, [state.pendingHelloCommand]) + var pendingCommands = state.pendingCommands + pendingCommands.prepend(state.pendingHelloCommand) + return .failPendingCommandsAndClose(state.context, pendingCommands) } else { self = .connected(state) return .reschedule(state.pendingHelloCommand.deadline) @@ -296,24 +296,14 @@ extension ValkeyChannelHandler { case doNothing } - /// handler wants to send a command + /// handler wants to cancel a command @usableFromInline mutating func cancel(requestID: Int) -> CancelAction { switch consume self.state { case .initialized: preconditionFailure("Cannot cancel when initialized") - case .connected(let state): - if let command = state.cancel(requestID: requestID) { - self = .closed(CancellationError()) - return .failPendingCommandsAndClose( - state.context, - cancel: [command], - closeConnectionDueToCancel: [] - ) - } else { - self = .connected(state) - return .doNothing - } + case .connected: + preconditionFailure("Cannot cancel while in connected state") case .active(let state): let (cancel, closeConnectionDueToCancel) = state.cancel(requestID: requestID) if cancel.count > 0 { @@ -360,7 +350,9 @@ extension ValkeyChannelHandler { self = .closed(nil) return .doNothing case .connected(let state): - self = .closing(.init(context: state.context, pendingCommands: [state.pendingHelloCommand])) + var pendingCommands = state.pendingCommands + pendingCommands.prepend(state.pendingHelloCommand) + self = .closing(.init(context: state.context, pendingCommands: pendingCommands)) return .waitForPendingCommands(state.context) case .active(let state): if state.pendingCommands.count > 0 { @@ -393,7 +385,9 @@ extension ValkeyChannelHandler { return .doNothing case .connected(let state): self = .closed(nil) - return .failPendingCommandsAndClose(state.context, [state.pendingHelloCommand]) + var pendingCommands = state.pendingCommands + pendingCommands.prepend(state.pendingHelloCommand) + return .failPendingCommandsAndClose(state.context, state.pendingCommands) case .active(let state): self = .closed(nil) return .failPendingCommandsAndClose(state.context, state.pendingCommands) @@ -421,7 +415,9 @@ extension ValkeyChannelHandler { return .doNothing case .connected(let state): self = .closed(nil) - return .failPendingCommandsAndSubscriptions([state.pendingHelloCommand]) + var pendingCommands = state.pendingCommands + pendingCommands.prepend(state.pendingHelloCommand) + return .failPendingCommandsAndSubscriptions(state.pendingCommands) case .active(let state): self = .closed(nil) return .failPendingCommandsAndSubscriptions(state.pendingCommands) diff --git a/Sources/Valkey/Connection/ValkeyChannelHandler.swift b/Sources/Valkey/Connection/ValkeyChannelHandler.swift index 3c9529d1..d230d0ca 100644 --- a/Sources/Valkey/Connection/ValkeyChannelHandler.swift +++ b/Sources/Valkey/Connection/ValkeyChannelHandler.swift @@ -159,33 +159,6 @@ final class ValkeyChannelHandler: ChannelInboundHandler { } } - /// Write valkey command/commands to channel - /// - Parameters: - /// - request: Valkey command request - /// - promise: Promise to fulfill when command is complete - @inlinable - func writeAndForget(command: Command, requestID: Int) { - self.eventLoop.assertInEventLoop() - let pendingCommand = PendingCommand( - promise: .forget, - requestID: requestID, - deadline: .now() + self.configuration.commandTimeout - ) - switch self.stateMachine.sendCommand(pendingCommand) { - case .sendCommand(let context): - self.encoder.reset() - command.encode(into: &self.encoder) - let buffer = self.encoder.buffer - context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil) - if self.deadlineCallback == nil { - self.scheduleDeadlineCallback(deadline: .now() + self.configuration.commandTimeout) - } - - case .throwError: - break - } - } - @usableFromInline func write(request: ValkeyRequest) { self.eventLoop.assertInEventLoop() @@ -307,25 +280,35 @@ final class ValkeyChannelHandler: ChannelInboundHandler { @usableFromInline func setConnected(context: ChannelHandlerContext) { // Send initial HELLO command - let command = HELLO( + let helloCommand = HELLO( arguments: .init( protover: 3, auth: configuration.authentication.map { .init(username: $0.username, password: $0.password) }, clientname: configuration.clientName ) ) + // set client info + let clientInfoLibName = CLIENT.SETINFO(attr: .libname(valkeySwiftLibraryName)) + let clientInfoLibVersion = CLIENT.SETINFO(attr: .libver(valkeySwiftLibraryVersion)) + self.encoder.reset() - command.encode(into: &self.encoder) - let buffer = self.encoder.buffer + helloCommand.encode(into: &self.encoder) + clientInfoLibName.encode(into: &self.encoder) + clientInfoLibVersion.encode(into: &self.encoder) let promise = eventLoop.makePromise(of: RESPToken.self) + let deadline = .now() + self.configuration.commandTimeout - context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil) - scheduleDeadlineCallback(deadline: deadline) + context.writeAndFlush(self.wrapOutboundOut(self.encoder.buffer), promise: nil) + self.scheduleDeadlineCallback(deadline: deadline) self.stateMachine.setConnected( context: context, - pendingHelloCommand: .init(promise: .nio(promise), requestID: 0, deadline: deadline) + pendingHelloCommand: .init(promise: .nio(promise), requestID: 0, deadline: deadline), + pendingCommands: [ + .init(promise: .forget, requestID: 0, deadline: deadline), // CLIENT.SETINFO libname + .init(promise: .forget, requestID: 0, deadline: deadline), // CLIENT.SETINFO libver + ] ) } diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index c4cbbbcd..b08de869 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -132,7 +132,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { } } let connection = try await future.get() - try await connection.initialHandshake() + try await connection.waitOnActive() return connection } @@ -144,10 +144,8 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { self.channel.close(mode: .all, promise: nil) } - func initialHandshake() async throws { + func waitOnActive() async throws { try await self.channelHandler.waitOnActive().get() - self.executeAndForget(command: CLIENT.SETINFO(attr: .libname(valkeySwiftLibraryName))) - self.executeAndForget(command: CLIENT.SETINFO(attr: .libver(valkeySwiftLibraryVersion))) } /// Send RESP command to Valkey connection @@ -174,10 +172,6 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { } } - func executeAndForget(command: Command) { - self.channelHandler.writeAndForget(command: command, requestID: Self.requestIDGenerator.next()) - } - /// Pipeline a series of commands to Valkey connection /// /// Once all the responses for the commands have been received the function returns diff --git a/Sources/Valkey/ValkeyConnectionFactory.swift b/Sources/Valkey/ValkeyConnectionFactory.swift index c83cd7aa..3a1ddd08 100644 --- a/Sources/Valkey/ValkeyConnectionFactory.swift +++ b/Sources/Valkey/ValkeyConnectionFactory.swift @@ -94,7 +94,7 @@ package final class ValkeyConnectionFactory: Sendable { logger: logger ) }.get() - try await connection.initialHandshake() + try await connection.waitOnActive() return connection } } diff --git a/Sources/Valkey/Version.swift b/Sources/Valkey/Version.swift index bc04f903..098788fe 100644 --- a/Sources/Valkey/Version.swift +++ b/Sources/Valkey/Version.swift @@ -12,5 +12,7 @@ // //===----------------------------------------------------------------------===// +/// library name reported to server using CLIENT SETINFO package let valkeySwiftLibraryName = "valkey-swift" +/// library version reported to server using CLIENT SETINFO package let valkeySwiftLibraryVersion = "0.1.0" diff --git a/Tests/ValkeyTests/Utils/NIOAsyncTestingChannel+hello.swift b/Tests/ValkeyTests/Utils/NIOAsyncTestingChannel+hello.swift index 09b20c70..65f0856b 100644 --- a/Tests/ValkeyTests/Utils/NIOAsyncTestingChannel+hello.swift +++ b/Tests/ValkeyTests/Utils/NIOAsyncTestingChannel+hello.swift @@ -21,7 +21,15 @@ import Testing extension NIOAsyncTestingChannel { func processHello() async throws { let hello = try await self.waitForOutboundWrite(as: ByteBuffer.self) - #expect(hello == RESPToken(.array([.bulkString("HELLO"), .bulkString("3")])).base) + var expectedBuffer = ByteBuffer() + expectedBuffer.writeImmutableBuffer(RESPToken(.array([.bulkString("HELLO"), .bulkString("3")])).base) + expectedBuffer.writeImmutableBuffer( + RESPToken(.array([.bulkString("CLIENT"), .bulkString("SETINFO"), .bulkString("lib-name"), .bulkString(valkeySwiftLibraryName)])).base + ) + expectedBuffer.writeImmutableBuffer( + RESPToken(.array([.bulkString("CLIENT"), .bulkString("SETINFO"), .bulkString("lib-ver"), .bulkString(valkeySwiftLibraryVersion)])).base + ) + #expect(hello == expectedBuffer) try await self.writeInbound( RESPToken( .map([ @@ -35,5 +43,7 @@ extension NIOAsyncTestingChannel { ]) ).base ) + try await self.writeInbound(RESPToken.ok.base) + try await self.writeInbound(RESPToken.ok.base) } } diff --git a/Tests/ValkeyTests/ValkeyChannelHandlerStateMachineTests.swift b/Tests/ValkeyTests/ValkeyChannelHandlerStateMachineTests.swift index 9a66c952..e7a21587 100644 --- a/Tests/ValkeyTests/ValkeyChannelHandlerStateMachineTests.swift +++ b/Tests/ValkeyTests/ValkeyChannelHandlerStateMachineTests.swift @@ -183,7 +183,12 @@ struct ValkeyChannelHandlerStateMachineTests { } expect( stateMachine.state - == .closing(.init(context: "testGracefulShutdown", pendingCommands: [.init(promise: .nio(promise), requestID: 23, deadline: .now())])) + == .closing( + .init( + context: "testGracefulShutdown", + pendingCommands: [.init(promise: .nio(promise), requestID: 23, deadline: .now())] + ) + ) ) switch stateMachine.receivedResponse(token: .ok) { case .respondAndClose(let command, let error): @@ -218,7 +223,10 @@ struct ValkeyChannelHandlerStateMachineTests { expect( stateMachine.state == .closing( - .init(context: "testClosedClosingState", pendingCommands: [.init(promise: .nio(promise), requestID: 17, deadline: .now())]) + .init( + context: "testClosedClosingState", + pendingCommands: [.init(promise: .nio(promise), requestID: 17, deadline: .now())] + ) ) ) switch stateMachine.setClosed() { @@ -460,7 +468,7 @@ extension ValkeyChannelHandler.StateMachine.State { case .connected(let lhs): switch rhs { case .connected(let rhs): - return lhs.context == rhs.context && lhs.pendingHelloCommand.requestID == rhs.pendingHelloCommand.requestID + return lhs.context == rhs.context && lhs.pendingCommands.map { $0.requestID } == rhs.pendingCommands.map { $0.requestID } default: return false } @@ -535,7 +543,8 @@ extension ValkeyChannelHandler.StateMachine { let promise = EmbeddedEventLoop().makePromise(of: RESPToken.self) self.setConnected( context: context, - pendingHelloCommand: .init(promise: .nio(promise), requestID: 0, deadline: .now() + .seconds(30)) + pendingHelloCommand: .init(promise: .nio(promise), requestID: 0, deadline: .now() + .seconds(30)), + pendingCommands: [] ) } diff --git a/Tests/ValkeyTests/ValkeyConnectionTests.swift b/Tests/ValkeyTests/ValkeyConnectionTests.swift index d6b91fb5..9b60d98d 100644 --- a/Tests/ValkeyTests/ValkeyConnectionTests.swift +++ b/Tests/ValkeyTests/ValkeyConnectionTests.swift @@ -46,8 +46,9 @@ struct ConnectionTests { let logger = Logger(label: "test") _ = try await ValkeyConnection.setupChannelAndConnect(channel, configuration: .init(), logger: logger) - let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) - #expect(outbound == RESPToken(.command(["HELLO", "3"])).base) + var outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + let hello3 = RESPToken(.command(["HELLO", "3"])).base + #expect(outbound.readSlice(length: hello3.readableBytes) == hello3) } @Test @@ -57,8 +58,9 @@ struct ConnectionTests { let logger = Logger(label: "test") _ = try await ValkeyConnection.setupChannelAndConnect(channel, configuration: .init(), logger: logger) - let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) - #expect(outbound == RESPToken(.command(["HELLO", "3"])).base) + var outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + let hello3 = RESPToken(.command(["HELLO", "3"])).base + #expect(outbound.readSlice(length: hello3.readableBytes) == hello3) await #expect(throws: ValkeyClientError(.commandError, message: "Not supported")) { try await channel.writeInbound(RESPToken(.bulkError("Not supported")).base) } @@ -79,8 +81,9 @@ struct ConnectionTests { logger: logger ) - let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) - #expect(outbound == RESPToken(.command(["HELLO", "3", "AUTH", "john", "smith"])).base) + var outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + let hello3 = RESPToken(.command(["HELLO", "3", "AUTH", "john", "smith"])).base + #expect(outbound.readSlice(length: hello3.readableBytes) == hello3) } @Test @@ -95,8 +98,9 @@ struct ConnectionTests { logger: logger ) - let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) - #expect(outbound == RESPToken(.command(["HELLO", "3", "SETNAME", "Testing"])).base) + var outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + let hello3 = RESPToken(.command(["HELLO", "3", "SETNAME", "Testing"])).base + #expect(outbound.readSlice(length: hello3.readableBytes) == hello3) } @Test