diff --git a/doc/state_estimation_nodes.rst b/doc/state_estimation_nodes.rst index 350e5b441..5abc1560c 100644 --- a/doc/state_estimation_nodes.rst +++ b/doc/state_estimation_nodes.rst @@ -242,6 +242,10 @@ If any of your sensors produce data with timestamps that are older than the most ^^^^^^^^^^^^^^^ If ``smooth_lagged_data`` is set to *true*, this parameter specifies the number of seconds for which the filter will retain its state and measurement history. This value should be at least as large as the time delta between your lagged measurements and the current time. +~max_future_queue_size +^^^^^^^^^^^^^^^ +This parameter specifies the number of total data samples, arriving between filter runs, the filter will retain. This is to ensure excessive memory is not used as would happen if incoming data is badly time stamped or the filter ``frequency`` is badly set. + ~[sensor]_nodelay ^^^^^^^^^^^^^^^^^ diff --git a/include/robot_localization/ros_filter.hpp b/include/robot_localization/ros_filter.hpp index e32ffeddc..38b79fba7 100644 --- a/include/robot_localization/ros_filter.hpp +++ b/include/robot_localization/ros_filter.hpp @@ -473,6 +473,25 @@ class RosFilter : public rclcpp::Node std::vector & updateVector, Eigen::VectorXd & measurement, Eigen::MatrixXd & measurementCovariance); + //! @brief Validates incoming measurement ordering and produces errors if + //! validation does not pass. + //! @param[in] topicName - The name of the topic over which this message was + //! received + //! @param[in] stamp - The timestamp associated with this message. + //! @return true if the measrement is newer than all known measurements. + //! + bool validateMeasurementOrdering(const std::string& topicName, + const builtin_interfaces::msg::Time& stamp); + + //! @brief Validates incoming measurements are within a reasonable timeframe. + //! @param[in] topicName - The name of the topic over which this message was + //! received + //! @param[in] stamp - The timestamp associated with this message. + //! @return true if the measrement is newer than all known measurements. + //! + bool validateMeasurementTime(const std::string& topicName, + const builtin_interfaces::msg::Time& stamp); + //! @brief Whether or not we print diagnostic messages to the /diagnostics //! topic //! @@ -545,14 +564,17 @@ class RosFilter : public rclcpp::Node //! double gravitational_acceleration_; - //! @brief The depth of the history we track for smoothing/delayed measurement - //! processing + //! @brief The depth of the history we track for smoothing/delayed + //! measurement processing //! //! This is the guaranteed minimum buffer size for which previous states and //! measurements are kept. //! rclcpp::Duration history_length_; + //! @brief This is the maximum measurement queque length. + uint64_t max_future_queue_size_; + //! @brief tf frame name for the robot's body frame //! std::string base_link_frame_id_; @@ -622,8 +644,8 @@ class RosFilter : public rclcpp::Node //! @brief This object accumulates static diagnostics, e.g., diagnostics //! relating to the configuration parameters. //! - //! The values are treated as static and always reported (i.e., this object is - //! never cleared) + //! The values are treated as static and always reported (i.e., this object + //! is never cleared) //! std::map static_diagnostics_; @@ -646,8 +668,8 @@ class RosFilter : public rclcpp::Node //! stores the initial measurements. Note that this is different from using //! differential mode, as in differential mode, pose data is converted to //! twist data, resulting in boundless error growth for the variables being - //! fused. With relative measurements, the vehicle will start with a 0 heading - //! and position, but the measurements are still fused absolutely. + //! fused. With relative measurements, the vehicle will start with a 0 + //! heading and position, but the measurements are still fused absolutely. std::map initial_measurements_; //! @brief If including acceleration for each IMU input, whether or not we @@ -688,9 +710,9 @@ class RosFilter : public rclcpp::Node //! std::map previous_measurement_covariances_; - //! @brief By default, the filter predicts and corrects up to the time of the - //! latest measurement. If this is set to true, the filter does the same, but - //! then also predicts up to the current time step. + //! @brief By default, the filter predicts and corrects up to the time of + //! the latest measurement. If this is set to true, the filter does the + //! same, but then also predicts up to the current time step. //! bool predict_to_current_time_; diff --git a/src/navsat_transform.cpp b/src/navsat_transform.cpp index d9bdef80b..ce44c3cf3 100644 --- a/src/navsat_transform.cpp +++ b/src/navsat_transform.cpp @@ -83,6 +83,7 @@ NavSatTransform::NavSatTransform(const rclcpp::NodeOptions & options) yaw_offset_(0.0), zero_altitude_(false) { + tf_buffer_ = std::make_unique(this->get_clock()); tf_listener_ = std::make_unique(*tf_buffer_); diff --git a/src/ros_filter.cpp b/src/ros_filter.cpp index 1315490cb..ad6220343 100644 --- a/src/ros_filter.cpp +++ b/src/ros_filter.cpp @@ -40,6 +40,7 @@ #include #include #include +#include #include #include @@ -47,9 +48,10 @@ #include #include #include +#include +#include #include #include -#include #include namespace robot_localization @@ -75,6 +77,7 @@ RosFilter::RosFilter(const rclcpp::NodeOptions & options) frequency_(30.0), gravitational_acceleration_(9.80665), history_length_(0ns), + max_future_queue_size_(10000), latest_control_(), last_diag_time_(0, 0, RCL_ROS_TIME), last_published_stamp_(0, 0, RCL_ROS_TIME), @@ -151,7 +154,62 @@ void RosFilter::reset() filter_.reset(); } -template +template +bool RosFilter::validateMeasurementTime( + const std::string& topicName, const builtin_interfaces::msg::Time& stamp) { + if (last_set_pose_time_ >= stamp) { + std::stringstream stream; + stream << "The " << topicName + << " message has a timestamp equal to or before the last filter " + "reset, this message will be ignored. This may indicate an " + "empty or bad timestamp. (message time: " + << filter_utilities::toSec(stamp) << ")"; + addDiagnostic(diagnostic_msgs::msg::DiagnosticStatus::WARN, + topicName + "_timestamp", stream.str(), false); + + RF_DEBUG( + "Received message that preceded the most recent pose reset. " + "Ignoring topic : " + << topicName); + return false; + } + + return true; +} + +template +bool RosFilter::validateMeasurementOrdering( + const std::string& topic_name, const builtin_interfaces::msg::Time& stamp) { + // Put the initial value in the lastMessagTimes_ for this variable if it's + // empty + if (last_message_times_.count(topic_name) == 0) { + last_message_times_.insert( + std::pair(topic_name, stamp)); + } + + if (last_message_times_[topic_name] <= stamp) { + return true; + } + + std::stringstream stream; + stream << "The " << topic_name << " message has a timestamp before that of " + "the previous message received," + << " this message will be ignored. This may " + "indicate a bad timestamp. (message time: " + << stamp.nanosec << ")"; + addDiagnostic(diagnostic_msgs::msg::DiagnosticStatus::WARN, + topic_name + "_timestamp", stream.str(), false); + + RF_DEBUG("Message is too old. Last message time for " + << topic_name << " is " + << filter_utilities::toSec(last_message_times_[topic_name]) + << ", current message time is " << filter_utilities::toSec(stamp) + << ".\n"); + + return false; +} + +template void RosFilter::toggleFilterProcessingCallback( const std::shared_ptr/*request_header*/, const std::shared_ptr< @@ -177,22 +235,17 @@ void RosFilter::toggleFilterProcessingCallback( // @todo: Replace with AccelWithCovarianceStamped template void RosFilter::accelerationCallback( - const sensor_msgs::msg::Imu::SharedPtr msg, - const CallbackData & callback_data, - const std::string & target_frame) -{ - // If we've just reset the filter, then we want to ignore any messages - // that arrive with an older timestamp - if (last_set_pose_time_ >= msg->header.stamp) { + const sensor_msgs::msg::Imu::SharedPtr msg, + const CallbackData& callback_data, const std::string& target_frame) { + const std::string& topic_name = callback_data.topic_name_; + + // Validate reasonable measurement time and generate debug. + if (!validateMeasurementTime(topic_name, msg->header.stamp)) { return; } - const std::string & topic_name = callback_data.topic_name_; - - RF_DEBUG( - "------ RosFilter::accelerationCallback (" << topic_name << - ") ------\n") - // "Twist message:\n" << *msg); + RF_DEBUG("------ RosFilter::accelerationCallback (" << topic_name + << ") ------\n") if (last_message_times_.count(topic_name) == 0) { last_message_times_.insert( @@ -200,7 +253,7 @@ void RosFilter::accelerationCallback( } // Make sure this message is newer than the last one - if (last_message_times_[topic_name] <= msg->header.stamp) { + if (validateMeasurementOrdering(topic_name, msg->header.stamp)) { RF_DEBUG("Update vector for " << topic_name << " is:\n" << topic_name); Eigen::VectorXd measurement(STATE_SIZE); @@ -237,39 +290,13 @@ void RosFilter::accelerationCallback( last_message_times_[topic_name] = msg->header.stamp; - RF_DEBUG( - "Last message time for " << - topic_name << " is now " << - filter_utilities::toSec(last_message_times_[topic_name]) << - "\n"); - } else { - // else if (reset_on_time_jump_ && rclcpp::Time::isSimTime()) - //{ - // reset(); - //} - - std::stringstream stream; - stream << "The " << topic_name << " message has a timestamp before that of " - "the previous message received," << " this message will be ignored. This may" - " indicate a bad timestamp. (message time: " << msg->header.stamp.nanosec << - ")"; - - addDiagnostic( - diagnostic_msgs::msg::DiagnosticStatus::WARN, topic_name + - "_timestamp", stream.str(), false); - - - RF_DEBUG( - "Message is too old. Last message time for " << - topic_name << " is " << - filter_utilities::toSec(last_message_times_[topic_name]) << - ", current message time is " << - filter_utilities::toSec(msg->header.stamp) << ".\n"); + RF_DEBUG("Last message time for " + << topic_name << " is now " + << filter_utilities::toSec(last_message_times_[topic_name]) + << "\n"); } - - RF_DEBUG( - "\n----- /RosFilter::accelerationCallback (" << topic_name << - ") ------\n"); + RF_DEBUG("\n----- /RosFilter::accelerationCallback (" << topic_name + << ") ------\n"); } template @@ -302,13 +329,10 @@ void RosFilter::controlStampedCallback( // Update the filter with this control term filter_.setControl(latest_control_, msg->header.stamp); } else { - // ROS_WARN_STREAM_THROTTLE(5.0, "Commanded velocities must be given in the - // robot's body frame (" << base_link_frame_id_ << "). Message frame was " << - // msg->header.frame_id); - std::cerr << - "Commanded velocities must be given in the robot's body frame (" << - base_link_frame_id_ << "). Message frame was " << - msg->header.frame_id << "\n"; + std::cerr + << "Commanded velocities must be given in the robot's body frame (" + << base_link_frame_id_ << "). Message frame was " + << msg->header.frame_id << "\n"; } } @@ -319,8 +343,17 @@ void RosFilter::enqueueMeasurement( const std::vector & update_vector, const double mahalanobis_thresh, const rclcpp::Time & time) { - MeasurementPtr meas = MeasurementPtr(new Measurement()); + rcpputils::check_true( + measurement_queue_.size() < max_future_queue_size_, + "Too many measurements on queue . This may mean the filter is " + "running slowly or that measurements are comming in with bad " + "timestamps. max_future_queue_size_ is: " + + std::to_string(max_future_queue_size_)); + + constexpr double kMaxQueueTimeS = 0.5; + + MeasurementPtr meas = MeasurementPtr(new Measurement()); meas->topic_name_ = topic_name; meas->measurement_ = measurement; meas->covariance_ = measurement_covariance; @@ -330,6 +363,13 @@ void RosFilter::enqueueMeasurement( meas->latest_control_ = latest_control_; meas->latest_control_time_ = latest_control_time_; measurement_queue_.push(meas); + + if ((time - measurement_queue_.top()->time_).nanoseconds() / 1e9 > + kMaxQueueTimeS) { + RCLCPP_ERROR(this->get_logger(), "Warning: messages are queued at least " + "0.5s. This likely means the filter is " + "not working as excpected."); + } } template @@ -467,25 +507,10 @@ void RosFilter::imuCallback( { RF_DEBUG( "------ RosFilter::imuCallback (" << - topic_name << ") ------\n") // << "IMU message:\n" << *msg); - - // If we've just reset the filter, then we want to ignore any messages - // that arrive with an older timestamp - if (last_set_pose_time_ >= msg->header.stamp) { - std::stringstream stream; - stream << "The " << topic_name << " message has a timestamp equal to or" - " before the last filter reset, " << "this message will be ignored. This may" - "indicate an empty or bad timestamp. (message time: " << msg->header.stamp.nanosec << - ")"; - addDiagnostic( - diagnostic_msgs::msg::DiagnosticStatus::WARN, - topic_name + "_timestamp", stream.str(), false); - - - RF_DEBUG( - "Received message that preceded the most recent pose reset. " - "Ignoring..."); + topic_name << ") ------\n") + // Validate reasonable measurement time and generate debug. + if (!validateMeasurementTime(topic_name, msg->header.stamp)) { return; } @@ -610,16 +635,8 @@ void RosFilter::integrateMeasurements(const rclcpp::Time & current_time) first_measurement->topic_name_; // revertTo may invalidate first_measurement if (!revertTo(first_measurement_time - rclcpp::Duration(1ns))) { - RF_DEBUG( - "ERROR: history interval is too small to revert to time " << - filter_utilities::toSec(first_measurement_time) << "\n"); - // ROS_WARN_STREAM_DELAYED_THROTTLE(history_length_, - // "Received old measurement for topic " << first_measurement_topic << - // ", but history interval is insufficiently sized. " - // "Measurement time is " << std::setprecision(20) << - // first_measurement_time << - // ", current time is " << current_time << - // ", history length is " << history_length_ << "."); + RF_DEBUG("ERROR: history interval is too small to revert to time " + << filter_utilities::toSec(first_measurement_time) << "\n"); restored_measurement_count = 0; } @@ -634,6 +651,11 @@ void RosFilter::integrateMeasurements(const rclcpp::Time & current_time) // should wait until a future iteration. Since measurements are stored in // a priority queue, all remaining measurements will be in the future. if (current_time < measurement->time_) { + RCLCPP_FATAL(this->get_logger(), + "Warning: messages came in future. Current time: %li : " + "Message time : %li", + current_time.nanoseconds(), + measurement->time_.nanoseconds()); break; } @@ -654,7 +676,6 @@ void RosFilter::integrateMeasurements(const rclcpp::Time & current_time) measurement->latest_control_time_); restored_measurement_count--; } - // This will call predict and, if necessary, correct filter_.processMeasurement(*(measurement.get())); @@ -861,6 +882,8 @@ void RosFilter::loadParams() // Smoothing window size smooth_lagged_data_ = this->declare_parameter("smooth_lagged_data", false); double history_length_double = this->declare_parameter("history_length", 0.0); + max_future_queue_size_ = + this->declare_parameter("max_future_queue_size", 10000); if (!smooth_lagged_data_ && std::abs(history_length_double) > 0) { std::cerr << "Filter history interval of " << history_length_double << @@ -1786,23 +1809,8 @@ void RosFilter::odometryCallback( const CallbackData & pose_callback_data, const CallbackData & twist_callback_data) { - // If we've just reset the filter, then we want to ignore any messages - // that arrive with an older timestamp - if (last_set_pose_time_ >= msg->header.stamp) { - std::stringstream stream; - stream << - "The " << topic_name << - " message has a timestamp equal to or before the last filter reset, " << - "this message will be ignored. This may indicate an empty or bad " - "timestamp. (message time: " << - filter_utilities::toSec(msg->header.stamp) << ")"; - addDiagnostic( - diagnostic_msgs::msg::DiagnosticStatus::WARN, - topic_name + "_timestamp", stream.str(), false); - RF_DEBUG( - "Received message that preceded the most recent pose reset. " - "Ignoring..."); - + // Validate reasonable measurement time and generate debug. + if (!validateMeasurementTime(topic_name, msg->header.stamp)) { return; } @@ -1845,19 +1853,8 @@ void RosFilter::poseCallback( { const std::string & topic_name = callback_data.topic_name_; - // If we've just reset the filter, then we want to ignore any messages - // that arrive with an older timestamp - if (last_set_pose_time_ >= msg->header.stamp) { - std::stringstream stream; - stream << - "The " << topic_name << - " message has a timestamp equal to or before the last filter reset, " << - "this message will be ignored. This may indicate an empty or bad " - "timestamp. (message time: " << - filter_utilities::toSec(msg->header.stamp) << ")"; - addDiagnostic( - diagnostic_msgs::msg::DiagnosticStatus::WARN, - topic_name + "_timestamp", stream.str(), false); + // Validate reasonable measurement time and generate debug. + if (!validateMeasurementTime(topic_name, msg->header.stamp)) { return; } @@ -1865,18 +1862,10 @@ void RosFilter::poseCallback( "------ RosFilter::poseCallback (" << topic_name << ") ------\n" "Pose message:\n" << msg); - // Put the initial value in the lastMessagTimes_ for this variable if it's - // empty - if (last_message_times_.count(topic_name) == 0) { - last_message_times_.insert( - std::pair(topic_name, msg->header.stamp)); - } - // Make sure this message is newer than the last one - if (last_message_times_[topic_name] <= msg->header.stamp) { - RF_DEBUG( - "Update vector for " << topic_name << " is:\n" << - callback_data.update_vector_); + if (validateMeasurementOrdering(topic_name, msg->header.stamp)) { + RF_DEBUG("Update vector for " << topic_name << " is:\n" + << callback_data.update_vector_); Eigen::VectorXd measurement(STATE_SIZE); Eigen::MatrixXd measurement_covariance(STATE_SIZE, STATE_SIZE); @@ -1907,31 +1896,10 @@ void RosFilter::poseCallback( last_message_times_[topic_name] = msg->header.stamp; - RF_DEBUG( - "Last message time for " << - topic_name << " is now " << - filter_utilities::toSec(last_message_times_[topic_name]) << - "\n"); - } else { - // else if (reset_on_time_jump_ && rclcpp::Time::isSimTime()) - //{ - // reset(); - // } - - std::stringstream stream; - stream << "The " << topic_name << " message has a timestamp before that of " - "the previous message received," << " this message will be ignored. This may " - "indicate a bad timestamp. (message time: " << msg->header.stamp.nanosec << - ")"; - addDiagnostic( - diagnostic_msgs::msg::DiagnosticStatus::WARN, - topic_name + "_timestamp", stream.str(), false); - - RF_DEBUG( - "Message is too old. Last message time for " << topic_name << " is " << - filter_utilities::toSec(last_message_times_[topic_name]) << - ", current message time is " << filter_utilities::toSec(msg->header.stamp) << - ".\n"); + RF_DEBUG("Last message time for " + << topic_name << " is now " + << filter_utilities::toSec(last_message_times_[topic_name]) + << "\n"); } RF_DEBUG("\n----- /RosFilter::poseCallback (" << topic_name << ") ------\n"); @@ -2001,6 +1969,7 @@ void RosFilter::periodicUpdate() enable_filter_srv_->get_service_name()); return; } + RF_DEBUG("------ RosFilter::periodicUpdate ------\n") rclcpp::Time cur_time = this->now(); @@ -2021,8 +1990,8 @@ void RosFilter::periodicUpdate() auto filtered_position = std::make_unique(); bool corrected_data = false; - if (getFilteredOdometryMessage(filtered_position.get())) { + RF_DEBUG("Inside main loop function \n") world_base_link_trans_msg_.header.stamp = static_cast(filtered_position->header.stamp) + tf_time_offset_; world_base_link_trans_msg_.header.frame_id = @@ -2115,10 +2084,7 @@ void RosFilter::periodicUpdate() world_transform_broadcaster_->sendTransform(map_odom_trans_msg); } catch (...) { - // ROS_ERROR_STREAM_DELAYED_THROTTLE(5.0, "Could not obtain - // transform from " - // << odom_frame_id_ << "->" << - // base_link_frame_id_); + RF_DEBUG("Filter could not get odom to baselink ids. \n") } } else { std::cerr << "Odometry message frame_id was " << @@ -2184,8 +2150,6 @@ void RosFilter::setPoseCallback( RF_DEBUG( "------ RosFilter::setPoseCallback ------\nPose message:\n" << msg); - // ROS_INFO_STREAM("Received set_pose request with value\n" << *msg); - std::string topic_name("set_pose"); // Get rid of any initial poses (pretend we've never had a measurement) @@ -2274,19 +2238,8 @@ void RosFilter::twistCallback( { const std::string & topic_name = callback_data.topic_name_; - // If we've just reset the filter, then we want to ignore any messages - // that arrive with an older timestamp - if (last_set_pose_time_ >= msg->header.stamp) { - std::stringstream stream; - stream << - "The " << topic_name << - " message has a timestamp equal to or before the last filter reset, " << - "this message will be ignored. This may indicate an empty or bad " - "timestamp. (message time: " << - filter_utilities::toSec(msg->header.stamp) << ")"; - addDiagnostic( - diagnostic_msgs::msg::DiagnosticStatus::WARN, - topic_name + "_timestamp", stream.str(), false); + // Validate reasonable measurement time and generate debug. + if (!validateMeasurementTime(topic_name, msg->header.stamp)) { return; } @@ -2294,16 +2247,10 @@ void RosFilter::twistCallback( "------ RosFilter::twistCallback (" << topic_name << ") ------\n" "Twist message:\n" << msg); - if (last_message_times_.count(topic_name) == 0) { - last_message_times_.insert( - std::pair(topic_name, msg->header.stamp)); - } - // Make sure this message is newer than the last one - if (last_message_times_[topic_name] <= msg->header.stamp) { - RF_DEBUG( - "Update vector for " << topic_name << " is:\n" << - callback_data.update_vector_); + if (validateMeasurementOrdering(topic_name, msg->header.stamp)) { + RF_DEBUG("Update vector for " << topic_name << " is:\n" + << callback_data.update_vector_); Eigen::VectorXd measurement(STATE_SIZE); Eigen::MatrixXd measurement_covariance(STATE_SIZE, STATE_SIZE); @@ -2335,25 +2282,10 @@ void RosFilter::twistCallback( last_message_times_[topic_name] = msg->header.stamp; - RF_DEBUG( - "Last message time for " << - topic_name << " is now " << - filter_utilities::toSec(last_message_times_[topic_name]) << - "\n"); - } else { - std::stringstream stream; - stream << "The " << topic_name << " message has a timestamp before that of " - "the previous message received," << " this message will be ignored. This may " - "indicate a bad timestamp. (message time: " << msg->header.stamp.nanosec << ")"; - addDiagnostic( - diagnostic_msgs::msg::DiagnosticStatus::WARN, - topic_name + "_timestamp", stream.str(), false); - - RF_DEBUG( - "Message is too old. Last message time for " << topic_name << " is" << - filter_utilities::toSec(last_message_times_[topic_name]) << - ", current message time is " << filter_utilities::toSec(msg->header.stamp) << - ".\n"); + RF_DEBUG("Last message time for " + << topic_name << " is now " + << filter_utilities::toSec(last_message_times_[topic_name]) + << "\n"); } RF_DEBUG("\n----- /RosFilter::twistCallback (" << topic_name << ") ------\n"); @@ -3302,8 +3234,6 @@ bool RosFilter::revertTo(const rclcpp::Time & time) "\nRequested time was " << std::setprecision(20) << filter_utilities::toSec(time) << "\n") - // size_t history_size = filter_state_history_.size(); - // Walk back through the queue until we reach a filter state whose time stamp // is less than or equal to the requested time. Since every saved state after // that time will be overwritten/corrected, we can pop from the queue. If the @@ -3330,15 +3260,9 @@ bool RosFilter::revertTo(const rclcpp::Time & time) if (last_history_state) { RF_DEBUG( - "Will revert to oldest state at " << - filter_utilities::toSec(last_history_state->latest_control_time_) << - ".\n"); - - // ROS_WARN_STREAM_DELAYED_THROTTLE(history_length_, "Could not revert " - // "to state with time " << std::setprecision(20) << time << - // ". Instead reverted to state with time " << - // lastHistoryState->lastMeasurementTime_ << ". History size was " << - // history_size); + "Will revert to oldest state at " + << filter_utilities::toSec(last_history_state->latest_control_time_) + << ".\n"); } } @@ -3447,8 +3371,8 @@ void RosFilter::clearMeasurementQueue() { // Clear the measurement queue. // This prevents us from immediately undoing our reset. - while (!measurement_queue_.empty() && rclcpp::ok()) { - measurement_queue_.pop(); + if (!measurement_queue_.empty() && rclcpp::ok()) { + measurement_queue_ = MeasurementQueue(); } } } // namespace robot_localization