diff --git a/Examples/SlackClone/AppView.swift b/Examples/SlackClone/AppView.swift index e15aaed5f..61193a2a7 100644 --- a/Examples/SlackClone/AppView.swift +++ b/Examples/SlackClone/AppView.swift @@ -15,13 +15,15 @@ final class AppViewModel { var session: Session? var selectedChannel: Channel? - var realtimeConnectionStatus: RealtimeClientV2.Status? + var realtimeConnectionStatus: RealtimeClientStatus? init() { Task { for await (event, session) in supabase.auth.authStateChanges { Logger.main.debug("AuthStateChange: \(event.rawValue)") - guard [.signedIn, .signedOut, .initialSession, .tokenRefreshed].contains(event) else { return } + guard [.signedIn, .signedOut, .initialSession, .tokenRefreshed].contains(event) else { + return + } self.session = session if session == nil { diff --git a/Examples/SlackClone/Supabase.swift b/Examples/SlackClone/Supabase.swift index 2723557ed..e57513e47 100644 --- a/Examples/SlackClone/Supabase.swift +++ b/Examples/SlackClone/Supabase.swift @@ -21,8 +21,9 @@ let decoder: JSONDecoder = { }() let supabase = SupabaseClient( - supabaseURL: URL(string: "http://localhost:54321")!, - supabaseKey: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0", + supabaseURL: URL(string: "http://127.0.0.1:54321")!, + supabaseKey: + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0", options: SupabaseClientOptions( db: .init(encoder: encoder, decoder: decoder), auth: .init(redirectToURL: URL(string: "com.supabase.slack-clone://login-callback")), diff --git a/Package.swift b/Package.swift index 1e8278e6d..86d770cd7 100644 --- a/Package.swift +++ b/Package.swift @@ -120,6 +120,7 @@ let package = Package( name: "Realtime", dependencies: [ .product(name: "ConcurrencyExtras", package: "swift-concurrency-extras"), + .product(name: "IssueReporting", package: "xctest-dynamic-overlay"), "Helpers", ] ), diff --git a/Sources/Realtime/V2/CallbackManager.swift b/Sources/Realtime/V2/CallbackManager.swift index 1723b9f47..3d92e1849 100644 --- a/Sources/Realtime/V2/CallbackManager.swift +++ b/Sources/Realtime/V2/CallbackManager.swift @@ -1,10 +1,3 @@ -// -// CallbackManager.swift -// -// -// Created by Guilherme Souza on 24/12/23. -// - import ConcurrencyExtras import Foundation import Helpers @@ -26,6 +19,10 @@ final class CallbackManager: Sendable { mutableState.callbacks } + deinit { + reset() + } + @discardableResult func addBroadcastCallback( event: String, diff --git a/Sources/Realtime/V2/PushV2.swift b/Sources/Realtime/V2/PushV2.swift index 77e8f1bea..199e6b74a 100644 --- a/Sources/Realtime/V2/PushV2.swift +++ b/Sources/Realtime/V2/PushV2.swift @@ -27,25 +27,31 @@ actor PushV2 { } func send() async -> PushStatus { - await channel?.socket.push(message) - - if channel?.config.broadcast.acknowledgeBroadcasts == true { - do { - return try await withTimeout(interval: channel?.socket.options().timeoutInterval ?? 10) { - await withCheckedContinuation { - self.receivedContinuation = $0 - } + guard let channel = channel else { + return .error + } + + await channel.socket.push(message) + + if !channel.config.broadcast.acknowledgeBroadcasts { + // channel was configured with `ack = false`, + // don't wait for a response and return `ok`. + return .ok + } + + do { + return try await withTimeout(interval: channel.socket.options().timeoutInterval) { + await withCheckedContinuation { continuation in + self.receivedContinuation = continuation } - } catch is TimeoutError { - channel?.logger?.debug("Push timed out.") - return .timeout - } catch { - channel?.logger?.error("Error sending push: \(error)") - return .error } + } catch is TimeoutError { + channel.logger?.debug("Push timed out.") + return .timeout + } catch { + channel.logger?.error("Error sending push: \(error.localizedDescription)") + return .error } - - return .ok } func didReceive(status: PushStatus) { diff --git a/Sources/Realtime/V2/RealtimeChannelV2.swift b/Sources/Realtime/V2/RealtimeChannelV2.swift index 8facb88c4..5e52455ce 100644 --- a/Sources/Realtime/V2/RealtimeChannelV2.swift +++ b/Sources/Realtime/V2/RealtimeChannelV2.swift @@ -1,14 +1,8 @@ -// -// RealtimeChannelV2.swift -// -// -// Created by Guilherme Souza on 26/12/23. -// - import ConcurrencyExtras import Foundation import HTTPTypes import Helpers +import IssueReporting #if canImport(FoundationNetworking) import FoundationNetworking @@ -123,18 +117,14 @@ public final class RealtimeChannelV2: Sendable { public func subscribe() async { if socket.status() != .connected { if socket.options().connectOnSubscribe != true { - fatalError( + reportIssue( "You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?" ) + return } await socket.connect() } - guard status != .subscribed else { - logger?.warning("Channel \(topic) is already subscribed") - return - } - socket.addChannel(self) status = .subscribing @@ -266,15 +256,21 @@ public final class RealtimeChannelV2: Sendable { } } + /// Tracks the given state in the channel. + /// - Parameter state: The state to be tracked, conforming to `Codable`. + /// - Throws: An error if the tracking fails. public func track(_ state: some Codable) async throws { try await track(state: JSONObject(state)) } + /// Tracks the given state in the channel. + /// - Parameter state: The state to be tracked as a `JSONObject`. public func track(state: JSONObject) async { - assert( - status == .subscribed, - "You can only track your presence after subscribing to the channel. Did you forget to call `channel.subscribe()`?" - ) + if status != .subscribed { + reportIssue( + "You can only track your presence after subscribing to the channel. Did you forget to call `channel.subscribe()`?" + ) + } await push( ChannelEvent.presence, @@ -286,6 +282,7 @@ public final class RealtimeChannelV2: Sendable { ) } + /// Stops tracking the current state in the channel. public func untrack() async { await push( ChannelEvent.presence, @@ -520,10 +517,12 @@ public final class RealtimeChannelV2: Sendable { filter: String?, callback: @escaping @Sendable (AnyAction) -> Void ) -> RealtimeSubscription { - precondition( - status != .subscribed, - "You cannot call postgresChange after joining the channel" - ) + guard status != .subscribed else { + reportIssue( + "You cannot call postgresChange after joining the channel, this won't work as expected." + ) + return RealtimeSubscription {} + } let config = PostgresJoinConfig( event: event, diff --git a/Sources/Realtime/V2/RealtimeClientV2.swift b/Sources/Realtime/V2/RealtimeClientV2.swift index cd4ead9fd..5c072c4b9 100644 --- a/Sources/Realtime/V2/RealtimeClientV2.swift +++ b/Sources/Realtime/V2/RealtimeClientV2.swift @@ -20,8 +20,13 @@ public final class RealtimeClientV2: Sendable { var accessToken: String? var ref = 0 var pendingHeartbeatRef: Int? + + /// Long-running task that keeps sending heartbeat messages. var heartbeatTask: Task? + + /// Long-running task for listening for incoming messages from WebSocket. var messageTask: Task? + var connectionTask: Task? var channels: [String: RealtimeChannelV2] = [:] var sendBuffer: [@Sendable () async -> Void] = [] @@ -34,13 +39,14 @@ public final class RealtimeClientV2: Sendable { let http: any HTTPClientType let apikey: String? + /// All managed channels indexed by their topics. public var channels: [String: RealtimeChannelV2] { mutableState.channels } private let statusEventEmitter = EventEmitter(initialEvent: .disconnected) - /// AsyncStream that emits when connection status change. + /// Listen for connection status changes. /// /// You can also use ``onStatusChange(_:)`` for a closure based method. public var statusChange: AsyncStream { @@ -198,6 +204,13 @@ public final class RealtimeClientV2: Sendable { await connect(reconnect: true) } + /// Creates a new channel and bind it to this client. + /// - Parameters: + /// - topic: Channel's topic. + /// - options: Configuration options for the channel. + /// - Returns: Channel instance. + /// + /// - Note: This method doesn't subscribe to the channel, call ``RealtimeChannelV2/subscribe()`` on the returned channel instance. public func channel( _ topic: String, options: @Sendable (inout RealtimeChannelConfig) -> Void = { _ in } @@ -223,6 +236,9 @@ public final class RealtimeClientV2: Sendable { } } + /// Unsubscribe and removes channel. + /// + /// If there is no channel left, client is disconnected. public func removeChannel(_ channel: RealtimeChannelV2) async { if channel.status == .subscribed { await channel.unsubscribe() @@ -238,6 +254,7 @@ public final class RealtimeClientV2: Sendable { } } + /// Unsubscribes and removes all channels. public func removeAllChannels() async { await withTaskGroup(of: Void.self) { group in for channel in channels.values { @@ -327,7 +344,11 @@ public final class RealtimeClientV2: Sendable { } } - public func disconnect() { + /// Disconnects client. + /// - Parameters: + /// - code: A numeric status code to send on disconnect. + /// - reason: A custom reason for the disconnect. + public func disconnect(code: Int? = nil, reason: String? = nil) { options.logger?.debug("Closing WebSocket connection") mutableState.withValue { $0.ref = 0 @@ -335,7 +356,7 @@ public final class RealtimeClientV2: Sendable { $0.heartbeatTask?.cancel() $0.connectionTask?.cancel() } - ws.disconnect() + ws.disconnect(code: code, reason: reason) status = .disconnected } @@ -388,13 +409,14 @@ public final class RealtimeClientV2: Sendable { try Task.checkCancellation() try await self?.ws.send(message) } catch { - self?.options.logger?.error(""" - Failed to send message: - \(message) - - Error: - \(error) - """) + self?.options.logger?.error( + """ + Failed to send message: + \(message) + + Error: + \(error) + """) } } diff --git a/Sources/Realtime/V2/RealtimeMessageV2.swift b/Sources/Realtime/V2/RealtimeMessageV2.swift index d288aece0..200498cd4 100644 --- a/Sources/Realtime/V2/RealtimeMessageV2.swift +++ b/Sources/Realtime/V2/RealtimeMessageV2.swift @@ -1,10 +1,3 @@ -// -// RealtimeMessageV2.swift -// -// -// Created by Guilherme Souza on 11/01/24. -// - import Foundation import Helpers diff --git a/Sources/Realtime/V2/WebSocketClient.swift b/Sources/Realtime/V2/WebSocketClient.swift index b977a699f..0634f7743 100644 --- a/Sources/Realtime/V2/WebSocketClient.swift +++ b/Sources/Realtime/V2/WebSocketClient.swift @@ -13,6 +13,10 @@ import Helpers import FoundationNetworking #endif +enum WebSocketClientError: Error { + case unsupportedData +} + enum ConnectionStatus { case connected case disconnected(reason: String, code: URLSessionWebSocketTask.CloseCode) @@ -23,7 +27,7 @@ protocol WebSocketClient: Sendable { func send(_ message: RealtimeMessageV2) async throws func receive() -> AsyncThrowingStream func connect() -> AsyncStream - func disconnect() + func disconnect(code: Int?, reason: String?) } final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @unchecked Sendable { @@ -33,7 +37,7 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @ struct MutableState { var continuation: AsyncStream.Continuation? - var connection: WebSocketConnection? + var task: URLSessionWebSocketTask? } private let mutableState = LockIsolated(MutableState()) @@ -47,11 +51,15 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @ logger = options.logger } + deinit { + mutableState.task?.cancel(with: .goingAway, reason: nil) + } + func connect() -> AsyncStream { mutableState.withValue { state in let session = URLSession(configuration: configuration, delegate: self, delegateQueue: nil) let task = session.webSocketTask(with: realtimeURL) - state.connection = WebSocketConnection(task: task) + state.task = task task.resume() let (stream, continuation) = AsyncStream.makeStream() @@ -60,27 +68,55 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @ } } - func disconnect() { + func disconnect(code: Int?, reason: String?) { mutableState.withValue { state in - state.connection?.close() + if let code { + state.task?.cancel( + with: URLSessionWebSocketTask.CloseCode(rawValue: code) ?? .invalid, + reason: reason?.data(using: .utf8)) + } else { + state.task?.cancel() + } } } func receive() -> AsyncThrowingStream { - guard let connection = mutableState.connection else { - return .finished( - throwing: RealtimeError( - "receive() called before connect(). Make sure to call `connect()` before calling `receive()`." - ) - ) + AsyncThrowingStream { [weak self] in + guard let self else { return nil } + + let task = mutableState.task + + guard + let message = try await task?.receive(), + !Task.isCancelled + else { return nil } + + switch message { + case .data(let data): + let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data) + return message + + case .string(let string): + guard let data = string.data(using: .utf8) else { + throw WebSocketClientError.unsupportedData + } + + let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data) + return message + + @unknown default: + assertionFailure("Unsupported message type.") + task?.cancel(with: .unsupportedData, reason: nil) + throw WebSocketClientError.unsupportedData + } } - - return connection.receive() } func send(_ message: RealtimeMessageV2) async throws { logger?.verbose("Sending message: \(message)") - try await mutableState.connection?.send(message) + + let data = try JSONEncoder().encode(message) + try await mutableState.task?.send(.data(data)) } // MARK: - URLSessionWebSocketDelegate diff --git a/Sources/Realtime/V2/WebSocketConnection.swift b/Sources/Realtime/V2/WebSocketConnection.swift deleted file mode 100644 index 8e30ac539..000000000 --- a/Sources/Realtime/V2/WebSocketConnection.swift +++ /dev/null @@ -1,77 +0,0 @@ -// -// WebSocketConnection.swift -// -// -// Created by Guilherme Souza on 29/03/24. -// - -import Foundation - -#if canImport(FoundationNetworking) - import FoundationNetworking -#endif - -enum WebSocketConnectionError: Error { - case unsupportedData -} - -final class WebSocketConnection: Sendable { - private let task: URLSessionWebSocketTask - private let encoder: JSONEncoder - private let decoder: JSONDecoder - - init( - task: URLSessionWebSocketTask, - encoder: JSONEncoder = JSONEncoder(), - decoder: JSONDecoder = JSONDecoder() - ) { - self.task = task - self.encoder = encoder - self.decoder = decoder - - task.resume() - } - - deinit { - task.cancel(with: .goingAway, reason: nil) - } - - func receiveOnce() async throws -> Incoming { - switch try await task.receive() { - case let .data(data): - let message = try decoder.decode(Incoming.self, from: data) - return message - - case let .string(string): - guard let data = string.data(using: .utf8) else { - throw WebSocketConnectionError.unsupportedData - } - - let message = try decoder.decode(Incoming.self, from: data) - return message - - @unknown default: - assertionFailure("Unsupported message type.") - task.cancel(with: .unsupportedData, reason: nil) - throw WebSocketConnectionError.unsupportedData - } - } - - func send(_ message: Outgoing) async throws { - let data = try encoder.encode(message) - try await task.send(.data(data)) - } - - func receive() -> AsyncThrowingStream { - AsyncThrowingStream { [weak self] in - guard let self else { return nil } - - let message = try await receiveOnce() - return Task.isCancelled ? nil : message - } - } - - func close() { - task.cancel(with: .normalClosure, reason: nil) - } -} diff --git a/Tests/RealtimeTests/MockWebSocketClient.swift b/Tests/RealtimeTests/MockWebSocketClient.swift index d6e3fc51f..bcabc958c 100644 --- a/Tests/RealtimeTests/MockWebSocketClient.swift +++ b/Tests/RealtimeTests/MockWebSocketClient.swift @@ -93,5 +93,6 @@ final class MockWebSocketClient: WebSocketClient { return stream } - func disconnect() {} + func disconnect(code: Int?, reason: String?) { + } } diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index e8ff0570d..35a318cc1 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -1,10 +1,12 @@ import ConcurrencyExtras import CustomDump import Helpers -@testable import Realtime +import InlineSnapshotTesting import TestHelpers import XCTest +@testable import Realtime + #if canImport(FoundationNetworking) import FoundationNetworking #endif @@ -91,10 +93,48 @@ final class RealtimeTests: XCTestCase { ws.mockReceive(.messagesSubscribed) await channel.subscribe() - expectNoDifference( - ws.sentMessages, - [.subscribeToMessages(ref: "1", joinRef: "1")] - ) + assertInlineSnapshot(of: ws.sentMessages, as: .json) { + """ + [ + { + "event" : "phx_join", + "join_ref" : "1", + "payload" : { + "access_token" : "anon.api.key", + "config" : { + "broadcast" : { + "ack" : false, + "self" : false + }, + "postgres_changes" : [ + { + "event" : "INSERT", + "schema" : "public", + "table" : "messages" + }, + { + "event" : "UPDATE", + "schema" : "public", + "table" : "messages" + }, + { + "event" : "DELETE", + "schema" : "public", + "table" : "messages" + } + ], + "presence" : { + "key" : "" + }, + "private" : false + } + }, + "ref" : "1", + "topic" : "realtime:public:messages" + } + ] + """ + } } func testSubscribeTimeout() async throws { @@ -132,39 +172,56 @@ final class RealtimeTests: XCTestCase { try? await Task.sleep(nanoseconds: NSEC_PER_SEC * 2) - let joinSentMessages = ws.sentMessages.filter { $0.event == "phx_join" } - - let expectedMessages = try [ - RealtimeMessageV2( - joinRef: "1", - ref: "1", - topic: "realtime:public:messages", - event: "phx_join", - payload: JSONObject( - RealtimeJoinPayload( - config: RealtimeJoinConfig(), - accessToken: apiKey - ) - ) - ), - RealtimeMessageV2( - joinRef: "2", - ref: "2", - topic: "realtime:public:messages", - event: "phx_join", - payload: JSONObject( - RealtimeJoinPayload( - config: RealtimeJoinConfig(), - accessToken: apiKey - ) - ) - ), - ] - - expectNoDifference( - joinSentMessages, - expectedMessages - ) + assertInlineSnapshot(of: ws.sentMessages.filter { $0.event == "phx_join" }, as: .json) { + """ + [ + { + "event" : "phx_join", + "join_ref" : "1", + "payload" : { + "access_token" : "anon.api.key", + "config" : { + "broadcast" : { + "ack" : false, + "self" : false + }, + "postgres_changes" : [ + + ], + "presence" : { + "key" : "" + }, + "private" : false + } + }, + "ref" : "1", + "topic" : "realtime:public:messages" + }, + { + "event" : "phx_join", + "join_ref" : "2", + "payload" : { + "access_token" : "anon.api.key", + "config" : { + "broadcast" : { + "ack" : false, + "self" : false + }, + "postgres_changes" : [ + + ], + "presence" : { + "key" : "" + }, + "private" : false + } + }, + "ref" : "2", + "topic" : "realtime:public:messages" + } + ] + """ + } } func testHeartbeat() async throws { @@ -262,30 +319,27 @@ final class RealtimeTests: XCTestCase { try await channel.broadcast(event: "test", message: ["value": 42]) let request = await http.receivedRequests.last - expectNoDifference( - request?.headers, - [ - .contentType: "application/json", - .apiKey: "anon.api.key", - .authorization: "Bearer anon.api.key", - ] - ) - - let body = try XCTUnwrap(request?.body) - let json = try JSONDecoder().decode(JSONObject.self, from: body) - expectNoDifference( - json, - [ - "messages": [ - [ - "topic": "realtime:public:messages", - "event": "test", - "payload": ["value": 42], - "private": false, - ], - ], - ] - ) + assertInlineSnapshot(of: request?.urlRequest, as: .raw(pretty: true)) { + """ + POST https://localhost:54321/realtime/v1/api/broadcast + Authorization: Bearer anon.api.key + Content-Type: application/json + apiKey: anon.api.key + + { + "messages" : [ + { + "event" : "test", + "payload" : { + "value" : 42 + }, + "private" : false, + "topic" : "realtime:public:messages" + } + ] + } + """ + } } private func connectSocketAndWait() async { @@ -295,31 +349,6 @@ final class RealtimeTests: XCTestCase { } extension RealtimeMessageV2 { - static func subscribeToMessages(ref: String?, joinRef: String?) -> RealtimeMessageV2 { - Self( - joinRef: joinRef, - ref: ref, - topic: "realtime:public:messages", - event: "phx_join", - payload: [ - "access_token": "anon.api.key", - "config": [ - "broadcast": [ - "self": false, - "ack": false, - ], - "postgres_changes": [ - ["table": "messages", "event": "INSERT", "schema": "public"], - ["table": "messages", "schema": "public", "event": "UPDATE"], - ["schema": "public", "table": "messages", "event": "DELETE"], - ], - "presence": ["key": ""], - "private": false, - ], - ] - ) - } - static let messagesSubscribed = Self( joinRef: nil, ref: "2", @@ -328,29 +357,12 @@ extension RealtimeMessageV2 { payload: [ "response": [ "postgres_changes": [ - ["id": 43783255, "event": "INSERT", "schema": "public", "table": "messages"], - ["id": 124973000, "event": "UPDATE", "schema": "public", "table": "messages"], - ["id": 85243397, "event": "DELETE", "schema": "public", "table": "messages"], - ], + ["id": 43_783_255, "event": "INSERT", "schema": "public", "table": "messages"], + ["id": 124_973_000, "event": "UPDATE", "schema": "public", "table": "messages"], + ["id": 85_243_397, "event": "DELETE", "schema": "public", "table": "messages"], + ] ], "status": "ok", ] ) - - static let heartbeatResponse = Self( - joinRef: nil, - ref: "1", - topic: "phoenix", - event: "phx_reply", - payload: [ - "response": [:], - "status": "ok", - ] - ) -} - -struct TestLogger: SupabaseLogger { - func log(message: SupabaseLogMessage) { - print(message.description) - } }