Skip to content
Merged
29 changes: 15 additions & 14 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,43 +350,44 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition
has_status(const rmw_event_type_t event_type) = 0;

void
notify_new_event()
notify_new_event(rmw_event_type_t event_type)
{
std::unique_lock<std::mutex> lock_mutex(new_event_mutex_);
if (new_event_cb_) {
new_event_cb_(user_data_, 1);
if (new_event_cb_[event_type]) {
new_event_cb_[event_type](user_data_[event_type], 1);
} else {
unread_events_count_++;
unread_events_count_[event_type]++;
}
}

void
set_new_event_callback(
rmw_event_type_t event_type,
rmw_event_callback_t callback,
const void * user_data)
{
std::unique_lock<std::mutex> lock_mutex(new_event_mutex_);

if (callback) {
// Push events arrived before setting the executor's callback
if (unread_events_count_ > 0) {
callback(user_data, unread_events_count_);
unread_events_count_ = 0;
if (unread_events_count_[event_type] > 0) {
callback(user_data, unread_events_count_[event_type]);
unread_events_count_[event_type] = 0;
}
user_data_ = user_data;
new_event_cb_ = callback;
user_data_[event_type] = user_data;
new_event_cb_[event_type] = callback;
} else {
user_data_ = nullptr;
new_event_cb_ = nullptr;
user_data_[event_type] = nullptr;
new_event_cb_[event_type] = nullptr;
}
}

protected:
DDS_StatusCondition * scond;
std::mutex new_event_mutex_;
rmw_event_callback_t new_event_cb_{nullptr};
const void * user_data_{nullptr};
uint64_t unread_events_count_ = 0;
rmw_event_callback_t new_event_cb_[RMW_EVENT_INVALID] = {};
const void * user_data_[RMW_EVENT_INVALID] = {};
uint64_t unread_events_count_[RMW_EVENT_INVALID] = {0};
Comment on lines +388 to +390
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use of an enum value to define the size of an array is an unsual pattern which caught me by surprise.

It makes sense, but it's not commonly used, so I would add a comment to point it out.

Something like: "These arrays must be able to contain a value for each event kind. RMW_EVENT_INVALID is used because it is the last enumeration of rmw_event_type_t".

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will address the comment in #76

};

void
Expand Down
12 changes: 8 additions & 4 deletions rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ RMW_Connext_SubscriberStatusCondition::update_status_deadline(
this->status_deadline.total_count_change = this->status_deadline.total_count;
this->status_deadline.total_count_change -= this->status_deadline_last.total_count;

this->notify_new_event();
this->notify_new_event(RMW_EVENT_REQUESTED_DEADLINE_MISSED);
}

void
Expand All @@ -724,7 +724,7 @@ RMW_Connext_SubscriberStatusCondition::update_status_liveliness(
this->status_liveliness.not_alive_count_change -=
this->status_liveliness_last.not_alive_count;

this->notify_new_event();
this->notify_new_event(RMW_EVENT_LIVELINESS_CHANGED);
}

void
Expand All @@ -736,6 +736,7 @@ RMW_Connext_SubscriberStatusCondition::update_status_qos(

this->status_qos.total_count_change = this->status_qos.total_count;
this->status_qos.total_count_change -= this->status_qos_last.total_count;
this->notify_new_event(RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE);
}

void
Expand All @@ -748,6 +749,7 @@ RMW_Connext_SubscriberStatusCondition::update_status_sample_lost(
this->status_sample_lost.total_count_change = this->status_sample_lost.total_count;
this->status_sample_lost.total_count_change -=
this->status_sample_lost_last.total_count;
this->notify_new_event(RMW_EVENT_MESSAGE_LOST);
}

void
Expand Down Expand Up @@ -882,7 +884,7 @@ RMW_Connext_PublisherStatusCondition::update_status_deadline(
this->status_deadline.total_count_change = this->status_deadline.total_count;
this->status_deadline.total_count_change -= this->status_deadline_last.total_count;

this->notify_new_event();
this->notify_new_event(RMW_EVENT_OFFERED_DEADLINE_MISSED);
}

void
Expand All @@ -895,7 +897,7 @@ RMW_Connext_PublisherStatusCondition::update_status_liveliness(
this->status_liveliness.total_count_change = this->status_liveliness.total_count;
this->status_liveliness.total_count_change -= this->status_liveliness_last.total_count;

this->notify_new_event();
this->notify_new_event(RMW_EVENT_LIVELINESS_CHANGED);
}

void
Expand All @@ -907,4 +909,6 @@ RMW_Connext_PublisherStatusCondition::update_status_qos(

this->status_qos.total_count_change = this->status_qos.total_count;
this->status_qos.total_count_change -= this->status_qos_last.total_count;

this->notify_new_event(RMW_EVENT_OFFERED_QOS_INCOMPATIBLE);
}
2 changes: 1 addition & 1 deletion rmw_connextdds_common/src/common/rmw_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ rmw_api_connextdds_event_set_callback(
} else {
condition = RMW_Connext_Event::publisher(event)->condition();
}
condition->set_new_event_callback(callback, user_data);
condition->set_new_event_callback(event->event_type, callback, user_data);
return RMW_RET_OK;
}

Expand Down