diff --git a/mongodb_store/src/mongodb_store/__init__.py b/libmongocxx_ros/COLCON_IGNORE similarity index 100% rename from mongodb_store/src/mongodb_store/__init__.py rename to libmongocxx_ros/COLCON_IGNORE diff --git a/mongodb_log/CHANGELOG.rst b/mongodb_log/CHANGELOG.rst index 2e3618e..de0b687 100644 --- a/mongodb_log/CHANGELOG.rst +++ b/mongodb_log/CHANGELOG.rst @@ -2,6 +2,12 @@ Changelog for package mongodb_log ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +0.6.0 (2022-09-20) +------------------ +* Ensure Python 3 compatibility in mongodb log (`#277 `_) +* update package.xml to format=3 (`#269 `_) +* Contributors: Gal Gorjup, Kei Okada, Nick Hawes + 0.5.2 (2019-11-11) ------------------ * back to system mongo diff --git a/mongodb_log/CMakeLists.txt b/mongodb_log/CMakeLists.txt index 239481e..ff065a6 100644 --- a/mongodb_log/CMakeLists.txt +++ b/mongodb_log/CMakeLists.txt @@ -138,7 +138,7 @@ target_link_libraries(mongodb_log_cimg ## Mark executable scripts (Python etc.) for installation ## in contrast to setup.py, you can choose the destination -install(PROGRAMS +catkin_install_python(PROGRAMS scripts/mongodb_log.py DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION} ) diff --git a/mongodb_log/COLCON_IGNORE b/mongodb_log/COLCON_IGNORE new file mode 100644 index 0000000..e69de29 diff --git a/mongodb_log/package.xml b/mongodb_log/package.xml index d1fe6e2..2777bf9 100644 --- a/mongodb_log/package.xml +++ b/mongodb_log/package.xml @@ -1,7 +1,7 @@ mongodb_log - 0.5.2 + 0.6.0 The mongodb_log package Tim Niemueller diff --git a/mongodb_log/scripts/mongodb_log.py b/mongodb_log/scripts/mongodb_log.py index b12b7b3..bd6b70c 100755 --- a/mongodb_log/scripts/mongodb_log.py +++ b/mongodb_log/scripts/mongodb_log.py @@ -46,7 +46,10 @@ import subprocess from threading import Thread, Timer -from Queue import Empty +try: + from queue import Empty +except ImportError: + from Queue import Empty from optparse import OptionParser from tempfile import mktemp from datetime import datetime, timedelta @@ -271,12 +274,12 @@ def dequeue(self): mongodb_store.util.store_message(self.collection, msg, meta) - except InvalidDocument, e: + except InvalidDocument as e: print("InvalidDocument " + current_process().name + "@" + topic +": \n") - print e - except InvalidStringData, e: + print(e) + except InvalidStringData as e: print("InvalidStringData " + current_process().name + "@" + topic +": \n") - print e + print(e) else: #print("Quit W2: %s" % self.name) @@ -447,7 +450,7 @@ def subscribe_topics(self, topics): self.workers.append(w) self.collnames |= set([collname]) self.topics |= set([topic]) - except Exception, e: + except Exception as e: print('Failed to subscribe to %s due to %s' % (topic, e)) missing_topics.add(topic) @@ -457,7 +460,7 @@ def subscribe_topics(self, topics): def create_worker(self, idnum, topic, collname): try: msg_class, real_topic, msg_eval = rostopic.get_topic_class(topic, blocking=False) - except Exception, e: + except Exception as e: print('Topic %s not announced, cannot get type: %s' % (topic, e)) raise diff --git a/mongodb_log/test/test_publisher.py b/mongodb_log/test/test_publisher.py index 2c368e9..e4fbef6 100755 --- a/mongodb_log/test/test_publisher.py +++ b/mongodb_log/test/test_publisher.py @@ -45,6 +45,6 @@ def publish(self): for target in to_publish: msg_store = MessageStoreProxy(database='roslog', collection=target[0]) - print len(msg_store.query(Int64._type)) == target[3] + print(len(msg_store.query(Int64._type)) == target[3]) diff --git a/mongodb_store/CHANGELOG.rst b/mongodb_store/CHANGELOG.rst index 4ed0bb1..46cb58b 100644 --- a/mongodb_store/CHANGELOG.rst +++ b/mongodb_store/CHANGELOG.rst @@ -2,6 +2,23 @@ Changelog for package mongodb_store ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +0.6.0 (2022-09-20) +------------------ +* Avoid deadlock on server shutdown (`#279 `_) +* Fix Python 3 bugs in mongodb_store (`#272 `_, `#274 `_, `#275 `_) +* handling of host binding (`#270 `_) +* update package.xml to format=3 (`#269 `_) +* fix connection_string arg default value (`#266 `_) +* fixed bug in where the replicator node did not recognize the db_host (`#261 `_) +* Added .launch to the roslaunch command that was written in the readme file (`#262 `_) +* fixed a formatting issue +* fixed bug in where the replicator node did not recognize the db_host +* remembering namespace in rosparam +* Provide options to prevent unnecessary nodes launching +* added ability for message store to use a full connection string +* Removed --smallfiles arg no longer supported by MongoDB (`#257 `_) +* Contributors: Adrian Dole, Gal Gorjup, Kei Okada, Marc Hanheide, Nick Hawes, Shingo Kitagawa, Vittoria Santoro + 0.5.2 (2019-11-11) ------------------ * added python-future to package.xml, which got lost in previous commit for some reasons ... diff --git a/mongodb_store/CMakeLists.txt b/mongodb_store/CMakeLists.txt index febdaf3..18f28d0 100644 --- a/mongodb_store/CMakeLists.txt +++ b/mongodb_store/CMakeLists.txt @@ -1,58 +1,19 @@ cmake_minimum_required(VERSION 2.8.3) project(mongodb_store) -# for ROS indigo compile without c++11 support -if(DEFINED ENV{ROS_DISTRO}) - if(NOT $ENV{ROS_DISTRO} STREQUAL "indigo") - add_compile_options(-std=c++11) - message(STATUS "Building with C++11 support") - else() - message(STATUS "ROS Indigo: building without C++11 support") - endif() -else() - message(STATUS "Environmental variable ROS_DISTRO not defined, checking OS version") - file(STRINGS /etc/os-release RELEASE_CODENAME - REGEX "VERSION_CODENAME=") - if(NOT ${RELEASE_CODENAME} MATCHES "trusty") - add_compile_options(-std=c++11) - message(STATUS "OS distro is not trusty: building with C++11 support") - else() - message(STATUS "Ubuntu Trusty: building without C++11 support") - endif() -endif() - -find_package(catkin REQUIRED COMPONENTS roscpp message_generation rospy std_msgs std_srvs mongodb_store_msgs topic_tools) - -set(CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake) -find_package(MongoClient REQUIRED) +find_package(ament_cmake) +find_package(std_msgs) +find_package(std_srvs) +find_package(mongodb_store_msgs) + +#set(CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake) +#find_package(MongoClient REQUIRED) ## Uncomment this if the package has a setup.py. This macro ensures ## modules and global scripts declared therein get installed ## See http://ros.org/doc/api/catkin/html/user_guide/setup_dot_py.html -catkin_python_setup() - -####################################### -## Declare ROS messages and services ## -####################################### - - -## Generate services in the 'srv' folder -add_service_files( - FILES - GetParam.srv - SetParam.srv - MongoFind.srv - MongoUpdate.srv - MongoInsert.srv -) - - -## Generate added messages and services with any dependencies listed here -generate_messages( - DEPENDENCIES - std_msgs -) +#catkin_python_setup() ################################### ## catkin specific configuration ## @@ -65,12 +26,12 @@ generate_messages( ## DEPENDS: system dependencies of this project that dependent projects also need -catkin_package( - INCLUDE_DIRS include - LIBRARIES message_store ${MongoClient_INCLUDE_DIR} - CATKIN_DEPENDS mongodb_store_msgs topic_tools - DEPENDS MongoClient -) +#catkin_package( +# INCLUDE_DIRS include +# LIBRARIES message_store ${MongoClient_INCLUDE_DIR} +# CATKIN_DEPENDS mongodb_store_msgs topic_tools +# DEPENDS MongoClient +#) ########### ## Build ## @@ -78,50 +39,50 @@ catkin_package( ## Specify additional locations of header files ## Your package locations should be listed before other locations -include_directories( - include - ${catkin_INCLUDE_DIRS} - ${MongoClient_INCLUDE_DIR} -) - -link_directories(${catkin_LINK_DIRS}) -link_directories(${MongoClient_LINK_DIRS}) - -add_library(message_store src/message_store.cpp ) - -add_executable(example_mongodb_store_cpp_client src/example_mongodb_store_cpp_client.cpp) -add_executable(example_multi_event_log src/example_multi_event_log.cpp) -# add_executable(pc_test src/point_cloud_test.cpp) - - -add_dependencies(example_mongodb_store_cpp_client mongodb_store_msgs_generate_messages_cpp ) -add_dependencies(message_store mongodb_store_msgs_generate_messages_cpp ) - -target_link_libraries(message_store - ${MongoClient_LIBRARIES} - ${catkin_LIBRARIES} -) +#include_directories( +# include +# ${catkin_INCLUDE_DIRS} +# ${MongoClient_INCLUDE_DIR} +#) +# +#link_directories(${catkin_LINK_DIRS}) +#link_directories(${MongoClient_LINK_DIRS}) +# +#add_library(message_store src/message_store.cpp ) +# +#add_executable(example_mongodb_store_cpp_client src/example_mongodb_store_cpp_client.cpp) +#add_executable(example_multi_event_log src/example_multi_event_log.cpp) +## add_executable(pc_test src/point_cloud_test.cpp) +# +# +#add_dependencies(example_mongodb_store_cpp_client mongodb_store_msgs_generate_messages_cpp ) +#add_dependencies(message_store mongodb_store_msgs_generate_messages_cpp ) +# +#target_link_libraries(message_store +# ${MongoClient_LIBRARIES} +# ${catkin_LIBRARIES} +#) # Specify libraries to link a library or executable target against -target_link_libraries(example_mongodb_store_cpp_client - message_store - ${MongoClient_LIBRARIES} - ${catkin_LIBRARIES} -) - -target_link_libraries(example_multi_event_log - message_store - ${MongoClient_LIBRARIES} - ${catkin_LIBRARIES} -) - -target_link_libraries(example_multi_event_log - message_store - ${MongoClient_LIBRARIES} - ${catkin_LIBRARIES} -) +#target_link_libraries(example_mongodb_store_cpp_client +# message_store +# ${MongoClient_LIBRARIES} +# ${catkin_LIBRARIES} +#) +# +#target_link_libraries(example_multi_event_log +# message_store +# ${MongoClient_LIBRARIES} +# ${catkin_LIBRARIES} +#) +# +#target_link_libraries(example_multi_event_log +# message_store +# ${MongoClient_LIBRARIES} +# ${catkin_LIBRARIES} +#) ############# @@ -133,9 +94,18 @@ target_link_libraries(example_multi_event_log ## Mark executable scripts (Python etc.) for installation ## in contrast to setup.py, you can choose the destination -install(DIRECTORY scripts/ - DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION} - USE_SOURCE_PERMISSIONS) +#catkin_install_python(PROGRAMS +# scripts/config_manager.py +# scripts/example_message_store_client.py +# scripts/example_multi_event_log.py +# scripts/message_store_node.py +# scripts/mongo_bridge.py +# scripts/mongodb_play.py +# scripts/mongodb_server.py +# scripts/replicator_client.py +# scripts/replicator_node.py +# DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION} +#) # Mark other files for installation (e.g. launch and bag files, etc.) install( @@ -144,39 +114,39 @@ install( ) # Mark cpp header files for installation -install(DIRECTORY include/mongodb_store/ - DESTINATION ${CATKIN_PACKAGE_INCLUDE_DESTINATION} -) +#install(DIRECTORY include/mongodb_store/ +# DESTINATION ${CATKIN_PACKAGE_INCLUDE_DESTINATION} +#) ## Mark executables and/or libraries for installation -install(TARGETS example_mongodb_store_cpp_client example_multi_event_log message_store #pc_test - ARCHIVE DESTINATION ${CATKIN_PACKAGE_LIB_DESTINATION} - LIBRARY DESTINATION ${CATKIN_PACKAGE_LIB_DESTINATION} - RUNTIME DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION} -) +#install(TARGETS example_mongodb_store_cpp_client example_multi_event_log message_store #pc_test +# ARCHIVE DESTINATION ${CATKIN_PACKAGE_LIB_DESTINATION} +# LIBRARY DESTINATION ${CATKIN_PACKAGE_LIB_DESTINATION} +# RUNTIME DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION} +#) ############# ## Testing ## ############# -if (CATKIN_ENABLE_TESTING) - find_package(catkin REQUIRED COMPONENTS rostest) - - add_rostest(tests/message_store.test) - add_rostest(tests/config_manager.test) - add_rostest(tests/replication.test) - - add_executable(message_store_cpp_test tests/message_store_cpp_test.cpp) - - target_link_libraries(message_store_cpp_test - message_store - ${OPENSSL_LIBRARIES} - ${catkin_LIBRARIES} - ${Boost_LIBRARIES} - gtest - ) - - add_rostest(tests/message_store_cpp_client.test) - -endif() +#if (CATKIN_ENABLE_TESTING) +# find_package(catkin REQUIRED COMPONENTS rostest) +# +# add_rostest(tests/message_store.test) +# add_rostest(tests/config_manager.test) +# add_rostest(tests/replication.test) +# +# add_executable(message_store_cpp_test tests/message_store_cpp_test.cpp) +# +# target_link_libraries(message_store_cpp_test +# message_store +# ${OPENSSL_LIBRARIES} +# ${catkin_LIBRARIES} +# ${Boost_LIBRARIES} +# gtest +# ) +# +# add_rostest(tests/message_store_cpp_client.test) +# +#endif() diff --git a/mongodb_store/launch/mongodb_store.launch b/mongodb_store/launch/mongodb_store.launch deleted file mode 100644 index 8ff0bfd..0000000 --- a/mongodb_store/launch/mongodb_store.launch +++ /dev/null @@ -1,48 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/mongodb_store/launch/mongodb_store_inc.launch b/mongodb_store/launch/mongodb_store_inc.launch deleted file mode 100644 index 1dce50f..0000000 --- a/mongodb_store/launch/mongodb_store_inc.launch +++ /dev/null @@ -1,73 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/mongodb_store/launch/mongodb_store_inc_launch.xml b/mongodb_store/launch/mongodb_store_inc_launch.xml new file mode 100644 index 0000000..fec033d --- /dev/null +++ b/mongodb_store/launch/mongodb_store_inc_launch.xml @@ -0,0 +1,69 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/mongodb_store/launch/mongodb_store_launch.xml b/mongodb_store/launch/mongodb_store_launch.xml new file mode 100644 index 0000000..cb3231e --- /dev/null +++ b/mongodb_store/launch/mongodb_store_launch.xml @@ -0,0 +1,43 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/mongodb_store/mongodb_store/__init__.py b/mongodb_store/mongodb_store/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mongodb_store/mongodb_store/message_store.py b/mongodb_store/mongodb_store/message_store.py new file mode 100644 index 0000000..ac3c6e4 --- /dev/null +++ b/mongodb_store/mongodb_store/message_store.py @@ -0,0 +1,433 @@ +import copy +import json +import typing + +import rclpy +from bson import json_util +from bson.objectid import ObjectId + +import mongodb_store.util as dc_util +from mongodb_store_msgs.msg import StringPair, StringPairList, Insert +from mongodb_store_msgs.srv import ( + MongoInsertMsg, + MongoDeleteMsg, + MongoQueryMsg, + MongoUpdateMsg, +) + + +class MessageStoreProxy: + """ + A class that provides functions for storage and retrieval of ROS Message + objects in the mongodb_store. This is achieved by acting as a proxy to the + services provided by the MessageStore ROS node, and therefore requires the message + store node to be running in addition to the datacentre: + + `rosrun mongodb_store message_store_node.py` + + >>> from geometry_msgs.msg import Pose, Quaternion + >>> msg_store = MessageStoreProxy() + >>> p = Pose(Point(0, 1, 2), Quaternion(0, 0, 0 , 1)) + >>> msg_store.insert_named("my favourite pose", p) + >>> retrieved = msg_store.query_named("my favourite pose", Pose._type) + + For usage examples, please see `example_message_store_client.py` within the scripts + folder of mongodb_store. + + """ + + def __init__( + self, + parent_node: rclpy.node.Node, + service_prefix="/message_store", + database="message_store", + collection="message_store", + queue_size=100, + ) -> None: + """ + Args: + | service_prefix (str): The prefix to the *insert*, *update*, *delete* and + *query_messages* ROS services/ + | database (str): The MongoDB database that this object works with. + | collection (str): The MongoDB collect/on that this object works with. + """ + self.parent_node = parent_node + self.database = database + self.collection = collection + insert_service = service_prefix + "/insert" + update_service = service_prefix + "/update" + delete_service = service_prefix + "/delete" + query_service = service_prefix + "/query_messages" + # try and get the mongo service, block until available + found_services_first_try = True # if found straight away + self.insert_srv = self.parent_node.create_client(MongoInsertMsg, insert_service) + self.update_srv = self.parent_node.create_client(MongoUpdateMsg, update_service) + self.query_srv = self.parent_node.create_client(MongoQueryMsg, query_service) + self.delete_srv = self.parent_node.create_client(MongoDeleteMsg, delete_service) + + insert_topic = service_prefix + "/insert" + self.pub_insert = self.parent_node.create_publisher(Insert, insert_topic, 10) + + all_ok = False + while rclpy.ok() and not all_ok: + all_ok = True + for service in [ + self.insert_srv, + self.update_srv, + self.query_srv, + self.delete_srv, + ]: + if not service.wait_for_service(5): + found_services_first_try = False + self.parent_node.get_logger().error( + f"Could not get message store service {service.srv_name}. Maybe the message " + "store has not been started? Retrying..." + ) + all_ok = False + + if not found_services_first_try: + self.parent_node.get_logger().info("Message store services found.") + + def insert_named( + self, + name: str, + message: "RosMessage", + meta: typing.Dict = None, + wait: bool = True, + ) -> str: + """ + Inserts a ROS message into the message storage, giving it a name for convenient + later retrieval. + .. note:: Multiple messages can be stored with the same name. + + :Args: + | name (str): The name to refere to this message as. + | message (ROS Message): An instance of a ROS message type to store + | meta (dict): A dictionary of additional meta data to store in association + with thie message. + | wait (bool): If true, waits until database returns object id after insert + :Returns: + | (str) the ObjectId of the MongoDB document containing the stored message. + """ + # create a copy as we're modifying it + if meta is None: + meta = {} + meta_copy = copy.copy(meta) + meta_copy["name"] = name + return self.insert(message, meta_copy, wait=wait) + + def insert( + self, message: "ROSMessage", meta: typing.Dict = None, wait: bool = True + ) -> typing.Union[bool, str]: + """ + Inserts a ROS message into the message storage. + + :Args: + | message (ROS Message): An instance of a ROS message type to store + | meta (dict): A dictionary of additional meta data to store in association + with thie message. + | wait (bool): If true, waits until database returns object id after insert + :Returns: + | (str) the ObjectId of the MongoDB document containing the stored message. + + """ + # assume meta is a dict, convert k/v to tuple pairs + meta_tuple = ( + StringPair( + first=MongoQueryMsg.Request.JSON_QUERY, + second=json.dumps(meta, default=json_util.default), + ), + ) + serialised_msg = dc_util.serialise_message(message) + request = MongoInsertMsg.Request() + request.database = self.database + request.collection = self.collection + request.meta = StringPairList(pairs=meta_tuple) + request.message = serialised_msg + + if wait: + return dc_util.check_and_get_service_result_async( + self.parent_node, self.insert_srv, request + )[0].id + else: + msg = Insert( + database=self.database, + collection=self.collection, + message=serialised_msg, + meta=StringPairList(pairs=meta_tuple), + ) + self.pub_insert.publish(msg) + return True + + def query_id(self, id: str, type: str): + """ + Finds and returns the message with the given ID. + + :Parameters: + | id (str): The ObjectID of the MongoDB document holding the message. + | type (str): The ROS message type of the stored messsage to retrieve. + :Returns: + | message (ROS message), meta (dict): The retrieved message and associated metadata + or *None* if the named message could not be found. + """ + return self.query(type, {"_id": ObjectId(id)}, {}, True) + + def delete(self, message_id: str) -> bool: + """ + Delete the message with the given ID. + + :Parameters: + | message_id (str) : The ObjectID of the MongoDB document holding the message. + :Returns: + | bool : was the object successfully deleted. + """ + request = MongoDeleteMsg.Request() + request.database = self.database + request.collection = self.collection + request.document_id = message_id + return dc_util.check_and_get_service_result_async( + self.parent_node, self.delete_srv, request + )[0].success + + def query_named( + self, + name: str, + type: str, + single: bool = True, + meta: typing.Dict = None, + limit: int = 0, + ): + """ + Finds and returns the message(s) with the given name. + + :Args: + | name (str): The name of the stored messages to retrieve. + | type (str): The type of the stored message. + | single (bool): Should only one message be returned? + | meta (dict): Extra queries on the meta data of the message. + | limit (int): Limit number of return documents + :Return: + | message (ROS message), meta (dict): The retrieved message and associated metadata + or *None* if the named message could not be found. + + """ + # create a copy as we're modifying it + if meta is None: + meta = {} + meta_copy = copy.copy(meta) + meta_copy["name"] = name + return self.query( + type, {}, meta_copy, single, [], projection_query={}, limit=limit + ) + + def update_named( + self, + name: str, + message: "ROSMessage", + meta: typing.Dict = None, + upsert: bool = False, + ) -> typing.Tuple[str, bool]: + """ + Updates a named message. + + :Args: + | name (str): The name of the stored messages to update. + | message (ROS Message): The updated ROS message + | meta (dict): Updated meta data to store with the message. + | upsert (bool): If True, insert the named message if it doesnt exist. + :Return: + | str, bool: The MongoDB ObjectID of the document, and whether it was altered by + the update. + """ + meta_query = {} + meta_query["name"] = name + + # make sure the name goes into the meta info after update + if meta is None: + meta = {} + meta_copy = copy.copy(meta) + meta_copy["name"] = name + + return self.update(message, meta_copy, {}, meta_query, upsert) + + def update_id(self, id, message, meta=None, upsert=False): + """ + Updates a message by MongoDB ObjectId. + + Args: + id: The MongoDB ObjectId of the doucment storing the message. + message: The updated ROS message + meta: Updated meta data to store with the message. + upsert: If True, insert the named message if it doesnt exist. + Return: + str, bool: The MongoDB ObjectID of the document, and whether it was altered by + the update. + + """ + + msg_query = {"_id": ObjectId(id)} + meta_query = {} + + return self.update(message, meta, msg_query, meta_query, upsert) + + def update( + self, + message: "ROSMessage", + meta: typing.Dict = None, + message_query: typing.Dict = None, + meta_query: typing.Dict = None, + upsert: bool = False, + ) -> MongoUpdateMsg.Response: + """ + Updates a message. + + :Args: + | message (ROS Message): The updated ROS message + | meta (dict): Updated meta data to store with the message. + | message_query (dict): A query to match the ROS message that is to be updated. + | meta_query (dict): A query to match against the meta data of the message to be updated + | upsert (bool): If True, insert the named message if it doesnt exist. + :Return: + | str, bool: The MongoDB ObjectID of the document, and whether it was altered by + the update. + + """ + if message_query is None: + message_query = {} + if meta_query is None: + meta_query = {} + if meta is None: + meta = {} + + # serialise the json queries to strings using json_util.dumps + message_query_tuple = ( + StringPair( + first=MongoQueryMsg.Request.JSON_QUERY, + second=json.dumps(message_query, default=json_util.default), + ), + ) + meta_query_tuple = ( + StringPair( + first=MongoQueryMsg.Request.JSON_QUERY, + second=json.dumps(meta_query, default=json_util.default), + ), + ) + meta_tuple = ( + StringPair( + first=MongoQueryMsg.Request.JSON_QUERY, + second=json.dumps(meta, default=json_util.default), + ), + ) + + request = MongoUpdateMsg.Request() + request.database = self.database + request.collection = self.collection + request.upsert = upsert + request.message_query = StringPairList(pairs=message_query_tuple) + request.meta_query = StringPairList(pairs=meta_query_tuple) + request.message = dc_util.serialise_message(message) + request.meta = StringPairList(pairs=meta_tuple) + + return dc_util.check_and_get_service_result_async( + self.parent_node, self.update_srv, request + )[0] + + def query( + self, + type: str, + message_query: typing.Dict = None, + meta_query: typing.Dict = None, + single: bool = False, + sort_query: typing.List[typing.Tuple] = None, + projection_query: typing.Dict = None, + limit: int = 0, + ) -> typing.Tuple[typing.Any, typing.Dict]: + """ + Finds and returns message(s) matching the message and meta data queries. + + :Parameters: + | type (str): The ROS message type of the stored messsage to retrieve. + | message_query (dict): A query to match the actual ROS message + | meta_query (dict): A query to match against the meta data of the message + | sort_query (list of tuple): A query to request sorted list to mongodb module + | projection_query (dict): A query to request desired fields to be returned or excluded + | single (bool): Should only one message be returned? + | limit (int): Limit number of return documents + :Returns: + | [message, meta] where message is the queried message and meta a dictionary of + meta information. If single is false returns a list of these lists. + """ + if message_query is None: + message_query = {} + if meta_query is None: + meta_query = {} + if sort_query is None: + sort_query = [] + if projection_query is None: + projection_query = {} + + # assume meta is a dict, convert k/v to tuple pairs for ROS msg type + + # serialise the json queries to strings using json_util.dumps + + message_tuple = StringPairList( + pairs=[ + StringPair( + first=MongoQueryMsg.Request.JSON_QUERY, + second=json.dumps(message_query, default=json_util.default), + ), + ] + ) + meta_tuple = StringPairList( + pairs=[ + StringPair( + first=MongoQueryMsg.Request.JSON_QUERY, + second=json.dumps(meta_query, default=json_util.default), + ), + ] + ) + projection_tuple = StringPairList( + pairs=[ + StringPair( + first=MongoQueryMsg.Request.JSON_QUERY, + second=json.dumps(projection_query, default=json_util.default), + ), + ] + ) + + if len(sort_query) > 0: + sort_tuple = StringPairList( + pairs=[StringPair(first=str(k), second=str(v)) for k, v in sort_query] + ) + else: + sort_tuple = StringPairList() + + request = MongoQueryMsg.Request() + request.database = self.database + request.collection = self.collection + request.type = type + request.single = single + request.limit = limit + request.message_query = message_tuple + request.meta_query = meta_tuple + request.projection_query = projection_tuple + request.sort_query = sort_tuple + + response = dc_util.check_and_get_service_result_async( + self.parent_node, self.query_srv, request + )[0] + + if response.messages is None: + messages = [] + metas = [] + else: + messages = list(map(dc_util.deserialise_message, response.messages)) + metas = list(map(dc_util.string_pair_list_to_dictionary, response.metas)) + + if single: + if len(messages) > 0: + return [messages[0], metas[0]] + else: + return [None, None] + else: + return list(zip(messages, metas)) diff --git a/mongodb_store/mongodb_store/message_store_node.py b/mongodb_store/mongodb_store/message_store_node.py new file mode 100755 index 0000000..d0b4c00 --- /dev/null +++ b/mongodb_store/mongodb_store/message_store_node.py @@ -0,0 +1,427 @@ +""" +Provides a service to store ROS message objects in a mongodb database in JSON. +""" + +import rosidl_runtime_py +import rosidl_runtime_py.utilities +import json +from datetime import datetime, timezone + +import pymongo +import rclpy +import rclpy.time +from bson import json_util +from bson.objectid import ObjectId +from builtin_interfaces.msg import Time +from rcl_interfaces.msg import ParameterDescriptor, ParameterType +from rcl_interfaces.srv import GetParameters +from rclpy.executors import MultiThreadedExecutor +from tf2_msgs.msg import TFMessage + +import mongodb_store.util as dc_util +from mongodb_store_msgs.msg import StringPair, StringPairList, Insert +from mongodb_store_msgs.srv import ( + MongoQueryMsg, + MongoUpdateMsg, + MongoDeleteMsg, + MongoInsertMsg, + MongoQuerywithProjectionMsg, +) + +import functools + +MongoClient = dc_util.import_MongoClient() + + +def srv_call_decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + try: + print("executing wrapped") + return func(*args, **kwargs) + except Exception as e: + import traceback + + print(traceback.print_exc()) + + return wrapper + + +class MessageStore(rclpy.node.Node): + def __init__(self, replicate_on_write=False): + super().__init__("message_store") + use_daemon = self.declare_parameter( + "mongodb_use_daemon", + False, + descriptor=ParameterDescriptor(description="Use the daemon"), + ).value + connection_string = self.declare_parameter( + "mongodb_connection_string", + "", + descriptor=ParameterDescriptor(description=""), + ).value + use_connection_string = len(connection_string) > 0 + if use_connection_string: + use_daemon = True + self.get_logger().info("Using connection string: %s", connection_string) + + # If you want to use a remote datacenter, then it should be set as false + use_localdatacenter = self.declare_parameter( + "mongodb_use_localdatacenter", True + ) + local_timeout = self.declare_parameter("local_timeout", 10).value + if str(local_timeout).lower() == "none": + local_timeout = None + + # wait for hostname and port for mongodb server + # TODO: This is limited to a specific name for the server + param_client = self.create_client( + GetParameters, "/mongodb_server/get_parameters" + ) + request = GetParameters.Request() + request.names = ["mongodb_host", "mongodb_port"] + + result, message = dc_util.check_and_get_service_result_async( + self, param_client, request, existence_timeout=10 + ) + if result is None: + raise RuntimeError( + f"Could not find service {param_client.srv_name} from which to retrieve mongo host and port parameters" + ) + + host_value = result.values[0] + port_value = result.values[1] + if host_value.type != ParameterType.PARAMETER_STRING: + raise RuntimeError( + f"Parameter value for the mongodb_host param was not a string: {host_value}. Check the /mongodb_server parameters." + ) + if port_value.type != ParameterType.PARAMETER_INTEGER: + raise RuntimeError( + f"Parameter value for the mongodb_port param was not an integer: {port_value}. Check the /mongodb_server parameters." + ) + + mongodb_host = host_value.string_value + mongodb_port = port_value.integer_value + + db_host = self.declare_parameter( + "mongodb_host", mongodb_host, descriptor=ParameterDescriptor(description="") + ).value + db_port = self.declare_parameter( + "mongodb_port", mongodb_port, descriptor=ParameterDescriptor(description="") + ).value + + if use_daemon: + if use_connection_string: + is_daemon_alive = dc_util.check_connection_to_mongod( + self, None, None, connection_string=connection_string + ) + else: + is_daemon_alive = dc_util.check_connection_to_mongod( + self, db_host, db_port + ) + if not is_daemon_alive: + raise Exception("No Daemon?") + elif use_localdatacenter: + self.get_logger().info( + f"Waiting for local datacentre (timeout: {local_timeout})" + ) + have_dc = dc_util.wait_for_mongo(self, local_timeout) + if not have_dc: + raise Exception("No Datacentre?") + else: + self.get_logger().info("Got datacentre") + + self.keep_trash = self.declare_parameter( + "mongodb_keep_trash", True, descriptor=ParameterDescriptor(description="") + ).value + + if use_connection_string: + self._mongo_client = MongoClient(connection_string) + else: + self._mongo_client = MongoClient(db_host, db_port) + + self.replicate_on_write = self.declare_parameter( + "mongodb_replicate_on_write", replicate_on_write + ).value + if self.replicate_on_write: + self.get_logger().warning( + "The option 'replicate_on_write' is now deprecated and will not function. " + "Use 'Replication' on MongoDB instead: " + "https://docs.mongodb.com/manual/replication/" + ) + + # advertise ros services + for attr in dir(self): + if attr.endswith("_ros_srv"): + service = getattr(self, attr) + self.create_service( + service.type, "/message_store/" + attr[:-8], service + ) + + self.queue_size = self.declare_parameter( + "queue_size", 100, descriptor=ParameterDescriptor(description="") + ).value + self.sub_insert = self.create_subscription( + Insert, + "/message_store/insert", + self.insert_ros_msg, + self.queue_size, + ) + + def insert_ros_msg(self, msg): + """ + Receives a message published + """ + # actually procedure is the same + self.insert_ros_srv(msg, MongoInsertMsg.Response()) + + @srv_call_decorator + def insert_ros_srv( + self, request: MongoInsertMsg.Request, response: MongoInsertMsg.Response + ) -> MongoInsertMsg.Response: + """ + Receives a + """ + # deserialize data into object + obj = dc_util.deserialise_message(request.message) + # convert input tuple to dict + meta = dc_util.string_pair_list_to_dictionary(request.meta) + # get requested collection from the db, creating if necessary + collection = self._mongo_client[request.database][request.collection] + # check if the object has the location attribute + if hasattr(obj, "pose"): + # if it does create a location index + collection.create_index([("loc", pymongo.GEO2D)]) + + # check if the object has the location attribute + if hasattr(obj, "geotype"): + # if it does create a location index + collection.create_index([("geoloc", pymongo.GEOSPHERE)]) + + # check if the object has the timestamp attribute TODO ?? really necessary + # if hasattr(obj, 'logtimestamp'): + # if it does create a location index + # collection.create_index([("datetime", pymongo.GEO2D)]) + + try: + stamp = self.get_clock().now() + sec_ns = stamp.seconds_nanoseconds() + fl = float(f"{sec_ns[0]}.{sec_ns[1]}") + meta["inserted_at"] = datetime.fromtimestamp(fl, timezone.utc) + # TODO: Retrieving this information seems to be much harder/impossible in ros2 + # meta["inserted_by"] = request._connection_header["callerid"] + + if ( + hasattr(obj, "header") + and hasattr(obj.header, "stamp") + and isinstance(obj.header.stamp, Time) + ): + stamp = rclpy.time.Time.from_msg(obj.header.stamp) + elif isinstance(obj, TFMessage): + if obj.transforms: + transforms = sorted( + obj.transforms, key=lambda m: m.header.stamp, reverse=True + ) + stamp = rclpy.time.Time.from_msg(transforms[0].header.stamp) + + sec_ns = stamp.seconds_nanoseconds() + fl = float(f"{sec_ns[0]}.{sec_ns[1]}") + meta["published_at"] = datetime.fromtimestamp(fl, timezone.utc) + meta["timestamp"] = stamp.nanoseconds + + obj_id = dc_util.store_message(collection, obj, meta).inserted_id + return MongoInsertMsg.Response(id=str(obj_id)) + except Exception as e: + import traceback + + self.get_logger().error(traceback.format_exc()) + return MongoInsertMsg.Response(id="") + + insert_ros_srv.type = MongoInsertMsg + + @srv_call_decorator + def delete_ros_srv( + self, request: MongoDeleteMsg.Request, response: MongoDeleteMsg.Response + ) -> MongoDeleteMsg.Response: + """ + Deletes a message by ID + """ + # Get the message + collection = self._mongo_client[request.database][request.collection] + docs = dc_util.query_message( + collection, {"_id": ObjectId(request.document_id)}, find_one=True + ) + if len(docs) != 1: + return MongoDeleteMsg.Response(success=False) + + message = docs[0] + + # Remove the doc + collection.delete_one({"_id": ObjectId(request.document_id)}) + + if self.keep_trash: + # But keep it in "trash" + bk_collection = self._mongo_client[request.database][ + request.collection + "_Trash" + ] + bk_collection.insert_one(message) + + return MongoDeleteMsg.Response(success=True) + + delete_ros_srv.type = MongoDeleteMsg + + @srv_call_decorator + def update_ros_srv( + self, request: MongoUpdateMsg.Request, response: MongoUpdateMsg.Response + ) -> MongoUpdateMsg.Response: + """ + Updates a msg in the store + """ + # rospy.lrosoginfo("called") + collection = self._mongo_client[request.database][request.collection] + + # build the query doc + obj_query = self.to_query_dict(request.message_query, request.meta_query) + + # restrict results to have the type asked for + obj_query["_meta.stored_type"] = request.message.type + + # TODO start using some string constants! + + self.get_logger().debug(f"update spec document: {obj_query}") + + # deserialize data into object + obj = dc_util.deserialise_message(request.message) + + meta = dc_util.string_pair_list_to_dictionary(request.meta) + meta["last_updated_at"] = datetime.fromtimestamp( + self.get_clock().now().seconds_nanoseconds()[0], timezone.utc + ) + # can't do this in ros2 + # meta["last_updated_by"] = request._connection_header["callerid"] + + (obj_id, altered) = dc_util.update_message( + collection, obj_query, obj, meta, request.upsert + ) + + return MongoUpdateMsg.Response(id=str(obj_id), success=altered) + + update_ros_srv.type = MongoUpdateMsg + + def to_query_dict(self, message_query, meta_query): + """ + Decodes and combines the given StringPairList queries into a single mongodb query + """ + obj_query = dc_util.string_pair_list_to_dictionary(message_query) + bare_meta_query = dc_util.string_pair_list_to_dictionary(meta_query) + for k, v in bare_meta_query.items(): + obj_query["_meta." + k] = v + return obj_query + + @srv_call_decorator + def query_messages_ros_srv( + self, request: MongoQueryMsg.Request, response: MongoQueryMsg.Response + ) -> MongoQueryMsg.Response: + """ + Returns t + """ + collection = self._mongo_client[request.database][request.collection] + + # build the query doc + obj_query = self.to_query_dict(request.message_query, request.meta_query) + + # restrict results to have the type asked for + obj_query["_meta.stored_type"] = request.type + + # TODO start using some string constants! + + self.get_logger().debug(f"query document: {obj_query}") + + # this is a list of entries in dict format including meta + sort_query_dict = dc_util.string_pair_list_to_dictionary(request.sort_query) + sort_query_tuples = [] + for k, v in sort_query_dict.items(): + try: + sort_query_tuples.append((k, int(v))) + except ValueError: + sort_query_tuples.append((k, v)) + # this is a list of entries in dict format including meta + + projection_query_dict = dc_util.string_pair_list_to_dictionary( + request.projection_query + ) + projection_meta_dict = dict() + projection_meta_dict["_meta"] = 1 + + entries = dc_util.query_message( + collection, + obj_query, + sort_query_tuples, + projection_query_dict, + request.single, + request.limit, + ) + if projection_query_dict: + meta_entries = dc_util.query_message( + collection, + obj_query, + sort_query_tuples, + projection_meta_dict, + request.single, + request.limit, + ) + + serialised_messages = () + metas = () + + for idx, entry in enumerate(entries): + # load the class object for this type + # TODO this should be the same for every item in the list, so could reuse + cls = rosidl_runtime_py.utilities.get_interface( + entry["_meta"]["stored_type"] + ) + # instantiate the ROS message object from the dictionary retrieved from the db + message = cls() + rosidl_runtime_py.set_message_fields(message, entry["message"]) + # the serialise this object in order to be sent in a generic form + serialised_messages = serialised_messages + ( + dc_util.serialise_message(message), + ) + # add ObjectID into meta as it might be useful later + if projection_query_dict: + entry["_meta"]["_id"] = meta_entries[idx]["_id"] + else: + entry["_meta"]["_id"] = entry["_id"] + # serialise meta + metas = metas + ( + StringPairList( + pairs=[ + StringPair( + first=MongoQueryMsg.Request.JSON_QUERY, + second=json.dumps( + entry["_meta"], default=json_util.default + ), + ) + ] + ), + ) + + return MongoQueryMsg.Response(messages=serialised_messages, metas=metas) + + query_messages_ros_srv.type = MongoQueryMsg + + def query_with_projection_messages_ros_srv(self, req): + """ + Returns t + """ + return self.query_messages_ros_srv(req, MongoQueryMsg.Response()) + + query_with_projection_messages_ros_srv.type = MongoQuerywithProjectionMsg + + +def main(): + rclpy.init() + store = MessageStore() + rclpy.spin(store, executor=MultiThreadedExecutor()) + store.destroy_node() + rclpy.shutdown() diff --git a/mongodb_store/mongodb_store/mongodb_server.py b/mongodb_store/mongodb_store/mongodb_server.py new file mode 100755 index 0000000..02df6a1 --- /dev/null +++ b/mongodb_store/mongodb_store/mongodb_server.py @@ -0,0 +1,255 @@ +import rclpy +import subprocess +import sys +import os +import re +import errno +from rclpy.duration import Duration +import threading + +from rcl_interfaces.msg import ParameterDescriptor +from rclpy.executors import MultiThreadedExecutor +from std_srvs.srv import Empty +import shutil +import pymongo + +import mongodb_store.util + +if not mongodb_store.util.check_for_pymongo(): + sys.exit(1) + +MongoClient = mongodb_store.util.import_MongoClient() + + +def is_socket_free(host, port): + import socket + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + result = sock.connect_ex((host, port)) + return result != 0 + + +class MongoServer(rclpy.node.Node): + def __init__(self): + # TODO: This should be anonymous, but ROS2 doesn't allow for that natively + super().__init__("mongodb_server") + + # Has the db already gone down, before the ros node? + self._gone_down = False + + self._ready = False # is the db ready: when mongo says "waiting for connection" + + self.test_mode = self.declare_parameter( + "test_mode", False, descriptor=ParameterDescriptor(description="") + ).value + self.repl_set = self.declare_parameter( + "repl_set", "", descriptor=ParameterDescriptor(description="") + ).value + self.bind_to_host = self.declare_parameter( + "bind_to_host", False, descriptor=ParameterDescriptor(description="") + ).value + + if self.test_mode: + import random + + default_host = "localhost" + default_port = random.randrange(49152, 65535) + + count = 0 + while not is_socket_free(default_host, default_port): + default_port = random.randrange(49152, 65535) + count += 1 + if count > 100: + self.get_logger().error( + "Can't find a free port to run the test server on." + ) + sys.exit(1) + + self.default_path = "/tmp/ros_mongodb_store_%d" % default_port + os.mkdir(self.default_path) + else: + default_host = "localhost" + default_port = 27017 + self.default_path = "/opt/ros/mongodb_store" + + # Get the database path + self._db_path = self.declare_parameter( + "database_path", + self.default_path, + descriptor=ParameterDescriptor(description=""), + ).value + is_master = self.declare_parameter( + "master", True, descriptor=ParameterDescriptor(description="") + ).value + + if is_master: + # TODO: These used to be global params + self._mongo_host = self.declare_parameter( + "mongodb_host", + default_host, + descriptor=ParameterDescriptor(description=""), + ).value + self._mongo_port = self.declare_parameter( + "mongodb_port", + default_port, + descriptor=ParameterDescriptor(description=""), + ).value + else: + self._mongo_host = self.declare_parameter( + "host", descriptor=ParameterDescriptor(description="") + ).value + self._mongo_port = self.declare_parameter( + "port", descriptor=ParameterDescriptor(description="") + ).value + + self.get_logger().info( + "Mongo server address: " + self._mongo_host + ":" + str(self._mongo_port) + ) + + # Check that mongodb is installed + try: + mongov = subprocess.check_output(["mongod", "--version"]) + match = re.search("db version v(\d+\.\d+\.\d+)", mongov.decode("utf-8")) + self._mongo_version = match.group(1) + except subprocess.CalledProcessError: + self.get_logger().error( + 'Can\'t find MongoDB executable. Is it installed?\nInstall it with "sudo apt install mongodb"' + ) + sys.exit(1) + self.get_logger().info("Found MongoDB version " + self._mongo_version) + + # Check that the provided db path exists. + if not os.path.exists(self._db_path): + self.get_logger().error( + f"Can't find database at supplied path {self._db_path}. If this is a new DB, create it as an empty " + f"directory." + ) + sys.exit(1) + + # Advertise ros services for db interaction + self._shutdown_srv = self.create_service( + Empty, "/datacentre/shutdown", self._shutdown_srv_cb + ) + self._wait_ready_srv = self.create_service( + Empty, "/datacentre/wait_ready", self._wait_ready_srv_cb + ) + + self.mongo_thread = threading.Thread(target=self._mongo_loop) + self.mongo_thread.start() + + def _mongo_loop(self): + + # Blocker to prevent Ctrl-C being passed to the mongo server + def block_mongo_kill(): + os.setpgrp() + + # signal.signal(signal.SIGINT, signal.SIG_IGN) + + # cmd = ["mongod","--dbpath",self._db_path,"--port",str(self._mongo_port),"--smallfiles","--bind_ip","127.0.0.1"] + cmd = ["mongod", "--dbpath", self._db_path, "--port", str(self._mongo_port)] + + if self.bind_to_host: + cmd.append("--bind_ip") + cmd.append(self._mongo_host) + else: + cmd.append("--bind_ip") + cmd.append("0.0.0.0") + + if self.repl_set: + cmd.append("--replSet") + cmd.append(self.repl_set) + + self.get_logger().info(f"Running command {' '.join(cmd)}") + self._mongo_process = subprocess.Popen( + cmd, stdout=subprocess.PIPE, preexec_fn=block_mongo_kill + ) + + while self._mongo_process.poll() is None: # and rclpy.ok(): + try: + stdout = self._mongo_process.stdout.readline().decode("utf-8") + except IOError as e: # probably interupt because shutdown cut it up + if e.errno == errno.EINTR: + continue + else: + raise + if stdout is not None: + # if stdout.find("ERROR") != -1: + # self.get_logger().error(stdout.strip()) + # else: + # self.get_logger().info(stdout.strip()) + + if not self._ready and stdout.find("mongod startup complete") != -1: + self._ready = True + if self.repl_set: + try: + self.initialize_repl_set() + except Exception as e: + self.get_logger().warning( + f"initialzing replSet failed: {e}" + ) + + if not rclpy.ok(): + self.get_logger().error("MongoDB process stopped!") + + if self._mongo_process.returncode != 0: + self.get_logger().error( + "Mongo process error! Exit code=" + str(self._mongo_process.returncode) + ) + + self._gone_down = True + + def _shutdown_srv_cb( + self, request: Empty.Request, response: Empty.Response + ) -> Empty.Response: + # Calling shutdown exits the spin on the node. + rclpy.shutdown() + return Empty.Response() + + def _wait_ready_srv_cb( + self, request: Empty.Request, resp: Empty.Response + ) -> Empty.Response: + while not self._ready: + self.get_clock().sleep_for(Duration(seconds=0.1)) + return Empty.Response() + + def initialize_repl_set(self): + c = pymongo.Connection( + f"{self._mongo_host}:{self._mongo_port}", slave_okay=True + ) + c.admin.command("replSetInitiate") + c.close() + + +def main(): + rclpy.init() + try: + server = MongoServer() + rclpy.spin(server, executor=MultiThreadedExecutor()) + server.destroy_node() + if rclpy.ok(): + # If the shutdown srv is called calling this again will cause a crash + rclpy.shutdown() + finally: + # TODO: The context on_shutdown doesn't seem to work, so moving that code here + server.get_logger().info("Shutting down datacentre") + server._ready = False + if server._gone_down: + server.get_logger().warning( + "It looks like Mongo already died. Watch out as the DB might need recovery time at next run." + ) + return + try: + c = MongoClient(host=server._mongo_host, port=server._mongo_port) + except pymongo.errors.ConnectionFailure: + c = None + try: + if c is not None: + c.admin.command("shutdown") + except pymongo.errors.AutoReconnect: + pass + + if server.test_mode: # remove auto-created DB in the /tmp folder + try: + shutil.rmtree(server.default_path) + except Exception as e: + server.get_logger().error(e) diff --git a/mongodb_store/mongodb_store/scripts/__init__.py b/mongodb_store/mongodb_store/scripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mongodb_store/scripts/config_manager.py b/mongodb_store/mongodb_store/scripts/config_manager.py similarity index 98% rename from mongodb_store/scripts/config_manager.py rename to mongodb_store/mongodb_store/scripts/config_manager.py index b77dc69..11e8d57 100755 --- a/mongodb_store/scripts/config_manager.py +++ b/mongodb_store/mongodb_store/scripts/config_manager.py @@ -108,8 +108,6 @@ def __init__(self): else: self._mongo_client=MongoClient(db_host, db_port) - rospy.on_shutdown(self._on_node_shutdown) - self._database=self._mongo_client.config self._database.add_son_manipulator(MongoTransformer()) @@ -241,14 +239,6 @@ def _list_params(self): print(name, " "*(30-len(name)),val," "*(30-len(str(val))),filename) print() - - def _on_node_shutdown(self): - try: - # PyMongo 2.9 or later - self._mongo_client.close() - except Exception as e: - self._mongo_client.disconnect() - # Could just use the ros parameter server to get the params # but one day might not back onto the parameter server... def _getparam_srv_cb(self,req): diff --git a/mongodb_store/mongodb_store/scripts/example_message_store_client.py b/mongodb_store/mongodb_store/scripts/example_message_store_client.py new file mode 100755 index 0000000..34b443b --- /dev/null +++ b/mongodb_store/mongodb_store/scripts/example_message_store_client.py @@ -0,0 +1,109 @@ +import sys +import rclpy +from geometry_msgs.msg import Pose, Point, Quaternion +from mongodb_store.message_store import MessageStoreProxy +from mongodb_store.util import message_to_namespaced_type + +if __name__ == "__main__": + rclpy.init() + node = rclpy.node.Node("example_message_store_client") + + msg_store = MessageStoreProxy(node) + print("Created message store proxy") + p = Pose( + position=Point(x=0.0, y=1.0, z=2.0), + orientation=Quaternion(x=3.0, y=4.0, z=5.0, w=6.0), + ) + try: + # insert a pose object with a name, store the id from db + print("Inserting pose") + p_id_fav = msg_store.insert_named("my favourite pose", p) + + # you don't need a name (note that this p_id is different than one above) + p_id_other = msg_store.insert(p) + + # p_id = msg_store.insert(['test1', 'test2']) + + # get it back with a name + print("querying") + print( + f"query result: {msg_store.query_named('my favourite pose', message_to_namespaced_type(Pose))}" + ) + + p.position.x = 666.0 + + # update it with a name + print("Updating named") + msg_store.update_named("my favourite pose", p) + + p.position.y = 2020.0 + + # update the other inserted one using the id + print("updating with id") + msg_store.update_id(p_id_other, p) + + stored_p, meta = msg_store.query_id(p_id_other, message_to_namespaced_type(Pose)) + + print(stored_p, meta) + + assert stored_p.position.x == 666 + assert stored_p.position.y == 2020 + print("stored object ok") + print(f"stored object inserted at {meta['inserted_at']} (UTC rostime)") + print(f"stored object last updated at {meta['last_updated_at']} (UTC rostime)") + + # some other things you can do... + + # get it back with a name + print("Getting back the object using its name") + print( + msg_store.query_named("my favourite pose", message_to_namespaced_type(Pose)) + ) + + # try to get it back with an incorrect name, so get None instead + print("Trying to get back the object with an incorrect name") + print( + msg_store.query_named( + "my favourite position", message_to_namespaced_type(Pose) + ) + ) + + # get all poses + print("Getting all poses") + print(msg_store.query(message_to_namespaced_type(Pose))) + + # get the latest one pose + print("Getting the latest pose") + print( + msg_store.query( + message_to_namespaced_type(Pose), + sort_query=[("$natural", -1)], + single=True, + ) + ) + + # get all non-existent typed objects, so get an empty list back + print("Getting objects with a non-existent type") + print(msg_store.query("not my type")) + + # get all poses where the y position is 1 + print("All poses where the y position is 1") + print(msg_store.query(message_to_namespaced_type(Pose), {"message.position.y": 1.0})) + + # get all poses where the y position greater than 0 + print("All poses where the y position is greater than 0") + print( + msg_store.query( + message_to_namespaced_type(Pose), {"message.position.y": {"$gt": 0}} + ) + ) + + print("deleting my favourite pose...") + msg_store.delete(p_id_fav) + print("deleting the other pose...") + msg_store.delete(p_id_other) + + except Exception: + import traceback + + print(traceback.print_exc()) diff --git a/mongodb_store/mongodb_store/scripts/example_multi_event_log.py b/mongodb_store/mongodb_store/scripts/example_multi_event_log.py new file mode 100755 index 0000000..51fc2f2 --- /dev/null +++ b/mongodb_store/mongodb_store/scripts/example_multi_event_log.py @@ -0,0 +1,78 @@ +from datetime import * + +import rclpy +from geometry_msgs.msg import Pose, Point, Quaternion +from std_msgs.msg import Bool + +from mongodb_store.message_store import MessageStoreProxy +from mongodb_store_msgs.msg import StringPairList, StringPair +from mongodb_store.util import message_to_namespaced_type + +if __name__ == "__main__": + rclpy.init() + node = rclpy.node.Node("example_multi_event_log") + + try: + + # let's say we have a couple of things that we need to store together + # these could be some sensor data, results of processing etc. + pose = Pose( + position=Point(x=0.0, y=1.0, z=2.0), + orientation=Quaternion(x=3.0, y=4.0, z=5.0, w=6.0), + ) + point = Point(x=7.0, y=8.0, z=9.0) + quaternion = Quaternion(x=10.0, y=11.0, z=12.0, w=13.0) + # note that everything that is pass to the message_store must be a ros message type + # therefore use std_msg types for standard data types like float, int, bool, string etc + result = Bool(data=True) + + # we will store our results in a separate collection + msg_store = MessageStoreProxy(node, collection="pose_results") + + messages_to_store = [pose, point, quaternion, result] + spl = StringPairList() + for message in messages_to_store: + # Each pair in the string pair list will be the type of message stored, and the id of the relevant + # message in the collection + spl.pairs.append( + StringPair( + first=message_to_namespaced_type(message), + second=msg_store.insert(message), + ) + ) + + # and add some meta information + meta = {"description": "this wasn't great"} + sec_ns = node.get_clock().now().seconds_nanoseconds() + fl = float(f"{sec_ns[0]}.{sec_ns[1]}") + meta["result_time"] = datetime.fromtimestamp(fl, timezone.utc) + msg_store.insert(spl, meta=meta) + + # now let's get all our logged data back + results = msg_store.query(message_to_namespaced_type(StringPairList)) + for message, meta in results: + if "description" in meta: + print(f"description: {meta['description']}") + print(f"result time (UTC from rostime): {meta['result_time']}") + print(f"inserted at (UTC from rostime): {meta['inserted_at']}") + pose = msg_store.query_id( + message.pairs[0].second, message_to_namespaced_type(Pose) + )[0] + point = msg_store.query_id( + message.pairs[1].second, message_to_namespaced_type(Point) + )[0] + quaternion = msg_store.query_id( + message.pairs[2].second, message_to_namespaced_type(Quaternion) + )[0] + result = msg_store.query_id( + message.pairs[3].second, message_to_namespaced_type(Bool) + )[0] + print(pose) + print(point) + print(quaternion) + print(result) + + except Exception: + import traceback + + print(traceback.print_exc()) diff --git a/mongodb_store/scripts/mongo_bridge.py b/mongodb_store/mongodb_store/scripts/mongo_bridge.py similarity index 100% rename from mongodb_store/scripts/mongo_bridge.py rename to mongodb_store/mongodb_store/scripts/mongo_bridge.py diff --git a/mongodb_store/scripts/mongodb_play.py b/mongodb_store/mongodb_store/scripts/mongodb_play.py similarity index 100% rename from mongodb_store/scripts/mongodb_play.py rename to mongodb_store/mongodb_store/scripts/mongodb_play.py diff --git a/mongodb_store/scripts/replicator_client.py b/mongodb_store/mongodb_store/scripts/replicator_client.py similarity index 100% rename from mongodb_store/scripts/replicator_client.py rename to mongodb_store/mongodb_store/scripts/replicator_client.py diff --git a/mongodb_store/scripts/replicator_node.py b/mongodb_store/mongodb_store/scripts/replicator_node.py similarity index 98% rename from mongodb_store/scripts/replicator_node.py rename to mongodb_store/mongodb_store/scripts/replicator_node.py index 251189d..117c298 100755 --- a/mongodb_store/scripts/replicator_node.py +++ b/mongodb_store/mongodb_store/scripts/replicator_node.py @@ -356,7 +356,14 @@ def do_cancel(self): self.restore_process.shutdown() self.restore_process = None +def main(): + rclpy.init() + node = rclpy.node.Node("mongodb_replicator") + store = Replicator(node) + rclpy.spin(node) + node.destroy_node() + rclpy.shutdown() + + if __name__ == '__main__': - rospy.init_node("mongodb_replicator") - store = Replicator() - rospy.spin() + main() \ No newline at end of file diff --git a/mongodb_store/mongodb_store/util.py b/mongodb_store/mongodb_store/util.py new file mode 100644 index 0000000..19154c5 --- /dev/null +++ b/mongodb_store/mongodb_store/util.py @@ -0,0 +1,495 @@ +import json +import typing + +import pymongo.collection +import rclpy +import rclpy.client +import rclpy.node +import rclpy.serialization +import rclpy.type_support +import rosidl_runtime_py.utilities +import yaml +from bson import json_util, Binary +from pymongo.errors import ConnectionFailure +from rclpy.executors import MultiThreadedExecutor +from std_srvs.srv import Empty + +from mongodb_store_msgs.msg import SerialisedMessage +from mongodb_store_msgs.srv import MongoQueryMsg + + +def check_connection_to_mongod( + parent_node: rclpy.node.Node, db_host, db_port, connection_string=None +) -> bool: + """ + Check connection to mongod server + + :Returns: + | bool : True on success, False if connection is not established. + """ + if check_for_pymongo(): + try: + # pymongo 3.X + from pymongo import MongoClient + + if connection_string is None: + client = MongoClient(db_host, db_port, connect=False) + else: + client = MongoClient(connection_string) + result = client.admin.command("ismaster") + return True + except ConnectionFailure: + if connection_string is None: + parent_node.get_logger().error( + f"Could not connect to mongo server {db_host}:{db_port}\nMake sure mongod is launched on your specified host/port" + ) + else: + parent_node.get_logger().error( + f"Could not connect to mongo server {connection_string}\nMake sure mongod is launched on your specified host/port" + ) + return False + else: + return False + + +def wait_for_mongo(parent_node: rclpy.node.Node, timeout=60, ns="/datacentre"): + """ + Waits for the mongo server, as started through the mongodb_store/mongodb_server.py wrapper + + :Returns: + | bool : True on success, False if server not even started. + """ + # # Check that mongo is live, create connection + service = ns + "/wait_ready" + wait_client = parent_node.create_client(Empty, service) + + result, message = check_and_get_service_result_async( + parent_node, wait_client, Empty.Request(), existence_timeout=timeout + ) + if result is None: + parent_node.get_logger().error( + "Can't connect to MongoDB server. Make sure mongodb_store/mongodb_server.py node is started." + ) + return False + return True + + +def check_and_get_service_result_async( + node: rclpy.node.Node, + client: rclpy.client.Client, + request, + existence_timeout: typing.Optional[float] = 1, + spin_timeout: typing.Optional[float] = None, +) -> typing.Tuple[typing.Optional[typing.Any], str]: + """ + Check the service for the given client exists, then call it and retrieve the response asynchronously, + but block to do so. Calls the executor from the node associated with the client, using its + spin_until_future_complete function + + Note: If the client is being called from a callback (i.e. subscriber callback, service callback, any actionserver + callback) You must ensure that the client is in a separate callback group to the one which is initiating this + call. If it is not, you will probably get a silent deadlock. + + Args: + node: Node which created the client. TODO: Is this needed? The client has a context and handle but not sure if those have references to the node + client: Client to call + request: Request to send to the client + existence_timeout: How long to wait for the service to become available + spin_timeout: How long to spin waiting for the result before timing out + + Returns: + Tuple with result of the call, or None if it failed, and a message + """ + if not client.wait_for_service(timeout_sec=existence_timeout): + message = f"Couldn't find {client.srv_name}" + node.get_logger().error(message) + return None, message + resp_future = client.call_async(request) + # Don't use rclpy.spin_until_future_completes on the node because then the node is removed from the global + # executor and will not receive any further callbacks + if not node.executor: + # do this in case the node doesn't have an executor, and remove the executor after we're done, otherwise the + # executor is permanently set as that node's executor + rclpy.spin_until_future_complete( + node, + resp_future, + timeout_sec=spin_timeout, + executor=MultiThreadedExecutor(), + ) + node.executor = None + else: + node.executor.spin_until_future_complete(resp_future, timeout_sec=spin_timeout) + + return resp_future.result(), f"Successfully called {client.srv_name}" + + +def check_for_pymongo(): + """ + Checks for required version of pymongo python library. + + :Returns: + | bool : True if found, otherwise Fale + """ + try: + import pymongo + except: + print("ERROR!!!") + print("Can't import pymongo, this is needed by mongodb_store.") + print("Make sure it is installed (pip install pymongo)") + return False + + return True + + +def import_MongoClient(): + """ + Pick an object to use as MongoClient based on the currently installed pymongo + version. Use this instead of importing Connection or MongoClient from pymongo + directly. + + Example: + MongoClient = util.importMongoClient() + """ + import pymongo + + if pymongo.version >= "2.4": + + def mongo_client_wrapper(*args, **kwargs): + return pymongo.MongoClient(*args, **kwargs) + + return mongo_client_wrapper + + +def msg_to_document(msg): + """ + Given a ROS message, turn it into a (nested) dictionary suitable for the datacentre. + + >>> from geometry_msgs.msg import Pose + >>> msg_to_document(Pose()) + {'orientation': {'w': 0.0, 'x': 0.0, 'y': 0.0, 'z': 0.0}, + 'position': {'x': 0.0, 'y': 0.0, 'z': 0.0}} + + :Args: + | msg (ROS Message): An instance of a ROS message to convert + :Returns: + | dict : A dictionary representation of the supplied message. + """ + return yaml.safe_load(rosidl_runtime_py.message_to_yaml(msg)) + + +def sanitize_value(attr, v, type): + """ + De-rosify a msg. + + Internal function used to convert ROS messages into dictionaries of pymongo insertable + values. + + :Args: + | attr(str): the ROS message slot name the value came from + | v: the value from the message's slot to make into a MongoDB able type + | type (str): The ROS type of the value passed, as given by the ressage slot_types member. + :Returns: + | A sanitized version of v. + """ + + # print '---' + # print attr + # print v.__class__ + # print type + # print v + + if isinstance(v, str): + if type == "uint8[]": + v = Binary(v) + + # no need to carry on with the other type checks below + return v + + if rclpy.type_support.check_for_type_support(v): + # This should be a sufficient check for whether something is a ros msg, srv, or action + return msg_to_document(v) + elif isinstance(v, list): + result = [] + for t in v: + if hasattr(t, "_type"): + result.append(sanitize_value(None, t, t._type)) + else: + result.append(sanitize_value(None, t, None)) + return result + else: + return v + + +def store_message(collection: pymongo.collection.Collection, msg, meta, oid=None): + """ + Update ROS message into the DB + + :Args: + | collection (pymongo.Collection): the collection to store the message in + | msg (ROS message): an instance of a ROS message to store + | meta (dict): Additional meta data to store with the ROS message + | oid (str): An optional ObjectID for the MongoDB document created. + :Returns: + | str: ObjectId of the MongoDB document. + """ + # The message should be stored separate from the meta fields so it can be more easily restored. Otherwise rosidl + # dict to message has problems later because the message doesn't have the meta fields + doc = {"message": msg_to_document(msg)} + + message_type = message_to_namespaced_type(msg) + doc["_meta"] = meta + # also store type information + doc["_meta"]["stored_class"] = ".".join([msg.__module__, msg.__class__.__name__]) + doc["_meta"]["stored_type"] = message_type + + if hasattr(msg, "_connection_header"): + print(getattr(msg, "_connection_header")) + + if oid != None: + doc["_id"] = oid + + return collection.insert_one(doc) + + +def store_message_no_meta(collection, msg): + """ + Store a ROS message sans meta data. + + :Args: + | collection (pymongo.Collection): The collection to store the message in + | msg (ROS message): An instance of a ROS message to store + :Returns: + | str: The ObjectId of the MongoDB document created. + """ + doc = msg_to_document(msg) + return collection.insert_one(doc) + + +def query_message( + collection, + query_doc, + sort_query=None, + projection_query=None, + find_one=False, + limit=0, +): + """ + Peform a query for a stored messages, returning results in list. + + :Args: + | collection (pymongo.Collection): The collection to query + | query_doc (dict): The MongoDB query to execute + | sort_query (list of tuple): The MongoDB query to sort + | projection_query (dict): The projection query + | find_one (bool): Returns one matching document if True, otherwise all matching. + | limit (int): Limits number of return documents. 0 means no limit + :Returns: + | dict or list of dict: the MongoDB document(s) found by the query + """ + if sort_query is None: + sort_query = [] + if projection_query is None: + projection_query = {} + if find_one: + ids = () + if sort_query: + if not projection_query: + result = collection.find_one(query_doc, sort=sort_query) + else: + result = collection.find_one( + query_doc, projection_query, sort=sort_query + ) + elif projection_query: + result = collection.find_one(query_doc, projection_query) + else: + result = collection.find_one(query_doc) + if result: + return [result] + else: + return [] + else: + if sort_query: + if not projection_query: + return [ + result + for result in collection.find(query_doc) + .sort(sort_query) + .limit(limit) + ] + else: + return [ + result + for result in collection.find(query_doc, projection_query) + .sort(sort_query) + .limit(limit) + ] + elif projection_query: + return [ + result + for result in collection.find(query_doc, projection_query).limit(limit) + ] + else: + return [result for result in collection.find(query_doc).limit(limit)] + + +def update_message( + collection: pymongo.collection.Collection, query_doc, msg, meta, upsert +): + """ + Update ROS message in the DB, return updated id and true if db altered. + + :Args: + | collection (pymongo.Collection): The collection to update in + | query_doc (dict): The MongoDB query to execute to select document for update + | msg (ROS message): An instance of a ROS message to update to + | meta (dict): New meta data to update the stored message with + | upsert (bool): If message does not already exits, create if upsert==True. + :Returns: + | str, bool: the OjectId of the updated document and whether it was altered by + the operation + """ + # see if it's in db first + result = collection.find_one(query_doc) + + # if it's not in there but we're allowed to insert + if not result: + if upsert: + return store_message(collection, msg, meta), True + else: + return "", False + + # convert msg to db document + doc = {"message": msg_to_document(msg)} + + # update _meta + doc["_meta"] = result["_meta"] + # merge the two dicts, overwiriting elements in doc["_meta"] with elements in meta + doc["_meta"] = dict(list(doc["_meta"].items()) + list(meta.items())) + + # ensure necessary parts are there too + doc["_meta"]["stored_class"] = ".".join([msg.__module__, msg.__class__.__name__]) + doc["_meta"]["stored_type"] = message_to_namespaced_type(msg) + + # Have to use the $set command to actually update the matching entry + set_cmd = {"$set": doc} + + return collection.update_one(query_doc, set_cmd), True + + +def query_message_ids(collection, query_doc, find_one): + """ + Peform a query for a stored message, returning a tuple of id strings + + :Args: + | collection (pymongo.Collection): The collection to search + | query_doc (dict): The MongoDB query to execute + | find_one (bool): Find one matching document if True, otherwise all matching. + :Returns: + | tuple of strings: all ObjectIds of matching documents + """ + if find_one: + result = collection.find_one(query_doc) + if result: + return (str(result["_id"]),) + else: + return tuple( + str(result["_id"]) for result in collection.find(query_doc, {"_id": 1}) + ) + + +def message_to_namespaced_type(message): + """ + Takes a ROS msg and turn it into a namespaced type + + E.g + >>> type(Pose()) + + >>> type_to_class_string(Pose()) + geometry_msgs/Pose + + :Args: + | type (ROS message): The ROS message object + :Returns: + | str: A python class string for the ROS message type supplied + """ + if "metaclass" in str(type(message)).lower(): + # print( + # f"Received message {type(message)} which is a metaclass. You should pass instantiated objects rather than " + # f"metaclasses, but I'll convert it for you." + # ) + message = message() + message_type = type(message) + module_parts = message_type.__module__.split(".") + cls_string = f"{module_parts[0]}/{module_parts[1]}/{message_type.__name__}" + return cls_string + + +def serialise_message(message): + """ + Create a mongodb_store_msgs/SerialisedMessage instance from a ROS message. + + :Args: + | message (ROS message): The message to serialise + :Returns: + | mongodb_store_msgs.msg.SerialisedMessage: A serialised copy of message + """ + msg_bytes = rclpy.serialization.serialize_message(message) + serialised_msg = SerialisedMessage() + serialised_msg.msg = msg_bytes + serialised_msg.type = message_to_namespaced_type(message) + + return serialised_msg + + +def deserialise_message(serialised_message): + """ + Create a ROS message from a mongodb_store_msgs/SerialisedMessage + + :Args: + | serialised_message (mongodb_store_msgs.msg.SerialisedMessage): The message to deserialise + :Returns: + | ROS message: The message deserialised + """ + cls = rosidl_runtime_py.utilities.get_interface(serialised_message.type) + message = rclpy.serialization.deserialize_message( + bytes(serialised_message.msg), cls + ) + return message + + +def string_pair_list_to_dictionary_no_json(spl): + """ + Covert a mongodb_store_msgs/StringPairList into a dictionary, ignoring content + + :Args: + | spl (StringPairList): The list of (key, value) to pairs convert + :Returns: + | dict: resulting dictionary + """ + return dict((pair.first, pair.second) for pair in spl) + + +def string_pair_list_to_dictionary(spl): + """ + Creates a dictionary from a mongodb_store_msgs/StringPairList which could contain JSON as a string. + If the first entry in the supplied list is a JSON query then the returned dictionary is loaded from that. + + :Args: + | spl (StringPairList): The list of (key, value) pairs to convert + :Returns: + | dict: resulting dictionary + """ + if len(spl.pairs) > 0 and spl.pairs[0].first == MongoQueryMsg.Request.JSON_QUERY: + # print "looks like %s", spl.pairs[0].second + # json loads will return None if the pair value is 'null'. Make sure it returns a dict. + return json.loads(spl.pairs[0].second, object_hook=json_util.object_hook) or {} + # else use the string pairs + else: + return string_pair_list_to_dictionary_no_json(spl.pairs) + + +def topic_name_to_collection_name(topic_name): + """ + Converts the fully qualified name of a topic into legal mongodb collection name. + """ + return topic_name.replace("/", "_")[1:] diff --git a/mongodb_store/package.xml b/mongodb_store/package.xml index 495aab8..34fb9fd 100644 --- a/mongodb_store/package.xml +++ b/mongodb_store/package.xml @@ -1,7 +1,7 @@ mongodb_store - 0.5.2 + 0.6.0 A package to support MongoDB-based storage and analysis for data from a ROS system, eg. saved messages, configurations etc Nick Hawes @@ -12,42 +12,40 @@ https://github.com/strands-project/mongodb_store MIT - catkin - - rospy - roscpp - std_msgs - std_srvs - message_generation - mongodb_store_msgs - rostest - python-catkin-pkg - python3-catkin-pkg - mongodb - libmongoclient-dev - libssl-dev - topic_tools - + ament_python + + rospy + roscpp + std_msgs + std_srvs + mongodb_store_msgs + + + mongodb + + + + - geometry_msgs - - rospy - roscpp - std_msgs - std_srvs - python-future - python3-future - python-pymongo - python3-pymongo - mongodb - mongodb_store_msgs - libmongoclient-dev - topic_tools + + + + + + + + + + + + + + - geometry_msgs + - + ament_python diff --git a/mongodb_store/resource/mongodb_store b/mongodb_store/resource/mongodb_store new file mode 100644 index 0000000..e69de29 diff --git a/mongodb_store/scripts/example_message_store_client.py b/mongodb_store/scripts/example_message_store_client.py deleted file mode 100755 index 1e25335..0000000 --- a/mongodb_store/scripts/example_message_store_client.py +++ /dev/null @@ -1,85 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -from __future__ import print_function -import rospy -import mongodb_store_msgs.srv as dc_srv -import mongodb_store.util as dc_util -from mongodb_store.message_store import MessageStoreProxy -from geometry_msgs.msg import Pose, Point, Quaternion -import platform -if float(platform.python_version()[0:2]) >= 3.0: - import io -else: - import StringIO - - -if __name__ == '__main__': - rospy.init_node("example_message_store_client") - - - msg_store = MessageStoreProxy() - - p = Pose(Point(0, 1, 2), Quaternion(3, 4, 5, 6)) - - try: - - - # insert a pose object with a name, store the id from db - p_id = msg_store.insert_named("my favourite pose", p) - - # you don't need a name (note that this p_id is different than one above) - p_id = msg_store.insert(p) - - # p_id = msg_store.insert(['test1', 'test2']) - - # get it back with a name - print(msg_store.query_named("my favourite pose", Pose._type)) - - p.position.x = 666 - - # update it with a name - msg_store.update_named("my favourite pose", p) - - p.position.y = 2020 - - # update the other inserted one using the id - msg_store.update_id(p_id, p) - - stored_p, meta = msg_store.query_id(p_id, Pose._type) - - assert stored_p.position.x == 666 - assert stored_p.position.y == 2020 - print("stored object ok") - print("stored object inserted at %s (UTC rostime) by %s" % (meta['inserted_at'], meta['inserted_by'])) - print("stored object last updated at %s (UTC rostime) by %s" % (meta['last_updated_at'], meta['last_updated_by'])) - - # some other things you can do... - - # get it back with a name - print(msg_store.query_named("my favourite pose", Pose._type)) - - - # try to get it back with an incorrect name, so get None instead - print(msg_store.query_named("my favourite position", Pose._type)) - - # get all poses - print(msg_store.query(Pose._type)) - - # get the latest one pose - print(msg_store.query(Pose._type, sort_query=[("$natural", -1)], single=True)) - - # get all non-existant typed objects, so get an empty list back - print(msg_store.query( "not my type")) - - # get all poses where the y position is 1 - print(msg_store.query(Pose._type, {"position.y": 1})) - - # get all poses where the y position greater than 0 - print(msg_store.query(Pose._type, {"position.y": {"$gt": 0}})) - - - except rospy.ServiceException as e: - print("Service call failed: %s"%e) - - - diff --git a/mongodb_store/scripts/example_multi_event_log.py b/mongodb_store/scripts/example_multi_event_log.py deleted file mode 100755 index ce84623..0000000 --- a/mongodb_store/scripts/example_multi_event_log.py +++ /dev/null @@ -1,69 +0,0 @@ -#!/usr/bin/env python -from __future__ import print_function -import rospy -from mongodb_store_msgs.msg import StringPairList, StringPair -import mongodb_store_msgs.srv as dc_srv -import mongodb_store.util as dc_util -from mongodb_store.message_store import MessageStoreProxy -from geometry_msgs.msg import Pose, Point, Quaternion -from std_msgs.msg import Bool -from datetime import * -import platform -if float(platform.python_version()[0:2]) >= 3.0: - import io -else: - import StringIO - -if __name__ == '__main__': - rospy.init_node("example_multi_event_log") - - try: - - # let's say we have a couple of things that we need to store together - # these could be some sensor data, results of processing etc. - pose = Pose(Point(0, 1, 2), Quaternion(3, 4, 5, 6)) - point = Point(7, 8, 9) - quaternion = Quaternion(10, 11, 12, 13) - # note that everything that is pass to the message_store must be a ros message type - #therefore use std_msg types for standard data types like float, int, bool, string etc - result = Bool(True) - - - # we will store our results in a separate collection - msg_store = MessageStoreProxy(collection='pose_results') - # save the ids from each addition - stored = [] - stored.append([pose._type, msg_store.insert(pose)]) - stored.append([point._type, msg_store.insert(point)]) - stored.append([quaternion._type, msg_store.insert(quaternion)]) - stored.append([result._type, msg_store.insert(result)]) - - # now store ids togther in store, addition types for safety - spl = StringPairList() - for pair in stored: - spl.pairs.append(StringPair(pair[0], pair[1])) - - # and add some meta information - meta = {} - meta['description'] = "this wasn't great" - meta['result_time'] = datetime.utcfromtimestamp(rospy.get_rostime().to_sec()) - msg_store.insert(spl, meta = meta) - - # now let's get all our logged data back - results = msg_store.query(StringPairList._type) - for message, meta in results: - if 'description' in meta: - print('description: %s' % meta['description']) - print('result time (UTC from rostime): %s' % meta['result_time']) - print('inserted at (UTC from rostime): %s' % meta['inserted_at']) - pose = msg_store.query_id(message.pairs[0].second, Pose._type) - point = msg_store.query_id(message.pairs[1].second, Point._type) - quaternion = msg_store.query_id(message.pairs[2].second, Quaternion._type) - result = msg_store.query_id(message.pairs[3].second, Bool._type) - - - except rospy.ServiceException as e: - print("Service call failed: %s"%e) - - - diff --git a/mongodb_store/scripts/message_store_node.py b/mongodb_store/scripts/message_store_node.py deleted file mode 100755 index f79209e..0000000 --- a/mongodb_store/scripts/message_store_node.py +++ /dev/null @@ -1,329 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -from __future__ import absolute_import, print_function -from future.utils import iteritems -""" -Provides a service to store ROS message objects in a mongodb database in JSON. -""" - -import genpy -import rospy -import pymongo -from pymongo import GEO2D -import json -from bson import json_util -from bson.objectid import ObjectId -from datetime import * -from tf2_msgs.msg import TFMessage - - -import mongodb_store_msgs.srv as dc_srv -import mongodb_store.util as dc_util -from mongodb_store_msgs.msg import StringPair, StringPairList, Insert - -MongoClient = dc_util.import_MongoClient() - -class MessageStore(object): - def __init__(self, replicate_on_write=False): - - use_daemon = rospy.get_param('mongodb_use_daemon', False) - connection_string = rospy.get_param('/mongodb_connection_string', '') - use_connection_string = len(connection_string) > 0 - if use_connection_string: - use_daemon = True - rospy.loginfo('Using connection string: %s', connection_string) - - # If you want to use a remote datacenter, then it should be set as false - use_localdatacenter = rospy.get_param('~mongodb_use_localdatacenter', True) - local_timeout = rospy.get_param('~local_timeout', 10) - if str(local_timeout).lower() == "none": - local_timeout = None - - # wait for hostname and port for mongodb server - for _ in range(10): - if rospy.has_param('mongodb_host') and rospy.has_param('mongodb_port'): - break - rospy.sleep(1.0) - db_host = rospy.get_param('mongodb_host') - db_port = rospy.get_param('mongodb_port') - - if use_daemon: - if use_connection_string: - is_daemon_alive = dc_util.check_connection_to_mongod(None, None, connection_string=connection_string) - else: - is_daemon_alive = dc_util.check_connection_to_mongod(db_host, db_port) - if not is_daemon_alive: - raise Exception("No Daemon?") - elif use_localdatacenter: - rospy.loginfo('Waiting for local datacentre (timeout: %s)' % str(local_timeout)) - have_dc = dc_util.wait_for_mongo(local_timeout) - if not have_dc: - raise Exception("No Datacentre?") - - self.keep_trash = rospy.get_param('mongodb_keep_trash', True) - - if use_connection_string: - self._mongo_client=MongoClient(connection_string) - else: - self._mongo_client=MongoClient(db_host, db_port) - - self.replicate_on_write = rospy.get_param( - "mongodb_replicate_on_write", replicate_on_write) - if self.replicate_on_write: - rospy.logwarn( - "The option 'replicate_on_write' is now deprecated and will be removed. " - "Use 'Replication' on MongoDB instead: " - "https://docs.mongodb.com/manual/replication/") - - extras = rospy.get_param('mongodb_store_extras', []) - self.extra_clients = [] - for extra in extras: - try: - self.extra_clients.append(MongoClient(extra[0], extra[1])) - except pymongo.errors.ConnectionFailure as e: - rospy.logwarn('Could not connect to extra datacentre at %s:%s' % (extra[0], extra[1])) - rospy.loginfo('Replicating content to a futher %s datacentres',len(self.extra_clients)) - - # advertise ros services - for attr in dir(self): - if attr.endswith("_ros_srv"): - service=getattr(self, attr) - rospy.Service("/message_store/"+attr[:-8], service.type, service) - - self.queue_size = rospy.get_param("queue_size", 100) - self.sub_insert = rospy.Subscriber("/message_store/insert", Insert, - self.insert_ros_msg, - queue_size=self.queue_size) - - def insert_ros_msg(self, msg): - """ - Receives a message published - """ - # actually procedure is the same - self.insert_ros_srv(msg) - - def insert_ros_srv(self, req): - """ - Receives a - """ - # deserialize data into object - obj = dc_util.deserialise_message(req.message) - # convert input tuple to dict - meta = dc_util.string_pair_list_to_dictionary(req.meta) - # get requested collection from the db, creating if necessary - collection = self._mongo_client[req.database][req.collection] - # check if the object has the location attribute - if hasattr(obj, 'pose'): - # if it does create a location index - collection.create_index([("loc", pymongo.GEO2D)]) - - #check if the object has the location attribute - if hasattr(obj, 'geotype'): - # if it does create a location index - collection.create_index([("geoloc", pymongo.GEOSPHERE)]) - - # check if the object has the timestamp attribute TODO ?? really necessary - # if hasattr(obj, 'logtimestamp'): - # if it does create a location index - # collection.create_index([("datetime", pymongo.GEO2D)]) - - # try: - stamp = rospy.get_rostime() - meta['inserted_at'] = datetime.utcfromtimestamp(stamp.to_sec()) - meta['inserted_by'] = req._connection_header['callerid'] - if hasattr(obj, "header") and hasattr(obj.header, "stamp") and\ - isinstance(obj.header.stamp, genpy.Time): - stamp = obj.header.stamp - elif isinstance(obj, TFMessage): - if obj.transforms: - transforms = sorted(obj.transforms, - key=lambda m: m.header.stamp, reverse=True) - stamp = transforms[0].header.stamp - - meta['published_at'] = datetime.utcfromtimestamp(stamp.to_sec()) - meta['timestamp'] = stamp.to_nsec() - - obj_id = dc_util.store_message(collection, obj, meta) - - if self.replicate_on_write: - # also do insert to extra datacentres, making sure object ids are consistent - for extra_client in self.extra_clients: - extra_collection = extra_client[req.database][req.collection] - dc_util.store_message(extra_collection, obj, meta, obj_id) - - return str(obj_id) - # except Exception, e: - # print e - - insert_ros_srv.type=dc_srv.MongoInsertMsg - - def delete_ros_srv(self, req): - """ - Deletes a message by ID - """ - # Get the message - collection = self._mongo_client[req.database][req.collection] - docs = dc_util.query_message(collection, {"_id": ObjectId(req.document_id)}, find_one=True) - if len(docs) != 1: - return False - - message = docs[0] - - # Remove the doc - collection.remove({"_id": ObjectId(req.document_id)}) - - if self.keep_trash: - # But keep it into "trash" - bk_collection = self._mongo_client[req.database][req.collection + "_Trash"] - bk_collection.save(message) - - - # also repeat in extras - if self.replicate_on_write: - for extra_client in self.extra_clients: - extra_collection = extra_client[req.database][req.collection] - extra_collection.remove({"_id": ObjectId(req.document_id)}) - extra_bk_collection = extra_client[req.database][req.collection + "_Trash"] - extra_bk_collection.save(message) - - return True - delete_ros_srv.type=dc_srv.MongoDeleteMsg - - - def update_ros_srv(self, req): - """ - Updates a msg in the store - """ - # rospy.lrosoginfo("called") - collection = self._mongo_client[req.database][req.collection] - - # build the query doc - obj_query = self.to_query_dict(req.message_query, req.meta_query) - - # restrict results to have the type asked for - obj_query["_meta.stored_type"] = req.message.type - - # TODO start using some string constants! - - rospy.logdebug("update spec document: %s", obj_query) - - # deserialize data into object - obj = dc_util.deserialise_message(req.message) - - meta = dc_util.string_pair_list_to_dictionary(req.meta) - meta['last_updated_at'] = datetime.utcfromtimestamp(rospy.get_rostime().to_sec()) - meta['last_updated_by'] = req._connection_header['callerid'] - - (obj_id, altered) = dc_util.update_message(collection, obj_query, obj, meta, req.upsert) - - if self.replicate_on_write: - # also do update to extra datacentres - for extra_client in self.extra_clients: - extra_collection = extra_client[req.database][req.collection] - dc_util.update_message(extra_collection, obj_query, obj, meta, req.upsert) - - return str(obj_id), altered - update_ros_srv.type=dc_srv.MongoUpdateMsg - - def to_query_dict(self, message_query, meta_query): - """ - Decodes and combines the given StringPairList queries into a single mongodb query - """ - obj_query = dc_util.string_pair_list_to_dictionary(message_query) - bare_meta_query = dc_util.string_pair_list_to_dictionary(meta_query) - for (k, v) in iteritems(bare_meta_query): - obj_query["_meta." + k] = v - return obj_query - - def query_messages_ros_srv(self, req): - """ - Returns t - """ - collection = self._mongo_client[req.database][req.collection] - - # build the query doc - obj_query = self.to_query_dict(req.message_query, req.meta_query) - - # restrict results to have the type asked for - obj_query["_meta.stored_type"] = req.type - - # TODO start using some string constants! - - rospy.logdebug("query document: %s", obj_query) - - # this is a list of entries in dict format including meta - sort_query_dict = dc_util.string_pair_list_to_dictionary(req.sort_query) - sort_query_tuples = [] - for k, v in iteritems(sort_query_dict): - try: - sort_query_tuples.append((k, int(v))) - except ValueError: - sort_query_tuples.append((k,v)) - # this is a list of entries in dict format including meta - - - projection_query_dict = dc_util.string_pair_list_to_dictionary(req.projection_query) - projection_meta_dict = dict() - projection_meta_dict["_meta"] = 1 - - entries = dc_util.query_message( - collection, obj_query, sort_query_tuples, projection_query_dict, req.single, req.limit) - if projection_query_dict: - meta_entries = dc_util.query_message( - collection, obj_query, sort_query_tuples, projection_meta_dict, req.single, req.limit) - - - # keep trying clients until we find an answer - if self.replicate_on_write: - for extra_client in self.extra_clients: - if len(entries) == 0: - extra_collection = extra_client[req.database][req.collection] - entries = dc_util.query_message( - extra_collection, obj_query, sort_query_tuples, projection_query_dict, req.single, req.limit) - if projection_query_dict: - meta_entries = dc_util.query_message( - extra_collection, obj_query, sort_query_tuples, projection_meta_dict, req.single, req.limit) - if len(entries) > 0: - rospy.loginfo("found result in extra datacentre") - else: - break - - serialised_messages = () - metas = () - - for idx, entry in enumerate(entries): - - # load the class object for this type - # TODO this should be the same for every item in the list, so could reuse - cls = dc_util.load_class(entry["_meta"]["stored_class"]) - # instantiate the ROS message object from the dictionary retrieved from the db - message = dc_util.dictionary_to_message(entry, cls) - # the serialise this object in order to be sent in a generic form - serialised_messages = serialised_messages + (dc_util.serialise_message(message), ) - # add ObjectID into meta as it might be useful later - if projection_query_dict: - entry["_meta"]["_id"] = meta_entries[idx]["_id"] - else: - entry["_meta"]["_id"] = entry["_id"] - # serialise meta - metas = metas + (StringPairList([StringPair(dc_srv.MongoQueryMsgRequest.JSON_QUERY, json.dumps(entry["_meta"], default=json_util.default))]), ) - - return [serialised_messages, metas] - - query_messages_ros_srv.type=dc_srv.MongoQueryMsg - - def query_with_projection_messages_ros_srv(self, req): - """ - Returns t - """ - return self.query_messages_ros_srv(req) - - query_with_projection_messages_ros_srv.type=dc_srv.MongoQuerywithProjectionMsg - - -if __name__ == '__main__': - rospy.init_node("message_store") - - store = MessageStore() - - rospy.spin() diff --git a/mongodb_store/scripts/mongodb_server.py b/mongodb_store/scripts/mongodb_server.py deleted file mode 100755 index c3c1bd8..0000000 --- a/mongodb_store/scripts/mongodb_server.py +++ /dev/null @@ -1,202 +0,0 @@ -#!/usr/bin/env python -from __future__ import absolute_import -import rospy -import subprocess -import sys -import os -import re -import signal -import errno -from std_srvs.srv import Empty, EmptyResponse -import shutil -import platform -if float(platform.python_version()[0:2]) >= 3.0: - _PY3 = True -else: - _PY3 = False - -import mongodb_store.util - -if not mongodb_store.util.check_for_pymongo(): - sys.exit(1) - -MongoClient = mongodb_store.util.import_MongoClient() - -import pymongo - -def is_socket_free(host, port): - import socket; - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - result = sock.connect_ex((host, port)) - return result != 0 - -class MongoServer(object): - def __init__(self): - rospy.init_node("mongodb_server", anonymous=True)#, disable_signals=True) - - - # Has the db already gone down, before the ros node? - self._gone_down = False - - self._ready = False # is the db ready: when mongo says "waiting for connection" - - - self.test_mode = rospy.get_param("~test_mode", False) - self.repl_set = rospy.get_param("~repl_set", None) - self.bind_to_host = rospy.get_param("~bind_to_host", False) - - - if self.test_mode: - import random - - default_host = "localhost" - default_port = random.randrange(49152,65535) - - count = 0 - while not is_socket_free(default_host, default_port): - default_port = random.randrange(49152,65535) - count += 1 - if count > 100: - rospy.logerr("Can't find a free port to run the test server on.") - sys.exit(1) - - self.default_path = "/tmp/ros_mongodb_store_%d" % default_port - os.mkdir(self.default_path) - else: - default_host = "localhost" - default_port = 27017 - self.default_path = "/opt/ros/mongodb_store" - - # Get the database path - self._db_path = rospy.get_param("~database_path", self.default_path) - is_master = rospy.get_param("~master", True) - - if is_master: - self._mongo_host = rospy.get_param("mongodb_host", default_host) - rospy.set_param("mongodb_host",self._mongo_host) - self._mongo_port = rospy.get_param("mongodb_port", default_port) - rospy.set_param("mongodb_port",self._mongo_port) - else: - self._mongo_host = rospy.get_param("~host") - self._mongo_port = rospy.get_param("~port") - - rospy.loginfo("Mongo server address: "+self._mongo_host+":"+str(self._mongo_port)) - - # Check that mongodb is installed - try: - mongov = subprocess.check_output(["mongod","--version"]) - match = re.search("db version v(\d+\.\d+\.\d+)", mongov.decode('utf-8')) - self._mongo_version=match.group(1) - except subprocess.CalledProcessError: - rospy.logerr("Can't find MongoDB executable. Is it installed?\nInstall it with \"sudo apt-get install mongodb\"") - sys.exit(1) - rospy.loginfo("Found MongoDB version " + self._mongo_version) - - # Check that the provided db path exists. - if not os.path.exists(self._db_path): - rospy.logerr("Can't find database at supplied path " + self._db_path + ". If this is a new DB, create it as an empty directory.") - sys.exit(1) - - # Advertise ros services for db interaction - self._shutdown_srv = rospy.Service("/datacentre/shutdown", Empty, self._shutdown_srv_cb) - self._wait_ready_srv = rospy.Service("/datacentre/wait_ready",Empty,self._wait_ready_srv_cb) - - rospy.on_shutdown(self._on_node_shutdown) - - # Start the mongodb server - self._mongo_loop() - - def _mongo_loop(self): - - # Blocker to prevent Ctrl-C being passed to the mongo server - def block_mongo_kill(): - os.setpgrp() -# signal.signal(signal.SIGINT, signal.SIG_IGN) - - #cmd = ["mongod","--dbpath",self._db_path,"--port",str(self._mongo_port),"--smallfiles","--bind_ip","127.0.0.1"] - cmd = ["mongod","--dbpath",self._db_path,"--port",str(self._mongo_port)] - - if self.bind_to_host: - cmd.append("--bind_ip") - cmd.append(self._mongo_host) - else: - cmd.append("--bind_ip") - cmd.append("0.0.0.0") - - - if self.repl_set is not None: - cmd.append("--replSet") - cmd.append(self.repl_set) - self._mongo_process = subprocess.Popen(cmd, - stdout=subprocess.PIPE, - preexec_fn = block_mongo_kill) - - while self._mongo_process.poll() is None:# and not rospy.is_shutdown(): - try: - stdout = self._mongo_process.stdout.readline().decode('utf-8') - except IOError as e: # probably interupt because shutdown cut it up - if e.errno == errno.EINTR: - continue - else: - raise - if stdout is not None: - if stdout.find("ERROR") != -1: - rospy.logerr(stdout.strip()) - else: - rospy.loginfo(stdout.strip()) - - if stdout.find("waiting for connections on port") !=-1: - self._ready=True - if self.repl_set is not None: - try: - self.initialize_repl_set() - except Exception as e: - rospy.logwarn("initialzing replSet failed: %s" % e) - - if not rospy.is_shutdown(): - rospy.logerr("MongoDB process stopped!") - - if self._mongo_process.returncode!=0: - rospy.logerr("Mongo process error! Exit code="+str(self._mongo_process.returncode)) - - self._gone_down = True - self._ready=False - - def _on_node_shutdown(self): - rospy.loginfo("Shutting down datacentre") - if self._gone_down: - rospy.logwarn("It looks like Mongo already died. Watch out as the DB might need recovery time at next run.") - return - try: - c = MongoClient(host=self._mongo_host, port=self._mongo_port) - except pymongo.errors.ConnectionFailure: - c = None - try: - if c is not None: - c.admin.command("shutdown") - except pymongo.errors.AutoReconnect: - pass - - if self.test_mode: # remove auto-created DB in the /tmp folder - try: - shutil.rmtree(self.default_path) - except Exception as e: - rospy.logerr(e) - - def _shutdown_srv_cb(self,req): - rospy.signal_shutdown("Shutdown request..") - return EmptyResponse() - - def _wait_ready_srv_cb(self,req): - while not self._ready: - rospy.sleep(0.1) - return EmptyResponse() - - def initialize_repl_set(self): - c = pymongo.Connection("%s:%d" % (self._mongo_host,self._mongo_port), slave_okay=True) - c.admin.command("replSetInitiate") - c.close() - -if __name__ == '__main__': - server = MongoServer() - diff --git a/mongodb_store/setup.cfg b/mongodb_store/setup.cfg new file mode 100644 index 0000000..e7fbff9 --- /dev/null +++ b/mongodb_store/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/mongodb_store +[install] +install_scripts=$base/lib/mongodb_store \ No newline at end of file diff --git a/mongodb_store/setup.py b/mongodb_store/setup.py index a1f6097..a4c24fe 100644 --- a/mongodb_store/setup.py +++ b/mongodb_store/setup.py @@ -1,11 +1,62 @@ -## ! DO NOT MANUALLY INVOKE THIS setup.py, USE CATKIN INSTEAD +import os +import typing +from glob import glob -from distutils.core import setup -from catkin_pkg.python_setup import generate_distutils_setup +from setuptools import find_packages, setup -# fetch values from package.xml -setup_args = generate_distutils_setup( - packages=['mongodb_store'], - package_dir={'': 'src'}) +package_name = "mongodb_store" -setup(**setup_args) + +def glob_files( + directory: str, + file_matcher: str = "*", + recursive=True, +) -> typing.Tuple[str, typing.List[str]]: + """ + Glob files in the given directory to use in the data files part of setup. + + Args: + directory: Directory to glob + file_matcher: Shell-style matching string used to match files to glob + recursive: Recurse over subdirectories. This probably doesn't work because subdirectories also get globbed + and setup doesn't like that. + + Returns: + Tuple of the directory in the share location, and list of globbed files + """ + return os.path.join("share", package_name, directory), glob( + os.path.join(directory, "" if not recursive else "**", file_matcher), + recursive=recursive, + ) + + +setup( + name=package_name, + version="2.0.3", + packages=find_packages(), + data_files=[ + ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + glob_files("launch", "*launch.[pxy][yma]*"), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="Michal Staniaszek", + maintainer_email="michal@robots.ox.ac.uk", + description="MongoDB interaction for ROS2", + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + "config_manager = mongodb_store.scripts.config_manager:main", + "example_message_store_client = mongodb_store.scripts.example_message_store_client:main", + "example_multi_event_log = mongodb_store.scripts.example_multi_event_log:main", + "message_store_node = mongodb_store.message_store_node:main", + "mongo_bridge = mongodb_store.scripts.mongo_bridge:main", + "mongodb_play = mongodb_store.scripts.mongodb_play:main", + "mongodb_server = mongodb_store.mongodb_server:main", + "replicator_client= mongodb_store.scripts.replicator_client:main", + "replicator_node = mongodb_store.scripts.replicator_node:main", + ], + }, +) diff --git a/mongodb_store/src/mongodb_store/message_store.py b/mongodb_store/src/mongodb_store/message_store.py deleted file mode 100644 index 1f5c0f2..0000000 --- a/mongodb_store/src/mongodb_store/message_store.py +++ /dev/null @@ -1,273 +0,0 @@ -from __future__ import absolute_import -import rospy -import mongodb_store_msgs.srv as dc_srv -import mongodb_store.util as dc_util -from mongodb_store_msgs.msg import StringPair, StringPairList, SerialisedMessage, Insert -from bson import json_util -from bson.objectid import ObjectId -import json -import copy - - -class MessageStoreProxy: - """ - A class that provides functions for storage and retrieval of ROS Message - objects in the mongodb_store. This is achieved by acting as a proxy to the - services provided by the MessageStore ROS node, and therefore requires the message - store node to be running in addition to the datacentre: - - `rosrun mongodb_store message_store_node.py` - - >>> from geometry_msgs.msg import Pose, Quaternion - >>> msg_store = MessageStoreProxy() - >>> p = Pose(Point(0, 1, 2), Quaternion(0, 0, 0 , 1)) - >>> msg_store.insert_named("my favourite pose", p) - >>> retrieved = msg_store.query_named("my favourite pose", Pose._type) - - For usage examples, please see `example_message_store_client.py` within the scripts - folder of mongodb_store. - - """ - - def __init__(self, service_prefix='/message_store', database='message_store', collection='message_store', queue_size=100): - """ - Args: - | service_prefix (str): The prefix to the *insert*, *update*, *delete* and - *query_messages* ROS services/ - | database (str): The MongoDB database that this object works with. - | collection (str): The MongoDB collect/on that this object works with. - """ - self.database = database - self.collection = collection - insert_service = service_prefix + '/insert' - update_service = service_prefix + '/update' - delete_service = service_prefix + '/delete' - query_service = service_prefix + '/query_messages' - # try and get the mongo service, block until available - found_services_first_try = True # if found straight away - while not rospy.is_shutdown(): - try: - rospy.wait_for_service(insert_service,5) - rospy.wait_for_service(update_service,5) - rospy.wait_for_service(query_service,5) - rospy.wait_for_service(delete_service,5) - break - except rospy.ROSException as e: - found_services_first_try = False - rospy.logerr("Could not get message store services. Maybe the message " - "store has not been started? Retrying..") - if not found_services_first_try: - rospy.loginfo("Message store services found.") - self.insert_srv = rospy.ServiceProxy(insert_service, dc_srv.MongoInsertMsg) - self.update_srv = rospy.ServiceProxy(update_service, dc_srv.MongoUpdateMsg) - self.query_srv = rospy.ServiceProxy(query_service, dc_srv.MongoQueryMsg) - self.delete_srv = rospy.ServiceProxy(delete_service, dc_srv.MongoDeleteMsg) - - insert_topic = service_prefix + '/insert' - self.pub_insert = rospy.Publisher(insert_topic, Insert, queue_size=queue_size) - - - def insert_named(self, name, message, meta = {}, wait=True): - """ - Inserts a ROS message into the message storage, giving it a name for convenient - later retrieval. - .. note:: Multiple messages can be stored with the same name. - - :Args: - | name (str): The name to refere to this message as. - | message (ROS Message): An instance of a ROS message type to store - | meta (dict): A dictionary of additional meta data to store in association - with thie message. - | wait (bool): If true, waits until database returns object id after insert - :Returns: - | (str) the ObjectId of the MongoDB document containing the stored message. - """ - # create a copy as we're modifying it - meta_copy = copy.copy(meta) - meta_copy["name"] = name - return self.insert(message, meta_copy, wait=wait) - - - def insert(self, message, meta = {}, wait=True): - """ - Inserts a ROS message into the message storage. - - :Args: - | message (ROS Message): An instance of a ROS message type to store - | meta (dict): A dictionary of additional meta data to store in association - with thie message. - | wait (bool): If true, waits until database returns object id after insert - :Returns: - | (str) the ObjectId of the MongoDB document containing the stored message. - - """ - # assume meta is a dict, convert k/v to tuple pairs - meta_tuple = (StringPair(dc_srv.MongoQueryMsgRequest.JSON_QUERY, json.dumps(meta, default=json_util.default)),) - serialised_msg = dc_util.serialise_message(message) - if wait: - return self.insert_srv(self.database, self.collection, serialised_msg, StringPairList(meta_tuple)).id - else: - msg = Insert(self.database, self.collection, serialised_msg, StringPairList(meta_tuple)) - self.pub_insert.publish(msg) - return True - - def query_id(self, id, type): - """ - Finds and returns the message with the given ID. - - :Parameters: - | id (str): The ObjectID of the MongoDB document holding the message. - | type (str): The ROS message type of the stored messsage to retrieve. - :Returns: - | message (ROS message), meta (dict): The retrieved message and associated metadata - or *None* if the named message could not be found. - """ - return self.query(type, {'_id': ObjectId(id)}, {}, True) - - def delete(self, message_id): - """ - Delete the message with the given ID. - - :Parameters: - | message_id (str) : The ObjectID of the MongoDB document holding the message. - :Returns: - | bool : was the object successfully deleted. - """ - return self.delete_srv(self.database, self.collection, message_id) - - def query_named(self, name, type, single = True, meta = {}, limit = 0): - """ - Finds and returns the message(s) with the given name. - - :Args: - | name (str): The name of the stored messages to retrieve. - | type (str): The type of the stored message. - | single (bool): Should only one message be returned? - | meta (dict): Extra queries on the meta data of the message. - | limit (int): Limit number of return documents - :Return: - | message (ROS message), meta (dict): The retrieved message and associated metadata - or *None* if the named message could not be found. - - """ - # create a copy as we're modifying it - meta_copy = copy.copy(meta) - meta_copy["name"] = name - return self.query(type, {}, meta_copy, single, [], limit) - - def update_named(self, name, message, meta = {}, upsert = False): - """ - Updates a named message. - - :Args: - | name (str): The name of the stored messages to update. - | message (ROS Message): The updated ROS message - | meta (dict): Updated meta data to store with the message. - | upsert (bool): If True, insert the named message if it doesnt exist. - :Return: - | str, bool: The MongoDB ObjectID of the document, and whether it was altered by - the update. - """ - meta_query = {} - meta_query["name"] = name - - # make sure the name goes into the meta info after update - meta_copy = copy.copy(meta) - meta_copy["name"] = name - - return self.update(message, meta_copy, {}, meta_query, upsert) - - def update_id(self, id, message, meta = {}, upsert = False): - """ - Updates a message by MongoDB ObjectId. - - :Args: - | id (str): The MongoDB ObjectId of the doucment storing the message. - | message (ROS Message): The updated ROS message - | meta (dict): Updated meta data to store with the message. - | upsert (bool): If True, insert the named message if it doesnt exist. - :Return: - | str, bool: The MongoDB ObjectID of the document, and whether it was altered by - the update. - - """ - - msg_query = {'_id': ObjectId(id)} - meta_query = {} - - return self.update(message, meta, msg_query, meta_query, upsert) - - def update(self, message, meta = {}, message_query = {}, meta_query = {}, upsert = False): - """ - Updates a message. - - :Args: - | message (ROS Message): The updated ROS message - | meta (dict): Updated meta data to store with the message. - | message_query (dict): A query to match the ROS message that is to be updated. - | meta_query (dict): A query to match against the meta data of the message to be updated - | upsert (bool): If True, insert the named message if it doesnt exist. - :Return: - | str, bool: The MongoDB ObjectID of the document, and whether it was altered by - the update. - - """ - # serialise the json queries to strings using json_util.dumps - message_query_tuple = (StringPair(dc_srv.MongoQueryMsgRequest.JSON_QUERY, json.dumps(message_query, default=json_util.default)),) - meta_query_tuple = (StringPair(dc_srv.MongoQueryMsgRequest.JSON_QUERY, json.dumps(meta_query, default=json_util.default)),) - meta_tuple = (StringPair(dc_srv.MongoQueryMsgRequest.JSON_QUERY, json.dumps(meta, default=json_util.default)),) - return self.update_srv(self.database, self.collection, upsert, StringPairList(message_query_tuple), StringPairList(meta_query_tuple), dc_util.serialise_message(message), StringPairList(meta_tuple)) - - - """ - Returns [message, meta] where message is the queried message and meta a dictionary of meta information. If single is false returns a list of these lists. - """ - def query(self, type, message_query = {}, meta_query = {}, single = False, sort_query = [], projection_query = {}, limit=0): - """ - Finds and returns message(s) matching the message and meta data queries. - - :Parameters: - | type (str): The ROS message type of the stored messsage to retrieve. - | message_query (dict): A query to match the actual ROS message - | meta_query (dict): A query to match against the meta data of the message - | sort_query (list of tuple): A query to request sorted list to mongodb module - | projection_query (dict): A query to request desired fields to be returned or excluded - | single (bool): Should only one message be returned? - | limit (int): Limit number of return documents - :Returns: - | [message, meta] where message is the queried message and meta a dictionary of - meta information. If single is false returns a list of these lists. - """ - # assume meta is a dict, convert k/v to tuple pairs for ROS msg type - - # serialise the json queries to strings using json_util.dumps - message_tuple = (StringPair(dc_srv.MongoQueryMsgRequest.JSON_QUERY, json.dumps(message_query, default=json_util.default)),) - meta_tuple = (StringPair(dc_srv.MongoQueryMsgRequest.JSON_QUERY, json.dumps(meta_query, default=json_util.default)),) - projection_tuple =(StringPair(dc_srv.MongoQueryMsgRequest.JSON_QUERY, json.dumps(projection_query, default=json_util.default)),) - - if len(sort_query) > 0: - sort_tuple = [StringPair(str(k), str(v)) for k, v in sort_query] - else: - sort_tuple = [] - - response = self.query_srv( - self.database, self.collection, type, single, limit, - StringPairList(message_tuple), - StringPairList(meta_tuple), - StringPairList(sort_tuple), - StringPairList(projection_tuple)) - - if response.messages is None: - messages = [] - metas = [] - else: - messages = map(dc_util.deserialise_message, response.messages) - metas = map(dc_util.string_pair_list_to_dictionary, response.metas) - - if single: - if len(messages) > 0: - return [messages[0], metas[0]] - else: - return [None, None] - else: - return zip(messages,metas) diff --git a/mongodb_store/src/mongodb_store/util.py b/mongodb_store/src/mongodb_store/util.py deleted file mode 100644 index 369d62c..0000000 --- a/mongodb_store/src/mongodb_store/util.py +++ /dev/null @@ -1,604 +0,0 @@ -from __future__ import print_function, absolute_import -import rospy -import genpy -from std_srvs.srv import Empty -import yaml -from bson import json_util, Binary -import json - -import copy -import platform -if float(platform.python_version()[0:2]) >= 3.0: - _PY3 = True - import io as StringIO -else: - _PY3 = False - import StringIO -from mongodb_store_msgs.msg import SerialisedMessage -from mongodb_store_msgs.srv import MongoQueryMsgRequest - -from pymongo.errors import ConnectionFailure - -import importlib -from datetime import datetime - -def check_connection_to_mongod(db_host, db_port, connection_string=None): - """ - Check connection to mongod server - - :Returns: - | bool : True on success, False if connection is not established. - """ - if check_for_pymongo(): - try: - try: - # pymongo 2.X - from pymongo import Connection - Connection(db_host, db_port) - return True - except: - # pymongo 3.X - from pymongo import MongoClient - if connection_string is None: - client = MongoClient(db_host, db_port, connect=False) - else: - client = MongoClient(connection_string) - result = client.admin.command('ismaster') - return True - except ConnectionFailure: - if connection_string is None: - rospy.logerr("Could not connect to mongo server %s:%d" % (db_host, db_port)) - rospy.logerr("Make sure mongod is launched on your specified host/port") - else: - rospy.logerr("Could not connect to mongo server %s" % (connection_string)) - rospy.logerr("Make sure mongod is launched on your specified host/port") - - return False - else: - return False - - -def wait_for_mongo(timeout=60, ns="/datacentre"): - """ - Waits for the mongo server, as started through the mongodb_store/mongodb_server.py wrapper - - :Returns: - | bool : True on success, False if server not even started. - """ - # Check that mongo is live, create connection - try: - rospy.wait_for_service(ns + "/wait_ready", timeout) - except rospy.exceptions.ROSException as e: - rospy.logerr("Can't connect to MongoDB server. Make sure mongodb_store/mongodb_server.py node is started.") - return False - wait = rospy.ServiceProxy(ns + '/wait_ready', Empty) - wait() - return True - -def check_for_pymongo(): - """ - Checks for required version of pymongo python library. - - :Returns: - | bool : True if found, otherwise Fale - """ - try: - import pymongo - except: - rospy.logerr("ERROR!!!") - rospy.logerr("Can't import pymongo, this is needed by mongodb_store.") - rospy.logerr("Make sure it is installed (sudo pip install pymongo)") - return False - - return True - -""" -Pick an object to use as MongoClient based on the currently installed pymongo -version. Use this instead of importing Connection or MongoClient from pymongo -directly. - -Example: - MongoClient = util.importMongoClient() -""" -def import_MongoClient(): - import pymongo - if pymongo.version >= '2.4': - def mongo_client_wrapper(*args, **kwargs): - return pymongo.MongoClient(*args, **kwargs) - return mongo_client_wrapper - else: - import functools - def mongo_client_wrapper(*args, **kwargs): - return pymongo.Connection(*args, **kwargs) - return functools.partial(mongo_client_wrapper, safe=True) - - -""" -Given a ROS msg and a dictionary of the right values, fill in the msg -""" -def _fill_msg(msg,dic): - for i in dic: - if isinstance(dic[i],dict): - _fill_msg(getattr(msg,i),dic[i]) - else: - setattr(msg,i,dic[i]) - - -""" -Given a document in the database, return metadata and ROS message -- must have been -""" -def document_to_msg_and_meta(document, TYPE): - meta = document["_meta"] - msg = TYPE() - _fill_msg(msg,document["msg"]) - return meta,msg - -""" -Given a document return ROS message -""" -def document_to_msg(document, TYPE): - msg = TYPE() - _fill_msg(msg,document) - return meta - - -def msg_to_document(msg): - """ - Given a ROS message, turn it into a (nested) dictionary suitable for the datacentre. - - >>> from geometry_msgs.msg import Pose - >>> msg_to_document(Pose()) - {'orientation': {'w': 0.0, 'x': 0.0, 'y': 0.0, 'z': 0.0}, - 'position': {'x': 0.0, 'y': 0.0, 'z': 0.0}} - - :Args: - | msg (ROS Message): An instance of a ROS message to convert - :Returns: - | dict : A dictionary representation of the supplied message. - """ - - - - - d = {} - - slot_types = [] - if hasattr(msg,'_slot_types'): - slot_types = msg._slot_types - else: - slot_types = [None] * len(msg.__slots__) - - - for (attr, type) in zip(msg.__slots__, slot_types): - d[attr] = sanitize_value(attr, getattr(msg, attr), type) - - return d - -def sanitize_value(attr, v, type): - """ - De-rosify a msg. - - Internal function used to convert ROS messages into dictionaries of pymongo insertable - values. - - :Args: - | attr(str): the ROS message slot name the value came from - | v: the value from the message's slot to make into a MongoDB able type - | type (str): The ROS type of the value passed, as given by the ressage slot_types member. - :Returns: - | A sanitized version of v. - """ - - # print '---' - # print attr - # print v.__class__ - # print type - # print v - - if isinstance(v, str): - if type == 'uint8[]': - v = Binary(v) - else: - # ensure unicode - try: - if _PY3: - v = str(v, "utf-8") - else: - v = unicode(v, "utf-8") - except UnicodeDecodeError as e: - # at this point we can deal with the encoding, so treat it as binary - v = Binary(v) - # no need to carry on with the other type checks below - return v - - if isinstance(v, rospy.Message): - return msg_to_document(v) - elif isinstance(v, genpy.rostime.Time): - return msg_to_document(v) - elif isinstance(v, genpy.rostime.Duration): - return msg_to_document(v) - elif isinstance(v, list): - result = [] - for t in v: - if hasattr(t, '_type'): - result.append(sanitize_value(None, t, t._type)) - else: - result.append(sanitize_value(None, t, None)) - return result - else: - return v - - - - -def store_message(collection, msg, meta, oid=None): - """ - Update ROS message into the DB - - :Args: - | collection (pymongo.Collection): the collection to store the message in - | msg (ROS message): an instance of a ROS message to store - | meta (dict): Additional meta data to store with the ROS message - | oid (str): An optional ObjectID for the MongoDB document created. - :Returns: - | str: ObjectId of the MongoDB document. - """ - doc=msg_to_document(msg) - doc["_meta"]=meta - # also store type information - doc["_meta"]["stored_class"] = msg.__module__ + "." + msg.__class__.__name__ - doc["_meta"]["stored_type"] = msg._type - - if msg._type == "soma2_msgs/SOMA2Object" or msg._type == "soma_msgs/SOMAObject" or msg._type == "soma_msgs/SOMAROIObject": - add_soma_fields(msg,doc) - - if hasattr(msg, '_connection_header'): - print(getattr(msg, '_connection_header')) - - if oid != None: - doc["_id"] = oid - - return collection.insert(doc) - -# """ -# Stores a ROS message into the DB with msg and meta as separate fields -# """ -# def store_message_separate(collection, msg, meta): -# doc={} -# doc["_meta"]=meta -# doc["msg"]=msg_to_document(msg) -# return collection.insert(doc) - - - -def store_message_no_meta(collection, msg): - """ - Store a ROS message sans meta data. - - :Args: - | collection (pymongo.Collection): The collection to store the message in - | msg (ROS message): An instance of a ROS message to store - :Returns: - | str: The ObjectId of the MongoDB document created. - """ - doc=msg_to_document(msg) - return collection.insert(doc) - - -def fill_message(message, document): - """ - Fill a ROS message from a dictionary, assuming the slots of the message are keys in the dictionary. - - :Args: - | message (ROS message): An instance of a ROS message that will be filled in - | document (dict): A dicionary containing all of the message attributes - - Example: - - >>> from geometry_msgs.msg import Pose - >>> d = dcu.msg_to_document(Pose()) - >>> d['position']['x']=27.0 - >>> new_pose = Pose( - >>> fill_message(new_pose, d) - >>> new_pose - position: - x: 27.0 - y: 0.0 - z: 0.0 - orientation: - x: 0.0 - y: 0.0 - z: 0.0 - w: 0.0 - """ - for slot, slot_type in zip(message.__slots__, - getattr(message,"_slot_types",[""]*len(message.__slots__))): - - # This check is required since objects returned with projection queries can have absent keys - if slot in document.keys(): - value = document[slot] - # fill internal structures if value is a dictionary itself - if isinstance(value, dict): - fill_message(getattr(message, slot), value) - elif isinstance(value, list) and slot_type.find("/")!=-1: - # if its a list and the type is some message (contains a "/") - lst=[] - # Remove [] from message type ([:-2]) - msg_type = type_to_class_string(slot_type[:-2]) - msg_class = load_class(msg_type) - for i in value: - msg = msg_class() - fill_message(msg, i) - lst.append(msg) - setattr(message, slot, lst) - else: - if ( (not _PY3 and isinstance(value, unicode)) or - (_PY3 and isinstance(value, str)) ): - setattr(message, slot, value.encode('utf-8')) - else: - setattr(message, slot, value) - -def dictionary_to_message(dictionary, cls): - """ - Create a ROS message from the given dictionary, using fill_message. - - :Args: - | dictionary (dict): A dictionary containing all of the atributes of the message - | cls (class): The python class of the ROS message type being reconstructed. - :Returns: - An instance of cls with the attributes filled. - - - Example: - - >>> from geometry_msgs.msg import Pose - >>> d = {'orientation': {'w': 0.0, 'x': 0.0, 'y': 0.0, 'z': 0.0}, - 'position': {'x': 27.0, 'y': 0.0, 'z': 0.0}} - >>> dictionary_to_message(d, Pose) - position: - x: 27.0 - y: 0.0 - z: 0.0 - orientation: - x: 0.0 - y: 0.0 - z: 0.0 - w: 0.0 - """ - message = cls() - - fill_message(message, dictionary) - - return message - -def query_message(collection, query_doc, sort_query=[], projection_query={},find_one=False, limit=0): - """ - Peform a query for a stored messages, returning results in list. - - :Args: - | collection (pymongo.Collection): The collection to query - | query_doc (dict): The MongoDB query to execute - | sort_query (list of tuple): The MongoDB query to sort - | projection_query (dict): The projection query - | find_one (bool): Returns one matching document if True, otherwise all matching. - | limit (int): Limits number of return documents. 0 means no limit - :Returns: - | dict or list of dict: the MongoDB document(s) found by the query - """ - - if find_one: - ids = () - if sort_query: - if not projection_query: - result = collection.find_one(query_doc, sort=sort_query) - else: - result = collection.find_one(query_doc, projection_query, sort=sort_query) - elif projection_query: - result = collection.find_one(query_doc, projection_query) - else: - result = collection.find_one(query_doc) - if result: - return [ result ] - else: - return [] - else: - if sort_query: - if not projection_query: - return [ result for result in collection.find(query_doc).sort(sort_query).limit(limit) ] - else: - return [ result for result in collection.find(query_doc, projection_query).sort(sort_query).limit(limit) ] - elif projection_query: - return [ result for result in collection.find(query_doc, projection_query).limit(limit) ] - else: - return [ result for result in collection.find(query_doc).limit(limit) ] - -def update_message(collection, query_doc, msg, meta, upsert): - """ - Update ROS message in the DB, return updated id and true if db altered. - - :Args: - | collection (pymongo.Collection): The collection to update in - | query_doc (dict): The MongoDB query to execute to select document for update - | msg (ROS message): An instance of a ROS message to update to - | meta (dict): New meta data to update the stored message with - | upsert (bool): If message does not already exits, create if upsert==True. - :Returns: - | str, bool: the OjectId of the updated document and whether it was altered by - the operation - """ - # see if it's in db first - result = collection.find_one(query_doc) - - # if it's not in there but we're allowed to insert - if not result: - if upsert: - return store_message(collection, msg, meta), True - else: - return "", False - - # convert msg to db document - doc=msg_to_document(msg) - - if msg._type == "soma2_msgs/SOMA2Object" or msg._type == "soma_msgs/SOMAObject" or msg._type == "soma_msgs/SOMAROIObject": - add_soma_fields(msg,doc) - - #update _meta - doc["_meta"] = result["_meta"] - #merge the two dicts, overwiriting elements in doc["_meta"] with elements in meta - doc["_meta"]=dict(list(doc["_meta"].items()) + list(meta.items())) - - # ensure necessary parts are there too - doc["_meta"]["stored_class"] = msg.__module__ + "." + msg.__class__.__name__ - doc["_meta"]["stored_type"] = msg._type - - return collection.update(query_doc, doc), True - - -def query_message_ids(collection, query_doc, find_one): - """ - Peform a query for a stored message, returning a tuple of id strings - - :Args: - | collection (pymongo.Collection): The collection to search - | query_doc (dict): The MongoDB query to execute - | find_one (bool): Find one matching document if True, otherwise all matching. - :Returns: - | tuple of strings: all ObjectIds of matching documents - """ - if find_one: - result = collection.find_one(query_doc) - if result: - return str(result["_id"]), - else: - return tuple(str(result["_id"]) for result in collection.find(query_doc, {'_id':1})) - - - -def type_to_class_string(type): - """ - Takes a ROS msg type and turns it into a Python module and class name. - - E.g - - >>> type_to_class_string("geometry_msgs/Pose") - geometry_msgs.msg._Pose.Pose - - :Args: - | type (str): The ROS message type to return class string - :Returns: - | str: A python class string for the ROS message type supplied - """ - parts = type.split('/') - cls_string = "%s.msg._%s.%s" % (parts[0], parts[1], parts[1]) - return cls_string - -def load_class(full_class_string): - """ - Dynamically load a class from a string - shamelessly ripped from: http://thomassileo.com/blog/2012/12/21/dynamically-load-python-modules-or-classes/ - - :Args: - | full_class_string (str): The python class to dynamically load - :Returns: - | class: the loaded python class. - """ - # todo: cache classes (if this is an overhead) - class_data = full_class_string.split(".") - module_path = ".".join(class_data[:-1]) - class_str = class_data[-1] - module = importlib.import_module(module_path) - # Finally, we retrieve the Class - return getattr(module, class_str) - - -def serialise_message(message): - """ - Create a mongodb_store_msgs/SerialisedMessage instance from a ROS message. - - :Args: - | message (ROS message): The message to serialise - :Returns: - | mongodb_store_msgs.msg.SerialisedMessage: A serialies copy of message - """ - buf=StringIO.StringIO() - message.serialize(buf) - serialised_msg = SerialisedMessage() - serialised_msg.msg = buf.getvalue() - serialised_msg.type = message._type - return serialised_msg - -def deserialise_message(serialised_message): - """ - Create a ROS message from a mongodb_store_msgs/SerialisedMessage - - :Args: - | serialised_message (mongodb_store_msgs.msg.SerialisedMessage): The message to deserialise - :Returns: - | ROS message: The message deserialised - """ - cls_string = type_to_class_string(serialised_message.type) - cls = load_class(cls_string) - # instantiate an object from the class - message = cls() - # deserialize data into object - message.deserialize(serialised_message.msg) - return message - - -def string_pair_list_to_dictionary_no_json(spl): - """ - Covert a mongodb_store_msgs/StringPairList into a dictionary, ignoring content - - :Args: - | spl (StringPairList): The list of (key, value) to pairs convert - :Returns: - | dict: resulting dictionary - """ - return dict((pair.first, pair.second) for pair in spl) - -def string_pair_list_to_dictionary(spl): - """ - Creates a dictionary from a mongodb_store_msgs/StringPairList which could contain JSON as a string. - If the first entry in the supplied list is a JSON query then the returned dictionary is loaded from that. - - :Args: - | spl (StringPairList): The list of (key, value) pairs to convert - :Returns: - | dict: resulting dictionary - """ - if len(spl.pairs) > 0 and spl.pairs[0].first == MongoQueryMsgRequest.JSON_QUERY: - # print "looks like %s", spl.pairs[0].second - return json.loads(spl.pairs[0].second, object_hook=json_util.object_hook) - # else use the string pairs - else: - return string_pair_list_to_dictionary_no_json(spl.pairs) - -def topic_name_to_collection_name(topic_name): - """ - Converts the fully qualified name of a topic into legal mongodb collection name. - """ - return topic_name.replace("/", "_")[1:] - -def add_soma_fields(msg,doc): - """ - For soma Object msgs adds the required fields as indexes to the mongodb object. - """ - - if hasattr(msg, 'pose'): - doc["loc"] = [doc["pose"]["position"]["x"],doc["pose"]["position"]["y"]] - if hasattr(msg,'logtimestamp'): - doc["timestamp"] = datetime.utcfromtimestamp(doc["logtimestamp"]) -#doc["timestamp"] = datetime.strptime(doc["logtime"], "%Y-%m-%dT%H:%M:%SZ") - - if hasattr(msg, 'geotype'): - if(doc["geotype"] == "Point"): - for p in doc["geoposearray"]["poses"]: - doc["geoloc"] = {'type': doc['geotype'],'coordinates': [p["position"]["x"], p["position"]["y"]]} - if(msg._type =="soma_msgs/SOMAROIObject"): - coordinates = [] - doc["geotype"] = "Polygon" - for p in doc["geoposearray"]["poses"]: - coordinates.append([p["position"]["x"], p["position"]["y"]]) - coordinates2=[] - coordinates2.append(coordinates) - doc["geoloc"] = {'type': doc['geotype'],'coordinates': coordinates2} diff --git a/mongodb_store/tests/config_manager.test b/mongodb_store/tests/config_manager.test index 827541e..b710864 100644 --- a/mongodb_store/tests/config_manager.test +++ b/mongodb_store/tests/config_manager.test @@ -1,8 +1,8 @@ - + -j + diff --git a/mongodb_store/tests/test_messagestore.py b/mongodb_store/tests/test_messagestore.py index 1aa0887..a406d40 100755 --- a/mongodb_store/tests/test_messagestore.py +++ b/mongodb_store/tests/test_messagestore.py @@ -63,19 +63,16 @@ def test_add_message(self): # get documents with limit result_limited = msg_store.query(Pose._type, message_query={'orientation.z': {'$gt': 10} }, sort_query=[("$natural", 1)], limit=10) self.assertEqual(len(result_limited), 10) - self.assertListEqual([int(doc[0].orientation.x) for doc in result_limited], range(10)) + self.assertListEqual([int(doc[0].orientation.x) for doc in result_limited], list(range(10))) - #get documents without "orientation" field - result_no_id = msg_store.query(Pose._type, message_query={}, projection_query={"orientation": 0}) + #get documents without "orientation" field + result_no_id = msg_store.query(Pose._type, message_query={}, projection_query={"orientation": 0}) for doc in result_no_id: - self.assertEqual(int(doc[0].orientation.z),0 ) - - - + self.assertEqual(int(doc[0].orientation.z), 0) # must remove the item or unittest only really valid once - print meta["_id"] - print str(meta["_id"]) + print(meta["_id"]) + print(str(meta["_id"])) deleted = msg_store.delete(str(meta["_id"])) self.assertTrue(deleted) diff --git a/mongodb_store_msgs/CHANGELOG.rst b/mongodb_store_msgs/CHANGELOG.rst index f737357..fbeae08 100644 --- a/mongodb_store_msgs/CHANGELOG.rst +++ b/mongodb_store_msgs/CHANGELOG.rst @@ -2,6 +2,9 @@ Changelog for package mongodb_store_msgs ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +0.6.0 (2022-09-20) +------------------ + 0.5.2 (2019-11-11) ------------------ diff --git a/mongodb_store_msgs/CMakeLists.txt b/mongodb_store_msgs/CMakeLists.txt index ed1c751..0900219 100644 --- a/mongodb_store_msgs/CMakeLists.txt +++ b/mongodb_store_msgs/CMakeLists.txt @@ -1,35 +1,28 @@ -cmake_minimum_required(VERSION 2.8.3) +cmake_minimum_required(VERSION 3.10) project(mongodb_store_msgs) -find_package(catkin REQUIRED COMPONENTS message_generation actionlib actionlib_msgs) - -add_message_files( - FILES - StringList.msg - StringPair.msg - StringPairList.msg - SerialisedMessage.msg - Insert.msg -) - -add_service_files( - FILES - MongoInsertMsg.srv - MongoUpdateMsg.srv - MongoQueryMsg.srv - MongoQuerywithProjectionMsg.srv - MongoDeleteMsg.srv +if(POLICY CMP0148) +# don't show warnings about this policy +cmake_policy(SET CMP0148 OLD) +endif() + +find_package(ament_cmake) +find_package(std_msgs) +find_package(action_msgs) +find_package(builtin_interfaces) +find_package(rosidl_default_generators) + +file(GLOB msg_files RELATIVE "${CMAKE_CURRENT_LIST_DIR}" + "${CMAKE_CURRENT_LIST_DIR}/msg/*.msg" + "${CMAKE_CURRENT_LIST_DIR}/srv/*.srv" + "${CMAKE_CURRENT_LIST_DIR}/action/*.action" ) -add_action_files( - FILES - MoveEntries.action +rosidl_generate_interfaces(${PROJECT_NAME} + ${msg_files} + DEPENDENCIES std_msgs action_msgs builtin_interfaces ) +ament_export_dependencies(rosidl_default_runtime) -generate_messages(DEPENDENCIES actionlib_msgs) - -catkin_package( - CATKIN_DEPENDS message_generation -) - +ament_package() \ No newline at end of file diff --git a/mongodb_store_msgs/action/MoveEntries.action b/mongodb_store_msgs/action/MoveEntries.action index 4fc24a3..72807ef 100644 --- a/mongodb_store_msgs/action/MoveEntries.action +++ b/mongodb_store_msgs/action/MoveEntries.action @@ -3,7 +3,7 @@ string database # the collections to move entries from StringList collections # only entries before rospy.get_rostime() - move_before are moved. if 0, all are moved -duration move_before +builtin_interfaces/Duration move_before # delete moved entries after replication bool delete_after_move # query to move entries by diff --git a/mongodb_store_msgs/package.xml b/mongodb_store_msgs/package.xml index 58f93c1..5489e47 100644 --- a/mongodb_store_msgs/package.xml +++ b/mongodb_store_msgs/package.xml @@ -1,7 +1,7 @@ - + mongodb_store_msgs - 0.5.2 + 0.6.0 The mongodb_store_msgs package Nick Hawes @@ -13,18 +13,19 @@ MIT Nick Hawes - - catkin - catkin - message_generation - message_runtime - message_generation + ament_cmake -actionlib_msgs - actionlib + rosidl_default_generators + rosidl_default_runtime + rosidl_interface_packages + + _msgs + action_msgs + builtin_interfaces + + + ament_cmake + -actionlib_msgs - actionlib - diff --git a/mongodb_store/srv/GetParam.srv b/mongodb_store_msgs/srv/GetParam.srv similarity index 100% rename from mongodb_store/srv/GetParam.srv rename to mongodb_store_msgs/srv/GetParam.srv diff --git a/mongodb_store/srv/MongoFind.srv b/mongodb_store_msgs/srv/MongoFind.srv similarity index 100% rename from mongodb_store/srv/MongoFind.srv rename to mongodb_store_msgs/srv/MongoFind.srv diff --git a/mongodb_store/srv/MongoInsert.srv b/mongodb_store_msgs/srv/MongoInsert.srv similarity index 100% rename from mongodb_store/srv/MongoInsert.srv rename to mongodb_store_msgs/srv/MongoInsert.srv diff --git a/mongodb_store/srv/MongoUpdate.srv b/mongodb_store_msgs/srv/MongoUpdate.srv similarity index 100% rename from mongodb_store/srv/MongoUpdate.srv rename to mongodb_store_msgs/srv/MongoUpdate.srv diff --git a/mongodb_store/srv/SetParam.srv b/mongodb_store_msgs/srv/SetParam.srv similarity index 100% rename from mongodb_store/srv/SetParam.srv rename to mongodb_store_msgs/srv/SetParam.srv