Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions Sources/Realtime/Deprecated.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,76 @@
//

import Foundation
import Helpers

@available(*, deprecated, renamed: "RealtimeMessage")
public typealias Message = RealtimeMessage

extension RealtimeClientV2 {
@available(*, deprecated, renamed: "channels")
public var subscriptions: [String: RealtimeChannelV2] {
channels
}

@available(*, deprecated, renamed: "RealtimeClientOptions")
public struct Configuration: Sendable {
var url: URL
var apiKey: String
var headers: [String: String]
var heartbeatInterval: TimeInterval
var reconnectDelay: TimeInterval
var timeoutInterval: TimeInterval
var disconnectOnSessionLoss: Bool
var connectOnSubscribe: Bool
var logger: (any SupabaseLogger)?

public init(
url: URL,
apiKey: String,
headers: [String: String] = [:],
heartbeatInterval: TimeInterval = 15,
reconnectDelay: TimeInterval = 7,
timeoutInterval: TimeInterval = 10,
disconnectOnSessionLoss: Bool = true,
connectOnSubscribe: Bool = true,
logger: (any SupabaseLogger)? = nil
) {
self.url = url
self.apiKey = apiKey
self.headers = headers
self.heartbeatInterval = heartbeatInterval
self.reconnectDelay = reconnectDelay
self.timeoutInterval = timeoutInterval
self.disconnectOnSessionLoss = disconnectOnSessionLoss
self.connectOnSubscribe = connectOnSubscribe
self.logger = logger
}
}

@available(*, deprecated, renamed: "RealtimeClientStatus")
public typealias Status = RealtimeClientStatus

@available(*, deprecated, renamed: "RealtimeClientV2.init(url:options:)")
public convenience init(config: Configuration) {
self.init(
url: config.url,
options: RealtimeClientOptions(
headers: config.headers,
heartbeatInterval: config.heartbeatInterval,
reconnectDelay: config.reconnectDelay,
timeoutInterval: config.timeoutInterval,
disconnectOnSessionLoss: config.disconnectOnSessionLoss,
connectOnSubscribe: config.connectOnSubscribe,
logger: config.logger
)
)
}
}

extension RealtimeChannelV2 {
@available(*, deprecated, renamed: "RealtimeSubscription")
public typealias Subscription = ObservationToken

@available(*, deprecated, renamed: "RealtimeChannelStatus")
public typealias Status = RealtimeChannelStatus
}
12 changes: 0 additions & 12 deletions Sources/Realtime/RealtimeChannel+AsyncAwait.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,6 @@ extension RealtimeChannelV2 {
.eraseToStream()
}

/// Listen for postgres changes in a channel.
public func postgresChange(
_: SelectAction.Type,
schema: String = "public",
table: String? = nil,
filter: String? = nil
) -> AsyncStream<SelectAction> {
postgresChange(event: .select, schema: schema, table: table, filter: filter)
.compactMap { $0.wrappedAction as? SelectAction }
.eraseToStream()
}

