|
13 | 13 | from typing import Tuple, Dict |
14 | 14 | from types import FunctionType |
15 | 15 |
|
16 | | -VERSION = "1.1.7" |
| 16 | +VERSION = "1.1.7.2" |
17 | 17 |
|
18 | 18 |
|
19 | 19 | def decode_n0(response_to_decode: bytes, head_len: int): |
@@ -56,7 +56,7 @@ def decode_no(response_to_decode: bytes, head_len: int): |
56 | 56 | """ |
57 | 57 | BUFFER_SIZE: Dict[str, str] = { |
58 | 58 | '0': '2K bytes', '1': '8K bytes', '2': '16K bytes', '3': '32K bytes'} |
59 | | - NET_PROTO: Dict[str, str] = {'0': 'TCP', '1': 'UDP'} |
| 59 | + NET_PROTO: Dict[str, str] = {'0': 'UDP', '1': 'TCP'} |
60 | 60 | response_to_decode, msg_len, str_pointer = common_parser(response_to_decode, head_len) |
61 | 61 | if response_to_decode[str_pointer:str_pointer + 2] == '00': # No errors |
62 | 62 | if len(response_to_decode) >= (24 + head_len): # Mode 00 |
@@ -125,8 +125,8 @@ def decode_ni(response_to_decode: bytes, head_len: int): |
125 | 125 | str_pointer = str_pointer + 8 |
126 | 126 | print("Remote port number: ", response_to_decode[str_pointer:str_pointer + 4]) |
127 | 127 | str_pointer = str_pointer + 4 |
128 | | - print("Connection Status: ", NET_CONNECTION_STATUS.get(response_to_decode[str_pointer:str_pointer + 1] |
129 | | - , 'Reserved')) |
| 128 | + print("Connection Status: ", NET_CONNECTION_STATUS.get(response_to_decode[str_pointer:str_pointer + 1], |
| 129 | + 'Reserved')) |
130 | 130 | str_pointer = str_pointer + 1 |
131 | 131 | print("Duration: ", response_to_decode[str_pointer:str_pointer + 8]) |
132 | 132 | str_pointer = str_pointer + 8 |
@@ -729,7 +729,7 @@ def run_test(ip_addr: str, port: int, host_command: str, proto: str = "tcp", hea |
729 | 729 | if proto not in ['tcp', 'udp', 'tls']: |
730 | 730 | print("invalid protocol parameter, It needs to be tcp, udp or tls") |
731 | 731 | return -1 |
732 | | - |
| 732 | + connection = None |
733 | 733 | try: |
734 | 734 |
|
735 | 735 | # calculate the size and format it correctly |
@@ -796,18 +796,16 @@ def run_test(ip_addr: str, port: int, host_command: str, proto: str = "tcp", hea |
796 | 796 | decoder_funct(data, header_len) |
797 | 797 |
|
798 | 798 | except ConnectionError as e: |
799 | | - print("Connection issue: ", e.message) |
| 799 | + print("Connection issue: ", e) |
800 | 800 | except FileNotFoundError as e: |
801 | 801 | print("The client certificate file or the client key file cannot be found or accessed.\n" + |
802 | | - "Check value passed to the parameters --keyfile and --crtfile", e.message) |
| 802 | + "Check value passed to the parameters --keyfile and --crtfile", e) |
803 | 803 | except Exception as e: |
804 | | - if hasattr(e, 'message'): |
805 | | - print("Unexpected issue:", e.message) |
806 | | - else: |
807 | | - print("Unexpected issue:", e) |
| 804 | + print("Unexpected issue:", e) |
808 | 805 |
|
809 | 806 | finally: |
810 | | - connection.close() |
| 807 | + if connection is not None: |
| 808 | + connection.close() |
811 | 809 |
|
812 | 810 |
|
813 | 811 | def common_parser(response_to_decode: bytes, head_len: int) -> Tuple[str, int, int]: |
@@ -1016,27 +1014,3 @@ def common_parser(response_to_decode: bytes, head_len: int) -> Tuple[str, int, i |
1016 | 1014 | run_test(args.host, args.port, command, args.proto, len(args.header), None) |
1017 | 1015 | print("") |
1018 | 1016 | print("DONE") |
1019 | | - |
1020 | | - |
1021 | | -def setup_connection(ip_addr: str, port: int, proto: str = "tcp") -> socket: |
1022 | | - if proto == "tcp": |
1023 | | - # creates the TCP socket |
1024 | | - connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
1025 | | - connection.connect((ip_addr, port)) |
1026 | | - return connection |
1027 | | - |
1028 | | - elif proto == "tls": |
1029 | | - # creates the TCP TLS socket |
1030 | | - connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
1031 | | - ciphers = "ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:AES128-GCM-SHA256:AES128-SHA256:HIGH:" |
1032 | | - ciphers += "!aNULL:!eNULL:!EXPORT:!DSS:!DES:!RC4:!3DES:!MD5:!PSK" |
1033 | | - ssl_sock = ssl.wrap_socket(connection, args.keyfile, args.crtfile) |
1034 | | - ssl_sock.connect((ip_addr, port)) |
1035 | | - return connection |
1036 | | - |
1037 | | - elif proto == "udp": |
1038 | | - # create the UDP socket |
1039 | | - connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
1040 | | - return connection |
1041 | | - |
1042 | | - return None |
0 commit comments