Skip to content

Commit 817bb41

Browse files
committed
Add client subscribe that reconnects if lost connection
Signed-off-by: Adam Fowler <[email protected]>
1 parent cb2bca1 commit 817bb41

File tree

5 files changed

+281
-39
lines changed

5 files changed

+281
-39
lines changed

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -328,15 +328,13 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
328328
/// create a BSD sockets based bootstrap
329329
private static func createSocketsBootstrap(eventLoopGroup: EventLoopGroup) -> ClientBootstrap {
330330
ClientBootstrap(group: eventLoopGroup)
331-
.channelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
332331
}
333332

334333
#if canImport(Network)
335334
/// create a NIOTransportServices bootstrap using Network.framework
336335
private static func createTSBootstrap(eventLoopGroup: EventLoopGroup, tlsOptions: NWProtocolTLS.Options?) -> NIOTSConnectionBootstrap? {
337336
guard
338-
let bootstrap = NIOTSConnectionBootstrap(validatingGroup: eventLoopGroup)?
339-
.channelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
337+
let bootstrap = NIOTSConnectionBootstrap(validatingGroup: eventLoopGroup)
340338
else {
341339
return nil
342340
}
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the valkey-swift open source project
4+
//
5+
// Copyright (c) 2025 the valkey-swift project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of valkey-swift project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIOCore
16+
17+
@available(valkeySwift 1.0, *)
18+
extension ValkeyClient {
19+
/// Subscribe to list of channels and run closure with subscription
20+
///
21+
/// When the closure is exited the channels are automatically unsubscribed from. It is
22+
/// possible to have multiple subscriptions running on the same connection and unsubscribe
23+
/// commands will only be sent to Valkey when there are no subscriptions active for that
24+
/// channel
25+
///
26+
/// - Parameters:
27+
/// - channels: list of channels to subscribe to
28+
/// - isolation: Actor isolation
29+
/// - process: Closure that is called with subscription async sequence
30+
/// - Returns: Return value of closure
31+
@inlinable
32+
public func subscribe<Value>(
33+
to channels: String...,
34+
process: (ValkeySubscription) async throws -> sending Value
35+
) async throws -> Value {
36+
try await self.subscribe(to: channels, process: process)
37+
}
38+
39+
@inlinable
40+
/// Subscribe to list of channels and run closure with subscription
41+
///
42+
/// When the closure is exited the channels are automatically unsubscribed from. It is
43+
/// possible to have multiple subscriptions running on the same connection and unsubscribe
44+
/// commands will only be sent to Valkey when there are no subscriptions active for that
45+
/// channel
46+
///
47+
/// - Parameters:
48+
/// - channels: list of channels to subscribe to
49+
/// - isolation: Actor isolation
50+
/// - process: Closure that is called with subscription async sequence
51+
/// - Returns: Return value of closure
52+
public func subscribe<Value>(
53+
to channels: [String],
54+
process: (ValkeySubscription) async throws -> sending Value
55+
) async throws -> Value {
56+
try await self.subscribe(
57+
command: SUBSCRIBE(channel: channels),
58+
filters: channels.map { .channel($0) },
59+
process: process
60+
)
61+
}
62+
63+
/// Subscribe to list of channel patterns and run closure with subscription
64+
///
65+
/// When the closure is exited the patterns are automatically unsubscribed from. It is
66+
/// possible to have multiple subscriptions running on the same connection and unsubscribe
67+
/// commands will only be sent to Valkey when there are no subscriptions active for that
68+
/// pattern
69+
///
70+
/// - Parameters:
71+
/// - patterns: list of channel patterns to subscribe to
72+
/// - isolation: Actor isolation
73+
/// - process: Closure that is called with subscription async sequence
74+
/// - Returns: Return value of closure
75+
@inlinable
76+
public func psubscribe<Value>(
77+
to patterns: String...,
78+
process: (ValkeySubscription) async throws -> sending Value
79+
) async throws -> Value {
80+
try await self.psubscribe(to: patterns, process: process)
81+
}
82+
83+
/// Subscribe to list of pattern matching channels and run closure with subscription
84+
///
85+
/// When the closure is exited the patterns are automatically unsubscribed from. It is
86+
/// possible to have multiple subscriptions running on the same connection and unsubscribe
87+
/// commands will only be sent to Valkey when there are no subscriptions active for that
88+
/// pattern
89+
///
90+
/// - Parameters:
91+
/// - patterns: list of channel patterns to subscribe to
92+
/// - isolation: Actor isolation
93+
/// - process: Closure that is called with subscription async sequence
94+
/// - Returns: Return value of closure
95+
@inlinable
96+
public func psubscribe<Value>(
97+
to patterns: [String],
98+
process: (ValkeySubscription) async throws -> sending Value
99+
) async throws -> Value {
100+
try await self.subscribe(
101+
command: PSUBSCRIBE(pattern: patterns),
102+
filters: patterns.map { .pattern($0) },
103+
process: process
104+
)
105+
}
106+
107+
/// Subscribe to list of shard channels and run closure with subscription
108+
///
109+
/// When the closure is exited the shard channels are automatically unsubscribed from. It is
110+
/// possible to have multiple subscriptions running on the same connection and unsubscribe
111+
/// commands will only be sent to Valkey when there are no subscriptions active for that
112+
/// pattern
113+
///
114+
/// - Parameters:
115+
/// - shardchannel: list of shard channels to subscribe to
116+
/// - isolation: Actor isolation
117+
/// - process: Closure that is called with subscription async sequence
118+
/// - Returns: Return value of closure
119+
@inlinable
120+
public func ssubscribe<Value>(
121+
to shardchannel: String...,
122+
process: (ValkeySubscription) async throws -> sending Value
123+
) async throws -> Value {
124+
try await self.ssubscribe(to: shardchannel, process: process)
125+
}
126+
127+
/// Subscribe to list of shard channels and run closure with subscription
128+
///
129+
/// When the closure is exited the shard channels are automatically unsubscribed from. It is
130+
/// possible to have multiple subscriptions running on the same connection and unsubscribe
131+
/// commands will only be sent to Valkey when there are no subscriptions active for that
132+
/// pattern
133+
///
134+
/// - Parameters:
135+
/// - shardchannel: list of shard channels to subscribe to
136+
/// - isolation: Actor isolation
137+
/// - process: Closure that is called with subscription async sequence
138+
/// - Returns: Return value of closure
139+
@inlinable
140+
public func ssubscribe<Value>(
141+
to shardchannel: [String],
142+
process: (ValkeySubscription) async throws -> sending Value
143+
) async throws -> Value {
144+
try await self.subscribe(
145+
command: SSUBSCRIBE(shardchannel: shardchannel),
146+
filters: shardchannel.map { .shardChannel($0) },
147+
process: process
148+
)
149+
}
150+
151+
/// Subscribe to key invalidation channel required for client-side caching
152+
///
153+
/// See https://valkey.io/topics/client-side-caching/ for more details
154+
///
155+
/// When the closure is exited the channel is automatically unsubscribed from. It is
156+
/// possible to have multiple subscriptions running on the same connection and unsubscribe
157+
/// commands will only be sent to Valkey when there are no subscriptions active for that
158+
/// channel
159+
///
160+
/// - Parameters:
161+
/// - process: Closure that is called with async sequence of key invalidations
162+
/// - Returns: Return value of closure
163+
@inlinable
164+
public func subscribeKeyInvalidations<Value>(
165+
process: (AsyncMapSequence<ValkeySubscription, ValkeyKey>) async throws -> sending Value
166+
) async throws -> Value {
167+
try await self.subscribe(to: [ValkeySubscriptions.invalidateChannel]) { subscription in
168+
let keys = subscription.map { ValkeyKey($0.message) }
169+
return try await process(keys)
170+
}
171+
}
172+
173+
@inlinable
174+
func subscribe<Value>(
175+
command: some ValkeyCommand,
176+
filters: [ValkeySubscriptionFilter],
177+
isolation: isolated (any Actor)? = #isolation,
178+
process: (ValkeySubscription) async throws -> sending Value
179+
) async throws -> Value {
180+
try await withThrowingTaskGroup(of: Void.self, isolation: isolation) { group in
181+
let (stream, cont) = ValkeySubscription.makeStream()
182+
group.addTask {
183+
while true {
184+
do {
185+
try Task.checkCancellation()
186+
return try await self.withConnection { connection in
187+
try await connection.subscribe(command: command, filters: filters) { subscription in
188+
for try await message in subscription {
189+
cont.yield(message)
190+
}
191+
}
192+
cont.finish()
193+
}
194+
} catch let error as ValkeyClientError {
195+
switch error.errorCode {
196+
case .connectionClosed, .connectionClosedDueToCancellation, .connectionClosing:
197+
break
198+
default:
199+
cont.finish(throwing: error)
200+
return
201+
}
202+
} catch {
203+
cont.finish(throwing: error)
204+
return
205+
}
206+
}
207+
}
208+
let value = try await process(stream)
209+
group.cancelAll()
210+
return value
211+
}
212+
}
213+
}

Sources/Valkey/Subscriptions/ValkeyConnection+subscribe.swift

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -55,18 +55,12 @@ extension ValkeyConnection {
5555
isolation: isolated (any Actor)? = #isolation,
5656
process: (ValkeySubscription) async throws -> sending Value
5757
) async throws -> sending Value {
58-
let command = SUBSCRIBE(channels: channels)
59-
let (id, stream) = try await subscribe(command: command, filters: channels.map { .channel($0) })
60-
let value: Value
61-
do {
62-
value = try await process(stream)
63-
try Task.checkCancellation()
64-
} catch {
65-
_ = try? await unsubscribe(id: id)
66-
throw error
67-
}
68-
_ = try await unsubscribe(id: id)
69-
return value
58+
try await self.subscribe(
59+
command: SUBSCRIBE(channels: channels),
60+
filters: channels.map { .channel($0) },
61+
isolation: isolation,
62+
process: process
63+
)
7064
}
7165

7266
/// Subscribe to list of channel patterns and run closure with subscription
@@ -108,18 +102,12 @@ extension ValkeyConnection {
108102
isolation: isolated (any Actor)? = #isolation,
109103
process: (ValkeySubscription) async throws -> sending Value
110104
) async throws -> sending Value {
111-
let command = PSUBSCRIBE(patterns: patterns)
112-
let (id, stream) = try await subscribe(command: command, filters: patterns.map { .pattern($0) })
113-
let value: Value
114-
do {
115-
value = try await process(stream)
116-
try Task.checkCancellation()
117-
} catch {
118-
_ = try? await unsubscribe(id: id)
119-
throw error
120-
}
121-
_ = try await unsubscribe(id: id)
122-
return value
105+
try await self.subscribe(
106+
command: PSUBSCRIBE(patterns: patterns),
107+
filters: patterns.map { .pattern($0) },
108+
isolation: isolation,
109+
process: process
110+
)
123111
}
124112

125113
/// Subscribe to list of shard channels and run closure with subscription
@@ -161,18 +149,12 @@ extension ValkeyConnection {
161149
isolation: isolated (any Actor)? = #isolation,
162150
process: (ValkeySubscription) async throws -> sending Value
163151
) async throws -> sending Value {
164-
let command = SSUBSCRIBE(shardchannels: shardchannel)
165-
let (id, stream) = try await subscribe(command: command, filters: shardchannel.map { .shardChannel($0) })
166-
let value: Value
167-
do {
168-
value = try await process(stream)
169-
try Task.checkCancellation()
170-
} catch {
171-
_ = try? await unsubscribe(id: id)
172-
throw error
173-
}
174-
_ = try await unsubscribe(id: id)
175-
return value
152+
try await self.subscribe(
153+
command: SSUBSCRIBE(shardchannels: shardchannel),
154+
filters: shardchannel.map { .shardChannel($0) },
155+
isolation: isolation,
156+
process: process
157+
)
176158
}
177159

178160
/// Subscribe to key invalidation channel required for client-side caching
@@ -197,6 +179,26 @@ extension ValkeyConnection {
197179
}
198180
}
199181

