Skip to content

Commit 31a6818

Browse files
Fixed race conditions involving shared pointers in the dispatch queue and reconnect attempts during shutdown
1 parent 4fededf commit 31a6818

12 files changed

+192
-33
lines changed

src/lib/Timer.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ namespace sttp
4242
TimerElapsedCallback m_callback;
4343
void* m_userData;
4444
bool m_autoReset;
45-
volatile bool m_running;
45+
std::atomic_bool m_running;
4646

4747
void TimerThread()
4848
{
@@ -142,6 +142,9 @@ namespace sttp
142142

143143
void Stop()
144144
{
145+
if (!m_running)
146+
return;
147+
145148
m_running = false;
146149

147150
if (m_timerThread != nullptr)

src/lib/Version.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
#define __VERSION_H
2626

2727
#define STTP_TITLE "STTP C++ Library"
28-
#define STTP_VERSION "1.0.8"
28+
#define STTP_VERSION "1.0.9"
2929
#define STTP_UPDATEDON "2019-07-02"
3030

3131
#endif

src/lib/transport/DataPublisher.cpp

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ void DataPublisher::AcceptConnection(const SubscriberConnectionPtr& connection,
111111

112112
if (connectionAccepted)
113113
{
114-
DispatchClientConnected(connection.get());
114+
DispatchClientConnected(AddDispatchReference(connection));
115115
}
116116
else
117117
{
@@ -132,7 +132,7 @@ void DataPublisher::AcceptConnection(const SubscriberConnectionPtr& connection,
132132

133133
// Forwards a terminated subscriber connection to DispatchClientDisconnected.
//
// This hunk shows both sides of the change: the removed line passed the raw
// pointer from connection.get() directly, while the added line passes the raw
// pointer returned by AddDispatchReference(connection).
void DataPublisher::ConnectionTerminated(const SubscriberConnectionPtr& connection)
134134
{
135-
DispatchClientDisconnected(connection.get());
135+
DispatchClientDisconnected(AddDispatchReference(connection));
136136
}
137137

138138
void DataPublisher::RemoveConnection(const SubscriberConnectionPtr& connection)
@@ -144,6 +144,26 @@ void DataPublisher::RemoveConnection(const SubscriberConnectionPtr& connection)
144144
m_subscriberConnections.erase(connection);
145145
}
146146

147+
// Stores a copy of the given subscriber connection shared pointer in
// m_subscriberConnectionDispatchRefs and returns the raw pointer to hand to a
// dispatch call.
//
// connectionRef: shared pointer to the connection; taken by value and copied into the set.
// Returns: connectionRef.get().
//
// NOTE(review): m_subscriberConnectionDispatchRefs is declared as a std::unordered_set,
// so adding a connection that is already present leaves a single entry -- confirm that
// two dispatches pending for the same connection at once cannot occur or are acceptable.
// NOTE(review): no lock is taken in this function -- confirm callers serialize access to the set.
SubscriberConnection* DataPublisher::AddDispatchReference(SubscriberConnectionPtr connectionRef)
148+
{
149+
SubscriberConnection* connectionPtr = connectionRef.get();
150+
151+
// Hold onto subscriber connection shared pointer until it's delivered
152+
m_subscriberConnectionDispatchRefs.emplace(connectionRef);
153+
154+
return connectionPtr;
155+
}
156+
157+
// Obtains a shared pointer for the given raw connection pointer via GetReference(),
// erases the matching entry from m_subscriberConnectionDispatchRefs, and returns the
// shared pointer to the caller.
//
// connectionPtr: raw connection pointer; dereferenced unconditionally, so it must not be null.
// Returns: the shared pointer obtained from connectionPtr->GetReference().
//
// NOTE(review): GetReference() presumably yields a shared pointer owning the same
// object that was stored by AddDispatchReference -- confirm.
// NOTE(review): ClientConnectedDispatcher calls this only when a callback is registered;
// with no callback the stored entry is not erased on that path -- confirm this is intended.
// NOTE(review): no lock is taken in this function -- confirm callers serialize access to the set.
SubscriberConnectionPtr DataPublisher::ReleaseDispatchReference(SubscriberConnection* connectionPtr)
158+
{
159+
const SubscriberConnectionPtr connectionRef = connectionPtr->GetReference();
160+
161+
// Remove used reference to subscriber connection pointer
162+
m_subscriberConnectionDispatchRefs.erase(connectionRef);
163+
164+
return connectionRef;
165+
}
166+
147167
void DataPublisher::Dispatch(const DispatcherFunction& function)
148168
{
149169
Dispatch(function, nullptr, 0, 0);
@@ -252,7 +272,7 @@ void DataPublisher::ClientConnectedDispatcher(DataPublisher* source, const vecto
252272
const SubscriberConnectionCallback clientConnectedCallback = source->m_clientConnectedCallback;
253273

254274
if (clientConnectedCallback != nullptr)
255-
clientConnectedCallback(source, connection->GetReference());
275+
clientConnectedCallback(source, source->ReleaseDispatchReference(connection));
256276
}
257277
}
258278

@@ -263,11 +283,12 @@ void DataPublisher::ClientDisconnectedDispatcher(DataPublisher* source, const st
263283
if (source != nullptr)
264284
{
265285
const SubscriberConnectionCallback clientDisconnectedCallback = source->m_clientDisconnectedCallback;
286+
const SubscriberConnectionPtr subscriberConnectionRef = source->ReleaseDispatchReference(connection);
266287

267288
if (clientDisconnectedCallback != nullptr)
268-
clientDisconnectedCallback(source, connection->GetReference());
289+
clientDisconnectedCallback(source, subscriberConnectionRef);
269290

270-
source->RemoveConnection(connection->GetReference());
291+
source->RemoveConnection(subscriberConnectionRef);
271292
}
272293
}
273294

@@ -280,7 +301,7 @@ void DataPublisher::ProcessingIntervalChangeRequestedDispatcher(DataPublisher* s
280301
const SubscriberConnectionCallback temporalProcessingIntervalChangeRequestedCallback = source->m_processingIntervalChangeRequestedCallback;
281302

282303
if (temporalProcessingIntervalChangeRequestedCallback != nullptr)
283-
temporalProcessingIntervalChangeRequestedCallback(source, connection->GetReference());
304+
temporalProcessingIntervalChangeRequestedCallback(source, source->ReleaseDispatchReference(connection));
284305
}
285306
}
286307

@@ -293,7 +314,7 @@ void DataPublisher::TemporalSubscriptionRequestedDispatcher(DataPublisher* sourc
293314
const SubscriberConnectionCallback temporalSubscriptionRequestedCallback = source->m_temporalSubscriptionRequestedCallback;
294315

295316
if (temporalSubscriptionRequestedCallback != nullptr)
296-
temporalSubscriptionRequestedCallback(source, connection->GetReference());
317+
temporalSubscriptionRequestedCallback(source, source->ReleaseDispatchReference(connection));
297318
}
298319
}
299320

@@ -306,7 +327,7 @@ void DataPublisher::TemporalSubscriptionCanceledDispatcher(DataPublisher* source
306327
const SubscriberConnectionCallback temporalSubscriptionCanceledCallback = source->m_temporalSubscriptionCanceledCallback;
307328

308329
if (temporalSubscriptionCanceledCallback != nullptr)
309-
temporalSubscriptionCanceledCallback(source, connection->GetReference());
330+
temporalSubscriptionCanceledCallback(source, source->ReleaseDispatchReference(connection));
310331
}
311332
}
312333

@@ -319,7 +340,7 @@ void DataPublisher::UserCommandDispatcher(DataPublisher* source, const std::vect
319340
const UserCommandCallback userCommandCallback = source->m_userCommandCallback;
320341

321342
if (userCommandCallback != nullptr)
322-
userCommandCallback(source, userCommandData->connection->GetReference(), userCommandData->command, userCommandData->data);
343+
userCommandCallback(source, source->ReleaseDispatchReference(userCommandData->connection), userCommandData->command, userCommandData->data);
323344
}
324345

325346
delete userCommandData;
@@ -782,6 +803,9 @@ void DataPublisher::Start(const TcpEndPoint& endpoint)
782803
if (m_started)
783804
Stop();
784805

806+
// Hold the connect-action lock so any pending start operation completes before a stop can run - prevents a stop triggered by destruction from executing before start has completed
807+
ScopeLock lock(m_connectActionMutex);
808+
785809
#if BOOST_LEGACY
786810
m_commandChannelService.reset();
787811
#else
@@ -827,6 +851,9 @@ void DataPublisher::Start(const string& networkInterfaceIP, uint16_t port)
827851

828852
void DataPublisher::Stop()
829853
{
854+
// Wait for any pending start operation to complete before stopping - prevents a stop triggered by destruction from executing before start has completed
855+
ScopeLock lock(m_connectActionMutex);
856+
830857
// Notify running threads that the
831858
// publisher is shutting down
832859
m_shuttingDown = true;

src/lib/transport/DataPublisher.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ namespace transport
6868
sttp::data::DataSetPtr m_filteringMetadata;
6969
RoutingTables m_routingTables;
7070
std::unordered_set<SubscriberConnectionPtr> m_subscriberConnections;
71+
std::unordered_set<SubscriberConnectionPtr> m_subscriberConnectionDispatchRefs;
7172
sttp::SharedMutex m_subscriberConnectionsLock;
7273
SecurityMode m_securityMode;
7374
int32_t m_maximumAllowedConnections;
@@ -77,8 +78,9 @@ namespace transport
7778
bool m_supportsTemporalSubscriptions;
7879
bool m_useBaseTimeOffsets;
7980
uint32_t m_cipherKeyRotationPeriod;
80-
volatile bool m_started;
81-
volatile bool m_shuttingDown;
81+
std::atomic_bool m_started;
82+
std::atomic_bool m_shuttingDown;
83+
sttp::Mutex m_connectActionMutex;
8284
void* m_userData;
8385

8486
// Callback queue
@@ -106,6 +108,9 @@ namespace transport
106108
SubscriberConnectionCallback m_temporalSubscriptionCanceledCallback;
107109
UserCommandCallback m_userCommandCallback;
108110

111+
SubscriberConnection* AddDispatchReference(SubscriberConnectionPtr connectionRef);
112+
SubscriberConnectionPtr ReleaseDispatchReference(SubscriberConnection* connectionPtr);
113+
109114
// Dispatchers
110115
void Dispatch(const DispatcherFunction& function);
111116
void Dispatch(const DispatcherFunction& function, const uint8_t* data, uint32_t offset, uint32_t length);

src/lib/transport/DataSubscriber.cpp

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,12 @@ int SubscriberConnector::Connect(DataSubscriber& subscriber, bool autoReconnecti
172172
try
173173
{
174174
m_connectAttempt++;
175+
176+
if (subscriber.m_disposing)
177+
return ConnectCanceled;
178+
175179
subscriber.Connect(m_hostname, m_port, autoReconnecting);
180+
176181
connected = true;
177182
break;
178183
}
@@ -223,7 +228,7 @@ int SubscriberConnector::Connect(DataSubscriber& subscriber, bool autoReconnecti
223228
}
224229
}
225230

226-
return subscriber.IsConnected() ? ConnectSuccess : ConnectFailed;
231+
return subscriber.m_disposing ? ConnectCanceled : subscriber.IsConnected() ? ConnectSuccess : ConnectFailed;
227232
}
228233

229234
// Cancel all current and
@@ -682,7 +687,7 @@ void DataSubscriber::HandleUpdateSignalIndexCache(uint8_t* data, uint32_t offset
682687
signalIndexCache->Parse(uncompressedBuffer, m_subscriberID);
683688
m_signalIndexCache.swap(signalIndexCache);
684689

685-
DispatchSubscriptionUpdated(m_signalIndexCache.get());
690+
DispatchSubscriptionUpdated(AddDispatchReference(m_signalIndexCache));
686691
}
687692

688693
// Updates base time offsets.
@@ -880,6 +885,26 @@ void DataSubscriber::ParseCompactMeasurements(uint8_t* data, uint32_t offset, ui
880885
}
881886
}
882887

