Skip to content

Commit ce23e6d

Browse files
committed
Apply suggestions from code review
Also fixed that we can destroy the rtdeClient while reconnecting
1 parent 174955d commit ce23e6d

File tree

4 files changed

+142
-98
lines changed

4 files changed

+142
-98
lines changed

include/ur_client_library/comm/pipeline.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,6 @@ class Pipeline
299299
, queue_{ 32 }
300300
, running_{ false }
301301
, producer_fifo_scheduling_(producer_fifo_scheduling)
302-
, threads_stopped_(true)
303302
{
304303
}
305304
/*!
@@ -319,7 +318,6 @@ class Pipeline
319318
, queue_{ 32 }
320319
, running_{ false }
321320
, producer_fifo_scheduling_(producer_fifo_scheduling)
322-
, threads_stopped_(true)
323321
{
324322
}
325323

@@ -356,7 +354,6 @@ class Pipeline
356354
return;
357355

358356
running_ = true;
359-
threads_stopped_ = false;
360357
producer_.startProducer();
361358
pThread_ = std::thread(&Pipeline::runProducer, this);
362359
if (consumer_ != nullptr)
@@ -369,13 +366,9 @@ class Pipeline
369366
*/
370367
void stop()
371368
{
372-
if (threads_stopped_)
373-
return;
374-
375369
URCL_LOG_DEBUG("Stopping pipeline! <%s>", name_.c_str());
376370

377371
running_ = false;
378-
threads_stopped_ = true;
379372
producer_.stopProducer();
380373
if (pThread_.joinable())
381374
{
@@ -420,7 +413,6 @@ class Pipeline
420413
std::atomic<bool> running_;
421414
std::thread pThread_, cThread_;
422415
bool producer_fifo_scheduling_;
423-
bool threads_stopped_;
424416

425417
void runProducer()
426418
{

include/ur_client_library/rtde/rtde_client.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -252,11 +252,11 @@ class RTDEClient
252252
// the robot is booted.
253253
std::vector<std::string> ensureTimestampIsPresent(const std::vector<std::string>& output_recipe) const;
254254

255-
bool setupCommunication();
256-
uint16_t setProtocolVersion();
257-
bool negotiateProtocolVersion(const uint16_t protocol_version);
255+
bool setupCommunication(const size_t max_num_tries = 0,
256+
const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10));
257+
std::pair<bool, uint16_t> setProtocolVersion();
258258
bool queryURControlVersion();
259-
bool setTargetFrequency();
259+
void setTargetFrequency();
260260
bool setupOutputs(const uint16_t protocol_version);
261261
bool setupInputs();
262262
void disconnect();

src/rtde/rtde_client.cpp

Lines changed: 136 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ RTDEClient::RTDEClient(std::string robot_ip, comm::INotifier& notifier, const st
7575

7676
RTDEClient::~RTDEClient()
7777
{
78+
reconnecting_ = false;
79+
if (reconnecting_thread_.joinable())
80+
{
81+
reconnecting_thread_.join();
82+
}
7883
disconnect();
7984
}
8085

@@ -92,19 +97,11 @@ bool RTDEClient::init(const size_t max_connection_attempts, const std::chrono::m
9297
}
9398

9499
prod_->setRTDEReconnectionCallback(std::bind(&RTDEClient::reconnectCallback, this));
95-
std::cout << "what is happening here" << std::endl;
96100

97101
unsigned int attempts = 0;
98102
std::stringstream ss;
99103

100-
// A running pipeline is needed inside setup.
101-
pipeline_->init(max_connection_attempts, reconnection_timeout);
102-
pipeline_->run();
103-
104-
// The state initializing is used inside disconnect to stop the pipeline again.
105-
client_state_ = ClientState::INITIALIZING;
106-
107-
while (!setupCommunication())
104+
while (!setupCommunication(max_connection_attempts, reconnection_timeout))
108105
{
109106
if (++attempts >= max_initialization_attempts)
110107
{
@@ -116,108 +113,104 @@ bool RTDEClient::init(const size_t max_connection_attempts, const std::chrono::m
116113
disconnect();
117114
URCL_LOG_ERROR("Failed to initialize RTDE client, retrying in %d seconds", initialization_timeout.count() / 1000);
118115
std::this_thread::sleep_for(initialization_timeout);
119-
120-
// A running pipeline is needed inside setup
121-
pipeline_->init(max_connection_attempts, reconnection_timeout);
122-
pipeline_->run();
123116
}
124117
// Stop pipeline again
125118
pipeline_->stop();
126119
client_state_ = ClientState::INITIALIZED;
127120
return true;
128121
}
129122

130-
bool RTDEClient::setupCommunication()
123+
bool RTDEClient::setupCommunication(const size_t max_num_tries, const std::chrono::milliseconds reconnection_time)
131124
{
132-
uint16_t protocol_version = setProtocolVersion();
125+
// The state initializing is used inside disconnect to stop the pipeline again.
126+
client_state_ = ClientState::INITIALIZING;
127+
// A running pipeline is needed inside setup.
128+
try
129+
{
130+
pipeline_->init(max_num_tries, reconnection_time);
131+
}
132+
catch (const UrException& exc)
133+
{
134+
URCL_LOG_ERROR("Caught exception %s, while trying to initialize pipeline", exc.what());
135+
return false;
136+
}
137+
pipeline_->run();
138+
139+
std::pair<bool, uint16_t> protocol_version = setProtocolVersion();
133140
// Protocol version must be above zero
134-
if (protocol_version == 0)
141+
if (protocol_version.first == false)
135142
{
136143
return false;
137144
}
138145

139146
bool is_rtde_comm_setup = true;
140147
is_rtde_comm_setup = queryURControlVersion();
141148

142-
is_rtde_comm_setup = is_rtde_comm_setup && setTargetFrequency();
149+
if (is_rtde_comm_setup)
150+
{
151+
setTargetFrequency();
152+
}
143153

144-
is_rtde_comm_setup = is_rtde_comm_setup && setupOutputs(protocol_version);
154+
is_rtde_comm_setup = is_rtde_comm_setup && setupOutputs(protocol_version.second);
145155

146156
is_rtde_comm_setup = is_rtde_comm_setup && isRobotBooted();
147157

148158
return is_rtde_comm_setup && setupInputs();
149159
}
150160

151-
uint16_t RTDEClient::setProtocolVersion()
161+
std::pair<bool, uint16_t> RTDEClient::setProtocolVersion()
152162
{
153163
uint16_t protocol_version = MAX_RTDE_PROTOCOL_VERSION;
154-
while (!negotiateProtocolVersion(protocol_version))
155-
{
156-
if (stream_.getState() != comm::SocketState::Connected)
157-
{
158-
URCL_LOG_ERROR("Protocol version for RTDE communication could not be established, because the RTDE client is "
159-
"disconnected from the server.");
160-
return 0;
161-
}
162-
URCL_LOG_INFO("Robot did not accept RTDE protocol version '%hu'. Trying lower protocol version", protocol_version);
163-
protocol_version--;
164-
if (protocol_version == 0)
164+
while (protocol_version > 0)
165+
{
166+
// Protocol version should always be 1 before starting negotiation
167+
parser_.setProtocolVersion(1);
168+
unsigned int num_retries = 0;
169+
uint8_t buffer[4096];
170+
size_t size;
171+
size_t written;
172+
size = RequestProtocolVersionRequest::generateSerializedRequest(buffer, protocol_version);
173+
if (!stream_.write(buffer, size, written))
165174
{
166-
URCL_LOG_ERROR("Protocol version for RTDE communication could not be established. Robot didn't accept any of "
167-
"the suggested versions.");
168-
return 0;
175+
URCL_LOG_ERROR("Sending protocol version query to robot failed");
176+
return { false, 0 };
169177
}
170-
}
171178

172-
URCL_LOG_INFO("Negotiated RTDE protocol version to %hu.", protocol_version);
173-
parser_.setProtocolVersion(protocol_version);
174-
return protocol_version;
175-
}
176-
177-
bool RTDEClient::negotiateProtocolVersion(const uint16_t protocol_version)
178-
{
179-
// Protocol version should always be 1 before starting negotiation
180-
parser_.setProtocolVersion(1);
181-
unsigned int num_retries = 0;
182-
uint8_t buffer[4096];
183-
size_t size;
184-
size_t written;
185-
size = RequestProtocolVersionRequest::generateSerializedRequest(buffer, protocol_version);
186-
if (!stream_.write(buffer, size, written))
187-
{
188-
URCL_LOG_ERROR("Sending protocol version query to robot failed");
189-
return false;
190-
}
191-
192-
while (num_retries < MAX_REQUEST_RETRIES)
193-
{
194-
std::unique_ptr<RTDEPackage> package;
195-
if (!pipeline_->getLatestProduct(package, std::chrono::milliseconds(1000)))
196-
{
197-
URCL_LOG_ERROR("failed to get package from rtde interface");
198-
return false;
199-
}
200-
if (rtde_interface::RequestProtocolVersion* tmp_version =
201-
dynamic_cast<rtde_interface::RequestProtocolVersion*>(package.get()))
179+
while (num_retries < MAX_REQUEST_RETRIES)
202180
{
203-
// Reset the num_tries variable in case we have to try with another protocol version.
204-
num_retries = 0;
205-
return tmp_version->accepted_;
206-
}
207-
else
208-
{
209-
std::stringstream ss;
210-
ss << "Did not receive protocol negotiation answer from robot. Message received instead: " << std::endl
211-
<< package->toString() << ". Retrying...";
212-
num_retries++;
213-
URCL_LOG_WARN("%s", ss.str().c_str());
181+
std::unique_ptr<RTDEPackage> package;
182+
if (!pipeline_->getLatestProduct(package, std::chrono::milliseconds(1000)))
183+
{
184+
URCL_LOG_ERROR("failed to get package from RTDE interface");
185+
return { false, 0 };
186+
}
187+
if (rtde_interface::RequestProtocolVersion* tmp_version =
188+
dynamic_cast<rtde_interface::RequestProtocolVersion*>(package.get()))
189+
{
190+
if (tmp_version->accepted_)
191+
{
192+
URCL_LOG_INFO("Negotiated RTDE protocol version to %hu.", protocol_version);
193+
parser_.setProtocolVersion(protocol_version);
194+
return { true, protocol_version };
195+
}
196+
break;
197+
}
198+
else
199+
{
200+
std::stringstream ss;
201+
ss << "Did not receive protocol negotiation answer from robot. Message received instead: " << std::endl
202+
<< package->toString() << ". Retrying...";
203+
num_retries++;
204+
URCL_LOG_WARN("%s", ss.str().c_str());
205+
}
214206
}
207+
208+
URCL_LOG_INFO("Robot did not accept RTDE protocol version '%hu'. Trying lower protocol version", protocol_version);
209+
protocol_version--;
215210
}
216-
std::stringstream ss;
217-
ss << "Could not negotiate RTDE protocol version after " << MAX_REQUEST_RETRIES
218-
<< " tries. Please check the output of the "
219-
"negotiation attempts above to get a hint what could be wrong.";
220-
return false;
211+
URCL_LOG_ERROR("Protocol version for RTDE communication could not be established. Robot didn't accept any of "
212+
"the suggested versions.");
213+
return { false, 0 };
221214
}
222215

223216
bool RTDEClient::queryURControlVersion()
@@ -264,7 +257,7 @@ bool RTDEClient::queryURControlVersion()
264257
return false;
265258
}
266259

267-
bool RTDEClient::setTargetFrequency()
260+
void RTDEClient::setTargetFrequency()
268261
{
269262
if (urcontrol_version_.major < 5)
270263
{
@@ -281,7 +274,6 @@ bool RTDEClient::setTargetFrequency()
281274
// Target frequency outside valid range
282275
throw UrException("Invalid target frequency of RTDE connection");
283276
}
284-
return true;
285277
}
286278

287279
void RTDEClient::resetOutputRecipe(const std::vector<std::string> new_recipe)
@@ -748,13 +740,71 @@ void RTDEClient::reconnect()
748740
std::lock_guard<std::mutex> lock(reconnect_mutex_);
749741
ClientState cur_client_state = client_state_;
750742
disconnect();
751-
try
743+
744+
const size_t max_initialization_attempts = 3;
745+
size_t cur_initialization_attempt = 0;
746+
bool client_reconnected = false;
747+
while (cur_initialization_attempt < max_initialization_attempts)
752748
{
753-
init();
749+
bool is_communication_setup = false;
750+
try
751+
{
752+
is_communication_setup = setupCommunication(1, std::chrono::milliseconds{ 10000 });
753+
}
754+
catch (const UrException& exc)
755+
{
756+
URCL_LOG_ERROR("Caught exception while reconnecting to the RTDE interface %s. Unable to reconnect", exc.what());
757+
disconnect();
758+
reconnecting_ = false;
759+
return;
760+
}
761+
762+
const std::string reconnecting_stopped_msg = "Reconnecting has been stopped, because the object is being destroyed";
763+
if (reconnecting_ == false)
764+
{
765+
URCL_LOG_WARN(reconnecting_stopped_msg.c_str());
766+
return;
767+
}
768+
769+
if (is_communication_setup)
770+
{
771+
client_reconnected = true;
772+
break;
773+
}
774+
775+
auto duration = std::chrono::seconds(1);
776+
if (stream_.getState() != comm::SocketState::Connected)
777+
{
778+
// We don't wanna count it as an initialization attempt if we cannot connect to the socket and we want to wait
779+
// longer before reconnecting.
780+
duration = std::chrono::seconds(10);
781+
URCL_LOG_ERROR("Failed to connect to the RTDE server, retrying in %i seconds", duration.count());
782+
}
783+
else
784+
{
785+
URCL_LOG_ERROR("Failed to initialize RTDE client, retrying in %i second", duration.count());
786+
cur_initialization_attempt += 1;
787+
}
788+
789+
disconnect();
790+
791+
auto start_time = std::chrono::steady_clock::now();
792+
while (std::chrono::steady_clock::now() - start_time < duration)
793+
{
794+
std::this_thread::sleep_for(std::chrono::milliseconds(250));
795+
if (reconnecting_ == false)
796+
{
797+
URCL_LOG_WARN(reconnecting_stopped_msg.c_str());
798+
return;
799+
}
800+
}
754801
}
755-
catch (const UrException& exc)
802+
803+
if (client_reconnected == false)
756804
{
757-
URCL_LOG_ERROR("Caught exception while reconnecting to the RTDE interface %s. Unable to reconnect", exc.what());
805+
URCL_LOG_ERROR("Failed to initialize RTDE client after %i attempts, unable to reconnect",
806+
max_initialization_attempts);
807+
disconnect();
758808
reconnecting_ = false;
759809
return;
760810
}

tests/test_pipeline.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ TEST_F(PipelineTest, stop_pipeline)
199199

200200
TEST_F(PipelineTest, consumer_pipeline)
201201
{
202+
pipeline_.reset();
202203
stream_.reset(new comm::URStream<rtde_interface::RTDEPackage>("127.0.0.1", 60002));
203204
producer_.reset(new comm::URProducer<rtde_interface::RTDEPackage>(*stream_.get(), *parser_.get()));
204205
TestConsumer consumer;
@@ -236,6 +237,7 @@ TEST_F(PipelineTest, consumer_pipeline)
236237

237238
TEST_F(PipelineTest, connect_non_connected_robot)
238239
{
240+
pipeline_.reset();
239241
stream_.reset(new comm::URStream<rtde_interface::RTDEPackage>("127.0.0.1", 12321));
240242
producer_.reset(new comm::URProducer<rtde_interface::RTDEPackage>(*stream_.get(), *parser_.get()));
241243
TestConsumer consumer;

0 commit comments

Comments
 (0)