Skip to content

Make ValkeyClient go brrrr #158

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ extension ValkeyChannelHandler {
case .nio(let promise):
self = .connected(state)
return .waitForPromise(promise)
case .swift:
case .swift, .request:
preconditionFailure("Connected state cannot be setup with a Swift continuation")
}
case .active(let state):
Expand Down
37 changes: 37 additions & 0 deletions Sources/Valkey/Connection/ValkeyChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ import DequeModule
import Logging
import NIOCore

@available(valkeySwift 1.0, *)
@usableFromInline
enum ValkeyPromise<T: Sendable>: Sendable {
case nio(EventLoopPromise<T>)
case swift(CheckedContinuation<T, any Error>)
case request(ValkeyConnectionRequest<T>)

func succeed(_ t: T) {
switch self {
case .nio(let eventLoopPromise):
eventLoopPromise.succeed(t)
case .swift(let checkedContinuation):
checkedContinuation.resume(returning: t)
case .request(let request):
request.succeed(t)
}
}

Expand All @@ -36,10 +40,13 @@ enum ValkeyPromise<T: Sendable>: Sendable {
eventLoopPromise.fail(e)
case .swift(let checkedContinuation):
checkedContinuation.resume(throwing: e)
case .request(let request):
request.fail(e)
}
}
}

@available(valkeySwift 1.0, *)
@usableFromInline
enum ValkeyRequest: Sendable {
case single(buffer: ByteBuffer, promise: ValkeyPromise<RESPToken>, id: Int)
Expand Down Expand Up @@ -125,6 +132,36 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
self.logger = logger
}

/// Write valkey command/commands to channel
/// - Parameters:
/// - request: Valkey command request
/// - promise: Promise to fulfill when command is complete
@inlinable
func write<Command: ValkeyCommand>(command: Command, request: ValkeyConnectionRequest<RESPToken>) {
self.eventLoop.assertInEventLoop()
let deadline: NIODeadline =
command.isBlocking ? .now() + self.configuration.blockingCommandTimeout : .now() + self.configuration.commandTimeout
let pendingCommand = PendingCommand(
promise: .request(request),
requestID: request.id,
deadline: deadline
)
switch self.stateMachine.sendCommand(pendingCommand) {
case .sendCommand(let context):
self.encoder.reset()
command.encode(into: &self.encoder)
let buffer = self.encoder.buffer
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
if self.deadlineCallback == nil {
self.scheduleDeadlineCallback(deadline: deadline)
}

case .throwError(let error):
request.fail(error)
}
}


