diff --git a/src/viam/sdk/robot/client.cpp b/src/viam/sdk/robot/client.cpp index 1c143a402..6e172ac24 100644 --- a/src/viam/sdk/robot/client.cpp +++ b/src/viam/sdk/robot/client.cpp @@ -129,11 +129,15 @@ RobotClient::~RobotClient() { void RobotClient::close() { should_refresh_.store(false); - for (const std::shared_ptr& t : threads_) { - t->~thread(); + + for (auto& thread : threads_) { + thread.join(); } + threads_.clear(); + stop_all(); - viam_channel_->close(); + + viam_channel_.close(); } bool is_error_response(const grpc::Status& response) { @@ -211,7 +215,7 @@ void RobotClient::refresh() { if (rs) { try { const std::shared_ptr rpc_client = - rs->create_rpc_client(name.name(), channel_); + rs->create_rpc_client(name.name(), viam_channel_.channel()); const Name name_({name.namespace_(), name.type(), name.subtype()}, "", name.name()); new_resources.emplace(name_, rpc_client); } catch (const std::exception& exc) { @@ -250,11 +254,10 @@ void RobotClient::refresh_every() { } }; -RobotClient::RobotClient(std::shared_ptr channel) - : channel_(channel->channel()), - viam_channel_(std::move(channel)), +RobotClient::RobotClient(ViamChannel channel) + : viam_channel_(std::move(channel)), should_close_channel_(false), - impl_(std::make_unique(RobotService::NewStub(channel_))) {} + impl_(std::make_unique(RobotService::NewStub(viam_channel_.channel()))) {} std::vector RobotClient::resource_names() const { const std::lock_guard lock(lock_); @@ -286,20 +289,14 @@ void RobotClient::log(const std::string& name, } } -std::shared_ptr RobotClient::with_channel(std::shared_ptr channel, +std::shared_ptr RobotClient::with_channel(ViamChannel channel, const Options& options) { - std::shared_ptr robot = std::make_shared(std::move(channel)); + auto robot = std::make_shared(std::move(channel)); robot->refresh_interval_ = options.refresh_interval(); robot->should_refresh_ = (robot->refresh_interval_ > 0); if (robot->should_refresh_) { - const std::shared_ptr t = - std::make_shared(&RobotClient::refresh_every, robot); - // TODO(RSDK-1743): this was leaking, confirm that adding thread catching in - // close/destructor lets us shutdown gracefully. See also address sanitizer, - // UB sanitizer - t->detach(); - robot->threads_.push_back(t); - }; + robot->threads_.emplace_back(&RobotClient::refresh_every, robot); + } robot->refresh(); return robot; @@ -308,8 +305,8 @@ std::shared_ptr RobotClient::with_channel(std::shared_ptr RobotClient::at_address(const std::string& address, const Options& options) { const char* uri = address.c_str(); - auto channel = ViamChannel::dial_initial(uri, options.dial_options()); - std::shared_ptr robot = RobotClient::with_channel(channel, options); + auto robot = + RobotClient::with_channel(ViamChannel::dial_initial(uri, options.dial_options()), options); robot->should_close_channel_ = true; return robot; @@ -318,11 +315,9 @@ std::shared_ptr RobotClient::at_address(const std::string& address, std::shared_ptr RobotClient::at_local_socket(const std::string& address, const Options& options) { const std::string addr = "unix://" + address; - const char* uri = addr.c_str(); - const std::shared_ptr channel = - sdk::impl::create_viam_channel(uri, grpc::InsecureChannelCredentials()); - auto viam_channel = std::make_shared(channel, address.c_str(), nullptr); - std::shared_ptr robot = RobotClient::with_channel(viam_channel, options); + auto robot = RobotClient::with_channel( + ViamChannel(sdk::impl::create_viam_channel(addr, grpc::InsecureChannelCredentials())), + options); robot->should_close_channel_ = true; return robot; diff --git a/src/viam/sdk/robot/client.hpp b/src/viam/sdk/robot/client.hpp index 8b00d4250..1c2e325d6 100644 --- a/src/viam/sdk/robot/client.hpp +++ b/src/viam/sdk/robot/client.hpp @@ -64,7 +64,10 @@ class RobotClient { friend bool operator==(const operation& lhs, const operation& rhs); }; + explicit RobotClient(ViamChannel channel); + ~RobotClient(); + void refresh(); void close(); @@ -87,10 +90,7 @@ class RobotClient { /// @param options Options for connecting and refreshing. /// Connects directly to a pre-existing channel. A robot created this way must be /// `close()`d manually. - static std::shared_ptr with_channel(std::shared_ptr channel, - const Options& options); - - RobotClient(std::shared_ptr channel); + static std::shared_ptr with_channel(ViamChannel channel, const Options& options); std::vector resource_names() const; @@ -165,13 +165,12 @@ class RobotClient { void refresh_every(); - std::vector> threads_; + std::vector threads_; std::atomic should_refresh_; unsigned int refresh_interval_; - std::shared_ptr channel_; - std::shared_ptr viam_channel_; + ViamChannel viam_channel_; bool should_close_channel_; struct impl; diff --git a/src/viam/sdk/rpc/dial.cpp b/src/viam/sdk/rpc/dial.cpp index 35a990363..7425019f0 100644 --- a/src/viam/sdk/rpc/dial.cpp +++ b/src/viam/sdk/rpc/dial.cpp @@ -18,19 +18,46 @@ namespace viam { namespace sdk { -const std::shared_ptr& ViamChannel::channel() const { - return channel_; -} +struct ViamChannel::impl { + impl(const char* path, void* runtime) : path(path), rust_runtime(runtime) {} -void ViamChannel::close() { - if (closed_) { - return; + impl(const impl&) = delete; + + impl(impl&& other) noexcept + : path(std::exchange(other.path, nullptr)), + rust_runtime(std::exchange(other.rust_runtime, nullptr)) {} + + impl& operator=(const impl&) = delete; + + impl& operator=(impl&& other) noexcept { + path = std::exchange(other.path, nullptr); + rust_runtime = std::exchange(other.rust_runtime, nullptr); + + return *this; } - closed_ = true; - free_string(path_); - free_rust_runtime(rust_runtime_); + + ~impl() { + free_string(path); + free_rust_runtime(rust_runtime); + } + + const char* path; + void* rust_runtime; }; +ViamChannel::ViamChannel(std::shared_ptr channel, const char* path, void* runtime) + : channel_(std::move(channel)), pimpl_(std::make_unique(path, runtime)) {} + +ViamChannel::ViamChannel(std::shared_ptr channel) : channel_(std::move(channel)) {} + +ViamChannel::ViamChannel(ViamChannel&&) noexcept = default; + +ViamChannel& ViamChannel::operator=(ViamChannel&&) noexcept = default; + +ViamChannel::~ViamChannel() { + close(); +} + const std::string& Credentials::type() const { return type_; } @@ -39,9 +66,6 @@ const std::string& Credentials::payload() const { return payload_; } -ViamChannel::ViamChannel(std::shared_ptr channel, const char* path, void* runtime) - : channel_(std::move(channel)), path_(path), closed_(false), rust_runtime_(runtime) {} - DialOptions::DialOptions() = default; DialOptions& DialOptions::set_credentials(boost::optional creds) { @@ -105,8 +129,8 @@ bool DialOptions::allows_insecure_downgrade() const { return allow_insecure_downgrade_; } -std::shared_ptr ViamChannel::dial_initial( - const char* uri, const boost::optional& options) { +ViamChannel ViamChannel::dial_initial(const char* uri, + const boost::optional& options) { DialOptions opts = options.get_value_or(DialOptions()); auto timeout = opts.timeout(); auto attempts_remaining = opts.initial_connection_attempts(); @@ -129,11 +153,10 @@ std::shared_ptr ViamChannel::dial_initial( } // the while loop will run until we either return or throw an error, so we can never reach this // point - BOOST_UNREACHABLE_RETURN(nullptr) + BOOST_UNREACHABLE_RETURN(ViamChannel(nullptr)) } -std::shared_ptr ViamChannel::dial(const char* uri, - const boost::optional& options) { +ViamChannel ViamChannel::dial(const char* uri, const boost::optional& options) { void* ptr = init_rust_runtime(); const DialOptions opts = options.get_value_or(DialOptions()); const std::chrono::duration float_timeout = opts.timeout(); @@ -157,12 +180,19 @@ std::shared_ptr ViamChannel::dial(const char* uri, std::string address("unix://"); address += socket_path; - const std::shared_ptr channel = - impl::create_viam_channel(address, grpc::InsecureChannelCredentials()); - const std::unique_ptr st = - viam::robot::v1::RobotService::NewStub(channel); - return std::make_shared(channel, socket_path, ptr); -}; + + return ViamChannel(sdk::impl::create_viam_channel(address, grpc::InsecureChannelCredentials()), + socket_path, + ptr); +} + +const std::shared_ptr& ViamChannel::channel() const { + return channel_; +} + +void ViamChannel::close() { + pimpl_.reset(); +} unsigned int Options::refresh_interval() const { return refresh_interval_; diff --git a/src/viam/sdk/rpc/dial.hpp b/src/viam/sdk/rpc/dial.hpp index 64cf4e5df..c05b28d08 100644 --- a/src/viam/sdk/rpc/dial.hpp +++ b/src/viam/sdk/rpc/dial.hpp @@ -13,18 +13,24 @@ namespace sdk { class DialOptions; class ViamChannel { - public: - void close(); ViamChannel(std::shared_ptr channel, const char* path, void* runtime); + public: + explicit ViamChannel(std::shared_ptr channel); + + ViamChannel(ViamChannel&&) noexcept; + + ViamChannel& operator=(ViamChannel&&) noexcept; + + ~ViamChannel(); + /// @brief Connects to a robot at the given URI address, using the provided dial options (or /// default options is none are provided). Ignores initial connection options specifying /// how many times to attempt to connect and with what timeout. /// In general, use of this method is discouraged. `RobotClient::at_address(...)` is the /// preferred method to connect to a robot, and creates the channel itself. /// @throws Exception if it is unable to establish a connection to the provided URI - static std::shared_ptr dial(const char* uri, - const boost::optional& options); + static ViamChannel dial(const char* uri, const boost::optional& options); // @brief Dials to a robot at the given URI address, using the provided dial options (or default // options is none are provided). Additionally specifies that this dial is an initial connection @@ -33,16 +39,18 @@ class ViamChannel { /// preferred method to connect to a robot, and creates the channel itself. /// @throws Exception if it is unable to establish a connection to the provided URI within /// the given number of initial connection attempts - static std::shared_ptr dial_initial(const char* uri, - const boost::optional& options); + static ViamChannel dial_initial(const char* uri, const boost::optional& options); const std::shared_ptr& channel() const; + void close(); + private: + struct impl; + std::shared_ptr channel_; - const char* path_; - bool closed_; - void* rust_runtime_; + + std::unique_ptr pimpl_; }; class Credentials { diff --git a/src/viam/sdk/tests/test_robot.cpp b/src/viam/sdk/tests/test_robot.cpp index 2018ed36b..81970673c 100644 --- a/src/viam/sdk/tests/test_robot.cpp +++ b/src/viam/sdk/tests/test_robot.cpp @@ -50,8 +50,7 @@ void robot_client_to_mocks_pipeline(F&& test_case) { // in-process gRPC channel. auto test_server = TestServer(server); auto grpc_channel = test_server.grpc_in_process_channel(); - auto viam_channel = std::make_shared(grpc_channel, "", nullptr); - auto client = RobotClient::with_channel(viam_channel, Options(0, boost::none)); + auto client = RobotClient::with_channel(ViamChannel(grpc_channel), Options(0, boost::none)); // Run the passed-in test case on the created stack and give access to the // created RobotClient and MockRobotService.