Skip to content

Commit 45f5ace

Browse files
Added DataSubscriber callback for subscription updated which provides access to SignalIndexCache when publisher updates it. This is important to providing "allowed" signal IDs.
1 parent 9d7ca6a commit 45f5ace

File tree

8 files changed

+77
-4
lines changed

8 files changed

+77
-4
lines changed

src/lib/transport/DataSubscriber.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,8 @@ void DataSubscriber::HandleUpdateSignalIndexCache(uint8_t* data, uint32_t offset
664664
SignalIndexCachePtr signalIndexCache = NewSharedPtr<SignalIndexCache>();
665665
signalIndexCache->Parse(uncompressedBuffer, m_subscriberID);
666666
m_signalIndexCache.swap(signalIndexCache);
667+
668+
DispatchSubscriptionUpdated(m_signalIndexCache.get());
667669
}
668670

669671
// Updates base time offsets.
@@ -888,6 +890,11 @@ void DataSubscriber::Dispatch(const DispatcherFunction& function, const uint8_t*
888890
m_callbackQueue.Enqueue(dispatcher);
889891
}
890892

893+
void DataSubscriber::DispatchSubscriptionUpdated(SignalIndexCache* signalIndexCache)
894+
{
895+
Dispatch(&SubscriptionUpdatedDispatcher, reinterpret_cast<uint8_t*>(&signalIndexCache), 0, sizeof(SignalIndexCache**));
896+
}
897+
891898
// Invokes the status message callback on the callback thread and provides the given message to it.
892899
void DataSubscriber::DispatchStatusMessage(const string& message)
893900
{
@@ -953,6 +960,19 @@ void DataSubscriber::MetadataDispatcher(DataSubscriber* source, const vector<uin
953960
metadataCallback(source, buffer);
954961
}
955962

963+
void DataSubscriber::SubscriptionUpdatedDispatcher(DataSubscriber* source, const std::vector<uint8_t>& buffer)
964+
{
965+
SignalIndexCache* signalIndexCache = *reinterpret_cast<SignalIndexCache**>(const_cast<uint8_t*>(&buffer[0]));
966+
967+
if (source != nullptr)
968+
{
969+
const SubscriptionUpdatedCallback subscriptionUpdated = source->m_subscriptionUpdatedCallback;
970+
971+
if (subscriptionUpdated != nullptr)
972+
subscriptionUpdated(source, signalIndexCache->GetReference());
973+
}
974+
}
975+
956976
// Dispatcher for processing complete message that is sent by the server at the end of a temporal session.
957977
void DataSubscriber::ProcessingCompleteDispatcher(DataSubscriber* source, const vector<uint8_t>& buffer)
958978
{
@@ -1017,6 +1037,12 @@ void DataSubscriber::RegisterMetadataCallback(const MetadataCallback& metadataCa
10171037
m_metadataCallback = metadataCallback;
10181038
}
10191039

1040+
// Registers the subscription updated callback.
1041+
void DataSubscriber::RegisterSubscriptionUpdatedCallback(const SubscriptionUpdatedCallback& subscriptionUpdatedCallback)
1042+
{
1043+
m_subscriptionUpdatedCallback = subscriptionUpdatedCallback;
1044+
}
1045+
10201046
// Registers the new measurements callback.
10211047
void DataSubscriber::RegisterNewMeasurementsCallback(const NewMeasurementsCallback& newMeasurementsCallback)
10221048
{

src/lib/transport/DataSubscriber.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ namespace transport
158158
typedef std::function<void(DataSubscriber*, const std::string&)> MessageCallback;
159159
typedef std::function<void(DataSubscriber*, int64_t)> DataStartTimeCallback;
160160
typedef std::function<void(DataSubscriber*, const std::vector<uint8_t>&)> MetadataCallback;
161+
typedef std::function<void(DataSubscriber*, const SignalIndexCachePtr&)> SubscriptionUpdatedCallback;
161162
typedef std::function<void(DataSubscriber*, const std::vector<MeasurementPtr>&)> NewMeasurementsCallback;
162163
typedef std::function<void(DataSubscriber*)> ConfigurationChangedCallback;
163164
typedef std::function<void(DataSubscriber*)> ConnectionTerminatedCallback;
@@ -225,6 +226,7 @@ namespace transport
225226
MessageCallback m_errorMessageCallback;
226227
DataStartTimeCallback m_dataStartTimeCallback;
227228
MetadataCallback m_metadataCallback;
229+
SubscriptionUpdatedCallback m_subscriptionUpdatedCallback;
228230
NewMeasurementsCallback m_newMeasurementsCallback;
229231
MessageCallback m_processingCompleteCallback;
230232
ConfigurationChangedCallback m_configurationChangedCallback;
@@ -258,13 +260,15 @@ namespace transport
258260
// Dispatchers
259261
void Dispatch(const DispatcherFunction& function);
260262
void Dispatch(const DispatcherFunction& function, const uint8_t* data, uint32_t offset, uint32_t length);
263+
void DispatchSubscriptionUpdated(SignalIndexCache* signalIndexCache);
261264
void DispatchStatusMessage(const std::string& message);
262265
void DispatchErrorMessage(const std::string& message);
263266

264267
static void StatusMessageDispatcher(DataSubscriber* source, const std::vector<uint8_t>& buffer);
265268
static void ErrorMessageDispatcher(DataSubscriber* source, const std::vector<uint8_t>& buffer);
266269
static void DataStartTimeDispatcher(DataSubscriber* source, const std::vector<uint8_t>& buffer);
267270
static void MetadataDispatcher(DataSubscriber* source, const std::vector<uint8_t>& buffer);
271+
static void SubscriptionUpdatedDispatcher(DataSubscriber* source, const std::vector<uint8_t>& buffer);
268272
static void ProcessingCompleteDispatcher(DataSubscriber* source, const std::vector<uint8_t>& buffer);
269273
static void ConfigurationChangedDispatcher(DataSubscriber* source, const std::vector<uint8_t>& buffer);
270274

@@ -292,6 +296,7 @@ namespace transport
292296
// void HandleErrorMessage(DataSubscriber* source, const string& message)
293297
// void HandleDataStartTime(DataSubscriber* source, int64_t startTime)
294298
// void HandleMetadata(DataSubscriber* source, const vector<uint8_t>& metadata)
299+
// void HandleSubscriptionUpdated(DataSubscriber* source, const SignalIndexCachePtr& signalIndexCache)
295300
// void HandleNewMeasurements(DataSubscriber* source, const vector<MeasurementPtr>& newMeasurements)
296301
// void HandleProcessingComplete(DataSubscriber* source, const string& message)
297302
// void HandleConfigurationChanged(DataSubscriber* source)
@@ -303,6 +308,7 @@ namespace transport
303308
void RegisterErrorMessageCallback(const MessageCallback& errorMessageCallback);
304309
void RegisterDataStartTimeCallback(const DataStartTimeCallback& dataStartTimeCallback);
305310
void RegisterMetadataCallback(const MetadataCallback& metadataCallback);
311+
void RegisterSubscriptionUpdatedCallback(const SubscriptionUpdatedCallback& subscriptionUpdatedCallback);
306312
void RegisterNewMeasurementsCallback(const NewMeasurementsCallback& newMeasurementsCallback);
307313
void RegisterProcessingCompleteCallback(const MessageCallback& processingCompleteCallback);
308314
void RegisterConfigurationChangedCallback(const ConfigurationChangedCallback& configurationChangedCallback);

src/lib/transport/SignalIndexCache.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,16 @@ unordered_set<Guid> SignalIndexCache::GetSignalIDs() const
8686
return unordered_set<Guid>(m_signalIDList.begin(), m_signalIDList.end());
8787
}
8888

89+
bool SignalIndexCache::GetSignalIDs(vector<Guid>& signalIDs) const
90+
{
91+
signalIDs.clear();
92+
93+
for (Guid signalID : m_signalIDList)
94+
signalIDs.emplace_back(signalID);
95+
96+
return !signalIDs.empty();
97+
}
98+
8999
// Gets the first half of the human-readable measurement
90100
// key associated with the given 32-bit runtime ID.
91101
const string& SignalIndexCache::GetSource(const int32_t signalIndex) const
@@ -158,6 +168,11 @@ uint32_t SignalIndexCache::GetBinaryLength() const
158168
return m_binaryLength;
159169
}
160170

171+
SignalIndexCachePtr SignalIndexCache::GetReference()
172+
{
173+
return shared_from_this();
174+
}
175+
161176
void SignalIndexCache::RecalculateBinaryLength(const SubscriberConnection& connection)
162177
{
163178
uint32_t binaryLength = 28;

src/lib/transport/SignalIndexCache.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,15 @@ namespace sttp {
3333
namespace transport
3434
{
3535
class SubscriberConnection;
36-
typedef SharedPtr<SubscriberConnection> SubscriberConnectionPtr;
36+
typedef sttp::SharedPtr<SubscriberConnection> SubscriberConnectionPtr;
37+
38+
class SignalIndexCache;
39+
typedef sttp::SharedPtr<SignalIndexCache> SignalIndexCachePtr;
3740

3841
// Maps 16-bit runtime IDs to 128-bit globally unique IDs.
3942
// Additionally provides reverse lookup and an extra mapping
4043
// to human-readable measurement keys.
41-
class SignalIndexCache
44+
class SignalIndexCache : public sttp::EnableSharedThisPtr<SignalIndexCache> // NOLINT
4245
{
4346
private:
4447
std::unordered_map<int32_t, uint32_t> m_reference;
@@ -63,9 +66,12 @@ namespace transport
6366
// Gets the globally unique signal ID associated with the given 16-bit runtime ID.
6467
sttp::Guid GetSignalID(int32_t signalIndex) const;
6568

66-
//Gets the full list of signal IDs as an unordered set
69+
// Gets the full list of signal IDs as an unordered set
6770
std::unordered_set<sttp::Guid> GetSignalIDs() const;
6871

72+
// Populates given list with a copy of signal IDs
73+
bool GetSignalIDs(std::vector<sttp::Guid>& signalIDs) const;
74+
6975
// Gets the first half of the human-readable measurement
7076
// key associated with the given 16-bit runtime ID.
7177
const std::string& GetSource(int32_t signalIndex) const;
@@ -88,14 +94,16 @@ namespace transport
8894
// a vector size, for an exact size call RecalculateBinaryLength first
8995
uint32_t GetBinaryLength() const;
9096

97+
SignalIndexCachePtr GetReference();
98+
9199
void RecalculateBinaryLength(const SubscriberConnection& connection);
92100

93101
void Parse(const std::vector<uint8_t>& buffer, Guid& subscriberID);
94102

95103
void Serialize(const SubscriberConnection& connection, std::vector<uint8_t>& buffer);
96104
};
97105

98-
typedef SharedPtr<SignalIndexCache> SignalIndexCachePtr;
106+
typedef sttp::SharedPtr<SignalIndexCache> SignalIndexCachePtr;
99107
}}
100108

101109
#endif

src/lib/transport/SubscriberInstance.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,6 +1001,10 @@ void SubscriberInstance::ParsedMetadata()
10011001
{
10021002
}
10031003

1004+
void SubscriberInstance::SubscriptionUpdated(const SignalIndexCachePtr& signalIndexCache)
1005+
{
1006+
}
1007+
10041008
void SubscriberInstance::ReceivedNewMeasurements(const vector<MeasurementPtr>& measurements)
10051009
{
10061010
if (m_receiveSimpleMeasurements)
@@ -1109,6 +1113,12 @@ void SubscriberInstance::HandleMetadata(DataSubscriber* source, const vector<uin
11091113
source->Subscribe();
11101114
}
11111115

1116+
void SubscriberInstance::HandleSubscriptionUpdated(DataSubscriber* source, const SignalIndexCachePtr& signalIndexCache)
1117+
{
1118+
SubscriberInstance* instance = static_cast<SubscriberInstance*>(source->GetUserData());
1119+
instance->SubscriptionUpdated(signalIndexCache);
1120+
}
1121+
11121122
void SubscriberInstance::HandleNewMeasurements(DataSubscriber* source, const vector<MeasurementPtr>& measurements)
11131123
{
11141124
SubscriberInstance* instance = static_cast<SubscriberInstance*>(source->GetUserData());

src/lib/transport/SubscriberInstance.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ namespace transport
6868
static void HandleErrorMessage(DataSubscriber* source, const std::string& message);
6969
static void HandleDataStartTime(DataSubscriber* source, int64_t startTime);
7070
static void HandleMetadata(DataSubscriber* source, const std::vector<uint8_t>& payload);
71+
static void HandleSubscriptionUpdated(DataSubscriber* source, const SignalIndexCachePtr& signalIndexCache);
7172
static void HandleNewMeasurements(DataSubscriber* source, const std::vector<MeasurementPtr>& measurements);
7273
static void HandleProcessingComplete(DataSubscriber* source, const std::string& message);
7374
static void HandleConfigurationChanged(DataSubscriber* source);
@@ -86,6 +87,7 @@ namespace transport
8687
virtual void DataStartTime(datetime_t startTime);
8788
virtual void ReceivedMetadata(const std::vector<uint8_t>& payload);
8889
virtual void ParsedMetadata();
90+
virtual void SubscriptionUpdated(const SignalIndexCachePtr& signalIndexCache);
8991
virtual void ReceivedNewMeasurements(const std::vector<MeasurementPtr>& measurements);
9092
virtual void ReceivedNewMeasurements(const SimpleMeasurement* measurements, int32_t length);
9193
virtual void ConfigurationChanged();

src/samples/InstanceSubscribe/SubscriberHandler.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,11 @@ void SubscriberHandler::ReceivedNewMeasurements(const vector<MeasurementPtr>& me
187187
}
188188
}
189189

190+
void SubscriberHandler::SubscriptionUpdated(const SignalIndexCachePtr& signalIndexCache)
191+
{
192+
StatusMessage("Publisher provided " + ToString(signalIndexCache->Count()) + " measurements in response to subscription.");
193+
}
194+
190195
void SubscriberHandler::ConfigurationChanged()
191196
{
192197
StatusMessage("Configuration change detected. Metadata refresh requested.");

src/samples/InstanceSubscribe/SubscriberHandler.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class SubscriberHandler : public sttp::transport::SubscriberInstance
4444
void DataStartTime(sttp::datetime_t startTime) override;
4545
void ReceivedMetadata(const std::vector<uint8_t>& payload) override;
4646
void ReceivedNewMeasurements(const std::vector<sttp::transport::MeasurementPtr>& measurements) override;
47+
void SubscriptionUpdated(const sttp::transport::SignalIndexCachePtr& signalIndexCache) override;
4748
void ParsedMetadata() override;
4849
void ConfigurationChanged() override;
4950
void HistoricalReadComplete() override;

0 commit comments

Comments
 (0)