Skip to content

Commit 3a61d0e

Browse files
committed
Add more tests
1 parent d53271d commit 3a61d0e

File tree

4 files changed

+201
-0
lines changed

4 files changed

+201
-0
lines changed

tests/ros2/test_core.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import pytest
2+
3+
from roslibpy import Header, Time
4+
5+
REF_FLOAT_SECS_TIME = 1610122759.677662
6+
7+
8+
def test_time_from_sec_based_on_time_module():
9+
t = Time.from_sec(REF_FLOAT_SECS_TIME)
10+
assert t.secs == 1610122759
11+
assert t.nsecs == 677661895
12+
13+
14+
def test_to_nsec():
15+
t = Time.from_sec(REF_FLOAT_SECS_TIME)
16+
assert t.to_nsec() == 1610122759677661895
17+
18+
19+
def test_to_sec():
20+
t = Time.from_sec(REF_FLOAT_SECS_TIME)
21+
assert t.to_sec() == REF_FLOAT_SECS_TIME
22+
23+
24+
def test_is_zero():
25+
assert Time(0, 0).is_zero()
26+
assert Time(1, 0).is_zero() is False
27+
28+
29+
def test_header_ctor_supports_time():
30+
header = Header(stamp=Time.from_sec(REF_FLOAT_SECS_TIME))
31+
assert header["stamp"]["secs"] == 1610122759
32+
assert header["stamp"]["secs"] == header["stamp"].secs
33+
assert header["stamp"].to_sec() == REF_FLOAT_SECS_TIME
34+
35+
36+
def test_header_ctor_supports_dict():
37+
header = Header(stamp=dict(secs=1610122759, nsecs=677661895))
38+
assert header["stamp"]["secs"] == 1610122759
39+
assert header["stamp"]["secs"] == header["stamp"].secs
40+
assert header["stamp"].to_sec() == REF_FLOAT_SECS_TIME
41+
42+
43+
def test_time_accepts_only_ints():
44+
with pytest.raises(ValueError):
45+
Time(1.3, 1.0)
46+
with pytest.raises(ValueError):
47+
Time(100.0, 3.1)
48+
49+
t = Time(110.0, 0.0)
50+
assert t.secs == 110
51+
assert t.nsecs == 0
52+
53+
54+
def test_time_properties_are_readonly():
55+
t = Time.now()
56+
with pytest.raises(AttributeError):
57+
t.secs = 10
58+
with pytest.raises(AttributeError):
59+
t.nsecs = 10

tests/ros2/test_ros.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
from __future__ import print_function
2+
3+
import threading
4+
import time
5+
6+
from roslibpy import Ros
7+
8+
host = "127.0.0.1"
9+
port = 9090
10+
url = "ws://%s:%d" % (host, port)
11+
12+
13+
def test_reconnect_does_not_trigger_on_client_close():
14+
ros = Ros(host, port)
15+
ros.run()
16+
17+
assert ros.is_connected, "ROS initially connected"
18+
time.sleep(0.5)
19+
event = threading.Event()
20+
ros.on("close", lambda m: event.set())
21+
ros.close()
22+
event.wait(5)
23+
24+
assert not ros.is_connected, "Successful disconnect"
25+
assert not ros.is_connecting, "Not trying to re-connect"
26+
27+
28+
def test_connection():
29+
ros = Ros(host, port)
30+
ros.run()
31+
assert ros.is_connected
32+
ros.close()
33+
34+
35+
def test_url_connection():
36+
ros = Ros(url)
37+
ros.run()
38+
assert ros.is_connected
39+
ros.close()
40+
41+
42+
def test_closing_event():
43+
ros = Ros(url)
44+
ros.run()
45+
ctx = dict(closing_event_called=False, was_still_connected=False)
46+
47+
def handle_closing():
48+
ctx["closing_event_called"] = True
49+
ctx["was_still_connected"] = ros.is_connected
50+
time.sleep(1.5)
51+
52+
ts_start = time.time()
53+
ros.on("closing", handle_closing)
54+
ros.close()
55+
ts_end = time.time()
56+
closing_was_handled_synchronously_before_close = ts_end - ts_start >= 1.5
57+
58+
assert ctx["closing_event_called"]
59+
assert ctx["was_still_connected"]
60+
assert closing_was_handled_synchronously_before_close
61+
62+
63+
def test_multithreaded_connect_disconnect():
64+
CONNECTIONS = 30
65+
clients = []
66+
67+
def connect(clients):
68+
ros = Ros(url)
69+
ros.run()
70+
clients.append(ros)
71+
72+
# First connect all
73+
threads = []
74+
for _ in range(CONNECTIONS):
75+
thread = threading.Thread(target=connect, args=(clients,))
76+
thread.daemon = False
77+
thread.start()
78+
threads.append(thread)
79+
80+
for thread in threads:
81+
thread.join()
82+
83+
# Assert connection status
84+
for ros in clients:
85+
assert ros.is_connected
86+
87+
# Now disconnect all
88+
for ros in clients:
89+
ros.close()
90+
91+
time.sleep(0.5)
92+
93+
# Assert connection status
94+
for ros in clients:
95+
assert not ros.is_connected

tests/ros2/test_rosapi.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import threading
2+
3+
import pytest
4+
5+
from roslibpy import Ros
6+
7+
host = "127.0.0.1"
8+
port = 9090
9+
url = "ws://%s:%d" % (host, port)
10+
11+
12+
def test_rosapi_topics():
13+
context = dict(wait=threading.Event(), result=None)
14+
ros = Ros(host, port)
15+
ros.run()
16+
17+
def callback(topic_list):
18+
context["result"] = topic_list
19+
context["wait"].set()
20+
21+
ros.get_topics(callback)
22+
if not context["wait"].wait(5):
23+
raise Exception
24+
25+
assert "/rosout" in context["result"]["topics"]
26+
ros.close()
27+
28+
29+
def test_rosapi_topics_blocking():
30+
ros = Ros(host, port)
31+
ros.run()
32+
topic_list = ros.get_topics()
33+
34+
print(topic_list)
35+
assert "/rosout" in topic_list
36+
37+
ros.close()
38+
39+
40+
def test_connection_fails_when_missing_port():
41+
with pytest.raises(Exception):
42+
Ros(host)
43+
44+
45+
def test_connection_fails_when_schema_not_ws():
46+
with pytest.raises(Exception):
47+
Ros("http://%s:%d" % (host, port))
File renamed without changes.

0 commit comments

Comments
 (0)