Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/viam/sdk/module/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,9 @@ struct ModuleService::ServiceImpl : viam::module::v1::ModuleService::Service {
auto new_parent_addr = request->parent_address();
if (parent.parent_addr_ != new_parent_addr) {
parent.parent_addr_ = std::move(new_parent_addr);
parent.parent_ = RobotClient::at_local_socket(parent.parent_addr_, {0, boost::none});
Options opts{0, boost::none};
opts.set_check_every_interval(5).set_reconnect_every_interval(1);
parent.parent_ = RobotClient::at_local_socket(parent.parent_addr_, opts);
parent.parent_->connect_logging();
}
response->set_ready(parent.module_->ready());
Expand Down
72 changes: 70 additions & 2 deletions src/viam/sdk/robot/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,16 @@ RobotClient::~RobotClient() {

void RobotClient::close() {
should_refresh_.store(false);
should_check_connection_.store(false);

if (refresh_thread_.joinable()) {
refresh_thread_.join();
}

if (check_connection_thread_.joinable()) {
check_connection_thread_.join();
}

stop_all();

viam_channel_.close();
Expand Down Expand Up @@ -231,6 +236,62 @@ void RobotClient::refresh_every() {
}
};

void RobotClient::check_connection() {
unsigned int check_every = check_every_interval_;
unsigned int reconnect_every = reconnect_every_interval_;
if (check_every == 0) {
check_every = reconnect_every;
}
if (check_every == 0 && reconnect_every == 0) {
should_check_connection_.store(false);
}
bool connected(true);
while (should_check_connection_) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lia-viam another ignorant C++ question, should_check_connection_ is set to false when we want to shutdown (makes sense!) but also above when check_every and reconnect_every are set to zero. In such a case this thread will return pretty much immediately, but the client code in such a case could be running for an indefinite amount of time. Are there any safety concerns here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unless i'm missing something that should be fine--the thread would return and we would just have the same behavior as pre-this PR

std::exception connection_error;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you'll want to do this with std::exception_ptr otherwise the polymorphic exception object could get sliced when it's copied into std::exception

for (int i = 0; i < 3; ++i) {
try {
std::this_thread::sleep_for(std::chrono::seconds{check_every});
impl::client_helper(impl_, &RobotService::Stub::ResourceNames).invoke([](auto&) {
return;
});
connected = true;
break;
} catch (const std::exception& e) {
connected = false;
connection_error = e;
std::this_thread::sleep_for(std::chrono::milliseconds{100});
}
}
if (connected) {
continue;
}
const auto* uri = viam_channel_.get_channel_addr();
VIAM_SDK_LOG(error) << "Lost connection to machine at address " << uri << " with error "
<< connection_error.what() << ". Attempting to reconnect to every "
<< reconnect_every << "second(s)";
viam_channel_.close();

for (int i = 0; i < 3; ++i) {
try {
auto channel = ViamChannel::dial(uri, {});
auto impl =
std::make_unique<RobotClient::impl>(RobotService::NewStub(channel.channel()));
impl_.reset();
impl_.swap(impl);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can all just be impl_ = std::make_unique<...>(...)

refresh();
connected = true;
} catch (const std::exception& e) {
viam_channel_.close();
std::this_thread::sleep_for(std::chrono::seconds{reconnect_every});
}
}
if (!connected) {
// NOLINTNEXTLINE
close();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lia-viam here's somewhere that your C++ expertise would be greatly appreciated! close seemed better than just exiting to ensure graceful shutdown (though we do ultimately want to exit if we've reached this point), but I'm not totally confident that calling close here, from within this thread, is safe behavior.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the issue calling close from here is that joinable will return false because it checks thread.get_id() == this_thread::id(). we should probably be storing false in the should_check_connection_ thing if we establish that we've been disconnected, so that even if we're not calling join we can be sure that the thread will have exited

another thought would be to have a flag variable that says if we've been disconnected, and have impl::client_helper check that the same way it always checks if impl is non-null. That might exit with an exception throw, but I'm not sure if we're too pressed about "graceful" shutdown in the highly ungraceful case of the process being orphaned by a kill -9

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also maybe cc @acmorrow

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably be storing false in the should_check_connection_ thing if we establish that we've been disconnected, so that even if we're not calling join we can be sure that the thread will have exited

I'm not sure I entirely understand this, but I'm a little concerned about it. We'd need to at some point reset the value to true (or else we stop checking connection after the first failure), but I could imagine a case where someone triggers a close (which sets should_check_connection_ to false) and then immediately after, this loop resets it to true and so we just continue this loop forever and fail to shutdown properly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw, the python SDK at this point just calls sys.exit(), so if we're not too concerned about gracefulness (and this is a case where the module is an orphan process anyway and nothing it logs or does will be trivially inspectable), we could just exit(0) and call it a day.

Copy link
Collaborator

@lia-viam lia-viam Jun 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

per offline discussion--this is actually fine bc when the check_connection thread calls close it will not join itself, but the later destructor call will. for transparency it may be good to add break; after close(), but also close() sets the while loop flag variable to false so the function will return immediately after close() returns

}
}
}

