Skip to content

Add ValkeyClient.subscribe functions that uses one connection and.reconnects if it is lost #131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions Sources/Valkey/Connection/ValkeyConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import NIOTransportServices

/// A single connection to a Valkey database.
@available(valkeySwift 1.0, *)
public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
public final actor ValkeyConnection: ValkeyClientProtocol, Sendable, Identifiable {
nonisolated public let unownedExecutor: UnownedSerialExecutor

/// Request ID generator
Expand Down Expand Up @@ -367,15 +367,13 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
/// create a BSD sockets based bootstrap
private static func createSocketsBootstrap(eventLoopGroup: EventLoopGroup) -> ClientBootstrap {
ClientBootstrap(group: eventLoopGroup)
.channelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The client doesn't need half closure, and it meant we were leaving connections open if the database was closed

}

#if canImport(Network)
/// create a NIOTransportServices bootstrap using Network.framework
private static func createTSBootstrap(eventLoopGroup: EventLoopGroup, tlsOptions: NWProtocolTLS.Options?) -> NIOTSConnectionBootstrap? {
guard
let bootstrap = NIOTSConnectionBootstrap(validatingGroup: eventLoopGroup)?
.channelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
let bootstrap = NIOTSConnectionBootstrap(validatingGroup: eventLoopGroup)
else {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/Valkey/Documentation.docc/Pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ try await connection.clientTracking(

#### Subscribing to Invalidation Events

Once tracking is enabled you can subscribe to invalidation events using ``ValkeyConnection/subscribeKeyInvalidations(process:)``. The AsyncSequence passed to the `process` closure is a list of keys that have been invalidated.
Once tracking is enabled you can subscribe to invalidation events using ``ValkeyConnection/subscribeKeyInvalidations(isolation:process:)``. The AsyncSequence passed to the `process` closure is a list of keys that have been invalidated.

```swift
try await connection.subscribeKeyInvalidations { keys in
Expand Down
8 changes: 7 additions & 1 deletion Sources/Valkey/Node/ValkeyNodeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,15 @@ extension ValkeyNodeClient {
return try await operation(connection)
}

private func leaseConnection() async throws -> ValkeyConnection {
@usableFromInline
func leaseConnection() async throws -> ValkeyConnection {
try await self.connectionPool.leaseConnection()
}

@usableFromInline
func releaseConnection(_ connection: ValkeyConnection) {
self.connectionPool.releaseConnection(connection)
}
}

/// Extend ValkeyNode so we can call commands directly from it
Expand Down
193 changes: 193 additions & 0 deletions Sources/Valkey/Subscriptions/ValkeyClient+subscribe.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the valkey-swift open source project
//
// Copyright (c) 2025 the valkey-swift project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of valkey-swift project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore
import Synchronization

@available(valkeySwift 1.0, *)
extension ValkeyClient {
@inlinable
func withSubscriptionConnection<Value>(
isolation: isolated (any Actor)? = #isolation,
operation: (ValkeyConnection) async throws -> sending Value
) async throws -> sending Value {
try await self.subscriptionConnection.withValue {
try await operation($0)
} acquire: {
try await self.node.leaseConnection()
} release: {
self.node.releaseConnection($0)
}
}

/// Subscribe to list of channels and run closure with subscription
///
/// When the closure is exited the channels are automatically unsubscribed from. It is
/// possible to have multiple subscriptions running on the same connection and unsubscribe
/// commands will only be sent to Valkey when there are no subscriptions active for that
/// channel
///
/// - Parameters:
/// - channels: list of channels to subscribe to
/// - isolation: Actor isolation
/// - process: Closure that is called with subscription async sequence
/// - Returns: Return value of closure
@inlinable
public func subscribe<Value>(
to channels: String...,
isolation: isolated (any Actor)? = #isolation,
process: (ValkeySubscription) async throws -> sending Value
) async throws -> Value {
try await self.subscribe(to: channels, process: process)
}

@inlinable
/// Subscribe to list of channels and run closure with subscription
///
/// When the closure is exited the channels are automatically unsubscribed from. It is
/// possible to have multiple subscriptions running on the same connection and unsubscribe
/// commands will only be sent to Valkey when there are no subscriptions active for that
/// channel
///
/// - Parameters:
/// - channels: list of channels to subscribe to
/// - isolation: Actor isolation
/// - process: Closure that is called with subscription async sequence
/// - Returns: Return value of closure
public func subscribe<Value>(
to channels: [String],
isolation: isolated (any Actor)? = #isolation,
process: (ValkeySubscription) async throws -> sending Value
) async throws -> Value {
try await self.subscribe(
command: SUBSCRIBE(channels: channels),
filters: channels.map { .channel($0) },
process: process
)
}

/// Subscribe to list of channel patterns and run closure with subscription
///
/// When the closure is exited the patterns are automatically unsubscribed from. It is
/// possible to have multiple subscriptions running on the same connection and unsubscribe
/// commands will only be sent to Valkey when there are no subscriptions active for that
/// pattern
///
/// - Parameters:
/// - patterns: list of channel patterns to subscribe to
/// - isolation: Actor isolation
/// - process: Closure that is called with subscription async sequence
/// - Returns: Return value of closure
@inlinable
public func psubscribe<Value>(
to patterns: String...,
isolation: isolated (any Actor)? = #isolation,
process: (ValkeySubscription) async throws -> sending Value
) async throws -> Value {
try await self.psubscribe(to: patterns, process: process)
}

/// Subscribe to list of pattern matching channels and run closure with subscription
///
/// When the closure is exited the patterns are automatically unsubscribed from. It is
/// possible to have multiple subscriptions running on the same connection and unsubscribe
/// commands will only be sent to Valkey when there are no subscriptions active for that
/// pattern
///
/// - Parameters:
/// - patterns: list of channel patterns to subscribe to
/// - isolation: Actor isolation
/// - process: Closure that is called with subscription async sequence
/// - Returns: Return value of closure
@inlinable
public func psubscribe<Value>(
to patterns: [String],
isolation: isolated (any Actor)? = #isolation,
process: (ValkeySubscription) async throws -> sending Value
) async throws -> Value {
try await self.subscribe(
command: PSUBSCRIBE(patterns: patterns),
filters: patterns.map { .pattern($0) },
process: process
)
}

/// Subscribe to key invalidation channel required for client-side caching
///
/// See https://valkey.io/topics/client-side-caching/ for more details
///
/// When the closure is exited the channel is automatically unsubscribed from. It is
/// possible to have multiple subscriptions running on the same connection and unsubscribe
/// commands will only be sent to Valkey when there are no subscriptions active for that
/// channel
///
/// - Parameters:
/// - isolation: Actor isolation
/// - process: Closure that is called with async sequence of key invalidations
/// - Returns: Return value of closure
@inlinable
public func subscribeKeyInvalidations<Value>(
isolation: isolated (any Actor)? = #isolation,
process: (AsyncMapSequence<ValkeySubscription, ValkeyKey>) async throws -> sending Value
) async throws -> Value {
try await self.subscribe(to: [ValkeySubscriptions.invalidateChannel]) { subscription in
let keys = subscription.map { ValkeyKey($0.message) }
return try await process(keys)
}
}

@inlinable
func subscribe<Value>(
command: some ValkeyCommand,
filters: [ValkeySubscriptionFilter],
isolation: isolated (any Actor)? = #isolation,
process: (ValkeySubscription) async throws -> sending Value
) async throws -> Value {
try await withThrowingTaskGroup(of: Void.self, isolation: isolation) { group in
let (stream, cont) = ValkeySubscription.makeStream()
group.addTask {
while true {
do {
try Task.checkCancellation()
return try await self.withSubscriptionConnection { connection in
try await connection.subscribe(command: command, filters: filters) { subscription in
// push messages on connection subscription to client subscription
for try await message in subscription {
cont.yield(message)
}
}
cont.finish()
}
} catch let error as ValkeyClientError {
// if connection closes for some reason don't exit loop so it opens a new connection
switch error.errorCode {
case .connectionClosed, .connectionClosedDueToCancellation, .connectionClosing:
self.subscriptionConnection.reset()
break
default:
cont.finish(throwing: error)
return
}
} catch {
cont.finish(throwing: error)
return
}
}
}
let value = try await process(stream)
group.cancelAll()
return value
}
}
}
91 changes: 49 additions & 42 deletions Sources/Valkey/Subscriptions/ValkeyConnection+subscribe.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,12 @@ extension ValkeyConnection {
isolation: isolated (any Actor)? = #isolation,
process: (ValkeySubscription) async throws -> sending Value
) async throws -> sending Value {
let command = SUBSCRIBE(channels: channels)
let (id, stream) = try await subscribe(command: command, filters: channels.map { .channel($0) })
let value: Value
do {
value = try await process(stream)
try Task.checkCancellation()
} catch {
_ = try? await unsubscribe(id: id)
throw error
}
_ = try await unsubscribe(id: id)
return value
try await self.subscribe(
command: SUBSCRIBE(channels: channels),
filters: channels.map { .channel($0) },
isolation: isolation,
process: process
)
}

/// Subscribe to list of channel patterns and run closure with subscription
Expand Down Expand Up @@ -108,18 +102,12 @@ extension ValkeyConnection {
isolation: isolated (any Actor)? = #isolation,
process: (ValkeySubscription) async throws -> sending Value
) async throws -> sending Value {
let command = PSUBSCRIBE(patterns: patterns)
let (id, stream) = try await subscribe(command: command, filters: patterns.map { .pattern($0) })
let value: Value
do {
value = try await process(stream)
try Task.checkCancellation()
} catch {
_ = try? await unsubscribe(id: id)
throw error
}
_ = try await unsubscribe(id: id)
return value
try await self.subscribe(
command: PSUBSCRIBE(patterns: patterns),
filters: patterns.map { .pattern($0) },
isolation: isolation,
process: process
)
}

/// Subscribe to list of shard channels and run closure with subscription
Expand All @@ -130,17 +118,17 @@ extension ValkeyConnection {
/// pattern
///
/// - Parameters:
/// - shardchannel: list of shard channels to subscribe to
/// - shardchannels: list of shard channels to subscribe to
/// - isolation: Actor isolation
/// - process: Closure that is called with subscription async sequence
/// - Returns: Return value of closure
@inlinable
public func ssubscribe<Value>(
to shardchannel: String...,
to shardchannels: String...,
isolation: isolated (any Actor)? = #isolation,
process: (ValkeySubscription) async throws -> sending Value
) async throws -> sending Value {
try await self.ssubscribe(to: shardchannel, process: process)
try await self.ssubscribe(to: shardchannels, process: process)
}

/// Subscribe to list of shard channels and run closure with subscription
Expand All @@ -151,28 +139,22 @@ extension ValkeyConnection {
/// pattern
///
/// - Parameters:
/// - shardchannel: list of shard channels to subscribe to
/// - shardchannels: list of shard channels to subscribe to
/// - isolation: Actor isolation
/// - process: Closure that is called with subscription async sequence
/// - Returns: Return value of closure
@inlinable
public func ssubscribe<Value>(
to shardchannel: [String],
to shardchannels: [String],
isolation: isolated (any Actor)? = #isolation,
process: (ValkeySubscription) async throws -> sending Value
) async throws -> sending Value {
let command = SSUBSCRIBE(shardchannels: shardchannel)
let (id, stream) = try await subscribe(command: command, filters: shardchannel.map { .shardChannel($0) })
let value: Value
do {
value = try await process(stream)
try Task.checkCancellation()
} catch {
_ = try? await unsubscribe(id: id)
throw error
}
_ = try await unsubscribe(id: id)
return value
try await self.subscribe(
command: SSUBSCRIBE(shardchannels: shardchannels),
filters: shardchannels.map { .shardChannel($0) },
isolation: isolation,
process: process
)
}

/// Subscribe to key invalidation channel required for client-side caching
Expand All @@ -185,18 +167,43 @@ extension ValkeyConnection {
/// channel
///
/// - Parameters:
/// - isolation: Actor isolation
/// - process: Closure that is called with async sequence of key invalidations
/// - Returns: Return value of closure
@inlinable
public func subscribeKeyInvalidations<Value>(
isolation: isolated (any Actor)? = #isolation,
process: (AsyncMapSequence<ValkeySubscription, ValkeyKey>) async throws -> sending Value
) async throws -> sending Value {
try await self.subscribe(to: [ValkeySubscriptions.invalidateChannel]) { subscription in
try await self.subscribe(to: [ValkeySubscriptions.invalidateChannel], isolation: isolation) { subscription in
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for a follow up: can we use CollectionOfOne here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this then requires some Collection through a whole series of functions. Can we do this in a separate PR

let keys = subscription.map { ValkeyKey($0.message) }
return try await process(keys)
}
}

@inlinable
func subscribe<Value>(
command: some ValkeyCommand,
filters: [ValkeySubscriptionFilter],
isolation: isolated (any Actor)? = #isolation,
process: (ValkeySubscription) async throws -> sending Value
) async throws -> sending Value {
let (id, stream) = try await subscribe(command: command, filters: filters)
let value: Value
do {
value = try await process(stream)
try Task.checkCancellation()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it becomes borderline impossible to exit this closure without CancellationError, is this the behavior we want?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not totally sure what you mean here. Its up to the contents of the closure how they exit.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the point with this check it isn't anymore.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw. we must make sure that the unsubscribe job runs outside of this task, as this will always close the connection otherwise. I think we'll need to do a

await Task {
  // cancellation shield
  self.unsubscribe()
}.get()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The checkCancellation was there to ensure a CancellationError was thrown if the Task was cancelled. AsyncStream doesn't throw an error when it is cancelled.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the unsubscribe into its own unstructured Task

} catch {
// call unsubscrobe to avoid it being cancelled
_ = await Task {
try await unsubscribe(id: id)
}.result
throw error
}
_ = try await unsubscribe(id: id)
return value
}

@usableFromInline
func subscribe(
command: some ValkeyCommand,
Expand Down
Loading
Loading