Skip to content

Commit 7c24d47

Browse files
committed
topic_publisher.py migrated with new test cases.
1 parent 0ef6ce3 commit 7c24d47

File tree

3 files changed

+257
-73
lines changed

3 files changed

+257
-73
lines changed

mocha_core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ if(BUILD_TESTING)
7171
ament_add_pytest_test(test_delay_2robots test/test_delay_2robots.py)
7272
ament_add_pytest_test(test_multiple_robots_sync test/test_multiple_robots_sync.py)
7373
ament_add_pytest_test(test_translator test/test_translator.py)
74+
ament_add_pytest_test(test_topic_publisher test/test_topic_publisher.py)
7475
endif()
7576

7677
ament_package()

mocha_core/mocha_core/topic_publisher.py

Lines changed: 177 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
import pdb
44
import sys
55

6-
import rospkg
7-
import rospy
6+
import rclpy
7+
import rclpy.time
8+
import rclpy.clock
9+
import ament_index_python
810
import yaml
911
import re
1012
import mocha_core.msg
@@ -13,61 +15,86 @@
1315

1416

1517
class TopicPublisher():
16-
def __init__(self, this_robot, target, msg_history="WHOLE_HISTORY"):
18+
def __init__(self, this_robot, target, msg_history="WHOLE_HISTORY", node=None):
1719

1820
self.this_robot = this_robot
1921

22+
# Store or create the ROS2 node
23+
if node is None:
24+
# Initialize ROS2 if not already done
25+
if not rclpy.ok():
26+
rclpy.init()
27+
self.__node = rclpy.create_node(f'topic_publisher_{this_robot}')
28+
self.__node_owned = True
29+
else:
30+
self.__node = node
31+
self.__node_owned = False
32+
33+
self.__logger = self.__node.get_logger()
2034

2135
# Service configuration
22-
self.__select_service = "integrate_database/SelectDB"
23-
self.__get_header_service = "integrate_database/GetDataHeaderDB"
24-
25-
try:
26-
rospy.wait_for_service(self.__select_service)
27-
rospy.wait_for_service(self.__get_header_service)
28-
except rospy.ROSInterruptException as exc:
29-
rospy.logdebug("Service did not process request: " +
30-
str(exc))
31-
rospy.signal_shutdown("Service did not process request")
32-
rospy.spin()
33-
self.__select_db = rospy.ServiceProxy(
34-
self.__select_service, mocha_core.srv.SelectDB
35-
)
36+
self.__select_service = "select_db"
37+
self.__get_header_service = "get_data_header_db"
3638

