Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions nexus_capabilities/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ if(BUILD_TESTING)
# MAX_LINE_LENGTH 80
# LANGUAGE CPP
#)

ament_add_catch2(context_test src/context_test.cpp)
target_link_libraries(context_test PRIVATE
${PROJECT_NAME}
nexus_common::nexus_common
nexus_common::nexus_common_test
rmf_utils::rmf_utils
)
endif()

include(GNUInstallDirs)
Expand Down
181 changes: 177 additions & 4 deletions nexus_capabilities/include/nexus_capabilities/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@
//==============================================================================
namespace nexus {

class Context
class Context : std::enable_shared_from_this<Context>
{
private: struct Private{ explicit Private() = default; };
public: using GoalHandlePtr =
std::shared_ptr<rclcpp_action::ServerGoalHandle<endpoints::WorkcellRequestAction::ActionType>>;
public: using TaskState = nexus_orchestrator_msgs::msg::TaskState;
Expand All @@ -53,11 +54,183 @@ public: Task task;
public: std::vector<std::string> errors;
public: std::unique_ptr<common::BtLogging> bt_logging;
public: std::unordered_set<std::string> signals;
public: TaskState task_state;
private: uint8_t task_status = TaskState::STATUS_NONE;
public: uint64_t tick_count = 0;
// Used to keep track of the last time the state was changed
public: rclcpp::Time state_transition_time;

public: Context(Private, rclcpp_lifecycle::LifecycleNode& node)
: node(node), state_transition_time(node.now()) {}
public: static std::shared_ptr<Context> make(rclcpp_lifecycle::LifecycleNode& node) {
Private pvt;
return std::make_shared<Context>(pvt, node);
}

public: std::shared_ptr<Context> get_ptr() {
return shared_from_this();
}

public: void set_task_status(uint8_t status)
{
this->task_status = status;
this->state_transition_time = node.now();
}

public: TaskState get_task_state() const
{
TaskState task_state;
task_state.task_id = this->task.task_id;
task_state.workcell_id = this->node.get_name();
task_state.status = this->task_status;
return task_state;
}

// bool is C1 < C2, smallest gets on top
public: static bool compare(const std::shared_ptr<Context> c1, const std::shared_ptr<Context> c2) {
using TaskState = nexus_orchestrator_msgs::msg::TaskState;
const auto s1 = c1->task_status;
const auto s2 = c2->task_status;
if (s1 == s2)
{
// For equal task state, prioritize earlier transition time to make sure
// tasks that started earlier are first in the queue
return c1->state_transition_time < c2->state_transition_time;
}
// An executing task will always have priority over a non executing task
if (s1 == TaskState::STATUS_RUNNING)
{
return true;
}
if (s1 == TaskState::STATUS_QUEUED)
{
if (s2 == TaskState::STATUS_RUNNING)
{
// Prioritize the running task
return false;
}
// Prioritize queued task
return true;
}
if (s1 == TaskState::STATUS_FINISHED || s1 == TaskState::STATUS_FAILED)
{
if (s2 == TaskState::STATUS_FINISHED || s2 == TaskState::STATUS_FAILED)
{
// Sort by time
return c1->state_transition_time < c2->state_transition_time;
}
// Push to end of queue
return false;
}
if (s1 == TaskState::STATUS_ASSIGNED)
{
if (s2 == TaskState::STATUS_RUNNING || s2 == TaskState::STATUS_QUEUED)
{
// Prioritize tasks that have been requested or are running already
return false;
}
return true;
}
// Undefined, just sort in order of transition time
return c1->state_transition_time < c2->state_transition_time;
}
};

class ContextSet {
private:
std::vector<std::shared_ptr<Context>> set;
// TODO(luca) consider map of task_id to iterator + change the above to a list

void reorder() {
std::sort(set.begin(), set.end(), [](const std::shared_ptr<Context>& ctx1, const std::shared_ptr<Context>& ctx2) {
return Context::compare(ctx1, ctx2);
});
}

public:
ContextSet() = default;

std::optional<std::shared_ptr<const Context>> get_at(const std::size_t n) const {
if (n >= this->size()) {
return std::nullopt;
}
return set[n];
}

std::optional<std::shared_ptr<Context>> get_task_id(const std::string& task_id) const {
for (auto it = set.begin(); it != set.end(); ++it) {
if ((*it)->task.task_id == task_id) {
return *it;
}
}
return std::nullopt;
}

bool modify_at(const std::size_t n, std::function<void(Context&)> modifier) {
if (n >= this->size()) {
return false;
}
modifier(*set[n]);
this->reorder();
return true;
}

bool modify_task_id(const std::string& task_id, std::function<void(Context&)> modifier) {
for (auto it = set.begin(); it != set.end(); ++it) {
if ((*it)->task.task_id == task_id) {
modifier(**it);
this->reorder();
return true;
}
}
return false;
}

std::size_t size() const {
return set.size();
}

bool remove_at(const std::size_t n) {
if (n >= this->size()) {
return false;
}
set.erase(set.begin() + n);
return true;
}

bool remove(const std::shared_ptr<Context>& ptr) {
for (auto it = set.begin(); it != set.end(); ++it) {
if ((*it) == ptr) {
set.erase(it);
return true;
}
}
return false;
}

bool remove_task_id(const std::string& task_id) {
for (auto it = set.begin(); it != set.end(); ++it) {
if ((*it)->task.task_id == task_id) {
set.erase(it);
return true;
}
}
return false;
}

bool insert(const std::shared_ptr<Context> ptr) {
const auto it = get_task_id(ptr->task.task_id);
if (it != std::nullopt) {
return false;
}
set.emplace_back(std::move(ptr));
this->reorder();
return true;
}

void clear() {
set.clear();
}

public: Context(rclcpp_lifecycle::LifecycleNode& node)
: node(node) {}
};

} // namespace nexus
Expand Down
85 changes: 85 additions & 0 deletions nexus_capabilities/src/context_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#define CATCH_CONFIG_MAIN
#include <rmf_utils/catch.hpp>

