Skip to content

Commit 00e5943

Browse files
authored
🎮 Implemented node exec cmd client. (#7)
1 parent 1c8b97b commit 00e5943

30 files changed

+1094
-39
lines changed

include/ocvsmd/platform/bsd/kqueue_single_threaded_executor.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ namespace platform
4040
namespace bsd
4141
{
4242

43-
/// @brief Defines BSD Linux platform specific single-threaded executor based on `kqueue` mechanism.
43+
/// @brief Defines BSD Linux platform-specific single-threaded executor based on `kqueue` mechanism.
4444
///
4545
class KqueueSingleThreadedExecutor final : public libcyphal::platform::SingleThreadedExecutor,
4646
public IPosixExecutorExtension

include/ocvsmd/platform/defines.hpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ using SingleThreadedExecutor = bsd::KqueueSingleThreadedExecutor;
3131
using SingleThreadedExecutor = Linux::EpollSingleThreadedExecutor;
3232
#endif
3333

34+
/// Waits for the predicate to be fulfilled by spinning the executor and its awaitable resources.
35+
///
3436
template <typename Executor, typename Predicate>
3537
void waitPollingUntil(Executor& executor, Predicate predicate)
3638
{
@@ -55,13 +57,14 @@ void waitPollingUntil(Executor& executor, Predicate predicate)
5557
timeout = std::min(timeout, spin_result.next_exec_time.value() - executor.now());
5658
}
5759

58-
if (const auto maybe_poll_failure = executor.pollAwaitableResourcesFor(cetl::make_optional(timeout)))
60+
if (const auto poll_failure = executor.pollAwaitableResourcesFor(cetl::make_optional(timeout)))
5961
{
60-
spdlog::warn("Failed to poll awaitable resources.");
62+
(void) poll_failure;
63+
spdlog::warn("Failed to poll awaitable resources."); // TODO: Log the error.
6164
}
6265
}
6366

64-
spdlog::debug("Predicate is fulfilled (worst_lateness={}us).",
67+
spdlog::trace("Predicate is fulfilled (worst_lateness={}us).",
6568
std::chrono::duration_cast<std::chrono::microseconds>(worst_lateness).count());
6669
}
6770

include/ocvsmd/platform/linux/epoll_single_threaded_executor.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ namespace platform
3838
namespace Linux
3939
{
4040

41-
/// @brief Defines Linux platform specific single-threaded executor based on `epoll` mechanism.
41+
/// @brief Defines Linux platform-specific single-threaded executor based on `epoll` mechanism.
4242
///
4343
class EpollSingleThreadedExecutor final : public libcyphal::platform::SingleThreadedExecutor,
4444
public IPosixExecutorExtension

include/ocvsmd/platform/posix_utils.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ namespace ocvsmd
1313
namespace platform
1414
{
1515

16+
/// Wraps a POSIX syscall and retries it if it was interrupted by a signal.
17+
///
1618
template <typename Call>
1719
int posixSyscallError(const Call& call)
1820
{

include/ocvsmd/sdk/daemon.hpp

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
#ifndef OCVSMD_SDK_DAEMON_HPP_INCLUDED
77
#define OCVSMD_SDK_DAEMON_HPP_INCLUDED
88

9-
//#include "node_command_client.hpp"
9+
#include "node_command_client.hpp"
1010

1111
#include <cetl/cetl.hpp>
1212
#include <cetl/pf17/cetlpf.hpp>
@@ -25,20 +25,39 @@ namespace sdk
2525
class Daemon
2626
{
2727
public:
28+
/// Defines the shared pointer type for the factory.
29+
///
2830
using Ptr = std::shared_ptr<Daemon>;
2931

32+
/// Creates a new instance of the factory, and establishes a connection to the daemon.
33+
///
34+
/// @param memory The memory resource to use for the factory and its subcomponents.
35+
/// The memory resource must outlive the factory.
36+
/// In use for IPC (de)serialization only; other functionality uses usual c++ heap.
37+
/// @param executor The executor to use for the factory and its subcomponents.
38+
/// Instance of the executor must outlive the factory.
39+
/// Should support `IPosixExecutorExtension` interface (via `cetl::rtti`).
40+
/// @return Shared pointer to the successfully created factory.
41+
/// `nullptr` on failure (see logs for the reason of failure).
42+
///
3043
CETL_NODISCARD static Ptr make(cetl::pmr::memory_resource& memory,
3144
libcyphal::IExecutor& executor,
3245
const std::string& connection);
3346

47+
// No copy/move semantics.
3448
Daemon(Daemon&&) = delete;
3549
Daemon(const Daemon&) = delete;
3650
Daemon& operator=(Daemon&&) = delete;
3751
Daemon& operator=(const Daemon&) = delete;
3852

3953
virtual ~Daemon() = default;
4054

41-
// ➕ virtual NodeCommandClient::Ptr getNodeCommandClient() = 0;
55+
/// Gets a pointer to the shared entity which represents the Node Exec Command component of the OCVSMD engine.
56+
///
57+
/// @return Shared pointer to the client side of the Node Exec Command component.
58+
/// The component is always present in the OCVSMD engine, so the result is never `nullptr`.
59+
///
60+
virtual NodeCommandClient::Ptr getNodeCommandClient() const = 0;
4261

4362
protected:
4463
Daemon() = default;

include/ocvsmd/sdk/execution.hpp

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ namespace sdk
2424
namespace detail
2525
{
2626

27+
// Defines various internal types for `sync_wait` implementation.
28+
//
2729
template <typename Result>
2830
class StateOf;
29-
31+
//
3032
template <typename Result>
3133
class ReceiverOf final
3234
{
@@ -46,7 +48,7 @@ class ReceiverOf final
4648
std::shared_ptr<StateOf<Result>> state_;
4749

4850
}; // ReceiverOf
49-
51+
//
5052
template <typename Result>
5153
class StateOf final : public std::enable_shared_from_this<StateOf<Result>>
5254
{
@@ -78,13 +80,23 @@ class StateOf final : public std::enable_shared_from_this<StateOf<Result>>
7880

7981
/// Abstract interface of a result sender.
8082
///
83+
/// There is no (at least for now) support of async failures via C++ exceptions in the SDK.
84+
/// So, if an async sender might fail, then the `Result` subtype
85+
/// is expected to cover both success and failure cases (f.e. using `variant`).
86+
///
8187
template <typename Result_>
8288
class SenderOf
8389
{
8490
public:
85-
using Ptr = std::unique_ptr<SenderOf>;
91+
/// Defines the shared pointer type for the interface.
92+
///
93+
using Ptr = std::unique_ptr<SenderOf>;
94+
95+
/// Defines the result type of the sender.
96+
///
8697
using Result = Result_;
8798

99+
// No copy/move semantics.
88100
SenderOf(SenderOf&&) = delete;
89101
SenderOf(const SenderOf&) = delete;
90102
SenderOf& operator=(SenderOf&&) = delete;
@@ -94,7 +106,9 @@ class SenderOf
94106

95107
/// Initiates an operation execution by submitting a given receiver to this sender.
96108
///
97-
/// The submit "consumes" the receiver (no longer usable after this call).
109+
/// @tparam Receiver The type of the receiver functor. Should be callable with the result of the sender.
110+
/// @param receiver The receiver of the sender's result.
111+
/// Method "consumes" the receiver (no longer usable after this call).
98112
///
99113
template <typename Receiver>
100114
void submit(Receiver&& receiver)
@@ -108,13 +122,15 @@ class SenderOf
108122
protected:
109123
SenderOf() = default;
110124

125+
/// Implementation extension point for the derived classes.
126+
///
111127
virtual void submitImpl(std::function<void(Result&&)>&& receiver) = 0;
112128

113129
}; // SenderOf
114130

115131
/// Initiates an operation execution by submitting a given receiver to the sender.
116132
///
117-
/// The submit "consumes" the receiver (no longer usable after this call).
133+
/// Submit "consumes" the receiver (no longer usable after this call).
118134
///
119135
template <typename Sender, typename Receiver>
120136
void submit(Sender& sender, Receiver&& receiver)
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
//
2+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
// SPDX-License-Identifier: MIT
4+
//
5+
6+
#ifndef OCVSMD_SDK_NODE_COMMAND_CLIENT_HPP_INCLUDED
7+
#define OCVSMD_SDK_NODE_COMMAND_CLIENT_HPP_INCLUDED
8+
9+
#include "execution.hpp"
10+
11+
#include <uavcan/node/ExecuteCommand_1_3.hpp>
12+
13+
#include <cetl/pf17/cetlpf.hpp>
14+
#include <cetl/pf20/cetlpf.hpp>
15+
16+
#include <chrono>
17+
#include <cstdint>
18+
#include <memory>
19+
#include <unordered_map>
20+
21+
namespace ocvsmd
22+
{
23+
namespace sdk
24+
{
25+
26+
/// Defines client side interface of the OCVSMD Node Exec Command component.
27+
///
28+
class NodeCommandClient
29+
{
30+
public:
31+
/// Defines the shared pointer type for the interface.
32+
///
33+
using Ptr = std::shared_ptr<NodeCommandClient>;
34+
35+
NodeCommandClient(NodeCommandClient&&) = delete;
36+
NodeCommandClient(const NodeCommandClient&) = delete;
37+
NodeCommandClient& operator=(NodeCommandClient&&) = delete;
38+
NodeCommandClient& operator=(const NodeCommandClient&) = delete;
39+
40+
virtual ~NodeCommandClient() = default;
41+
42+
/// Defines the result type of the command execution.
43+
///
44+
/// On success, the result is a map of node IDs to their responses (`status` and `output` params).
45+
/// Missing Cyphal nodes (or failed to respond in a given timeout) are not included in the map.
46+
///
47+
struct Command final
48+
{
49+
using NodeRequest = uavcan::node::ExecuteCommand_1_3::Request;
50+
using NodeResponse = uavcan::node::ExecuteCommand_1_3::Response;
51+
52+
using Success = std::unordered_map<std::uint16_t, NodeResponse>;
53+
using Failure = int; // `errno`-like error code.
54+
using Result = cetl::variant<Success, Failure>;
55+
56+
}; // Command
57+
58+
/// Sends a Cyphal command to the specified Cyphal network nodes.
59+
///
60+
/// On the OCVSMD engine side, the `node_request` is sent concurrently to all specified Cyphal nodes.
61+
/// Responses are sent back to the client side as they arrive.
62+
/// Result will be available when the last response has arrived, or the timeout has expired.
63+
///
64+
/// @param node_ids The list of Cyphal node IDs to send the command to. Duplicates are ignored.
65+
/// @param node_request The Cyphal command request to send (aka broadcast) to the `node_ids`.
66+
/// @param timeout The maximum time to wait for all Cyphal node responses to arrive.
67+
/// @return An execution sender which emits the async overall result of the operation.
68+
///
69+
virtual SenderOf<Command::Result>::Ptr sendCommand(const cetl::span<const std::uint16_t> node_ids,
70+
const Command::NodeRequest& node_request,
71+
const std::chrono::microseconds timeout) = 0;
72+
73+
/// A convenience method for invoking `sendCommand` with COMMAND_RESTART.
74+
///
75+
/// @param node_ids The list of Cyphal node IDs to send the command to. Duplicates are ignored.
76+
/// @param timeout The maximum time to wait for all Cyphal node responses to arrive. Default is 1 second.
77+
/// @return An execution sender which emits the async result of the operation.
78+
///
79+
SenderOf<Command::Result>::Ptr restart( //
80+
const cetl::span<const std::uint16_t> node_ids,
81+
const std::chrono::microseconds timeout = std::chrono::seconds{1});
82+
83+
/// A convenience method for invoking `sendCommand` with COMMAND_BEGIN_SOFTWARE_UPDATE.
84+
///
85+
/// @param node_ids The list of Cyphal node IDs to send the command to. Duplicates are ignored.
86+
/// @param file_path The path to the software update file. Limited to 255 characters.
87+
/// Relative to one of the roots configured in the file server.
88+
/// @param timeout The maximum time to wait for all Cyphal node responses to arrive. Default is 1 second.
89+
/// @return An execution sender which emits the async result of the operation.
90+
///
91+
SenderOf<Command::Result>::Ptr beginSoftwareUpdate( //
92+
const cetl::span<const std::uint16_t> node_ids,
93+
const cetl::string_view file_path,
94+
const std::chrono::microseconds timeout = std::chrono::seconds{1});
95+
96+
protected:
97+
NodeCommandClient() = default;
98+
99+
virtual cetl::pmr::memory_resource& getMemoryResource() const noexcept = 0;
100+
101+
}; // NodeCommandClient
102+
103+
} // namespace sdk
104+
} // namespace ocvsmd
105+
106+
#endif // OCVSMD_SDK_NODE_COMMAND_CLIENT_HPP_INCLUDED

src/cli/main.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
#include <ocvsmd/platform/defines.hpp>
99
#include <ocvsmd/sdk/daemon.hpp>
1010
#include <ocvsmd/sdk/execution.hpp>
11-
//#include <ocvsmd/sdk/node_command_client.hpp>
11+
#include <ocvsmd/sdk/node_command_client.hpp>
1212

1313
#include <cetl/pf17/cetlpf.hpp>
1414

@@ -82,18 +82,19 @@ int main(const int argc, const char** const argv)
8282
std::cerr << "Failed to create daemon.";
8383
return EXIT_FAILURE;
8484
}
85-
/*
86-
// Demo of daemon's node command client, sending a command to node 42, 43 & 44.
85+
86+
#if 1 // NOLINT
87+
88+
// Demo of daemon's node command client - sending a command to node 42, 43 & 44.
8789
{
8890
using Command = ocvsmd::sdk::NodeCommandClient::Command;
8991

9092
auto node_cmd_client = daemon->getNodeCommandClient();
9193

92-
const std::vector<std::uint16_t> node_ids = {42};
94+
const std::vector<std::uint16_t> node_ids = {42, 43, 44};
9395
// auto sender = node_cmd_client->restart({node_ids.data(), node_ids.size()});
9496
auto sender = node_cmd_client->beginSoftwareUpdate({node_ids.data(), node_ids.size()}, "firmware.bin");
9597
auto cmd_result = ocvsmd::sdk::sync_wait<Command::Result>(executor, std::move(sender));
96-
9798
if (const auto* const err = cetl::get_if<Command::Failure>(&cmd_result))
9899
{
99100
spdlog::error("Failed to send command: {}", std::strerror(*err));
@@ -109,7 +110,8 @@ int main(const int argc, const char** const argv)
109110
}
110111
}
111112
}
112-
*/
113+
#endif
114+
113115
if (g_running == 0)
114116
{
115117
spdlog::debug("Received termination signal.");

src/common/ipc/channel.hpp

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,20 @@ class AnyChannel
5959

6060
}; // AnyChannel
6161

62+
/// Defines an abstract bidirectional communication channel.
63+
///
64+
/// Supports sending of messages (plural!) of type `Output`,
65+
/// and receiving messages of type `Input` via `EventHandler` callback.
66+
///
67+
/// Channel could be opened permanently (f.e. for receiving some kind of notifications, like status updates),
68+
/// but usually it represents one RPC session, which could be completed (finished) with an optional error code.
69+
/// Such completion normally done at server side (when RPC request has been fulfilled),
70+
/// but client could also complete the channel. Any unexpected IPC communication error (like f.e. sudden death of
71+
/// either client or server process) also leads to channel completion (with `ipc::ErrorCode::Disconnected` error).
72+
///
73+
/// Channel could be moved, but not copied.
74+
/// Channel lifetime is managed by its owner - an IPC service client or server.
75+
///
6276
template <typename Input_, typename Output_>
6377
class Channel final : public AnyChannel
6478
{
@@ -69,10 +83,12 @@ class Channel final : public AnyChannel
6983
using EventVar = cetl::variant<Connected, Input, Completed>;
7084
using EventHandler = std::function<void(const EventVar&)>;
7185

86+
// Move-only.
7287
~Channel() = default;
7388
Channel(Channel&& other) noexcept = default;
7489
Channel& operator=(Channel&& other) noexcept = default;
7590

91+
// No copy.
7692
Channel(const Channel&) = delete;
7793
Channel& operator=(const Channel&) = delete;
7894

@@ -89,7 +105,7 @@ class Channel final : public AnyChannel
89105
});
90106
}
91107

92-
void complete(const int error_code)
108+
void complete(const int error_code = 0)
93109
{
94110
return gateway_->complete(error_code);
95111
}

0 commit comments

Comments
 (0)