Skip to content

Commit e55432d

Browse files
committed
Potential fix for the race condition
Applied suggestions from code review
1 parent d3c06eb commit e55432d

File tree

4 files changed

+23
-55
lines changed

4 files changed

+23
-55
lines changed

include/ur_client_library/comm/producer.h

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class URProducer : public IProducer<T>
4343
URStream<T>& stream_;
4444
Parser<T>& parser_;
4545
std::chrono::seconds timeout_;
46-
std::function<void()> on_rtde_reconnect_cb_;
46+
std::function<void()> on_reconnect_cb_;
4747

4848
bool running_;
4949

@@ -133,17 +133,10 @@ class URProducer : public IProducer<T>
133133
if (stream_.closed())
134134
return false;
135135

136-
if (stream_.getStreamType() == URStreamType::RTDE)
136+
if (on_reconnect_cb_)
137137
{
138-
if (on_rtde_reconnect_cb_)
139-
{
140-
URCL_LOG_WARN("Failed to read from RTDE stream, invoking on reconnect callback and stopping the producer");
141-
on_rtde_reconnect_cb_();
142-
}
143-
else
144-
{
145-
URCL_LOG_ERROR("Failed to read from RTDE stream without a reconnect handler stopping the producer");
146-
}
138+
URCL_LOG_WARN("Failed to read from stream, invoking on reconnect callback and stopping the producer");
139+
on_reconnect_cb_();
147140
return false;
148141
}
149142

@@ -162,14 +155,15 @@ class URProducer : public IProducer<T>
162155
}
163156

164157
/*!
165-
* \brief Sets the RTDE reconnection callback. RTDE requires setting up the communication again upon reconnection
166-
* it is not enough to just reconnect to the stream.
158+
* \brief Sets the reconnection callback. Use this to configure a reconnection callback instead of connecting directly
159+
* to the stream again. This is needed for RTDE as it requires setting up the communication again upon reconnection it
160+
* is not enough to just reconnect to the stream.
167161
*
168-
* \param on_rtde_reconnect_cb Callback to be invoked when connection is lost to the RTDE stream.
162+
* \param on_reconnect_cb Callback to be invoked when connection is lost to the stream.
169163
*/
170-
void setRTDEReconnectionCallback(std::function<void()> on_rtde_reconnect_cb)
164+
void setReconnectionCallback(std::function<void()> on_reconnect_cb)
171165
{
172-
on_rtde_reconnect_cb_ = on_rtde_reconnect_cb;
166+
on_reconnect_cb_ = on_reconnect_cb;
173167
}
174168
};
175169
} // namespace comm

