Skip to content

Commit 7cadae3

Browse files
author
Mauro Passerino
committed
IPC Subscription to have callback
Signed-off-by: Mauro Passerino <[email protected]>
1 parent 32aa3db commit 7cadae3

File tree

7 files changed

+53
-14
lines changed

7 files changed

+53
-14
lines changed

rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -209,10 +209,10 @@ class EventsExecutorEntitiesCollector final
209209
unset_callback_group_entities_callbacks(rclcpp::CallbackGroup::SharedPtr group);
210210

211211
void
212-
set_guard_condition_callback(const rclcpp::GuardCondition * guard_condition);
212+
set_guard_condition_callback(rclcpp::GuardCondition * guard_condition);
213213

214214
void
215-
unset_guard_condition_callback(const rclcpp::GuardCondition * guard_condition);
215+
unset_guard_condition_callback(rclcpp::GuardCondition * guard_condition);
216216

217217
std::function<void(size_t)>
218218
create_entity_callback(void * exec_entity_id, ExecutorEventType type);
@@ -249,7 +249,7 @@ class EventsExecutorEntitiesCollector final
249249
WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_;
250250

251251
typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
252-
const rclcpp::GuardCondition *,
252+
rclcpp::GuardCondition *,
253253
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
254254
WeakNodesToGuardConditionsMap;
255255
WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_;

rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
148148
void
149149
trigger_guard_condition()
150150
{
151-
gc_.trigger();
151+
if (on_new_message_callback_) {
152+
on_new_message_callback_(1);
153+
} else {
154+
gc_.trigger();
155+
unread_count_++;
156+
}
152157
}
153158

154159
template<typename T>

rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
8484
protected:
8585
std::recursive_mutex reentrant_mutex_;
8686
rclcpp::GuardCondition gc_;
87+
std::function<void(size_t)> on_new_message_callback_{nullptr};
88+
size_t unread_count_{0};
8789

8890
private:
8991
virtual void

rclcpp/include/rclcpp/guard_condition.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,16 @@ class GuardCondition
100100
void
101101
add_to_wait_set(rcl_wait_set_t * wait_set) const;
102102

103+
RCLCPP_PUBLIC
104+
void
105+
set_on_trigger_callback(std::function<void(size_t, int)> callback);
106+
103107
protected:
104108
rclcpp::Context::SharedPtr context_;
105109
rcl_guard_condition_t rcl_guard_condition_;
106110
std::atomic<bool> in_use_by_wait_set_{false};
111+
std::function<void(size_t)> on_trigger_callback_{nullptr};
112+
size_t unread_count_{0};
107113
};
108114

109115
} // namespace rclcpp

rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ EventsExecutorEntitiesCollector::~EventsExecutorEntitiesCollector()
6767
for (const auto & pair : weak_nodes_to_guard_conditions_) {
6868
auto node = pair.first.lock();
6969
if (node) {
70-
auto node_gc = pair.second;
70+
auto & node_gc = pair.second;
7171
unset_guard_condition_callback(node_gc);
7272
}
7373
}
@@ -267,8 +267,8 @@ EventsExecutorEntitiesCollector::set_callback_group_entities_callbacks(
267267
if (waitable) {
268268
weak_waitables_map_.emplace(waitable.get(), waitable);
269269

270-
// waitable->set_listener_callback(
271-
// create_entity_callback(waitable.get(), WAITABLE_EVENT));
270+
waitable->set_listener_callback(
271+
create_waitable_callback(waitable.get()));
272272
}
273273
return false;
274274
});
@@ -476,15 +476,16 @@ EventsExecutorEntitiesCollector::get_automatically_added_callback_groups_from_no
476476

477477
void
478478
EventsExecutorEntitiesCollector::set_guard_condition_callback(
479-
const rclcpp::GuardCondition * guard_condition)
479+
rclcpp::GuardCondition * guard_condition)
480480
{
481-
create_waitable_callback(this);
481+
guard_condition->set_on_trigger_callback(create_waitable_callback(this));
482482
}
483483

484484
void
485485
EventsExecutorEntitiesCollector::unset_guard_condition_callback(
486-
const rclcpp::GuardCondition * guard_condition)
486+
rclcpp::GuardCondition * guard_condition)
487487
{
488+
guard_condition->set_on_trigger_callback(nullptr);
488489
}
489490

490491
rclcpp::SubscriptionBase::SharedPtr

rclcpp/src/rclcpp/guard_condition.cpp

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
#include <functional>
16+
1517
#include "rclcpp/guard_condition.hpp"
1618

1719
#include "rclcpp/exceptions.hpp"
@@ -66,9 +68,14 @@ GuardCondition::get_rcl_guard_condition() const
6668
void
6769
GuardCondition::trigger()
6870
{
69-
rcl_ret_t ret = rcl_trigger_guard_condition(&rcl_guard_condition_);
70-
if (RCL_RET_OK != ret) {
71-
rclcpp::exceptions::throw_from_rcl_error(ret);
71+
if (on_trigger_callback_) {
72+
on_trigger_callback_(1);
73+
} else {
74+
rcl_ret_t ret = rcl_trigger_guard_condition(&rcl_guard_condition_);
75+
if (RCL_RET_OK != ret) {
76+
rclcpp::exceptions::throw_from_rcl_error(ret);
77+
}
78+
unread_count_++;
7279
}
7380
}
7481

@@ -87,4 +94,15 @@ GuardCondition::add_to_wait_set(rcl_wait_set_t * wait_set) const
8794
ret, "failed to add guard condition to wait set");
8895
}
8996
}
97+
98+
void
99+
GuardCondition::set_on_trigger_callback(std::function<void(size_t, int)> callback)
100+
{
101+
on_trigger_callback_ = std::bind(callback, std::placeholders::_1, -1);
102+
103+
if (unread_count_) {
104+
on_trigger_callback_(unread_count_);
105+
unread_count_ = 0;
106+
}
107+
}
90108
} // namespace rclcpp

rclcpp/src/rclcpp/subscription_intra_process_base.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
#include <functional>
16+
1517
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
1618

1719
using rclcpp::experimental::SubscriptionIntraProcessBase;
@@ -41,5 +43,10 @@ SubscriptionIntraProcessBase::get_actual_qos() const
4143
void
4244
SubscriptionIntraProcessBase::set_listener_callback(std::function<void(size_t, int)> callback)
4345
{
44-
(void)callback;
46+
on_new_message_callback_ = std::bind(callback, std::placeholders::_1, -1);
47+
48+
if (unread_count_ && on_new_message_callback_) {
49+
on_new_message_callback_(unread_count_);
50+
unread_count_ = 0;
51+
}
4552
}

0 commit comments

Comments
 (0)