Skip to content

Commit 102a1d1

Browse files
Updated InstanceSubscriber to provide better measurement display details.
1 parent 10248f6 commit 102a1d1

File tree

3 files changed

+65
-62
lines changed

3 files changed

+65
-62
lines changed

src/samples/DynamicMetadataPublish/PublisherHandler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ void PublisherHandler::ClientDisconnected(const SubscriberConnectionPtr& connect
7575
void PublisherHandler::DefineMetadata(int32_t devices)
7676
{
7777
static int32_t accessID = 1;
78-
static int32_t runtimeIndex = 1;
78+
int32_t runtimeIndex = 1;
7979

8080
WriterLock writeLock(m_measurementUpdateLock);
8181

src/samples/InstanceSubscribe/InstanceSubscribe.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,16 @@ int main(int argc, char* argv[])
6464
switch (i)
6565
{
6666
case 0:
67-
subscriber->SetFilterExpression("FILTER TOP 10 ActiveMeasurements WHERE SignalType = 'FREQ'");
67+
subscriber->SetFilterExpression("FILTER ActiveMeasurements WHERE SignalType = 'FREQ'");
6868
break;
6969
case 1:
70-
subscriber->SetFilterExpression("FILTER TOP 10 ActiveMeasurements WHERE SignalType LIKE '%PHA'");
70+
subscriber->SetFilterExpression("FILTER ActiveMeasurements WHERE SignalType LIKE '%PHA'");
7171

7272
// In this example we also specify a meta-data filtering expression:
7373
subscriber->SetMetadataFilters(SubscriberInstance::FilterMetadataStatsExpression);
7474
break;
7575
case 2:
76-
subscriber->SetFilterExpression("FILTER TOP 10 ActiveMeasurements WHERE SignalType LIKE '%PHM'");
76+
subscriber->SetFilterExpression("FILTER ActiveMeasurements WHERE SignalType LIKE '%PHM'");
7777
break;
7878
default:
7979
subscriber->SetFilterExpression(SubscriberInstance::SubscribeAllNoStatsExpression);

src/samples/InstanceSubscribe/SubscriberHandler.cpp

Lines changed: 61 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ using namespace std;
2828
using namespace sttp;
2929
using namespace sttp::transport;
3030

31-
Mutex SubscriberHandler::s_coutLock {};
31+
Mutex SubscriberHandler::s_coutLock{};
3232

3333
SubscriberHandler::SubscriberHandler(string name) :
3434
m_name(std::move(name)),
@@ -57,16 +57,16 @@ void SubscriberHandler::SetupSubscriberConnector(SubscriberConnector& connector)
5757
// TODO: Customize subscriber connector properties as desired...
5858

5959
//// Enable auto-reconnect sequence:
60-
//connector.SetAutoReconnect(true);
60+
connector.SetAutoReconnect(true);
6161

6262
//// Set maximum number to attempt reconnection, -1 means never stop retrying connection attempts:
63-
//connector.SetMaxRetries(-1);
64-
//
63+
connector.SetMaxRetries(-1);
64+
6565
//// Set number of initial milliseconds to wait before retrying connection attempt:
66-
//connector.SetRetryInterval(2000);
67-
//
68-
//// Set maximum number of milliseconds to wait before retrying connection attempt, connection retry attempts use exponential back-off algorithm up to this defined maximum:
69-
//connector.SetMaxRetryInterval(120000);
66+
connector.SetRetryInterval(5000);
67+
68+
// Set maximum number of milliseconds to wait before retrying connection attempt, connection retry attempts use exponential back-off algorithm up to this defined maximum:
69+
connector.SetMaxRetryInterval(10000);
7070
}
7171

7272
void SubscriberHandler::StatusMessage(const string& message)
@@ -89,7 +89,7 @@ void SubscriberHandler::ErrorMessage(const string& message)
8989
// For now, the base class just displays to console:
9090
stringstream status;
9191

92-
status << "[" << m_name << "] " << message;
92+
status << "[" << m_name << "] " << ToString(UtcNow()) << " " << message;
9393

9494
// Calls can come from multiple threads, so we impose a simple lock before write to console
9595
s_coutLock.lock();
@@ -123,66 +123,69 @@ void SubscriberHandler::ParsedMetadata()
123123

124124
// ReSharper disable CppDeclaratorNeverUsed
125125
void SubscriberHandler::ReceivedNewMeasurements(const vector<MeasurementPtr>& measurements)
126-
{
127-
// TODO: The following code could be used to generate frame based output, e.g., for IEEE C37.118
128-
// Start processing measurements
129-
//for (auto &measurement : measurements)
130-
//{
131-
// // Get adjusted value
132-
// const float64_t value = measurement->AdjustedValue();
133-
134-
// // Get timestamp
135-
// DateTime timestamp = measurement->GetDateTime();
136-
137-
// // Handle per measurement quality flags
138-
// int32_t qualityFlags = measurement->Flags;
139-
140-
// ConfigurationFramePtr configurationFrame;
141-
// MeasurementMetadataPtr measurementMetadata;
142-
143-
// // Find associated configuration for measurement
144-
// if (TryFindTargetConfigurationFrame(measurement->SignalID, configurationFrame))
145-
// {
146-
// // Lookup measurement metadata - it's faster to find metadata from within configuration frame
147-
// if (TryGetMeasurementMetdataFromConfigurationFrame(measurement->SignalID, configurationFrame, measurementMetadata))
148-
// {
149-
// const SignalReference& reference = measurementMetadata->Reference;
150-
151-
// // reference.Acronym << target device acronym
152-
// // reference.Kind << kind of signal (see SignalKind in "Types.h"), like Frequency, Angle, etc
153-
// // reference.Index << for Phasors, Analogs and Digitals - this is the ordered "index"
154-
155-
// // TODO: Handle measurement processing here...
156-
// }
157-
// }
158-
// else if (TryGetMeasurementMetdata(measurement->SignalID, measurementMetadata))
159-
// {
160-
// // Received measurement is not part of a defined configuration frame, e.g., a statistic
161-
// const SignalReference& reference = measurementMetadata->Reference;
162-
// }
163-
//}
164-
165-
// TODO: *** Temporary Testing Code Below *** -- REMOVE BEFORE USE
126+
{
166127
static const uint64_t interval = 10 * 60;
128+
static const uint64_t maxToShow = 20;
167129
const uint64_t measurementCount = measurements.size();
168130
const bool showMessage = (m_processCount + measurementCount >= (m_processCount / interval + 1) * interval);
131+
uint64_t shown = 0;
169132

170133
m_processCount += measurementCount;
171134

172-
// Only display messages every few seconds
173135
if (showMessage)
174136
{
175137
stringstream message;
176138

177139
message << GetTotalMeasurementsReceived() << " measurements received so far..." << endl;
178-
message << ToString(measurements[0]->GetDateTime()) << endl;
179-
message << "Signal ID: " << ToString(measurements[0]->SignalID) << endl;
180-
message << "\tPoint\tValue" << endl;
181-
182-
for (const auto& measurement : measurements)
183-
message << '\t' << measurement->ID << '\t' << measurement->Value << endl;
184140

185-
StatusMessage(message.str());
141+
if (measurementCount > 0)
142+
message << ToString(measurements[0]->GetDateTime()) << endl;
143+
144+
message << "\tRuntime-ID\tMeta-data ID\tValue\t\tType\tSignalID" << endl;
145+
146+
// Start processing measurements
147+
for (auto &measurement : measurements)
148+
{
149+
if (shown++ > maxToShow)
150+
break;
151+
152+
// Get adjusted value
153+
//const float64_t value = measurement->AdjustedValue();
154+
155+
// Get timestamp
156+
//datetime_t timestamp = measurement->GetDateTime();
157+
158+
// Handle per measurement quality flags
159+
//MeasurementStateFlags qualityFlags = measurement->Flags;
160+
161+
ConfigurationFramePtr configurationFrame;
162+
MeasurementMetadataPtr measurementMetadata;
163+
164+
// Find associated configuration for measurement
165+
if (TryFindTargetConfigurationFrame(measurement->SignalID, configurationFrame))
166+
{
167+
// Lookup measurement metadata - it's faster to find metadata from within configuration frame
168+
if (TryGetMeasurementMetdataFromConfigurationFrame(measurement->SignalID, configurationFrame, measurementMetadata))
169+
{
170+
const SignalReference& reference = measurementMetadata->Reference;
171+
172+
// reference.Acronym << target device acronym
173+
// reference.Kind << kind of signal (see SignalKind in "Types.h"), like Frequency, Angle, etc
174+
// reference.Index << for Phasors, Analogs and Digitals - this is the ordered "index"
175+
176+
message << '\t' << measurement->ID << '\t' << '\t' << measurementMetadata->ID << '\t' << '\t' << measurement->Value << fixed << setprecision(3) << '\t' << '\t' << SignalKindAcronym[reference.Kind] << '\t' << ToString(measurement->SignalID) << endl;
177+
}
178+
}
179+
//else if (TryGetMeasurementMetdata(measurement->SignalID, measurementMetadata))
180+
//{
181+
// // Received measurement is not part of a defined configuration frame, e.g., a statistic
182+
// const SignalReference& reference = measurementMetadata->Reference;
183+
//}
184+
}
185+
186+
// Only display messages every few seconds
187+
if (showMessage)
188+
StatusMessage(message.str());
186189
}
187190
}
188191

@@ -203,7 +206,7 @@ void SubscriberHandler::HistoricalReadComplete()
203206

204207
void SubscriberHandler::ConnectionEstablished()
205208
{
206-
StatusMessage("Connection established.");
209+
StatusMessage("Connection established.");
207210
}
208211

209212
void SubscriberHandler::ConnectionTerminated()

0 commit comments

Comments
 (0)