Merged
5 changes: 4 additions & 1 deletion src/viam/sdk/module/service.cpp
@@ -194,7 +194,10 @@ struct ModuleService::ServiceImpl : viam::module::v1::ModuleService::Service {
auto new_parent_addr = request->parent_address();
if (parent.parent_addr_ != new_parent_addr) {
parent.parent_addr_ = std::move(new_parent_addr);
-parent.parent_ = RobotClient::at_local_socket(parent.parent_addr_, {0, boost::none});
Options opts{0, boost::none};
opts.set_check_every_interval(std::chrono::seconds{5})
.set_reconnect_every_interval(std::chrono::seconds{1});
parent.parent_ = RobotClient::at_local_socket(parent.parent_addr_, opts);
parent.parent_->connect_logging();
}
response->set_ready(parent.module_->ready());
73 changes: 71 additions & 2 deletions src/viam/sdk/robot/client.cpp
@@ -1,3 +1,4 @@
#include <exception>
#include <viam/sdk/robot/client.hpp>

#include <chrono>
@@ -144,11 +145,16 @@ RobotClient::~RobotClient() {

void RobotClient::close() {
should_refresh_.store(false);
should_check_connection_.store(false);

if (refresh_thread_.joinable()) {
refresh_thread_.join();
}

if (check_connection_thread_.joinable()) {
check_connection_thread_.join();
}

stop_all();

viam_channel_.close();
@@ -231,6 +237,62 @@ void RobotClient::refresh_every() {
}
};

void RobotClient::check_connection() {
auto check_every = check_every_interval_;
auto reconnect_every = reconnect_every_interval_;
if (check_every == std::chrono::seconds{0}) {
check_every = reconnect_every;
}
if (check_every == std::chrono::seconds{0} && reconnect_every == std::chrono::seconds{0}) {
should_check_connection_.store(false);
}
bool connected(true);
while (should_check_connection_) {
Member Author

@lia-viam another ignorant C++ question: `should_check_connection_` is set to false when we want to shut down (makes sense!), but also above, when `check_every` and `reconnect_every` are both zero. In that case this thread returns almost immediately, while the client code could keep running indefinitely. Are there any safety concerns here?

Collaborator

Unless I'm missing something, that should be fine: the thread would return and we would just have the same behavior as before this PR.
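
To make the zero-interval case concrete, the fallback at the top of `check_connection()` can be restated as a small pure function. This is only a sketch: `resolve_check_interval` and `checker_enabled` are hypothetical names introduced here for illustration and are not part of the SDK.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical helper (not part of the SDK): mirrors the fallback at the top of
// check_connection(), where a zero check interval falls back to the reconnect interval.
std::chrono::seconds resolve_check_interval(std::chrono::seconds check_every,
                                            std::chrono::seconds reconnect_every) {
    // Assumption for this sketch: intervals are never negative
    assert(check_every.count() >= 0 && reconnect_every.count() >= 0);
    return check_every == std::chrono::seconds{0} ? reconnect_every : check_every;
}

// Hypothetical helper: the checker loop only runs when the resolved interval is
// non-zero, that is, unless both intervals were left at zero.
bool checker_enabled(std::chrono::seconds check_every, std::chrono::seconds reconnect_every) {
    return resolve_check_interval(check_every, reconnect_every) != std::chrono::seconds{0};
}
```

With both intervals at zero, `checker_enabled` returns false, which corresponds to the thread returning right away and the client behaving as it did before this PR.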

std::exception_ptr connection_error;
std::string what;
for (int i = 0; i < 3; ++i) {
try {
std::this_thread::sleep_for(check_every);
impl::client_helper(impl_, &RobotService::Stub::ResourceNames).invoke([](auto&) {
return;
});
connected = true;
break;
} catch (const std::exception& e) {
connected = false;
connection_error = std::current_exception();
what = e.what();
std::this_thread::sleep_for(std::chrono::milliseconds{100});
}
}
if (connected) {
continue;
}
const auto* uri = viam_channel_.get_channel_addr();
VIAM_SDK_LOG(error) << "Lost connection to machine at address " << uri << " with error "
<< what << ". Attempting to reconnect every " << reconnect_every.count()
<< "second(s)";

viam_channel_.close();

for (int i = 0; i < 3; ++i) {
try {
auto channel = ViamChannel::dial(uri, {});
impl_ =
std::make_unique<RobotClient::impl>(RobotService::NewStub(channel.channel()));
refresh();
connected = true;
} catch (const std::exception& e) {
viam_channel_.close();
std::this_thread::sleep_for(reconnect_every);
}
}
if (!connected) {
close();
Member Author

@lia-viam here's somewhere that your C++ expertise would be greatly appreciated! close seemed better than just exiting to ensure graceful shutdown (though we do ultimately want to exit if we've reached this point), but I'm not totally confident that calling close here, from within this thread, is safe behavior.

Collaborator

I think the issue calling close from here is that joinable will return false because it checks thread.get_id() == this_thread::id(). we should probably be storing false in the should_check_connection_ thing if we establish that we've been disconnected, so that even if we're not calling join we can be sure that the thread will have exited

another thought would be to have a flag variable that says if we've been disconnected, and have impl::client_helper check that the same way it always checks if impl is non-null. That might exit with an exception throw, but I'm not sure if we're too pressed about "graceful" shutdown in the highly ungraceful case of the process being orphaned by a kill -9

Collaborator

also maybe cc @acmorrow

Member Author

> we should probably be storing false in the should_check_connection_ thing if we establish that we've been disconnected, so that even if we're not calling join we can be sure that the thread will have exited

I'm not sure I entirely understand this, but I'm a little concerned about it. We'd need to reset the value to true at some point (or else we stop checking the connection after the first failure), but I could imagine a case where someone triggers a close (which sets should_check_connection_ to false) and then, immediately after, this loop resets it to true, so we just continue this loop forever and fail to shut down properly.

Member Author

fwiw, the Python SDK at this point just calls sys.exit(), so if we're not too concerned about gracefulness (and this is a case where the module is an orphan process anyway and nothing it logs or does will be trivially inspectable), we could just exit(0) and call it a day.

Collaborator

@lia-viam, Jun 3, 2025

per offline discussion--this is actually fine bc when the check_connection thread calls close it will not join itself, but the later destructor call will. for transparency it may be good to add break; after close(), but also close() sets the while loop flag variable to false so the function will return immediately after close() returns
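
For readers following this thread: `std::thread::joinable()` reports whether the object refers to a thread of execution (its id differs from a default-constructed `std::thread::id`) and does not look at which thread is asking, and the standard lists `resource_deadlock_would_occur` as the error condition for `join()` when the thread would be joining itself. One way to make a `close()` that can be called from the worker is an explicit id guard. The sketch below assumes a hypothetical `Checker` class standing in for `RobotClient`; it is not how the SDK is implemented.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical stand-in for RobotClient, reduced to one background thread.
class Checker {
   public:
    Checker() : worker_([this] { run(); }) {}

    ~Checker() {
        close();
        // Destroying a joinable std::thread would call std::terminate
        assert(!worker_.joinable());
    }

    // Callable from the owning thread and from the worker itself
    void close() {
        running_.store(false);
        if (std::this_thread::get_id() == worker_id_.load()) {
            return;  // on the worker: joining here would be a self-join
        }
        if (worker_.joinable()) {
            worker_.join();
        }
    }

    bool running() const { return running_.load(); }
    bool closed_from_worker() const { return closed_from_worker_.load(); }

   private:
    void run() {
        worker_id_.store(std::this_thread::get_id());
        while (running_.load()) {
            std::this_thread::sleep_for(std::chrono::milliseconds{1});
            // Pretend the connection was lost and every reconnect attempt failed
            close();  // clears running_, so the loop condition ends the thread
            closed_from_worker_.store(true);
        }
    }

    std::atomic<bool> running_{true};
    std::atomic<bool> closed_from_worker_{false};
    std::atomic<std::thread::id> worker_id_{std::thread::id{}};
    std::thread worker_;  // declared last so the flags exist before the thread starts
};
```

The worker never touches the `std::thread` object, so the actual join is left to whichever other thread calls `close()` next, here the destructor.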

}
}
}
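
Both loops in `check_connection()` follow a bounded-retry shape: try a fixed number of times and sleep after each failure. A minimal sketch of that shape as a standalone helper, assuming a hypothetical `retry` function that is not part of the SDK and that stops at the first success:

```cpp
#include <cassert>
#include <chrono>
#include <exception>
#include <functional>
#include <thread>

// Hypothetical helper (not part of the SDK): run `attempt` up to `max_attempts`
// times, sleeping `delay` after each failure. Returns true on the first success
// and false once every attempt has thrown.
bool retry(int max_attempts,
           std::chrono::milliseconds delay,
           const std::function<void()>& attempt) {
    assert(max_attempts > 0);
    for (int i = 0; i < max_attempts; ++i) {
        try {
            attempt();
            return true;  // stop as soon as one attempt succeeds
        } catch (const std::exception&) {
            std::this_thread::sleep_for(delay);
        }
    }
    return false;
}
```

A caller could pass the `ResourceNames` probe or the redial as `attempt` and decide what to do based on the returned flag.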

RobotClient::RobotClient(ViamChannel channel)
: viam_channel_(std::move(channel)),
impl_(std::make_unique<impl>(RobotService::NewStub(viam_channel_.channel()))) {}
@@ -262,8 +324,8 @@ void RobotClient::log(const std::string& name,
ClientContext ctx;
const auto response = impl_->stub->Log(ctx, req, &resp);
if (is_error_response(response)) {
-// Manually override to force this to get logged to console so we don't set off an infinite
-// loop
// Manually override to force this to get logged to console so we don't set off an
// infinite loop
VIAM_SDK_LOG(error) << boost::log::add_value(sdk::impl::attr_console_force_type{}, true)
<< "Error sending log message over grpc: " << response.error_message()
<< response.error_details();
@@ -279,6 +341,13 @@ std::shared_ptr<RobotClient> RobotClient::with_channel(ViamChannel channel,
robot->refresh_thread_ = std::thread{&RobotClient::refresh_every, robot.get()};
}

robot->should_check_connection_ = true;

robot->check_every_interval_ = options.check_every_interval();
robot->reconnect_every_interval_ = options.reconnect_every_interval();

robot->check_connection_thread_ = std::thread{&RobotClient::check_connection, robot.get()};

robot->refresh();
return robot;
};
5 changes: 5 additions & 0 deletions src/viam/sdk/robot/client.hpp
@@ -184,10 +184,15 @@ class RobotClient {
void connect_logging();

void refresh_every();
void check_connection();

std::thread refresh_thread_;
std::thread check_connection_thread_;
std::atomic<bool> should_refresh_;
std::atomic<bool> should_check_connection_;
std::chrono::seconds refresh_interval_;
std::chrono::seconds check_every_interval_;
std::chrono::seconds reconnect_every_interval_;

ViamChannel viam_channel_;

31 changes: 27 additions & 4 deletions src/viam/sdk/rpc/dial.cpp
@@ -187,9 +187,12 @@ ViamChannel ViamChannel::dial(const char* uri, const boost::optional<DialOptions
}
address += proxy_path;

-return ViamChannel(sdk::impl::create_viam_channel(address, grpc::InsecureChannelCredentials()),
-proxy_path,
-ptr);
auto chan =
ViamChannel(sdk::impl::create_viam_channel(address, grpc::InsecureChannelCredentials()),
proxy_path,
ptr);
chan.uri_ = uri;
return chan;
}

const std::shared_ptr<grpc::Channel>& ViamChannel::channel() const {
Expand All @@ -200,7 +203,27 @@ void ViamChannel::close() {
pimpl_.reset();
}

-unsigned int Options::refresh_interval() const {
const char* ViamChannel::get_channel_addr() const {
return uri_;
}
Options& Options::set_check_every_interval(std::chrono::seconds interval) {
check_every_interval_ = interval;
return *this;
}
Options& Options::set_reconnect_every_interval(std::chrono::seconds interval) {
reconnect_every_interval_ = interval;
return *this;
}

std::chrono::seconds Options::check_every_interval() const {
return check_every_interval_;
}

std::chrono::seconds Options::reconnect_every_interval() const {
return reconnect_every_interval_;
}

std::chrono::seconds Options::refresh_interval() const {
return refresh_interval_;
}

27 changes: 25 additions & 2 deletions src/viam/sdk/rpc/dial.hpp
@@ -45,7 +45,10 @@ class ViamChannel {

void close();

const char* get_channel_addr() const;

private:
const char* uri_;
struct impl;

std::shared_ptr<GrpcChannel> channel_;
@@ -116,13 +119,33 @@ class Options {
Options(unsigned int refresh_interval, boost::optional<DialOptions> dial_options)
: refresh_interval_(std::move(refresh_interval)), dial_options_(std::move(dial_options)) {}

-unsigned int refresh_interval() const;
std::chrono::seconds refresh_interval() const;
std::chrono::seconds check_every_interval() const;
std::chrono::seconds reconnect_every_interval() const;

/// @brief Sets the frequency (in seconds) to verify connectivity
Options& set_check_every_interval(std::chrono::seconds interval);

/// @brief Sets the frequency (in seconds) to attempt to reconnect when connectivity is lost
Options& set_reconnect_every_interval(std::chrono::seconds interval);
const boost::optional<DialOptions>& dial_options() const;

private:
/// @brief How often to refresh the status/parts of the robot, in seconds. If set to 0, the
/// robot will not automatically refresh.
-unsigned int refresh_interval_;
std::chrono::seconds refresh_interval_{0};

/// @brief How often to verify connectivity to the robot, in seconds. If set to 0, will not
/// check, will default to the `reconnect_every_interval_` value. Defaults to 0.
/// @note Setting to a non-zero value is useful in modules but may result in delays shutting
/// down client code
std::chrono::seconds check_every_interval_{0};

/// @brief How often to attempt to reconnect to the robot when disconnected. If set to 0,
/// will not attempt to reconnect. Defaults to 0.
/// @note Setting to a non-zero value is useful in modules but may result in delays shutting
/// down client code
std::chrono::seconds reconnect_every_interval_{0};
boost::optional<DialOptions> dial_options_;
};
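
The two new setters return `Options&`, which is what lets `service.cpp` chain them in one expression. A reduced sketch of that pattern, assuming a hypothetical `IntervalOptions` class that keeps only the two new intervals (it is a stand-in for illustration, not the SDK's `Options`):

```cpp
#include <cassert>
#include <chrono>

// Hypothetical stand-in for the SDK's Options class, keeping only the two new intervals.
class IntervalOptions {
   public:
    IntervalOptions& set_check_every_interval(std::chrono::seconds interval) {
        assert(interval.count() >= 0);  // assumption for this sketch: no negative intervals
        check_every_interval_ = interval;
        return *this;  // returning *this lets calls be chained
    }

    IntervalOptions& set_reconnect_every_interval(std::chrono::seconds interval) {
        assert(interval.count() >= 0);
        reconnect_every_interval_ = interval;
        return *this;
    }

    std::chrono::seconds check_every_interval() const { return check_every_interval_; }
    std::chrono::seconds reconnect_every_interval() const { return reconnect_every_interval_; }

   private:
    // Both default to zero, matching the defaults in the diff
    std::chrono::seconds check_every_interval_{0};
    std::chrono::seconds reconnect_every_interval_{0};
};
```

Because each setter takes `std::chrono::seconds`, a call site has to spell out the unit, for example `std::chrono::seconds{5}`, rather than passing a bare integer.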