37-
self.__get_header_db = rospy.ServiceProxy(
38-
self.__get_header_service, mocha_core.srv.GetDataHeaderDB
39+
# Create service clients
40+
self.__select_db = self.__node.create_client(
41+
mocha_core.srv.SelectDB, self.__select_service
42+
)
43+
self.__get_header_db = self.__node.create_client(
44+
mocha_core.srv.GetDataHeaderDB, self.__get_header_service
3945
)
4046

47+
# Wait for services to be available
48+
if not self.__select_db.wait_for_service(timeout_sec=10.0):
49+
self.__logger.error(f"Service {self.__select_service} not available")
50+
51+
if not self.__get_header_db.wait_for_service(timeout_sec=10.0):
52+
self.__logger.error(f"Service {self.__get_header_service} not available")
53+
4154
# List of robots to pull
4255
self.__robot_list = []
4356

4457
# Publisher creation
4558
self.publishers = {}
46-
self.header_pub = rospy.Publisher("ddb/topic_publisher/headers",
47-
mocha_core.msg.Header_pub,
48-
queue_size=10)
59+
self.header_pub = self.__node.create_publisher(
60+
mocha_core.msg.HeaderPub,
61+
"ddb/topic_publisher/headers",
62+
10
63+
)
4964
for t in target:
5065
robot, robot_id, topic, topic_id, obj = t
5166
if robot not in self.__robot_list:
5267
self.__robot_list.append(robot_id)
53-
self.publishers[(robot_id, topic_id)] = {"pub": rospy.Publisher(f"/{robot}{topic}",
54-
obj,
55-
queue_size=10),
56-
"ts": rospy.Time(1, 0)}
68+
self.publishers[(robot_id, topic_id)] = {
69+
"pub": self.__node.create_publisher(obj, f"/{robot}{topic}", 10),
70+
"ts": rclpy.time.Time(seconds=1, nanoseconds=0)
71+
}
5772

5873

5974
def run(self):
60-
rospy.loginfo(f"{self.this_robot} - Topic Publisher - Started")
61-
rate = rospy.Rate(10)
75+
self.__logger.info(f"{self.this_robot} - Topic Publisher - Started")
76+
rate = self.__node.create_rate(10)
6277
headers = set()
63-
while not rospy.is_shutdown():
78+
while rclpy.ok():
6479
for robot_id in self.__robot_list:
6580
headers_to_get = []
6681

82+
# Create service request
83+
req = mocha_core.srv.SelectDB.Request()
84+
req.robot_id = robot_id
85+
req.topic_id = 0 # None equivalent for uint8
86+
req.ts_limit = self.__node.get_clock().now().to_msg()
87+
6788
try:
68-
answ = self.__select_db(robot_id, None, None)
69-
except rospy.ServiceException as exc:
70-
rospy.logdebug(f"Service did not process request {exc}")
89+
future = self.__select_db.call_async(req)
90+
rclpy.spin_until_future_complete(self.__node, future, timeout_sec=1.0)
91+
if future.done():
92+
answ = future.result()
93+
else:
94+
self.__logger.debug("Service call timeout")
95+
continue
96+
except Exception as exc:
97+
self.__logger.debug(f"Service did not process request {exc}")
7198
continue
7299
returned_headers = du.deserialize_headers(answ.headers)
73100
if len(returned_headers) == 0:
@@ -79,75 +106,107 @@ def run(self):
79106
headers_to_get.append(header_)
80107

81108
for get_header in headers_to_get:
82-
rospy.logdebug(f"Getting headers {get_header}")
109+
self.__logger.debug(f"Getting headers {get_header}")
110+
111+
# Create service request for getting header data
112+
req = mocha_core.srv.GetDataHeaderDB.Request()
113+
req.msg_header = get_header
114+
83115
try:
84-
answ = self.__get_header_db(get_header)
85-
except rospy.ServiceException as exc:
86-
rospy.logerr("Service did not process request: " +
87-
str(exc))
116+
future = self.__get_header_db.call_async(req)
117+
rclpy.spin_until_future_complete(self.__node, future, timeout_sec=1.0)
118+
if future.done():
119+
answ = future.result()
120+
else:
121+
self.__logger.debug("Get header service call timeout")
122+
continue
123+
except Exception as exc:
124+
self.__logger.error("Service did not process request: " +
125+
str(exc))
88126
continue
89127
headers.add(get_header)
90128

91129
ans_robot_id, ans_topic_id, ans_ts, ans_data = du.parse_answer(answ,
92130
msg_types)
93131
for t in self.publishers.keys():
94132
if t == (ans_robot_id, ans_topic_id):
95-
assert isinstance(ans_data,
96-
self.publishers[t]['pub'].data_class)
133+
# Convert ROS1 timestamp to ROS2 Time for comparison
134+
ans_ts_ros2 = rclpy.time.Time.from_msg(ans_ts) if hasattr(ans_ts, 'sec') else ans_ts
135+
current_ts = self.publishers[t]["ts"]
136+
97137
# FIXME: remove this line once we have proper time
98138
# filtering implemented
99-
if ans_ts > self.publishers[t]["ts"]:
100-
self.publishers[t]["ts"] = ans_ts
139+
if ans_ts_ros2 > current_ts:
140+
self.publishers[t]["ts"] = ans_ts_ros2
101141
self.publishers[t]["pub"].publish(ans_data)
102142
self.header_pub.publish(get_header)
103-
rospy.logdebug(f"Publishing robot_id: {ans_robot_id}" +
104-
f" - topic: {ans_topic_id}")
143+
self.__logger.debug(f"Publishing robot_id: {ans_robot_id}" +
144+
f" - topic: {ans_topic_id}")
105145
else:
106-
rospy.logdebug(f"Skipping robot_id: {ans_robot_id}" +
107-
f" - topic: {ans_topic_id} as there is an old ts")
146+
self.__logger.debug(f"Skipping robot_id: {ans_robot_id}" +
147+
f" - topic: {ans_topic_id} as there is an old ts")
108148
rate.sleep()
109149

150+
def shutdown(self):
151+
"""Cleanup method for shutting down the topic publisher"""
152+
if self.__node_owned and self.__node is not None:
153+
self.__node.destroy_node()
154+
if rclpy.ok():
155+
rclpy.shutdown()
110156

111-
if __name__ == "__main__":
112-
rospy.init_node("mocha_core_publisher", anonymous=False)
113157

114-
# Get the mocha_core path
115-
rospack = rospkg.RosPack()
116-
ddb_path = rospack.get_path('mocha_core')
117-
scripts_path = os.path.join(ddb_path, "scripts/core")
118-
sys.path.append(scripts_path)
119-
import database_utils as du
158+
def create_topic_publisher_node(robot_name, robot_configs_path=None, topic_configs_path=None, node=None):
159+
"""
160+
Create and setup topic publisher for a given robot.
161+
Returns the node for cleanup.
162+
"""
163+
# Use provided node or create new one
164+
if node is None:
165+
# Initialize ROS2 if not already done
166+
if not rclpy.ok():
167+
rclpy.init()
168+
node = rclpy.create_node(f"topic_publisher_{robot_name}")
169+
node_owned = True
170+
else:
171+
node_owned = False
172+
173+
logger = node.get_logger()
120174

121-
# Get robot from the parameters
122-
this_robot = rospy.get_param("~robot_name")
175+
# Get the mocha_core path using ament
176+
try:
177+
from ament_index_python.packages import get_package_share_directory
178+
ddb_path = get_package_share_directory('mocha_core')
179+
except:
180+
# Fallback for development
181+
current_dir = os.path.dirname(os.path.abspath(__file__))
182+
ddb_path = os.path.join(current_dir, "..")
183+
184+
# Add path for database_utils
185+
sys.path.append(os.path.join(ddb_path, "."))
186+
import database_utils as du
123187

124188
# Get the robot_config path and generate the robot_configs object
125-
robot_configs_default = os.path.join(ddb_path,
126-
"config/testConfigs/robot_configs.yaml")
127-
robot_configs_path = rospy.get_param("~robot_configs", robot_configs_default)
189+
if robot_configs_path is None:
190+
robot_configs_path = os.path.join(ddb_path, "config", "testConfigs", "robot_configs.yaml")
191+
128192
with open(robot_configs_path, 'r') as f:
129193
robot_configs = yaml.load(f, Loader=yaml.FullLoader)
130-
if this_robot not in robot_configs.keys():
131-
rospy.logerr(f"Robot {this_robot} not in robot_configs")
132-
rospy.signal_shutdown("Robot not in robot_configs")
133-
rospy.spin()
134-
194+
if robot_name not in robot_configs.keys():
195+
logger.error(f"Robot {robot_name} not in robot_configs")
196+
if node_owned:
197+
rclpy.shutdown()
198+
raise ValueError(f"Robot {robot_name} not found in config")
135199

136200
# Get the topic_configs path and generate the topic_configs object
137-
topic_configs_default = os.path.join(ddb_path,
138-
"config/testConfigs/topic_configs.yaml")
139-
topic_configs_path = rospy.get_param("~topic_configs", topic_configs_default)
201+
if topic_configs_path is None:
202+
topic_configs_path = os.path.join(ddb_path, "config", "testConfigs", "topic_configs.yaml")
203+
140204
with open(topic_configs_path, 'r') as f:
141205
topic_configs = yaml.load(f, Loader=yaml.FullLoader)
142206

143207
# Get msg_types dict used to publish the topics
144208
msg_types = du.msg_types(robot_configs, topic_configs)
145209

146-
# Flip the dict so that we have name: obj
147-
msg_objects = {}
148-
# for msg in msg_types:
149-
# msg_objects[msg_types[msg]["name"]] = msg_types[msg]["obj"]
150-
151210
list_of_topics = set()
152211

153212
# Iterate for each robot in robot_configs, and generate the topics
@@ -161,5 +220,50 @@ def run(self):
161220
robot_tuple = (robot, robot_id, topic["msg_topic"], topic_id, obj)
162221
list_of_topics.add(robot_tuple)
163222

164-
pub = TopicPublisher(this_robot, list_of_topics)
165-
pub.run()
223+
# Create TopicPublisher instance
224+
pub = TopicPublisher(robot_name, list_of_topics, node=node)
225+
226+
return node, pub
227+
228+
229+
if __name__ == "__main__":
230+
# Initialize ROS2
231+
rclpy.init()
232+
233+
# Create main node
234+
node = rclpy.create_node("mocha_core_publisher")
235+
logger = node.get_logger()
236+
237+
# Declare parameters with defaults
238+
node.declare_parameter("robot_name", "")
239+
node.declare_parameter("robot_configs", "")
240+
node.declare_parameter("topic_configs", "")
241+
242+
# Get robot from the parameters
243+
this_robot = node.get_parameter("robot_name").get_parameter_value().string_value
244+
if not this_robot:
245+
logger.error("robot_name parameter is required")
246+
rclpy.shutdown()
247+
sys.exit(1)
248+
249+
# Get config file paths from parameters
250+
robot_configs_path = node.get_parameter("robot_configs").get_parameter_value().string_value
251+
topic_configs_path = node.get_parameter("topic_configs").get_parameter_value().string_value
252+
253+
# Use empty strings as None for the function
254+
robot_configs_path = robot_configs_path if robot_configs_path else None
255+
topic_configs_path = topic_configs_path if topic_configs_path else None
256+
257+
try:
258+
# Create topic publisher node and publisher instance
259+
_, pub = create_topic_publisher_node(this_robot, robot_configs_path, topic_configs_path, node)
260+
261+
# Run the publisher
262+
pub.run()
263+
except KeyboardInterrupt:
264+
pass
265+
finally:
266+
# Cleanup
267+
pub.shutdown()
268+
node.destroy_node()
269+
rclpy.shutdown()

0 commit comments

Comments
 (0)