@@ -21,11 +21,11 @@ final class AsyncWebSocket: NSObject, @unchecked Sendable, URLSessionWebSocketDe
21
21
private var continuationFinished = false
22
22
private let continuationLock = NSLock ( )
23
23
24
- private var _isConnected = false
25
- private let isConnectedLock = NSLock ( )
26
- private( set) var isConnected : Bool {
27
- get { isConnectedLock . withLock { _isConnected } }
28
- set { isConnectedLock . withLock { _isConnected = newValue } }
24
+ private var _closeError : WebSocketClosedError ? = nil
25
+ private let closeErrorLock = NSLock ( )
26
+ private( set) var closeError : WebSocketClosedError ? {
27
+ get { closeErrorLock . withLock { _closeError } }
28
+ set { closeErrorLock . withLock { _closeError = newValue } }
29
29
}
30
30
31
31
init ( urlSession: URLSession = GenAIURLSession . default, urlRequest: URLRequest ) {
@@ -40,44 +40,55 @@ final class AsyncWebSocket: NSObject, @unchecked Sendable, URLSessionWebSocketDe
40
40
41
41
func connect( ) -> AsyncThrowingStream < URLSessionWebSocketTask . Message , Error > {
42
42
webSocketTask. resume ( )
43
- isConnected = true
43
+ closeError = nil
44
44
startReceiving ( )
45
45
return stream
46
46
}
47
47
48
48
func disconnect( ) {
49
- webSocketTask. cancel ( with: . goingAway, reason: nil )
50
- isConnected = false
51
- continuationLock. withLock {
52
- self . continuation. finish ( )
53
- self . continuationFinished = true
54
- }
49
+ if let closeError { return }
50
+
51
+ close ( code: . goingAway, reason: nil )
55
52
}
56
53
57
54
func send( _ message: URLSessionWebSocketTask . Message ) async throws {
58
- // TODO: Throw error if socket already closed
55
+ if let closeError {
56
+ throw closeError
57
+ }
59
58
try await webSocketTask. send ( message)
60
59
}
61
60
62
61
private func startReceiving( ) {
63
62
Task {
64
- while !Task. isCancelled && self . webSocketTask. isOpen && self . isConnected {
65
- let message = try await webSocketTask. receive ( )
66
- // TODO: Check continuationFinished before yielding. Use the same thread for NSLock.
67
- continuation. yield ( message)
63
+ while !Task. isCancelled && self . webSocketTask. isOpen && self . closeError == nil {
64
+ do {
65
+ let message = try await webSocketTask. receive ( )
66
+ continuation. yield ( message)
67
+ } catch {
68
+ close ( code: webSocketTask. closeCode, reason: webSocketTask. closeReason)
69
+ }
68
70
}
69
71
}
70
72
}
71
73
74
+ private func close( code: URLSessionWebSocketTask . CloseCode , reason: Data ? ) {
75
+ let error = WebSocketClosedError ( closeCode: code, closeReason: reason)
76
+ closeError = error
77
+
78
+ webSocketTask. cancel ( with: code, reason: reason)
79
+
80
+ continuationLock. withLock {
81
+ guard !continuationFinished else { return }
82
+ self . continuation. finish ( throwing: error)
83
+ self . continuationFinished = true
84
+ }
85
+ }
86
+
72
87
func urlSession( _ session: URLSession ,
73
88
webSocketTask: URLSessionWebSocketTask ,
74
89
didCloseWith closeCode: URLSessionWebSocketTask . CloseCode ,
75
90
reason: Data ? ) {
76
- continuationLock. withLock {
77
- guard !continuationFinished else { return }
78
- continuation. finish ( )
79
- continuationFinished = true
80
- }
91
+ close ( code: closeCode, reason: reason)
81
92
}
82
93
}
83
94
0 commit comments