Skip to content

Commit 75c89ae

Browse files
committed
Fix thread safety for RTCPeerConnection access
1 parent 1a0ff3c commit 75c89ae

File tree

1 file changed

+117
-25
lines changed

1 file changed

+117
-25
lines changed

Sources/StreamVideo/WebRTC/v2/PeerConnection/StreamRTCPeerConnection.swift

Lines changed: 117 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked
2828
var subject: PassthroughSubject<RTCPeerConnectionEvent, Never> { delegatePublisher.publisher }
2929

3030
/// A dispatch queue for handling peer connection operations.
31-
let dispatchQueue = DispatchQueue(label: "io.getstream.peerconnection")
31+
let dispatchQueue = DispatchQueue(label: "io.getstream.peerconnection.dispatch")
3232

3333
/// A publisher for RTCPeerConnectionEvents.
3434
lazy var publisher: AnyPublisher<RTCPeerConnectionEvent, Never> = delegatePublisher
@@ -38,6 +38,8 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked
3838

3939
private let delegatePublisher = DelegatePublisher()
4040
private let source: RTCPeerConnection
41+
/// A dispatch queue for safely accessing `source`. RTCPeerConnection is not thread-safe.
42+
private let connectionQueue = DispatchQueue(label: "io.getstream.peerconnection.connection")
4143

4244
/// Initializes a new StreamRTCPeerConnection.
4345
///
@@ -83,11 +85,14 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked
8385
return
8486
}
8587

86-
source.setLocalDescription(sessionDescription) { error in
87-
if let error = error {
88-
continuation.resume(throwing: error)
89-
} else {
90-
continuation.resume(returning: ())
88+
connectionQueue.async { [weak self] in
89+
guard let self else { return }
90+
source.setLocalDescription(sessionDescription) { error in
91+
if let error = error {
92+
continuation.resume(throwing: error)
93+
} else {
94+
continuation.resume(returning: ())
95+
}
9196
}
9297
}
9398
} as ()
@@ -108,11 +113,14 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked
108113
return
109114
}
110115

111-
source.setRemoteDescription(sessionDescription) { error in
112-
if let error = error {
113-
continuation.resume(throwing: error)
114-
} else {
115-
continuation.resume(returning: ())
116+
connectionQueue.async { [weak self] in
117+
guard let self else { return }
118+
source.setRemoteDescription(sessionDescription) { error in
119+
if let error = error {
120+
continuation.resume(throwing: error)
121+
} else {
122+
continuation.resume(returning: ())
123+
}
116124
}
117125
}
118126
} as ()
@@ -126,7 +134,24 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked
126134
func offer(
127135
for constraints: RTCMediaConstraints
128136
) async throws -> RTCSessionDescription {
129-
try await source.offer(for: constraints)
137+
try await withCheckedThrowingContinuation { [weak self] continuation in
138+
guard let self else {
139+
continuation.resume(throwing: ClientError.Unknown("RTCPeerConnection instance is unavailable."))
140+
return
141+
}
142+
143+
connectionQueue.async { [weak self] in
144+
guard let self else { return }
145+
Task {
146+
do {
147+
let offer = try await self.source.offer(for: constraints)
148+
continuation.resume(returning: offer)
149+
} catch {
150+
continuation.resume(throwing: error)
151+
}
152+
}
153+
}
154+
}
130155
}
131156

132157
/// Creates an answer asynchronously.
@@ -137,15 +162,45 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked
137162
func answer(
138163
for constraints: RTCMediaConstraints
139164
) async throws -> RTCSessionDescription {
140-
try await source.answer(for: constraints)
165+
try await withCheckedThrowingContinuation { [weak self] continuation in
166+
guard let self else {
167+
continuation.resume(throwing: ClientError.Unknown("RTCPeerConnection instance is unavailable."))
168+
return
169+
}
170+
171+
connectionQueue.async { [weak self] in
172+
guard let self else { return }
173+
Task {
174+
do {
175+
let offer = try await self.source.answer(for: constraints)
176+
continuation.resume(returning: offer)
177+
} catch {
178+
continuation.resume(throwing: error)
179+
}
180+
}
181+
}
182+
}
141183
}
142184

143185
/// Retrieves the statistics of the peer connection.
144186
///
145187
/// - Returns: An RTCStatisticsReport containing the connection statistics.
146188
/// - Throws: An error if retrieving statistics fails.
147189
func statistics() async throws -> RTCStatisticsReport? {
148-
await source.statistics()
190+
try await withCheckedThrowingContinuation { [weak self] continuation in
191+
guard let self else {
192+
continuation.resume(throwing: ClientError.Unknown("RTCPeerConnection instance is unavailable."))
193+
return
194+
}
195+
196+
connectionQueue.async { [weak self] in
197+
guard let self else { return }
198+
Task {
199+
let statistics = await self.source.statistics()
200+
continuation.resume(returning: statistics)
201+
}
202+
}
203+
}
149204
}
150205

151206
// MARK: - Forwarding API
@@ -161,8 +216,13 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked
161216
with track: RTCMediaStreamTrack,
162217
init transceiverInit: RTCRtpTransceiverInit
163218
) -> RTCRtpTransceiver? {
164-
let result = source.addTransceiver(with: track, init: transceiverInit)
165-
storeTransceiver(result, trackType: trackType)
219+
var result: RTCRtpTransceiver?
220+
connectionQueue.sync {
221+
result = source.addTransceiver(with: track, init: transceiverInit)
222+
}
223+
if let result {
224+
storeTransceiver(result, trackType: trackType)
225+
}
166226
return result
167227
}
168228

