Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,32 @@ class NoRclcppTakeSubscriber : public agnocast::Node
agnocast::PollingSubscriber<agnocast_sample_interfaces::msg::DynamicSizeArray>::SharedPtr sub_;
agnocast::TimerBase::SharedPtr timer_;

public:
// Callback groups added for coverage testing
rclcpp::CallbackGroup::SharedPtr manual_group_;
rclcpp::CallbackGroup::SharedPtr late_auto_group_;

explicit NoRclcppTakeSubscriber() : agnocast::Node("no_rclcpp_take_subscriber")
{
sub_ = this->create_subscription<agnocast_sample_interfaces::msg::DynamicSizeArray>(
"/my_topic", rclcpp::QoS(rclcpp::KeepLast(1)));

timer_ = this->create_wall_timer(1s, std::bind(&NoRclcppTakeSubscriber::timer_callback, this));

// [Test Prep 1] Create a group with the auto-add flag set to false
manual_group_ =
this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, false);

RCLCPP_INFO(get_logger(), "NoRclcppTakeSubscriber started");
}

void create_late_group()
{
// [Test Prep 2] Create this after the node has been added to the Executor
late_auto_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
}

private:
void timer_callback()
{
auto message = sub_->take_data();
Expand All @@ -19,25 +45,37 @@ class NoRclcppTakeSubscriber : public agnocast::Node
message->data.size());
}
}
};

// [Added] Test Executor for calling protected methods
class TestExecutor : public agnocast::AgnocastOnlyMultiThreadedExecutor
{
public:
explicit NoRclcppTakeSubscriber() : agnocast::Node("no_rclcpp_take_subscriber")
void trigger_add_groups()
{
sub_ = this->create_subscription<agnocast_sample_interfaces::msg::DynamicSizeArray>(
"/my_topic", rclcpp::QoS(rclcpp::KeepLast(1)));

timer_ = this->create_wall_timer(1s, std::bind(&NoRclcppTakeSubscriber::timer_callback, this));

RCLCPP_INFO(get_logger(), "NoRclcppTakeSubscriber started");
// Within a child class, we can call protected parent methods
this->add_callback_groups_from_nodes_associated_to_executor();
}
};

int main(int argc, char ** argv)
{
agnocast::init(argc, argv);
agnocast::AgnocastOnlySingleThreadedExecutor executor;

// Use the created test executor instead of the standard executor
TestExecutor executor;
auto node = std::make_shared<NoRclcppTakeSubscriber>();

// Existing path 3: Automatic addition via add_node
executor.add_node(node);

// Uncovered path 1: Direct call to add_callback_group
executor.add_callback_group(node->manual_group_, node->get_node_base_interface());

// Uncovered path 2: Calling via the test executor
node->create_late_group();
executor.trigger_add_groups();

executor.spin();
return 0;
}
6 changes: 4 additions & 2 deletions src/agnocastlib/include/agnocast/agnocast_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,11 @@ class BasicPublisher
agnocast::Node * node, const std::string & topic_name, const rclcpp::QoS & qos,
const PublisherOptions & options = PublisherOptions{})
{
constructor_impl(node, topic_name, qos, options, false);
const rclcpp::QoS actual_qos = constructor_impl(node, topic_name, qos, options, false);

// TODO: CARET tracepoint for agnocast::Node
TRACEPOINT(
agnocast_publisher_init, static_cast<const void *>(this), static_cast<const void *>(node),
topic_name_.c_str(), actual_qos.depth());
}

~BasicPublisher()
Expand Down
29 changes: 25 additions & 4 deletions src/agnocastlib/include/agnocast/agnocast_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ extern std::mutex mmap_mtx;

void map_read_only_area(const pid_t pid, const uint64_t shm_addr, const uint64_t shm_size);

// Create a dummy callback group for agnocast::Node tracepoint use.
// Defined in .cpp to avoid circular inclusion between agnocast_subscription.hpp and
// agnocast_node.hpp.
rclcpp::CallbackGroup::SharedPtr create_dummy_callback_group(agnocast::Node * node);

