Skip to content

Commit a0f234b

Browse files
committed
Migrate synchronize_channel and test_synchronize_channel
1 parent 9216803 commit a0f234b

File tree

3 files changed

+113
-85
lines changed

3 files changed

+113
-85
lines changed

mocha_core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ if(BUILD_TESTING)
6161
ament_add_pytest_test(test_database test/test_database.py)
6262
ament_add_pytest_test(test_database_utils test/test_database_utils.py)
6363
ament_add_pytest_test(test_zmq_comm_node test/test_zmq_comm_node.py)
64+
ament_add_pytest_test(test_synchronize_channel test/test_synchronize_channel.py)
6465
endif()
6566

6667
ament_package()

mocha_core/mocha_core/synchronize_channel.py

Lines changed: 87 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,19 @@
22

33
import enum
44
import threading
5+
import time
56
import smach
67
import database as db
78
import database_utils as du
89
import hash_comm as hc
910
import zmq_comm_node
1011
import logging
11-
import rospy
12+
import rclpy
13+
import rclpy.logging
14+
import rclpy.time
1215
import pdb
13-
from std_msgs.msg import Time, String
14-
from mocha_core.msg import SM_state
16+
from builtin_interfaces.msg import Time
17+
from mocha_core.msg import SMState
1518

1619
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
1720
# General configuration variables
@@ -48,12 +51,12 @@ def __init__(self, outer):
4851
def execute(self, userdata):
4952
self.outer.publishState("Idle Start")
5053
while (not self.outer.sm_shutdown.is_set() and
51-
not rospy.is_shutdown()):
54+
not self.outer.shutdown_requested):
5255
if self.outer.sync.get_state():
5356
# trigger sync and reset bistable
5457
self.outer.publishState("Idle to RequestHash")
5558
return 'to_req_hash'
56-
rospy.sleep(CHECK_TRIGGER_TIME)
59+
time.sleep(CHECK_TRIGGER_TIME)
5760
self.outer.publishState("Idle to Stopped")
5861
return 'to_stopped'
5962

@@ -82,12 +85,12 @@ def execute(self, userdata):
8285
and not self.outer.sm_shutdown.is_set()):
8386
answer = self.outer.client_answer
8487
if answer is not None:
85-
rospy.logdebug(f"{comm.this_node} - Channel" +
88+
self.outer.logger.debug(f"{comm.this_node} - Channel" +
8689
f"- REQUESTHASH: {answer}")
8790
userdata.out_answer = answer
8891
self.outer.publishState("RequestHash to Reply")
8992
return 'to_req_hash_reply'
90-
rospy.sleep(CHECK_POLL_TIME)
93+
time.sleep(CHECK_POLL_TIME)
9194
i += 1
9295
if self.outer.sm_shutdown.is_set():
9396
self.outer.publishState("RequestHash to Stopped")
@@ -113,7 +116,7 @@ def execute(self, userdata):
113116
# depending on the message type
114117
hash_list = self.outer.dbl.headers_not_in_local(deserialized,
115118
newer=True)
116-
rospy.logdebug(f"======== - REQUESTHASH: {hash_list}")
119+
self.outer.logger.debug(f"======== - REQUESTHASH: {hash_list}")
117120
if len(hash_list):
118121
# We have hashes. Go get them
119122
# rospy.logdebug(f"{self.this_robot} - REQUESTHASH: Unique -> {hash_list}")
@@ -143,7 +146,7 @@ def execute(self, userdata):
143146
hash_list = userdata.in_hash_list.copy()
144147
# Get the first hash of the list, the one with the higher priority
145148
req_hash = hash_list.pop(0)
146-
rospy.logdebug(f"{comm.this_node} - Channel - GETDATA: {req_hash}")
149+
self.outer.logger.debug(f"{comm.this_node} - Channel - GETDATA: {req_hash}")
147150
# Ask for hash
148151
msg = Comm_msgs.GDATA.name.encode() + req_hash
149152
comm.connect_send_message(msg)
@@ -160,7 +163,7 @@ def execute(self, userdata):
160163
userdata.out_req_hash = req_hash
161164
self.outer.publishState("GetData to GetDataReply")
162165
return 'to_get_data_reply'
163-
rospy.sleep(CHECK_POLL_TIME)
166+
time.sleep(CHECK_POLL_TIME)
164167
i += 1
165168
if self.outer.sm_shutdown.is_set():
166169
self.outer.publishState("GetData to Stopped")
@@ -208,7 +211,7 @@ def execute(self, userdata):
208211
self.outer.publishState("TransmissionEnd Start")
209212
# Request current comm node
210213
comm = self.outer.get_comm_node()
211-
rospy.logdebug(f"{comm.this_node} - Channel - DENDT")
214+
self.outer.logger.debug(f"{comm.this_node} - Channel - DENDT")
212215
# Ask for hash
213216
msg = Comm_msgs.DENDT.name.encode() + self.outer.this_robot.encode()
214217
comm.connect_send_message(msg)
@@ -221,9 +224,14 @@ def execute(self, userdata):
221224
answer = self.outer.client_answer
222225
if answer is not None:
223226
# We received an ACK
224-
self.outer.client_sync_complete_pub.publish(Time(rospy.get_rostime()))
227+
if self.outer.client_sync_complete_pub is not None:
228+
time_msg = Time()
229+
if self.outer.node is not None:
230+
current_time = self.outer.node.get_clock().now()
231+
time_msg.sec, time_msg.nanosec = current_time.seconds_nanoseconds()
232+
self.outer.client_sync_complete_pub.publish(time_msg)
225233
break
226-
rospy.sleep(CHECK_POLL_TIME)
234+
time.sleep(CHECK_POLL_TIME)
227235
i += 1
228236
if self.outer.sm_shutdown.is_set():
229237
self.outer.publishState("TransmissionEnd to Stopped")
@@ -265,19 +273,27 @@ def get_state(self):
265273
class Channel():
266274
def __init__(self, dbl, this_robot,
267275
target_robot, robot_configs,
268-
client_timeout):
276+
client_timeout, node=None):
269277
# Check input arguments
270278
assert type(dbl) is db.DBwLock
271279
assert type(this_robot) is str
272280
assert type(target_robot) is str
273281
assert type(robot_configs) is dict
274282
assert type(client_timeout) is float or type(client_timeout) is int
275283

