@@ -367,4 +367,245 @@ Clock::create_jump_callback(
367367 // *INDENT-ON*
368368}
369369
370+ class ClockWaiter ::ClockWaiterImpl
371+ {
372+ private:
373+ std::condition_variable cv_;
374+
375+ rclcpp::Clock::SharedPtr clock_;
376+ bool time_source_changed_ = false ;
377+ std::function<void (const rcl_time_jump_t &)> post_time_jump_callback;
378+
379+ bool
380+ wait_until_system_time (
381+ std::unique_lock<std::mutex> & lock,
382+ const rclcpp::Time & abs_time, const std::function<bool ()> & pred)
383+ {
384+ auto system_time = std::chrono::system_clock::time_point (
385+ // Cast because system clock resolution is too big for nanoseconds on some systems
386+ std::chrono::duration_cast<std::chrono::system_clock::duration>(
387+ std::chrono::nanoseconds (abs_time.nanoseconds ())));
388+
389+ return cv_.wait_until (lock, system_time, pred);
390+ }
391+
392+ bool
393+ wait_until_steady_time (
394+ std::unique_lock<std::mutex> & lock,
395+ const rclcpp::Time & abs_time, const std::function<bool ()> & pred)
396+ {
397+ // Synchronize because RCL steady clock epoch might differ from chrono::steady_clock epoch
398+ const rclcpp::Time rcl_entry = clock_->now ();
399+ const std::chrono::steady_clock::time_point chrono_entry = std::chrono::steady_clock::now ();
400+ const rclcpp::Duration delta_t = abs_time - rcl_entry;
401+ const std::chrono::steady_clock::time_point chrono_until =
402+ chrono_entry + std::chrono::nanoseconds (delta_t .nanoseconds ());
403+
404+ return cv_.wait_until (lock, chrono_until, pred);
405+ }
406+
407+
408+ bool
409+ wait_until_ros_time (
410+ std::unique_lock<std::mutex> & lock,
411+ const rclcpp::Time & abs_time, const std::function<bool ()> & pred)
412+ {
413+ // Install jump handler for any amount of time change, for two purposes:
414+ // - if ROS time is active, check if time reached on each new clock sample
415+ // - Trigger via on_clock_change to detect if time source changes, to invalidate sleep
416+ rcl_jump_threshold_t threshold;
417+ threshold.on_clock_change = true ;
418+ // 0 is disable, so -1 and 1 are smallest possible time changes
419+ threshold.min_backward .nanoseconds = -1 ;
420+ threshold.min_forward .nanoseconds = 1 ;
421+
422+ time_source_changed_ = false ;
423+
424+ post_time_jump_callback = [this , &lock] (const rcl_time_jump_t & jump)
425+ {
426+ if (jump.clock_change != RCL_ROS_TIME_NO_CHANGE) {
427+ std::lock_guard<std::mutex> lk (*lock.mutex ());
428+ time_source_changed_ = true ;
429+ }
430+ cv_.notify_one ();
431+ };
432+
433+ // Note this is a trade-off. Adding the callback for every call
434+ // is expensive for high frequency calls. For low frequency waits
435+ // its more overhead to have the callback being called all the time.
436+ // As we expect the use case to be low frequency calls to wait_until
437+ // with relative big pauses between the calls, we install it on demand.
438+ auto clock_handler = clock_->create_jump_callback (
439+ nullptr ,
440+ post_time_jump_callback,
441+ threshold);
442+
443+ if (!clock_->ros_time_is_active ()) {
444+ auto system_time = std::chrono::system_clock::time_point (
445+ // Cast because system clock resolution is too big for nanoseconds on some systems
446+ std::chrono::duration_cast<std::chrono::system_clock::duration>(
447+ std::chrono::nanoseconds (abs_time.nanoseconds ())));
448+
449+ return cv_.wait_until (lock, system_time, [this , &pred] () {
450+ return time_source_changed_ || pred ();
451+ });
452+ }
453+
454+ // RCL_ROS_TIME with ros_time_is_active.
455+ // Just wait without "until" because installed
456+ // jump callbacks wake the cv on every new sample.
457+ cv_.wait (lock, [this , &pred, &abs_time] () {
458+ return clock_->now () >= abs_time || time_source_changed_ || pred ();
459+ });
460+
461+ return clock_->now () < abs_time;
462+ }
463+
464+ public:
465+ explicit ClockWaiterImpl (const rclcpp::Clock::SharedPtr & clock)
466+ :clock_(clock)
467+ {
468+ }
469+
470+ bool
471+ wait_until (
472+ std::unique_lock<std::mutex> & lock,
473+ const rclcpp::Time & abs_time, const std::function<bool ()> & pred)
474+ {
475+ switch (clock_->get_clock_type ()) {
476+ case RCL_CLOCK_UNINITIALIZED:
477+ throw std::runtime_error (" Error, wait on uninitialized clock called" );
478+ case RCL_ROS_TIME:
479+ return wait_until_ros_time (lock, abs_time, pred);
480+ break ;
481+ case RCL_STEADY_TIME:
482+ return wait_until_steady_time (lock, abs_time, pred);
483+ break ;
484+ case RCL_SYSTEM_TIME:
485+ return wait_until_system_time (lock, abs_time, pred);
486+ break ;
487+ }
488+
489+ return false ;
490+ }
491+
492+ void
493+ notify_one ()
494+ {
495+ cv_.notify_one ();
496+ }
497+ };
498+
499+ ClockWaiter::ClockWaiter (const rclcpp::Clock::SharedPtr & clock)
500+ :impl_(std::make_unique<ClockWaiterImpl>(clock))
501+ {
502+ }
503+
504+ ClockWaiter::~ClockWaiter () = default ;
505+
506+ bool
507+ ClockWaiter::wait_until (
508+ std::unique_lock<std::mutex> & lock,
509+ const rclcpp::Time & abs_time, const std::function<bool ()> & pred)
510+ {
511+ return impl_->wait_until (lock, abs_time, pred);
512+ }
513+
514+ void
515+ ClockWaiter::notify_one ()
516+ {
517+ impl_->notify_one ();
518+ }
519+
520+ class ClockConditionalVariable ::Impl
521+ {
522+ std::mutex pred_mutex_;
523+ bool shutdown_ = false ;
524+ rclcpp::Context::SharedPtr context_;
525+ rclcpp::OnShutdownCallbackHandle shutdown_cb_handle_;
526+ ClockWaiter::UniquePtr clock_;
527+
528+ public:
529+ Impl (const rclcpp::Clock::SharedPtr & clock, rclcpp::Context::SharedPtr context)
530+ :context_(context),
531+ clock_ (std::make_unique<ClockWaiter>(clock))
532+ {
533+ if (!context_ || !context_->is_valid ()) {
534+ throw std::runtime_error (" context cannot be slept with because it's invalid" );
535+ }
536+ // Wake this thread if the context is shutdown
537+ shutdown_cb_handle_ = context_->add_on_shutdown_callback (
538+ [this ]() {
539+ {
540+ std::unique_lock lock (pred_mutex_);
541+ shutdown_ = true ;
542+ }
543+ clock_->notify_one ();
544+ });
545+ }
546+
547+ ~Impl ()
548+ {
549+ context_->remove_on_shutdown_callback (shutdown_cb_handle_);
550+ }
551+
552+ bool
553+ wait_until (
554+ std::unique_lock<std::mutex> & lock, rclcpp::Time until,
555+ const std::function<bool ()> & pred)
556+ {
557+ if (lock.mutex () != &pred_mutex_) {
558+ throw std::runtime_error (
559+ " ClockConditionalVariable::wait_until: Internal error, given lock does not use"
560+ " mutex returned by this->mutex()" );
561+ }
562+
563+ clock_->wait_until (lock, until, [this , &pred] () -> bool {
564+ return shutdown_ || pred ();
565+ });
566+ return true ;
567+ }
568+
569+ void
570+ notify_one ()
571+ {
572+ clock_->notify_one ();
573+ }
574+
575+ std::mutex &
576+ mutex ()
577+ {
578+ return pred_mutex_;
579+ }
580+ };
581+
582+ ClockConditionalVariable::ClockConditionalVariable (
583+ const rclcpp::Clock::SharedPtr & clock,
584+ rclcpp::Context::SharedPtr context)
585+ :impl_(std::make_unique<Impl>(clock, context))
586+ {
587+ }
588+
589+ ClockConditionalVariable::~ClockConditionalVariable () = default ;
590+
591+ void
592+ ClockConditionalVariable::notify_one ()
593+ {
594+ impl_->notify_one ();
595+ }
596+
597+ bool
598+ ClockConditionalVariable::wait_until (
599+ std::unique_lock<std::mutex> & lock, rclcpp::Time until,
600+ const std::function<bool ()> & pred)
601+ {
602+ return impl_->wait_until (lock, until, pred);
603+ }
604+
605+ std::mutex &
606+ ClockConditionalVariable::mutex ()
607+ {
608+ return impl_->mutex ();
609+ }
610+
370611} // namespace rclcpp
0 commit comments