Skip to content

Commit a20e7cb

Browse files
authored
ValkeyClient.subscribe using withSubscriptionConnection (#195)
* ValkeyClient.subscribe using `withSubscriptionConnection` Signed-off-by: Adam Fowler <[email protected]> * Fix docs Signed-off-by: Adam Fowler <[email protected]> * sending Signed-off-by: Adam Fowler <[email protected]> * File license header Signed-off-by: Adam Fowler <[email protected]> --------- Signed-off-by: Adam Fowler <[email protected]>
1 parent 479a286 commit a20e7cb

File tree

8 files changed

+370
-101
lines changed

8 files changed

+370
-101
lines changed

Sources/Valkey/Connection/ValkeyConnection+ConnectionPool.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ struct ValkeyKeepAliveBehavior: ConnectionKeepAliveBehavior {
4040

4141
/// Connection id generator for Valkey connection pool
4242
@available(valkeySwift 1.0, *)
43+
@usableFromInline
4344
package final class ConnectionIDGenerator: ConnectionIDGeneratorProtocol {
4445
static let globalGenerator = ConnectionIDGenerator()
4546

@@ -49,6 +50,7 @@ package final class ConnectionIDGenerator: ConnectionIDGeneratorProtocol {
4950
self.atomic = .init(0)
5051
}
5152

53+
@usableFromInline
5254
package func next() -> Int {
5355
self.atomic.wrappingAdd(1, ordering: .relaxed).oldValue
5456
}

Sources/Valkey/Documentation.docc/Pubsub.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ try await connection.clientTracking(
8585

8686
#### Subscribing to Invalidation Events
8787

88-
Once tracking is enabled you can subscribe to invalidation events using ``ValkeyConnection/subscribeKeyInvalidations(process:)``. The AsyncSequence passed to the `process` closure is a list of keys that have been invalidated.
88+
Once tracking is enabled you can subscribe to invalidation events using ``ValkeyConnection/subscribeKeyInvalidations(isolation:process:)``. The AsyncSequence passed to the `process` closure is a list of keys that have been invalidated.
8989

9090
```swift
9191
try await connection.subscribeKeyInvalidations { keys in

Sources/Valkey/Subscriptions/SubscriptionConnectionStateMachine.swift

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,32 +12,7 @@ import _ValkeyConnectionPool
1212

1313
@available(valkeySwift 1.0, *)
1414
extension ValkeyClient {
15-
/// Run operation with the valkey subscription connection
16-
///
17-
/// - Parameters:
18-
/// - isolation: Actor isolation
19-
/// - operation: Closure to run with subscription connection
2015
@usableFromInline
21-
func withSubscriptionConnection<Value>(
22-
isolation: isolated (any Actor)? = #isolation,
23-
_ operation: (ValkeyConnection) async throws -> sending Value
24-
) async throws -> sending Value {
25-
let id = self.subscriptionConnectionIDGenerator.next()
26-
27-
let connection = try await withTaskCancellationHandler {
28-
try await withCheckedThrowingContinuation { (cont: CheckedContinuation<ValkeyConnection, Error>) in
29-
self.leaseSubscriptionConnection(id: id, request: cont)
30-
}
31-
} onCancel: {
32-
self.cancelSubscriptionConnection(id: id)
33-
}
34-
35-
defer {
36-
self.releaseSubscriptionConnection(id: id)
37-
}
38-
return try await operation(connection)
39-
}
40-
4116
func leaseSubscriptionConnection(id: Int, request: CheckedContinuation<ValkeyConnection, Error>) {
4217
self.logger.trace("Get subscription connection", metadata: ["valkey_subscription_connection_id": .stringConvertible(id)])
4318
enum LeaseAction {
@@ -95,6 +70,7 @@ extension ValkeyClient {
9570

9671
}
9772

73+
@usableFromInline
9874
func releaseSubscriptionConnection(id: Int) {
9975
self.logger.trace("Release subscription connection", metadata: ["valkey_subscription_connection_id": .stringConvertible(id)])
10076
let action = self.subscriptionConnectionStateMachine.withLock { stateMachine in
@@ -110,6 +86,7 @@ extension ValkeyClient {
11086

11187
}
11288

89+
@usableFromInline
11390
func cancelSubscriptionConnection(id: Int) {
11491
self.logger.trace("Cancel subscription connection", metadata: ["valkey_subscription_connection_id": .stringConvertible(id)])
11592
let action = self.subscriptionConnectionStateMachine.withLock { stateMachine in
@@ -290,6 +267,14 @@ struct SubscriptionConnectionStateMachine<Value, Request, ReleaseRequest>: ~Copy
290267
}
291268
}
292269

270+
func isEmpty() -> Bool {
271+
switch self.state {
272+
case .uninitialized: true
273+
case .acquiring: false
274+
case .acquired: false
275+
}
276+
}
277+
293278
static private func uninitialized(nextLeaseID: Int) -> Self { .init(state: .uninitialized(nextLeaseID: nextLeaseID)) }
294279
static private func acquiring(leaseID: Int, waiters: [Int: Request]) -> Self { .init(state: .acquiring(leaseID: leaseID, waiters: waiters)) }
295280
static private func acquired(_ state: State.AcquiredState) -> Self { .init(state: .acquired(state)) }
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
//
2+
// This source file is part of the valkey-swift project
3+
// Copyright (c) 2025 the valkey-swift project authors
4+
//
5+
// See LICENSE.txt for license information
6+
// SPDX-License-Identifier: Apache-2.0
7+
//
8+
import NIOCore
9+
import Synchronization
10+
11+
@available(valkeySwift 1.0, *)
12+
extension ValkeyClient {
13+
/// Run operation with the valkey subscription connection
14+
///
15+
/// - Parameters:
16+
/// - isolation: Actor isolation
17+
/// - operation: Closure to run with subscription connection
18+
@inlinable
19+
func withSubscriptionConnection<Value>(
20+
isolation: isolated (any Actor)? = #isolation,
21+
_ operation: (ValkeyConnection) async throws -> sending Value
22+
) async throws -> sending Value {
23+
let id = self.subscriptionConnectionIDGenerator.next()
24+
25+
let connection = try await withTaskCancellationHandler {
26+
try await withCheckedThrowingContinuation { (cont: CheckedContinuation<ValkeyConnection, Error>) in
27+
self.leaseSubscriptionConnection(id: id, request: cont)
28+
}
29+
} onCancel: {
30+
self.cancelSubscriptionConnection(id: id)
31+
}
32+
33+
defer {
34+
self.releaseSubscriptionConnection(id: id)
35+
}
36+
return try await operation(connection)
37+
}
38+
39+
/// Subscribe to list of channels and run closure with subscription
40+
///
41+
/// When the closure is exited the channels are automatically unsubscribed from.
42+
///
43+
/// When running subscribe from `ValkeyClient` a single connection is used for
44+
/// all subscriptions.
45+
///
46+
/// - Parameters:
47+
/// - channels: list of channels to subscribe to
48+
/// - isolation: Actor isolation
49+
/// - process: Closure that is called with subscription async sequence
50+
/// - Returns: Return value of closure
51+
@inlinable
52+
public func subscribe<Value>(
53+
to channels: String...,
54+
isolation: isolated (any Actor)? = #isolation,
55+
process: (ValkeySubscription) async throws -> sending Value
56+
) async throws -> sending Value {
57+
try await self.subscribe(to: channels, process: process)
58+
}
59+
60+
@inlinable
61+
/// Subscribe to list of channels and run closure with subscription
62+
///
63+
/// When the closure is exited the channels are automatically unsubscribed from.
64+
///
65+
/// When running subscribe from `ValkeyClient` a single connection is used for
66+
/// all subscriptions.
67+
///
68+
/// - Parameters:
69+
/// - channels: list of channels to subscribe to
70+
/// - isolation: Actor isolation
71+
/// - process: Closure that is called with subscription async sequence
72+
/// - Returns: Return value of closure
73+
public func subscribe<Value>(
74+
to channels: [String],
75+
isolation: isolated (any Actor)? = #isolation,
76+
process: (ValkeySubscription) async throws -> sending Value
77+
) async throws -> sending Value {
78+
try await self.subscribe(
79+
command: SUBSCRIBE(channels: channels),
80+
filters: channels.map { .channel($0) },
81+
process: process
82+
)
83+
}
84+
85+
/// Subscribe to list of channel patterns and run closure with subscription
86+
///
87+
/// When the closure is exited the patterns are automatically unsubscribed from.
88+
///
89+
/// When running subscribe from `ValkeyClient` a single connection is used for
90+
/// all subscriptions.
91+
///
92+
/// - Parameters:
93+
/// - patterns: list of channel patterns to subscribe to
94+
/// - isolation: Actor isolation
95+
/// - process: Closure that is called with subscription async sequence
96+
/// - Returns: Return value of closure
97+
@inlinable
98+
public func psubscribe<Value>(
99+
to patterns: String...,
100+
isolation: isolated (any Actor)? = #isolation,
101+
process: (ValkeySubscription) async throws -> sending Value
102+
) async throws -> sending Value {
103+
try await self.psubscribe(to: patterns, process: process)
104+
}
105+
106+
/// Subscribe to list of pattern matching channels and run closure with subscription
107+
///
108+
/// When the closure is exited the patterns are automatically unsubscribed from.
109+
///
110+
/// When running subscribe from `ValkeyClient` a single connection is used for
111+
/// all subscriptions.
112+
///
113+
/// - Parameters:
114+
/// - patterns: list of channel patterns to subscribe to
115+
/// - isolation: Actor isolation
116+
/// - process: Closure that is called with subscription async sequence
117+
/// - Returns: Return value of closure
118+
@inlinable
119+
public func psubscribe<Value>(
120+
to patterns: [String],
121+
isolation: isolated (any Actor)? = #isolation,
122+
process: (ValkeySubscription) async throws -> sending Value
123+
) async throws -> sending Value {
124+
try await self.subscribe(
125+
command: PSUBSCRIBE(patterns: patterns),
126+
filters: patterns.map { .pattern($0) },
127+
process: process
128+
)
129+
}
130+
131+
/// Subscribe to key invalidation channel required for client-side caching
132+
///
133+
/// See https://valkey.io/topics/client-side-caching/ for more details. The `process`
134+
/// closure is provided with a stream of ValkeyKeys that have been invalidated and also
135+
/// the client id of the subscription connection to redirect client tracking messages to.
136+
///
137+
/// When the closure is exited the channel is automatically unsubscribed from.
138+
///
139+
/// When running subscribe from `ValkeyClient` a single connection is used for
140+
/// all subscriptions.
141+
///
142+
/// - Parameters:
143+
/// - isolation: Actor isolation
144+
/// - process: Closure that is called with async sequence of key invalidations and the client id
145+
/// of the connection the subscription is running on.
146+
/// - Returns: Return value of closure
147+
@inlinable
148+
public func subscribeKeyInvalidations<Value>(
149+
isolation: isolated (any Actor)? = #isolation,
150+
process: (AsyncMapSequence<ValkeySubscription, ValkeyKey>, Int) async throws -> sending Value
151+
) async throws -> sending Value {
152+
try await withSubscriptionConnection { connection in
153+
let id = try await connection.clientId()
154+
return try await connection.subscribe(to: [ValkeySubscriptions.invalidateChannel]) { subscription in
155+
let keys = subscription.map { ValkeyKey($0.message) }
156+
return try await process(keys, id)
157+
}
158+
}
159+
}
160+
161+
@inlinable
162+
func subscribe<Value>(
163+
command: some ValkeyCommand,
164+
filters: [ValkeySubscriptionFilter],
165+
isolation: isolated (any Actor)? = #isolation,
166+
process: (ValkeySubscription) async throws -> sending Value
167+
) async throws -> sending Value {
168+
try await self.withSubscriptionConnection { connection in
169+
try await connection.subscribe(command: command, filters: filters, process: process)
170+
}
171+
}
172+
}

0 commit comments

Comments
 (0)