Skip to content

Commit f61c56b

Browse files
committed
Call unsubscribe from unstructured Task to avoid cancellation
Signed-off-by: Adam Fowler <[email protected]>
1 parent c6981ff commit f61c56b

File tree

3 files changed

+56
-29
lines changed

3 files changed

+56
-29
lines changed

Sources/Valkey/Subscriptions/ValkeyConnection+subscribe.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,10 @@ extension ValkeyConnection {
194194
value = try await process(stream)
195195
try Task.checkCancellation()
196196
} catch {
197-
_ = try? await unsubscribe(id: id)
197+
// call unsubscrobe to avoid it being cancelled
198+
_ = await Task {
199+
try await unsubscribe(id: id)
200+
}.result
198201
throw error
199202
}
200203
_ = try await unsubscribe(id: id)

Tests/IntegrationTests/ValkeyTests.swift

Lines changed: 45 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,30 @@ struct GeneratedCommands {
455455
}
456456
}
457457

458+
@Test
459+
@available(valkeySwift 1.0, *)
460+
func testCancelSubscription() async throws {
461+
let (stream, cont) = AsyncStream.makeStream(of: Void.self)
462+
var logger = Logger(label: "Subscriptions")
463+
logger.logLevel = .trace
464+
try await withValkeyClient(.hostname(valkeyHostname, port: 6379), logger: logger) { client in
465+
await withThrowingTaskGroup(of: Void.self) { group in
466+
group.addTask {
467+
try await client.withConnection { connection in
468+
try await connection.subscribe(to: "testCancelSubscriptions") { subscription in
469+
cont.finish()
470+
for try await _ in subscription {
471+
}
472+
}
473+
#expect(await connection.isSubscriptionsEmpty())
474+
}
475+
}
476+
await stream.first { _ in true }
477+
group.cancelAll()
478+
}
479+
}
480+
}
481+
458482
@Test
459483
@available(valkeySwift 1.0, *)
460484
func testClientSubscriptions() async throws {
@@ -481,6 +505,27 @@ struct GeneratedCommands {
481505
}
482506
}
483507

508+
@Test
509+
@available(valkeySwift 1.0, *)
510+
func testClientCancelSubscription() async throws {
511+
let (stream, cont) = AsyncStream.makeStream(of: Void.self)
512+
var logger = Logger(label: "Subscriptions")
513+
logger.logLevel = .trace
514+
try await withValkeyClient(.hostname(valkeyHostname, port: 6379), logger: logger) { client in
515+
await withThrowingTaskGroup(of: Void.self) { group in
516+
group.addTask {
517+
try await client.subscribe(to: "testCancelSubscriptions") { subscription in
518+
cont.finish()
519+
for try await _ in subscription {
520+
}
521+
}
522+
}
523+
await stream.first { _ in true }
524+
group.cancelAll()
525+
}
526+
}
527+
}
528+
484529
/// Test two different subscriptions to the same channel both receive messages and that when one ends the other still
485530
/// receives messages
486531
@Test
@@ -824,32 +869,4 @@ struct GeneratedCommands {
824869
}
825870
}
826871
}
827-
828-
@available(valkeySwift 1.0, *)
829-
@Test
830-
func testGEOPOS() async throws {
831-
var logger = Logger(label: "Valkey")
832-
logger.logLevel = .trace
833-
try await withValkeyConnection(.hostname(valkeyHostname, port: 6379), logger: logger) { connection in
834-
try await withKey(connection: connection) { key in
835-
let count = try await connection.geoadd(
836-
key,
837-
datas: [.init(longitude: 1.0, latitude: 53.0, member: "Edinburgh"), .init(longitude: 1.4, latitude: 53.5, member: "Glasgow")]
838-
)
839-
#expect(count == 2)
840-
let search = try await connection.geosearch(
841-
key,
842-
from: .fromlonlat(.init(longitude: 0.0, latitude: 53.0)),
843-
by: .circle(.init(radius: 10000, unit: .mi)),
844-
withcoord: true,
845-
withdist: true,
846-
withhash: true
847-
)
848-
print(search.map { $0.member })
849-
try print(search.map { try $0.attributes[0].decode(as: Double.self) })
850-
try print(search.map { try $0.attributes[1].decode(as: String.self) })
851-
try print(search.map { try $0.attributes[2].decode(as: GeoCoordinates.self) })
852-
}
853-
}
854-
}
855872
}

Tests/ValkeyTests/ValkeySubscriptionTests.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,9 +729,16 @@ struct SubscriptionTests {
729729
try await channel.writeInbound(RESPToken(.push([.bulkString("subscribe"), .bulkString("test"), .number(1)])).base)
730730
// push message
731731
try await channel.writeInbound(RESPToken(.push([.bulkString("message"), .bulkString("test"), .bulkString("Testing!")])).base)
732+
732733
}
733734
try await group.next()
734735
group.cancelAll()
736+
737+
// respond to unsubscribe after cancellation
738+
let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self)
739+
#expect(outbound == RESPToken(.command(["UNSUBSCRIBE", "test"])).base)
740+
// push unsubcribe
741+
try await channel.writeInbound(RESPToken(.push([.bulkString("unsubscribe"), .bulkString("test"), .number(1)])).base)
735742
}
736743
#expect(await connection.isSubscriptionsEmpty())
737744
}

0 commit comments

Comments
 (0)