struct SubscriptionOptions
{
rclcpp::CallbackGroup::SharedPtr callback_group{nullptr};
Expand Down Expand Up @@ -168,10 +173,17 @@ class BasicSubscription : public SubscriptionBase
{
rclcpp::CallbackGroup::SharedPtr callback_group = get_valid_callback_group(node, options);

[[maybe_unused]] const rclcpp::QoS actual_qos =
const rclcpp::QoS actual_qos =
constructor_impl(node, qos, std::forward<Func>(callback), callback_group, options, false);

// TODO(atsushi421): CARET tracepoint for agnocast::Node
{
uint64_t pid_callback_info_id = (static_cast<uint64_t>(getpid()) << 32) | callback_info_id_;
TRACEPOINT(
agnocast_subscription_init, static_cast<const void *>(this),
static_cast<const void *>(node), static_cast<const void *>(&callback),
static_cast<const void *>(callback_group.get()), tracetools::get_symbol(callback),
topic_name_.c_str(), actual_qos.depth(), pid_callback_info_id);
}
}

~BasicSubscription()
Expand Down Expand Up @@ -251,9 +263,18 @@ class BasicTakeSubscription : public SubscriptionBase
agnocast::SubscriptionOptions options = agnocast::SubscriptionOptions())
: SubscriptionBase(node, topic_name)
{
constructor_impl(node, qos, options);
const rclcpp::QoS actual_qos = constructor_impl(node, qos, options);

// TODO(atsushi421): CARET tracepoint for agnocast::Node
{
auto dummy_cbg = create_dummy_callback_group(node);
auto dummy_cb = []() {};
std::string dummy_cb_symbols = "dummy_take" + topic_name;
TRACEPOINT(
agnocast_subscription_init, static_cast<const void *>(this),
static_cast<const void *>(node), static_cast<const void *>(&dummy_cb),
static_cast<const void *>(dummy_cbg.get()), dummy_cb_symbols.c_str(), topic_name_.c_str(),
actual_qos.depth(), 0);
}
}

agnocast::ipc_shared_ptr<const MessageT> take(bool allow_same_message = false)
Expand Down
2 changes: 1 addition & 1 deletion src/agnocastlib/include/agnocast/agnocast_timer_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ extern std::atomic<uint32_t> next_timer_id;

int create_timer_fd(uint32_t timer_id, std::chrono::nanoseconds period);

void handle_timer_event(TimerInfo & timer_info);
void handle_timer_event(TimerInfo & timer_info, uint64_t expirations);

uint32_t allocate_timer_id();

Expand Down
75 changes: 75 additions & 0 deletions src/agnocastlib/include/agnocast/agnocast_tracepoint_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,32 @@

#include <lttng/tracepoint.h>

TRACEPOINT_EVENT(
TRACEPOINT_PROVIDER,
agnocast_init,
TP_ARGS(
const void *, context_handle_arg
),
TP_FIELDS(
ctf_integer_hex(const void *, context_handle, context_handle_arg)
)
)

TRACEPOINT_EVENT(
TRACEPOINT_PROVIDER,
agnocast_node_init,
TP_ARGS(
const void *, node_handle_arg,
const char *, node_name_arg,
const char *, namespace_arg
),
TP_FIELDS(
ctf_integer_hex(const void *, node_handle, node_handle_arg)
ctf_string(node_name, node_name_arg)
ctf_string(namespace, namespace_arg)
)
)

TRACEPOINT_EVENT(
TRACEPOINT_PROVIDER,
agnocast_publisher_init,
Expand Down Expand Up @@ -58,6 +84,42 @@ TRACEPOINT_EVENT(
)
)

TRACEPOINT_EVENT(
TRACEPOINT_PROVIDER,
agnocast_timer_init,
TP_ARGS(
const void *, timer_handle_arg,
const void *, node_handle_arg,
const void *, callback_arg,
const void *, callback_group_arg,
const char *, symbol_arg,
int64_t, period_arg
),
TP_FIELDS(
ctf_integer_hex(const void *, timer_handle, timer_handle_arg)
ctf_integer_hex(const void *, node_handle, node_handle_arg)
ctf_integer_hex(const void *, callback, callback_arg)
ctf_integer_hex(const void *, callback_group, callback_group_arg)
ctf_string(symbol, symbol_arg)
ctf_integer(const int64_t, period, period_arg)
)
)

TRACEPOINT_EVENT(
TRACEPOINT_PROVIDER,
agnocast_add_callback_group,
TP_ARGS(
const void *, executor_addr_arg,
const void *, callback_group_addr_arg,
const char *, group_type_name_arg
),
TP_FIELDS(
ctf_integer_hex(const void *, executor_addr, executor_addr_arg)
ctf_integer_hex(const void *, callback_group_addr, callback_group_addr_arg)
ctf_string(group_type_name, group_type_name_arg)
)
)

TRACEPOINT_EVENT(
TRACEPOINT_PROVIDER,
agnocast_publish,
Expand Down Expand Up @@ -86,6 +148,19 @@ TRACEPOINT_EVENT(
)
)

