1
1
import json
2
2
import logging
3
3
from typing import Any
4
-
4
+ from aiortc .rtcicetransport import candidate_from_aioice
5
+ from aioice .candidate import Candidate
5
6
from aiortc import (
6
7
RTCConfiguration ,
7
8
RTCIceCandidate ,
@@ -121,24 +122,8 @@ def on_data_channel_error(error: Any):
121
122
122
123
# (Note: aiortc does not emit ice candidate event, per candidate (like js)
123
124
# but sends it along SDP. To maintain interop, we extract adn resend in given format )
125
+ await _send_ice_candidates (signaling_stream , peer_connection )
124
126
125
- # get SDP from local_descriptor for extracting ice_candidates
126
- sdp = peer_connection .localDescription .sdp
127
- # extract ice_candidates from sdp
128
- ice_candidate = Message ()
129
- ice_candidate .type = Message .ICE_CANDIDATE
130
- for line in sdp .splitlines ():
131
- if line .startswith ("a=candidate:" ):
132
- candidate_line = line [len ("a=" ):]
133
- # Need to make sure the candidate_line matches with expected cadidate in js
134
- logger .Debug ("Candidate sent to peer: " , candidate_line )
135
- ice_candidate .data = candidate_line
136
- await _send_signaling_message (signaling_stream , ice_candidate )
137
-
138
- ice_candidate .data = "" # Empty string signals
139
- await _send_signaling_message (signaling_stream , ice_candidate )
140
- logger .info ("Sent offer and ICE candidates" )
141
-
142
127
# Wait for answer
143
128
answer_msg = await _receive_signaling_message (signaling_stream , timeout )
144
129
if answer_msg .type != Message .SDP_ANSWER :
@@ -245,14 +230,36 @@ async def _receive_signaling_message(
245
230
logger .debug (f"Received signaling message: { deserealized_msg .type } " )
246
231
return deserealized_msg
247
232
248
- if cancel_scope .cancelled_caught :
249
- raise WebRTCError ("Signaling message receive timeout" )
250
-
251
233
except Exception as e :
252
234
logger .error (f"Failed to receive signaling message: { e } " )
253
235
raise
254
236
255
237
238
+ async def _send_ice_candidates (stream , peer_connection ):
239
+ # Get SDP offer from localDescription to extract ICE Candidate
240
+ sdp = peer_connection .localDescription .sdp
241
+ sdp_lines = sdp .splitlines ()
242
+
243
+ msg = Message ()
244
+ msg .type = Message .ICE_CANDIDATE
245
+ # Extract ICE_Candidate and send each separately
246
+ for line in sdp_lines :
247
+ if line .startswith ("a=candidate:" ):
248
+ cand_str = line [len ("a=" ):]
249
+ candidate_init = {
250
+ "candidate" : cand_str ,
251
+ "sdpMLineIndex" : 0
252
+ }
253
+ data = json .dumps (candidate_init )
254
+ msg .data = data
255
+ await _send_signaling_message (stream , msg )
256
+ logger .debug ("Sent ICE candidate init: %s" , candidate_init )
257
+ # Mark end-of-candidates
258
+ msg = Message (type = Message .ICE_CANDIDATE , data = json .dumps (None ))
259
+ await _send_signaling_message (stream , msg )
260
+ logger .debug ("Sent end-of-ICE marker" )
261
+
262
+
256
263
async def _handle_incoming_ice_candidates (
257
264
stream : INetStream , peer_connection : RTCPeerConnection , timeout : float
258
265
) -> None :
@@ -267,34 +274,44 @@ async def _handle_incoming_ice_candidates(
267
274
if cancel_scope .cancelled_caught :
268
275
logger .warning ("ICE candidate receive timeout" )
269
276
break
277
+
278
+ # stream ended or we became connected
279
+ if not message :
280
+ logger .error ("Null message recieved" )
281
+ break
282
+
283
+ if message .type != Message .ICE_CANDIDATE :
284
+ logger .error ("ICE candidate message expected. Exiting..." )
285
+ raise WebRTCError ("ICE candidate message expected." )
286
+ break
287
+
288
+ # Candidate init cannot be null
289
+ if message .data == '' :
290
+ logger .debug ("candidate received is empty" )
291
+ continue
292
+
293
+ logger .debug ("Recieved new ICE Candidate" )
294
+ try :
295
+ candidate_init = json .loads (message .data )
296
+ except json .JSONDecodeError :
297
+ logger .error ("Invalid ICE candidate JSON: %s" , message .data )
298
+ break
299
+
300
+ bridge = TrioSafeWebRTCOperations ._get_bridge ()
301
+
302
+ # None means ICE gathering is fully complete
303
+ if candidate_init is None :
304
+ logger .debug ("Received ICE candidate null → end-of-ice signal" )
305
+ async with bridge :
306
+ await bridge .add_ice_candidate (peer_connection , None )
307
+ return
270
308
271
- if message .get ("type" ) == "ice-candidate" :
272
- # Add ICE candidate
273
- candidate = RTCIceCandidate (
274
- component = message .get ("component" , 1 ),
275
- foundation = message .get ("foundation" , "" ),
276
- ip = message .get ("ip" , "" ),
277
- port = message .get ("port" , 0 ),
278
- priority = message .get ("priority" , 0 ),
279
- protocol = message .get ("protocol" , "udp" ),
280
- type = message .get ("candidateType" , "host" ),
281
- sdpMid = message .get ("sdpMid" ),
282
- sdpMLineIndex = message .get ("sdpMLineIndex" ),
283
- )
284
-
285
- bridge = TrioSafeWebRTCOperations ._get_bridge ()
309
+ # CandidateInit is expected to be a dict
310
+ if isinstance (candidate_init , dict ) and "candidate" in candidate_init :
311
+ candidate = candidate_from_aioice (Candidate .from_sdp (candidate_init ["candidate" ]))
286
312
async with bridge :
287
313
await bridge .add_ice_candidate (peer_connection , candidate )
288
-
289
- logger .debug (f"Added ICE candidate: { candidate .type } " )
290
-
291
- elif message .get ("type" ) == "ice-candidate-end" :
292
- logger .debug ("Received end of ICE candidates" )
293
- break
294
- else :
295
- logger .warning (
296
- f"Unexpected message during ICE exchange: { message .get ('type' )} "
297
- )
314
+ logger .debug ("Added ICE candidate: %r" , candidate_init )
298
315
299
316
except Exception as e :
300
317
logger .warning (f"Error handling ICE candidate: { e } " )
0 commit comments