1+ import json
2+ import asyncio
3+ from opengsq .protocol_base import ProtocolBase
4+ from opengsq .responses .renegadex import Status
5+
6+ class RenegadeX (ProtocolBase ):
7+ full_name = "Renegade X Protocol"
8+ BROADCAST_PORT = 45542
9+
10+ def __init__ (self , host : str , port : int = 7777 , timeout : float = 5.0 ):
11+ super ().__init__ (host , port , timeout )
12+
13+ async def get_status (self ) -> Status :
14+ loop = asyncio .get_running_loop ()
15+ queue = asyncio .Queue ()
16+
17+ class BroadcastProtocol (asyncio .DatagramProtocol ):
18+ def __init__ (self , queue , host ):
19+ self .queue = queue
20+ self .target_host = host
21+
22+ def datagram_received (self , data , addr ):
23+ if addr [0 ] == self .target_host :
24+ self .queue .put_nowait (data )
25+
26+ transport , _ = await loop .create_datagram_endpoint (
27+ lambda : BroadcastProtocol (queue , self ._host ),
28+ local_addr = ('0.0.0.0' , self .BROADCAST_PORT )
29+ )
30+
31+ try :
32+ complete_data = bytearray ()
33+ while True :
34+ try :
35+ data = await asyncio .wait_for (queue .get (), timeout = self ._timeout )
36+ complete_data .extend (data )
37+
38+ try :
39+ json_str = complete_data .decode ('utf-8' )
40+ server_info = json .loads (json_str )
41+ return Status .from_dict (server_info )
42+ except (UnicodeDecodeError , json .JSONDecodeError ):
43+ continue
44+
45+ except asyncio .TimeoutError :
46+ raise TimeoutError ("No broadcast received from the specified server" )
47+
48+ finally :
49+ transport .close ()
0 commit comments