Skip to content

Commit 0b19580

Browse files
authored
fix(realtime): web socket message listener doesn't stop (#284)
* fix(realtime): web socket message listener doesn't stop * Do not expose task
1 parent a701bb1 commit 0b19580

File tree

1 file changed

+102
-31
lines changed

1 file changed

+102
-31
lines changed

Sources/Realtime/V2/WebSocketClient.swift

Lines changed: 102 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
3838
private let logger: (any SupabaseLogger)?
3939

4040
struct MutableState {
41-
var task: URLSessionWebSocketTask?
4241
var continuation: AsyncStream<ConnectionStatus>.Continuation?
42+
var stream: SocketStream?
4343
}
4444

4545
let mutableState = LockIsolated(MutableState())
@@ -56,8 +56,9 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
5656
func connect() -> AsyncStream<ConnectionStatus> {
5757
mutableState.withValue { state in
5858
let session = URLSession(configuration: configuration, delegate: self, delegateQueue: nil)
59-
state.task = session.webSocketTask(with: realtimeURL)
60-
state.task?.resume()
59+
let task = session.webSocketTask(with: realtimeURL)
60+
state.stream = SocketStream(task: task)
61+
task.resume()
6162

6263
let (stream, continuation) = AsyncStream<ConnectionStatus>.makeStream()
6364
state.continuation = continuation
@@ -67,49 +68,48 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
6768

6869
func disconnect(closeCode: URLSessionWebSocketTask.CloseCode) {
6970
mutableState.withValue { state in
70-
state.task?.cancel(with: closeCode, reason: nil)
71+
state.stream?.cancel(with: closeCode)
7172
}
7273
}
7374

7475
func receive() -> AsyncThrowingStream<RealtimeMessageV2, any Error> {
75-
let (stream, continuation) = AsyncThrowingStream<RealtimeMessageV2, any Error>.makeStream()
76-
77-
Task {
78-
while let message = try await mutableState.task?.receive() {
79-
do {
80-
switch message {
81-
case let .string(stringMessage):
82-
logger?.verbose("Received message: \(stringMessage)")
83-
84-
guard let data = stringMessage.data(using: .utf8) else {
85-
throw RealtimeError("Expected a UTF8 encoded message.")
86-
}
87-
88-
let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data)
89-
continuation.yield(message)
90-
91-
case .data:
92-
fallthrough
93-
default:
94-
throw RealtimeError("Unsupported message type.")
76+
mutableState.withValue { mutableState in
77+
guard let stream = mutableState.stream else {
78+
return .finished(
79+
throwing: RealtimeError(
80+
"receive() called before connect(). Make sure to call `connect()` before calling `receive()`."
81+
)
82+
)
83+
}
84+
85+
return stream.map { message in
86+
switch message {
87+
case let .string(stringMessage):
88+
self.logger?.verbose("Received message: \(stringMessage)")
89+
90+
guard let data = stringMessage.data(using: .utf8) else {
91+
throw RealtimeError("Expected a UTF8 encoded message.")
9592
}
96-
} catch {
97-
continuation.finish(throwing: error)
93+
94+
let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data)
95+
return message
96+
97+
case .data:
98+
fallthrough
99+
default:
100+
throw RealtimeError("Unsupported message type.")
98101
}
99102
}
100-
101-
continuation.finish()
103+
.eraseToThrowingStream()
102104
}
103-
104-
return stream
105105
}
106106

107107
func send(_ message: RealtimeMessageV2) async throws {
108108
let data = try JSONEncoder().encode(message)
109109
let string = String(decoding: data, as: UTF8.self)
110110

111111
logger?.verbose("Sending message: \(string)")
112-
try await mutableState.task?.send(.string(string))
112+
try await mutableState.stream?.send(.string(string))
113113
}
114114

115115
// MARK: - URLSessionWebSocketDelegate
@@ -144,3 +144,74 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
144144
mutableState.continuation?.yield(.error(error))
145145
}
146146
}
147+
148+
typealias WebSocketStream = AsyncThrowingStream<URLSessionWebSocketTask.Message, any Error>
149+
150+
final class SocketStream: AsyncSequence, Sendable {
151+
typealias AsyncIterator = WebSocketStream.Iterator
152+
typealias Element = URLSessionWebSocketTask.Message
153+
154+
struct MutableState {
155+
var continuation: WebSocketStream.Continuation?
156+
var stream: WebSocketStream?
157+
}
158+
159+
private let task: URLSessionWebSocketTask
160+
private let mutableState = LockIsolated(MutableState())
161+
162+
private func makeStreamIfNeeded() -> WebSocketStream {
163+
mutableState.withValue { state in
164+
if let stream = state.stream {
165+
return stream
166+
}
167+
168+
let stream = WebSocketStream { continuation in
169+
state.continuation = continuation
170+
waitForNextValue()
171+
}
172+
173+
state.stream = stream
174+
return stream
175+
}
176+
}
177+
178+
private func waitForNextValue() {
179+
guard task.closeCode == .invalid else {
180+
mutableState.continuation?.finish()
181+
return
182+
}
183+
184+
task.receive { [weak self] result in
185+
guard let continuation = self?.mutableState.continuation else { return }
186+
187+
do {
188+
let message = try result.get()
189+
continuation.yield(message)
190+
self?.waitForNextValue()
191+
} catch {
192+
continuation.finish(throwing: error)
193+
}
194+
}
195+
}
196+
197+
init(task: URLSessionWebSocketTask) {
198+
self.task = task
199+
}
200+
201+
deinit {
202+
mutableState.continuation?.finish()
203+
}
204+
205+
func makeAsyncIterator() -> WebSocketStream.Iterator {
206+
makeStreamIfNeeded().makeAsyncIterator()
207+
}
208+
209+
func cancel(with closeCode: URLSessionWebSocketTask.CloseCode = .goingAway) {
210+
task.cancel(with: closeCode, reason: nil)
211+
mutableState.continuation?.finish()
212+
}
213+
214+
func send(_ message: URLSessionWebSocketTask.Message) async throws {
215+
try await task.send(message)
216+
}
217+
}

0 commit comments

Comments
 (0)