diff --git a/Sources/Realtime/Deprecated.swift b/Sources/Realtime/Deprecated.swift index 3adedca48..27d32f91a 100644 --- a/Sources/Realtime/Deprecated.swift +++ b/Sources/Realtime/Deprecated.swift @@ -6,6 +6,76 @@ // import Foundation +import Helpers @available(*, deprecated, renamed: "RealtimeMessage") public typealias Message = RealtimeMessage + +extension RealtimeClientV2 { + @available(*, deprecated, renamed: "channels") + public var subscriptions: [String: RealtimeChannelV2] { + channels + } + + @available(*, deprecated, renamed: "RealtimeClientOptions") + public struct Configuration: Sendable { + var url: URL + var apiKey: String + var headers: [String: String] + var heartbeatInterval: TimeInterval + var reconnectDelay: TimeInterval + var timeoutInterval: TimeInterval + var disconnectOnSessionLoss: Bool + var connectOnSubscribe: Bool + var logger: (any SupabaseLogger)? + + public init( + url: URL, + apiKey: String, + headers: [String: String] = [:], + heartbeatInterval: TimeInterval = 15, + reconnectDelay: TimeInterval = 7, + timeoutInterval: TimeInterval = 10, + disconnectOnSessionLoss: Bool = true, + connectOnSubscribe: Bool = true, + logger: (any SupabaseLogger)? = nil + ) { + self.url = url + self.apiKey = apiKey + self.headers = headers + self.heartbeatInterval = heartbeatInterval + self.reconnectDelay = reconnectDelay + self.timeoutInterval = timeoutInterval + self.disconnectOnSessionLoss = disconnectOnSessionLoss + self.connectOnSubscribe = connectOnSubscribe + self.logger = logger + } + } + + @available(*, deprecated, renamed: "RealtimeClientStatus") + public typealias Status = RealtimeClientStatus + + @available(*, deprecated, renamed: "RealtimeClientV2.init(url:options:)") + public convenience init(config: Configuration) { + self.init( + url: config.url, + options: RealtimeClientOptions( + headers: config.headers, + heartbeatInterval: config.heartbeatInterval, + reconnectDelay: config.reconnectDelay, + timeoutInterval: config.timeoutInterval, + disconnectOnSessionLoss: config.disconnectOnSessionLoss, + connectOnSubscribe: config.connectOnSubscribe, + logger: config.logger + ) + ) + } +} + +extension RealtimeChannelV2 { + @available(*, deprecated, renamed: "RealtimeSubscription") + public typealias Subscription = ObservationToken + + @available(*, deprecated, renamed: "RealtimeChannelStatus") + public typealias Status = RealtimeChannelStatus +} diff --git a/Sources/Realtime/RealtimeChannel+AsyncAwait.swift b/Sources/Realtime/RealtimeChannel+AsyncAwait.swift index e5e85244e..02f3fc89a 100644 --- a/Sources/Realtime/RealtimeChannel+AsyncAwait.swift +++ b/Sources/Realtime/RealtimeChannel+AsyncAwait.swift @@ -59,18 +59,6 @@ extension RealtimeChannelV2 { .eraseToStream() } - /// Listen for postgres changes in a channel. - public func postgresChange( - _: SelectAction.Type, - schema: String = "public", - table: String? = nil, - filter: String? = nil - ) -> AsyncStream { - postgresChange(event: .select, schema: schema, table: table, filter: filter) - .compactMap { $0.wrappedAction as? SelectAction } - .eraseToStream() - } - /// Listen for postgres changes in a channel. public func postgresChange( _: AnyAction.Type, diff --git a/Sources/Realtime/V2/PostgresAction.swift b/Sources/Realtime/V2/PostgresAction.swift index 75f9758a6..320fea96f 100644 --- a/Sources/Realtime/V2/PostgresAction.swift +++ b/Sources/Realtime/V2/PostgresAction.swift @@ -56,29 +56,18 @@ public struct DeleteAction: PostgresAction, HasOldRecord, HasRawMessage { public let rawMessage: RealtimeMessageV2 } -public struct SelectAction: PostgresAction, HasRecord, HasRawMessage { - public static let eventType: PostgresChangeEvent = .select - - public let columns: [Column] - public let commitTimestamp: Date - public let record: [String: AnyJSON] - public let rawMessage: RealtimeMessageV2 -} - public enum AnyAction: PostgresAction, HasRawMessage { public static let eventType: PostgresChangeEvent = .all case insert(InsertAction) case update(UpdateAction) case delete(DeleteAction) - case select(SelectAction) var wrappedAction: any PostgresAction & HasRawMessage { switch self { case let .insert(action): action case let .update(action): action case let .delete(action): action - case let .select(action): action } } diff --git a/Sources/Realtime/V2/RealtimeChannelV2.swift b/Sources/Realtime/V2/RealtimeChannelV2.swift index c7d9a72e0..431a2ddf6 100644 --- a/Sources/Realtime/V2/RealtimeChannelV2.swift +++ b/Sources/Realtime/V2/RealtimeChannelV2.swift @@ -32,7 +32,7 @@ public struct RealtimeChannelConfig: Sendable { struct Socket: Sendable { var broadcastURL: @Sendable () -> URL - var status: @Sendable () -> RealtimeClientV2.Status + var status: @Sendable () -> RealtimeClientStatus var options: @Sendable () -> RealtimeClientOptions var accessToken: @Sendable () -> String? var apiKey: @Sendable () -> String? @@ -64,16 +64,6 @@ extension Socket { } public final class RealtimeChannelV2: Sendable { - @available(*, deprecated, renamed: "RealtimeSubscription") - public typealias Subscription = ObservationToken - - public enum Status: Sendable { - case unsubscribed - case subscribing - case subscribed - case unsubscribing - } - struct MutableState { var clientChanges: [PostgresJoinConfig] = [] var joinRef: String? @@ -88,14 +78,14 @@ public final class RealtimeChannelV2: Sendable { let socket: Socket let callbackManager = CallbackManager() - private let statusEventEmitter = EventEmitter(initialEvent: .unsubscribed) + private let statusEventEmitter = EventEmitter(initialEvent: .unsubscribed) - public private(set) var status: Status { + public private(set) var status: RealtimeChannelStatus { get { statusEventEmitter.lastEvent } set { statusEventEmitter.emit(newValue) } } - public var statusChange: AsyncStream { + public var statusChange: AsyncStream { statusEventEmitter.stream() } @@ -105,7 +95,7 @@ public final class RealtimeChannelV2: Sendable { /// /// - Note: Use ``statusChange`` if you prefer to use Async/Await. public func onStatusChange( - _ listener: @escaping @Sendable (Status) -> Void + _ listener: @escaping @Sendable (RealtimeChannelStatus) -> Void ) -> ObservationToken { statusEventEmitter.attach(listener) } @@ -137,10 +127,15 @@ public final class RealtimeChannelV2: Sendable { await socket.connect() } + guard status != .subscribed else { + logger?.warning("Channel \(topic) is already subscribed") + return + } + socket.addChannel(self) status = .subscribing - logger?.debug("subscribing to channel \(topic)") + logger?.debug("Subscribing to channel \(topic)") let joinConfig = RealtimeJoinConfig( broadcast: config.broadcast, @@ -157,7 +152,7 @@ public final class RealtimeChannelV2: Sendable { let joinRef = socket.makeRef().description mutableState.withValue { $0.joinRef = joinRef } - logger?.debug("subscribing to channel with body: \(joinConfig)") + logger?.debug("Subscribing to channel with body: \(joinConfig)") await push( RealtimeMessageV2( @@ -175,17 +170,17 @@ public final class RealtimeChannelV2: Sendable { } } catch { if error is TimeoutError { - logger?.debug("subscribe timed out.") + logger?.debug("Subscribe timed out.") await subscribe() } else { - logger?.error("subscribe failed: \(error)") + logger?.error("Subscribe failed: \(error)") } } } public func unsubscribe() async { status = .unsubscribing - logger?.debug("unsubscribing from channel \(topic)") + logger?.debug("Unsubscribing from channel \(topic)") await push( RealtimeMessageV2( @@ -324,7 +319,7 @@ public final class RealtimeChannelV2: Sendable { ) } - func onMessage(_ message: RealtimeMessageV2) { + func onMessage(_ message: RealtimeMessageV2) async { do { guard let eventType = message.eventType else { logger?.debug("Received message without event type: \(message)") @@ -349,7 +344,7 @@ public final class RealtimeChannelV2: Sendable { throw RealtimeError("Received a reply with unexpected payload: \(message)") } - didReceiveReply(ref: ref, status: status) + await didReceiveReply(ref: ref, status: status) if message.payload["response"]?.objectValue?.keys .contains(ChannelEvent.postgresChanges) == true @@ -409,16 +404,6 @@ public final class RealtimeChannelV2: Sendable { ) ) - case "SELECT": - action = .select( - SelectAction( - columns: postgresActions.columns, - commitTimestamp: postgresActions.commitTimestamp, - record: postgresActions.record ?? [:], - rawMessage: message - ) - ) - default: throw RealtimeError("Unknown event type: \(postgresActions.type)") } @@ -435,13 +420,9 @@ public final class RealtimeChannelV2: Sendable { callbackManager.triggerBroadcast(event: event, json: payload) case .close: - Task { [weak self] in - guard let self else { return } - - await socket.removeChannel(self) - logger?.debug("Unsubscribed from channel \(message.topic)") - status = .unsubscribed - } + await socket.removeChannel(self) + logger?.debug("Unsubscribed from channel \(message.topic)") + status = .unsubscribed case .error: logger?.debug( @@ -601,12 +582,10 @@ public final class RealtimeChannelV2: Sendable { return await push.send() } - private func didReceiveReply(ref: String, status: String) { - Task { - let push = mutableState.withValue { - $0.pushes.removeValue(forKey: ref) - } - await push?.didReceive(status: PushStatus(rawValue: status) ?? .ok) + private func didReceiveReply(ref: String, status: String) async { + let push = mutableState.withValue { + $0.pushes.removeValue(forKey: ref) } + await push?.didReceive(status: PushStatus(rawValue: status) ?? .ok) } } diff --git a/Sources/Realtime/V2/RealtimeClientV2.swift b/Sources/Realtime/V2/RealtimeClientV2.swift index 8f09e2a85..8c5d7a395 100644 --- a/Sources/Realtime/V2/RealtimeClientV2.swift +++ b/Sources/Realtime/V2/RealtimeClientV2.swift @@ -16,55 +16,6 @@ import Helpers public typealias JSONObject = Helpers.JSONObject public final class RealtimeClientV2: Sendable { - @available(*, deprecated, renamed: "RealtimeClientOptions") - public struct Configuration: Sendable { - var url: URL - var apiKey: String - var headers: [String: String] - var heartbeatInterval: TimeInterval - var reconnectDelay: TimeInterval - var timeoutInterval: TimeInterval - var disconnectOnSessionLoss: Bool - var connectOnSubscribe: Bool - var logger: (any SupabaseLogger)? - - public init( - url: URL, - apiKey: String, - headers: [String: String] = [:], - heartbeatInterval: TimeInterval = 15, - reconnectDelay: TimeInterval = 7, - timeoutInterval: TimeInterval = 10, - disconnectOnSessionLoss: Bool = true, - connectOnSubscribe: Bool = true, - logger: (any SupabaseLogger)? = nil - ) { - self.url = url - self.apiKey = apiKey - self.headers = headers - self.heartbeatInterval = heartbeatInterval - self.reconnectDelay = reconnectDelay - self.timeoutInterval = timeoutInterval - self.disconnectOnSessionLoss = disconnectOnSessionLoss - self.connectOnSubscribe = connectOnSubscribe - self.logger = logger - } - } - - public enum Status: Sendable, CustomStringConvertible { - case disconnected - case connecting - case connected - - public var description: String { - switch self { - case .disconnected: "Disconnected" - case .connecting: "Connecting" - case .connected: "Connected" - } - } - } - struct MutableState { var accessToken: String? var ref = 0 @@ -72,7 +23,8 @@ public final class RealtimeClientV2: Sendable { var heartbeatTask: Task? var messageTask: Task? var connectionTask: Task? - var subscriptions: [String: RealtimeChannelV2] = [:] + var channels: [String: RealtimeChannelV2] = [:] + var sendBuffer: [@Sendable () async -> Void] = [] } let url: URL @@ -82,21 +34,21 @@ public final class RealtimeClientV2: Sendable { let http: any HTTPClientType let apikey: String? - public var subscriptions: [String: RealtimeChannelV2] { - mutableState.subscriptions + public var channels: [String: RealtimeChannelV2] { + mutableState.channels } - private let statusEventEmitter = EventEmitter(initialEvent: .disconnected) + private let statusEventEmitter = EventEmitter(initialEvent: .disconnected) /// AsyncStream that emits when connection status change. /// /// You can also use ``onStatusChange(_:)`` for a closure based method. - public var statusChange: AsyncStream { + public var statusChange: AsyncStream { statusEventEmitter.stream() } /// The current connection status. - public private(set) var status: Status { + public private(set) var status: RealtimeClientStatus { get { statusEventEmitter.lastEvent } set { statusEventEmitter.emit(newValue) } } @@ -107,27 +59,11 @@ public final class RealtimeClientV2: Sendable { /// /// - Note: Use ``statusChange`` if you prefer to use Async/Await. public func onStatusChange( - _ listener: @escaping @Sendable (Status) -> Void + _ listener: @escaping @Sendable (RealtimeClientStatus) -> Void ) -> RealtimeSubscription { statusEventEmitter.attach(listener) } - @available(*, deprecated, renamed: "RealtimeClientV2.init(url:options:)") - public convenience init(config: Configuration) { - self.init( - url: config.url, - options: RealtimeClientOptions( - headers: config.headers, - heartbeatInterval: config.heartbeatInterval, - reconnectDelay: config.reconnectDelay, - timeoutInterval: config.timeoutInterval, - disconnectOnSessionLoss: config.disconnectOnSessionLoss, - connectOnSubscribe: config.connectOnSubscribe, - logger: config.logger - ) - ) - } - public convenience init(url: URL, options: RealtimeClientOptions) { var interceptors: [any HTTPClientInterceptor] = [] @@ -173,7 +109,7 @@ public final class RealtimeClientV2: Sendable { mutableState.withValue { $0.heartbeatTask?.cancel() $0.messageTask?.cancel() - $0.subscriptions = [:] + $0.channels = [:] } } @@ -237,6 +173,8 @@ public final class RealtimeClientV2: Sendable { if reconnect { await rejoinChannels() } + + await flushSendBuffer() } private func onDisconnected() async { @@ -281,7 +219,7 @@ public final class RealtimeClientV2: Sendable { public func addChannel(_ channel: RealtimeChannelV2) { mutableState.withValue { - $0.subscriptions[channel.topic] = channel + $0.channels[channel.topic] = channel } } @@ -291,24 +229,34 @@ public final class RealtimeClientV2: Sendable { } mutableState.withValue { - $0.subscriptions[channel.topic] = nil + $0.channels[channel.topic] = nil } - if subscriptions.isEmpty { + if channels.isEmpty { options.logger?.debug("No more subscribed channel in socket") disconnect() } } public func removeAllChannels() async { - for channel in subscriptions.values { - await removeChannel(channel) + await withTaskGroup(of: Void.self) { group in + for channel in channels.values { + group.addTask { await self.removeChannel(channel) } + } + + await group.waitForAll() } } private func rejoinChannels() async { - for channel in subscriptions.values { - await channel.subscribe() + await withTaskGroup(of: Void.self) { group in + for channel in channels.values { + group.addTask { + await channel.subscribe() + } + } + + await group.waitForAll() } } @@ -398,7 +346,7 @@ public final class RealtimeClientV2: Sendable { $0.accessToken = token } - for channel in subscriptions.values { + for channel in channels.values { if let token, channel.status == .subscribed { await channel.updateAuth(jwt: token) } @@ -406,8 +354,8 @@ public final class RealtimeClientV2: Sendable { } private func onMessage(_ message: RealtimeMessageV2) async { - mutableState.withValue { - let channel = $0.subscriptions[message.topic] + let channel = mutableState.withValue { + let channel = $0.channels[message.topic] if let ref = message.ref, Int(ref) == $0.pendingHeartbeatRef { $0.pendingHeartbeatRef = nil @@ -415,29 +363,55 @@ public final class RealtimeClientV2: Sendable { } else { options.logger? .debug("Received event \(message.event) for channel \(channel?.topic ?? "null")") - channel?.onMessage(message) } + return channel + } + + if let channel { + await channel.onMessage(message) + } else { + options.logger?.warning("No channel subscribed to \(message.topic). Ignoring message.") } } /// Push out a message if the socket is connected. - /// - Parameter message: The message to push through the socket. + /// + /// If the socket is not connected, the message gets enqueued within a local buffer, and sent out when a connection is next established. public func push(_ message: RealtimeMessageV2) async { - guard status == .connected else { - options.logger?.warning("Trying to push a message while socket is not connected. This is not supported yet.") - return + let callback = { @Sendable [weak self] in + do { + // Check cancellation before sending, because this push may have been cancelled before a connection was established. + try Task.checkCancellation() + try await self?.ws.send(message) + } catch { + self?.options.logger?.error(""" + Failed to send message: + \(message) + + Error: + \(error) + """) + } + } + + if status == .connected { + await callback() + } else { + mutableState.withValue { + $0.sendBuffer.append(callback) + } } + } - do { - try await ws.send(message) - } catch { - options.logger?.debug(""" - Failed to send message: - \(message) + private func flushSendBuffer() async { + let sendBuffer = mutableState.withValue { + let copy = $0.sendBuffer + $0.sendBuffer = [] + return copy + } - Error: - \(error) - """) + for send in sendBuffer { + await send() } } diff --git a/Sources/Realtime/V2/RealtimeJoinConfig.swift b/Sources/Realtime/V2/RealtimeJoinConfig.swift index 1b510494f..55f865304 100644 --- a/Sources/Realtime/V2/RealtimeJoinConfig.swift +++ b/Sources/Realtime/V2/RealtimeJoinConfig.swift @@ -52,7 +52,6 @@ public enum PostgresChangeEvent: String, Codable, Sendable { case insert = "INSERT" case update = "UPDATE" case delete = "DELETE" - case select = "SELECT" case all = "*" } diff --git a/Sources/Realtime/V2/Types.swift b/Sources/Realtime/V2/Types.swift index 7ca5a6a17..52e7f80b9 100644 --- a/Sources/Realtime/V2/Types.swift +++ b/Sources/Realtime/V2/Types.swift @@ -62,3 +62,24 @@ public struct RealtimeClientOptions: Sendable { } public typealias RealtimeSubscription = ObservationToken + +public enum RealtimeChannelStatus: Sendable { + case unsubscribed + case subscribing + case subscribed + case unsubscribing +} + +public enum RealtimeClientStatus: Sendable, CustomStringConvertible { + case disconnected + case connecting + case connected + + public var description: String { + switch self { + case .disconnected: "Disconnected" + case .connecting: "Connecting" + case .connected: "Connected" + } + } +} diff --git a/Tests/RealtimeTests/RealtimeChannelTests.swift b/Tests/RealtimeTests/RealtimeChannelTests.swift index baad47460..b260f40db 100644 --- a/Tests/RealtimeTests/RealtimeChannelTests.swift +++ b/Tests/RealtimeTests/RealtimeChannelTests.swift @@ -11,38 +11,58 @@ import XCTest import XCTestDynamicOverlay final class RealtimeChannelTests: XCTestCase { - var sut: RealtimeChannelV2! + let sut = RealtimeChannelV2( + topic: "topic", + config: RealtimeChannelConfig( + broadcast: BroadcastJoinConfig(), + presence: PresenceJoinConfig(), + isPrivate: false + ), + socket: .mock, + logger: nil + ) - func testOnPostgresChange() { - sut = RealtimeChannelV2( - topic: "topic", - config: RealtimeChannelConfig( - broadcast: BroadcastJoinConfig(), - presence: PresenceJoinConfig(), - isPrivate: false - ), - socket: .mock, - logger: nil - ) - var subscriptions = Set() - sut.onPostgresChange(AnyAction.self) { _ in }.store(in: &subscriptions) - sut.onPostgresChange(InsertAction.self) { _ in }.store(in: &subscriptions) - sut.onPostgresChange(UpdateAction.self) { _ in }.store(in: &subscriptions) - sut.onPostgresChange(DeleteAction.self) { _ in }.store(in: &subscriptions) + func testAttachCallbacks() { + var subscriptions = Set() + + sut.onPostgresChange( + AnyAction.self, + schema: "public", + table: "users", + filter: "id=eq.1" + ) { _ in }.store(in: &subscriptions) + sut.onPostgresChange( + InsertAction.self, + schema: "private" + ) { _ in }.store(in: &subscriptions) + sut.onPostgresChange( + UpdateAction.self, + table: "messages" + ) { _ in }.store(in: &subscriptions) + sut.onPostgresChange( + DeleteAction.self + ) { _ in }.store(in: &subscriptions) + + sut.onBroadcast(event: "test") { _ in }.store(in: &subscriptions) + sut.onBroadcast(event: "cursor-pos") { _ in }.store(in: &subscriptions) + + sut.onPresenceChange { _ in }.store(in: &subscriptions) assertInlineSnapshot(of: sut.callbackManager.callbacks, as: .dump) { """ - ▿ 4 elements + ▿ 7 elements ▿ RealtimeCallback ▿ postgres: PostgresCallback - callback: (Function) ▿ filter: PostgresJoinConfig ▿ event: Optional - some: PostgresChangeEvent.all - - filter: Optional.none + ▿ filter: Optional + - some: "id=eq.1" - id: 0 - schema: "public" - - table: Optional.none + ▿ table: Optional + - some: "users" - id: 1 ▿ RealtimeCallback ▿ postgres: PostgresCallback @@ -52,7 +72,7 @@ final class RealtimeChannelTests: XCTestCase { - some: PostgresChangeEvent.insert - filter: Optional.none - id: 0 - - schema: "public" + - schema: "private" - table: Optional.none - id: 2 ▿ RealtimeCallback @@ -64,7 +84,8 @@ final class RealtimeChannelTests: XCTestCase { - filter: Optional.none - id: 0 - schema: "public" - - table: Optional.none + ▿ table: Optional + - some: "messages" - id: 3 ▿ RealtimeCallback ▿ postgres: PostgresCallback @@ -77,6 +98,20 @@ final class RealtimeChannelTests: XCTestCase { - schema: "public" - table: Optional.none - id: 4 + ▿ RealtimeCallback + ▿ broadcast: BroadcastCallback + - callback: (Function) + - event: "test" + - id: 5 + ▿ RealtimeCallback + ▿ broadcast: BroadcastCallback + - callback: (Function) + - event: "cursor-pos" + - id: 6 + ▿ RealtimeCallback + ▿ presence: PresenceCallback + - callback: (Function) + - id: 7 """ } diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index 405fdb37b..64c0b89ff 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -63,7 +63,7 @@ final class RealtimeTests: XCTestCase { } .store(in: &subscriptions) - let socketStatuses = LockIsolated([RealtimeClientV2.Status]()) + let socketStatuses = LockIsolated([RealtimeClientStatus]()) sut.onStatusChange { status in socketStatuses.withValue { $0.append(status) } @@ -80,7 +80,7 @@ final class RealtimeTests: XCTestCase { let heartbeatTask = sut.mutableState.heartbeatTask XCTAssertNotNil(heartbeatTask) - let channelStatuses = LockIsolated([RealtimeChannelV2.Status]()) + let channelStatuses = LockIsolated([RealtimeChannelStatus]()) channel.onStatusChange { status in channelStatuses.withValue { $0.append(status) @@ -205,7 +205,7 @@ final class RealtimeTests: XCTestCase { return nil } - let statuses = LockIsolated<[RealtimeClientV2.Status]>([]) + let statuses = LockIsolated<[RealtimeClientStatus]>([]) Task { for await status in sut.statusChange {