Skip to content

Commit 151c431

Browse files
Updated dispatch reference release callers to always run even when no handler has been registered.
1 parent e4996f0 commit 151c431

File tree

2 files changed

+61
-34
lines changed

2 files changed

+61
-34
lines changed

src/lib/transport/DataPublisher.cpp

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ void DataPublisher::DispatchUserCommand(SubscriberConnection* connection, uint32
258258
// Dispatcher function for status messages. Decodes the message and provides it to the user via the status message callback.
259259
void DataPublisher::StatusMessageDispatcher(DataPublisher* source, const vector<uint8_t>& buffer)
260260
{
261-
if (source == nullptr)
261+
if (source == nullptr || buffer.empty())
262262
return;
263263

264264
const MessageCallback statusMessageCallback = source->m_statusMessageCallback;
@@ -270,7 +270,7 @@ void DataPublisher::StatusMessageDispatcher(DataPublisher* source, const vector<
270270
// Dispatcher function for error messages. Decodes the message and provides it to the user via the error message callback.
271271
void DataPublisher::ErrorMessageDispatcher(DataPublisher* source, const vector<uint8_t>& buffer)
272272
{
273-
if (source == nullptr)
273+
if (source == nullptr || buffer.empty())
274274
return;
275275

276276
const MessageCallback errorMessageCallback = source->m_errorMessageCallback;
@@ -281,95 +281,118 @@ void DataPublisher::ErrorMessageDispatcher(DataPublisher* source, const vector<u
281281

282282
void DataPublisher::ClientConnectedDispatcher(DataPublisher* source, const vector<uint8_t>& buffer)
283283
{
284-
SubscriberConnection* connection = *reinterpret_cast<SubscriberConnection**>(const_cast<uint8_t*>(&buffer[0]));
284+
if (source == nullptr || buffer.empty())
285+
return;
286+
287+
SubscriberConnection* connectionPtr = *reinterpret_cast<SubscriberConnection**>(const_cast<uint8_t*>(&buffer[0]));
285288

286-
if (source != nullptr)
289+
if (connectionPtr != nullptr)
287290
{
288291
const SubscriberConnectionCallback clientConnectedCallback = source->m_clientConnectedCallback;
292+
const SubscriberConnectionPtr connectionRef = source->ReleaseDispatchReference(connectionPtr);
289293

290294
if (clientConnectedCallback != nullptr)
291-
clientConnectedCallback(source, source->ReleaseDispatchReference(connection));
295+
clientConnectedCallback(source, connectionRef);
292296
}
293297
}
294298

295299
void DataPublisher::ClientDisconnectedDispatcher(DataPublisher* source, const std::vector<uint8_t>& buffer)
296300
{
297-
SubscriberConnection* connection = *reinterpret_cast<SubscriberConnection**>(const_cast<uint8_t*>(&buffer[0]));
301+
if (source == nullptr || buffer.empty())
302+
return;
298303

299-
if (source != nullptr)
304+
SubscriberConnection* connectionPtr = *reinterpret_cast<SubscriberConnection**>(const_cast<uint8_t*>(&buffer[0]));
305+
306+
if (connectionPtr != nullptr)
300307
{
301308
const SubscriberConnectionCallback clientDisconnectedCallback = source->m_clientDisconnectedCallback;
302-
const SubscriberConnectionPtr subscriberConnectionRef = source->ReleaseDispatchReference(connection);
309+
const SubscriberConnectionPtr connectionRef = source->ReleaseDispatchReference(connectionPtr);
303310

304311
if (clientDisconnectedCallback != nullptr)
305-
clientDisconnectedCallback(source, subscriberConnectionRef);
312+
clientDisconnectedCallback(source, connectionRef);
306313

307-
source->RemoveConnection(subscriberConnectionRef);
314+
source->RemoveConnection(connectionRef);
308315
}
309316
}
310317

311318
void DataPublisher::ProcessingIntervalChangeRequestedDispatcher(DataPublisher* source, const std::vector<uint8_t>& buffer)
312319
{
313-
SubscriberConnection* connection = *reinterpret_cast<SubscriberConnection**>(const_cast<uint8_t*>(&buffer[0]));
320+
if (source == nullptr || buffer.empty())
321+
return;
322+
323+
SubscriberConnection* connectionPtr = *reinterpret_cast<SubscriberConnection**>(const_cast<uint8_t*>(&buffer[0]));
314324

315-
if (source != nullptr)
325+
if (connectionPtr != nullptr)
316326
{
317327
const SubscriberConnectionCallback temporalProcessingIntervalChangeRequestedCallback = source->m_processingIntervalChangeRequestedCallback;
328+
const SubscriberConnectionPtr connectionRef = source->ReleaseDispatchReference(connectionPtr);
318329

319330
if (temporalProcessingIntervalChangeRequestedCallback != nullptr)
320-
temporalProcessingIntervalChangeRequestedCallback(source, source->ReleaseDispatchReference(connection));
331+
temporalProcessingIntervalChangeRequestedCallback(source, connectionRef);
321332
}
322333
}
323334

324335
void DataPublisher::TemporalSubscriptionRequestedDispatcher(DataPublisher* source, const std::vector<uint8_t>& buffer)
325336
{
326-
SubscriberConnection* connection = *reinterpret_cast<SubscriberConnection**>(const_cast<uint8_t*>(&buffer[0]));
337+
if (source == nullptr || buffer.empty())
338+
return;
339+
340+
SubscriberConnection* connectionPtr = *reinterpret_cast<SubscriberConnection**>(const_cast<uint8_t*>(&buffer[0]));
327341

328-
if (source != nullptr)
342+
if (connectionPtr != nullptr)
329343
{
330344
const SubscriberConnectionCallback temporalSubscriptionRequestedCallback = source->m_temporalSubscriptionRequestedCallback;
345+
const SubscriberConnectionPtr connectionRef = source->ReleaseDispatchReference(connectionPtr);
331346

332347
if (temporalSubscriptionRequestedCallback != nullptr)
333-
temporalSubscriptionRequestedCallback(source, source->ReleaseDispatchReference(connection));
348+
temporalSubscriptionRequestedCallback(source, connectionRef);
334349
}
335350
}
336351

337352
void DataPublisher::TemporalSubscriptionCanceledDispatcher(DataPublisher* source, const std::vector<uint8_t>& buffer)
338353
{
339-
SubscriberConnection* connection = *reinterpret_cast<SubscriberConnection**>(const_cast<uint8_t*>(&buffer[0]));
354+
if (source == nullptr || buffer.empty())
355+
return;
340356

341-
if (source != nullptr)
357+
SubscriberConnection* connectionPtr = *reinterpret_cast<SubscriberConnection**>(const_cast<uint8_t*>(&buffer[0]));
358+
359+
if (connectionPtr != nullptr)
342360
{
343361
const SubscriberConnectionCallback temporalSubscriptionCanceledCallback = source->m_temporalSubscriptionCanceledCallback;
362+
const SubscriberConnectionPtr connectionRef = source->ReleaseDispatchReference(connectionPtr);
344363

345364
if (temporalSubscriptionCanceledCallback != nullptr)
346-
temporalSubscriptionCanceledCallback(source, source->ReleaseDispatchReference(connection));
365+
temporalSubscriptionCanceledCallback(source, connectionRef);
347366
}
348367
}
349368

350369
void DataPublisher::UserCommandDispatcher(DataPublisher* source, const std::vector<uint8_t>& buffer)
351370
{
371+
if (source == nullptr || buffer.empty())
372+
return;
373+
352374
UserCommandData* userCommandData = *reinterpret_cast<UserCommandData**>(const_cast<uint8_t*>(&buffer[0]));
353375

354-
if (source != nullptr && userCommandData != nullptr)
376+
if (userCommandData != nullptr && userCommandData->connection != nullptr)
355377
{
356378
const UserCommandCallback userCommandCallback = source->m_userCommandCallback;
379+
const SubscriberConnectionPtr connectionRef = source->ReleaseDispatchReference(userCommandData->connection);
357380

358381
if (userCommandCallback != nullptr)
359-
userCommandCallback(source, source->ReleaseDispatchReference(userCommandData->connection), userCommandData->command, userCommandData->data);
382+
userCommandCallback(source, connectionRef, userCommandData->command, userCommandData->data);
360383
}
361384

362385
delete userCommandData;
363386
}
364387

365388
int32_t DataPublisher::GetColumnIndex(const sttp::data::DataTablePtr& table, const std::string& columnName)
366389
{
367-
const DataColumnPtr& column = table->Column(columnName);
390+
const DataColumnPtr& column = table->Column(columnName);
368391

369-
if (column == nullptr)
370-
throw PublisherException("Column name \"" + columnName + "\" was not found in table \"" + table->Name() + "\"");
392+
if (column == nullptr)
393+
throw PublisherException("Column name \"" + columnName + "\" was not found in table \"" + table->Name() + "\"");
371394

372-
return column->Index();
395+
return column->Index();
373396
}
374397

375398
void DataPublisher::DefineMetadata(const vector<DeviceMetadataPtr>& deviceMetadata, const vector<MeasurementMetadataPtr>& measurementMetadata, const vector<PhasorMetadataPtr>& phasorMetadata, const int32_t versionNumber)

src/lib/transport/DataSubscriber.cpp

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -958,7 +958,7 @@ void DataSubscriber::DispatchErrorMessage(const string& message)
958958
// Dispatcher function for status messages. Decodes the message and provides it to the user via the status message callback.
959959
void DataSubscriber::StatusMessageDispatcher(DataSubscriber* source, const vector<uint8_t>& buffer)
960960
{
961-
if (source == nullptr)
961+
if (source == nullptr || buffer.empty())
962962
return;
963963

964964
const MessageCallback statusMessageCallback = source->m_statusMessageCallback;
@@ -970,7 +970,7 @@ void DataSubscriber::StatusMessageDispatcher(DataSubscriber* source, const vecto
970970
// Dispatcher function for error messages. Decodes the message and provides it to the user via the error message callback.
971971
void DataSubscriber::ErrorMessageDispatcher(DataSubscriber* source, const vector<uint8_t>& buffer)
972972
{
973-
if (source == nullptr)
973+
if (source == nullptr || buffer.empty())
974974
return;
975975

976976
const MessageCallback errorMessageCallback = source->m_errorMessageCallback;
@@ -982,7 +982,7 @@ void DataSubscriber::ErrorMessageDispatcher(DataSubscriber* source, const vector
982982
// Dispatcher function for data start time. Decodes the start time and provides it to the user via the data start time callback.
983983
void DataSubscriber::DataStartTimeDispatcher(DataSubscriber* source, const vector<uint8_t>& buffer)
984984
{
985-
if (source == nullptr)
985+
if (source == nullptr || buffer.empty())
986986
return;
987987

988988
const DataStartTimeCallback dataStartTimeCallback = source->m_dataStartTimeCallback;
@@ -997,7 +997,7 @@ void DataSubscriber::DataStartTimeDispatcher(DataSubscriber* source, const vecto
997997
// Dispatcher function for metadata. Provides encoded metadata to the user via the metadata callback.
998998
void DataSubscriber::MetadataDispatcher(DataSubscriber* source, const vector<uint8_t>& buffer)
999999
{
1000-
if (source == nullptr)
1000+
if (source == nullptr) // Empty buffer OK
10011001
return;
10021002

10031003
const MetadataCallback metadataCallback = source->m_metadataCallback;
@@ -1008,21 +1008,25 @@ void DataSubscriber::MetadataDispatcher(DataSubscriber* source, const vector<uin
10081008

10091009
void DataSubscriber::SubscriptionUpdatedDispatcher(DataSubscriber* source, const std::vector<uint8_t>& buffer)
10101010
{
1011-
SignalIndexCache* signalIndexCache = *reinterpret_cast<SignalIndexCache**>(const_cast<uint8_t*>(&buffer[0]));
1011+
if (source == nullptr || buffer.empty())
1012+
return;
1013+
1014+
SignalIndexCache* signalIndexCachePtr = *reinterpret_cast<SignalIndexCache**>(const_cast<uint8_t*>(&buffer[0]));
10121015

1013-
if (source != nullptr)
1016+
if (signalIndexCachePtr != nullptr)
10141017
{
10151018
const SubscriptionUpdatedCallback subscriptionUpdated = source->m_subscriptionUpdatedCallback;
1019+
const SignalIndexCachePtr signalIndexCacheRef = source->ReleaseDispatchReference(signalIndexCachePtr);
10161020

10171021
if (subscriptionUpdated != nullptr)
1018-
subscriptionUpdated(source, source->ReleaseDispatchReference(signalIndexCache));
1022+
subscriptionUpdated(source, signalIndexCacheRef);
10191023
}
10201024
}
10211025

10221026
// Dispatcher for processing complete message that is sent by the server at the end of a temporal session.
10231027
void DataSubscriber::ProcessingCompleteDispatcher(DataSubscriber* source, const vector<uint8_t>& buffer)
10241028
{
1025-
if (source == nullptr)
1029+
if (source == nullptr) // Empty buffer OK
10261030
return;
10271031

10281032
const MessageCallback processingCompleteCallback = source->m_processingCompleteCallback;
@@ -1041,7 +1045,7 @@ void DataSubscriber::ProcessingCompleteDispatcher(DataSubscriber* source, const
10411045
// Dispatcher for processing complete message that is sent by the server at the end of a temporal session.
10421046
void DataSubscriber::ConfigurationChangedDispatcher(DataSubscriber* source, const vector<uint8_t>& buffer)
10431047
{
1044-
if (source == nullptr)
1048+
if (source == nullptr) // Empty buffer OK
10451049
return;
10461050

10471051
const ConfigurationChangedCallback configurationChangedCallback = source->m_configurationChangedCallback;

0 commit comments

Comments
 (0)