3737import argparse
3838import socket
3939import os
40+ import pytest
41+
42+ RED = "\033 [31m"
43+ RESET = "\033 [0m"
4044
4145
4246# Custom formatter to show both default values and description formatting in the help message
@@ -83,8 +87,6 @@ def check_tcp(ip, port, timeout=5.0):
8387
8488
8589def main (args ):
86- RED = "\033 [31m"
87- RESET = "\033 [0m"
8890
8991 logging .basicConfig (level = logging .INFO , format = "[%(levelname)s] %(message)s" )
9092
@@ -97,27 +99,33 @@ def main(args):
9799
98100 # Check IP and port reachability
99101 if not check_tcp (robot_ip , tcp_port ):
100- logging . error (
101- f"{ RED } Cannot reach { robot_ip } :{ tcp_port } .\n "
102+ raise ConnectionError (
103+ f"Cannot reach { robot_ip } :{ tcp_port } .\n "
102104 "Check that the IP address and port are correct.\n "
103105 "If so, ensure that the robot is powered on, reachable on the network, "
104- f"and that the ToolCommForwarder URCap is running.{ RESET } "
106+ f"and that the ToolCommForwarder URCap is running."
107+ )
108+
109+ # Check if parent directory of device_name exists
110+ parent_dir = os .path .dirname (local_device )
111+ if parent_dir and not os .path .exists (parent_dir ):
112+ raise FileNotFoundError (
113+ f"Parent directory '{ parent_dir } ' does not exist.\n "
114+ "Socat needs an existing directory to create the PTY symlink.\n "
115+ "Fix:\n "
116+ f" - Create the parent directory, e.g. 'mkdir -p { parent_dir } '.\n "
117+ f" - Use a different device name with an existing parent directory."
105118 )
106- logging .info ("Exiting tool communication script." )
107- return
108119
109120 # Check if the device_name is a directory
110121 if os .path .isdir (local_device ):
111-
112- logging .error (
113- f"{ RED } '{ local_device } ' exists and is a directory.\n "
122+ raise FileExistsError (
123+ f"'{ local_device } ' exists and is a directory.\n "
114124 "Socat needs a file path to create a PTY symlink, but it cannot replace a directory.\n "
115125 "Fix:\n "
116126 " - Remove the directory.\n "
117- f" - Use a different device name, e.g. '--device-name /tmp/ttyUR0'. { RESET } "
118- )
119- logging .info ("Exiting tool communication script." )
120- return
127+ f" - Use a different device name, e.g. '--device-name /tmp/ttyUR0'."
128+ )
121129
122130 # Configure socat command
123131 socat_config = [
@@ -144,19 +152,64 @@ def main(args):
144152
145153 # Error case when socat is not installed
146154 except FileNotFoundError :
147- logging .error (f"{ RED } Socat not found in PATH. Install it (e.g. apt-get install socat). { RESET } " )
148- logging .info ("Exiting tool communication script." )
149- return
155+ raise FileNotFoundError ("Socat not found in PATH. Install it (e.g. apt-get install socat)." )
150156
151- # Other errors
152- except Exception as e :
153- logging .error (f"{ RED } Unexpected error launching socat: { e } { RESET } " )
154- logging .info ("Exiting tool communication script." )
155- return
156157
157- return
158+ def test_check_tcp ():
159+ assert not check_tcp ("127.0.0.1" , 0.1 )
160+
161+
162+ def test_check_tcp_open_port ():
163+ bind_ip = "127.0.0.1"
164+ bind_port = 54321
165+ args = argparse .Namespace (
166+ robot_ip = bind_ip ,
167+ tcp_port = bind_port ,
168+ device_name = "/tmp/nonexistent_dir/ttyUR"
169+ )
170+ with pytest .raises (ConnectionError ):
171+ main (args )
172+
173+
174+ def test_parent_dir_doesnt_exist ():
175+ bind_ip = "127.0.0.1"
176+ bind_port = 54321
177+ server = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
178+ server .bind ((bind_ip , bind_port ))
179+ server .listen (1 )
180+ args = argparse .Namespace (
181+ robot_ip = bind_ip ,
182+ tcp_port = bind_port ,
183+ device_name = "/tmp/nonexistent_dir/ttyUR"
184+ )
185+ with pytest .raises (FileNotFoundError ) as exc_info :
186+ main (args )
187+ assert os .path .dirname (args .device_name ) in str (exc_info .value )
188+ server .close ()
189+
190+
191+ def test_device_name_is_directory ():
192+ bind_ip = "127.0.0.1"
193+ bind_port = 54321
194+ server = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
195+ server .bind ((bind_ip , bind_port ))
196+ server .listen (1 )
197+ args = argparse .Namespace (
198+ robot_ip = bind_ip ,
199+ tcp_port = bind_port ,
200+ device_name = "/tmp/ttyUR_dir"
201+ )
202+ os .makedirs (args .device_name , exist_ok = True )
203+ with pytest .raises (FileExistsError ) as exc_info :
204+ main (args )
205+ assert args .device_name in str (exc_info .value )
206+ server .close ()
158207
159208
160209if __name__ == "__main__" :
161210 args = get_args ()
162- main (args )
211+ try :
212+ main (args )
213+ except Exception as e :
214+ logging .error (f"{ RED } Unexpected error: { e } { RESET } " )
215+ logging .info ("Exiting tool communication script." )
0 commit comments