TRACEPOINT_EVENT(
TRACEPOINT_PROVIDER,
agnocast_create_timer_callable,
TP_ARGS(
const void *, callable_arg,
const void *, timer_handle_arg
),
TP_FIELDS(
ctf_integer_hex(const void *, callable, callable_arg)
ctf_integer_hex(const void *, timer_handle, timer_handle_arg)
)
)

TRACEPOINT_EVENT(
TRACEPOINT_PROVIDER,
agnocast_callable_start,
Expand Down
30 changes: 30 additions & 0 deletions src/agnocastlib/include/agnocast/agnocast_tracepoint_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ extern "C" {

// clang-format off

DECLARE_TRACEPOINT(
agnocast_init,
const void * context_handle)

DECLARE_TRACEPOINT(
agnocast_node_init,
const void * node_handle,
const char * node_name,
const char * namespace_arg)

DECLARE_TRACEPOINT(
agnocast_publisher_init,
const void * publisher_handle,
Expand All @@ -30,6 +40,21 @@ DECLARE_TRACEPOINT(
const size_t queue_depth,
const uint64_t pid_callback_info_id)

DECLARE_TRACEPOINT(
agnocast_timer_init,
const void * timer_handle,
const void * node_handle,
const void * callback,
const void * callback_group,
const char * function_symbol,
int64_t period)

DECLARE_TRACEPOINT(
agnocast_add_callback_group,
const void * executor_addr,
const void * callback_group_addr,
const char * group_type_name)

DECLARE_TRACEPOINT(
agnocast_publish,
const void * publisher_handle,
Expand All @@ -41,6 +66,11 @@ DECLARE_TRACEPOINT(
const int64_t entry_id,
const uint64_t pid_callback_info_id)

DECLARE_TRACEPOINT(
agnocast_create_timer_callable,
const void * callable,
const void * timer_handle)

DECLARE_TRACEPOINT(
agnocast_callable_start,
const void * callable)
Expand Down
10 changes: 10 additions & 0 deletions src/agnocastlib/include/agnocast/node/agnocast_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,11 @@ class Node

register_timer_info(timer_id, timer, period, group, timer->get_clock());

TRACEPOINT(
agnocast_timer_init, static_cast<const void *>(timer.get()), static_cast<const void *>(this),
static_cast<const void *>(&callback), static_cast<const void *>(group.get()),
tracetools::get_symbol(callback), period.count());

return timer;
}

Expand Down Expand Up @@ -382,6 +387,11 @@ class Node

register_timer_info(timer_id, timer, period_ns, group, clock);

TRACEPOINT(
agnocast_timer_init, static_cast<const void *>(timer.get()), static_cast<const void *>(this),
static_cast<const void *>(&callback), static_cast<const void *>(group.get()),
tracetools::get_symbol(callback), period_ns.count());

return timer;
}

Expand Down
28 changes: 25 additions & 3 deletions src/agnocastlib/src/agnocast_epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include "agnocast/agnocast.hpp"

#include <unistd.h>

namespace agnocast
{

Expand Down Expand Up @@ -60,9 +62,29 @@ bool wait_and_handle_epoll_event(
callback_group = timer_info->callback_group;
}

// Create a callable that handles the timer event
const std::shared_ptr<std::function<void()>> callable =
std::make_shared<std::function<void()>>([timer_info]() { handle_timer_event(*timer_info); });
auto timer_ptr = timer_info->timer.lock();

// Read the number of expirations to clear the event
uint64_t expirations = 0;
const ssize_t ret = read(timer_info->timer_fd, &expirations, sizeof(expirations));
if (ret == -1 || expirations == 0) {
return false;
}

auto callable = std::make_shared<std::function<void()>>();
void * callable_ptr = callable.get();

*callable = [timer_info, expirations, callable_ptr]() {
TRACEPOINT(agnocast_callable_start, callable_ptr);

handle_timer_event(*timer_info, expirations);

TRACEPOINT(agnocast_callable_end, callable_ptr);
};

TRACEPOINT(
agnocast_create_timer_callable, static_cast<const void *>(callable_ptr),
static_cast<const void *>(timer_ptr.get()));

{
std::lock_guard<std::mutex> ready_lock{ready_agnocast_executables_mutex};
Expand Down
7 changes: 7 additions & 0 deletions src/agnocastlib/src/agnocast_subscription.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "agnocast/agnocast.hpp"
#include "agnocast/node/agnocast_node.hpp"

namespace agnocast
{
Expand Down Expand Up @@ -80,4 +81,10 @@ void remove_mq(const std::pair<mqd_t, std::string> & mq_subscription)
}
}

rclcpp::CallbackGroup::SharedPtr create_dummy_callback_group(agnocast::Node * node)
{
return node->get_node_base_interface()->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive, false);
}

} // namespace agnocast
Loading
Loading