Skip to content

Commit 431a3a9

Browse files
author
Alberto Soragna
committed
shortcircuit events executor callback invocation in rclcpp for ipc subscriptions
1 parent 53aa9fd commit 431a3a9

File tree

9 files changed

+48
-22
lines changed

9 files changed

+48
-22
lines changed

rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class EventsExecutorNotifyWaitable final : public EventWaitable
5959
void
6060
set_events_executor_callback(
6161
const rclcpp::executors::EventsExecutor * executor,
62-
rmw_listener_cb_t executor_callback) const override
62+
rmw_listener_cb_t executor_callback) override
6363
{
6464
for (auto gc : notify_guard_conditions_) {
6565
rcl_ret_t ret = rcl_guard_condition_set_listener_callback(

rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,14 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
101101
return size_ == capacity_;
102102
}
103103

104-
void clear() {}
104+
void clear()
105+
{
106+
std::lock_guard<std::mutex> lock(mutex_);
107+
ring_buffer_ = std::vector<BufferT>(capacity_);
108+
write_index_ = capacity_ - 1;
109+
read_index_ = 0;
110+
size_ = 0;
111+
}
105112

106113
private:
107114
size_t capacity_;

rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,25 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
118118
provide_intra_process_message(ConstMessageSharedPtr message)
119119
{
120120
buffer_->add_shared(std::move(message));
121-
trigger_guard_condition();
121+
std::lock_guard<std::mutex> lock(executor_callback_mutex_);
122+
if (executor_callback_) {
123+
executor_callback_(executor_, {this, WAITABLE_EVENT});
124+
} else {
125+
trigger_guard_condition();
126+
}
122127
}
123128

124129
void
125130
provide_intra_process_message(MessageUniquePtr message)
126131
{
127132
buffer_->add_unique(std::move(message));
128-
trigger_guard_condition();
133+
134+
std::lock_guard<std::mutex> lock(executor_callback_mutex_);
135+
if (executor_callback_) {
136+
executor_callback_(executor_, {this, WAITABLE_EVENT});
137+
} else {
138+
trigger_guard_condition();
139+
}
129140
}
130141

131142
bool
@@ -134,6 +145,18 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
134145
return buffer_->use_take_shared_method();
135146
}
136147

148+
void
149+
set_events_executor_callback(
150+
const rclcpp::executors::EventsExecutor * executor,
151+
rmw_listener_cb_t executor_callback) override
152+
{
153+
std::lock_guard<std::mutex> lock(executor_callback_mutex_);
154+
executor_ = executor;
155+
executor_callback_ = executor_callback;
156+
// Buffer must be cleared under the executor callback lock to make sure that other threads wait for this
157+
buffer_->clear();
158+
}
159+
137160
private:
138161
void
139162
trigger_guard_condition()

rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,16 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
7474
void
7575
set_events_executor_callback(
7676
const rclcpp::executors::EventsExecutor * executor,
77-
rmw_listener_cb_t executor_callback) const override;
77+
rmw_listener_cb_t executor_callback) override;
7878

7979
protected:
8080
std::recursive_mutex reentrant_mutex_;
8181
rcl_guard_condition_t gc_;
8282

83+
const rclcpp::executors::EventsExecutor * executor_;
84+
rmw_listener_cb_t executor_callback_ = nullptr;
85+
std::mutex executor_callback_mutex_;
86+
8387
private:
8488
virtual void
8589
trigger_guard_condition() = 0;

rclcpp/include/rclcpp/qos_event.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class QOSEventHandlerBase : public Waitable
107107
void
108108
set_events_executor_callback(
109109
const rclcpp::executors::EventsExecutor * executor,
110-
rmw_listener_cb_t executor_callback) const override;
110+
rmw_listener_cb_t executor_callback) override;
111111

112112
protected:
113113
rcl_event_t event_handle_;

rclcpp/include/rclcpp/waitable.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ class Waitable
175175
void
176176
set_events_executor_callback(
177177
const rclcpp::executors::EventsExecutor * executor,
178-
rmw_listener_cb_t executor_callback) const;
178+
rmw_listener_cb_t executor_callback);
179179

180180
private:
181181
std::atomic<bool> in_use_by_wait_set_{false};

rclcpp/src/rclcpp/qos_event.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ QOSEventHandlerBase::is_ready(rcl_wait_set_t * wait_set)
7171
void
7272
QOSEventHandlerBase::set_events_executor_callback(
7373
const rclcpp::executors::EventsExecutor * executor,
74-
rmw_listener_cb_t executor_callback) const
74+
rmw_listener_cb_t executor_callback)
7575
{
7676
rcl_ret_t ret = rcl_event_set_listener_callback(
7777
&event_handle_,

rclcpp/src/rclcpp/subscription_intra_process_base.cpp

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,20 +37,12 @@ SubscriptionIntraProcessBase::get_actual_qos() const
3737
return qos_profile_;
3838
}
3939

40-
4140
void
4241
SubscriptionIntraProcessBase::set_events_executor_callback(
4342
const rclcpp::executors::EventsExecutor * executor,
44-
rmw_listener_cb_t executor_callback) const
43+
rmw_listener_cb_t executor_callback)
4544
{
46-
rcl_ret_t ret = rcl_guard_condition_set_listener_callback(
47-
&gc_,
48-
executor_callback,
49-
executor,
50-
this,
51-
true /*Use previous events*/);
52-
53-
if (RCL_RET_OK != ret) {
54-
throw std::runtime_error("Couldn't set guard condition callback");
55-
}
56-
}
45+
(void)executor;
46+
(void)executor_callback;
47+
assert(0);
48+
}

rclcpp/src/rclcpp/waitable.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ Waitable::exchange_in_use_by_wait_set_state(bool in_use_state)
6161
void
6262
Waitable::set_events_executor_callback(
6363
const rclcpp::executors::EventsExecutor * executor,
64-
rmw_listener_cb_t executor_callback) const
64+
rmw_listener_cb_t executor_callback)
6565
{
6666
(void)executor;
6767
(void)executor_callback;

0 commit comments

Comments
 (0)