include/ur_client_library/comm/stream.h

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,6 @@ namespace urcl
3131
namespace comm
3232
{
3333

34-
/*!
35-
* \brief Different types of UR streams
36-
*/
37-
enum class URStreamType
38-
{
39-
Primary = 30001, ///< Stream connected to the primary interface
40-
Secondary = 30002, ///< Stream connected to the secondary interface
41-
RTDE = 30004, ///< Stream connected to the RTDE interface
42-
UNKNOWN = -1, ///< Stream type is fetched from the port, this is to handle unknown ports
43-
};
44-
4534
/*!
4635
* \brief The stream is an abstraction of the TCPSocket that offers reading a full UR data package
4736
* out of the socket. This means, it has to have some knowledge about the package structure to
@@ -129,26 +118,6 @@ class URStream : public TCPSocket
129118
return host_;
130119
}
131120

132-
/*!
133-
* \brief Get the stream type
134-
*
135-
* \returns The stream type
136-
*/
137-
URStreamType getStreamType()
138-
{
139-
switch (port_)
140-
{
141-
case static_cast<int>(URStreamType::Primary):
142-
return URStreamType::Primary;
143-
case static_cast<int>(URStreamType::Secondary):
144-
return URStreamType::Secondary;
145-
case static_cast<int>(URStreamType::RTDE):
146-
return URStreamType::RTDE;
147-
default:
148-
return URStreamType::UNKNOWN;
149-
}
150-
}
151-
152121
private:
153122
std::string host_;
154123
int port_;

include/ur_client_library/rtde/rtde_client.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ class RTDEClient
232232
std::unique_ptr<comm::Pipeline<RTDEPackage>> pipeline_;
233233
RTDEWriter writer_;
234234
bool reconnecting_;
235+
bool stop_reconnection_;
235236
std::mutex reconnect_mutex_;
236237
std::thread reconnecting_thread_;
237238

src/rtde/rtde_client.cpp

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ RTDEClient::RTDEClient(std::string robot_ip, comm::INotifier& notifier, const st
4747
, pipeline_(std::make_unique<comm::Pipeline<RTDEPackage>>(*prod_, PIPELINE_NAME, notifier, true))
4848
, writer_(&stream_, input_recipe_)
4949
, reconnecting_(false)
50+
, stop_reconnection_(false)
5051
, max_frequency_(URE_MAX_FREQUENCY)
5152
, target_frequency_(target_frequency)
5253
, client_state_(ClientState::UNINITIALIZED)
@@ -71,6 +72,7 @@ RTDEClient::RTDEClient(std::string robot_ip, comm::INotifier& notifier, const st
7172
, pipeline_(std::make_unique<comm::Pipeline<RTDEPackage>>(*prod_, PIPELINE_NAME, notifier, true))
7273
, writer_(&stream_, input_recipe_)
7374
, reconnecting_(false)
75+
, stop_reconnection_(false)
7476
, max_frequency_(URE_MAX_FREQUENCY)
7577
, target_frequency_(target_frequency)
7678
, client_state_(ClientState::UNINITIALIZED)
@@ -79,7 +81,8 @@ RTDEClient::RTDEClient(std::string robot_ip, comm::INotifier& notifier, const st
7981

8082
RTDEClient::~RTDEClient()
8183
{
82-
reconnecting_ = false;
84+
prod_->setReconnectionCallback(nullptr);
85+
stop_reconnection_ = true;
8386
if (reconnecting_thread_.joinable())
8487
{
8588
reconnecting_thread_.join();
@@ -100,7 +103,7 @@ bool RTDEClient::init(const size_t max_connection_attempts, const std::chrono::m
100103
return true;
101104
}
102105

103-
prod_->setRTDEReconnectionCallback(std::bind(&RTDEClient::reconnectCallback, this));
106+
prod_->setReconnectionCallback(nullptr);
104107

105108
unsigned int attempts = 0;
106109
std::stringstream ss;
@@ -121,6 +124,9 @@ bool RTDEClient::init(const size_t max_connection_attempts, const std::chrono::m
121124
// Stop pipeline again
122125
pipeline_->stop();
123126
client_state_ = ClientState::INITIALIZED;
127+
// Set reconnection callback after we are initialized to ensure that a disconnect during initialization doesn't
128+
// trigger a reconnect
129+
prod_->setReconnectionCallback(std::bind(&RTDEClient::reconnectCallback, this));
124130
return true;
125131
}
126132

@@ -296,7 +302,6 @@ void RTDEClient::resetOutputRecipe(const std::vector<std::string> new_recipe)
296302

297303
parser_ = RTDEParser(output_recipe_);
298304
prod_ = std::make_unique<comm::URProducer<RTDEPackage>>(stream_, parser_);
299-
prod_->setRTDEReconnectionCallback(std::bind(&RTDEClient::reconnectCallback, this));
300305
pipeline_ = std::make_unique<comm::Pipeline<RTDEPackage>>(*prod_, PIPELINE_NAME, notifier_, true);
301306
}
302307

@@ -611,7 +616,6 @@ bool RTDEClient::sendStart()
611616
ss << "Did not receive answer to RTDE start request. Message received instead: " << std::endl
612617
<< package->toString();
613618
URCL_LOG_WARN("%s", ss.str().c_str());
614-
return false;
615619
}
616620
}
617621
std::stringstream ss;
@@ -644,7 +648,6 @@ bool RTDEClient::sendPause()
644648
}
645649
if (rtde_interface::ControlPackagePause* tmp = dynamic_cast<rtde_interface::ControlPackagePause*>(package.get()))
646650
{
647-
client_state_ = ClientState::PAUSED;
648651
return tmp->accepted_;
649652
}
650653
}
@@ -774,7 +777,7 @@ void RTDEClient::reconnect()
774777
}
775778

776779
const std::string reconnecting_stopped_msg = "Reconnecting has been stopped, because the object is being destroyed";
777-
if (reconnecting_ == false)
780+
if (stop_reconnection_)
778781
{
779782
URCL_LOG_WARN(reconnecting_stopped_msg.c_str());
780783
return;
@@ -806,7 +809,7 @@ void RTDEClient::reconnect()
806809
while (std::chrono::steady_clock::now() - start_time < duration)
807810
{
808811
std::this_thread::sleep_for(std::chrono::milliseconds(250));
809-
if (reconnecting_ == false)
812+
if (stop_reconnection_)
810813
{
811814
URCL_LOG_WARN(reconnecting_stopped_msg.c_str());
812815
return;
@@ -828,12 +831,13 @@ void RTDEClient::reconnect()
828831
{
829832
pause();
830833
}
834+
URCL_LOG_INFO("Done reconnecting to the RTDE interface");
831835
reconnecting_ = false;
832836
}
833837

834838
void RTDEClient::reconnectCallback()
835839
{
836-
if (reconnecting_)
840+
if (reconnecting_ || stop_reconnection_)
837841
{
838842
return;
839843
}

0 commit comments

Comments
 (0)