Skip to content

Commit 00a0a67

Browse files
committed
Mostly completed migration of database_server.py
1 parent a0f234b commit 00a0a67

File tree

4 files changed

+273
-194
lines changed

4 files changed

+273
-194
lines changed

mocha_core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ if(BUILD_TESTING)
6262
ament_add_pytest_test(test_database_utils test/test_database_utils.py)
6363
ament_add_pytest_test(test_zmq_comm_node test/test_zmq_comm_node.py)
6464
ament_add_pytest_test(test_synchronize_channel test/test_synchronize_channel.py)
65+
ament_add_pytest_test(test_database_server test/test_database_server.py)
6566
endif()
6667

6768
ament_package()

mocha_core/mocha_core/database_server.py

Lines changed: 109 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
import os
44
import threading
5-
import rospy
5+
import rclpy
6+
import rclpy.logging
7+
import rclpy.time
68
import mocha_core.srv
79
import database
810
import pdb
@@ -15,21 +17,28 @@ class DatabaseServer:
1517
1618
Please see the list of services in the srv folder
1719
"""
18-
def __init__(self, robot_configs, topic_configs):
20+
def __init__(self, robot_configs, topic_configs, node=None):
1921
# Check input topics
2022
assert isinstance(robot_configs, dict)
2123
assert isinstance(topic_configs, dict)
2224

2325
self.robot_configs = robot_configs
2426
self.topic_configs = topic_configs
25-
26-
# Get current robot name from params
27-
self.robot_name = rospy.get_param('~robot_name')
27+
28+
# Store or create the ROS2 node
29+
self.node = node
30+
if self.node is None:
31+
self.logger = rclpy.logging.get_logger('database_server')
32+
else:
33+
self.logger = self.node.get_logger()
34+
35+
# Get current robot name from params (passed as parameter since ROS2 doesn't use global params)
36+
# For now, we'll default to 'charon' - this should be passed as a parameter
37+
self.robot_name = 'charon' # TODO: Make this configurable
2838

2939
if self.robot_name not in self.robot_configs.keys():
30-
rospy.logerr(f"{self.robot_name} does not exist in robot_configs")
31-
rospy.signal_shutdown("robot_name does not exist in robot_configs")
32-
rospy.spin()
40+
self.logger.error(f"{self.robot_name} does not exist in robot_configs")
41+
raise ValueError("robot_name does not exist in robot_configs")
3342

3443
self.topic_list = self.topic_configs[self.robot_configs[self.robot_name]["node-type"]]
3544

@@ -42,68 +51,105 @@ def __init__(self, robot_configs, topic_configs):
4251

4352
# create services for all the possible calls to the DB
4453
self.service_list = []
45-
s = rospy.Service('~AddUpdateDB',
46-
mocha_core.srv.AddUpdateDB,
47-
self.add_update_db_service_cb)
48-
self.service_list.append(s)
49-
s = rospy.Service('~GetDataHeaderDB',
50-
mocha_core.srv.GetDataHeaderDB,
51-
self.get_data_hash_db_service_cb)
52-
self.service_list.append(s)
53-
s = rospy.Service('~SelectDB',
54-
mocha_core.srv.SelectDB,
55-
self.select_db_service_cb)
56-
self.service_list.append(s)
54+
if self.node is not None:
55+
s = self.node.create_service(mocha_core.srv.AddUpdateDB,
56+
'AddUpdateDB',
57+
self.add_update_db_service_cb)
58+
self.service_list.append(s)
59+
s = self.node.create_service(mocha_core.srv.GetDataHeaderDB,
60+
'GetDataHeaderDB',
61+
self.get_data_hash_db_service_cb)
62+
self.service_list.append(s)
63+
s = self.node.create_service(mocha_core.srv.SelectDB,
64+
'SelectDB',
65+
self.select_db_service_cb)
66+
self.service_list.append(s)
5767

5868
self.msg_types = du.msg_types(self.robot_configs, self.topic_configs)
5969

6070
def add_update_db_service_cb(self, req):
61-
if not isinstance(req.topic_id, int) or req.topic_id is None:
62-
rospy.logerr("Error: topic_id empty")
63-
return
64-
if len(req.msg_content) == 0:
65-
rospy.logerr("Error: msg_content empty")
66-
return
67-
if req.topic_id > len(self.topic_list):
68-
rospy.logerr("Error: topic_id not in topic_list")
69-
return
70-
topic = self.topic_list[req.topic_id]
71-
priority = du.get_priority_number(topic["msg_priority"])
72-
ts = req.timestamp
73-
ts = rospy.Time(ts.secs, ts.nsecs)
74-
dbm = database.DBMessage(self.robot_number,
75-
req.topic_id,
76-
dtype=self.msg_types[self.robot_number][req.topic_id]["dtype"],
77-
priority=priority,
78-
ts=ts,
79-
data=req.msg_content)
80-
81-
header = self.dbl.add_modify_data(dbm)
82-
return mocha_core.srv.AddUpdateDBResponse(header)
71+
try:
72+
if not isinstance(req.topic_id, int) or req.topic_id is None:
73+
self.logger.error("topic_id empty")
74+
return mocha_core.srv.AddUpdateDB.Response()
75+
if len(req.msg_content) == 0:
76+
self.logger.error("msg_content empty")
77+
return mocha_core.srv.AddUpdateDB.Response()
78+
if req.topic_id >= len(self.topic_list): # Changed > to >= for proper bounds check
79+
self.logger.error(f"topic_id {req.topic_id} not in topic_list (length: {len(self.topic_list)})")
80+
return mocha_core.srv.AddUpdateDB.Response()
81+
82+
topic = self.topic_list[req.topic_id]
83+
priority = du.get_priority_number(topic["msg_priority"])
84+
ts = req.timestamp
85+
# ROS2 builtin_interfaces/Time uses 'sec' and 'nanosec' fields
86+
ts = rclpy.time.Time(seconds=ts.sec, nanoseconds=ts.nanosec)
87+
88+
# Convert array to bytes if needed (ROS2 service messages use array)
89+
msg_data = req.msg_content
90+
if hasattr(msg_data, 'tobytes'):
91+
msg_data = msg_data.tobytes()
92+
elif isinstance(msg_data, (list, tuple)):
93+
msg_data = bytes(msg_data)
94+
95+
dbm = database.DBMessage(self.robot_number,
96+
req.topic_id,
97+
dtype=self.msg_types[self.robot_number][req.topic_id]["dtype"],
98+
priority=priority,
99+
ts=ts,
100+
data=msg_data)
101+
102+
header = self.dbl.add_modify_data(dbm)
103+
response = mocha_core.srv.AddUpdateDB.Response()
104+
response.new_header = header
105+
return response
106+
except Exception as e:
107+
self.logger.error(f"Exception in add_update_db_service_cb: {e}")
108+
return mocha_core.srv.AddUpdateDB.Response()
83109

84110
def get_data_hash_db_service_cb(self, req):
85-
if req.msg_header is None or len(req.msg_header) == 0:
86-
rospy.logerr("Error: msg_header empty")
87-
return
88-
dbm = self.dbl.find_header(req.msg_header)
89-
answ = mocha_core.srv.GetDataHeaderDBResponse(dbm.robot_id,
90-
dbm.topic_id,
91-
dbm.ts,
92-
dbm.data)
93-
return answ
111+
try:
112+
if req.msg_header is None or len(req.msg_header) == 0:
113+
self.logger.error("msg_header empty")
114+
return mocha_core.srv.GetDataHeaderDB.Response()
115+
116+
# Convert array to bytes if needed (ROS2 service messages use array)
117+
header_data = req.msg_header
118+
if hasattr(header_data, 'tobytes'):
119+
header_data = header_data.tobytes()
120+
elif isinstance(header_data, (list, tuple)):
121+
header_data = bytes(header_data)
122+
123+
dbm = self.dbl.find_header(header_data)
124+
125+
response = mocha_core.srv.GetDataHeaderDB.Response()
126+
response.robot_id = dbm.robot_id
127+
response.topic_id = dbm.topic_id
128+
response.timestamp = dbm.ts
129+
response.msg_content = dbm.data
130+
return response
131+
except Exception as e:
132+
self.logger.error(f"Exception in get_data_hash_db_service_cb: {e}")
133+
return mocha_core.srv.GetDataHeaderDB.Response()
94134

95135
def select_db_service_cb(self, req):
96-
# TODO Implement filtering
97-
if req.robot_id is None:
98-
rospy.logerr("Error: robot_id none")
99-
return
100-
if req.topic_id is None:
101-
rospy.logerr("Error: topic_id none")
102-
return
103-
list_headers = self.dbl.get_header_list(req.robot_id)
104-
answ = mocha_core.srv.SelectDBResponse(du.serialize_headers(list_headers))
105-
return answ
136+
try:
137+
# TODO Implement filtering
138+
response = mocha_core.srv.SelectDB.Response()
139+
140+
# Note: robot_id and topic_id are uint8 in ROS2, so they can't be None
141+
# We can add range validation if needed, but for now just proceed
142+
143+
list_headers = self.dbl.get_header_list(req.robot_id)
144+
145+
response.headers = du.serialize_headers(list_headers)
146+
return response
147+
except Exception as e:
148+
self.logger.error(f"Exception in select_db_service_cb: {e}")
149+
response = mocha_core.srv.SelectDB.Response()
150+
response.headers = b''
151+
return response
106152

107153
def shutdown(self):
108-
for s in self.service_list:
109-
s.shutdown()
154+
# In ROS2, services are automatically cleaned up when the node is destroyed
155+
pass

mocha_core/mocha_core/database_utils.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -133,19 +133,41 @@ def unpack_data(header, packed_data):
133133

134134
def serialize_ros_msg(msg):
135135
# TODO check that we are not entering garbage
136-
sio_h = io.BytesIO()
137-
msg.serialize(sio_h)
138-
serialized = sio_h.getvalue()
136+
# In ROS2, we use CDR serialization
137+
from rclpy.serialization import serialize_message
138+
serialized = serialize_message(msg)
139139
compressed = lz4.frame.compress(serialized)
140+
141+
# Debug: Test round-trip compression
142+
try:
143+
test_decompress = lz4.frame.decompress(compressed)
144+
if test_decompress != serialized:
145+
raise Exception("LZ4 round-trip test failed")
146+
except Exception as e:
147+
raise Exception(f"LZ4 compression test failed: {e}")
148+
140149
return compressed
141150

142151

143152
def parse_answer(api_answer, msg_types):
144153
constructor = msg_types[api_answer.robot_id][api_answer.topic_id]['obj']
145-
msg = constructor()
146154
# api_answer.msg_content has the compressed message
147-
decompressed = lz4.frame.decompress(api_answer.msg_content)
148-
msg.deserialize(decompressed)
155+
156+
# Debug: Check what we're trying to decompress
157+
print(f"DEBUG: Trying to decompress {len(api_answer.msg_content)} bytes")
158+
print(f"DEBUG: First 20 bytes: {api_answer.msg_content[:20]}")
159+
160+
try:
161+
decompressed = lz4.frame.decompress(api_answer.msg_content)
162+
except Exception as e:
163+
print(f"DEBUG: LZ4 decompression failed: {e}")
164+
print(f"DEBUG: msg_content type: {type(api_answer.msg_content)}")
165+
print(f"DEBUG: msg_content length: {len(api_answer.msg_content)}")
166+
raise
167+
168+
# In ROS2, we use CDR deserialization
169+
from rclpy.serialization import deserialize_message
170+
msg = deserialize_message(decompressed, constructor)
149171
robot_id = api_answer.robot_id
150172
topic_id = api_answer.topic_id
151173
ts = api_answer.timestamp

0 commit comments

Comments
 (0)