Skip to content

Commit 198cda9

Browse files
committed
database_server emits a message once database is modified
1 parent 06cf662 commit 198cda9

File tree

5 files changed

+49
-16
lines changed

5 files changed

+49
-16
lines changed

mocha_core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ rosidl_generate_interfaces(${PROJECT_NAME}
2222
"msg/ClientStats.msg"
2323
"msg/HeaderPub.msg"
2424
"msg/SMState.msg"
25+
"msg/DatabaseCB.msg"
2526
"srv/AddUpdateDB.srv"
2627
"srv/GetDataHeaderDB.srv"
2728
"srv/SelectDB.srv"

mocha_core/mocha_core/database.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ class DBwLock():
8282
8383
sample_db may be a valid dictionary that can be preloaded into
8484
the object (useful for debugging, see sample_db.py) """
85-
def __init__(self, sample_db=None, ros_node=None):
85+
def __init__(self, sample_db=None, ros_node=None, insertion_cb=None):
8686
if sample_db is not None:
8787
assert isinstance(sample_db, dict)
8888
# For ROS2, we can't deepcopy Time objects, so we'll just assign directly
@@ -91,8 +91,11 @@ def __init__(self, sample_db=None, ros_node=None):
9191
self.db = sample_db
9292
else:
9393
self.db = {}
94+
if insertion_cb is not None:
95+
assert callable(insertion_cb)
9496
self.lock = threading.Lock()
9597
self.ros_node = ros_node
98+
self.insertion_cb = insertion_cb
9699

97100
def add_modify_data(self, dbm):
98101
""" Insert new stuff into the db. Use the header as message index """
@@ -113,6 +116,9 @@ def add_modify_data(self, dbm):
113116
raise Exception("database: Header Collision Detected")
114117
self.db[dbm.robot_id][dbm.topic_id][dbm.header] = dbm
115118
self.lock.release()
119+
# Callback on insertion of new data
120+
if self.insertion_cb is not None:
121+
self.insertion_cb(dbm.robot_id, dbm.topic_id, dbm.ts)
116122
return dbm.header
117123

118124
def get_header_list(self, filter_robot_id=None, filter_latest=None):

mocha_core/mocha_core/database_server.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import mocha_core.database as database
1111
import pdb
1212
import mocha_core.database_utils as du
13+
import mocha_core.msg
1314

1415
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
1516

@@ -52,8 +53,22 @@ def __init__(self, robot_configs, topic_configs, robot_name, ros_node):
5253

5354
self.topic_list = self.topic_configs[self.robot_configs[self.robot_name]["node-type"]]
5455

56+
# Publish a message every time that the database is modified
57+
pub_database = self.ros_node.create_publisher(
58+
mocha_core.msg.DatabaseCB,
59+
f"{self.ros_node_name}/database_cb",
60+
10
61+
)
62+
def database_cb(robot_id, topic_id, ts):
63+
database_cb_msg = mocha_core.msg.DatabaseCB()
64+
database_cb_msg.header.stamp = ts.to_msg()
65+
database_cb_msg.header.frame_id = self.robot_name
66+
database_cb_msg.robot_id = robot_id
67+
database_cb_msg.topic_id = topic_id
68+
pub_database.publish(database_cb_msg)
69+
5570
# Create the empty database with lock
56-
self.dbl = database.DBwLock(ros_node=self.ros_node)
71+
self.dbl = database.DBwLock(ros_node=self.ros_node, insertion_cb=database_cb)
5772

5873
# Get current robot number
5974
self.robot_number = du.get_robot_id_from_name(self.robot_configs,

mocha_core/msg/DatabaseCB.msg

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
std_msgs/Header header
2+
uint16 robot_id
3+
uint16 topic_id

mocha_core/test/test_database_server.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@
2323
import mocha_core.database_server as ds
2424
import mocha_core.database_utils as du
2525
from mocha_core.srv import AddUpdateDB, GetDataHeaderDB, SelectDB
26-
27-
28-
import mocha_core.srv
26+
from mocha_core.msg import DatabaseCB
2927

3028

3129
class Database_server_test(Node):
@@ -51,11 +49,18 @@ def __init__(self):
5149
while not self.get_data_header_db_cli.wait_for_service(timeout_sec=1.0):
5250
logger.debug("Waiting for get_data_header_db")
5351
self.select_db_cli = self.create_client(SelectDB,
54-
"/test_database_server/select_db",
52+
"/test_database_server/select_db",
5553
qos_profile=ds.DatabaseServer.QOS_PROFILE)
5654
while not self.select_db_cli.wait_for_service(timeout_sec=1.0):
5755
logger.debug("Waiting for get_data_header_db")
5856

57+
# Subscribe to the answers from the database
58+
def callback(data):
59+
self.data_answer = data
60+
self.database_cb_sub = self.create_subscription(DatabaseCB,
61+
"/test_database_server/database_cb",
62+
callback, 10)
63+
5964
def recorder_async(self, loops_per_thread, robot_configs, topic_configs, robot_name):
6065
# Get a random ros time
6166
tid = du.get_topic_id_from_name(
@@ -82,7 +87,7 @@ def recorder_async(self, loops_per_thread, robot_configs, topic_configs, robot_n
8287
).to_msg()
8388
try:
8489
# Create request and call service method directly
85-
req = mocha_core.srv.AddUpdateDB.Request()
90+
req = AddUpdateDB.Request()
8691
req.topic_id = tid
8792
req.timestamp = timestamp
8893
req.msg_content = serialized_msg
@@ -174,7 +179,7 @@ def test_add_retrieve_single_msg(self):
174179

175180
try:
176181
# Create request and call service method directly
177-
req = mocha_core.srv.AddUpdateDB.Request()
182+
req = AddUpdateDB.Request()
178183
req.topic_id = tid
179184
req.timestamp = rclpy.clock.Clock().now().to_msg()
180185
req.msg_content = serialized_msg
@@ -191,7 +196,7 @@ def test_add_retrieve_single_msg(self):
191196
# Request the same hash through service
192197
try:
193198
# Create request and call service method directly
194-
req = mocha_core.srv.GetDataHeaderDB.Request()
199+
req = GetDataHeaderDB.Request()
195200
req.msg_header = answ_header
196201
future = client_node.get_data_header_db_cli.call_async(req)
197202
rclpy.spin_until_future_complete(client_node, future)
@@ -207,9 +212,12 @@ def test_add_retrieve_single_msg(self):
207212

208213
self.assertEqual(tid, ans_topic_id)
209214
self.assertEqual(ans_data, sample_msg)
210-
# print("Received topic:", ans_topic_name)
211-
# print("ROS msg:", ans_data)
212-
# print("Timestamp:", ans_ts)
215+
216+
# We will now test the callback from the database when a new message is
217+
# published
218+
self.assertTrue(client_node.data_answer is not None)
219+
self.assertEqual(client_node.data_answer.topic_id, tid)
220+
self.assertEqual(client_node.data_answer.header.frame_id, self.robot_name)
213221

214222
def test_add_select_robot(self):
215223
# Create a single client
@@ -237,7 +245,7 @@ def test_add_select_robot(self):
237245
serialized_msg = du.serialize_ros_msg(sample_msg)
238246
try:
239247
# Create request and call service method directly
240-
req = mocha_core.srv.AddUpdateDB.Request()
248+
req = AddUpdateDB.Request()
241249
req.topic_id = tid
242250
req.timestamp = rclpy.clock.Clock().now().to_msg()
243251
req.msg_content = serialized_msg
@@ -254,7 +262,7 @@ def test_add_select_robot(self):
254262
try:
255263
robot_id = du.get_robot_id_from_name(self.robot_configs, self.robot_name)
256264
# Create request and call service method directly
257-
req = mocha_core.srv.SelectDB.Request()
265+
req = SelectDB.Request()
258266
req.robot_id = robot_id
259267
req.topic_id = 0 # Changed from None to 0 since it's uint8
260268
req.ts_limit = rclpy.clock.Clock().now().to_msg()
@@ -299,7 +307,7 @@ def test_insert_storm(self):
299307
try:
300308
robot_id = du.get_robot_id_from_name(self.robot_configs, self.robot_name)
301309
# Create request and call service method directly
302-
req = mocha_core.srv.SelectDB.Request()
310+
req = SelectDB.Request()
303311
req.robot_id = robot_id
304312
req.topic_id = 0 # Changed from None to 0 since it's uint8
305313
req.ts_limit = rclpy.clock.Clock().now().to_msg()
@@ -315,4 +323,4 @@ def test_insert_storm(self):
315323

316324

317325
if __name__ == "__main__":
318-
unittest.main()
326+
unittest.main(failfast=True)

0 commit comments

Comments
 (0)