Skip to content

Commit e4996f0

Browse files
Updated dispatch reference operations for subscriber connection references in data publisher to use synchronized reference counting map since instances are persisted through multiple dispatches
1 parent 31a6818 commit e4996f0

File tree

5 files changed

+36
-9
lines changed

5 files changed

+36
-9
lines changed

src/lib/transport/DataPublisher.cpp

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,20 +147,36 @@ void DataPublisher::RemoveConnection(const SubscriberConnectionPtr& connection)
147147
SubscriberConnection* DataPublisher::AddDispatchReference(SubscriberConnectionPtr connectionRef)
148148
{
149149
SubscriberConnection* connectionPtr = connectionRef.get();
150+
ScopeLock lock(m_subscriberConnectionDispatchRefsLock);
150151

151-
// Hold onto subscriber connection shared pointer until it's delivered
152-
m_subscriberConnectionDispatchRefs.emplace(connectionRef);
152+
// Increment reference count for subscriber connection shared pointer until all instances are delivered
153+
const auto iterator = m_subscriberConnectionDispatchRefs.find(connectionRef);
154+
155+
if (iterator != m_subscriberConnectionDispatchRefs.end())
156+
iterator->second++;
157+
else
158+
m_subscriberConnectionDispatchRefs.emplace(connectionRef, uint32_t(1));
153159

154160
return connectionPtr;
155161
}
156162

157163
SubscriberConnectionPtr DataPublisher::ReleaseDispatchReference(SubscriberConnection* connectionPtr)
158164
{
159165
const SubscriberConnectionPtr connectionRef = connectionPtr->GetReference();
166+
ScopeLock lock(m_subscriberConnectionDispatchRefsLock);
160167

161-
// Remove used reference to subscriber connection pointer
162-
m_subscriberConnectionDispatchRefs.erase(connectionRef);
168+
// Decrement reference count to subscriber connection pointer
169+
const auto iterator = m_subscriberConnectionDispatchRefs.find(connectionRef);
170+
171+
if (iterator != m_subscriberConnectionDispatchRefs.end())
172+
{
173+
iterator->second--;
163174

175+
// Remove references when count hits zero
176+
if (iterator->second < 1)
177+
m_subscriberConnectionDispatchRefs.erase(connectionRef);
178+
}
179+
164180
return connectionRef;
165181
}
166182

src/lib/transport/DataPublisher.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ namespace transport
6868
sttp::data::DataSetPtr m_filteringMetadata;
6969
RoutingTables m_routingTables;
7070
std::unordered_set<SubscriberConnectionPtr> m_subscriberConnections;
71-
std::unordered_set<SubscriberConnectionPtr> m_subscriberConnectionDispatchRefs;
7271
sttp::SharedMutex m_subscriberConnectionsLock;
7372
SecurityMode m_securityMode;
7473
int32_t m_maximumAllowedConnections;
@@ -83,6 +82,11 @@ namespace transport
8382
sttp::Mutex m_connectActionMutex;
8483
void* m_userData;
8584

85+
// Dispatch reference - unordered map needed to manage reference
86+
// counts on subscriber connections since these are persisted
87+
std::unordered_map<SubscriberConnectionPtr, uint32_t> m_subscriberConnectionDispatchRefs;
88+
sttp::Mutex m_subscriberConnectionDispatchRefsLock;
89+
8690
// Callback queue
8791
Thread m_callbackThread;
8892
ThreadSafeQueue<CallbackDispatcher> m_callbackQueue;

src/lib/transport/DataSubscriber.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,7 @@ void DataSubscriber::HandleSucceeded(uint8_t commandCode, uint8_t* data, uint32_
587587
// still an associated message to be processed.
588588
m_subscribed = (commandCode == ServerCommand::Subscribe); //-V796
589589

590+
case ServerCommand::UpdateProcessingInterval:
590591
case ServerCommand::RotateCipherKeys:
591592
// Each of these responses come with a message that will
592593
// be delivered to the user via the status message callback.
@@ -607,7 +608,7 @@ void DataSubscriber::HandleSucceeded(uint8_t commandCode, uint8_t* data, uint32_
607608
// If we don't know what the message is, we can't interpret
608609
// the data sent with the packet. Deliver an error message
609610
// to the user via the error message callback.
610-
messageStream << "Received success code in response to unknown server command 0x" << ToHex(commandCode);
611+
messageStream << "Received success code in response to unknown server command " << ToHex(commandCode);
611612
DispatchErrorMessage(messageStream.str());
612613
break;
613614
}
@@ -889,6 +890,9 @@ SignalIndexCache* DataSubscriber::AddDispatchReference(SignalIndexCachePtr signa
889890
{
890891
SignalIndexCache* signalIndexCachePtr = signalIndexCacheRef.get();
891892

893+
// Note that no lock is needed for m_signalIndexCacheDispatchRefs set since all calls from
894+
// HandleUpdateSignalIndexCache are from a single connection and are processed sequentially
895+
892896
// Hold onto signal index cache shared pointer until it's delivered
893897
m_signalIndexCacheDispatchRefs.emplace(signalIndexCacheRef);
894898

src/lib/transport/DataSubscriber.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,13 +207,16 @@ namespace transport
207207

208208
// Measurement parsing
209209
SignalIndexCachePtr m_signalIndexCache;
210-
std::unordered_set<SignalIndexCachePtr> m_signalIndexCacheDispatchRefs;
211210
int32_t m_timeIndex;
212211
int64_t m_baseTimeOffsets[2];
213212
tssc::TSSCDecoder m_tsscDecoder;
214213
bool m_tsscResetRequested;
215214
uint16_t m_tsscSequenceNumber;
216215

216+
// Dispatch reference - unordered set works fine for signal
217+
// index cache since each new call will be for a new instance
218+
std::unordered_set<SignalIndexCachePtr> m_signalIndexCacheDispatchRefs;
219+
217220
// Callback thread members
218221
Thread m_callbackThread;
219222
ThreadSafeQueue<CallbackDispatcher> m_callbackQueue;

src/lib/transport/SubscriberConnection.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1715,7 +1715,7 @@ string SubscriberConnection::DecodeString(const uint8_t* data, uint32_t offset,
17151715
return string(reinterpret_cast<char*>(const_cast<uint8_t*>(&data[offset])), length / sizeof(char));
17161716
case OperationalEncoding::UTF16BE:
17171717
// UTF16 in C++ is encoded as big-endian
1718-
swapBytes = !swapBytes;
1718+
swapBytes = !swapBytes; //-V796
17191719
case OperationalEncoding::UTF16LE:
17201720
{
17211721
wstring value(length / enc_sizeof_wchar, L'\0');
@@ -1755,7 +1755,7 @@ vector<uint8_t> SubscriberConnection::EncodeString(const string& value) const
17551755
break;
17561756
case OperationalEncoding::UTF16BE:
17571757
// UTF16 in C++ is encoded as big-endian
1758-
swapBytes = !swapBytes;
1758+
swapBytes = !swapBytes; //-V796
17591759
case OperationalEncoding::UTF16LE:
17601760
{
17611761
const wstring utf16 = ToUTF16(value);

0 commit comments

Comments
 (0)