Skip to content

Commit 8c45d10

Browse files
committed
Code formatting
1 parent 603f31a commit 8c45d10

File tree

3 files changed

+138
-133
lines changed

3 files changed

+138
-133
lines changed

.github/workflows/build.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ jobs:
1919
.github/dockerursim/build_and_run_docker_ursim.sh
2020
- name: Install netcat
2121
run: |
22-
sudo apt-get update && sudo apt-get install -y netcat
22+
sudo apt-get update && sudo apt-get install -y netcat
2323
- name: Wait for dashboard client
24-
run: |
24+
run: |
2525
./.github/helpers/wait_for_dashboard_server.sh
2626
- name: Run test
2727
run: |

examples/simple_external_control_server/simple_external_control_server.py

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import threading
2525
import argparse
2626

27+
2728
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
2829
daemon_threads = True
2930
allow_reuse_address = True
@@ -52,20 +53,21 @@ def handle(self):
5253
file.close()
5354
print(f'Closed: {client}')
5455

56+
5557
if __name__ == '__main__':
56-
parser = argparse.ArgumentParser(description='Simple External Control server')
57-
parser.add_argument(
58-
"file", type=str, help="Path to the UR script file, which will be sent to the robot")
59-
parser.add_argument("-p", "--port", type=int,
60-
default=50002, help="Port number to use")
58+
parser = argparse.ArgumentParser(description='Simple External Control server')
59+
parser.add_argument(
60+
"file", type=str, help="Path to the UR script file, which will be sent to the robot")
61+
parser.add_argument("-p", "--port", type=int,
62+
default=50002, help="Port number to use")
6163

62-
args = parser.parse_args()
64+
args = parser.parse_args()
6365

64-
server = ThreadedTCPServer(('', args.port), FileHandler, args.file)
65-
print(f'The Simple External Control server is running on port {args.port}')
66-
try:
67-
server.serve_forever()
68-
except KeyboardInterrupt:
69-
pass
66+
server = ThreadedTCPServer(('', args.port), FileHandler, args.file)
67+
print(f'The Simple External Control server is running on port {args.port}')
68+
try:
69+
server.serve_forever()
70+
except KeyboardInterrupt:
71+
pass
7072

71-
server.server_close()
73+
server.server_close()

tests/test_urcap.py

Lines changed: 121 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -31,131 +31,134 @@
3131

3232
ROBOT_IP = "127.0.0.1"
3333

34+
3435
class DashboardClient:
35-
def __init__(self, ip_address):
36-
self.ip_address = ip_address
37-
self.port = 29999
38-
39-
def connect(self):
40-
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
41-
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
42-
43-
self.socket.connect((self.ip_address, self.port))
44-
self.buffer_size = 1024
45-
self.socket.settimeout(5)
46-
self.socket.recv(self.buffer_size)
47-
48-
def disconnect(self):
49-
self.socket.close()
50-
51-
def sendAndRecieve(self, command):
52-
self.socket.settimeout(10)
53-
self.socket.sendall((command + "\n").encode("utf-8"))
54-
msg = self.socket.recv(self.buffer_size)
55-
msg = msg.decode("utf-8")
56-
msg = msg.replace('\n','')
57-
return msg
36+
def __init__(self, ip_address):
37+
self.ip_address = ip_address
38+
self.port = 29999
39+
40+
def connect(self):
41+
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
42+
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
43+
44+
self.socket.connect((self.ip_address, self.port))
45+
self.buffer_size = 1024
46+
self.socket.settimeout(5)
47+
self.socket.recv(self.buffer_size)
48+
49+
def disconnect(self):
50+
self.socket.close()
51+
52+
def sendAndRecieve(self, command):
53+
self.socket.settimeout(10)
54+
self.socket.sendall((command + "\n").encode("utf-8"))
55+
msg = self.socket.recv(self.buffer_size)
56+
msg = msg.decode("utf-8")
57+
msg = msg.replace('\n', '')
58+
return msg
59+
5860

5961
class ServerThread(threading.Thread):
60-
def __init__(self, server):
61-
threading.Thread.__init__(self)
62-
self.server = server
62+
def __init__(self, server):
63+
threading.Thread.__init__(self)
64+
self.server = server
6365

