1
1
import json
2
2
import logging
3
3
from typing import Any
4
- from aiortc . rtcicetransport import candidate_from_aioice
4
+
5
5
from aioice .candidate import Candidate
6
6
from aiortc import (
7
7
RTCConfiguration ,
8
- RTCIceCandidate ,
9
8
RTCPeerConnection ,
10
9
RTCSessionDescription ,
11
10
)
11
+ from aiortc .rtcicetransport import candidate_from_aioice
12
12
from multiaddr import Multiaddr
13
13
import trio
14
14
15
- from libp2p .abc import INetStream , IRawConnection
15
+ from libp2p .abc import IHost , INetStream , IRawConnection
16
16
from libp2p .peer .id import ID
17
17
18
- from .pb import Message
19
18
from ..async_bridge import TrioSafeWebRTCOperations
20
19
from ..connection import WebRTCRawConnection
21
20
from ..constants import (
24
23
SDPHandshakeError ,
25
24
WebRTCError ,
26
25
)
27
-
28
- from libp2p .abc import (
29
- IHost
30
- )
26
+ from .pb import Message
31
27
32
28
logger = logging .getLogger ("webrtc.private.initiate_connection" )
33
29
@@ -88,14 +84,14 @@ async def initiate_connection(
88
84
89
85
# Setup data channel ready event
90
86
data_channel_ready = trio .Event ()
91
-
87
+
92
88
@data_channel .on ("open" )
93
- def on_data_channel_open ():
89
+ def on_data_channel_open () -> None :
94
90
logger .info ("Data channel opened" )
95
91
data_channel_ready .set ()
96
92
97
93
@data_channel .on ("error" )
98
- def on_data_channel_error (error : Any ):
94
+ def on_data_channel_error (error : Any ) -> None :
99
95
logger .error (f"Data channel error: { error } " )
100
96
101
97
# Register data channel event handlers
@@ -120,19 +116,18 @@ def on_data_channel_error(error: Any):
120
116
offer_msg .data = offer .sdp
121
117
await _send_signaling_message (signaling_stream , offer_msg )
122
118
123
- # (Note: aiortc does not emit ice candidate event, per candidate (like js)
124
- # but sends it along SDP. To maintain interop, we extract adn resend in given format )
119
+ # (Note: aiortc does not emit ice candidate event, per candidate (like js)
120
+ # but sends it along SDP.
121
+ # To maintain interop, we extract and resend in given format)
125
122
await _send_ice_candidates (signaling_stream , peer_connection )
126
-
123
+
127
124
# Wait for answer
128
125
answer_msg = await _receive_signaling_message (signaling_stream , timeout )
129
126
if answer_msg .type != Message .SDP_ANSWER :
130
127
raise SDPHandshakeError (f"Expected answer, got: { answer_msg .type } " )
131
128
132
129
# Set remote description
133
- answer = RTCSessionDescription (
134
- sdp = answer_msg .data , type = 'answer'
135
- )
130
+ answer = RTCSessionDescription (sdp = answer_msg .data , type = "answer" )
136
131
bridge = TrioSafeWebRTCOperations ._get_bridge ()
137
132
async with bridge :
138
133
await bridge .set_remote_description (peer_connection , answer )
@@ -146,6 +141,7 @@ def on_data_channel_error(error: Any):
146
141
147
142
# Wait for data channel to be ready
148
143
connection_failed = trio .Event ()
144
+
149
145
def on_connection_state_change () -> None :
150
146
state = peer_connection .connectionState
151
147
logger .debug (f"Connection state: { state } " )
@@ -181,9 +177,9 @@ def on_connection_state_change() -> None:
181
177
is_initiator = True ,
182
178
)
183
179
184
- logger .debug (' initiator connected, closing init channel' )
180
+ logger .debug (" initiator connected, closing init channel" )
185
181
data_channel .close ()
186
-
182
+
187
183
logger .info (f"Successfully established WebRTC connection to { target_peer_id } " )
188
184
return connection
189
185
@@ -217,12 +213,10 @@ async def _send_signaling_message(stream: INetStream, message: Message) -> None:
217
213
raise
218
214
219
215
220
- async def _receive_signaling_message (
221
- stream : INetStream , timeout : float
222
- ) -> Message :
216
+ async def _receive_signaling_message (stream : INetStream , timeout : float ) -> Message :
223
217
"""Receive a signaling message from the stream"""
224
218
try :
225
- with trio .move_on_after (timeout ) as cancel_scope :
219
+ with trio .move_on_after (timeout ):
226
220
# Read message data
227
221
message_data = await stream .read ()
228
222
deserealized_msg = Message ()
@@ -235,30 +229,29 @@ async def _receive_signaling_message(
235
229
raise
236
230
237
231
238
- async def _send_ice_candidates (stream , peer_connection ):
232
+ async def _send_ice_candidates (
233
+ stream : INetStream , peer_connection : RTCPeerConnection
234
+ ) -> None :
239
235
# Get SDP offer from localDescription to extract ICE Candidate
240
236
sdp = peer_connection .localDescription .sdp
241
237
sdp_lines = sdp .splitlines ()
242
-
238
+
243
239
msg = Message ()
244
240
msg .type = Message .ICE_CANDIDATE
245
241
# Extract ICE_Candidate and send each separately
246
242
for line in sdp_lines :
247
243
if line .startswith ("a=candidate:" ):
248
- cand_str = line [len ("a=" ):]
249
- candidate_init = {
250
- "candidate" : cand_str ,
251
- "sdpMLineIndex" : 0
252
- }
244
+ cand_str = line [len ("a=" ) :]
245
+ candidate_init = {"candidate" : cand_str , "sdpMLineIndex" : 0 }
253
246
data = json .dumps (candidate_init )
254
247
msg .data = data
255
248
await _send_signaling_message (stream , msg )
256
249
logger .debug ("Sent ICE candidate init: %s" , candidate_init )
257
250
# Mark end-of-candidates
258
251
msg = Message (type = Message .ICE_CANDIDATE , data = json .dumps (None ))
259
- await _send_signaling_message (stream , msg )
252
+ await _send_signaling_message (stream , msg )
260
253
logger .debug ("Sent end-of-ICE marker" )
261
-
254
+
262
255
263
256
async def _handle_incoming_ice_candidates (
264
257
stream : INetStream , peer_connection : RTCPeerConnection , timeout : float
@@ -274,31 +267,31 @@ async def _handle_incoming_ice_candidates(
274
267
if cancel_scope .cancelled_caught :
275
268
logger .warning ("ICE candidate receive timeout" )
276
269
break
277
-
270
+
278
271
# stream ended or we became connected
279
272
if not message :
280
273
logger .error ("Null message recieved" )
281
274
break
282
-
275
+
283
276
if message .type != Message .ICE_CANDIDATE :
284
277
logger .error ("ICE candidate message expected. Exiting..." )
285
278
raise WebRTCError ("ICE candidate message expected." )
286
- break
287
-
279
+ break
280
+
288
281
# Candidate init cannot be null
289
- if message .data == '' :
282
+ if message .data == "" :
290
283
logger .debug ("candidate received is empty" )
291
284
continue
292
-
285
+
293
286
logger .debug ("Recieved new ICE Candidate" )
294
287
try :
295
288
candidate_init = json .loads (message .data )
296
289
except json .JSONDecodeError :
297
290
logger .error ("Invalid ICE candidate JSON: %s" , message .data )
298
291
break
299
-
292
+
300
293
bridge = TrioSafeWebRTCOperations ._get_bridge ()
301
-
294
+
302
295
# None means ICE gathering is fully complete
303
296
if candidate_init is None :
304
297
logger .debug ("Received ICE candidate null → end-of-ice signal" )
@@ -308,7 +301,9 @@ async def _handle_incoming_ice_candidates(
308
301
309
302
# CandidateInit is expected to be a dict
310
303
if isinstance (candidate_init , dict ) and "candidate" in candidate_init :
311
- candidate = candidate_from_aioice (Candidate .from_sdp (candidate_init ["candidate" ]))
304
+ candidate = candidate_from_aioice (
305
+ Candidate .from_sdp (candidate_init ["candidate" ])
306
+ )
312
307
async with bridge :
313
308
await bridge .add_ice_candidate (peer_connection , candidate )
314
309
logger .debug ("Added ICE candidate: %r" , candidate_init )
0 commit comments