@@ -14,7 +14,34 @@ public struct RealtimeChannelConfig: Sendable {
14
14
public var presence : PresenceJoinConfig
15
15
}
16
16
17
- public actor RealtimeChannelV2 {
17
+ struct Socket : Sendable {
18
+ var status : @Sendable ( ) -> RealtimeClientV2 . Status
19
+ var options : @Sendable ( ) -> RealtimeClientOptions
20
+ var accessToken : @Sendable ( ) -> String ?
21
+ var makeRef : @Sendable ( ) -> Int
22
+
23
+ var connect : @Sendable ( ) async -> Void
24
+ var addChannel : @Sendable ( _ channel: RealtimeChannelV2 ) -> Void
25
+ var removeChannel : @Sendable ( _ channel: RealtimeChannelV2 ) async -> Void
26
+ var push : @Sendable ( _ message: RealtimeMessageV2 ) async -> Void
27
+ }
28
+
29
+ extension Socket {
30
+ init ( client: RealtimeClientV2 ) {
31
+ self . init (
32
+ status: { [ weak client] in client? . status ?? . disconnected } ,
33
+ options: { [ weak client] in client? . options ?? . init( ) } ,
34
+ accessToken: { [ weak client] in client? . mutableState. accessToken } ,
35
+ makeRef: { [ weak client] in client? . makeRef ( ) ?? 0 } ,
36
+ connect: { [ weak client] in await client? . connect ( ) } ,
37
+ addChannel: { [ weak client] in client? . addChannel ( $0) } ,
38
+ removeChannel: { [ weak client] in await client? . removeChannel ( $0) } ,
39
+ push: { [ weak client] in await client? . push ( $0) }
40
+ )
41
+ }
42
+ }
43
+
44
+ public final class RealtimeChannelV2 : Sendable {
18
45
public typealias Subscription = ObservationToken
19
46
20
47
public enum Status : Sendable {
@@ -24,24 +51,22 @@ public actor RealtimeChannelV2 {
24
51
case unsubscribing
25
52
}
26
53
27
- weak var socket : RealtimeClientV2 ? {
28
- didSet {
29
- assert ( oldValue == nil , " socket should not be modified once set " )
30
- }
54
+ struct MutableState {
55
+ var clientChanges : [ PostgresJoinConfig ] = [ ]
56
+ var joinRef : String ?
57
+ var pushes : [ String : PushV2 ] = [ : ]
31
58
}
32
59
60
+ private let mutableState = LockIsolated ( MutableState ( ) )
61
+
33
62
let topic : String
34
63
let config : RealtimeChannelConfig
35
64
let logger : ( any SupabaseLogger ) ?
65
+ let socket : Socket
36
66
37
67
private let callbackManager = CallbackManager ( )
38
-
39
68
private let statusEventEmitter = EventEmitter < Status > ( initialEvent: . unsubscribed)
40
69
41
- private var clientChanges : [ PostgresJoinConfig ] = [ ]
42
- private var joinRef : String ?
43
- private var pushes : [ String : PushV2 ] = [ : ]
44
-
45
70
public private( set) var status : Status {
46
71
get { statusEventEmitter. lastEvent }
47
72
set { statusEventEmitter. emit ( newValue) }
@@ -54,13 +79,13 @@ public actor RealtimeChannelV2 {
54
79
init (
55
80
topic: String ,
56
81
config: RealtimeChannelConfig ,
57
- socket: RealtimeClientV2 ,
82
+ socket: Socket ,
58
83
logger: ( any SupabaseLogger ) ?
59
84
) {
60
- self . socket = socket
61
85
self . topic = topic
62
86
self . config = config
63
87
self . logger = logger
88
+ self . socket = socket
64
89
}
65
90
66
91
deinit {
@@ -69,32 +94,33 @@ public actor RealtimeChannelV2 {
69
94
70
95
/// Subscribes to the channel
71
96
public func subscribe( ) async {
72
- if await socket? . status != . connected {
73
- if socket? . options. connectOnSubscribe != true {
97
+ if socket. status ( ) != . connected {
98
+ if socket. options ( ) . connectOnSubscribe != true {
74
99
fatalError (
75
100
" You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`? "
76
101
)
77
102
}
78
- await socket? . connect ( )
103
+ await socket. connect ( )
79
104
}
80
105
81
- await socket? . addChannel ( self )
106
+ socket. addChannel ( self )
82
107
83
108
status = . subscribing
84
109
logger? . debug ( " subscribing to channel \( topic) " )
85
110
86
111
let joinConfig = RealtimeJoinConfig (
87
112
broadcast: config. broadcast,
88
113
presence: config. presence,
89
- postgresChanges: clientChanges
114
+ postgresChanges: mutableState . clientChanges
90
115
)
91
116
92
- let payload = await RealtimeJoinPayload (
117
+ let payload = RealtimeJoinPayload (
93
118
config: joinConfig,
94
- accessToken: socket? . accessToken
119
+ accessToken: socket. accessToken ( )
95
120
)
96
121
97
- joinRef = await socket? . makeRef ( ) . description
122
+ let joinRef = socket. makeRef ( ) . description
123
+ mutableState. withValue { $0. joinRef = joinRef }
98
124
99
125
logger? . debug ( " subscribing to channel with body: \( joinConfig) " )
100
126
@@ -109,7 +135,7 @@ public actor RealtimeChannelV2 {
109
135
)
110
136
111
137
do {
112
- try await withTimeout ( interval: socket? . options. timeoutInterval ?? 10 ) { [ self ] in
138
+ try await withTimeout ( interval: socket. options ( ) . timeoutInterval) { [ self ] in
113
139
_ = await statusChange. first { @Sendable in $0 == . subscribed }
114
140
}
115
141
} catch {
@@ -128,8 +154,8 @@ public actor RealtimeChannelV2 {
128
154
129
155
await push (
130
156
RealtimeMessageV2 (
131
- joinRef: joinRef,
132
- ref: socket? . makeRef ( ) . description,
157
+ joinRef: mutableState . joinRef,
158
+ ref: socket. makeRef ( ) . description,
133
159
topic: topic,
134
160
event: ChannelEvent . leave,
135
161
payload: [ : ]
@@ -141,8 +167,8 @@ public actor RealtimeChannelV2 {
141
167
logger? . debug ( " Updating auth token for channel \( topic) " )
142
168
await push (
143
169
RealtimeMessageV2 (
144
- joinRef: joinRef,
145
- ref: socket? . makeRef ( ) . description,
170
+ joinRef: mutableState . joinRef,
171
+ ref: socket. makeRef ( ) . description,
146
172
topic: topic,
147
173
event: ChannelEvent . accessToken,
148
174
payload: [ " access_token " : . string( jwt) ]
@@ -162,8 +188,8 @@ public actor RealtimeChannelV2 {
162
188
163
189
await push (
164
190
RealtimeMessageV2 (
165
- joinRef: joinRef,
166
- ref: socket? . makeRef ( ) . description,
191
+ joinRef: mutableState . joinRef,
192
+ ref: socket. makeRef ( ) . description,
167
193
topic: topic,
168
194
event: ChannelEvent . broadcast,
169
195
payload: [
@@ -187,8 +213,8 @@ public actor RealtimeChannelV2 {
187
213
188
214
await push (
189
215
RealtimeMessageV2 (
190
- joinRef: joinRef,
191
- ref: socket? . makeRef ( ) . description,
216
+ joinRef: mutableState . joinRef,
217
+ ref: socket. makeRef ( ) . description,
192
218
topic: topic,
193
219
event: ChannelEvent . presence,
194
220
payload: [
@@ -203,8 +229,8 @@ public actor RealtimeChannelV2 {
203
229
public func untrack( ) async {
204
230
await push (
205
231
RealtimeMessageV2 (
206
- joinRef: joinRef,
207
- ref: socket? . makeRef ( ) . description,
232
+ joinRef: mutableState . joinRef,
233
+ ref: socket. makeRef ( ) . description,
208
234
topic: topic,
209
235
event: ChannelEvent . presence,
210
236
payload: [
@@ -329,7 +355,7 @@ public actor RealtimeChannelV2 {
329
355
Task { [ weak self] in
330
356
guard let self else { return }
331
357
332
- await socket? . removeChannel ( self )
358
+ await socket. removeChannel ( self )
333
359
logger? . debug ( " Unsubscribed from channel \( message. topic) " )
334
360
}
335
361
@@ -439,7 +465,9 @@ public actor RealtimeChannelV2 {
439
465
filter: filter
440
466
)
441
467
442
- clientChanges. append ( config)
468
+ mutableState. withValue {
469
+ $0. clientChanges. append ( config)
470
+ }
443
471
444
472
let id = callbackManager. addPostgresCallback ( filter: config, callback: callback)
445
473
return Subscription { [ weak callbackManager, logger] in
@@ -464,14 +492,18 @@ public actor RealtimeChannelV2 {
464
492
private func push( _ message: RealtimeMessageV2 ) async -> PushStatus {
465
493
let push = PushV2 ( channel: self , message: message)
466
494
if let ref = message. ref {
467
- pushes [ ref] = push
495
+ mutableState. withValue {
496
+ $0. pushes [ ref] = push
497
+ }
468
498
}
469
499
return await push. send ( )
470
500
}
471
501
472
502
private func didReceiveReply( ref: String , status: String ) {
473
503
Task {
474
- let push = pushes. removeValue ( forKey: ref)
504
+ let push = mutableState. withValue {
505
+ $0. pushes. removeValue ( forKey: ref)
506
+ }
475
507
await push? . didReceive ( status: PushStatus ( rawValue: status) ?? . ok)
476
508
}
477
509
}
0 commit comments