@@ -13,6 +13,10 @@ import Helpers
1313 import FoundationNetworking
1414#endif
1515
16+ enum WebSocketClientError : Error {
17+ case unsupportedData
18+ }
19+
1620enum ConnectionStatus {
1721 case connected
1822 case disconnected( reason: String , code: URLSessionWebSocketTask . CloseCode )
@@ -23,7 +27,7 @@ protocol WebSocketClient: Sendable {
2327 func send( _ message: RealtimeMessageV2 ) async throws
2428 func receive( ) -> AsyncThrowingStream < RealtimeMessageV2 , any Error >
2529 func connect( ) -> AsyncStream < ConnectionStatus >
26- func disconnect( )
30+ func disconnect( code : Int ? , reason : String ? )
2731}
2832
2933final class WebSocket : NSObject , URLSessionWebSocketDelegate , WebSocketClient , @unchecked Sendable {
@@ -33,7 +37,7 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
3337
3438 struct MutableState {
3539 var continuation : AsyncStream < ConnectionStatus > . Continuation ?
36- var connection : WebSocketConnection < RealtimeMessageV2 , RealtimeMessageV2 > ?
40+ var task : URLSessionWebSocketTask ?
3741 }
3842
3943 private let mutableState = LockIsolated ( MutableState ( ) )
@@ -47,11 +51,15 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
4751 logger = options. logger
4852 }
4953
54+ deinit {
55+ mutableState. task? . cancel ( with: . goingAway, reason: nil )
56+ }
57+
5058 func connect( ) -> AsyncStream < ConnectionStatus > {
5159 mutableState. withValue { state in
5260 let session = URLSession ( configuration: configuration, delegate: self , delegateQueue: nil )
5361 let task = session. webSocketTask ( with: realtimeURL)
54- state. connection = WebSocketConnection ( task: task )
62+ state. task = task
5563 task. resume ( )
5664
5765 let ( stream, continuation) = AsyncStream< ConnectionStatus> . makeStream( )
@@ -60,27 +68,55 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
6068 }
6169 }
6270
63- func disconnect( ) {
71+ func disconnect( code : Int ? , reason : String ? ) {
6472 mutableState. withValue { state in
65- state. connection? . close ( )
73+ if let code {
74+ state. task? . cancel (
75+ with: URLSessionWebSocketTask . CloseCode ( rawValue: code) ?? . invalid,
76+ reason: reason? . data ( using: . utf8) )
77+ } else {
78+ state. task? . cancel ( )
79+ }
6680 }
6781 }
6882
6983 func receive( ) -> AsyncThrowingStream < RealtimeMessageV2 , any Error > {
70- guard let connection = mutableState. connection else {
71- return . finished(
72- throwing: RealtimeError (
73- " receive() called before connect(). Make sure to call `connect()` before calling `receive()`. "
74- )
75- )
84+ AsyncThrowingStream { [ weak self] in
85+ guard let self else { return nil }
86+
87+ let task = mutableState. task
88+
89+ guard
90+ let message = try await task? . receive ( ) ,
91+ !Task. isCancelled
92+ else { return nil }
93+
94+ switch message {
95+ case . data( let data) :
96+ let message = try JSONDecoder ( ) . decode ( RealtimeMessageV2 . self, from: data)
97+ return message
98+
99+ case . string( let string) :
100+ guard let data = string. data ( using: . utf8) else {
101+ throw WebSocketClientError . unsupportedData
102+ }
103+
104+ let message = try JSONDecoder ( ) . decode ( RealtimeMessageV2 . self, from: data)
105+ return message
106+
107+ @unknown default :
108+ assertionFailure ( " Unsupported message type. " )
109+ task? . cancel ( with: . unsupportedData, reason: nil )
110+ throw WebSocketClientError . unsupportedData
111+ }
76112 }
77-
78- return connection. receive ( )
79113 }
80114
81115 func send( _ message: RealtimeMessageV2 ) async throws {
82116 logger? . verbose ( " Sending message: \( message) " )
83- try await mutableState. connection? . send ( message)
117+
118+ let data = try JSONEncoder ( ) . encode ( message)
119+ try await mutableState. task? . send ( . data( data) )
84120 }
85121
86122 // MARK: - URLSessionWebSocketDelegate
0 commit comments