RobotClient::RobotClient(ViamChannel channel)
: viam_channel_(std::move(channel)),
impl_(std::make_unique<impl>(RobotService::NewStub(viam_channel_.channel()))) {}
Expand Down Expand Up @@ -262,8 +323,8 @@ void RobotClient::log(const std::string& name,
ClientContext ctx;
const auto response = impl_->stub->Log(ctx, req, &resp);
if (is_error_response(response)) {
// Manually override to force this to get logged to console so we don't set off an infinite
// loop
// Manually override to force this to get logged to console so we don't set off an
// infinite loop
VIAM_SDK_LOG(error) << boost::log::add_value(sdk::impl::attr_console_force_type{}, true)
<< "Error sending log message over grpc: " << response.error_message()
<< response.error_details();
Expand All @@ -279,6 +340,13 @@ std::shared_ptr<RobotClient> RobotClient::with_channel(ViamChannel channel,
robot->refresh_thread_ = std::thread{&RobotClient::refresh_every, robot.get()};
}

robot->should_check_connection_ = true;

robot->check_every_interval_ = options.check_every_interval();
robot->reconnect_every_interval_ = options.reconnect_every_interval();

robot->check_connection_thread_ = std::thread{&RobotClient::check_connection, robot.get()};

robot->refresh();
return robot;
};
Expand Down
5 changes: 5 additions & 0 deletions src/viam/sdk/robot/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,15 @@ class RobotClient {
void connect_logging();

void refresh_every();
void check_connection();

std::thread refresh_thread_;
std::thread check_connection_thread_;
std::atomic<bool> should_refresh_;
std::atomic<bool> should_check_connection_;
std::chrono::seconds refresh_interval_;
unsigned int check_every_interval_;
unsigned int reconnect_every_interval_;

ViamChannel viam_channel_;

Expand Down
29 changes: 26 additions & 3 deletions src/viam/sdk/rpc/dial.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,12 @@ ViamChannel ViamChannel::dial(const char* uri, const boost::optional<DialOptions
}
address += proxy_path;

return ViamChannel(sdk::impl::create_viam_channel(address, grpc::InsecureChannelCredentials()),
proxy_path,
ptr);
auto chan =
ViamChannel(sdk::impl::create_viam_channel(address, grpc::InsecureChannelCredentials()),
proxy_path,
ptr);
chan.uri_ = uri;
return chan;
}

const std::shared_ptr<grpc::Channel>& ViamChannel::channel() const {
Expand All @@ -200,6 +203,26 @@ void ViamChannel::close() {
pimpl_.reset();
}

const char* ViamChannel::get_channel_addr() const {
return uri_;
}
Options& Options::set_check_every_interval(unsigned int interval) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same with the other comment let's have these be std::chrono::seconds

check_every_interval_ = interval;
return *this;
}
Options& Options::set_reconnect_every_interval(unsigned int interval) {
reconnect_every_interval_ = interval;
return *this;
}

unsigned int Options::check_every_interval() const {
return check_every_interval_;
}

unsigned int Options::reconnect_every_interval() const {
return reconnect_every_interval_;
}

unsigned int Options::refresh_interval() const {
return refresh_interval_;
}
Expand Down
23 changes: 23 additions & 0 deletions src/viam/sdk/rpc/dial.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ class ViamChannel {

void close();

const char* get_channel_addr() const;

private:
const char* uri_;
struct impl;

std::shared_ptr<GrpcChannel> channel_;
Expand Down Expand Up @@ -117,12 +120,32 @@ class Options {
: refresh_interval_(std::move(refresh_interval)), dial_options_(std::move(dial_options)) {}

unsigned int refresh_interval() const;
unsigned int check_every_interval() const;
unsigned int reconnect_every_interval() const;

/// @brief Sets the frequency (in seconds) to verify connectivity
Options& set_check_every_interval(unsigned int interval);

/// @brief Sets the frequency (in seconds) to attempt to reconnect when connectivity is lost
Options& set_reconnect_every_interval(unsigned int interval);
const boost::optional<DialOptions>& dial_options() const;

private:
/// @brief How often to refresh the status/parts of the robot, in seconds. If set to 0, the
/// robot will not automatically refresh.
unsigned int refresh_interval_;

/// @brief How often to verify connectivity to the robot, in seconds. If set to 0, will not
/// check, will default to the `reconnect_every_interval_` value. Defaults to 0.
/// @note Setting to a non-zero value is useful in modules but may result in delays shutting
/// down client code
unsigned int check_every_interval_ = 0;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these are new let's change them to std::chrono::seconds which is what I had done to refresh_interval_ on the dial direct branch


/// @brief How often to attempt to reconnect to the robot when disconnected. If set to 0,
/// will not attempt to reconnect. Defaults to 0.
/// @note Setting to a non-zero value is useful in modules but may result in delays shutting
/// down client code
unsigned int reconnect_every_interval_ = 0;
Copy link
Member Author

@stuqdog stuqdog Jun 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opted to set these both to 0 by default. It leads to no meaningful behavioral change for existing clients, and sidesteps the issue of having to wait for the check_connection loop to finish before shutting down client code. Also if a client is having issues connecting, they probably can figure out what they want to do and stop the process as necessary by interacting directly, without us making any assumptions.

We could put these into constructors, but I chose not to because I didn't want any breaking changes or constructor bloat (what if a user only wants to set one?), and at any rate maintaining the existing constructor to avoid breaking existing code means that we still have to be opinionated about a default value.

the default is overridden for modules in modules/service.cpp, to ensure that modules don't get orphaned.

boost::optional<DialOptions> dial_options_;
};

Expand Down
Loading