Skip to content
45 changes: 20 additions & 25 deletions src/viam/sdk/robot/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,15 @@ RobotClient::~RobotClient() {

void RobotClient::close() {
should_refresh_.store(false);
for (const std::shared_ptr<std::thread>& t : threads_) {
t->~thread();

for (auto& thread : threads_) {
thread.join();
}
threads_.clear();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Woah. This was also a double destroy for all these std::thread objects, right? I'm sort of amazed this went unnoticed for so long. Are we doing ASAN / UBSAN builds anywhere in CI? If not, I think it might be time to do so.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our unit tests in CI run with sanitized builds for shared libraries, but our sanitizer seems to be only UBSAN, not ASAN. I'm pretty surprised this hasn't been caught before either, and the same goes for the other main issue being resolved in this PR, which is that we had an instance of rust-utils free_string being called on a c_str managed by a std::string. Perhaps it's a blessing in disguise that logging directed us to these!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ASAN and UBSAN work together just fine so it might be pretty easy to light it up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iirc turning on ASAN causes compilation to fail because it modifies the underlying memory shape but gRPC is not being compiled with ASAN, leading to ABI errors (I might have the details here somewhat wrong). Definitely fixable but it means compiling gRPC by hand in tests, which I believe is why we haven't done it thus far.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's right. That's a nuisance, and one I'd forgotten about since most of my time with ASAN was in a world where all third party C++ dependencies were vendored (in large part, exactly to solve this). I wonder if conan can be leveraged to solve this for us since it will build dependencies from source.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Realizing I encountered this too yesterday when trying to debug the original issue in this PR. As for conan, here is a discussion from 1.x but the answer is 'kinda sorta' conan-io/conan#4754


stop_all();
viam_channel_->close();

viam_channel_.close();
}

bool is_error_response(const grpc::Status& response) {
Expand Down Expand Up @@ -211,7 +215,7 @@ void RobotClient::refresh() {
if (rs) {
try {
const std::shared_ptr<Resource> rpc_client =
rs->create_rpc_client(name.name(), channel_);
rs->create_rpc_client(name.name(), viam_channel_.channel());
const Name name_({name.namespace_(), name.type(), name.subtype()}, "", name.name());
new_resources.emplace(name_, rpc_client);
} catch (const std::exception& exc) {
Expand Down Expand Up @@ -250,11 +254,10 @@ void RobotClient::refresh_every() {
}
};

RobotClient::RobotClient(std::shared_ptr<ViamChannel> channel)
: channel_(channel->channel()),
viam_channel_(std::move(channel)),
RobotClient::RobotClient(ViamChannel channel)
: viam_channel_(std::move(channel)),
should_close_channel_(false),
impl_(std::make_unique<impl>(RobotService::NewStub(channel_))) {}
impl_(std::make_unique<impl>(RobotService::NewStub(viam_channel_.channel()))) {}

std::vector<Name> RobotClient::resource_names() const {
const std::lock_guard<std::mutex> lock(lock_);
Expand Down Expand Up @@ -286,20 +289,14 @@ void RobotClient::log(const std::string& name,
}
}

std::shared_ptr<RobotClient> RobotClient::with_channel(std::shared_ptr<ViamChannel> channel,
std::shared_ptr<RobotClient> RobotClient::with_channel(ViamChannel channel,
const Options& options) {
std::shared_ptr<RobotClient> robot = std::make_shared<RobotClient>(std::move(channel));
auto robot = std::make_shared<RobotClient>(std::move(channel));
robot->refresh_interval_ = options.refresh_interval();
robot->should_refresh_ = (robot->refresh_interval_ > 0);
if (robot->should_refresh_) {
const std::shared_ptr<std::thread> t =
std::make_shared<std::thread>(&RobotClient::refresh_every, robot);
// TODO(RSDK-1743): this was leaking, confirm that adding thread catching in
// close/destructor lets us shutdown gracefully. See also address sanitizer,
// UB sanitizer
t->detach();
robot->threads_.push_back(t);
};
robot->threads_.emplace_back(&RobotClient::refresh_every, robot);
}

robot->refresh();
return robot;
Expand All @@ -308,8 +305,8 @@ std::shared_ptr<RobotClient> RobotClient::with_channel(std::shared_ptr<ViamChann
std::shared_ptr<RobotClient> RobotClient::at_address(const std::string& address,
const Options& options) {
const char* uri = address.c_str();
auto channel = ViamChannel::dial_initial(uri, options.dial_options());
std::shared_ptr<RobotClient> robot = RobotClient::with_channel(channel, options);
auto robot =
RobotClient::with_channel(ViamChannel::dial_initial(uri, options.dial_options()), options);
robot->should_close_channel_ = true;

return robot;
Expand All @@ -318,11 +315,9 @@ std::shared_ptr<RobotClient> RobotClient::at_address(const std::string& address,
std::shared_ptr<RobotClient> RobotClient::at_local_socket(const std::string& address,
const Options& options) {
const std::string addr = "unix://" + address;
const char* uri = addr.c_str();
const std::shared_ptr<grpc::Channel> channel =
sdk::impl::create_viam_channel(uri, grpc::InsecureChannelCredentials());
auto viam_channel = std::make_shared<ViamChannel>(channel, address.c_str(), nullptr);
std::shared_ptr<RobotClient> robot = RobotClient::with_channel(viam_channel, options);
auto robot = RobotClient::with_channel(
ViamChannel(sdk::impl::create_viam_channel(addr, grpc::InsecureChannelCredentials())),
options);
robot->should_close_channel_ = true;

return robot;
Expand Down
13 changes: 6 additions & 7 deletions src/viam/sdk/robot/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ class RobotClient {
friend bool operator==(const operation& lhs, const operation& rhs);
};

explicit RobotClient(ViamChannel channel);

~RobotClient();

void refresh();
void close();

Expand All @@ -87,10 +90,7 @@ class RobotClient {
/// @param options Options for connecting and refreshing.
/// Connects directly to a pre-existing channel. A robot created this way must be
/// `close()`d manually.
static std::shared_ptr<RobotClient> with_channel(std::shared_ptr<ViamChannel> channel,
const Options& options);

RobotClient(std::shared_ptr<ViamChannel> channel);
static std::shared_ptr<RobotClient> with_channel(ViamChannel channel, const Options& options);

std::vector<Name> resource_names() const;

Expand Down Expand Up @@ -165,13 +165,12 @@ class RobotClient {

void refresh_every();

std::vector<std::shared_ptr<std::thread>> threads_;
std::vector<std::thread> threads_;

std::atomic<bool> should_refresh_;
unsigned int refresh_interval_;

std::shared_ptr<GrpcChannel> channel_;
std::shared_ptr<ViamChannel> viam_channel_;
ViamChannel viam_channel_;
bool should_close_channel_;

struct impl;
Expand Down
76 changes: 53 additions & 23 deletions src/viam/sdk/rpc/dial.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,46 @@
namespace viam {
namespace sdk {

const std::shared_ptr<grpc::Channel>& ViamChannel::channel() const {
return channel_;
}
struct ViamChannel::impl {
impl(const char* path, void* runtime) : path(path), rust_runtime(runtime) {}

void ViamChannel::close() {
if (closed_) {
return;
impl(const impl&) = delete;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think because you are declaring the move ops that the copy ops are implicitly deleted, but if you don't trust that and want to make it explicit I don't mind either.


impl(impl&& other) noexcept
: path(std::exchange(other.path, nullptr)),
rust_runtime(std::exchange(other.rust_runtime, nullptr)) {}

impl& operator=(const impl&) = delete;

// Move assignment: takes over `other`'s path string and rust runtime handle, leaving `other`
// with null pointers.
//
// The resources this object currently holds are released first, with the same calls the
// destructor uses; overwriting the raw pointers without doing so would leak them. ~impl
// already invokes these functions on moved-from (null) members, so null arguments are
// assumed to be accepted.
impl& operator=(impl&& other) noexcept {
    // Guard self-assignment: freeing our own resources and then "taking" them back would
    // leave this object holding dangling pointers.
    if (this != &other) {
        free_string(path);
        free_rust_runtime(rust_runtime);

        path = std::exchange(other.path, nullptr);
        rust_runtime = std::exchange(other.rust_runtime, nullptr);
    }

    return *this;
}
closed_ = true;
free_string(path_);
free_rust_runtime(rust_runtime_);

~impl() {
free_string(path);
free_rust_runtime(rust_runtime);
}

const char* path;
void* rust_runtime;
};

ViamChannel::ViamChannel(std::shared_ptr<grpc::Channel> channel, const char* path, void* runtime)
: channel_(std::move(channel)), pimpl_(std::make_unique<ViamChannel::impl>(path, runtime)) {}

ViamChannel::ViamChannel(std::shared_ptr<grpc::Channel> channel) : channel_(std::move(channel)) {}

ViamChannel::ViamChannel(ViamChannel&&) noexcept = default;

ViamChannel& ViamChannel::operator=(ViamChannel&&) noexcept = default;

// Destructor: delegates to close(), so destroying a channel performs the same cleanup as
// closing it explicitly.
ViamChannel::~ViamChannel() {
close();
}

const std::string& Credentials::type() const {
return type_;
}
Expand All @@ -39,9 +66,6 @@ const std::string& Credentials::payload() const {
return payload_;
}

ViamChannel::ViamChannel(std::shared_ptr<grpc::Channel> channel, const char* path, void* runtime)
: channel_(std::move(channel)), path_(path), closed_(false), rust_runtime_(runtime) {}

DialOptions::DialOptions() = default;

DialOptions& DialOptions::set_credentials(boost::optional<Credentials> creds) {
Expand Down Expand Up @@ -105,8 +129,8 @@ bool DialOptions::allows_insecure_downgrade() const {
return allow_insecure_downgrade_;
}

std::shared_ptr<ViamChannel> ViamChannel::dial_initial(
const char* uri, const boost::optional<DialOptions>& options) {
ViamChannel ViamChannel::dial_initial(const char* uri,
const boost::optional<DialOptions>& options) {
DialOptions opts = options.get_value_or(DialOptions());
auto timeout = opts.timeout();
auto attempts_remaining = opts.initial_connection_attempts();
Expand All @@ -129,11 +153,10 @@ std::shared_ptr<ViamChannel> ViamChannel::dial_initial(
}
// the while loop will run until we either return or throw an error, so we can never reach this
// point
BOOST_UNREACHABLE_RETURN(nullptr)
BOOST_UNREACHABLE_RETURN(ViamChannel(nullptr))
}

std::shared_ptr<ViamChannel> ViamChannel::dial(const char* uri,
const boost::optional<DialOptions>& options) {
ViamChannel ViamChannel::dial(const char* uri, const boost::optional<DialOptions>& options) {
void* ptr = init_rust_runtime();
const DialOptions opts = options.get_value_or(DialOptions());
const std::chrono::duration<float> float_timeout = opts.timeout();
Expand All @@ -157,12 +180,19 @@ std::shared_ptr<ViamChannel> ViamChannel::dial(const char* uri,

std::string address("unix://");
address += socket_path;
const std::shared_ptr<grpc::Channel> channel =
impl::create_viam_channel(address, grpc::InsecureChannelCredentials());
const std::unique_ptr<viam::robot::v1::RobotService::Stub> st =
viam::robot::v1::RobotService::NewStub(channel);
return std::make_shared<ViamChannel>(channel, socket_path, ptr);
};

return ViamChannel(sdk::impl::create_viam_channel(address, grpc::InsecureChannelCredentials()),
socket_path,
ptr);
}

const std::shared_ptr<grpc::Channel>& ViamChannel::channel() const {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this just unimplemented before? If so, maybe that means nobody was using it, and therefore it could be renamed to something like grpc::Channel? Or, simply deleted if we don't want gRPC types in our ABI?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry weird diff, it was implemented before but I rearranged the method definition order in the .cpp to conform with the sequencing in the .hpp. I will do a check to see if it's used anywhere but I think yes because I probably touched this when I was defining grpc aliases and doing abi insulation

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So yes this was used, but I have taken this opportunity to remove the data member shared_ptr<grpc::Channel> in RobotClient which was redundant and replace all its uses with this method

return channel_;
}

// Releases the impl object held by pimpl_, if there is one; the impl destructor is what calls
// free_string and free_rust_runtime. channel_ itself is not touched here. Calling close()
// more than once is harmless, since resetting an already-empty pimpl_ does nothing.
void ViamChannel::close() {
pimpl_.reset();
}

unsigned int Options::refresh_interval() const {
return refresh_interval_;
Expand Down
26 changes: 17 additions & 9 deletions src/viam/sdk/rpc/dial.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,24 @@ namespace sdk {

class DialOptions;
class ViamChannel {
public:
void close();
ViamChannel(std::shared_ptr<GrpcChannel> channel, const char* path, void* runtime);

public:
explicit ViamChannel(std::shared_ptr<GrpcChannel> channel);

ViamChannel(ViamChannel&&) noexcept;

ViamChannel& operator=(ViamChannel&&) noexcept;

~ViamChannel();

/// @brief Connects to a robot at the given URI address, using the provided dial options (or
/// default options if none are provided). Ignores initial connection options specifying
/// how many times to attempt to connect and with what timeout.
/// In general, use of this method is discouraged. `RobotClient::at_address(...)` is the
/// preferred method to connect to a robot, and creates the channel itself.
/// @throws Exception if it is unable to establish a connection to the provided URI
static std::shared_ptr<ViamChannel> dial(const char* uri,
const boost::optional<DialOptions>& options);
static ViamChannel dial(const char* uri, const boost::optional<DialOptions>& options);

// @brief Dials to a robot at the given URI address, using the provided dial options (or default
// options if none are provided). Additionally specifies that this dial is an initial connection
Expand All @@ -33,16 +39,18 @@ class ViamChannel {
/// preferred method to connect to a robot, and creates the channel itself.
/// @throws Exception if it is unable to establish a connection to the provided URI within
/// the given number of initial connection attempts
static std::shared_ptr<ViamChannel> dial_initial(const char* uri,
const boost::optional<DialOptions>& options);
static ViamChannel dial_initial(const char* uri, const boost::optional<DialOptions>& options);

const std::shared_ptr<GrpcChannel>& channel() const;

void close();

private:
struct impl;

std::shared_ptr<GrpcChannel> channel_;
const char* path_;
bool closed_;
void* rust_runtime_;

std::unique_ptr<impl> pimpl_;
};

class Credentials {
Expand Down
3 changes: 1 addition & 2 deletions src/viam/sdk/tests/test_robot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ void robot_client_to_mocks_pipeline(F&& test_case) {
// in-process gRPC channel.
auto test_server = TestServer(server);
auto grpc_channel = test_server.grpc_in_process_channel();
auto viam_channel = std::make_shared<ViamChannel>(grpc_channel, "", nullptr);
auto client = RobotClient::with_channel(viam_channel, Options(0, boost::none));
auto client = RobotClient::with_channel(ViamChannel(grpc_channel), Options(0, boost::none));

// Run the passed-in test case on the created stack and give access to the
// created RobotClient and MockRobotService.
Expand Down