Skip to content

Commit 7f03834

Browse files
authored
RSDK-10842 - shut down cpp modules properly when viam-server is hard killed (#448)
1 parent 0d912b2 commit 7f03834

File tree

5 files changed

+132
-9
lines changed

5 files changed

+132
-9
lines changed

src/viam/sdk/module/service.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,10 @@ struct ModuleService::ServiceImpl : viam::module::v1::ModuleService::Service {
194194
auto new_parent_addr = request->parent_address();
195195
if (parent.parent_addr_ != new_parent_addr) {
196196
parent.parent_addr_ = std::move(new_parent_addr);
197-
parent.parent_ = RobotClient::at_local_socket(parent.parent_addr_, {0, boost::none});
197+
Options opts{0, boost::none};
198+
opts.set_check_every_interval(std::chrono::seconds{5})
199+
.set_reconnect_every_interval(std::chrono::seconds{1});
200+
parent.parent_ = RobotClient::at_local_socket(parent.parent_addr_, opts);
198201
parent.parent_->connect_logging();
199202
}
200203
response->set_ready(parent.module_->ready());

src/viam/sdk/robot/client.cpp

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <exception>
12
#include <viam/sdk/robot/client.hpp>
23

34
#include <chrono>
@@ -144,11 +145,16 @@ RobotClient::~RobotClient() {
144145

145146
void RobotClient::close() {
146147
should_refresh_.store(false);
148+
should_check_connection_.store(false);
147149

148150
if (refresh_thread_.joinable()) {
149151
refresh_thread_.join();
150152
}
151153

154+
if (check_connection_thread_.joinable()) {
155+
check_connection_thread_.join();
156+
}
157+
152158
stop_all();
153159

154160
viam_channel_.close();
@@ -231,6 +237,62 @@ void RobotClient::refresh_every() {
231237
}
232238
};
233239

240+
void RobotClient::check_connection() {
241+
auto check_every = check_every_interval_;
242+
auto reconnect_every = reconnect_every_interval_;
243+
if (check_every == std::chrono::seconds{0}) {
244+
check_every = reconnect_every;
245+
}
246+
if (check_every == std::chrono::seconds{0} && reconnect_every == std::chrono::seconds{0}) {
247+
should_check_connection_.store(false);
248+
}
249+
bool connected(true);
250+
while (should_check_connection_) {
251+
std::exception_ptr connection_error;
252+
std::string what;
253+
for (int i = 0; i < 3; ++i) {
254+
try {
255+
std::this_thread::sleep_for(check_every);
256+
impl::client_helper(impl_, &RobotService::Stub::ResourceNames).invoke([](auto&) {
257+
return;
258+
});
259+
connected = true;
260+
break;
261+
} catch (const std::exception& e) {
262+
connected = false;
263+
connection_error = std::current_exception();
264+
what = e.what();
265+
std::this_thread::sleep_for(std::chrono::milliseconds{100});
266+
}
267+
}
268+
if (connected) {
269+
continue;
270+
}
271+
const auto* uri = viam_channel_.get_channel_addr();
272+
VIAM_SDK_LOG(error) << "Lost connection to machine at address " << uri << " with error "
273+
<< what << ". Attempting to reconnect every " << reconnect_every.count()
274+
<< "second(s)";
275+
276+
viam_channel_.close();
277+
278+
for (int i = 0; i < 3; ++i) {
279+
try {
280+
auto channel = ViamChannel::dial(uri, {});
281+
impl_ =
282+
std::make_unique<RobotClient::impl>(RobotService::NewStub(channel.channel()));
283+
refresh();
284+
connected = true;
285+
} catch (const std::exception& e) {
286+
viam_channel_.close();
287+
std::this_thread::sleep_for(reconnect_every);
288+
}
289+
}
290+
if (!connected) {
291+
close();
292+
}
293+
}
294+
}
295+
234296
RobotClient::RobotClient(ViamChannel channel)
235297
: viam_channel_(std::move(channel)),
236298
impl_(std::make_unique<impl>(RobotService::NewStub(viam_channel_.channel()))) {}
@@ -262,8 +324,8 @@ void RobotClient::log(const std::string& name,
262324
ClientContext ctx;
263325
const auto response = impl_->stub->Log(ctx, req, &resp);
264326
if (is_error_response(response)) {
265-
// Manually override to force this to get logged to console so we don't set off an infinite
266-
// loop
327+
// Manually override to force this to get logged to console so we don't set off an
328+
// infinite loop
267329
VIAM_SDK_LOG(error) << boost::log::add_value(sdk::impl::attr_console_force_type{}, true)
268330
<< "Error sending log message over grpc: " << response.error_message()
269331
<< response.error_details();
@@ -279,6 +341,13 @@ std::shared_ptr<RobotClient> RobotClient::with_channel(ViamChannel channel,
279341
robot->refresh_thread_ = std::thread{&RobotClient::refresh_every, robot.get()};
280342
}
281343

344+
robot->should_check_connection_ = true;
345+
346+
robot->check_every_interval_ = options.check_every_interval();
347+
robot->reconnect_every_interval_ = options.reconnect_every_interval();
348+
349+
robot->check_connection_thread_ = std::thread{&RobotClient::check_connection, robot.get()};
350+
282351
robot->refresh();
283352
return robot;
284353
};

src/viam/sdk/robot/client.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,10 +184,15 @@ class RobotClient {
184184
void connect_logging();
185185

186186
void refresh_every();
187+
void check_connection();
187188

188189
std::thread refresh_thread_;
190+
std::thread check_connection_thread_;
189191
std::atomic<bool> should_refresh_;
192+
std::atomic<bool> should_check_connection_;
190193
std::chrono::seconds refresh_interval_;
194+
std::chrono::seconds check_every_interval_;
195+
std::chrono::seconds reconnect_every_interval_;
191196

192197
ViamChannel viam_channel_;
193198

src/viam/sdk/rpc/dial.cpp

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,12 @@ ViamChannel ViamChannel::dial(const char* uri, const boost::optional<DialOptions
187187
}
188188
address += proxy_path;
189189

190-
return ViamChannel(sdk::impl::create_viam_channel(address, grpc::InsecureChannelCredentials()),
191-
proxy_path,
192-
ptr);
190+
auto chan =
191+
ViamChannel(sdk::impl::create_viam_channel(address, grpc::InsecureChannelCredentials()),
192+
proxy_path,
193+
ptr);
194+
chan.uri_ = uri;
195+
return chan;
193196
}
194197

195198
const std::shared_ptr<grpc::Channel>& ViamChannel::channel() const {
@@ -200,7 +203,27 @@ void ViamChannel::close() {
200203
pimpl_.reset();
201204
}
202205

203-
unsigned int Options::refresh_interval() const {
206+
const char* ViamChannel::get_channel_addr() const {
207+
return uri_;
208+
}
209+
Options& Options::set_check_every_interval(std::chrono::seconds interval) {
210+
check_every_interval_ = interval;
211+
return *this;
212+
}
213+
Options& Options::set_reconnect_every_interval(std::chrono::seconds interval) {
214+
reconnect_every_interval_ = interval;
215+
return *this;
216+
}
217+
218+
std::chrono::seconds Options::check_every_interval() const {
219+
return check_every_interval_;
220+
}
221+
222+
std::chrono::seconds Options::reconnect_every_interval() const {
223+
return reconnect_every_interval_;
224+
}
225+
226+
std::chrono::seconds Options::refresh_interval() const {
204227
return refresh_interval_;
205228
}
206229

src/viam/sdk/rpc/dial.hpp

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ class ViamChannel {
4545

4646
void close();
4747

48+
const char* get_channel_addr() const;
49+
4850
private:
51+
const char* uri_;
4952
struct impl;
5053

5154
std::shared_ptr<GrpcChannel> channel_;
@@ -116,13 +119,33 @@ class Options {
116119
Options(unsigned int refresh_interval, boost::optional<DialOptions> dial_options)
117120
: refresh_interval_(std::move(refresh_interval)), dial_options_(std::move(dial_options)) {}
118121

119-
unsigned int refresh_interval() const;
122+
std::chrono::seconds refresh_interval() const;
123+
std::chrono::seconds check_every_interval() const;
124+
std::chrono::seconds reconnect_every_interval() const;
125+
126+
/// @brief Sets the frequency (in seconds) to verify connectivity
127+
Options& set_check_every_interval(std::chrono::seconds interval);
128+
129+
/// @brief Sets the frequency (in seconds) to attempt to reconnect when connectivity is lost
130+
Options& set_reconnect_every_interval(std::chrono::seconds interval);
120131
const boost::optional<DialOptions>& dial_options() const;
121132

122133
private:
123134
/// @brief How often to refresh the status/parts of the robot, in seconds. If set to 0, the
124135
/// robot will not automatically refresh.
125-
unsigned int refresh_interval_;
136+
std::chrono::seconds refresh_interval_{0};
137+
138+
/// @brief How often to verify connectivity to the robot, in seconds. If set to 0, will not
139+
/// check, will default to the `reconnect_every_interval_` value. Defaults to 0.
140+
/// @note Setting to a non-zero value is useful in modules but may result in delays shutting
141+
/// down client code
142+
std::chrono::seconds check_every_interval_{0};
143+
144+
/// @brief How often to attempt to reconnect to the robot when disconnected. If set to 0,
145+
/// will not attempt to reconnect. Defaults to 0.
146+
/// @note Setting to a non-zero value is useful in modules but may result in delays shutting
147+
/// down client code
148+
std::chrono::seconds reconnect_every_interval_{0};
126149
boost::optional<DialOptions> dial_options_;
127150
};
128151

0 commit comments

Comments
 (0)