@@ -179,7 +179,7 @@ class MCastClient(object): # skipcq: PYL-R0205
179179 """The source port for the client."""
180180
181181 # skipcq: TCV-002
182- def __init__ (self , * args , ** kwargs ):
182+ def __init__ (self , * args , ** kwargs ) -> None : # pragma: no cover
183183 """
184184 Initialize a MCastClient object with optional group address and source port.
185185
@@ -240,11 +240,11 @@ def __init__(self, *args, **kwargs):
240240
241241 """
242242 # skipcq: TCV-002
243- if str ( "grp_addr" ) in kwargs :
243+ if "grp_addr" in kwargs : # pragma: no branch
244244 self ._group_addr = kwargs .get ("grp_addr" , None ) # skipcq: PTC-W0039 - ensure None
245- if str ( "src_port" ) in kwargs :
245+ if "src_port" in kwargs : # pragma: no branch
246246 self ._source_port = kwargs .get ("src_port" , 0 )
247- else :
247+ else : # pragma: no branch
248248 self ._source_port = int (
249249 50000 + (
250250 int (random .SystemRandom ().randbytes (int (60000 ).__sizeof__ ()).hex (), 16 ) % 9999
@@ -253,7 +253,7 @@ def __init__(self, *args, **kwargs):
253253
254254 # skipcq: TCV-002
255255 @staticmethod
256- def say (address , port , sock , msg ):
256+ def say (address : str , port : int , sock : socket . socket , msg : str ) -> None : # pragma: no cover
257257 """
258258 Send a message to a specified multicast address and port, then receive and print it.
259259
@@ -306,9 +306,9 @@ def say(address, port, sock, msg):
306306
307307 """
308308 # skipcq: TCV-002
309- sock .sendto (bytes (msg + "\n " , "utf-8" ), (address , port ))
310- received = str (sock .recv (1024 ), "utf-8" )
311- sp = " " * 4
309+ sock .sendto (bytes (msg + "\n " , "utf-8" ), (address , port )) # pragma: no cover
310+ received = str (sock .recv (1024 ), "utf-8" ) # pragma: no cover
311+ sp = " " * 4 # pragma: no cover
312312 if (sys .stdout .isatty ()): # pragma: no cover
313313 print (f"Sent: { sp } { msg } " ) # skipcq: PYL-C0209 - must remain compatible
314314 print (f"Received: { received } " ) # skipcq: PYL-C0209 - must remain compatible
@@ -340,7 +340,7 @@ class MyUDPHandler(socketserver.BaseRequestHandler):
340340 __module__ = "tests.MulticastUDPClient.MyUDPHandler"
341341
342342 # skipcq: TCV-002
343- def handle (self ):
343+ def handle (self ) -> None : # pragma: no cover
344344 """
345345 Handle incoming UDP requests.
346346
@@ -389,15 +389,15 @@ def handle(self):
389389
390390 """
391391 # skipcq: TCV-002
392- data = self .request [0 ].strip ()
393- sock = self .request [1 ]
394- print (f"{ self .client_address [0 ]} wrote: " )
395- print (data )
396- sock .sendto (data .upper (), self .client_address )
392+ data = self .request [0 ].strip () # pragma: no cover
393+ sock = self .request [1 ] # pragma: no cover
394+ print (f"{ self .client_address [0 ]} wrote: " ) # pragma: no cover
395+ print (data ) # pragma: no cover
396+ sock .sendto (data .upper (), self .client_address ) # pragma: no cover
397397
398398
399399# skipcq: TCV-002
400- def main ():
400+ def main () -> None : # pragma: no cover
401401 """
402402 The main test operations.
403403
@@ -423,16 +423,16 @@ def main():
423423
424424 """
425425 # skipcq: TCV-002
426- HOST , PORT = "224.0.0.1" , 59991
427- data = "TEST This is a test"
428- sock = socket .socket (socket .AF_INET , socket .SOCK_DGRAM , socket .IPPROTO_UDP )
429- tsts_fxr = MCastClient ()
430- print (str ((HOST , PORT )))
431- tsts_fxr .say (HOST , PORT , sock , data )
432- tsts_fxr .say (HOST , PORT , sock , str ( "STOP" ))
426+ HOST , PORT = "224.0.0.1" , 59991 # pragma: no cover
427+ data = "TEST This is a test" # pragma: no cover
428+ sock = socket .socket (socket .AF_INET , socket .SOCK_DGRAM , socket .IPPROTO_UDP ) # pragma: no cover
429+ tsts_fxr = MCastClient () # pragma: no cover
430+ print (str ((HOST , PORT ))) # pragma: no cover
431+ tsts_fxr .say (HOST , PORT , sock , data ) # pragma: no cover
432+ tsts_fxr .say (HOST , PORT , sock , "STOP" ) # pragma: no cover
433433
434434
435- if __name__ == "__main__" :
435+ if __name__ == "__main__" : # pragma: no branch
436436 main () # skipcq: TCV-002
437437 # skipcq: PYL-R1722
438438 exit (0 ) # skipcq: PYL-R1722 -- intentionally allow overwriteing exit for testing.
0 commit comments