diff --git a/gazebo_ros2_control/CMakeLists.txt b/gazebo_ros2_control/CMakeLists.txt index cfba1298..fc3004e1 100644 --- a/gazebo_ros2_control/CMakeLists.txt +++ b/gazebo_ros2_control/CMakeLists.txt @@ -28,7 +28,7 @@ link_directories( # Libraries add_library(${PROJECT_NAME} SHARED - src/gazebo_ros2_control_plugin.cpp + src/gazebo_ros2_control_plugin.cpp src/node.cc ) ament_target_dependencies(${PROJECT_NAME} angles diff --git a/gazebo_ros2_control/include/gazebo_ros2_control/gazebo_system.hpp b/gazebo_ros2_control/include/gazebo_ros2_control/gazebo_system.hpp index 8230a4ec..6a1a53ce 100644 --- a/gazebo_ros2_control/include/gazebo_ros2_control/gazebo_system.hpp +++ b/gazebo_ros2_control/include/gazebo_ros2_control/gazebo_system.hpp @@ -74,6 +74,7 @@ class GazeboSystem : public GazeboSystemInterface bool initSim( rclcpp::Node::SharedPtr & model_nh, gazebo::physics::ModelPtr parent_model, + gazebo::transport::NodePtr & transport_nh, const hardware_interface::HardwareInfo & hardware_info, sdf::ElementPtr sdf) override; @@ -86,6 +87,10 @@ class GazeboSystem : public GazeboSystemInterface const hardware_interface::HardwareInfo & hardware_info, gazebo::physics::ModelPtr parent_model); + void registerGazeboTopics( + const hardware_interface::HardwareInfo & hardware_info, + gazebo::physics::ModelPtr parent_model); + /// \brief Private data class std::unique_ptr dataPtr; }; diff --git a/gazebo_ros2_control/include/gazebo_ros2_control/gazebo_system_interface.hpp b/gazebo_ros2_control/include/gazebo_ros2_control/gazebo_system_interface.hpp index 8a4cf1fb..61a58340 100644 --- a/gazebo_ros2_control/include/gazebo_ros2_control/gazebo_system_interface.hpp +++ b/gazebo_ros2_control/include/gazebo_ros2_control/gazebo_system_interface.hpp @@ -26,6 +26,7 @@ #include "hardware_interface/system_interface.hpp" +#include "gazebo_ros2_control/node.hh" #include "rclcpp/rclcpp.hpp" @@ -69,6 +70,7 @@ class GazeboSystemInterface virtual bool initSim( rclcpp::Node::SharedPtr & model_nh, gazebo::physics::ModelPtr parent_model, + gazebo::transport::NodePtr & transport_nh, const hardware_interface::HardwareInfo & hardware_info, sdf::ElementPtr sdf) = 0; diff --git a/gazebo_ros2_control/include/gazebo_ros2_control/node.hh b/gazebo_ros2_control/include/gazebo_ros2_control/node.hh new file mode 100644 index 00000000..87486211 --- /dev/null +++ b/gazebo_ros2_control/include/gazebo_ros2_control/node.hh @@ -0,0 +1,610 @@ +/* + * Copyright (C) 2012 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +#ifndef GAZEBO_TRANSPORT_NODE_HH_ +#define GAZEBO_TRANSPORT_NODE_HH_ + +#undef emit +#include +#define emit +#ifndef TBB_VERSION_MAJOR +#include +#endif + +// This fixes compiler warnings, see #3147 and #3160 +#ifndef BOOST_BIND_GLOBAL_PLACEHOLDERS +#define BOOST_BIND_GLOBAL_PLACEHOLDERS +#endif + +#include +#include +#include +#include + +#include +#include + +#if TBB_VERSION_MAJOR >= 2021 +#include "gazebo/transport/TaskGroup.hh" +#endif +#include "gazebo/transport/TransportTypes.hh" +#include "gazebo/transport/TopicManager.hh" +#include "gazebo/util/system.hh" + +namespace gazebo +{ + namespace transport + { + /// \cond + /// \brief Task used by Node::Publish to publish on a one-time publisher +#if TBB_VERSION_MAJOR < 2021 +class GZ_TRANSPORT_VISIBLE PublishTask: public tbb::task +#else +class GZ_TRANSPORT_VISIBLE PublishTask +#endif + { + /// \brief Constructor + /// \param[in] _pub Publisher to publish the message on. + /// \param[in] _message Message to publish + +public: + PublishTask( + transport::PublisherPtr _pub, + const google::protobuf::Message & _message) + : pub(_pub) + { + this->msg = _message.New(); + this->msg->CopyFrom(_message); + } + +#if TBB_VERSION_MAJOR < 2021 + /// \brief Overridden function from tbb::task that exectues the + /// publish task. + +public: + tbb::task * execute() +#else + /// \brief Executes the publish task. + +public: + void operator()() const +#endif + { + this->pub->WaitForConnection(); + this->pub->Publish(*this->msg, true); + this->pub->SendMessage(); + delete this->msg; +#if TBB_VERSION_MAJOR < 2021 + this->pub.reset(); + return NULL; +#endif + } + + /// \brief Pointer to the publisher. + +private: + transport::PublisherPtr pub; + + /// \brief Message to publish + +private: + google::protobuf::Message * msg; + }; + /// \endcond + + /// \addtogroup gazebo_transport + /// \{ + + /// \class Node Node.hh transport/transport.hh + /// \brief A node can advertise and subscribe topics, publish on + /// advertised topics and listen to subscribed topics. +class GZ_TRANSPORT_VISIBLE Node: +public boost::enable_shared_from_this < Node > +{ + /// \brief Constructor + +public: + Node(); + + /// \brief Destructor + +public: + virtual ~Node(); + + /// \brief Init the node + /// \param[in] _space Set the namespace of this topic. If this is the + /// first Node initialized, then this namespace will become the global + /// namespace. If left blank, the topic will initialize to the first + /// namespace on the Master. If the Master does not have any namespaces + /// within 1 second of calling this function, the Node will be initialized + /// with a namespace of "default". + /// \sa TryInit() + +public: + void Init(const std::string & _space = ""); + + /// \brief Try to initialize the node to use the global namespace, and + /// specify the maximum wait time. If a global namespace is not available + /// by the time a duration of _maxWait has transpired, this will return + /// false, and the Node will not be initialized. + /// \param[in] _maxWait The maximum amount of time to wait for the Node to + /// initialize. The initialization might be delayed while waiting for + /// namespaces to be found. The default is 1 second. + /// \return True if a global namespace was found, and this node has been + /// initialized to it. False if a global namespace was not found (the node + /// will not be initialized in that case). + /// \sa Init() + +public: + bool TryInit( + const common::Time & _maxWait = common::Time(1, 0)); + + /// \brief Check if this Node has been initialized. + /// \return True if initialized, otherwise false + /// \sa Init() + /// \sa TryInit() + +public: + bool IsInitialized() const; + + /// \brief Finalize the node + +public: + void Fini(); + + /// \brief Get the topic namespace for this node + /// \return The namespace + +public: + std::string GetTopicNamespace() const; + + /// \brief Decode a topic name + /// \param[in] The encoded name + /// \return The decoded name + +public: + std::string DecodeTopicName(const std::string & _topic); + + /// \brief Encode a topic name + /// \param[in] The decoded name + /// \return The encoded name + +public: + std::string EncodeTopicName(const std::string & _topic); + + /// \brief Get the unique ID of the node + /// \return The unique ID of the node + +public: + unsigned int GetId() const; + + /// \brief Process all publishers, which has each publisher send it's + /// most recent message over the wire. This is for internal use only + +public: + void ProcessPublishers(); + + /// \brief Process incoming messages. + +public: + void ProcessIncoming(); + + /// \brief Return true if a subscriber on a specific topic is latched. + /// \param[in] _topic Name of the topic to check. + /// \return True if a latched subscriber exists. + +public: + bool HasLatchedSubscriber(const std::string & _topic) const; + + + /// \brief A convenience function for a one-time publication of + /// a message. This is inefficient, compared to + /// Node::Advertise followed by Publisher::Publish. This function + /// should only be used when sending a message very infrequently. + /// \param[in] _topic The topic to advertise + /// \param[in] _message Message to be published + +public: + template < typename M > + void Publish( + const std::string & _topic, + const google::protobuf::Message & _message) + { + transport::PublisherPtr pub = this->Advertise < M > (_topic); +#if TBB_VERSION_MAJOR < 2021 + PublishTask * task = new(tbb::task::allocate_root()) + PublishTask(pub, _message); + + tbb::task::enqueue(*task); + return; +#else + this->taskGroup.run < PublishTask > (pub, _message); +#endif + } + + /// \brief Advertise a topic + /// \param[in] _topic The topic to advertise + /// \param[in] _queueLimit The maximum number of outgoing messages to + /// queue for delivery + /// \param[in] _hz Update rate for the publisher. Units are + /// 1.0/seconds. + /// \return Pointer to new publisher object + +public: + template < typename M > + transport::PublisherPtr Advertise( + const std::string & _topic, + unsigned int _queueLimit = 1000, + double _hzRate = 0) + { + std::string decodedTopic = this->DecodeTopicName(_topic); + PublisherPtr publisher = + transport::TopicManager::Instance()->Advertise < M > ( + decodedTopic, _queueLimit, _hzRate); + + boost::mutex::scoped_lock lock(this->publisherMutex); + publisher->SetNode(shared_from_this()); + this->publishers.push_back(publisher); + + return publisher; + } + + /// \brief A convenience function for a one-time publication of + /// a message. This is inefficient, compared to + /// Node::Advertise followed by Publisher::Publish. This function + /// should only be used when sending a message very infrequently. + /// \param[in] _topic The topic to advertise + /// \param[in] _message Message to be published + +public: + void Publish( + const std::string & _topic, + const google::protobuf::Message & _message) + { + transport::PublisherPtr pub = this->Advertise( + _topic, + _message.GetTypeName()); + pub->WaitForConnection(); + + pub->Publish(_message, true); + } + + /// \brief Advertise a topic + /// \param[in] _topic The topic to advertise + /// \param[in] _queueLimit The maximum number of outgoing messages to + /// queue for delivery + /// \param[in] _hz Update rate for the publisher. Units are + /// 1.0/seconds. + /// \return Pointer to new publisher object + +public: + transport::PublisherPtr Advertise( + const std::string & _topic, + const std::string & _msgTypeName, + unsigned int _queueLimit = 1000, + double _hzRate = 0) + { + std::string decodedTopic = this->DecodeTopicName(_topic); + PublisherPtr publisher = + transport::TopicManager::Instance()->Advertise( + decodedTopic, _msgTypeName, _queueLimit, _hzRate); + + boost::mutex::scoped_lock lock(this->publisherMutex); + publisher->SetNode(shared_from_this()); + this->publishers.push_back(publisher); + + return publisher; + } + + /// \brief Subscribe to a topic using a class method as the callback + /// \param[in] _topic The topic to subscribe to + /// \param[in] _fp Class method to be called on receipt of new message + /// \param[in] _obj Class instance to be used on receipt of new message + /// \param[in] _latching If true, latch latest incoming message; + /// otherwise don't latch + /// \return Pointer to new Subscriber object + +public: + template < typename M, typename T > + SubscriberPtr Subscribe( + const std::string & _topic, + void (T::* _fp)(const boost::shared_ptr < M const > &), T * _obj, + bool _latching = false) + { + SubscribeOptions ops; + std::string decodedTopic = this->DecodeTopicName(_topic); + ops.template Init < M > (decodedTopic, shared_from_this(), _latching); + + { + boost::recursive_mutex::scoped_lock lock(this->incomingMutex); + this->callbacks[decodedTopic].push_back( + CallbackHelperPtr( + new CallbackHelperT < M > (boost::bind( + _fp, _obj, + boost::placeholders::_1), _latching))); + } + + SubscriberPtr result = + transport::TopicManager::Instance()->Subscribe(ops); + + result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId()); + + return result; + } + + /// \brief Subscribe to a topic using a bare function as the callback + /// \param[in] _topic The topic to subscribe to + /// \param[in] _fp Function to be called on receipt of new message + /// \param[in] _latching If true, latch latest incoming message; + /// otherwise don't latch + /// \return Pointer to new Subscriber object + +public: + template < typename M > + SubscriberPtr Subscribe( + const std::string & _topic, + void (* _fp)(const boost::shared_ptr < M const > &), + bool _latching = false) + { + SubscribeOptions ops; + std::string decodedTopic = this->DecodeTopicName(_topic); + ops.template Init < M > (decodedTopic, shared_from_this(), _latching); + + { + boost::recursive_mutex::scoped_lock lock(this->incomingMutex); + this->callbacks[decodedTopic].push_back( + CallbackHelperPtr(new CallbackHelperT < M > (_fp, _latching))); + } + + SubscriberPtr result = + transport::TopicManager::Instance()->Subscribe(ops); + + result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId()); + + return result; + } + + // THIS IS NEW + /// \brief Subscribe to a topic using a bare function as the callback + /// \param[in] _topic The topic to subscribe to + /// \param[in] _fp Function to be called on receipt of new message + /// \param[in] _latching If true, latch latest incoming message; + /// otherwise don't latch + /// \return Pointer to new Subscriber object + +public: + template < typename M > + SubscriberPtr Subscribe( + const std::string & _topic, + const boost::function < void(const boost::shared_ptr < M const > &) > & _cb, + bool _latching = false) + { + SubscribeOptions ops; + std::string decodedTopic = this->DecodeTopicName(_topic); + ops.template Init < M > (decodedTopic, shared_from_this(), _latching); + + { + boost::recursive_mutex::scoped_lock lock(this->incomingMutex); + this->callbacks[decodedTopic].push_back( + CallbackHelperPtr(new CallbackHelperT < M > (_cb, _latching))); + } + + SubscriberPtr result = + transport::TopicManager::Instance()->Subscribe(ops); + + result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId()); + + return result; + } + + /// \brief Subscribe to a topic using a class method as the callback + /// \param[in] _topic The topic to subscribe to + /// \param[in] _fp Class method to be called on receipt of new message + /// \param[in] _obj Class instance to be used on receipt of new message + /// \param[in] _latching If true, latch latest incoming message; + /// otherwise don't latch + /// \return Pointer to new Subscriber object + template < typename T > + SubscriberPtr Subscribe( + const std::string & _topic, + void (T::* _fp)(const std::string &), T * _obj, + bool _latching = false) + { + SubscribeOptions ops; + std::string decodedTopic = this->DecodeTopicName(_topic); + ops.Init(decodedTopic, shared_from_this(), _latching); + + { + boost::recursive_mutex::scoped_lock lock(this->incomingMutex); + this->callbacks[decodedTopic].push_back( + CallbackHelperPtr( + new RawCallbackHelper(boost::bind(_fp, _obj, boost::placeholders::_1)))); + } + + SubscriberPtr result = + transport::TopicManager::Instance()->Subscribe(ops); + + result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId()); + + return result; + } + + + /// \brief Subscribe to a topic using a bare function as the callback + /// \param[in] _topic The topic to subscribe to + /// \param[in] _fp Function to be called on receipt of new message + /// \param[in] _latching If true, latch latest incoming message; + /// otherwise don't latch + /// \return Pointer to new Subscriber object + SubscriberPtr Subscribe( + const std::string & _topic, + void (* _fp)(const std::string &), bool _latching = false) + { + SubscribeOptions ops; + std::string decodedTopic = this->DecodeTopicName(_topic); + ops.Init(decodedTopic, shared_from_this(), _latching); + + { + boost::recursive_mutex::scoped_lock lock(this->incomingMutex); + this->callbacks[decodedTopic].push_back( + CallbackHelperPtr(new RawCallbackHelper(_fp))); + } + + SubscriberPtr result = + transport::TopicManager::Instance()->Subscribe(ops); + + result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId()); + + return result; + } + + /// \brief Handle incoming data. + /// \param[in] _topic Topic for which the data was received + /// \param[in] _msg The message that was received + /// \return true if the message was handled successfully, false otherwise + +public: + bool HandleData( + const std::string & _topic, + const std::string & _msg); + + /// \brief Handle incoming msg. + /// \param[in] _topic Topic for which the data was received + /// \param[in] _msg The message that was received + /// \return true if the message was handled successfully, false otherwise + +public: + bool HandleMessage(const std::string & _topic, MessagePtr _msg); + + /// \brief Add a latched message to the node for publication. + /// + /// This is called when a subscription is connected to a + /// publication. + /// \param[in] _topic Name of the topic to publish data on. + /// \param[in] _msg The message to publish. + +public: + void InsertLatchedMsg( + const std::string & _topic, + const std::string & _msg); + + /// \brief Add a latched message to the node for publication. + /// + /// This is called when a subscription is connected to a + /// publication. + /// \param[in] _topic Name of the topic to publish data on. + /// \param[in] _msg The message to publish. + +public: + void InsertLatchedMsg( + const std::string & _topic, + MessagePtr _msg); + + /// \brief Get the message type for a topic + /// \param[in] _topic The topic + /// \return The message type + +public: + std::string GetMsgType(const std::string & _topic) const; + + /// \internal + /// \brief Remove a callback. This should only be called by + /// Subscriber.cc + /// \param[in] _topic Name of the topic. + /// \param[in] _id Id of the callback. + +public: + void RemoveCallback(const std::string & _topic, unsigned int _id); + + /// \internal + /// \brief Private implementation of Init() and TryInit() + /// \param[in] _space Namespace to initialize this Node to. Use an empty + /// string to have the namespace inferred. + /// \param[in] _maxWait Maximum amount of time to wait for a namespace if + /// _space is left blank. + /// \param[in] _fallbackToDefault If true, after _maxWait passes without a + /// global namespace appearing, the Node will be initialized to have a + /// namespace of "default". + /// \return True if this Node is initialized upon returning from this + /// function. + +private: + bool PrivateInit( + const std::string & _space, + const common::Time & _maxWait, + const bool _fallbackToDefault); + +private: + std::string topicNamespace; + +private: + std::vector < PublisherPtr > publishers; + +private: + std::vector < PublisherPtr > ::iterator publishersIter; + +private: + static unsigned int idCounter; + +private: + unsigned int id; + +private: + typedef std::list < CallbackHelperPtr > Callback_L; + +private: + typedef std::map < std::string, Callback_L > Callback_M; + +private: + Callback_M callbacks; + +private: + std::map < std::string, std::list < std::string >> incomingMsgs; + + /// \brief List of newly arrive messages + +private: + std::map < std::string, std::list < MessagePtr >> incomingMsgsLocal; + +#if TBB_VERSION_MAJOR >= 2021 + /// \brief For managing asynchronous tasks with tbb + +private: + TaskGroup taskGroup; +#endif + +private: + boost::mutex publisherMutex; + +private: + boost::mutex publisherDeleteMutex; + +private: + boost::recursive_mutex incomingMutex; + + /// \brief make sure we don't call ProcessingIncoming simultaneously + /// from separate threads. + +private: + boost::recursive_mutex processIncomingMutex; + +private: + bool initialized; + }; + } // namespace transport +} // namespace gazebo +#endif // GAZEBO_TRANSPORT_NODE_HH_ diff --git a/gazebo_ros2_control/src/gazebo_ros2_control_plugin.cpp b/gazebo_ros2_control/src/gazebo_ros2_control_plugin.cpp index b0a8bb07..5c213d5c 100644 --- a/gazebo_ros2_control/src/gazebo_ros2_control_plugin.cpp +++ b/gazebo_ros2_control/src/gazebo_ros2_control_plugin.cpp @@ -42,6 +42,9 @@ #include "gazebo_ros2_control/gazebo_ros2_control_plugin.hpp" #include "gazebo_ros2_control/gazebo_system.hpp" +#include "gazebo/msgs/msgs.hh" +#include "gazebo/physics/physics.hh" + #include "pluginlib/class_loader.hpp" #include "rclcpp/rclcpp.hpp" @@ -73,9 +76,12 @@ class GazeboRosControlPrivate // Get the URDF XML from the parameter server std::string getURDF(std::string param_name) const; - // Node Handles + // Node Handle gazebo_ros::Node::SharedPtr model_nh_; + // Gazebo transport handle + gazebo::transport::NodePtr transport_nh_; + // Pointer to the model gazebo::physics::ModelPtr parent_model_; @@ -134,6 +140,10 @@ GazeboRosControlPlugin::~GazeboRosControlPlugin() // Overloaded Gazebo entry point void GazeboRosControlPlugin::Load(gazebo::physics::ModelPtr parent, sdf::ElementPtr sdf) { + // Create and init a new Gazebo transport node + impl_->transport_nh_ = boost::shared_ptr(new gazebo::transport::Node()); + impl_->transport_nh_->Init(parent->GetName()); + RCLCPP_INFO_STREAM( rclcpp::get_logger("gazebo_ros2_control"), "Loading gazebo_ros2_control plugin"); @@ -309,6 +319,7 @@ void GazeboRosControlPlugin::Load(gazebo::physics::ModelPtr parent, sdf::Element if (!gazeboSystem->initSim( node_ros2, impl_->parent_model_, + impl_->transport_nh_, control_hardware_info[i], sdf)) { diff --git a/gazebo_ros2_control/src/gazebo_system.cpp b/gazebo_ros2_control/src/gazebo_system.cpp index b325d46b..0c103432 100644 --- a/gazebo_ros2_control/src/gazebo_system.cpp +++ b/gazebo_ros2_control/src/gazebo_system.cpp @@ -20,6 +20,7 @@ #include #include "gazebo_ros2_control/gazebo_system.hpp" +#include "gazebo/msgs/msgs.hh" #include "gazebo/sensors/ImuSensor.hh" #include "gazebo/sensors/ForceTorqueSensor.hh" #include "gazebo/sensors/SensorManager.hh" @@ -41,6 +42,9 @@ class gazebo_ros2_control::GazeboSystemPrivate ~GazeboSystemPrivate() = default; + // Gazebo transport handle + gazebo::transport::NodePtr node; + /// \brief Degrees od freedom. size_t n_dof_; @@ -50,6 +54,9 @@ class gazebo_ros2_control::GazeboSystemPrivate /// \brief Gazebo Model Ptr. gazebo::physics::ModelPtr parent_model_; + // Gazebo transport handle + gazebo::transport::NodePtr transport_nh_; + /// \brief last time the write method was called. rclcpp::Time last_update_sim_time_ros_; @@ -62,6 +69,12 @@ class gazebo_ros2_control::GazeboSystemPrivate /// \brief handles to the joints from within Gazebo std::vector sim_joints_; + // Gazebo transport publishers + std::vector gz_pubs_; + + // Gazebo transport subscribers + std::vector gz_subs_; + /// \brief vector with the current joint position std::vector joint_position_; @@ -80,6 +93,12 @@ class gazebo_ros2_control::GazeboSystemPrivate /// \brief vector with the current cmd joint effort std::vector joint_effort_cmd_; + /// \brief vector with the current gazebo transport cmds + std::vector gazebo_topic_cmd_; + + /// \brief vector with the current gazebo transport feedbacks + std::vector gazebo_topic_fbk_; + /// \brief handles to the imus from within Gazebo std::vector sim_imu_sensors_; @@ -108,6 +127,7 @@ namespace gazebo_ros2_control bool GazeboSystem::initSim( rclcpp::Node::SharedPtr & model_nh, gazebo::physics::ModelPtr parent_model, + gazebo::transport::NodePtr & transport_nh, const hardware_interface::HardwareInfo & hardware_info, sdf::ElementPtr sdf) { @@ -116,6 +136,7 @@ bool GazeboSystem::initSim( this->nh_ = model_nh; this->dataPtr->parent_model_ = parent_model; + this->dataPtr->transport_nh_ = transport_nh; gazebo::physics::PhysicsEnginePtr physics = gazebo::physics::get_world()->Physics(); @@ -127,6 +148,7 @@ bool GazeboSystem::initSim( registerJoints(hardware_info, parent_model); registerSensors(hardware_info, parent_model); + registerGazeboTopics(hardware_info, parent_model); if (this->dataPtr->n_dof_ == 0 && this->dataPtr->n_sensors_ == 0) { RCLCPP_WARN_STREAM(this->nh_->get_logger(), "There is no joint or sensor available"); @@ -432,6 +454,170 @@ void GazeboSystem::registerSensors( } } +void GazeboSystem::registerGazeboTopics( + const hardware_interface::HardwareInfo & hardware_info, + gazebo::physics::ModelPtr parent_model) +{ + // This is split in two steps: Count the number and type of sensor and associate the interfaces + // So we have resize only once the structures where the data will be stored, and we can safely + // use pointers to the structures + + int n_commands = 0; + int n_states = 0; + + // STEP 1 + for (unsigned int j = 0; j < hardware_info.joints.size(); j++) { + auto & info = hardware_info.joints[j]; + + for (unsigned int i = 0; i < info.command_interfaces.size(); i++) { + std::string if_name = info.command_interfaces[i].name; + + if (if_name != "position" && if_name != "velocity" && if_name != "effort") { + ++n_commands; + } + } + + for (unsigned int i = 0; i < info.state_interfaces.size(); i++) { + std::string if_name = info.state_interfaces[i].name; + + if (if_name != "position" && if_name != "velocity" && if_name != "effort") { + ++n_states; + } + } + } + + for (unsigned int j = 0; j < hardware_info.sensors.size(); j++) { + auto & info = hardware_info.sensors[j]; + + for (unsigned int i = 0; i < info.state_interfaces.size(); i++) { + std::string if_name = info.state_interfaces[i].name; + + if (if_name != "position" && if_name != "velocity" && if_name != "effort" && + if_name != "orientation.x" && if_name != "orientation.y" && if_name != "orientation.z" && + if_name != "orientation.w" && if_name != "angular_velocity.x" && + if_name != "angular_velocity.y" && + if_name != "angular_velocity.z" && if_name != "linear_acceleration.x" && + if_name != "linear_acceleration.y" && + if_name != "linear_acceleration.z" && if_name != "force.x" && if_name != "force.y" && + if_name != "force.z" && + if_name != "torque.x" && if_name != "torque.y" && if_name != "torque.z") + { + ++n_states; + } + } + } + + dataPtr->gazebo_topic_cmd_.resize(n_commands); + dataPtr->gz_pubs_.reserve(n_commands); + dataPtr->gazebo_topic_fbk_.resize(n_states); + dataPtr->gz_subs_.reserve(n_states); + unsigned int cc = 0; + unsigned int cs = 0; + + // STEP 2 + for (unsigned int j = 0; j < hardware_info.joints.size(); j++) { + auto & info = hardware_info.joints[j]; + std::string joint_name = this->dataPtr->joint_names_[j] = info.name; + + RCLCPP_INFO_STREAM( + this->nh_->get_logger(), "Registering gazebo topics for joint: " << joint_name); + + RCLCPP_INFO_STREAM(this->nh_->get_logger(), "\tCommand:"); + + for (unsigned int i = 0; i < info.command_interfaces.size(); i++) { + std::string if_name = info.command_interfaces[i].name; + std::string initial_v = info.command_interfaces[i].initial_value; + + if (if_name != "position" && if_name != "velocity" && if_name != "effort") { + RCLCPP_INFO_STREAM(this->nh_->get_logger(), "\t\t " << if_name); + + this->dataPtr->command_interfaces_.emplace_back( + joint_name, + if_name, + &(this->dataPtr->gazebo_topic_cmd_[cc])); + + if (!initial_v.empty()) { + this->dataPtr->gazebo_topic_cmd_[cc] = std::stod(initial_v); + } + + dataPtr->gz_pubs_.push_back( + this->dataPtr->transport_nh_->Advertise( + "~/" + joint_name + "/" + + if_name) + ); + cc++; + } + } + + RCLCPP_INFO_STREAM(this->nh_->get_logger(), "\tState:"); + + for (unsigned int i = 0; i < info.state_interfaces.size(); i++) { + std::string if_name = info.state_interfaces[i].name; + + if (if_name != "position" && if_name != "velocity" && if_name != "effort") { + RCLCPP_INFO_STREAM(this->nh_->get_logger(), "\t\t " << if_name); + + this->dataPtr->state_interfaces_.emplace_back( + joint_name, + if_name, + &(this->dataPtr->gazebo_topic_fbk_[cs])); + + dataPtr->gz_subs_.push_back( + this->dataPtr->transport_nh_->Subscribe( + "~/" + joint_name + "/" + if_name, + [this, cs](ConstAnyPtr & msg) mutable -> void { + this->dataPtr->gazebo_topic_fbk_[cs] = msg->double_value(); + } + ) + ); + cs++; + } + } + } + + for (unsigned int j = 0; j < hardware_info.sensors.size(); j++) { + auto & info = hardware_info.sensors[j]; + std::string sensor_name = info.name; + + RCLCPP_INFO_STREAM( + this->nh_->get_logger(), "Registering gazebo topics for sensor: " << sensor_name); + + RCLCPP_INFO_STREAM(this->nh_->get_logger(), "\tState:"); + + for (unsigned int i = 0; i < info.state_interfaces.size(); i++) { + std::string if_name = info.state_interfaces[i].name; + + if (if_name != "position" && if_name != "velocity" && if_name != "effort" && + if_name != "orientation.x" && if_name != "orientation.y" && if_name != "orientation.z" && + if_name != "orientation.w" && if_name != "angular_velocity.x" && + if_name != "angular_velocity.y" && + if_name != "angular_velocity.z" && if_name != "linear_acceleration.x" && + if_name != "linear_acceleration.y" && + if_name != "linear_acceleration.z" && if_name != "force.x" && if_name != "force.y" && + if_name != "force.z" && + if_name != "torque.x" && if_name != "torque.y" && if_name != "torque.z") + { + RCLCPP_INFO_STREAM(this->nh_->get_logger(), "\t\t " << if_name); + + this->dataPtr->state_interfaces_.emplace_back( + sensor_name, + if_name, + &(this->dataPtr->gazebo_topic_fbk_[cs])); + + dataPtr->gz_subs_.push_back( + this->dataPtr->transport_nh_->Subscribe( + "~/" + sensor_name + "/" + if_name, + [this, cs](ConstAnyPtr & msg) mutable -> void { + this->dataPtr->gazebo_topic_fbk_[cs] = msg->double_value(); + } + ) + ); + cs++; + } + } + } +} + CallbackReturn GazeboSystem::on_init(const hardware_interface::HardwareInfo & system_info) { @@ -601,6 +787,13 @@ hardware_interface::return_type GazeboSystem::write( } } + gazebo::msgs::Any counterMsg; + counterMsg.set_type(gazebo::msgs::Any_ValueType::Any_ValueType_DOUBLE); + + for (unsigned int j = 0; j < dataPtr->gz_pubs_.size(); j++) { + counterMsg.set_double_value(this->dataPtr->gazebo_topic_cmd_[j]); + dataPtr->gz_pubs_[j]->Publish(counterMsg); + } this->dataPtr->last_update_sim_time_ros_ = sim_time_ros; return hardware_interface::return_type::OK; diff --git a/gazebo_ros2_control/src/node.cc b/gazebo_ros2_control/src/node.cc new file mode 100644 index 00000000..beb06ec9 --- /dev/null +++ b/gazebo_ros2_control/src/node.cc @@ -0,0 +1,386 @@ +/* + * Copyright (C) 2012 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ +#include +#include +#include "gazebo/transport/TransportIface.hh" +#include "gazebo_ros2_control/node.hh" + +using namespace gazebo; +using namespace transport; + +unsigned int Node::idCounter = 0; + +void dummy_callback_fn(uint32_t) +{ +} +///////////////////////////////////////////////// +Node::Node() +{ + this->id = idCounter++; + this->topicNamespace = ""; + this->initialized = false; +} + +///////////////////////////////////////////////// +Node::~Node() +{ + this->Fini(); +} + +///////////////////////////////////////////////// +void Node::Fini() +{ + if (!this->initialized) { + return; + } + + // Unadvertise all the publishers. + for (std::vector < PublisherPtr > ::iterator iter = this->publishers.begin(); + iter != this->publishers.end(); ++iter) + { + (*iter)->Fini(); + TopicManager::Instance()->Unadvertise(*iter); + } + + this->initialized = false; + TopicManager::Instance()->RemoveNode(this->id); + + { + boost::mutex::scoped_lock lock(this->publisherDeleteMutex); + boost::mutex::scoped_lock lock2(this->publisherMutex); + this->publishers.clear(); + } + + { + boost::recursive_mutex::scoped_lock lock(this->incomingMutex); + this->callbacks.clear(); + } +} + +////////////////////////////////////////////////// +bool Node::TryInit(const common::Time & _maxWait) +{ + return this->PrivateInit("", _maxWait, false); +} + +///////////////////////////////////////////////// +void Node::Init(const std::string & _space) +{ + std::cout << "Hey!\n"; + this->PrivateInit(_space, common::Time(1, 0), true); +} + +///////////////////////////////////////////////// +bool Node::PrivateInit( + const std::string & _space, + const common::Time & _maxWait, + const bool _fallbackToDefault) +{ + if (this->initialized) { + gzerr << "Node is already initialized, skipping initialization. " + << "This shouldn't happen... so fix it.\n"; + + // If the Node is already initialized, return true in order to match the + // function description. + return true; + } + + // Clearing the topicNamespace field shouldn't affect anything, but let's make + // sure that the assumption made later in this function is valid. + this->topicNamespace.clear(); + + if (_space.empty()) { + if (transport::waitForNamespaces(_maxWait)) { + // If waitForNamespaces succeeded, then we are guaranteed to have at least + // one namespace in the list. + std::list < std::string > namespaces; + TopicManager::Instance()->GetTopicNamespaces(namespaces); + + GZ_ASSERT( + !namespaces.empty(), + "It should not be possible for namespaces to be empty here"); + this->topicNamespace = namespaces.front(); + } else { + gzerr << "No namespaces found\n"; + + // If we're coming here from TryInit(), then quit right away. + if (!_fallbackToDefault) { + return false; + } + } + } + + // If the topic namespace field is empty, then either a global namespace was + // not found, or we want a specific namespace for this Node. + if (this->topicNamespace.empty()) { + this->topicNamespace = _space.empty() ? "default" : _space; + TopicManager::Instance()->RegisterTopicNamespace(this->topicNamespace); + } + + TopicManager::Instance()->AddNode(shared_from_this()); + this->initialized = true; + + return true; +} + +////////////////////////////////////////////////// +bool Node::IsInitialized() const +{ + return this->initialized; +} + +////////////////////////////////////////////////// +std::string Node::GetTopicNamespace() const +{ + return this->topicNamespace; +} + +///////////////////////////////////////////////// +std::string Node::DecodeTopicName(const std::string & _topic) +{ + std::string result = _topic; + boost::replace_first(result, "~", "/gazebo/" + this->topicNamespace); + boost::replace_first(result, "//", "/"); + return result; +} + +///////////////////////////////////////////////// +std::string Node::EncodeTopicName(const std::string & topic) +{ + std::string result = topic; + boost::replace_first(result, "/gazebo/" + this->topicNamespace, "~"); + boost::replace_first(result, "//", "/"); + + return result; +} + +///////////////////////////////////////////////// +unsigned int Node::GetId() const +{ + return this->id; +} + +///////////////////////////////////////////////// +void Node::ProcessPublishers() +{ + if (!this->initialized) { + return; + } + + int start, end; + boost::mutex::scoped_lock lock(this->publisherDeleteMutex); + boost::mutex::scoped_lock lock2(this->publisherMutex); + + start = 0; + end = this->publishers.size(); + + for (int i = start; i < end; ++i) { + this->publishers[i]->SendMessage(); + } +} + +///////////////////////////////////////////////// +bool Node::HandleData(const std::string & _topic, const std::string & _msg) +{ + boost::recursive_mutex::scoped_lock lock(this->incomingMutex); + this->incomingMsgs[_topic].push_back(_msg); + ConnectionManager::Instance()->TriggerUpdate(); + return true; +} + +///////////////////////////////////////////////// +bool Node::HandleMessage(const std::string & _topic, MessagePtr _msg) +{ + boost::recursive_mutex::scoped_lock lock(this->incomingMutex); + this->incomingMsgsLocal[_topic].push_back(_msg); + ConnectionManager::Instance()->TriggerUpdate(); + return true; +} + +///////////////////////////////////////////////// +void Node::ProcessIncoming() +{ + boost::recursive_mutex::scoped_lock lock(this->processIncomingMutex); + + if (!this->initialized || + (this->incomingMsgs.empty() && this->incomingMsgsLocal.empty())) + { + return; + } + + Callback_M::iterator cbIter; + Callback_L::iterator liter; + + // For each topic + { + std::list < std::string > ::iterator msgIter; + std::map < std::string, std::list < std::string >> ::iterator inIter; + std::map < std::string, std::list < std::string >> ::iterator endIter; + + boost::recursive_mutex::scoped_lock lock2(this->incomingMutex); + inIter = this->incomingMsgs.begin(); + endIter = this->incomingMsgs.end(); + + for (; inIter != endIter; ++inIter) { + // Find the callbacks for the topic + cbIter = this->callbacks.find(inIter->first); + if (cbIter != this->callbacks.end()) { + std::list < std::string > ::iterator msgInIter; + std::list < std::string > ::iterator msgEndIter; + + msgInIter = inIter->second.begin(); + msgEndIter = inIter->second.end(); + + // For each message in the buffer + for (msgIter = msgInIter; msgIter != msgEndIter; ++msgIter) { + // Send the message to all callbacks + for (liter = cbIter->second.begin(); + liter != cbIter->second.end(); ++liter) + { + using namespace boost::placeholders; + (*liter)->HandleData( + *msgIter, + boost::bind(&dummy_callback_fn, _1), 0); + } + } + } + } + + this->incomingMsgs.clear(); + } + + { + std::list < MessagePtr > ::iterator msgIter; + std::map < std::string, std::list < MessagePtr >> ::iterator inIter; + std::map < std::string, std::list < MessagePtr >> ::iterator endIter; + + boost::recursive_mutex::scoped_lock lock2(this->incomingMutex); + inIter = this->incomingMsgsLocal.begin(); + endIter = this->incomingMsgsLocal.end(); + + for (; inIter != endIter; ++inIter) { + // Find the callbacks for the topic + cbIter = this->callbacks.find(inIter->first); + if (cbIter != this->callbacks.end()) { + std::list < MessagePtr > ::iterator msgInIter; + std::list < MessagePtr > ::iterator msgEndIter; + + msgInIter = inIter->second.begin(); + msgEndIter = inIter->second.end(); + + // For each message in the buffer + for (msgIter = msgInIter; msgIter != msgEndIter; ++msgIter) { + // Send the message to all callbacks + for (liter = cbIter->second.begin(); + liter != cbIter->second.end(); ++liter) + { + (*liter)->HandleMessage(*msgIter); + } + } + } + } + + this->incomingMsgsLocal.clear(); + } +} + +////////////////////////////////////////////////// +void Node::InsertLatchedMsg(const std::string & _topic, const std::string & _msg) +{ + // Find the callbacks for the topic + Callback_M::iterator cbIter = this->callbacks.find(_topic); + + if (cbIter != this->callbacks.end()) { + // Send the message to all callbacks + for (Callback_L::iterator liter = cbIter->second.begin(); + liter != cbIter->second.end(); ++liter) + { + if ((*liter)->GetLatching()) { + using namespace boost::placeholders; + (*liter)->HandleData(_msg, boost::bind(&dummy_callback_fn, _1), 0); + (*liter)->SetLatching(false); + } + } + } +} + +////////////////////////////////////////////////// +void Node::InsertLatchedMsg(const std::string & _topic, MessagePtr _msg) +{ + // Find the callbacks for the topic + Callback_M::iterator cbIter = this->callbacks.find(_topic); + + if (cbIter != this->callbacks.end()) { + // Send the message to all callbacks + for (Callback_L::iterator liter = cbIter->second.begin(); + liter != cbIter->second.end(); ++liter) + { + if ((*liter)->GetLatching()) { + (*liter)->HandleMessage(_msg); + (*liter)->SetLatching(false); + } + } + } +} + +///////////////////////////////////////////////// +std::string Node::GetMsgType(const std::string & _topic) const +{ + Callback_M::const_iterator iter = this->callbacks.find(_topic); + if (iter != this->callbacks.end() && !iter->second.empty()) { + return iter->second.front()->GetMsgType(); + } + + return std::string(); +} + +///////////////////////////////////////////////// +bool Node::HasLatchedSubscriber(const std::string & _topic) const +{ + Callback_M::const_iterator iter = this->callbacks.find(_topic); + if (iter != this->callbacks.end()) { + return iter->second.front()->GetLatching(); + } + + return false; +} + +///////////////////////////////////////////////// +void Node::RemoveCallback(const std::string & _topic, unsigned int _id) +{ + if (!this->initialized) { + return; + } + + boost::recursive_mutex::scoped_lock lock(this->incomingMutex); + + // Find the topic list in the map. + Callback_M::iterator iter = this->callbacks.find(_topic); + + if (iter != this->callbacks.end()) { + Callback_L::iterator liter; + + // Find the callback with the correct ID and remove it. + for (liter = iter->second.begin(); liter != iter->second.end(); ++liter) { + if ((*liter)->GetId() == _id) { + (*liter).reset(); + iter->second.erase(liter); + break; + } + } + } +}