Skip to content

Commit 010cada

Browse files
committed
Call unsubscribe from unstructured Task to avoid cancellation
Signed-off-by: Adam Fowler <[email protected]>
1 parent 4162c2c commit 010cada

File tree

3 files changed

+56
-1
lines changed

3 files changed

+56
-1
lines changed

Sources/Valkey/Subscriptions/ValkeyConnection+subscribe.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,10 @@ extension ValkeyConnection {
194194
value = try await process(stream)
195195
try Task.checkCancellation()
196196
} catch {
197-
_ = try? await unsubscribe(id: id)
197+
// call unsubscrobe to avoid it being cancelled
198+
_ = await Task {
199+
try await unsubscribe(id: id)
200+
}.result
198201
throw error
199202
}
200203
_ = try await unsubscribe(id: id)

Tests/IntegrationTests/ValkeyTests.swift

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,30 @@ struct GeneratedCommands {
488488
}
489489
}
490490

491+
@Test
492+
@available(valkeySwift 1.0, *)
493+
func testCancelSubscription() async throws {
494+
let (stream, cont) = AsyncStream.makeStream(of: Void.self)
495+
var logger = Logger(label: "Subscriptions")
496+
logger.logLevel = .trace
497+
try await withValkeyClient(.hostname(valkeyHostname, port: 6379), logger: logger) { client in
498+
await withThrowingTaskGroup(of: Void.self) { group in
499+
group.addTask {
500+
try await client.withConnection { connection in
501+
try await connection.subscribe(to: "testCancelSubscriptions") { subscription in
502+
cont.finish()
503+
for try await _ in subscription {
504+
}
505+
}
506+
#expect(await connection.isSubscriptionsEmpty())
507+
}
508+
}
509+
await stream.first { _ in true }
510+
group.cancelAll()
511+
}
512+
}
513+
}
514+
491515
@Test
492516
@available(valkeySwift 1.0, *)
493517
func testClientSubscriptions() async throws {
@@ -514,6 +538,27 @@ struct GeneratedCommands {
514538
}
515539
}
516540

541+
@Test
542+
@available(valkeySwift 1.0, *)
543+
func testClientCancelSubscription() async throws {
544+
let (stream, cont) = AsyncStream.makeStream(of: Void.self)
545+
var logger = Logger(label: "Subscriptions")
546+
logger.logLevel = .trace
547+
try await withValkeyClient(.hostname(valkeyHostname, port: 6379), logger: logger) { client in
548+
await withThrowingTaskGroup(of: Void.self) { group in
549+
group.addTask {
550+
try await client.subscribe(to: "testCancelSubscriptions") { subscription in
551+
cont.finish()
552+
for try await _ in subscription {
553+
}
554+
}
555+
}
556+
await stream.first { _ in true }
557+
group.cancelAll()
558+
}
559+
}
560+
}
561+
517562
/// Test two different subscriptions to the same channel both receive messages and that when one ends the other still
518563
/// receives messages
519564
@Test

Tests/ValkeyTests/ValkeySubscriptionTests.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,9 +729,16 @@ struct SubscriptionTests {
729729
try await channel.writeInbound(RESPToken(.push([.bulkString("subscribe"), .bulkString("test"), .number(1)])).base)
730730
// push message
731731
try await channel.writeInbound(RESPToken(.push([.bulkString("message"), .bulkString("test"), .bulkString("Testing!")])).base)
732+
732733
}
733734
try await group.next()
734735
group.cancelAll()
736+
737+
// respond to unsubscribe after cancellation
738+
let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self)
739+
#expect(outbound == RESPToken(.command(["UNSUBSCRIBE", "test"])).base)
740+
// push unsubcribe
741+
try await channel.writeInbound(RESPToken(.push([.bulkString("unsubscribe"), .bulkString("test"), .number(1)])).base)
735742
}
736743
#expect(await connection.isSubscriptionsEmpty())
737744
}

0 commit comments

Comments
 (0)