276-
# Override smach logger to use rospy loggers
277-
# Use rospy.logdebug for smach info as it is too verbose
278-
# def set_loggers(info,warn,debug,error):
279-
smach.set_loggers(rospy.logdebug, rospy.logwarn,
280-
rospy.logdebug, rospy.logerr)
284+
# Store or create the ROS2 node
285+
self.node = node
286+
if self.node is None:
287+
self.logger = rclpy.logging.get_logger('synchronize_channel')
288+
else:
289+
self.logger = self.node.get_logger()
290+
291+
# Add shutdown_requested attribute for ROS2 compatibility
292+
self.shutdown_requested = False
293+
294+
# Override smach logger to use ROS2 loggers
295+
smach.set_loggers(self.logger.debug, self.logger.warn,
296+
self.logger.debug, self.logger.error)
281297

282298
# Basic parameters of the communication channel
283299
self.this_robot = this_robot
@@ -306,17 +322,30 @@ def __init__(self, dbl, this_robot,
306322
self.sm = smach.StateMachine(outcomes=['failure', 'stopped'])
307323

308324
# Create topic to notify that the transmission ended
309-
self.client_sync_complete_pub = rospy.Publisher(f"ddb/client_sync_complete/{self.target_robot}",
310-
Time,
311-
queue_size=20)
312-
self.server_sync_complete_pub = rospy.Publisher(f"ddb/server_sync_complete/{self.target_robot}",
313-
Time,
314-
queue_size=20)
325+
if self.node is not None:
326+
self.client_sync_complete_pub = self.node.create_publisher(
327+
Time,
328+
f"ddb/client_sync_complete/{self.target_robot}",
329+
20
330+
)
331+
self.server_sync_complete_pub = self.node.create_publisher(
332+
Time,
333+
f"ddb/server_sync_complete/{self.target_robot}",
334+
20
335+
)
336+
else:
337+
self.client_sync_complete_pub = None
338+
self.server_sync_complete_pub = None
315339

316340
# Create a topic that prints the current state of the state machine
317-
self.sm_state_pub = rospy.Publisher(f"ddb/client_sm_state/{self.target_robot}",
318-
SM_state,
319-
queue_size=20)
341+
if self.node is not None:
342+
self.sm_state_pub = self.node.create_publisher(
343+
SMState,
344+
f"ddb/client_sm_state/{self.target_robot}",
345+
20
346+
)
347+
else:
348+
self.sm_state_pub = None
320349
self.sm_state_count = 0
321350

322351
with self.sm:
@@ -366,13 +395,15 @@ def publishState(self, msg):
366395
""" Publish the string msg (where the state message will be stored)
367396
with a timestamp"""
368397
assert type(msg) is str
369-
state_msg = SM_state()
370-
state_msg.header.stamp = rospy.Time.now()
371-
state_msg.header.frame_id = self.this_robot
372-
state_msg.header.seq = self.sm_state_count
373-
self.sm_state_count += 1
374-
state_msg.state = msg
375-
self.sm_state_pub.publish(state_msg)
398+
if self.sm_state_pub is not None:
399+
state_msg = SMState()
400+
if self.node is not None:
401+
state_msg.header.stamp = self.node.get_clock().now().to_msg()
402+
state_msg.header.frame_id = self.this_robot
403+
# Note: ROS2 doesn't have seq field in header
404+
self.sm_state_count += 1
405+
state_msg.state = msg
406+
self.sm_state_pub.publish(state_msg)
376407

377408
def run(self):
378409
""" Configures the zmq_comm_node and also starts the state
@@ -384,7 +415,8 @@ def run(self):
384415
self.robot_configs,
385416
self.callback_client,
386417
self.callback_server,
387-
self.client_timeout)
418+
self.client_timeout,
419+
self.node)
388420
# Unset this flag before starting the SM thread
389421
self.sm_shutdown.clear()
390422
self.th = threading.Thread(target=self.sm_thread, args=())
@@ -399,70 +431,74 @@ def stop(self):
399431

400432
def sm_thread(self):
401433
# Start the state machine and wait until it ends
402-
rospy.logwarn(f"Channel {self.this_robot} -> {self.target_robot} started")
434+
self.logger.warn(f"Channel {self.this_robot} -> {self.target_robot} started")
403435
outcome = self.sm.execute()
404436
exit_msg = f"Channel {self.this_robot} -> {self.target_robot}" + \
405437
f" finished with outcome: {outcome}"
406438
if outcome == 'failure':
407-
rospy.logerr(exit_msg)
439+
self.logger.error(exit_msg)
408440
elif outcome == 'stopped':
409-
rospy.logwarn(exit_msg)
441+
self.logger.warn(exit_msg)
410442
# Terminate the comm node once the state machine ends
411443
self.comm_node.terminate()
412444

413445
def get_comm_node(self):
414446
if not self.comm_node:
415-
rospy.logerr("Requesting for an empty comm node")
416-
rospy.signal_shutdown("Requesting for an empty comm node")
417-
rospy.spin()
447+
self.logger.error("Requesting for an empty comm node")
448+
raise RuntimeError("Requesting for an empty comm node")
418449
return self.comm_node
419450

420451
def trigger_sync(self):
421452
if self.sync.get_state():
422-
rospy.logwarn(f"{self.this_robot} <- {self.target_robot}: Channel Busy")
453+
self.logger.warn(f"{self.this_robot} <- {self.target_robot}: Channel Busy")
423454
else:
424455
self.sync.set()
425456

426457
def callback_client(self, msg):
427458
if msg is not None:
428-
rospy.logdebug(f"{self.this_robot} - Channel - CALLBACK_CLIENT: len: {len(msg)}")
459+
self.logger.debug(f"{self.this_robot} - Channel - CALLBACK_CLIENT: len: {len(msg)}")
429460
else:
430-
rospy.logdebug(f"{self.this_robot} - Channel - CALLBACK_CLIENT - None")
461+
self.logger.debug(f"{self.this_robot} - Channel - CALLBACK_CLIENT - None")
431462
self.client_answer = msg
432463

433464
def callback_server(self, msg):
434-
rospy.logdebug(f"{self.this_robot} - Channel - CALLBACK_SERVER: {msg}")
465+
self.logger.debug(f"{self.this_robot} - Channel - CALLBACK_SERVER: {msg}")
435466
header = msg[:CODE_LENGTH].decode()
436467
data = msg[CODE_LENGTH:]
437468

438469
if header == Comm_msgs.GHEAD.name:
439470
# Returns all the headers that this node has
440471
# FIXME(Fernando): Configure this depending on the message type
441472
headers = self.dbl.get_header_list(filter_latest=True)
442-
rospy.logdebug(f"{self.this_robot} - Channel - Sending {len(headers)} headers")
443-
rospy.logdebug(f"{self.this_robot} - Channel - {headers}")
473+
self.logger.debug(f"{self.this_robot} - Channel - Sending {len(headers)} headers")
474+
self.logger.debug(f"{self.this_robot} - Channel - {headers}")
444475
serialized = du.serialize_headers(headers)
445476
return serialized
446477
if header == Comm_msgs.GDATA.name:
447478
r_header = data
448479
# Returns a packed data for the requires header
449480
# One header at a time
450481
if len(data) != HEADER_LENGTH:
451-
rospy.logerr(f"{self.this_robot} - Wrong header length: {len(data)}")
482+
self.logger.error(f"{self.this_robot} - Wrong header length: {len(data)}")
452483
return Comm_msgs.SERRM.name.encode()
453484
try:
454485
dbm = self.dbl.find_header(r_header)
455486
packed = du.pack_data(dbm)
456487
return packed
457488
except Exception:
458-
rospy.logerr(f"{self.this_robot} - Header not found: {r_header}")
489+
self.logger.error(f"{self.this_robot} - Header not found: {r_header}")
459490
return Comm_msgs.SERRM.name.encode()
460491
if header == Comm_msgs.DENDT.name:
461492
target = data
462493
if target.decode() != self.target_robot:
463494
print(f"{self.this_robot} - Channel - Wrong DENDT -" +
464495
f" Target: {target.decode()} - " +
465496
f"My target: {self.target_robot}")
466-
self.server_sync_complete_pub.publish(Time(rospy.get_rostime()))
497+
if self.server_sync_complete_pub is not None:
498+
time_msg = Time()
499+
if self.node is not None:
500+
current_time = self.node.get_clock().now()
501+
time_msg.sec, time_msg.nanosec = current_time.seconds_nanoseconds()
502+
self.server_sync_complete_pub.publish(time_msg)
467503
return "Ack".encode()
468504
return Comm_msgs.SERRM.name.encode()

0 commit comments

Comments
 (0)