/// Listen for postgres changes in a channel.
public func postgresChange(
_: AnyAction.Type,
Expand Down
11 changes: 0 additions & 11 deletions Sources/Realtime/V2/PostgresAction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,18 @@ public struct DeleteAction: PostgresAction, HasOldRecord, HasRawMessage {
public let rawMessage: RealtimeMessageV2
}

public struct SelectAction: PostgresAction, HasRecord, HasRawMessage {
public static let eventType: PostgresChangeEvent = .select

public let columns: [Column]
public let commitTimestamp: Date
public let record: [String: AnyJSON]
public let rawMessage: RealtimeMessageV2
}

public enum AnyAction: PostgresAction, HasRawMessage {
public static let eventType: PostgresChangeEvent = .all

case insert(InsertAction)
case update(UpdateAction)
case delete(DeleteAction)
case select(SelectAction)

var wrappedAction: any PostgresAction & HasRawMessage {
switch self {
case let .insert(action): action
case let .update(action): action
case let .delete(action): action
case let .select(action): action
}
}

Expand Down
69 changes: 24 additions & 45 deletions Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public struct RealtimeChannelConfig: Sendable {

struct Socket: Sendable {
var broadcastURL: @Sendable () -> URL
var status: @Sendable () -> RealtimeClientV2.Status
var status: @Sendable () -> RealtimeClientStatus
var options: @Sendable () -> RealtimeClientOptions
var accessToken: @Sendable () -> String?
var apiKey: @Sendable () -> String?
Expand Down Expand Up @@ -64,16 +64,6 @@ extension Socket {
}

public final class RealtimeChannelV2: Sendable {
@available(*, deprecated, renamed: "RealtimeSubscription")
public typealias Subscription = ObservationToken

public enum Status: Sendable {
case unsubscribed
case subscribing
case subscribed
case unsubscribing
}

struct MutableState {
var clientChanges: [PostgresJoinConfig] = []
var joinRef: String?
Expand All @@ -88,14 +78,14 @@ public final class RealtimeChannelV2: Sendable {
let socket: Socket

let callbackManager = CallbackManager()
private let statusEventEmitter = EventEmitter<Status>(initialEvent: .unsubscribed)
private let statusEventEmitter = EventEmitter<RealtimeChannelStatus>(initialEvent: .unsubscribed)

public private(set) var status: Status {
public private(set) var status: RealtimeChannelStatus {
get { statusEventEmitter.lastEvent }
set { statusEventEmitter.emit(newValue) }
}

public var statusChange: AsyncStream<Status> {
public var statusChange: AsyncStream<RealtimeChannelStatus> {
statusEventEmitter.stream()
}

Expand All @@ -105,7 +95,7 @@ public final class RealtimeChannelV2: Sendable {
///
/// - Note: Use ``statusChange`` if you prefer to use Async/Await.
public func onStatusChange(
_ listener: @escaping @Sendable (Status) -> Void
_ listener: @escaping @Sendable (RealtimeChannelStatus) -> Void
) -> ObservationToken {
statusEventEmitter.attach(listener)
}
Expand Down Expand Up @@ -137,10 +127,15 @@ public final class RealtimeChannelV2: Sendable {
await socket.connect()
}

guard status != .subscribed else {
logger?.warning("Channel \(topic) is already subscribed")
return
}

socket.addChannel(self)

status = .subscribing
logger?.debug("subscribing to channel \(topic)")
logger?.debug("Subscribing to channel \(topic)")

let joinConfig = RealtimeJoinConfig(
broadcast: config.broadcast,
Expand All @@ -157,7 +152,7 @@ public final class RealtimeChannelV2: Sendable {
let joinRef = socket.makeRef().description
mutableState.withValue { $0.joinRef = joinRef }

logger?.debug("subscribing to channel with body: \(joinConfig)")
logger?.debug("Subscribing to channel with body: \(joinConfig)")

await push(
RealtimeMessageV2(
Expand All @@ -175,17 +170,17 @@ public final class RealtimeChannelV2: Sendable {
}
} catch {
if error is TimeoutError {
logger?.debug("subscribe timed out.")
logger?.debug("Subscribe timed out.")
await subscribe()
} else {
logger?.error("subscribe failed: \(error)")
logger?.error("Subscribe failed: \(error)")
}
}
}

public func unsubscribe() async {
status = .unsubscribing
logger?.debug("unsubscribing from channel \(topic)")
logger?.debug("Unsubscribing from channel \(topic)")

await push(
RealtimeMessageV2(
Expand Down Expand Up @@ -324,7 +319,7 @@ public final class RealtimeChannelV2: Sendable {
)
}

func onMessage(_ message: RealtimeMessageV2) {
func onMessage(_ message: RealtimeMessageV2) async {
do {
guard let eventType = message.eventType else {
logger?.debug("Received message without event type: \(message)")
Expand All @@ -349,7 +344,7 @@ public final class RealtimeChannelV2: Sendable {
throw RealtimeError("Received a reply with unexpected payload: \(message)")
}

didReceiveReply(ref: ref, status: status)
await didReceiveReply(ref: ref, status: status)

if message.payload["response"]?.objectValue?.keys
.contains(ChannelEvent.postgresChanges) == true
Expand Down Expand Up @@ -409,16 +404,6 @@ public final class RealtimeChannelV2: Sendable {
)
)

case "SELECT":
action = .select(
SelectAction(
columns: postgresActions.columns,
commitTimestamp: postgresActions.commitTimestamp,
record: postgresActions.record ?? [:],
rawMessage: message
)
)

default:
throw RealtimeError("Unknown event type: \(postgresActions.type)")
}
Expand All @@ -435,13 +420,9 @@ public final class RealtimeChannelV2: Sendable {
callbackManager.triggerBroadcast(event: event, json: payload)

case .close:
Task { [weak self] in
guard let self else { return }

await socket.removeChannel(self)
logger?.debug("Unsubscribed from channel \(message.topic)")
status = .unsubscribed
}
await socket.removeChannel(self)
logger?.debug("Unsubscribed from channel \(message.topic)")
status = .unsubscribed

case .error:
logger?.debug(
Expand Down Expand Up @@ -601,12 +582,10 @@ public final class RealtimeChannelV2: Sendable {
return await push.send()
}

private func didReceiveReply(ref: String, status: String) {
Task {
let push = mutableState.withValue {
$0.pushes.removeValue(forKey: ref)
}
await push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
private func didReceiveReply(ref: String, status: String) async {
let push = mutableState.withValue {
$0.pushes.removeValue(forKey: ref)
}
await push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
}
}
Loading
Loading