182+
@inlinable
183+
func subscribe<Value>(
184+
command: some ValkeyCommand,
185+
filters: [ValkeySubscriptionFilter],
186+
isolation: isolated (any Actor)? = #isolation,
187+
process: (ValkeySubscription) async throws -> sending Value
188+
) async throws -> sending Value {
189+
let (id, stream) = try await subscribe(command: command, filters: filters)
190+
let value: Value
191+
do {
192+
value = try await process(stream)
193+
try Task.checkCancellation()
194+
} catch {
195+
_ = try? await unsubscribe(id: id)
196+
throw error
197+
}
198+
_ = try await unsubscribe(id: id)
199+
return value
200+
}
201+
200202
@usableFromInline
201203
func subscribe(
202204
command: some ValkeyCommand,

Sources/Valkey/Subscriptions/ValkeySubscription.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,14 @@ public struct ValkeySubscriptionMessage: Sendable, Equatable {
3535
public struct ValkeySubscription: AsyncSequence, Sendable {
3636
public typealias Element = ValkeySubscriptionMessage
3737

38+
@usableFromInline
3839
typealias BaseAsyncSequence = AsyncThrowingStream<ValkeySubscriptionMessage, Error>
40+
@usableFromInline
3941
typealias Continuation = BaseAsyncSequence.Continuation
4042

4143
let base: BaseAsyncSequence
4244

45+
@usableFromInline
4346
static func makeStream() -> (Self, Self.Continuation) {
4447
let (stream, continuation) = BaseAsyncSequence.makeStream()
4548
return (.init(base: stream), continuation)

Tests/IntegrationTests/ValkeyTests.swift

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,32 @@ struct GeneratedCommands {
455455
}
456456
}
457457

458+
@Test
459+
@available(valkeySwift 1.0, *)
460+
func testClientSubscriptions() async throws {
461+
let (stream, cont) = AsyncStream.makeStream(of: Void.self)
462+
var logger = Logger(label: "Subscriptions")
463+
logger.logLevel = .trace
464+
try await withValkeyClient(.hostname(valkeyHostname, port: 6379), logger: logger) { client in
465+
try await withThrowingTaskGroup(of: Void.self) { group in
466+
group.addTask {
467+
try await client.subscribe(to: "testSubscriptions") { subscription in
468+
cont.finish()
469+
var iterator = subscription.makeAsyncIterator()
470+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "hello" }
471+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "goodbye" }
472+
}
473+
}
474+
try await client.withConnection { connection in
475+
await stream.first { _ in true }
476+
_ = try await connection.publish(channel: "testSubscriptions", message: "hello")
477+
_ = try await connection.publish(channel: "testSubscriptions", message: "goodbye")
478+
}
479+
try await group.waitForAll()
480+
}
481+
}
482+
}
483+
458484
/// Test two different subscriptions to the same channel both receive messages and that when one ends the other still
459485
/// receives messages
460486
@Test

0 commit comments

Comments
 (0)