diff --git a/Sources/Valkey/Connection/ValkeyChannelHandler+stateMachine.swift b/Sources/Valkey/Connection/ValkeyChannelHandler+stateMachine.swift index ed559e53..203c788e 100644 --- a/Sources/Valkey/Connection/ValkeyChannelHandler+stateMachine.swift +++ b/Sources/Valkey/Connection/ValkeyChannelHandler+stateMachine.swift @@ -225,7 +225,7 @@ extension ValkeyChannelHandler { case .nio(let promise): self = .connected(state) return .waitForPromise(promise) - case .swift: + case .swift, .request: preconditionFailure("Connected state cannot be setup with a Swift continuation") } case .active(let state): diff --git a/Sources/Valkey/Connection/ValkeyChannelHandler.swift b/Sources/Valkey/Connection/ValkeyChannelHandler.swift index ad46444a..c71bbe5c 100644 --- a/Sources/Valkey/Connection/ValkeyChannelHandler.swift +++ b/Sources/Valkey/Connection/ValkeyChannelHandler.swift @@ -16,10 +16,12 @@ import DequeModule import Logging import NIOCore +@available(valkeySwift 1.0, *) @usableFromInline enum ValkeyPromise: Sendable { case nio(EventLoopPromise) case swift(CheckedContinuation) + case request(ValkeyConnectionRequest) func succeed(_ t: T) { switch self { @@ -27,6 +29,8 @@ enum ValkeyPromise: Sendable { eventLoopPromise.succeed(t) case .swift(let checkedContinuation): checkedContinuation.resume(returning: t) + case .request(let request): + request.succeed(t) } } @@ -36,10 +40,13 @@ enum ValkeyPromise: Sendable { eventLoopPromise.fail(e) case .swift(let checkedContinuation): checkedContinuation.resume(throwing: e) + case .request(let request): + request.fail(e) } } } +@available(valkeySwift 1.0, *) @usableFromInline enum ValkeyRequest: Sendable { case single(buffer: ByteBuffer, promise: ValkeyPromise, id: Int) @@ -125,6 +132,36 @@ final class ValkeyChannelHandler: ChannelInboundHandler { self.logger = logger } + /// Write valkey command/commands to channel + /// - Parameters: + /// - request: Valkey command request + /// - promise: Promise to fulfill when command is complete + @inlinable + func write(command: Command, request: ValkeyConnectionRequest) { + self.eventLoop.assertInEventLoop() + let deadline: NIODeadline = + command.isBlocking ? .now() + self.configuration.blockingCommandTimeout : .now() + self.configuration.commandTimeout + let pendingCommand = PendingCommand( + promise: .request(request), + requestID: request.id, + deadline: deadline + ) + switch self.stateMachine.sendCommand(pendingCommand) { + case .sendCommand(let context): + self.encoder.reset() + command.encode(into: &self.encoder) + let buffer = self.encoder.buffer + context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil) + if self.deadlineCallback == nil { + self.scheduleDeadlineCallback(deadline: deadline) + } + + case .throwError(let error): + request.fail(error) + } + } + + /// Write valkey command/commands to channel /// - Parameters: /// - request: Valkey command request diff --git a/Sources/Valkey/Connection/ValkeyConnection+ConnectionPool.swift b/Sources/Valkey/Connection/ValkeyConnection+ConnectionPool.swift index f65babc4..7e1ec842 100644 --- a/Sources/Valkey/Connection/ValkeyConnection+ConnectionPool.swift +++ b/Sources/Valkey/Connection/ValkeyConnection+ConnectionPool.swift @@ -29,17 +29,21 @@ extension ValkeyConnection: PooledConnection { /// Keep alive behavior for Valkey connection @available(valkeySwift 1.0, *) +@usableFromInline struct ValkeyKeepAliveBehavior: ConnectionKeepAliveBehavior { + @usableFromInline let behavior: ValkeyClientConfiguration.KeepAliveBehavior? init(_ behavior: ValkeyClientConfiguration.KeepAliveBehavior?) { self.behavior = behavior } + @inlinable var keepAliveFrequency: Duration? { self.behavior?.frequency } + @inlinable func runKeepAlive(for connection: ValkeyConnection) async throws { _ = try await connection.ping() } @@ -47,15 +51,17 @@ struct ValkeyKeepAliveBehavior: ConnectionKeepAliveBehavior { /// Connection id generator for Valkey connection pool @available(valkeySwift 1.0, *) +@usableFromInline package final class ConnectionIDGenerator: ConnectionIDGeneratorProtocol { static let globalGenerator = ConnectionIDGenerator() - - private let atomic: Atomic + @usableFromInline + let atomic: Atomic init() { self.atomic = .init(0) } + @inlinable package func next() -> Int { self.atomic.wrappingAdd(1, ordering: .relaxed).oldValue } @@ -63,15 +69,18 @@ package final class ConnectionIDGenerator: ConnectionIDGeneratorProtocol { /// Valkey client connection pool metrics @available(valkeySwift 1.0, *) +@usableFromInline final class ValkeyClientMetrics: ConnectionPoolObservabilityDelegate { + @usableFromInline typealias ConnectionID = ValkeyConnection.ID - + @usableFromInline let logger: Logger init(logger: Logger) { self.logger = logger } + @inlinable func startedConnecting(id: ConnectionID) { self.logger.debug( "Creating new connection", @@ -83,6 +92,7 @@ final class ValkeyClientMetrics: ConnectionPoolObservabilityDelegate { /// A connection attempt failed with the given error. After some period of /// time ``startedConnecting(id:)`` may be called again. + @inlinable func connectFailed(id: ConnectionID, error: Error) { self.logger.debug( "Connection creation failed", @@ -122,6 +132,7 @@ final class ValkeyClientMetrics: ConnectionPoolObservabilityDelegate { ) } + @inlinable func keepAliveTriggered(id: ConnectionID) { self.logger.debug( "run ping pong", @@ -131,12 +142,15 @@ final class ValkeyClientMetrics: ConnectionPoolObservabilityDelegate { ) } + @inlinable func keepAliveSucceeded(id: ConnectionID) {} + @inlinable func keepAliveFailed(id: ValkeyConnection.ID, error: Error) {} /// The remote peer is quiescing the connection: no new streams will be created on it. The /// connection will eventually be closed and removed from the pool. + @inlinable func connectionClosing(id: ConnectionID) { self.logger.debug( "Close connection", @@ -148,6 +162,7 @@ final class ValkeyClientMetrics: ConnectionPoolObservabilityDelegate { /// The connection was closed. The connection may be established again in the future (notified /// via ``startedConnecting(id:)``). + @inlinable func connectionClosed(id: ConnectionID, error: Error?) { self.logger.debug( "Connection closed", @@ -157,14 +172,17 @@ final class ValkeyClientMetrics: ConnectionPoolObservabilityDelegate { ) } + @inlinable func requestQueueDepthChanged(_ newDepth: Int) { } + @inlinable func connectSucceeded(id: ValkeyConnection.ID, streamCapacity: UInt16) { } + @inlinable func connectionUtilizationChanged(id: ValkeyConnection.ID, streamsUsed: UInt16, streamCapacity: UInt16) { } diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index 152522c5..bb9daac0 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -36,7 +36,9 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable { /// Logger used by Server let logger: Logger @usableFromInline - let channel: any Channel + nonisolated let executor: any NIOSerialEventLoopExecutor + @usableFromInline + nonisolated let channel: any Channel @usableFromInline let channelHandler: ValkeyChannelHandler let configuration: ValkeyConnectionConfiguration @@ -50,7 +52,11 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable { configuration: ValkeyConnectionConfiguration, logger: Logger ) { - self.unownedExecutor = channel.eventLoop.executor.asUnownedSerialExecutor() + guard let executor = channel.eventLoop as? any NIOSerialEventLoopExecutor else { + fatalError() + } + self.executor = executor + self.unownedExecutor = executor.asUnownedSerialExecutor() self.channel = channel self.channelHandler = channelHandler self.configuration = configuration @@ -172,6 +178,21 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable { } } + @inlinable + nonisolated func _write(command: Command, request: ValkeyConnectionRequest) { + if self.executor.inEventLoop { + self.assumeIsolated { connection in + connection.channelHandler.write(command: command, request: request) + } + } else { + self.executor.execute { + self.assumeIsolated { connection in + connection.channelHandler.write(command: command, request: request) + } + } + } + } + /// Pipeline a series of commands to Valkey connection /// /// This function will only return once it has the results of all the commands sent @@ -261,10 +282,13 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable { } } + + let future: EventLoopFuture switch address.value { case .hostname(let host, let port): - future = connect.connect(host: host, port: port) + let socketAddress = try! SocketAddress(ipAddress: host, port: port) + future = connect.connect(to: socketAddress) future.whenSuccess { _ in logger.debug("Client connected to \(host):\(port)") } diff --git a/Sources/Valkey/ValkeyClient.swift b/Sources/Valkey/ValkeyClient.swift index dc23258a..ebaf2135 100644 --- a/Sources/Valkey/ValkeyClient.swift +++ b/Sources/Valkey/ValkeyClient.swift @@ -31,12 +31,13 @@ import ServiceLifecycle /// Supports TLS via both NIOSSL and Network framework. @available(valkeySwift 1.0, *) public final class ValkeyClient: Sendable { + @usableFromInline typealias Pool = ConnectionPool< ValkeyConnection, ValkeyConnection.ID, ConnectionIDGenerator, - ConnectionRequest, - ConnectionRequest.ID, + ValkeyConnectionRequest, + Int, ValkeyKeepAliveBehavior, ValkeyClientMetrics, ContinuousClock @@ -44,6 +45,7 @@ public final class ValkeyClient: Sendable { /// Server address let serverAddress: ValkeyServerAddress /// Connection pool + @usableFromInline let connectionPool: Pool let connectionFactory: ValkeyConnectionFactory @@ -56,7 +58,10 @@ public final class ValkeyClient: Sendable { /// running atomic let runningAtomic: Atomic - /// Creates a new Valkey client + @usableFromInline + let requestIDGenerator = IDGenerator() + + /// Initialize Valkey client /// /// - Parameters: /// - address: Valkey database address @@ -95,7 +100,7 @@ public final class ValkeyClient: Sendable { self.connectionPool = .init( configuration: poolConfiguration, idGenerator: connectionIDGenerator, - requestType: ConnectionRequest.self, + requestType: ValkeyConnectionRequest.self, keepAliveBehavior: .init(connectionFactory.configuration.keepAliveBehavior), observabilityDelegate: ValkeyClientMetrics(logger: logger), clock: .continuous @@ -148,19 +153,20 @@ extension ValkeyClient { isolation: isolated (any Actor)? = #isolation, operation: (ValkeyConnection) async throws -> sending Value ) async throws -> Value { - let connection = try await self.leaseConnection() - - defer { self.connectionPool.releaseConnection(connection) } - - return try await operation(connection) + fatalError() +// let connection = try await self.leaseConnection() +// +// defer { self.connectionPool.releaseConnection(connection) } +// +// return try await operation(connection) } - private func leaseConnection() async throws -> ValkeyConnection { - if !self.runningAtomic.load(ordering: .relaxed) { - self.logger.warning("Trying to lease connection from `ValkeyClient`, but `ValkeyClient.run()` hasn't been called yet.") - } - return try await self.connectionPool.leaseConnection() - } +// private func leaseConnection() async throws -> ValkeyConnection { +// if !self.runningAtomic.load(ordering: .relaxed) { +// self.logger.warning("Trying to lease connection from `ValkeyClient`, but `ValkeyClient.run()` hasn't been called yet.") +// } +// return try await self.connectionPool.leaseConnection() +// } } @@ -178,8 +184,17 @@ extension ValkeyClient: ValkeyConnectionProtocol { @inlinable func _send(_ command: Command) async throws -> RESPToken { - try await self.withConnection { connection in - try await connection._send(command: command) + let id = self.requestIDGenerator.next() + return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + let request = ValkeyConnectionRequest( + id: id, + pool: self.connectionPool, + continuation: continuation + ) { connection, request in + connection._write(command: command, request: request) + } + + self.connectionPool.leaseConnection(request) } } } @@ -209,3 +224,112 @@ extension ValkeyClient { @available(valkeySwift 1.0, *) extension ValkeyClient: Service {} #endif // ServiceLifecycle + +@available(valkeySwift 1.0, *) +@usableFromInline +enum RequestState: AtomicRepresentable, Sendable { + @usableFromInline + typealias AtomicRepresentation = Unmanaged? + + case waitingForConnection + case onConnection(ValkeyConnection) + + @usableFromInline + static func decodeAtomicRepresentation(_ storage: consuming Unmanaged?) -> RequestState { + if let storage { + return .onConnection(storage.takeRetainedValue()) + } else { + return .waitingForConnection + } + } + + @usableFromInline + static func encodeAtomicRepresentation(_ value: consuming RequestState) -> Unmanaged? { + switch value { + case .onConnection(let connection): + return Unmanaged.passRetained(connection) + case .waitingForConnection: + return nil + } + } +} + +@available(valkeySwift 1.0, *) +@usableFromInline +final class ValkeyConnectionRequest: Sendable, ConnectionRequestProtocol { + @usableFromInline + typealias Connection = ValkeyConnection + + @usableFromInline + let id: Int + @usableFromInline + let pool: ValkeyClient.Pool + @usableFromInline + let continuation: CheckedContinuation + @usableFromInline + let lock: Mutex + @usableFromInline + let onConnection: @Sendable (Connection, ValkeyConnectionRequest) -> () + + @inlinable + init( + id: Int, + pool: ValkeyClient.Pool, + continuation: CheckedContinuation, + _ onConnection: @escaping @Sendable (Connection, ValkeyConnectionRequest) -> () + ) where T == RESPToken { + self.id = id + self.pool = pool + self.continuation = continuation + self.onConnection = onConnection + self.lock = .init(.waitingForConnection) + } + + @inlinable + func complete(with result: Result) { + switch result { + case .success(let connection): + self.lock.withLock { state in + state = .onConnection(connection) + } + self.onConnection(connection, self) + case .failure(let error): + continuation.resume(throwing: error) + + } + } + + @inlinable + func succeed(_ t: T) { + self.continuation.resume(returning: t) + let connection = self.lock.withLock { state -> ValkeyConnection? in + switch state { + case .onConnection(let connection): + return connection + case .waitingForConnection: + return nil + } + } + if let connection { + self.pool.releaseConnection(connection, streams: 1) + } + } + + @inlinable + func fail(_ error: any Error) { + self.continuation.resume(throwing: error) + } + + func cancel() { + self.lock.withLock { state in + switch state { + case .onConnection(let connection): + connection.cancel(requestID: self.id) + + case .waitingForConnection: + self.pool.cancelLeaseConnection(self.id) + } + } + } + +}