@@ -32,15 +32,19 @@ Mutex SubscriberHandler::s_coutLock{};
3232
3333SubscriberHandler::SubscriberHandler (string name) :
3434 m_name(std::move(name)),
35- m_processCount( 0L )
35+ m_lastMessage(DateTime::MinValue )
3636{
3737}
3838
3939SubscriptionInfo SubscriberHandler::CreateSubscriptionInfo ()
4040{
4141 SubscriptionInfo info = SubscriberInstance::CreateSubscriptionInfo ();
4242
43- // TODO: Modify subscription info properties as desired...
43+ // TODO: Modify any custom subscription info properties as desired...
44+
45+ // Note -- most common subscription info properties are set in the base class
46+ // as common get/set properties - this overload would only be needed if you
47+ // need to set a custom property not already defined in the base class
4448
4549 // See SubscriptionInfo class in DataSubscriber.h for all properties
4650 // info.Throttled = false;
@@ -50,51 +54,26 @@ SubscriptionInfo SubscriberHandler::CreateSubscriptionInfo()
5054 return info;
5155}
5256
53- void SubscriberHandler::SetupSubscriberConnector (SubscriberConnector& connector)
54- {
55- SubscriberInstance::SetupSubscriberConnector (connector);
56-
57- // TODO: Customize subscriber connector properties as desired...
58-
59- // Enable auto-reconnect sequence:
60- // connector.SetAutoReconnect(true);
61-
62- // Set maximum number to attempt reconnection, -1 means never stop retrying connection attempts:
63- // connector.SetMaxRetries(-1);
64-
65- // Set number of initial milliseconds to wait before retrying connection attempt:
66- // connector.SetRetryInterval(2000);
67-
68- // Set maximum number of milliseconds to wait before retrying connection attempt, connection retry attempts use exponential back-off algorithm up to this defined maximum:
69- // connector.SetMaxRetryInterval(30000);
70- }
71-
7257void SubscriberHandler::StatusMessage (const string& message)
7358{
7459 // TODO: Make sure these messages get logged to an appropriate location
75- // For now, the base class just displays to console:
76- stringstream status;
77-
78- status << " [" << m_name << " ] " << message;
7960
8061 // Calls can come from multiple threads, so we impose a simple lock before write to console
81- s_coutLock.lock ();
82- SubscriberInstance::StatusMessage (status.str ());
83- s_coutLock.unlock ();
62+ ScopeLock lock (s_coutLock);
63+
64+ // For now, we just the base class to display to console:
65+ SubscriberInstance::StatusMessage (" [" + m_name + " ] " + message);
8466}
8567
8668void SubscriberHandler::ErrorMessage (const string& message)
8769{
8870 // TODO: Make sure these messages get logged to an appropriate location
89- // For now, the base class just displays to console:
90- stringstream status;
91-
92- status << " [" << m_name << " ] " << ToString (UtcNow ()) << " " << message;
9371
9472 // Calls can come from multiple threads, so we impose a simple lock before write to console
95- s_coutLock.lock ();
96- SubscriberInstance::ErrorMessage (status.str ());
97- s_coutLock.unlock ();
73+ ScopeLock lock (s_coutLock);
74+
75+ // For now, we just the base class to display to console:
76+ SubscriberInstance::ErrorMessage (" [" + m_name + " ] " + ToString (UtcNow ()) + " " + message);
9877}
9978
10079void SubscriberHandler::DataStartTime (time_t unixSOC, uint16_t milliseconds)
@@ -109,10 +88,7 @@ void SubscriberHandler::DataStartTime(datetime_t startTime)
10988
11089void SubscriberHandler::ReceivedMetadata (const vector<uint8_t >& payload)
11190{
112- stringstream message;
113- message << " Received " << payload.size () << " bytes of metadata, parsing..." ;
114- StatusMessage (message.str ());
115-
91+ StatusMessage (" Received " + ToString (payload.size ()) + " bytes of metadata, parsing..." );
11692 SubscriberInstance::ReceivedMetadata (payload);
11793}
11894
@@ -124,68 +100,66 @@ void SubscriberHandler::ParsedMetadata()
124100// ReSharper disable CppDeclaratorNeverUsed
125101void SubscriberHandler::ReceivedNewMeasurements (const vector<MeasurementPtr>& measurements)
126102{
127- static constexpr uint64_t interval = 10ULL * 60ULL ;
103+ static constexpr float32_t interval = 2.0 ; // Show status every 2 seconds
128104 static constexpr uint64_t maxToShow = 20ULL ;
129105 const uint64_t measurementCount = measurements.size ();
130- const bool showMessage = (m_processCount + measurementCount >= (m_processCount / interval + 1 ) * interval);
131106
132- m_processCount += measurementCount;
107+ if (TimeSince (m_lastMessage) < interval)
108+ return ;
133109
134- if (showMessage)
135- {
136- uint64_t shown = 0 ;
137- stringstream message;
110+ m_lastMessage = UtcNow ();
111+ uint64_t shown = 0ULL ;
112+ stringstream message;
138113
139- message << GetTotalMeasurementsReceived () << " measurements received so far..." << endl;
114+ message << GetTotalMeasurementsReceived () << " measurements received so far..." << endl;
140115
141- if (measurementCount > 0 )
142- message << ToString (measurements[0 ]->GetDateTime ()) << endl;
116+ if (measurementCount > 0 )
117+ message << ToString (measurements[0 ]->GetDateTime ()) << endl;
143118
144- message << " \t Runtime-ID\t Meta-data ID\t Value\t\t Type\t SignalID" << endl;
119+ message << " \t Runtime-ID\t Meta-data ID\t Value\t\t Type\t SignalID" << endl;
145120
146- // Start processing measurements
147- for (auto &measurement : measurements)
148- {
149- if (shown++ > maxToShow)
150- break ;
121+ // Start processing measurements
122+ for (auto &measurement : measurements)
123+ {
124+ if (shown++ > maxToShow)
125+ break ;
151126
152- // Get adjusted value
153- // const float64_t value = measurement->AdjustedValue();
127+ // Get adjusted value
128+ const float64_t value = measurement->AdjustedValue ();
154129
155- // Get timestamp
156- // datetime_t timestamp = measurement->GetDateTime();
130+ // Get timestamp
131+ datetime_t timestamp = measurement->GetDateTime ();
157132
158- // Handle per measurement quality flags
159- // MeasurementStateFlags qualityFlags = measurement->Flags;
133+ // Handle per measurement quality flags
134+ MeasurementStateFlags qualityFlags = measurement->Flags ;
160135
161- ConfigurationFramePtr configurationFrame;
162- MeasurementMetadataPtr measurementMetadata;
136+ ConfigurationFramePtr configurationFrame;
137+ MeasurementMetadataPtr measurementMetadata;
163138
164- // Find associated configuration for measurement
165- if (TryFindTargetConfigurationFrame (measurement->SignalID , configurationFrame))
139+ // Find associated configuration for measurement
140+ if (TryFindTargetConfigurationFrame (measurement->SignalID , configurationFrame))
141+ {
142+ // Lookup measurement metadata - it's faster to find metadata from within configuration frame
143+ if (TryGetMeasurementMetadataFromConfigurationFrame (measurement->SignalID , configurationFrame, measurementMetadata))
166144 {
167- // Lookup measurement metadata - it's faster to find metadata from within configuration frame
168- if (TryGetMeasurementMetadataFromConfigurationFrame (measurement->SignalID , configurationFrame, measurementMetadata))
169- {
170- const SignalReference& reference = measurementMetadata->Reference ;
145+ const SignalReference& reference = measurementMetadata->Reference ;
171146
172- // reference.Acronym << target device acronym
173- // reference.Kind << kind of signal (see SignalKind in "Types.h"), like Frequency, Angle, etc
174- // reference.Index << for Phasors, Analogs and Digitals - this is the ordered "index"
147+ // reference.Acronym << target device acronym
148+ // reference.Kind << kind of signal (see SignalKind in "Types.h"), like Frequency, Angle, etc
149+ // reference.Index << for Phasors, Analogs and Digitals - this is the ordered "index"
175150
176- message << ' \t ' << measurement->ID << ' \t ' << ' \t ' << measurementMetadata->ID << ' \t ' << ' \t ' << measurement->Value << fixed << setprecision (3 ) << ' \t ' << ' \t ' << SignalKindAcronym[static_cast <int32_t >(reference.Kind )] << ' \t ' << ToString (measurement->SignalID ) << endl;
177- }
151+ message << ' \t ' << measurement->ID << ' \t ' << ' \t ' << measurementMetadata->ID << ' \t ' << ' \t ' << measurement->Value << fixed << setprecision (3 ) << ' \t ' << ' \t ' << SignalKindAcronym[static_cast <int32_t >(reference.Kind )] << ' \t ' << ToString (measurement->SignalID ) << endl;
178152 }
179- // else if (TryGetMeasurementMetdata(measurement->SignalID, measurementMetadata))
180- // {
181- // // Received measurement is not part of a defined configuration frame, e.g., a statistic
182- // const SignalReference& reference = measurementMetadata->Reference;
183- // }
184153 }
185-
186- // Only display messages every few seconds
187- StatusMessage (message.str ());
154+ // else if (TryGetMeasurementMetdata(measurement->SignalID, measurementMetadata))
155+ // {
156+ // // Received measurement is not part of a defined configuration frame, e.g., a statistic
157+ // const SignalReference& reference = measurementMetadata->Reference;
158+ // }
188159 }
160+
161+ // Only display messages every few seconds
162+ StatusMessage (message.str ());
189163}
190164
191165void SubscriberHandler::SubscriptionUpdated (const SignalIndexCachePtr& signalIndexCache)
0 commit comments