66+
def run(self):
67+
self.server.serve_forever()
6468

65-
def run(self):
66-
self.server.serve_forever()
6769

6870
class TestUrcap(unittest.TestCase):
69-
@classmethod
70-
def setUpClass(cls):
71-
cls.init_robot(cls)
72-
73-
@classmethod
74-
def tearDownClass(cls):
75-
cls.client.disconnect()
76-
print("tearing down")
77-
cls.server.shutdown()
78-
print("done shutting down server")
79-
cls.server_thread.join()
80-
81-
def init_robot(self):
82-
"""Setup server to send urscript to urcap and connect to dashboard client."""
83-
self.server = ThreadedTCPServer(('', 50002), FileHandler, os.path.dirname(__file__) + "/resources/test_script.urscript")
84-
self.server_thread = ServerThread(self.server)
85-
self.server_thread.start()
86-
87-
self.client = DashboardClient(ROBOT_IP)
88-
self.client.connect()
89-
90-
def test_one_urcap(self):
91-
"""This is testing that a program with the urcap can load and run."""
92-
93-
# Load program
94-
self.client.sendAndRecieve("load one_urcap.urp")
95-
result = self.client.sendAndRecieve("get loaded program")
96-
t_end = time.time() + 5
97-
while 'one_urcap' not in result and time.time() < t_end:
98-
result = self.client.sendAndRecieve("get loaded program")
99-
self.assertTrue('one_urcap' in result)
100-
101-
self.client.sendAndRecieve("brake release")
102-
103-
# Make sure the robot is running
104-
robot_mode = self.client.sendAndRecieve("robotmode")
105-
t_end = time.time() + 10
106-
while robot_mode != "Robotmode: RUNNING" and time.time() < t_end:
107-
robot_mode = self.client.sendAndRecieve("robotmode")
108-
self.assertEqual(robot_mode, "Robotmode: RUNNING")
109-
110-
# Play program
111-
self.client.sendAndRecieve("play")
112-
program_state = self.client.sendAndRecieve("programState")
113-
t_end = time.time() + 10
114-
while "PLAYING" not in program_state and time.time() < t_end:
115-
program_state = self.client.sendAndRecieve("programState")
116-
self.assertTrue("PLAYING" in program_state)
117-
118-
# Wait for program to stop again
119-
program_state = self.client.sendAndRecieve("programState")
120-
t_end = time.time() + 10
121-
while "STOPPED" not in program_state and time.time() < t_end:
122-
program_state = self.client.sendAndRecieve("programState")
123-
self.assertTrue("STOPPED" in program_state)
124-
125-
def test_multiple_urcaps(self):
126-
"""Test that a program with multiple URCaps can load and run, this will test # HEADER_BEGIN # HEADER_END feature."""
127-
# Load program
128-
self.client.sendAndRecieve("load multiple_urcaps.urp")
129-
result = self.client.sendAndRecieve("get loaded program")
130-
t_end = time.time() + 5
131-
while 'one_urcap' not in result and time.time() < t_end:
132-
result = self.client.sendAndRecieve("get loaded program")
133-
self.assertTrue('multiple_urcaps' in result)
134-
135-
self.client.sendAndRecieve("brake release")
136-
137-
# Make sure the robot is running
138-
robot_mode = self.client.sendAndRecieve("robotmode")
139-
t_end = time.time() + 10
140-
while robot_mode != "Robotmode: RUNNING" and time.time() < t_end:
141-
robot_mode = self.client.sendAndRecieve("robotmode")
142-
self.assertEqual(robot_mode, "Robotmode: RUNNING")
143-
144-
# Play program
145-
self.client.sendAndRecieve("play")
146-
program_state = self.client.sendAndRecieve("programState")
147-
t_end = time.time() + 10
148-
while "PLAYING" not in program_state and time.time() < t_end:
149-
program_state = self.client.sendAndRecieve("programState")
150-
self.assertTrue("PLAYING" in program_state)
151-
152-
# Wait for program to stop again
153-
program_state = self.client.sendAndRecieve("programState")
154-
t_end = time.time() + 10
155-
while "STOPPED" not in program_state and time.time() < t_end:
156-
program_state = self.client.sendAndRecieve("programState")
157-
self.assertTrue("STOPPED" in program_state)
71+
@classmethod
72+
def setUpClass(cls):
73+
cls.init_robot(cls)
74+
75+
@classmethod
76+
def tearDownClass(cls):
77+
cls.client.disconnect()
78+
print("tearing down")
79+
cls.server.shutdown()
80+
print("done shutting down server")
81+
cls.server_thread.join()
82+
83+
def init_robot(self):
84+
"""Setup server to send urscript to urcap and connect to dashboard client."""
85+
self.server = ThreadedTCPServer(('', 50002), FileHandler, os.path.dirname(
86+
__file__) + "/resources/test_script.urscript")
87+
self.server_thread = ServerThread(self.server)
88+
self.server_thread.start()
89+
90+
self.client = DashboardClient(ROBOT_IP)
91+
self.client.connect()
92+
93+
def test_one_urcap(self):
94+
"""This is testing that a program with the urcap can load and run."""
95+
96+
# Load program
97+
self.client.sendAndRecieve("load one_urcap.urp")
98+
result = self.client.sendAndRecieve("get loaded program")
99+
t_end = time.time() + 5
100+
while 'one_urcap' not in result and time.time() < t_end:
101+
result = self.client.sendAndRecieve("get loaded program")
102+
self.assertTrue('one_urcap' in result)
103+
104+
self.client.sendAndRecieve("brake release")
105+
106+
# Make sure the robot is running
107+
robot_mode = self.client.sendAndRecieve("robotmode")
108+
t_end = time.time() + 10
109+
while robot_mode != "Robotmode: RUNNING" and time.time() < t_end:
110+
robot_mode = self.client.sendAndRecieve("robotmode")
111+
self.assertEqual(robot_mode, "Robotmode: RUNNING")
112+
113+
# Play program
114+
self.client.sendAndRecieve("play")
115+
program_state = self.client.sendAndRecieve("programState")
116+
t_end = time.time() + 10
117+
while "PLAYING" not in program_state and time.time() < t_end:
118+
program_state = self.client.sendAndRecieve("programState")
119+
self.assertTrue("PLAYING" in program_state)
120+
121+
# Wait for program to stop again
122+
program_state = self.client.sendAndRecieve("programState")
123+
t_end = time.time() + 10
124+
while "STOPPED" not in program_state and time.time() < t_end:
125+
program_state = self.client.sendAndRecieve("programState")
126+
self.assertTrue("STOPPED" in program_state)
127+
128+
def test_multiple_urcaps(self):
129+
"""Test that a program with multiple URCaps can load and run, this will test # HEADER_BEGIN # HEADER_END feature."""
130+
# Load program
131+
self.client.sendAndRecieve("load multiple_urcaps.urp")
132+
result = self.client.sendAndRecieve("get loaded program")
133+
t_end = time.time() + 5
134+
while 'one_urcap' not in result and time.time() < t_end:
135+
result = self.client.sendAndRecieve("get loaded program")
136+
self.assertTrue('multiple_urcaps' in result)
137+
138+
self.client.sendAndRecieve("brake release")
139+
140+
# Make sure the robot is running
141+
robot_mode = self.client.sendAndRecieve("robotmode")
142+
t_end = time.time() + 10
143+
while robot_mode != "Robotmode: RUNNING" and time.time() < t_end:
144+
robot_mode = self.client.sendAndRecieve("robotmode")
145+
self.assertEqual(robot_mode, "Robotmode: RUNNING")
146+
147+
# Play program
148+
self.client.sendAndRecieve("play")
149+
program_state = self.client.sendAndRecieve("programState")
150+
t_end = time.time() + 10
151+
while "PLAYING" not in program_state and time.time() < t_end:
152+
program_state = self.client.sendAndRecieve("programState")
153+
self.assertTrue("PLAYING" in program_state)
154+
155+
# Wait for program to stop again
156+
program_state = self.client.sendAndRecieve("programState")
157+
t_end = time.time() + 10
158+
while "STOPPED" not in program_state and time.time() < t_end:
159+
program_state = self.client.sendAndRecieve("programState")
160+
self.assertTrue("STOPPED" in program_state)
158161

159162

160163
if __name__ == '__main__':
161-
unittest.main()
164+
unittest.main()

0 commit comments

Comments
 (0)