Skip to content

Commit c6981ff

Browse files
committed
isolation
Signed-off-by: Adam Fowler <[email protected]>
1 parent 4acc6c1 commit c6981ff

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

Sources/Valkey/Subscriptions/ValkeyClient+subscribe.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ extension ValkeyClient {
2525
///
2626
/// - Parameters:
2727
/// - channels: list of channels to subscribe to
28+
/// - isolation: Actor isolation
2829
/// - process: Closure that is called with subscription async sequence
2930
/// - Returns: Return value of closure
3031
@inlinable
3132
public func subscribe<Value>(
3233
to channels: String...,
34+
isolation: isolated (any Actor)? = #isolation,
3335
process: (ValkeySubscription) async throws -> sending Value
3436
) async throws -> Value {
3537
try await self.subscribe(to: channels, process: process)
@@ -45,10 +47,12 @@ extension ValkeyClient {
4547
///
4648
/// - Parameters:
4749
/// - channels: list of channels to subscribe to
50+
/// - isolation: Actor isolation
4851
/// - process: Closure that is called with subscription async sequence
4952
/// - Returns: Return value of closure
5053
public func subscribe<Value>(
5154
to channels: [String],
55+
isolation: isolated (any Actor)? = #isolation,
5256
process: (ValkeySubscription) async throws -> sending Value
5357
) async throws -> Value {
5458
try await self.subscribe(
@@ -67,11 +71,13 @@ extension ValkeyClient {
6771
///
6872
/// - Parameters:
6973
/// - patterns: list of channel patterns to subscribe to
74+
/// - isolation: Actor isolation
7075
/// - process: Closure that is called with subscription async sequence
7176
/// - Returns: Return value of closure
7277
@inlinable
7378
public func psubscribe<Value>(
7479
to patterns: String...,
80+
isolation: isolated (any Actor)? = #isolation,
7581
process: (ValkeySubscription) async throws -> sending Value
7682
) async throws -> Value {
7783
try await self.psubscribe(to: patterns, process: process)
@@ -86,11 +92,13 @@ extension ValkeyClient {
8692
///
8793
/// - Parameters:
8894
/// - patterns: list of channel patterns to subscribe to
95+
/// - isolation: Actor isolation
8996
/// - process: Closure that is called with subscription async sequence
9097
/// - Returns: Return value of closure
9198
@inlinable
9299
public func psubscribe<Value>(
93100
to patterns: [String],
101+
isolation: isolated (any Actor)? = #isolation,
94102
process: (ValkeySubscription) async throws -> sending Value
95103
) async throws -> Value {
96104
try await self.subscribe(
@@ -109,11 +117,13 @@ extension ValkeyClient {
109117
///
110118
/// - Parameters:
111119
/// - shardchannels: list of shard channels to subscribe to
120+
/// - isolation: Actor isolation
112121
/// - process: Closure that is called with subscription async sequence
113122
/// - Returns: Return value of closure
114123
@inlinable
115124
public func ssubscribe<Value>(
116125
to shardchannels: String...,
126+
isolation: isolated (any Actor)? = #isolation,
117127
process: (ValkeySubscription) async throws -> sending Value
118128
) async throws -> Value {
119129
try await self.ssubscribe(to: shardchannels, process: process)
@@ -128,11 +138,13 @@ extension ValkeyClient {
128138
///
129139
/// - Parameters:
130140
/// - shardchannels: list of shard channels to subscribe to
141+
/// - isolation: Actor isolation
131142
/// - process: Closure that is called with subscription async sequence
132143
/// - Returns: Return value of closure
133144
@inlinable
134145
public func ssubscribe<Value>(
135146
to shardchannels: [String],
147+
isolation: isolated (any Actor)? = #isolation,
136148
process: (ValkeySubscription) async throws -> sending Value
137149
) async throws -> Value {
138150
try await self.subscribe(
@@ -152,10 +164,12 @@ extension ValkeyClient {
152164
/// channel
153165
///
154166
/// - Parameters:
167+
/// - isolation: Actor isolation
155168
/// - process: Closure that is called with async sequence of key invalidations
156169
/// - Returns: Return value of closure
157170
@inlinable
158171
public func subscribeKeyInvalidations<Value>(
172+
isolation: isolated (any Actor)? = #isolation,
159173
process: (AsyncMapSequence<ValkeySubscription, ValkeyKey>) async throws -> sending Value
160174
) async throws -> Value {
161175
try await self.subscribe(to: [ValkeySubscriptions.invalidateChannel]) { subscription in

Sources/Valkey/Subscriptions/ValkeyConnection+subscribe.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,13 +167,15 @@ extension ValkeyConnection {
167167
/// channel
168168
///
169169
/// - Parameters:
170+
/// - isolation: Actor isolation
170171
/// - process: Closure that is called with async sequence of key invalidations
171172
/// - Returns: Return value of closure
172173
@inlinable
173174
public func subscribeKeyInvalidations<Value>(
175+
isolation: isolated (any Actor)? = #isolation,
174176
process: (AsyncMapSequence<ValkeySubscription, ValkeyKey>) async throws -> sending Value
175177
) async throws -> sending Value {
176-
try await self.subscribe(to: [ValkeySubscriptions.invalidateChannel]) { subscription in
178+
try await self.subscribe(to: [ValkeySubscriptions.invalidateChannel], isolation: isolation) { subscription in
177179
let keys = subscription.map { ValkeyKey($0.message) }
178180
return try await process(keys)
179181
}

0 commit comments

Comments
 (0)