11import socket
22import time
3+ import sys
4+ import random
5+
6+ MOTD_PKT = b'\x01 \x00 \x00 \x00 \x00 $\r \x12 \xd3 \x00 \xff \xff \x00 \xfe \xfe \xfe \xfe \xfd \xfd \xfd \xfd \x12 4Vx\n '
7+
38
49def getLocalHostIP ():
510 localHostIP = socket .gethostbyname (socket .gethostname ())
6- try :
7- s = socket .socket (socket .AF_INET ,socket .SOCK_DGRAM )
8- s .connect (('8.8.8.8' , 80 ))
9- localHostIP = s .getsockname ()[0 ]
11+ try :
12+ s = socket .socket (socket .AF_INET , socket .SOCK_DGRAM )
13+ s .connect (('8.8.8.8' , 80 ))
14+ localHostIP = s .getsockname ()[0 ]
1015 finally :
1116 s .close ()
1217 return localHostIP
1318
19+
1420def getTime ():
1521 return time .strftime ('%H:%M:%S' )
1622
23+
1724def log (* content , level : str = "INFO" , info : str = "" , quiet : bool = False ):
1825 if quiet :
1926 return
@@ -25,4 +32,86 @@ def log(*content, level: str = "INFO", info: str = "", quiet: bool = False):
2532 if info != "" :
2633 print (f"[{ date } { info } ] { content } " )
2734 else :
28- print (f"[{ date } ] { content } " )
35+ print (f"[{ date } ] { content } " )
36+
37+
38+ def get_ip_list (ip : str ):
39+ # ip = "42.186.0-255.0-255" -> len(ip_list) == 65536
40+ ip_list = []
41+ processed_ip_segs = [[], [], [], []]
42+ ip_seg_index = 0
43+ ip_segs = ip .split ("." ) # 42.186.0-255.0-255 -> [42, 186, 0-255, 0-255]
44+ for ip_seg in ip_segs :
45+ if "-" in ip_seg :
46+ seg_list = ip_seg .split ("-" ) # 0-255 -> [0, 255]
47+ for i in range (int (seg_list [0 ]), int (seg_list [1 ])+ 1 ): # range(0, 255+1)
48+ processed_ip_segs [ip_seg_index ].append (str (i ))
49+ else :
50+ processed_ip_segs [ip_seg_index ].append (
51+ ip_seg ) # append("42") & append("186")
52+ ip_seg_index += 1
53+
54+ for processed_ip_seg_a in processed_ip_segs [0 ]: # 42
55+ for processed_ip_seg_b in processed_ip_segs [1 ]: # 186
56+ for processed_ip_seg_c in processed_ip_segs [2 ]: # range(0, 256)
57+ # range(0, 256)
58+ for processed_ip_seg_d in processed_ip_segs [3 ]:
59+ ip_list .append (
60+ f"{ processed_ip_seg_a } .{ processed_ip_seg_b } .{ processed_ip_seg_c } .{ processed_ip_seg_d } " )
61+ return ip_list
62+
63+
64+ def decode_unicode (string : str ):
65+ is_special_unicode = False
66+ for index in range (0 , len (string ), 5 ):
67+ if string [index ] != "u" :
68+ is_special_unicode = False
69+ break
70+ else :
71+ is_special_unicode = True
72+
73+ if is_special_unicode :
74+ string = string .replace ("u" , "\\ u" )
75+
76+ try :
77+ string .encode ('ascii' )
78+ except UnicodeEncodeError :
79+ return string
80+ else :
81+ return string .encode ("utf-8" ).decode ("unicode-escape" )
82+
83+
84+ def get_udp_socket (port = random .randint (1024 , 65535 )):
85+ if not port :
86+ port = random .randint (1024 , 65535 )
87+ sockets = socket .socket (socket .AF_INET , socket .SOCK_DGRAM )
88+ sockets .settimeout (1 )
89+ if sys .platform .startswith ('win32' ):
90+ sockets .bind (("" , port ))
91+ return sockets
92+
93+
94+ def parse_raw_pkt (pkt ):
95+ server_data , addr = pkt
96+ # motd packet must contain b"MCPE"
97+ if b"MCPE" not in server_data or len (server_data ) <= 30 :
98+ return None , addr
99+ infos = []
100+ prefix , motd_info = server_data .split (b"MCPE" )
101+ infos_byte = motd_info .split (b";" )
102+ for info in infos_byte :
103+ try :
104+ context = info .decode ()
105+ except UnicodeDecodeError :
106+ context = str (info )[2 :- 1 ]
107+ infos .append (context )
108+ try :
109+ infos = {"motd" : decode_unicode (infos [1 ]), "version_id" : infos [2 ], "version" : infos [3 ], "online" : infos [4 ],
110+ "max_player" : infos [5 ], "unique_id" : infos [6 ], "map" : decode_unicode (infos [7 ]), "gamemode" : infos [8 ],
111+ "source_port_v4" : infos [10 ], "source_port_v6" : infos [11 ], "addr" : f"{ addr [0 ]} :{ addr [1 ]} " }
112+ except IndexError :
113+ return None , addr
114+ if infos ["source_port_v4" ] != str (addr [1 ]):
115+ # return None, addr
116+ pass
117+ return infos , addr
0 commit comments