Skip to content

Commit 20d413d

Browse files
Improved lock handling of signal index cache and implemented better thread release management during shutdown.
1 parent 2720908 commit 20d413d

File tree

8 files changed

+37
-23
lines changed

8 files changed

+37
-23
lines changed

src/lib/Timer.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,15 @@ namespace sttp
5050

5151
do
5252
{
53-
boost::this_thread::sleep(boost::posix_time::milliseconds(m_interval));
53+
try
54+
{
55+
boost::this_thread::sleep(boost::posix_time::milliseconds(m_interval));
56+
}
57+
catch (boost::thread_interrupted&)
58+
{
59+
m_running = false;
60+
return;
61+
}
5462

5563
if (m_running)
5664
m_callback(this, m_userData);

src/lib/transport/DataPublisher.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -851,7 +851,7 @@ void DataPublisher::Start(const TcpEndPoint& endpoint)
851851
m_commandChannelService.restart();
852852
#endif
853853

854-
m_clientAcceptor = TcpAcceptor(m_commandChannelService, endpoint, false);
854+
m_clientAcceptor = TcpAcceptor(m_commandChannelService, endpoint, false); //-V601
855855

856856
// Run call-back thread
857857
m_commandChannelAcceptThread = Thread([&,this]

src/lib/transport/DataSubscriber.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ void DataSubscriber::ReadPayloadHeader(const ErrorCode& error, size_t bytesTrans
413413
if (error == error::connection_aborted || error == error::connection_reset || error == error::eof)
414414
{
415415
// Connection closed by peer; terminate connection
416-
Thread(bind(&DataSubscriber::ConnectionTerminatedDispatcher, this));
416+
m_connectionTerminationThread = Thread(bind(&DataSubscriber::ConnectionTerminatedDispatcher, this));
417417
return;
418418
}
419419

@@ -451,7 +451,7 @@ void DataSubscriber::ReadPacket(const ErrorCode& error, size_t bytesTransferred)
451451
if (error == error::connection_aborted || error == error::connection_reset || error == error::eof)
452452
{
453453
// Connection closed by peer; terminate connection
454-
Thread(bind(&DataSubscriber::ConnectionTerminatedDispatcher, this));
454+
m_connectionTerminationThread = Thread(bind(&DataSubscriber::ConnectionTerminatedDispatcher, this));
455455
return;
456456
}
457457

@@ -1017,7 +1017,7 @@ void DataSubscriber::SubscriptionUpdatedDispatcher(DataSubscriber* source, const
10171017
if (signalIndexCachePtr != nullptr)
10181018
{
10191019
const SubscriptionUpdatedCallback subscriptionUpdated = source->m_subscriptionUpdatedCallback;
1020-
const SignalIndexCachePtr signalIndexCacheRef = source->ReleaseDispatchReference(signalIndexCachePtr);
1020+
const SignalIndexCachePtr signalIndexCacheRef = source->ReleaseDispatchReference(signalIndexCachePtr); //-V821
10211021

10221022
if (subscriptionUpdated != nullptr)
10231023
subscriptionUpdated(source, signalIndexCacheRef);
@@ -1258,6 +1258,7 @@ void DataSubscriber::Disconnect(bool autoReconnecting)
12581258
if (!autoReconnecting)
12591259
{
12601260
m_connector.Cancel();
1261+
m_connectionTerminationThread.join();
12611262
m_connectActionMutex.lock();
12621263
}
12631264

@@ -1504,7 +1505,7 @@ void DataSubscriber::WriteHandler(const ErrorCode& error, size_t bytesTransferre
15041505
if (error == error::connection_aborted || error == error::connection_reset || error == error::eof)
15051506
{
15061507
// Connection closed by peer; terminate connection
1507-
Thread(bind(&DataSubscriber::ConnectionTerminatedDispatcher, this));
1508+
m_connectionTerminationThread = Thread(bind(&DataSubscriber::ConnectionTerminatedDispatcher, this));
15081509
return;
15091510
}
15101511

src/lib/transport/DataSubscriber.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ namespace transport
220220
// Callback thread members
221221
Thread m_callbackThread;
222222
ThreadSafeQueue<CallbackDispatcher> m_callbackQueue;
223+
Thread m_connectionTerminationThread;
223224

224225
// Command channel
225226
Thread m_commandChannelResponseThread;

src/lib/transport/SubscriberConnection.cpp

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -342,17 +342,24 @@ void SubscriberConnection::SetSubscriptionInfo(const string& value)
342342
m_subscriptionInfo = source + " version " + version + " updated on " + updatedOn;
343343
}
344344

345-
const SignalIndexCachePtr& SubscriberConnection::GetSignalIndexCache() const
345+
const SignalIndexCachePtr& SubscriberConnection::GetSignalIndexCache()
346346
{
347+
ReaderLock readLock(m_signalIndexCacheLock);
348+
347349
return m_signalIndexCache;
348350
}
349351

350352
void SubscriberConnection::SetSignalIndexCache(SignalIndexCachePtr signalIndexCache)
351353
{
354+
WriterLock writeLock(m_signalIndexCacheLock);
355+
352356
m_signalIndexCache = std::move(signalIndexCache);
353357

354358
// Update measurement routes for newly subscribed measurement signal IDs
355359
m_parent->m_routingTables.UpdateRoutes(shared_from_this(), m_signalIndexCache->GetSignalIDs());
360+
361+
// Reset TSSC encoder on successful (re)subscription
362+
m_tsscResetRequested = true;
356363
}
357364

358365
uint64_t SubscriberConnection::GetTotalCommandChannelBytesSent() const
@@ -748,15 +755,9 @@ void SubscriberConnection::HandleSubscribe(uint8_t* data, uint32_t length)
748755
// Send updated signal index cache to client with validated rights of the selected input measurement keys
749756
SendResponse(ServerResponse::UpdateSignalIndexCache, ServerCommand::Subscribe, SerializeSignalIndexCache(*signalIndexCache));
750757
}
751-
752-
m_tsscEncoderLock.lock();
753758

754-
// Reset TSSC encoder on successful (re)subscription
755-
m_tsscResetRequested = true;
756759
SetSignalIndexCache(signalIndexCache);
757760

758-
m_tsscEncoderLock.unlock();
759-
760761
// If using compact measurement format with base time offsets, setup base time rotation timer
761762
if (!m_usingPayloadCompression && m_parent->GetUseBaseTimeOffsets() && m_includeTime)
762763
{
@@ -1126,7 +1127,8 @@ SignalIndexCachePtr SubscriberConnection::ParseSubscriptionRequest(const string&
11261127

11271128
void SubscriberConnection::PublishCompactMeasurements(const std::vector<MeasurementPtr>& measurements)
11281129
{
1129-
CompactMeasurement serializer(m_signalIndexCache, m_baseTimeOffsets, m_includeTime, m_useMillisecondResolution, m_timeIndex);
1130+
const SignalIndexCachePtr signalIndexCache = GetSignalIndexCache();
1131+
CompactMeasurement serializer(signalIndexCache, m_baseTimeOffsets, m_includeTime, m_useMillisecondResolution, m_timeIndex);
11301132
vector<uint8_t> packet, buffer;
11311133
int32_t count = 0;
11321134

@@ -1137,7 +1139,7 @@ void SubscriberConnection::PublishCompactMeasurements(const std::vector<Measurem
11371139
{
11381140
const Measurement& measurement = *measurements[i];
11391141
const int64_t timestamp = measurement.Timestamp;
1140-
const int32_t runtimeID = m_signalIndexCache->GetSignalIndex(measurement.SignalID);
1142+
const int32_t runtimeID = signalIndexCache->GetSignalIndex(measurement.SignalID);
11411143

11421144
if (runtimeID == Int32::MaxValue)
11431145
continue;
@@ -1193,7 +1195,7 @@ void SubscriberConnection::PublishCompactDataPacket(const vector<uint8_t>& packe
11931195

11941196
void SubscriberConnection::PublishTSSCMeasurements(const std::vector<MeasurementPtr>& measurements)
11951197
{
1196-
m_tsscEncoderLock.lock();
1198+
const SignalIndexCachePtr signalIndexCache = GetSignalIndexCache();
11971199

11981200
if (m_tsscResetRequested)
11991201
{
@@ -1216,7 +1218,7 @@ void SubscriberConnection::PublishTSSCMeasurements(const std::vector<Measurement
12161218

12171219
for (const auto& measurement : measurements)
12181220
{
1219-
const int32_t index = m_signalIndexCache->GetSignalIndex(measurement->SignalID);
1221+
const int32_t index = signalIndexCache->GetSignalIndex(measurement->SignalID);
12201222

12211223
if (!m_tsscEncoder.TryAddMeasurement(index, measurement->Timestamp, static_cast<uint32_t>(measurement->Flags), static_cast<float32_t>(measurement->AdjustedValue())))
12221224
{
@@ -1231,8 +1233,6 @@ void SubscriberConnection::PublishTSSCMeasurements(const std::vector<Measurement
12311233

12321234
if (count > 0)
12331235
PublishTSSCDataPacket(count);
1234-
1235-
m_tsscEncoderLock.unlock();
12361236
}
12371237

12381238
void SubscriberConnection::PublishTSSCDataPacket(int32_t count)

src/lib/transport/SubscriberConnection.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ namespace transport
101101

102102
// Measurement parsing
103103
SignalIndexCachePtr m_signalIndexCache;
104+
sttp::SharedMutex m_signalIndexCacheLock;
104105
TimerPtr m_baseTimeRotationTimer;
105106
int32_t m_timeIndex;
106107
int64_t m_baseTimeOffsets[2];
@@ -110,7 +111,6 @@ namespace transport
110111
sttp::Mutex m_latestMeasurementsLock;
111112
TimerPtr m_throttledPublicationTimer;
112113
tssc::TSSCEncoder m_tsscEncoder;
113-
sttp::Mutex m_tsscEncoderLock;
114114
uint8_t m_tsscWorkingBuffer[TSSCBufferSize];
115115
bool m_tsscResetRequested;
116116
uint16_t m_tsscSequenceNumber;
@@ -247,7 +247,7 @@ namespace transport
247247
void SetSubscriptionInfo(const std::string& value);
248248

249249
// Gets or sets signal index cache for subscriber representing run-time mappings for subscribed points
250-
const SignalIndexCachePtr& GetSignalIndexCache() const;
250+
const SignalIndexCachePtr& GetSignalIndexCache();
251251
void SetSignalIndexCache(SignalIndexCachePtr signalIndexCache);
252252

253253
// Statistical functions

src/lib/transport/SubscriberInstance.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,10 @@ SubscriberInstance::SubscriberInstance() :
5858

5959
SubscriberInstance::~SubscriberInstance()
6060
{
61-
m_subscriber.reset();
61+
if (m_subscriber != nullptr)
62+
m_subscriber->Disconnect();
63+
64+
m_connectThread.join();
6265
}
6366

6467
// public functions
@@ -150,7 +153,7 @@ void SubscriberInstance::SetMetadataFilters(const std::string& metadataFilters)
150153

151154
void SubscriberInstance::ConnectAsync()
152155
{
153-
Thread(bind(&SubscriberInstance::Connect, this));
156+
m_connectThread = Thread(bind(&SubscriberInstance::Connect, this));
154157
}
155158

156159
void SubscriberInstance::Connect()

src/lib/transport/SubscriberInstance.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ namespace transport
5151
std::unordered_map<Guid, MeasurementMetadataPtr> m_measurements;
5252
sttp::StringMap<DeviceMetadataPtr> m_devices;
5353
sttp::StringMap<ConfigurationFramePtr> m_configurationFrames;
54+
sttp::Thread m_connectThread;
5455

5556
Mutex m_configurationUpdateLock;
5657
void* m_userData;

0 commit comments

Comments
 (0)