Skip to content

Commit 39d99dc

Browse files
committed
Call unsubscribe from unstructured Task to avoid cancellation
Signed-off-by: Adam Fowler <[email protected]>
1 parent 131170d commit 39d99dc

File tree

3 files changed

+56
-1
lines changed

3 files changed

+56
-1
lines changed

Sources/Valkey/Subscriptions/ValkeyConnection+subscribe.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,10 @@ extension ValkeyConnection {
194194
value = try await process(stream)
195195
try Task.checkCancellation()
196196
} catch {
197-
_ = try? await unsubscribe(id: id)
197+
// call unsubscrobe to avoid it being cancelled
198+
_ = await Task {
199+
try await unsubscribe(id: id)
200+
}.result
198201
throw error
199202
}
200203
_ = try await unsubscribe(id: id)

Tests/IntegrationTests/ValkeyTests.swift

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,30 @@ struct GeneratedCommands {
468468
}
469469
}
470470

471+
@Test
472+
@available(valkeySwift 1.0, *)
473+
func testCancelSubscription() async throws {
474+
let (stream, cont) = AsyncStream.makeStream(of: Void.self)
475+
var logger = Logger(label: "Subscriptions")
476+
logger.logLevel = .trace
477+
try await withValkeyClient(.hostname(valkeyHostname, port: 6379), logger: logger) { client in
478+
await withThrowingTaskGroup(of: Void.self) { group in
479+
group.addTask {
480+
try await client.withConnection { connection in
481+
try await connection.subscribe(to: "testCancelSubscriptions") { subscription in
482+
cont.finish()
483+
for try await _ in subscription {
484+
}
485+
}
486+
#expect(await connection.isSubscriptionsEmpty())
487+
}
488+
}
489+
await stream.first { _ in true }
490+
group.cancelAll()
491+
}
492+
}
493+
}
494+
471495
@Test
472496
@available(valkeySwift 1.0, *)
473497
func testClientSubscriptions() async throws {
@@ -494,6 +518,27 @@ struct GeneratedCommands {
494518
}
495519
}
496520

521+
@Test
522+
@available(valkeySwift 1.0, *)
523+
func testClientCancelSubscription() async throws {
524+
let (stream, cont) = AsyncStream.makeStream(of: Void.self)
525+
var logger = Logger(label: "Subscriptions")
526+
logger.logLevel = .trace
527+
try await withValkeyClient(.hostname(valkeyHostname, port: 6379), logger: logger) { client in
528+
await withThrowingTaskGroup(of: Void.self) { group in
529+
group.addTask {
530+
try await client.subscribe(to: "testCancelSubscriptions") { subscription in
531+
cont.finish()
532+
for try await _ in subscription {
533+
}
534+
}
535+
}
536+
await stream.first { _ in true }
537+
group.cancelAll()
538+
}
539+
}
540+
}
541+
497542
/// Test two different subscriptions to the same channel both receive messages and that when one ends the other still
498543
/// receives messages
499544
@Test

Tests/ValkeyTests/ValkeySubscriptionTests.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,9 +729,16 @@ struct SubscriptionTests {
729729
try await channel.writeInbound(RESPToken(.push([.bulkString("subscribe"), .bulkString("test"), .number(1)])).base)
730730
// push message
731731
try await channel.writeInbound(RESPToken(.push([.bulkString("message"), .bulkString("test"), .bulkString("Testing!")])).base)
732+
732733
}
733734
try await group.next()
734735
group.cancelAll()
736+
737+
// respond to unsubscribe after cancellation
738+
let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self)
739+
#expect(outbound == RESPToken(.command(["UNSUBSCRIBE", "test"])).base)
740+
// push unsubcribe
741+
try await channel.writeInbound(RESPToken(.push([.bulkString("unsubscribe"), .bulkString("test"), .number(1)])).base)
735742
}
736743
#expect(await connection.isSubscriptionsEmpty())
737744
}

0 commit comments

Comments
 (0)