@@ -175,7 +235,32 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked
175235
/// - Parameter candidate: The ICE candidate to add.
176236
/// - Throws: An error if adding the candidate fails.
177237
func add(_ candidate: RTCIceCandidate) async throws {
178-
try await source.add(candidate)
238+
try await withCheckedThrowingContinuation { [weak self] (continuation: CheckedContinuation<Void, Error>) in
239+
guard let self else {
240+
continuation.resume(
241+
throwing: ClientError.Unknown("RTCPeerConnection instance is unavailable.")
242+
)
243+
return
244+
}
245+
246+
connectionQueue.async { [weak self] in
247+
guard let self else {
248+
continuation.resume(
249+
throwing: ClientError.Unknown("RTCPeerConnection instance is unavailable.")
250+
)
251+
return
252+
}
253+
254+
Task {
255+
do {
256+
try await self.source.add(candidate)
257+
continuation.resume(returning: ())
258+
} catch {
259+
continuation.resume(throwing: error)
260+
}
261+
}
262+
}
263+
}
179264
}
180265

181266
// MARK: - Publishing API
@@ -194,18 +279,25 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked
194279

195280
/// Restarts the ICE gathering process.
196281
func restartIce() {
197-
source.restartIce()
282+
connectionQueue.async { [weak self] in
283+
guard let self else { return }
284+
self.source.restartIce()
285+
}
198286
}
199287

200288
/// Closes the peer connection.
201289
func close() async {
202-
Task { @MainActor in
203-
/// It's very important to close any transceivers **before** we close the connection, to make
204-
/// sure that access to `RTCVideoTrack` properties, will be handled correctly. Otherwise
205-
/// if we try to access any property/method on a `RTCVideoTrack` instance whose
206-
/// peerConnection has closed, we will get blocked on the Main Thread.
207-
source.transceivers.forEach { $0.stopInternal() }
208-
source.close()
290+
await withCheckedContinuation { continuation in
291+
connectionQueue.async { [weak self] in
292+
guard let self else { return }
293+
/// It's very important to close any transceivers **before** we close the connection, to make
294+
/// sure that access to `RTCVideoTrack` properties, will be handled correctly. Otherwise
295+
/// if we try to access any property/method on a `RTCVideoTrack` instance whose
296+
/// peerConnection has closed, we will get blocked on the Main Thread.
297+
self.source.transceivers.forEach { $0.stopInternal() }
298+
self.source.close()
299+
continuation.resume()
300+
}
209301
}
210302
}
211303

0 commit comments

Comments
 (0)