@@ -32,7 +32,7 @@ public struct RealtimeChannelConfig: Sendable {
3232
3333struct Socket : Sendable {
3434 var broadcastURL : @Sendable ( ) -> URL
35- var status : @Sendable ( ) -> RealtimeClientV2 . Status
35+ var status : @Sendable ( ) -> RealtimeClientStatus
3636 var options : @Sendable ( ) -> RealtimeClientOptions
3737 var accessToken : @Sendable ( ) -> String ?
3838 var apiKey : @Sendable ( ) -> String ?
@@ -64,16 +64,6 @@ extension Socket {
6464}
6565
6666public final class RealtimeChannelV2 : Sendable {
67- @available ( * , deprecated, renamed: " RealtimeSubscription " )
68- public typealias Subscription = ObservationToken
69-
70- public enum Status : Sendable {
71- case unsubscribed
72- case subscribing
73- case subscribed
74- case unsubscribing
75- }
76-
7767 struct MutableState {
7868 var clientChanges : [ PostgresJoinConfig ] = [ ]
7969 var joinRef : String ?
@@ -88,14 +78,14 @@ public final class RealtimeChannelV2: Sendable {
8878 let socket : Socket
8979
9080 let callbackManager = CallbackManager ( )
91- private let statusEventEmitter = EventEmitter < Status > ( initialEvent: . unsubscribed)
81+ private let statusEventEmitter = EventEmitter < RealtimeChannelStatus > ( initialEvent: . unsubscribed)
9282
93- public private( set) var status : Status {
83+ public private( set) var status : RealtimeChannelStatus {
9484 get { statusEventEmitter. lastEvent }
9585 set { statusEventEmitter. emit ( newValue) }
9686 }
9787
98- public var statusChange : AsyncStream < Status > {
88+ public var statusChange : AsyncStream < RealtimeChannelStatus > {
9989 statusEventEmitter. stream ( )
10090 }
10191
@@ -105,7 +95,7 @@ public final class RealtimeChannelV2: Sendable {
10595 ///
10696 /// - Note: Use ``statusChange`` if you prefer to use Async/Await.
10797 public func onStatusChange(
108- _ listener: @escaping @Sendable ( Status ) -> Void
98+ _ listener: @escaping @Sendable ( RealtimeChannelStatus ) -> Void
10999 ) -> ObservationToken {
110100 statusEventEmitter. attach ( listener)
111101 }
@@ -137,10 +127,15 @@ public final class RealtimeChannelV2: Sendable {
137127 await socket. connect ( )
138128 }
139129
130+ guard status != . subscribed else {
131+ logger? . warning ( " Channel \( topic) is already subscribed " )
132+ return
133+ }
134+
140135 socket. addChannel ( self )
141136
142137 status = . subscribing
143- logger? . debug ( " subscribing to channel \( topic) " )
138+ logger? . debug ( " Subscribing to channel \( topic) " )
144139
145140 let joinConfig = RealtimeJoinConfig (
146141 broadcast: config. broadcast,
@@ -157,7 +152,7 @@ public final class RealtimeChannelV2: Sendable {
157152 let joinRef = socket. makeRef ( ) . description
158153 mutableState. withValue { $0. joinRef = joinRef }
159154
160- logger? . debug ( " subscribing to channel with body: \( joinConfig) " )
155+ logger? . debug ( " Subscribing to channel with body: \( joinConfig) " )
161156
162157 await push (
163158 RealtimeMessageV2 (
@@ -175,17 +170,17 @@ public final class RealtimeChannelV2: Sendable {
175170 }
176171 } catch {
177172 if error is TimeoutError {
178- logger? . debug ( " subscribe timed out." )
173+ logger? . debug ( " Subscribe timed out." )
179174 await subscribe ( )
180175 } else {
181- logger? . error ( " subscribe failed: \( error) " )
176+ logger? . error ( " Subscribe failed: \( error) " )
182177 }
183178 }
184179 }
185180
186181 public func unsubscribe( ) async {
187182 status = . unsubscribing
188- logger? . debug ( " unsubscribing from channel \( topic) " )
183+ logger? . debug ( " Unsubscribing from channel \( topic) " )
189184
190185 await push (
191186 RealtimeMessageV2 (
@@ -324,7 +319,7 @@ public final class RealtimeChannelV2: Sendable {
324319 )
325320 }
326321
327- func onMessage( _ message: RealtimeMessageV2 ) {
322+ func onMessage( _ message: RealtimeMessageV2 ) async {
328323 do {
329324 guard let eventType = message. eventType else {
330325 logger? . debug ( " Received message without event type: \( message) " )
@@ -349,7 +344,7 @@ public final class RealtimeChannelV2: Sendable {
349344 throw RealtimeError ( " Received a reply with unexpected payload: \( message) " )
350345 }
351346
352- didReceiveReply ( ref: ref, status: status)
347+ await didReceiveReply ( ref: ref, status: status)
353348
354349 if message. payload [ " response " ] ? . objectValue? . keys
355350 . contains ( ChannelEvent . postgresChanges) == true
@@ -435,13 +430,9 @@ public final class RealtimeChannelV2: Sendable {
435430 callbackManager. triggerBroadcast ( event: event, json: payload)
436431
437432 case . close:
438- Task { [ weak self] in
439- guard let self else { return }
440-
441- await socket. removeChannel ( self )
442- logger? . debug ( " Unsubscribed from channel \( message. topic) " )
443- status = . unsubscribed
444- }
433+ await socket. removeChannel ( self )
434+ logger? . debug ( " Unsubscribed from channel \( message. topic) " )
435+ status = . unsubscribed
445436
446437 case . error:
447438 logger? . debug (
@@ -601,12 +592,10 @@ public final class RealtimeChannelV2: Sendable {
601592 return await push. send ( )
602593 }
603594
604- private func didReceiveReply( ref: String , status: String ) {
605- Task {
606- let push = mutableState. withValue {
607- $0. pushes. removeValue ( forKey: ref)
608- }
609- await push? . didReceive ( status: PushStatus ( rawValue: status) ?? . ok)
595+ private func didReceiveReply( ref: String , status: String ) async {
596+ let push = mutableState. withValue {
597+ $0. pushes. removeValue ( forKey: ref)
610598 }
599+ await push? . didReceive ( status: PushStatus ( rawValue: status) ?? . ok)
611600 }
612601}
0 commit comments