/// Write valkey command/commands to channel
/// - Parameters:
/// - request: Valkey command request
Expand Down
24 changes: 21 additions & 3 deletions Sources/Valkey/Connection/ValkeyConnection+ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,49 +29,58 @@ extension ValkeyConnection: PooledConnection {

/// Keep alive behavior for Valkey connection
@available(valkeySwift 1.0, *)
@usableFromInline
struct ValkeyKeepAliveBehavior: ConnectionKeepAliveBehavior {
@usableFromInline
let behavior: ValkeyClientConfiguration.KeepAliveBehavior?

init(_ behavior: ValkeyClientConfiguration.KeepAliveBehavior?) {
self.behavior = behavior
}

@inlinable
var keepAliveFrequency: Duration? {
self.behavior?.frequency
}

@inlinable
func runKeepAlive(for connection: ValkeyConnection) async throws {
_ = try await connection.ping()
}
}

/// Connection id generator for Valkey connection pool
@available(valkeySwift 1.0, *)
@usableFromInline
package final class ConnectionIDGenerator: ConnectionIDGeneratorProtocol {
static let globalGenerator = ConnectionIDGenerator()

private let atomic: Atomic<Int>
@usableFromInline
let atomic: Atomic<Int>

init() {
self.atomic = .init(0)
}

@inlinable
package func next() -> Int {
self.atomic.wrappingAdd(1, ordering: .relaxed).oldValue
}
}

/// Valkey client connection pool metrics
@available(valkeySwift 1.0, *)
@usableFromInline
final class ValkeyClientMetrics: ConnectionPoolObservabilityDelegate {
@usableFromInline
typealias ConnectionID = ValkeyConnection.ID

@usableFromInline
let logger: Logger

init(logger: Logger) {
self.logger = logger
}

@inlinable
func startedConnecting(id: ConnectionID) {
self.logger.debug(
"Creating new connection",
Expand All @@ -83,6 +92,7 @@ final class ValkeyClientMetrics: ConnectionPoolObservabilityDelegate {

/// A connection attempt failed with the given error. After some period of
/// time ``startedConnecting(id:)`` may be called again.
@inlinable
func connectFailed(id: ConnectionID, error: Error) {
self.logger.debug(
"Connection creation failed",
Expand Down Expand Up @@ -122,6 +132,7 @@ final class ValkeyClientMetrics: ConnectionPoolObservabilityDelegate {
)
}

@inlinable
func keepAliveTriggered(id: ConnectionID) {
self.logger.debug(
"run ping pong",
Expand All @@ -131,12 +142,15 @@ final class ValkeyClientMetrics: ConnectionPoolObservabilityDelegate {
)
}

@inlinable
func keepAliveSucceeded(id: ConnectionID) {}

@inlinable
func keepAliveFailed(id: ValkeyConnection.ID, error: Error) {}

/// The remote peer is quiescing the connection: no new streams will be created on it. The
/// connection will eventually be closed and removed from the pool.
@inlinable
func connectionClosing(id: ConnectionID) {
self.logger.debug(
"Close connection",
Expand All @@ -148,6 +162,7 @@ final class ValkeyClientMetrics: ConnectionPoolObservabilityDelegate {

/// The connection was closed. The connection may be established again in the future (notified
/// via ``startedConnecting(id:)``).
@inlinable
func connectionClosed(id: ConnectionID, error: Error?) {
self.logger.debug(
"Connection closed",
Expand All @@ -157,14 +172,17 @@ final class ValkeyClientMetrics: ConnectionPoolObservabilityDelegate {
)
}

@inlinable
func requestQueueDepthChanged(_ newDepth: Int) {

}

@inlinable
func connectSucceeded(id: ValkeyConnection.ID, streamCapacity: UInt16) {

}

@inlinable
func connectionUtilizationChanged(id: ValkeyConnection.ID, streamsUsed: UInt16, streamCapacity: UInt16) {

}
Expand Down
30 changes: 27 additions & 3 deletions Sources/Valkey/Connection/ValkeyConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
/// Logger used by Server
let logger: Logger
@usableFromInline
let channel: any Channel
nonisolated let executor: any NIOSerialEventLoopExecutor
@usableFromInline
nonisolated let channel: any Channel
@usableFromInline
let channelHandler: ValkeyChannelHandler
let configuration: ValkeyConnectionConfiguration
Expand All @@ -50,7 +52,11 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
configuration: ValkeyConnectionConfiguration,
logger: Logger
) {
self.unownedExecutor = channel.eventLoop.executor.asUnownedSerialExecutor()
guard let executor = channel.eventLoop as? any NIOSerialEventLoopExecutor else {
fatalError()
}
self.executor = executor
self.unownedExecutor = executor.asUnownedSerialExecutor()
self.channel = channel
self.channelHandler = channelHandler
self.configuration = configuration
Expand Down Expand Up @@ -172,6 +178,21 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
}
}

@inlinable
nonisolated func _write<Command: ValkeyCommand>(command: Command, request: ValkeyConnectionRequest<RESPToken>) {
if self.executor.inEventLoop {
self.assumeIsolated { connection in
connection.channelHandler.write(command: command, request: request)
}
} else {
self.executor.execute {
self.assumeIsolated { connection in
connection.channelHandler.write(command: command, request: request)
}
}
}
}

/// Pipeline a series of commands to Valkey connection
///
/// This function will only return once it has the results of all the commands sent
Expand Down Expand Up @@ -261,10 +282,13 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
}
}



let future: EventLoopFuture<Channel>
switch address.value {
case .hostname(let host, let port):
future = connect.connect(host: host, port: port)
let socketAddress = try! SocketAddress(ipAddress: host, port: port)
future = connect.connect(to: socketAddress)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can see why you did this, but can we keep it to another PR. It would be good to still support hostnames.

future.whenSuccess { _ in
logger.debug("Client connected to \(host):\(port)")
}
Expand Down
Loading
Loading