|
13 | 13 |
|
14 | 14 | import abc
|
15 | 15 | import argparse
|
| 16 | +import contextlib |
16 | 17 | import errno
|
17 | 18 | import logging
|
18 | 19 | import os
|
@@ -965,20 +966,134 @@ def run_telnet_client(self, host: str, port: int, active_sock=None) -> None:
|
965 | 966 | sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
966 | 967 | sock.connect((host, port))
|
967 | 968 |
|
968 |
| - # Otherwise, use a pure python implementation. This will work well for logging, |
969 |
| - # but input is line based only. |
| 969 | + if platform.system() == 'Windows': |
| 970 | + self._windows_telnet_client(sock) |
| 971 | + else: |
| 972 | + self._unix_telnet_client(sock) |
| 973 | + |
| 974 | + |
| 975 | + def _unix_telnet_client(self, sock: socket.socket) -> None: |
| 976 | + """ |
| 977 | + Start a telnet client on Unix. |
| 978 | +
|
| 979 | + Selectors are used to register both the given socket and the stdin and wait on |
| 980 | + incoming data, to later process it. |
| 981 | + """ |
| 982 | + import termios |
| 983 | + import tty |
| 984 | + |
970 | 985 | sel = selectors.DefaultSelector()
|
971 | 986 | sel.register(sys.stdin, selectors.EVENT_READ)
|
972 | 987 | sel.register(sock, selectors.EVENT_READ)
|
973 |
| - while True: |
974 |
| - events = sel.select() |
975 |
| - for key, _ in events: |
976 |
| - if key.fileobj == sys.stdin: |
977 |
| - text = sys.stdin.readline() |
978 |
| - if text: |
979 |
| - sock.send(text.encode()) |
980 |
| - |
981 |
| - elif key.fileobj == sock: |
| 988 | + |
| 989 | + |
| 990 | + fd = sys.stdin.fileno() |
| 991 | + orig_attr = termios.tcgetattr(fd) |
| 992 | + try: |
| 993 | + # Enable cbreak mode on the keyboard input, that way all shell editing |
| 994 | + # commands (arrow keys, backspace, etc.) work as expected. |
| 995 | + tty.setcbreak(fd) |
| 996 | + while True: |
| 997 | + events = sel.select() |
| 998 | + for key, _ in events: |
| 999 | + if key.fileobj == sys.stdin: |
| 1000 | + data = os.read(fd, 2048) |
| 1001 | + if data: |
| 1002 | + sock.send(data) |
| 1003 | + |
| 1004 | + elif key.fileobj == sock: |
| 1005 | + resp = sock.recv(2048) |
| 1006 | + if resp: |
| 1007 | + print(resp.decode(), end='', flush=True) |
| 1008 | + except OSError as e: |
| 1009 | + print("\n\nCommunication was closed, reason:", e) |
| 1010 | + finally: |
| 1011 | + termios.tcsetattr(fd, termios.TCSADRAIN, orig_attr) |
| 1012 | + |
| 1013 | + |
| 1014 | + def _windows_telnet_client(self, sock): |
| 1015 | + """ |
| 1016 | + Start a telnet client on Windows. |
| 1017 | +
|
| 1018 | + Since we don't have tty package on the Windows, a different approach to |
| 1019 | + establish two-way communction: |
| 1020 | + - Two threads are started. |
| 1021 | + - One uses the kbhit() and getch() from msvcrt package to read every single |
| 1022 | + character from input. |
| 1023 | + - The other one reads data from the socket and prints it to the screen. |
| 1024 | +
|
| 1025 | + Additional logic is need to convert arrow keys and navigation keys into |
| 1026 | + corresponding ANSI escape sequences |
| 1027 | + Additional logic with the stop event is needed to handle CTRL+C and any errors. |
| 1028 | + """ |
| 1029 | + import msvcrt |
| 1030 | + import threading |
| 1031 | + |
| 1032 | + def send_keyboard_input(sock: socket.socket, |
| 1033 | + stop: threading.Event, |
| 1034 | + err: threading.Event): |
| 1035 | + KEY_MAP = { |
| 1036 | + # Arrow keys |
| 1037 | + b'\xe0H': b'\x1b[A', # Up |
| 1038 | + b'\xe0P': b'\x1b[B', # Down |
| 1039 | + b'\xe0K': b'\x1b[D', # Left |
| 1040 | + b'\xe0M': b'\x1b[C', # Right |
| 1041 | + |
| 1042 | + # Navigation keys |
| 1043 | + b'\xe0G': b'\x1b[H', # Home |
| 1044 | + b'\xe0O': b'\x1b[F', # End |
| 1045 | + b'\xe0R': b'\x1b[2~', # Insert |
| 1046 | + b'\xe0S': b'\x1b[3~', # Delete |
| 1047 | + } |
| 1048 | + |
| 1049 | + try: |
| 1050 | + while not stop.is_set(): |
| 1051 | + if msvcrt.kbhit(): |
| 1052 | + ch = msvcrt.getch() |
| 1053 | + if ch == b'\xe0': |
| 1054 | + ch2 = msvcrt.getch() |
| 1055 | + if ansi := KEY_MAP.get(ch + ch2): |
| 1056 | + sock.sendall(ansi) |
| 1057 | + elif ch == b'\r': |
| 1058 | + # Normalize CR to CRLF |
| 1059 | + sock.sendall(b"\r\n") |
| 1060 | + else: |
| 1061 | + sock.sendall(ch) |
| 1062 | + except OSError as e: |
| 1063 | + if not err.is_set() and not stop.is_set(): |
| 1064 | + err.set() |
| 1065 | + print("\n\nCommunication was closed, reason:", e) |
| 1066 | + stop.set() |
| 1067 | + |
| 1068 | + def receive_rtt_data(sock: socket.socket, |
| 1069 | + stop: threading.Event, |
| 1070 | + err: threading.Event): |
| 1071 | + try: |
| 1072 | + while not stop.is_set(): |
982 | 1073 | resp = sock.recv(2048)
|
983 |
| - if resp: |
984 |
| - print(resp.decode(), end='') |
| 1074 | + if not resp: |
| 1075 | + print("\nConnection closed by the remote host, exiting!") |
| 1076 | + stop.set() |
| 1077 | + break |
| 1078 | + print(resp.decode(errors="replace"), end='', flush=True) |
| 1079 | + except OSError as e: |
| 1080 | + if not err.is_set() and not stop.is_set(): |
| 1081 | + err.set() |
| 1082 | + print("\n\nCommunication was closed, reason:", e) |
| 1083 | + stop.set() |
| 1084 | + |
| 1085 | + stop = threading.Event() |
| 1086 | + err = threading.Event() |
| 1087 | + threading.Thread(target=send_keyboard_input, args=(sock, stop, err)).start() |
| 1088 | + threading.Thread(target=receive_rtt_data, args=(sock, stop, err)).start() |
| 1089 | + |
| 1090 | + try: |
| 1091 | + while True and not stop.is_set(): |
| 1092 | + # Wait for an event in the loop. Without timeout the |
| 1093 | + # KeyboardInterrupt would not be detected. |
| 1094 | + stop.wait(timeout=0.1) |
| 1095 | + except KeyboardInterrupt: |
| 1096 | + stop.set() |
| 1097 | + finally: |
| 1098 | + with contextlib.suppress(OSError): |
| 1099 | + sock.close() |
0 commit comments