Skip to content

Commit bfb6620

Browse files
authored
refactor(realtime): general improvements (#552)
* refactor(realtime): general improvements for realtime * remove select action * add more tests
1 parent 494e8f4 commit bfb6620

File tree

9 files changed

+246
-191
lines changed

9 files changed

+246
-191
lines changed

Sources/Realtime/Deprecated.swift

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,76 @@
66
//
77

88
import Foundation
9+
import Helpers
910

1011
@available(*, deprecated, renamed: "RealtimeMessage")
1112
public typealias Message = RealtimeMessage
13+
14+
extension RealtimeClientV2 {
15+
@available(*, deprecated, renamed: "channels")
16+
public var subscriptions: [String: RealtimeChannelV2] {
17+
channels
18+
}
19+
20+
@available(*, deprecated, renamed: "RealtimeClientOptions")
21+
public struct Configuration: Sendable {
22+
var url: URL
23+
var apiKey: String
24+
var headers: [String: String]
25+
var heartbeatInterval: TimeInterval
26+
var reconnectDelay: TimeInterval
27+
var timeoutInterval: TimeInterval
28+
var disconnectOnSessionLoss: Bool
29+
var connectOnSubscribe: Bool
30+
var logger: (any SupabaseLogger)?
31+
32+
public init(
33+
url: URL,
34+
apiKey: String,
35+
headers: [String: String] = [:],
36+
heartbeatInterval: TimeInterval = 15,
37+
reconnectDelay: TimeInterval = 7,
38+
timeoutInterval: TimeInterval = 10,
39+
disconnectOnSessionLoss: Bool = true,
40+
connectOnSubscribe: Bool = true,
41+
logger: (any SupabaseLogger)? = nil
42+
) {
43+
self.url = url
44+
self.apiKey = apiKey
45+
self.headers = headers
46+
self.heartbeatInterval = heartbeatInterval
47+
self.reconnectDelay = reconnectDelay
48+
self.timeoutInterval = timeoutInterval
49+
self.disconnectOnSessionLoss = disconnectOnSessionLoss
50+
self.connectOnSubscribe = connectOnSubscribe
51+
self.logger = logger
52+
}
53+
}
54+
55+
@available(*, deprecated, renamed: "RealtimeClientStatus")
56+
public typealias Status = RealtimeClientStatus
57+
58+
@available(*, deprecated, renamed: "RealtimeClientV2.init(url:options:)")
59+
public convenience init(config: Configuration) {
60+
self.init(
61+
url: config.url,
62+
options: RealtimeClientOptions(
63+
headers: config.headers,
64+
heartbeatInterval: config.heartbeatInterval,
65+
reconnectDelay: config.reconnectDelay,
66+
timeoutInterval: config.timeoutInterval,
67+
disconnectOnSessionLoss: config.disconnectOnSessionLoss,
68+
connectOnSubscribe: config.connectOnSubscribe,
69+
logger: config.logger
70+
)
71+
)
72+
}
73+
}
74+
75+
extension RealtimeChannelV2 {
76+
@available(*, deprecated, renamed: "RealtimeSubscription")
77+
public typealias Subscription = ObservationToken
78+
79+
@available(*, deprecated, renamed: "RealtimeChannelStatus")
80+
public typealias Status = RealtimeChannelStatus
81+
}

Sources/Realtime/RealtimeChannel+AsyncAwait.swift

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,6 @@ extension RealtimeChannelV2 {
5959
.eraseToStream()
6060
}
6161

62-
/// Listen for postgres changes in a channel.
63-
public func postgresChange(
64-
_: SelectAction.Type,
65-
schema: String = "public",
66-
table: String? = nil,
67-
filter: String? = nil
68-
) -> AsyncStream<SelectAction> {
69-
postgresChange(event: .select, schema: schema, table: table, filter: filter)
70-
.compactMap { $0.wrappedAction as? SelectAction }
71-
.eraseToStream()
72-
}
73-
7462
/// Listen for postgres changes in a channel.
7563
public func postgresChange(
7664
_: AnyAction.Type,

Sources/Realtime/V2/PostgresAction.swift

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,29 +56,18 @@ public struct DeleteAction: PostgresAction, HasOldRecord, HasRawMessage {
5656
public let rawMessage: RealtimeMessageV2
5757
}
5858

59-
public struct SelectAction: PostgresAction, HasRecord, HasRawMessage {
60-
public static let eventType: PostgresChangeEvent = .select
61-
62-
public let columns: [Column]
63-
public let commitTimestamp: Date
64-
public let record: [String: AnyJSON]
65-
public let rawMessage: RealtimeMessageV2
66-
}
67-
6859
public enum AnyAction: PostgresAction, HasRawMessage {
6960
public static let eventType: PostgresChangeEvent = .all
7061

7162
case insert(InsertAction)
7263
case update(UpdateAction)
7364
case delete(DeleteAction)
74-
case select(SelectAction)
7565

7666
var wrappedAction: any PostgresAction & HasRawMessage {
7767
switch self {
7868
case let .insert(action): action
7969
case let .update(action): action
8070
case let .delete(action): action
81-
case let .select(action): action
8271
}
8372
}
8473

Sources/Realtime/V2/RealtimeChannelV2.swift

Lines changed: 24 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public struct RealtimeChannelConfig: Sendable {
3232

3333
struct Socket: Sendable {
3434
var broadcastURL: @Sendable () -> URL
35-
var status: @Sendable () -> RealtimeClientV2.Status
35+
var status: @Sendable () -> RealtimeClientStatus
3636
var options: @Sendable () -> RealtimeClientOptions
3737
var accessToken: @Sendable () -> String?
3838
var apiKey: @Sendable () -> String?
@@ -64,16 +64,6 @@ extension Socket {
6464
}
6565

6666
public final class RealtimeChannelV2: Sendable {
67-
@available(*, deprecated, renamed: "RealtimeSubscription")
68-
public typealias Subscription = ObservationToken
69-
70-
public enum Status: Sendable {
71-
case unsubscribed
72-
case subscribing
73-
case subscribed
74-
case unsubscribing
75-
}
76-
7767
struct MutableState {
7868
var clientChanges: [PostgresJoinConfig] = []
7969
var joinRef: String?
@@ -88,14 +78,14 @@ public final class RealtimeChannelV2: Sendable {
8878
let socket: Socket
8979

9080
let callbackManager = CallbackManager()
91-
private let statusEventEmitter = EventEmitter<Status>(initialEvent: .unsubscribed)
81+
private let statusEventEmitter = EventEmitter<RealtimeChannelStatus>(initialEvent: .unsubscribed)
9282

93-
public private(set) var status: Status {
83+
public private(set) var status: RealtimeChannelStatus {
9484
get { statusEventEmitter.lastEvent }
9585
set { statusEventEmitter.emit(newValue) }
9686
}
9787

98-
public var statusChange: AsyncStream<Status> {
88+
public var statusChange: AsyncStream<RealtimeChannelStatus> {
9989
statusEventEmitter.stream()
10090
}
10191

@@ -105,7 +95,7 @@ public final class RealtimeChannelV2: Sendable {
10595
///
10696
/// - Note: Use ``statusChange`` if you prefer to use Async/Await.
10797
public func onStatusChange(
108-
_ listener: @escaping @Sendable (Status) -> Void
98+
_ listener: @escaping @Sendable (RealtimeChannelStatus) -> Void
10999
) -> ObservationToken {
110100
statusEventEmitter.attach(listener)
111101
}
@@ -137,10 +127,15 @@ public final class RealtimeChannelV2: Sendable {
137127
await socket.connect()
138128
}
139129

130+
guard status != .subscribed else {
131+
logger?.warning("Channel \(topic) is already subscribed")
132+
return
133+
}
134+
140135
socket.addChannel(self)
141136

142137
status = .subscribing
143-
logger?.debug("subscribing to channel \(topic)")
138+
logger?.debug("Subscribing to channel \(topic)")
144139

145140
let joinConfig = RealtimeJoinConfig(
146141
broadcast: config.broadcast,
@@ -157,7 +152,7 @@ public final class RealtimeChannelV2: Sendable {
157152
let joinRef = socket.makeRef().description
158153
mutableState.withValue { $0.joinRef = joinRef }
159154

160-
logger?.debug("subscribing to channel with body: \(joinConfig)")
155+
logger?.debug("Subscribing to channel with body: \(joinConfig)")
161156

162157
await push(
163158
RealtimeMessageV2(
@@ -175,17 +170,17 @@ public final class RealtimeChannelV2: Sendable {
175170
}
176171
} catch {
177172
if error is TimeoutError {
178-
logger?.debug("subscribe timed out.")
173+
logger?.debug("Subscribe timed out.")
179174
await subscribe()
180175
} else {
181-
logger?.error("subscribe failed: \(error)")
176+
logger?.error("Subscribe failed: \(error)")
182177
}
183178
}
184179
}
185180

186181
public func unsubscribe() async {
187182
status = .unsubscribing
188-
logger?.debug("unsubscribing from channel \(topic)")
183+
logger?.debug("Unsubscribing from channel \(topic)")
189184

190185
await push(
191186
RealtimeMessageV2(
@@ -324,7 +319,7 @@ public final class RealtimeChannelV2: Sendable {
324319
)
325320
}
326321

327-
func onMessage(_ message: RealtimeMessageV2) {
322+
func onMessage(_ message: RealtimeMessageV2) async {
328323
do {
329324
guard let eventType = message.eventType else {
330325
logger?.debug("Received message without event type: \(message)")
@@ -349,7 +344,7 @@ public final class RealtimeChannelV2: Sendable {
349344
throw RealtimeError("Received a reply with unexpected payload: \(message)")
350345
}
351346

352-
didReceiveReply(ref: ref, status: status)
347+
await didReceiveReply(ref: ref, status: status)
353348

354349
if message.payload["response"]?.objectValue?.keys
355350
.contains(ChannelEvent.postgresChanges) == true
@@ -409,16 +404,6 @@ public final class RealtimeChannelV2: Sendable {
409404
)
410405
)
411406

412-
case "SELECT":
413-
action = .select(
414-
SelectAction(
415-
columns: postgresActions.columns,
416-
commitTimestamp: postgresActions.commitTimestamp,
417-
record: postgresActions.record ?? [:],
418-
rawMessage: message
419-
)
420-
)
421-
422407
default:
423408
throw RealtimeError("Unknown event type: \(postgresActions.type)")
424409
}
@@ -435,13 +420,9 @@ public final class RealtimeChannelV2: Sendable {
435420
callbackManager.triggerBroadcast(event: event, json: payload)
436421

437422
case .close:
438-
Task { [weak self] in
439-
guard let self else { return }
440-
441-
await socket.removeChannel(self)
442-
logger?.debug("Unsubscribed from channel \(message.topic)")
443-
status = .unsubscribed
444-
}
423+
await socket.removeChannel(self)
424+
logger?.debug("Unsubscribed from channel \(message.topic)")
425+
status = .unsubscribed
445426

446427
case .error:
447428
logger?.debug(
@@ -601,12 +582,10 @@ public final class RealtimeChannelV2: Sendable {
601582
return await push.send()
602583
}
603584

604-
private func didReceiveReply(ref: String, status: String) {
605-
Task {
606-
let push = mutableState.withValue {
607-
$0.pushes.removeValue(forKey: ref)
608-
}
609-
await push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
585+
private func didReceiveReply(ref: String, status: String) async {
586+
let push = mutableState.withValue {
587+
$0.pushes.removeValue(forKey: ref)
610588
}
589+
await push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
611590
}
612591
}

0 commit comments

Comments
 (0)