888+
// Stores a copy of the given signal index cache shared pointer in
// m_signalIndexCacheDispatchRefs and returns the raw pointer to hand to a
// dispatch call.
//
// signalIndexCacheRef: shared pointer to the cache; taken by value and copied into the set.
// Returns: signalIndexCacheRef.get().
//
// NOTE(review): m_signalIndexCacheDispatchRefs is declared as a std::unordered_set,
// so adding a cache that is already present leaves a single entry -- confirm that
// two dispatches pending for the same cache at once cannot occur or are acceptable.
// NOTE(review): no lock is taken in this function -- confirm callers serialize access to the set.
SignalIndexCache* DataSubscriber::AddDispatchReference(SignalIndexCachePtr signalIndexCacheRef)
889+
{
890+
SignalIndexCache* signalIndexCachePtr = signalIndexCacheRef.get();
891+
892+
// Hold onto signal index cache shared pointer until it's delivered
893+
m_signalIndexCacheDispatchRefs.emplace(signalIndexCacheRef);
894+
895+
return signalIndexCachePtr;
896+
}
897+
898+
// Obtains a shared pointer for the given raw signal index cache pointer via
// GetReference(), erases the matching entry from m_signalIndexCacheDispatchRefs,
// and returns the shared pointer to the caller.
//
// signalIndexCachePtr: raw cache pointer; dereferenced unconditionally, so it must not be null.
// Returns: the shared pointer obtained from signalIndexCachePtr->GetReference().
//
// NOTE(review): GetReference() presumably yields a shared pointer owning the same
// object that was stored by AddDispatchReference -- confirm.
// NOTE(review): SubscriptionUpdatedDispatcher calls this only when a callback is registered;
// with no callback the stored entry is not erased on that path -- confirm this is intended.
// NOTE(review): no lock is taken in this function -- confirm callers serialize access to the set.
SignalIndexCachePtr DataSubscriber::ReleaseDispatchReference(SignalIndexCache* signalIndexCachePtr)
899+
{
900+
const SignalIndexCachePtr signalIndexCacheRef = signalIndexCachePtr->GetReference();
901+
902+
// Remove used reference to signal index cache pointer
903+
m_signalIndexCacheDispatchRefs.erase(signalIndexCacheRef);
904+
905+
return signalIndexCacheRef;
906+
}
907+
883908
// Dispatches the given function to the callback thread.
884909
void DataSubscriber::Dispatch(const DispatcherFunction& function)
885910
{
@@ -986,7 +1011,7 @@ void DataSubscriber::SubscriptionUpdatedDispatcher(DataSubscriber* source, const
9861011
const SubscriptionUpdatedCallback subscriptionUpdated = source->m_subscriptionUpdatedCallback;
9871012

9881013
if (subscriptionUpdated != nullptr)
989-
subscriptionUpdated(source, signalIndexCache->GetReference());
1014+
subscriptionUpdated(source, source->ReleaseDispatchReference(signalIndexCache));
9901015
}
9911016
}
9921017

