Skip to content

Commit 1f8e2ec

Browse files
committed
Improved zmq_comm_node and test to avoid creating node
1 parent d8d0d4d commit 1f8e2ec

File tree

2 files changed

+83
-69
lines changed

2 files changed

+83
-69
lines changed

mocha_core/mocha_core/zmq_comm_node.py

Lines changed: 28 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class SyncStatus(enum.Enum):
2424

2525
class Comm_node:
2626
def __init__(self, this_node, client_node, robot_configs,
27-
client_callback, server_callback, client_timeout, node=None):
27+
client_callback, server_callback, client_timeout, ros_node):
2828
# Check input arguments
2929
assert isinstance(this_node, str)
3030
assert isinstance(client_node, str)
@@ -33,13 +33,11 @@ def __init__(self, this_node, client_node, robot_configs,
3333
assert callable(server_callback)
3434
assert isinstance(client_timeout, (int, float))
3535

36-
# Store or create the ROS2 node
37-
self.node = node
38-
if self.node is None:
39-
# Create a minimal logger if no node provided
40-
self.logger = rclpy.logging.get_logger('zmq_comm_node')
41-
else:
42-
self.logger = self.node.get_logger()
36+
# Confirm that we are providing a ROS node
37+
assert ros_node is not None
38+
self.ros_node = ros_node
39+
self.logger = self.ros_node.get_logger()
40+
self.ros_node_name = self.ros_node.get_fully_qualified_name()
4341

4442
# Check that this_node and client_node exist in the config file
4543
if this_node not in robot_configs:
@@ -50,7 +48,6 @@ def __init__(self, this_node, client_node, robot_configs,
5048
if client_node not in robot_configs[self.this_node]["clients"]:
5149
self.logger.error(f"{this_node} - Node: client_node not in config file")
5250
raise ValueError("client_node not in config file")
53-
5451
self.client_node = client_node
5552

5653
self.robot_configs = robot_configs
@@ -72,14 +69,11 @@ def __init__(self, this_node, client_node, robot_configs,
7269
self.client_timeout = client_timeout
7370

7471
# Create a publisher for the client bandwidth
75-
if self.node is not None:
76-
self.pub_client_stats = self.node.create_publisher(
77-
mocha_core.msg.ClientStats,
78-
f"ddb/client_stats/{self.client_node}",
79-
10
80-
)
81-
else:
82-
self.pub_client_stats = None
72+
self.pub_client_stats = self.ros_node.create_publisher(
73+
mocha_core.msg.ClientStats,
74+
f"{self.ros_node_name}/client_stats/{self.client_node}",
75+
10
76+
)
8377
self.pub_client_count = 0
8478

8579
self.syncStatus = SyncStatus.IDLE
@@ -134,11 +128,8 @@ def connect_send_message(self, msg):
134128
self.logger.debug(f"{self.this_node} - Node - SENDMSG: " +
135129
f"Sending ({full_msg})")
136130
client.send(full_msg)
137-
if self.node is not None:
138-
start_ts = self.node.get_clock().now()
139-
else:
140-
import time
141-
start_ts = time.time()
131+
start_ts = self.ros_node.get_clock().now()
132+
142133
# Wait at most self.client_timeout * 1000 ms
143134
socks = dict(poll.poll(self.client_timeout*1000))
144135
if socks.get(client) == zmq.POLLIN:
@@ -157,29 +148,22 @@ def connect_send_message(self, msg):
157148
f"{self.this_node} - Node - SENDMSG: Server replied " +
158149
f"({len(reply)} bytes)"
159150
)
160-
if self.node is not None:
161-
stop_ts = self.node.get_clock().now()
162-
time_d = (stop_ts - start_ts).nanoseconds
163-
time_s = float(time_d / 1e9)
164-
else:
165-
import time
166-
stop_ts = time.time()
167-
time_s = stop_ts - start_ts
168-
151+
stop_ts = self.ros_node.get_clock().now()
152+
time_d = (stop_ts - start_ts).nanoseconds
153+
time_s = float(time_d / 1e9)
154+
169155
bw = len(reply)/time_s/1024/1024
170-
171-
if self.pub_client_stats is not None:
172-
stats = mocha_core.msg.ClientStats()
173-
if self.node is not None:
174-
stats.header.stamp = self.node.get_clock().now().to_msg()
175-
stats.header.frame_id = self.this_node
176-
# Note: ROS2 doesn't have seq field in header
177-
stats.msg = msg[:5].decode("utf-8")
178-
stats.rtt = time_s
179-
stats.bw = bw
180-
stats.answ_len = len(reply)
181-
self.pub_client_stats.publish(stats)
182-
self.pub_client_count += 1
156+
157+
stats = mocha_core.msg.ClientStats()
158+
stats.header.stamp = self.ros_node.get_clock().now().to_msg()
159+
stats.header.frame_id = self.this_node
160+
# Note: ROS2 doesn't have seq field in header
161+
stats.msg = msg[:5].decode("utf-8")
162+
stats.rtt = time_s
163+
stats.bw = bw
164+
stats.answ_len = len(reply)
165+
self.pub_client_stats.publish(stats)
166+
self.pub_client_count += 1
183167
if len(reply) > 10*1024 and SHOW_BANDWIDTH:
184168
self.logger.info(f"{self.this_node} - Node - " +
185169
f"SENDMSG: Data RTT: {time_s}")

mocha_core/test/test_zmq_comm_node.py

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,16 @@
1919
import pdb
2020
import rclpy
2121
import rclpy.logging
22+
from rclpy.logging import LoggingSeverity
23+
from rclpy.executors import SingleThreadedExecutor
24+
from rclpy.node import Node
25+
import threading
2226
from colorama import Fore, Style
27+
import mocha_core.zmq_comm_node as zmq_comm_node
28+
29+
class Comm_node_test(Node):
30+
def __init__(self):
31+
super().__init__("comm_node_test")
2332

2433

2534
class Test(unittest.TestCase):
@@ -28,44 +37,71 @@ def setUpClass(cls):
2837
# Load configurations at the class level
2938
current_dir = os.path.dirname(os.path.abspath(__file__))
3039
ddb_path = os.path.join(current_dir, "..")
31-
40+
3241
# Load robot configs
3342
robot_configs_path = os.path.join(ddb_path, "config/testConfigs/robot_configs.yaml")
3443
with open(robot_configs_path, "r") as f:
3544
cls.robot_configs = yaml.load(f, Loader=yaml.FullLoader)
3645

46+
3747
def setUp(self):
3848
test_name = self._testMethodName
3949
print("\n", Fore.RED, 20*"=", test_name, 20*"=", Style.RESET_ALL)
50+
# Create a mock node to run the tests
51+
rclpy.init()
52+
self.comm_node_test = Comm_node_test()
53+
self.executor = SingleThreadedExecutor()
54+
self.executor.add_node(self.comm_node_test)
55+
executor_thread = threading.Thread(target=self.executor.spin, daemon=True)
56+
executor_thread.start()
4057
super().setUp()
4158

4259
def tearDown(self):
4360
time.sleep(1)
61+
self.executor.shutdown()
62+
self.comm_node_test.destroy_node()
63+
rclpy.shutdown()
4464
super().tearDown()
4565

4666
def test_simple_connection(self):
47-
self.answer = None
48-
logger = rclpy.logging.get_logger('test_zmq_comm_node')
49-
50-
def cb_groundstation(value):
51-
logger.debug("cb_groundstation")
52-
self.answer = value
53-
54-
def cb_charon(value):
67+
self.answer_cb_gs_client = None
68+
self.answer_cb_ch_client = None
69+
logger = self.comm_node_test.get_logger()
70+
logger.set_level(LoggingSeverity.DEBUG)
71+
72+
def cb_groundstation_client(value):
73+
logger.debug(f"cb groundstation client")
74+
self.answer_cb_gs_client = value
75+
76+
def cb_groundstation_server(value):
77+
# This function is called upon reception of a message by the base
78+
# station
79+
logger.debug("cb groundstation server")
80+
to_append = b"GSSERVER"
81+
return value + to_append
82+
83+
def cb_charon_client(value):
84+
logger.debug(f"cb charon client")
85+
self.answer_cb_ch_client = value
86+
87+
def cb_charon_server(value):
5588
# This function is called upon reception of a message by charon.
5689
# The return value is transmitted as answer to the original
57-
# message.
58-
logger.debug(f"cb_charon: {value}")
59-
return value
90+
# message + b"CHARON"
91+
logger.debug(f"cb charon server")
92+
to_append = b"CHARONSERVER"
93+
return value + to_append
6094

6195
# Create the two robots
6296
node_groundstation = zmq_comm_node.Comm_node(
6397
"basestation", "charon", self.robot_configs,
64-
cb_groundstation, cb_groundstation, 2
98+
cb_groundstation_client, cb_groundstation_server,
99+
2, self.comm_node_test
65100
)
66101
node_charon = zmq_comm_node.Comm_node(
67102
"charon", "basestation", self.robot_configs,
68-
cb_charon, cb_charon, 2
103+
cb_charon_client, cb_charon_server,
104+
2, self.comm_node_test
69105
)
70106

71107
# Generate random message
@@ -76,21 +112,15 @@ def cb_charon(value):
76112

77113
# Send message from node_groundstation to robot 2
78114
node_groundstation.connect_send_message(random_msg)
79-
80-
# node_charon.connect_send_message(random_msg)
115+
node_charon.connect_send_message(random_msg)
81116

82117
# Terminate robots and test assertion
83118
node_groundstation.terminate()
84119
node_charon.terminate()
85-
self.assertEqual(random_msg, self.answer, "Sent %s" % random_msg)
86-
87-
88-
# Add the mocha_core module path for imports
89-
current_dir = os.path.dirname(os.path.abspath(__file__))
90-
mocha_core_path = os.path.join(current_dir, "..", "mocha_core")
91-
sys.path.append(mocha_core_path)
92-
93-
import mocha_core.zmq_comm_node as zmq_comm_node
120+
self.assertEqual(random_msg + b"CHARONSERVER", self.answer_cb_gs_client,
121+
"Sent %s" % random_msg)
122+
self.assertEqual(random_msg + b"GSSERVER", self.answer_cb_ch_client,
123+
"Sent %s" % random_msg)
94124

95125
if __name__ == "__main__":
96126
# Run test cases!

0 commit comments

Comments
 (0)