Skip to content

Commit ef9387b

Browse files
authored
Enable Durable Subscriptions (#2978)
* Enable Durable Subscriptions (#9) * [Server] Durable Subscriptions (#2683) * Implement a system test for DurableSubscriptions (#2839) * Client Side Co-authored-by: ALTERNATE-DEV\Archie <[email protected]> * make a static Method CalculateRevisedQueueSize * Adress review feedback * fix FilterRetainTests * Add MonitoredItem Id to Queues Use NodeIdDictionary where possible
1 parent 9dac51b commit ef9387b

File tree

53 files changed

+5031
-870
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+5031
-870
lines changed

.github/workflows/buildandtest.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name: Build and Test .NET 9.0
33
on:
44
push:
55
pull_request:
6-
branches: [ master, main ]
6+
branches: [ master, main, develop/* ]
77
paths:
88
- '**.cs'
99
- '**.csproj'

.github/workflows/codeql-analysis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name: "CodeQL"
33

44
on:
55
push:
6-
branches: [ master, main, release/* ]
6+
branches: [ master, main, release/*, develop/* ]
77
pull_request:
88
# The branches below must be a subset of the branches above
99
branches: [ master, main ]

Applications/ConsoleReferenceClient/ClientSamples.cs

Lines changed: 209 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
using System.IO;
3636
using System.Linq;
3737
using System.Reflection;
38+
using System.Text;
3839
using System.Threading;
3940
using System.Threading.Tasks;
4041
using Newtonsoft.Json;
@@ -69,6 +70,13 @@ public ClientSamples(TextWriter output, Action<IList, IList> validateResponse, M
6970
m_validateResponse = validateResponse ?? ClientBase.ValidateResponse;
7071
m_quitEvent = quitEvent;
7172
m_verbose = verbose;
73+
m_desiredEventFields = new Dictionary<int, QualifiedNameCollection>();
74+
int eventIndexCounter = 0;
75+
m_desiredEventFields.Add(eventIndexCounter++, new QualifiedNameCollection(new QualifiedName[] { BrowseNames.Time }));
76+
m_desiredEventFields.Add(eventIndexCounter++, new QualifiedNameCollection(new QualifiedName[] { BrowseNames.ActiveState }));
77+
m_desiredEventFields.Add(eventIndexCounter++, new QualifiedNameCollection(new QualifiedName[] { BrowseNames.Message }));
78+
m_desiredEventFields.Add(eventIndexCounter++, new QualifiedNameCollection(new QualifiedName[] { BrowseNames.LimitState, BrowseNames.CurrentState }));
79+
m_desiredEventFields.Add(eventIndexCounter++, new QualifiedNameCollection(new QualifiedName[] { BrowseNames.LimitState, BrowseNames.LastTransition }));
7280
}
7381

7482
#region Public Sample Methods
@@ -290,34 +298,107 @@ public void CallMethod(ISession session)
290298
}
291299

292300
/// <summary>
293-
/// Create Subscription and MonitoredItems for DataChanges
301+
/// Call the Start method for Alarming to enable events
294302
/// </summary>
295-
public void SubscribeToDataChanges(ISession session, uint minLifeTime)
303+
public void EnableEvents(ISession session, uint timeToRun)
296304
{
297305
if (session == null || session.Connected == false)
298306
{
299307
m_output.WriteLine("Session not connected!");
300308
return;
301309
}
302310

311+
try
312+
{
313+
// Define the UA Method to call
314+
// Parent node - Objects\CTT\Alarms
315+
// Method node - Objects\CTT\Alarms\Start
316+
NodeId objectId = new NodeId("ns=7;s=Alarms");
317+
NodeId methodId = new NodeId("ns=7;s=Alarms.Start");
318+
319+
// Define the method parameters
320+
// Input argument requires a Float and an UInt32 value
321+
object[] inputArguments = new object[] { timeToRun };
322+
IList<object> outputArguments = null;
323+
324+
// Invoke Call service
325+
m_output.WriteLine("Calling UAMethod for node {0} ...", methodId);
326+
outputArguments = session.Call(objectId, methodId, inputArguments);
327+
328+
// Display results
329+
m_output.WriteLine("Method call returned {0} output argument(s):", outputArguments.Count);
330+
331+
foreach (var outputArgument in outputArguments)
332+
{
333+
m_output.WriteLine(" OutputValue = {0}", outputArgument.ToString());
334+
}
335+
}
336+
catch (Exception ex)
337+
{
338+
m_output.WriteLine("Method call error: {0}", ex.Message);
339+
}
340+
}
341+
342+
/// <summary>
343+
/// Create Subscription and MonitoredItems for DataChanges
344+
/// </summary>
345+
public bool SubscribeToDataChanges(ISession session, uint minLifeTime, bool enableDurableSubscriptions)
346+
{
347+
bool isDurable = false;
348+
349+
if (session == null || session.Connected == false)
350+
{
351+
m_output.WriteLine("Session not connected!");
352+
return isDurable;
353+
}
354+
303355
try
304356
{
305357
// Create a subscription for receiving data change notifications
358+
int subscriptionPublishingInterval = 1000;
359+
int itemSamplingInterval = 1000;
360+
uint queueSize = 10;
361+
uint lifetime = minLifeTime;
362+
363+
if (enableDurableSubscriptions)
364+
{
365+
queueSize = 100;
366+
lifetime = 20;
367+
}
306368

307369
// Define Subscription parameters
308370
Subscription subscription = new Subscription(session.DefaultSubscription) {
309371
DisplayName = "Console ReferenceClient Subscription",
310372
PublishingEnabled = true,
311-
PublishingInterval = 1000,
373+
PublishingInterval = subscriptionPublishingInterval,
312374
LifetimeCount = 0,
313-
MinLifetimeInterval = minLifeTime,
375+
MinLifetimeInterval = lifetime,
376+
KeepAliveCount = 5,
314377
};
315378

316379
session.AddSubscription(subscription);
317380

318381
// Create the subscription on Server side
319382
subscription.Create();
320-
m_output.WriteLine("New Subscription created with SubscriptionId = {0}.", subscription.Id);
383+
m_output.WriteLine("New Subscription created with SubscriptionId = {0}, Sampling Interval {1}, Publishing Interval {2}.",
384+
subscription.Id, itemSamplingInterval, subscriptionPublishingInterval);
385+
386+
if (enableDurableSubscriptions)
387+
{
388+
uint revisedLifetimeInHours = 0;
389+
390+
if (subscription.SetSubscriptionDurable(1, out revisedLifetimeInHours))
391+
{
392+
isDurable = true;
393+
394+
m_output.WriteLine("Subscription {0} is now durable, Revised Lifetime {1} in hours.",
395+
subscription.Id, revisedLifetimeInHours);
396+
}
397+
else
398+
{
399+
m_output.WriteLine("Subscription {0} failed durable call", subscription.Id);
400+
}
401+
}
321402

322403
// Create MonitoredItems for data changes (Reference Server)
323404

@@ -326,8 +407,8 @@ public void SubscribeToDataChanges(ISession session, uint minLifeTime)
326407
intMonitoredItem.StartNodeId = new NodeId("ns=2;s=Scalar_Simulation_Int32");
327408
intMonitoredItem.AttributeId = Attributes.Value;
328409
intMonitoredItem.DisplayName = "Int32 Variable";
329-
intMonitoredItem.SamplingInterval = 1000;
330-
intMonitoredItem.QueueSize = 10;
410+
intMonitoredItem.SamplingInterval = itemSamplingInterval;
411+
intMonitoredItem.QueueSize = queueSize;
331412
intMonitoredItem.DiscardOldest = true;
332413
intMonitoredItem.Notification += OnMonitoredItemNotification;
333414

@@ -338,8 +419,8 @@ public void SubscribeToDataChanges(ISession session, uint minLifeTime)
338419
floatMonitoredItem.StartNodeId = new NodeId("ns=2;s=Scalar_Simulation_Float");
339420
floatMonitoredItem.AttributeId = Attributes.Value;
340421
floatMonitoredItem.DisplayName = "Float Variable";
341-
floatMonitoredItem.SamplingInterval = 1000;
342-
floatMonitoredItem.QueueSize = 10;
422+
floatMonitoredItem.SamplingInterval = itemSamplingInterval;
423+
floatMonitoredItem.QueueSize = queueSize;
343424
floatMonitoredItem.Notification += OnMonitoredItemNotification;
344425

345426
subscription.AddItem(floatMonitoredItem);
@@ -349,12 +430,54 @@ public void SubscribeToDataChanges(ISession session, uint minLifeTime)
349430
stringMonitoredItem.StartNodeId = new NodeId("ns=2;s=Scalar_Simulation_String");
350431
stringMonitoredItem.AttributeId = Attributes.Value;
351432
stringMonitoredItem.DisplayName = "String Variable";
352-
stringMonitoredItem.SamplingInterval = 1000;
353-
stringMonitoredItem.QueueSize = 10;
433+
stringMonitoredItem.SamplingInterval = itemSamplingInterval;
434+
stringMonitoredItem.QueueSize = queueSize;
354435
stringMonitoredItem.Notification += OnMonitoredItemNotification;
355436

356437
subscription.AddItem(stringMonitoredItem);
357438

439+
MonitoredItem eventMonitoredItem = new MonitoredItem(subscription.DefaultItem);
440+
eventMonitoredItem.StartNodeId = new NodeId(Opc.Ua.ObjectIds.Server);
441+
eventMonitoredItem.AttributeId = Attributes.EventNotifier;
442+
eventMonitoredItem.DisplayName = "Event Variable";
443+
eventMonitoredItem.SamplingInterval = itemSamplingInterval;
444+
eventMonitoredItem.QueueSize = queueSize;
445+
eventMonitoredItem.Notification += OnMonitoredItemEventNotification;
446+
447+
EventFilter filter = new EventFilter();
448+
449+
SimpleAttributeOperandCollection simpleAttributeOperands = new SimpleAttributeOperandCollection();
450+
451+
foreach (QualifiedNameCollection desiredEventField in m_desiredEventFields.Values)
452+
{
453+
simpleAttributeOperands.Add(new SimpleAttributeOperand() {
454+
AttributeId = Attributes.Value,
455+
TypeDefinitionId = ObjectTypeIds.BaseEventType,
456+
BrowsePath = desiredEventField
457+
});
458+
}
459+
filter.SelectClauses = simpleAttributeOperands;
460+
461+
ContentFilter whereClause = new ContentFilter();
462+
SimpleAttributeOperand existingEventType = new SimpleAttributeOperand() {
463+
AttributeId = Attributes.Value,
464+
TypeDefinitionId = ObjectTypeIds.ExclusiveLevelAlarmType,
465+
BrowsePath = new QualifiedNameCollection(new QualifiedName[] { "EventType" })
466+
};
467+
LiteralOperand desiredEventType = new LiteralOperand();
468+
desiredEventType.Value = new Variant(new NodeId(Opc.Ua.ObjectTypeIds.ExclusiveLevelAlarmType));
469+
470+
471+
whereClause.Push(FilterOperator.Equals, new FilterOperand[] { existingEventType, desiredEventType });
472+
473+
filter.WhereClause = whereClause;
474+
475+
eventMonitoredItem.Filter = filter;
476+
eventMonitoredItem.NodeClass = NodeClass.Object;
477+
478+
479+
subscription.AddItem(eventMonitoredItem);
480+
358481
// Create the monitored items on Server side
359482
subscription.ApplyChanges();
360483
m_output.WriteLine("MonitoredItems created for SubscriptionId = {0}.", subscription.Id);
@@ -363,6 +486,8 @@ public void SubscribeToDataChanges(ISession session, uint minLifeTime)
363486
{
364487
m_output.WriteLine("Subscribe error: {0}", ex.Message);
365488
}
489+
490+
return isDurable;
366491
}
367492
#endregion
368493

@@ -1186,14 +1311,83 @@ private void OnMonitoredItemNotification(MonitoredItem monitoredItem, MonitoredI
11861311
{
11871312
// Log MonitoredItem Notification event
11881313
MonitoredItemNotification notification = e.NotificationValue as MonitoredItemNotification;
1189-
m_output.WriteLine("Notification: {0} \"{1}\" and Value = {2}.", notification.Message.SequenceNumber, monitoredItem.ResolvedNodeId, notification.Value);
1314+
DateTime localTime = notification.Value.SourceTimestamp.ToLocalTime();
1315+
m_output.WriteLine("Notification: {0} \"{1}\" and Value = {2} at [{3}].",
1316+
notification.Message.SequenceNumber,
1317+
monitoredItem.ResolvedNodeId,
1318+
notification.Value,
1319+
localTime.ToLongTimeString());
11901320
}
11911321
catch (Exception ex)
11921322
{
11931323
m_output.WriteLine("OnMonitoredItemNotification error: {0}", ex.Message);
11941324
}
11951325
}
11961326

1327+
/// <summary>
1328+
/// Handle Requested Event notifications from Server
1329+
/// </summary>
1330+
private void OnMonitoredItemEventNotification(MonitoredItem monitoredItem, MonitoredItemNotificationEventArgs e)
1331+
{
1332+
try
1333+
{
1334+
// Log MonitoredItem Notification event
1335+
EventFieldList notification = e.NotificationValue as EventFieldList;
1336+
1337+
foreach (KeyValuePair<int, QualifiedNameCollection> entry in m_desiredEventFields)
1338+
{
1339+
Variant field = notification.EventFields[entry.Key];
1340+
if (field.TypeInfo.BuiltInType != BuiltInType.Null)
1341+
{
1342+
StringBuilder fieldPath = new StringBuilder();
1343+
1344+
int lastIndex = entry.Value.Count - 1;
1345+
for (int index = 0; index < entry.Value.Count; index++)
1346+
{
1347+
fieldPath.Append(entry.Value[index].Name);
1348+
if (index < lastIndex)
1349+
{
1350+
fieldPath.Append(".");
1351+
}
1352+
}
1353+
1354+
string fieldName = fieldPath.ToString();
1355+
if (fieldName.Equals("Time"))
1356+
{
1357+
try
1358+
{
1359+
DateTime currentTime = (DateTime)field.Value;
1360+
TimeSpan timeSpan = currentTime - m_lastEventTime;
1361+
m_lastEventTime = currentTime;
1362+
m_processedEvents++;
1363+
string timeBetweenEvents = "";
1364+
if (m_processedEvents > 1)
1365+
{
1366+
timeBetweenEvents = ", time since last event = " + timeSpan.Seconds.ToString() + " seconds";
1367+
}
1368+
1369+
m_output.WriteLine("Event Received - total count = {0}{1}",
1370+
m_processedEvents.ToString(),
1371+
timeBetweenEvents);
1372+
}
1373+
catch (Exception ex)
1374+
{
1375+
m_output.WriteLine("Unexpected error retrieving Event Time Field Value: {0}", ex.Message);
1376+
}
1377+
}
1378+
1379+
m_output.WriteLine("\tField [{0}] \"{1}\" = [{2}]",
1380+
entry.Key.ToString(), fieldName, field.Value);
1381+
}
1382+
}
1383+
}
1384+
catch (Exception ex)
1385+
{
1386+
m_output.WriteLine("OnMonitoredItemEventNotification error: {0}", ex.Message);
1387+
}
1388+
}
1389+
1390+
11971391
/// <summary>
11981392
/// Event handler to defer publish response sequence number acknowledge.
11991393
/// </summary>
@@ -1256,5 +1450,8 @@ private static ByteStringCollection PrepareBrowseNext(BrowseResultCollection bro
12561450
private readonly TextWriter m_output;
12571451
private readonly ManualResetEvent m_quitEvent;
12581452
private readonly bool m_verbose;
1453+
private Dictionary<int, QualifiedNameCollection> m_desiredEventFields = null;
1454+
private int m_processedEvents = 0;
1455+
private DateTime m_lastEventTime = DateTime.Now;
12591456
}
12601457
}

0 commit comments

Comments
 (0)