@@ -29,6 +29,9 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked
29
29
30
30
/// A dispatch queue for handling peer connection operations.
31
31
let dispatchQueue = DispatchQueue ( label: " io.getstream.peerconnection " )
32
+
33
+ /// A dispatch queue for safely accessing `source`. RTCPeerConnection is not thread-safe.
34
+ private let connectionQueue = DispatchQueue ( label: " io.getstream.peerconnection.connection " )
32
35
33
36
/// A publisher for RTCPeerConnectionEvents.
34
37
lazy var publisher : AnyPublisher < RTCPeerConnectionEvent , Never > = delegatePublisher
@@ -83,11 +86,14 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked
83
86
return
84
87
}
85
88
86
- source. setLocalDescription ( sessionDescription) { error in
87
- if let error = error {
88
- continuation. resume ( throwing: error)
89
- } else {
90
- continuation. resume ( returning: ( ) )
89
+ connectionQueue. async { [ weak self] in
90
+ guard let self else { return }
91
+ source. setLocalDescription ( sessionDescription) { error in
92
+ if let error = error {
93
+ continuation. resume ( throwing: error)
94
+ } else {
95
+ continuation. resume ( returning: ( ) )
96
+ }
91
97
}
92
98
}
93
99
} as ( )
@@ -108,12 +114,15 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked
108
114
return
109
115
}
110
116
111
- source. setRemoteDescription ( sessionDescription) { error in
112
- if let error = error {
113
- continuation. resume ( throwing: error)
114
- } else {
115
- self . subject. send ( HasRemoteDescription ( sessionDescription: sessionDescription) )
116
- continuation. resume ( returning: ( ) )
117
+ connectionQueue. async { [ weak self] in
118
+ guard let self else { return }
119
+ source. setRemoteDescription ( sessionDescription) { error in
120
+ if let error = error {
121
+ continuation. resume ( throwing: error)
122
+ } else {
123
+ self . subject. send ( HasRemoteDescription ( sessionDescription: sessionDescription) )
124
+ continuation. resume ( returning: ( ) )
125
+ }
117
126
}
118
127
}
119
128
} as ( )
@@ -162,8 +171,13 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked
162
171
with track: RTCMediaStreamTrack ,
163
172
init transceiverInit: RTCRtpTransceiverInit
164
173
) -> RTCRtpTransceiver ? {
165
- let result = source. addTransceiver ( with: track, init: transceiverInit)
166
- storeTransceiver ( result, trackType: trackType)
174
+ var result : RTCRtpTransceiver ?
175
+ connectionQueue. sync {
176
+ result = source. addTransceiver ( with: track, init: transceiverInit)
177
+ }
178
+ if let result {
179
+ storeTransceiver ( result, trackType: trackType)
180
+ }
167
181
return result
168
182
}
169
183
@@ -195,18 +209,25 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked
195
209
196
210
/// Restarts the ICE gathering process.
197
211
func restartIce( ) {
198
- source. restartIce ( )
212
+ connectionQueue. async { [ weak self] in
213
+ guard let self else { return }
214
+ self . source. restartIce ( )
215
+ }
199
216
}
200
217
201
218
/// Closes the peer connection.
202
219
func close( ) async {
203
- Task { @MainActor in
204
- /// It's very important to close any transceivers **before** we close the connection, to make
205
- /// sure that access to `RTCVideoTrack` properties, will be handled correctly. Otherwise
206
- /// if we try to access any property/method on a `RTCVideoTrack` instance whose
207
- /// peerConnection has closed, we will get blocked on the Main Thread.
208
- source. transceivers. forEach { $0. stopInternal ( ) }
209
- source. close ( )
220
+ await withCheckedContinuation { continuation in
221
+ connectionQueue. async { [ weak self] in
222
+ guard let self else { return }
223
+ /// It's very important to close any transceivers **before** we close the connection, to make
224
+ /// sure that access to `RTCVideoTrack` properties, will be handled correctly. Otherwise
225
+ /// if we try to access any property/method on a `RTCVideoTrack` instance whose
226
+ /// peerConnection has closed, we will get blocked on the Main Thread.
227
+ self . source. transceivers. forEach { $0. stopInternal ( ) }
228
+ self . source. close ( )
229
+ continuation. resume ( )
230
+ }
210
231
}
211
232
}
212
233
0 commit comments