@@ -1171,6 +1196,8 @@ void DataSubscriber::Connect(const string& hostname, const uint16_t port, bool a
11711196
if (m_connected)
11721197
throw SubscriberException("Subscriber is already connected; disconnect first");
11731198

1199+
// Hold the connect-action lock so any pending connect operation completes before a disconnect can run - prevents a disconnect triggered by destruction from executing before the connection has completed
1200+
ScopeLock lock(m_connectActionMutex);
11741201
DnsResolver resolver(m_commandChannelService);
11751202
const DnsResolver::query query(hostname, to_string(port));
11761203
const DnsResolver::iterator endpointIterator = resolver.resolve(query);
@@ -1218,6 +1245,10 @@ void DataSubscriber::Connect(const string& hostname, const uint16_t port)
12181245

12191246
void DataSubscriber::Disconnect(bool autoReconnecting)
12201247
{
1248+
// Unless auto-reconnecting, wait for any pending connect operation to complete before disconnecting - prevents a disconnect triggered by destruction from executing before the connection has completed
1249+
if (!autoReconnecting)
1250+
m_connectActionMutex.lock();
1251+
12211252
ErrorCode error;
12221253

12231254
// Notify running threads that
@@ -1264,6 +1295,9 @@ void DataSubscriber::Disconnect(bool autoReconnecting)
12641295

12651296
// Disconnect completed
12661297
m_disconnecting = false;
1298+
1299+
if (!autoReconnecting)
1300+
m_connectActionMutex.unlock();
12671301
}
12681302

12691303
// Disconnects from the publisher.

src/lib/transport/DataSubscriber.h

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ namespace transport
8989
int32_t m_connectAttempt;
9090
bool m_connectionRefused;
9191
bool m_autoReconnect;
92-
bool m_cancel;
92+
std::atomic_bool m_cancel;
9393

9494
// Auto-reconnect handler.
9595
static void AutoReconnect(DataSubscriber* subscriber);
@@ -188,16 +188,17 @@ namespace transport
188188
bool m_compressPayloadData;
189189
bool m_compressMetadata;
190190
bool m_compressSignalIndexCache;
191-
volatile bool m_disconnecting;
192-
volatile bool m_disposing;
191+
std::atomic_bool m_disconnecting;
192+
std::atomic_bool m_disposing;
193+
sttp::Mutex m_connectActionMutex;
193194
void* m_userData;
194195

195196
// Statistics counters
196197
uint64_t m_totalCommandChannelBytesReceived;
197198
uint64_t m_totalDataChannelBytesReceived;
198199
uint64_t m_totalMeasurementsReceived;
199-
volatile bool m_connected;
200-
volatile bool m_subscribed;
200+
std::atomic_bool m_connected;
201+
std::atomic_bool m_subscribed;
201202

202203
// Assembly info
203204
std::string m_assemblySource;
@@ -206,6 +207,7 @@ namespace transport
206207

207208
// Measurement parsing
208209
SignalIndexCachePtr m_signalIndexCache;
210+
std::unordered_set<SignalIndexCachePtr> m_signalIndexCacheDispatchRefs;
209211
int32_t m_timeIndex;
210212
int64_t m_baseTimeOffsets[2];
211213
tssc::TSSCDecoder m_tsscDecoder;
@@ -264,6 +266,9 @@ namespace transport
264266
void ParseTSSCMeasurements(uint8_t* data, uint32_t offset, uint32_t length, std::vector<MeasurementPtr>& measurements);
265267
void ParseCompactMeasurements(uint8_t* data, uint32_t offset, uint32_t length, bool includeTime, bool useMillisecondResolution, int64_t frameLevelTimestamp, std::vector<MeasurementPtr>& measurements);
266268

269+
SignalIndexCache* AddDispatchReference(SignalIndexCachePtr signalIndexCacheRef);
270+
SignalIndexCachePtr ReleaseDispatchReference(SignalIndexCache* signalIndexCachePtr);
271+
267272
// Dispatchers
268273
void Dispatch(const DispatcherFunction& function);
269274
void Dispatch(const DispatcherFunction& function, const uint8_t* data, uint32_t offset, uint32_t length);

0 commit comments

Comments
 (0)