13
13
// limitations under the License.
14
14
15
15
import Foundation
16
+ private import FirebaseCoreInternal
16
17
17
18
final class AsyncWebSocket : NSObject , @unchecked Sendable , URLSessionWebSocketDelegate {
18
19
private let webSocketTask : URLSessionWebSocketTask
19
20
private let stream : AsyncThrowingStream < URLSessionWebSocketTask . Message , Error >
20
21
private let continuation : AsyncThrowingStream < URLSessionWebSocketTask . Message , Error > . Continuation
21
22
private var continuationFinished = false
22
23
private let continuationLock = NSLock ( )
23
-
24
- private var _closeError : WebSocketClosedError ? = nil
25
- private let closeErrorLock = NSLock ( )
26
- private( set) var closeError : WebSocketClosedError ? {
27
- get { closeErrorLock. withLock { _closeError } }
28
- set { closeErrorLock. withLock { _closeError = newValue } }
29
- }
24
+ private var closeError : UnfairLock < WebSocketClosedError ? >
30
25
31
26
init ( urlSession: URLSession = GenAIURLSession . default, urlRequest: URLRequest ) {
32
27
webSocketTask = urlSession. webSocketTask ( with: urlRequest)
33
28
( stream, continuation) = AsyncThrowingStream < URLSessionWebSocketTask . Message , Error >
34
29
. makeStream ( )
30
+ closeError = UnfairLock ( nil )
35
31
}
36
32
37
33
deinit {
38
- webSocketTask . cancel ( with : . goingAway , reason : nil )
34
+ disconnect ( )
39
35
}
40
36
41
37
func connect( ) -> AsyncThrowingStream < URLSessionWebSocketTask . Message , Error > {
42
38
webSocketTask. resume ( )
43
- closeError = nil
39
+ closeError. withLock { $0 = nil }
44
40
startReceiving ( )
45
41
return stream
46
42
}
47
43
48
44
func disconnect( ) {
49
- if closeError != nil { return }
45
+ if closeError. value ( ) != nil { return }
50
46
51
47
close ( code: . goingAway, reason: nil )
52
48
}
53
49
54
50
func send( _ message: URLSessionWebSocketTask . Message ) async throws {
55
- if let closeError {
51
+ if let closeError = closeError . value ( ) {
56
52
throw closeError
57
53
}
58
54
try await webSocketTask. send ( message)
59
55
}
60
56
61
57
private func startReceiving( ) {
62
58
Task {
63
- while !Task. isCancelled && self . webSocketTask. isOpen && self . closeError == nil {
59
+ while !Task. isCancelled && self . webSocketTask. isOpen && self . closeError. value ( ) == nil {
64
60
do {
65
61
let message = try await webSocketTask. receive ( )
66
62
continuation. yield ( message)
@@ -73,7 +69,9 @@ final class AsyncWebSocket: NSObject, @unchecked Sendable, URLSessionWebSocketDe
73
69
74
70
private func close( code: URLSessionWebSocketTask . CloseCode , reason: Data ? ) {
75
71
let error = WebSocketClosedError ( closeCode: code, closeReason: reason)
76
- closeError = error
72
+ closeError. withLock {
73
+ $0 = error
74
+ }
77
75
78
76
webSocketTask. cancel ( with: code, reason: reason)
79
77
0 commit comments