#include <chrono>
#include <thread>
#include "nexus_capabilities/context.hpp"

#include "nexus_common_test/test_utils.hpp"

namespace nexus::capabilities::test {

using TaskState = nexus_orchestrator_msgs::msg::TaskState;

bool check_order(const ContextSet& set, const std::vector<std::string> expected_tasks) {
REQUIRE(set.size() == expected_tasks.size());
std::cout << "BEGIN CHECK" << std::endl;
for (std::size_t i = 0; i < set.size(); ++i) {
const auto it_opt = set.get_at(i);
REQUIRE(it_opt != std::nullopt);
std::cout << (*it_opt)->task.task_id << " " << expected_tasks[i] << std::endl;
if ((*it_opt)->task.task_id != expected_tasks[i]) {
return false;
}
}
return true;
}

TEST_CASE("test_context_ordering") {
ContextSet contexts;
nexus::common::test::RosFixture<rclcpp_lifecycle::LifecycleNode> uut_fixture;
auto& uut_node = *uut_fixture.node;

// We don't really need the node so we can reuse it
auto ctx1 = Context::make(uut_node);
auto ctx2 = Context::make(uut_node);

ctx1->task.task_id = "task_1";
ctx2->task.task_id = "task_2";

// Both tasks are added in a sequence, expect the first to be on top
contexts.insert(ctx1);
contexts.insert(ctx2);

REQUIRE(contexts.size() == 2);
CHECK(check_order(contexts, {"task_1", "task_2"}));

// Queue task_2, it should now be on top
contexts.modify_task_id("task_2", [](Context& ctx) {
ctx.set_task_status(TaskState::STATUS_QUEUED);
});
CHECK(check_order(contexts, {"task_2", "task_1"}));

// Now execute task_1, then add a new task afterwards, we should get task_1, task_2, task_3
contexts.modify_task_id("task_1", [](Context& ctx) {
ctx.set_task_status(TaskState::STATUS_RUNNING);
});
auto ctx3 = Context::make(uut_node);
ctx3->task.task_id = "task_3";
contexts.insert(ctx3);
CHECK(check_order(contexts, {"task_1", "task_2", "task_3"}));
// If the new task gets queued, nothing changes because one is running already and another
// one was queued before
contexts.modify_task_id("task_3", [](Context& ctx) {
ctx.set_task_status(TaskState::STATUS_QUEUED);
});
CHECK(check_order(contexts, {"task_1", "task_2", "task_3"}));
// If the task that was queued is set to running, it will still stay in its place because we
// can't pre-empt currently running tasks
contexts.modify_task_id("task_2", [](Context& ctx) {
ctx.set_task_status(TaskState::STATUS_RUNNING);
});
CHECK(check_order(contexts, {"task_1", "task_2", "task_3"}));
// Once the task finishes or fails, it gets pushed to the end of the queue
contexts.modify_task_id("task_1", [](Context& ctx) {
ctx.set_task_status(TaskState::STATUS_FINISHED);
});
CHECK(check_order(contexts, {"task_2", "task_3", "task_1"}));
// Unnecessary but for multiple finished / failed the earliest updated is more in front of the queue
contexts.modify_at(0, [](Context& ctx) {
ctx.set_task_status(TaskState::STATUS_FAILED);
});
CHECK(check_order(contexts, {"task_3", "task_1", "task_2"}));
}

}
2 changes: 1 addition & 1 deletion nexus_workcell_orchestrator/src/task_results_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace nexus::workcell_orchestrator::test {
TEST_CASE("get and set results") {
auto fixture = common::test::RosFixture<rclcpp_lifecycle::LifecycleNode>{};
auto ctx_mgr = std::make_shared<ContextManager>();
auto ctx = std::make_shared<Context>(Context{*fixture.node});
auto ctx = Context::make(*fixture.node);
ctx_mgr->set_active_context(ctx);
BT::BehaviorTreeFactory bt_factory;

Expand Down
Loading