Skip to content

Commit ff8e1ce

Browse files
Subscription connection manager v2 (#194)
* Add SubscriptionConnection Manager Signed-off-by: Adam Fowler <[email protected]> * Get client to run subscription connection manager Signed-off-by: Adam Fowler <[email protected]> * Non-copyable subscription connection manager state Signed-off-by: Adam Fowler <[email protected]> * Comments Signed-off-by: Adam Fowler <[email protected]> * Only run lease in background Signed-off-by: Adam Fowler <[email protected]> * Rename to SubscriptionConnectionStateMachine.swift Signed-off-by: Adam Fowler <[email protected]> * Update Sources/Valkey/Subscriptions/SubscriptionConnectionStateMachine.swift Co-authored-by: Fabian Fett <[email protected]> Signed-off-by: Adam Fowler <[email protected]> * Use withConnection Signed-off-by: Adam Fowler <[email protected]> * Remove temp code from test Signed-off-by: Adam Fowler <[email protected]> * Allow required to immediately release the connection Signed-off-by: Adam Fowler <[email protected]> * Add leaseID Signed-off-by: Adam Fowler <[email protected]> * Resume continuations outside locks Signed-off-by: Adam Fowler <[email protected]> * swift format fix Signed-off-by: Adam Fowler <[email protected]> --------- Signed-off-by: Adam Fowler <[email protected]> Co-authored-by: Fabian Fett <[email protected]>
1 parent eab8f7c commit ff8e1ce

File tree

3 files changed

+569
-13
lines changed

3 files changed

+569
-13
lines changed
Lines changed: 302 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,302 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the valkey-swift project
4+
//
5+
// Copyright (c) 2025 the valkey-swift authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See valkey-swift/CONTRIBUTORS.txt for the list of valkey-swift authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Logging
16+
import Synchronization
17+
import _ValkeyConnectionPool
18+
19+
@available(valkeySwift 1.0, *)
20+
extension ValkeyClient {
21+
/// Run operation with the valkey subscription connection
22+
///
23+
/// - Parameters:
24+
/// - isolation: Actor isolation
25+
/// - operation: Closure to run with subscription connection
26+
@usableFromInline
27+
func withSubscriptionConnection<Value>(
28+
isolation: isolated (any Actor)? = #isolation,
29+
_ operation: (ValkeyConnection) async throws -> sending Value
30+
) async throws -> sending Value {
31+
let id = self.subscriptionConnectionIDGenerator.next()
32+
33+
let connection = try await withTaskCancellationHandler {
34+
try await withCheckedThrowingContinuation { (cont: CheckedContinuation<ValkeyConnection, Error>) in
35+
self.leaseSubscriptionConnection(id: id, request: cont)
36+
}
37+
} onCancel: {
38+
self.cancelSubscriptionConnection(id: id)
39+
}
40+
41+
defer {
42+
self.releaseSubscriptionConnection(id: id)
43+
}
44+
return try await operation(connection)
45+
}
46+
47+
func leaseSubscriptionConnection(id: Int, request: CheckedContinuation<ValkeyConnection, Error>) {
48+
self.logger.trace("Get subscription connection", metadata: ["valkey_subscription_connection_id": .stringConvertible(id)])
49+
enum LeaseAction {
50+
case cancel
51+
case action(ConnectionStateMachine.GetAction)
52+
}
53+
let action: LeaseAction = self.subscriptionConnectionStateMachine.withLock { stateMachine in
54+
if Task.isCancelled {
55+
return .cancel
56+
}
57+
return .action(stateMachine.get(id: id, request: request))
58+
}
59+
switch action {
60+
case .cancel:
61+
request.resume(throwing: CancellationError())
62+
case .action(let getAction):
63+
switch getAction {
64+
case .startAcquire(let leaseID):
65+
self.queueAction(.leaseSubscriptionConnection(leaseID: leaseID))
66+
case .completeRequest(let connection):
67+
request.resume(returning: connection)
68+
case .doNothing:
69+
break
70+
}
71+
}
72+
}
73+
74+
func acquiredSubscriptionConnection(leaseID: Int, connection: ValkeyConnection, releaseContinuation: CheckedContinuation<Void, Never>) {
75+
let action = self.subscriptionConnectionStateMachine.withLock { stateMachine in
76+
stateMachine.acquired(leaseID: leaseID, value: connection, releaseRequest: releaseContinuation)
77+
}
78+
switch action {
79+
case .yield(let continuations):
80+
for cont in continuations {
81+
cont.resume(returning: connection)
82+
}
83+
case .release:
84+
releaseContinuation.resume()
85+
}
86+
87+
}
88+
89+
func errorAcquiringSubscriptionConnection(leaseID: Int, error: Error) {
90+
let action = self.subscriptionConnectionStateMachine.withLock { stateMachine in
91+
stateMachine.errorAcquiring(leaseID: leaseID, error: error)
92+
}
93+
switch action {
94+
case .yield(let continuations):
95+
for cont in continuations {
96+
cont.resume(throwing: error)
97+
}
98+
case .doNothing:
99+
break
100+
}
101+
102+
}
103+
104+
func releaseSubscriptionConnection(id: Int) {
105+
self.logger.trace("Release subscription connection", metadata: ["valkey_subscription_connection_id": .stringConvertible(id)])
106+
let action = self.subscriptionConnectionStateMachine.withLock { stateMachine in
107+
stateMachine.release(id: id)
108+
}
109+
switch action {
110+
case .release(let continuation):
111+
continuation.resume()
112+
self.logger.trace("Released connection for subscriptions")
113+
case .doNothing:
114+
break
115+
}
116+
117+
}
118+
119+
func cancelSubscriptionConnection(id: Int) {
120+
self.logger.trace("Cancel subscription connection", metadata: ["valkey_subscription_connection_id": .stringConvertible(id)])
121+
let action = self.subscriptionConnectionStateMachine.withLock { stateMachine in
122+
stateMachine.cancel(id: id)
123+
}
124+
switch action {
125+
case .cancel(let cont):
126+
cont.resume(throwing: CancellationError())
127+
case .release(let continuation):
128+
continuation.resume()
129+
self.logger.trace("Released connection for subscriptions")
130+
case .doNothing:
131+
break
132+
}
133+
}
134+
}
135+
136+
/// StateMachine for acquiring Subscription Connection.
137+
@usableFromInline
138+
struct SubscriptionConnectionStateMachine<Value, Request, ReleaseRequest>: ~Copyable {
139+
enum State: ~Copyable {
140+
/// We have no connection
141+
case uninitialized(nextLeaseID: Int)
142+
/// We are acquiring a connection
143+
case acquiring(leaseID: Int, waiters: [Int: Request])
144+
/// We have a connection
145+
case acquired(AcquiredState)
146+
147+
struct AcquiredState {
148+
var leaseID: Int
149+
var value: Value
150+
var requestIDs: Set<Int>
151+
var releaseRequest: ReleaseRequest
152+
}
153+
}
154+
var state: State
155+
156+
init() {
157+
self.state = .uninitialized(nextLeaseID: 0)
158+
}
159+
160+
init(state: consuming State) {
161+
self.state = state
162+
}
163+
164+
enum GetAction {
165+
case startAcquire(Int)
166+
case doNothing
167+
case completeRequest(Value)
168+
}
169+
170+
mutating func get(id: Int, request: Request) -> GetAction {
171+
switch consume self.state {
172+
case .uninitialized(let leaseID):
173+
self = .acquiring(leaseID: leaseID, waiters: [id: request])
174+
return .startAcquire(leaseID)
175+
case .acquiring(let leaseID, var waiters):
176+
waiters[id] = request
177+
self = .acquiring(leaseID: leaseID, waiters: waiters)
178+
return .doNothing
179+
case .acquired(var state):
180+
state.requestIDs.insert(id)
181+
self = .acquired(state)
182+
return .completeRequest(state.value)
183+
}
184+
}
185+
186+
enum CancelAction {
187+
case cancel(Request)
188+
case release(ReleaseRequest)
189+
case doNothing
190+
}
191+
192+
mutating func cancel(id: Int) -> CancelAction {
193+
switch consume self.state {
194+
case .uninitialized(let leaseID):
195+
self = .uninitialized(nextLeaseID: leaseID)
196+
return .doNothing
197+
case .acquiring(let leaseID, var waiters):
198+
guard let continuation = waiters.removeValue(forKey: id) else {
199+
self = .acquiring(leaseID: leaseID, waiters: waiters)
200+
return .doNothing
201+
}
202+
if waiters.isEmpty {
203+
self = .uninitialized(nextLeaseID: leaseID + 1)
204+
} else {
205+
self = .acquiring(leaseID: leaseID, waiters: waiters)
206+
}
207+
return .cancel(continuation)
208+
case .acquired(var state):
209+
state.requestIDs.remove(id)
210+
if state.requestIDs.isEmpty {
211+
self = .uninitialized(nextLeaseID: state.leaseID + 1)
212+
return .release(state.releaseRequest)
213+
} else {
214+
self = .acquired(state)
215+
return .doNothing
216+
}
217+
}
218+
}
219+
220+
enum AcquiredAction {
221+
case yield([Request])
222+
case release
223+
}
224+
225+
mutating func acquired(leaseID: Int, value: Value, releaseRequest: ReleaseRequest) -> AcquiredAction {
226+
switch consume self.state {
227+
case .uninitialized(let leaseID):
228+
self = .uninitialized(nextLeaseID: leaseID)
229+
return .release
230+
case .acquiring(let storedLeaseID, let waiters):
231+
if storedLeaseID != leaseID {
232+
self = .acquiring(leaseID: storedLeaseID, waiters: waiters)
233+
return .release
234+
}
235+
let continuations = waiters.values
236+
self = .acquired(.init(leaseID: leaseID, value: value, requestIDs: .init(waiters.keys), releaseRequest: releaseRequest))
237+
return .yield(.init(continuations))
238+
case .acquired(let state):
239+
if state.leaseID != leaseID {
240+
self = .acquired(state)
241+
return .release
242+
} else {
243+
preconditionFailure("Acquired connection twice")
244+
}
245+
}
246+
}
247+
248+
enum ErrorAcquiringAction {
249+
case yield([Request])
250+
case doNothing
251+
}
252+
253+
mutating func errorAcquiring(leaseID: Int, error: Error) -> ErrorAcquiringAction {
254+
switch consume self.state {
255+
case .uninitialized(let leaseID):
256+
self = .uninitialized(nextLeaseID: leaseID)
257+
return .doNothing
258+
case .acquiring(let storedLeaseID, let waiters):
259+
if storedLeaseID != leaseID {
260+
self = .acquiring(leaseID: storedLeaseID, waiters: waiters)
261+
return .doNothing
262+
}
263+
let continuations = waiters.values
264+
self = .uninitialized(nextLeaseID: leaseID + 1)
265+
return .yield(.init(continuations))
266+
case .acquired(let state):
267+
if state.leaseID != leaseID {
268+
self = .acquired(state)
269+
return .doNothing
270+
} else {
271+
preconditionFailure("Error acquiring connection we already have")
272+
}
273+
}
274+
}
275+
276+
enum ReleaseAction {
277+
case release(ReleaseRequest)
278+
case doNothing
279+
}
280+
281+
mutating func release(id: Int) -> ReleaseAction {
282+
switch consume self.state {
283+
case .uninitialized:
284+
preconditionFailure("Cannot release connection when in an uninitialized state")
285+
case .acquiring:
286+
preconditionFailure("Cannot release connection while acquiring a new connection")
287+
case .acquired(var state):
288+
state.requestIDs.remove(id)
289+
if state.requestIDs.isEmpty {
290+
self = .uninitialized(nextLeaseID: state.leaseID + 1)
291+
return .release(state.releaseRequest)
292+
} else {
293+
self = .acquired(state)
294+
return .doNothing
295+
}
296+
}
297+
}
298+
299+
static private func uninitialized(nextLeaseID: Int) -> Self { .init(state: .uninitialized(nextLeaseID: nextLeaseID)) }
300+
static private func acquiring(leaseID: Int, waiters: [Int: Request]) -> Self { .init(state: .acquiring(leaseID: leaseID, waiters: waiters)) }
301+
static private func acquired(_ state: State.AcquiredState) -> Self { .init(state: .acquired(state)) }
302+
}

0 commit comments

Comments
 (0)