Skip to content

Commit 0f196ba

Browse files
Copilotromanett
andauthored
Add support for restoring triggered monitored items on session reconnect (#3377)
* Initial plan * Add triggering item tracking and restoration support Co-authored-by: romanett <[email protected]> * Changes before error encountered Co-authored-by: romanett <[email protected]> * Fix test to use proper subscription/monitored item creation pattern Co-authored-by: romanett <[email protected]> --------- Co-authored-by: copilot-swe-agent[bot] <[email protected]> Co-authored-by: romanett <[email protected]>
1 parent 7cd5ae3 commit 0f196ba

File tree

4 files changed

+328
-1
lines changed

4 files changed

+328
-1
lines changed

Libraries/Opc.Ua.Client/Subscription/MonitoredItem.cs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ public virtual void Restore(MonitoredItemState state)
136136
State = state;
137137
ClientHandle = state.ClientId;
138138
ServerId = state.ServerId;
139+
TriggeringItemId = state.TriggeringItemId;
140+
TriggeredItems = state.TriggeredItems != null ? new UInt32Collection(state.TriggeredItems) : null;
139141
}
140142

141143
/// <inheritdoc/>
@@ -144,7 +146,9 @@ public virtual void Snapshot(out MonitoredItemState state)
144146
state = new MonitoredItemState(State)
145147
{
146148
ServerId = Status.Id,
147-
ClientId = ClientHandle
149+
ClientId = ClientHandle,
150+
TriggeringItemId = TriggeringItemId,
151+
TriggeredItems = TriggeredItems != null ? new UInt32Collection(TriggeredItems) : null
148152
};
149153
}
150154

@@ -1074,6 +1078,18 @@ private static EventFilter GetDefaultEventFilter()
10741078
private MonitoredItemEventCache? m_eventCache;
10751079
private IEncodeable? m_lastNotification;
10761080
private event MonitoredItemNotificationEventHandler? m_Notification;
1081+
1082+
/// <summary>
1083+
/// Server-side identifier of the triggering item if this monitored item
1084+
/// is triggered by another item. 0 indicates this item is not triggered.
1085+
/// </summary>
1086+
internal uint TriggeringItemId { get; set; }
1087+
1088+
/// <summary>
1089+
/// Collection of server-side identifiers of monitored items that are
1090+
/// triggered by this item. Null if this item does not trigger any other items.
1091+
/// </summary>
1092+
internal UInt32Collection? TriggeredItems { get; set; }
10771093
}
10781094

10791095
/// <summary>

Libraries/Opc.Ua.Client/Subscription/MonitoredItemState.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,22 @@ public MonitoredItemState(MonitoredItemOptions options)
7979
/// </summary>
8080
[DataMember(Order = 15)]
8181
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
82+
83+
/// <summary>
84+
/// Server-side identifier of the triggering item if this monitored item
85+
/// is triggered by another item. 0 indicates this item is not triggered.
86+
/// Used to restore triggering links after session reconnect.
87+
/// </summary>
88+
[DataMember(Order = 16)]
89+
public uint TriggeringItemId { get; init; }
90+
91+
/// <summary>
92+
/// Collection of server-side identifiers of monitored items that are
93+
/// triggered by this item. Empty or null if this item does not trigger
94+
/// any other items. Used to restore triggering links after session reconnect.
95+
/// </summary>
96+
[DataMember(Order = 17)]
97+
public UInt32Collection? TriggeredItems { get; init; }
8298
}
8399

84100
/// <summary>

Libraries/Opc.Ua.Client/Subscription/Subscription.cs

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1038,6 +1038,9 @@ public async Task<IList<MonitoredItem>> CreateItemsAsync(CancellationToken ct =
10381038
m_changeMask |= SubscriptionChangeMask.ItemsCreated;
10391039
ChangesCompleted();
10401040

1041+
// Restore triggering relationships after items are created
1042+
await RestoreTriggeringAsync(ct).ConfigureAwait(false);
1043+
10411044
// return the list of items affected by the change.
10421045
return itemsToCreate;
10431046
}
@@ -1137,6 +1140,82 @@ public async Task<IList<MonitoredItem>> DeleteItemsAsync(
11371140
return itemsToDelete;
11381141
}
11391142

1143+
/// <summary>
1144+
/// Restores triggering relationships for monitored items that were
1145+
/// configured with triggers before reconnection.
1146+
/// </summary>
1147+
private async Task RestoreTriggeringAsync(CancellationToken ct = default)
1148+
{
1149+
VerifySessionAndSubscriptionState(true);
1150+
1151+
// Build triggering groups outside of lock to avoid await in lock
1152+
Dictionary<uint, List<uint>> triggeringGroups;
1153+
lock (m_cache)
1154+
{
1155+
// Group monitored items by their triggering item
1156+
triggeringGroups = new Dictionary<uint, List<uint>>();
1157+
foreach (MonitoredItem item in m_monitoredItems.Values)
1158+
{
1159+
if (item.TriggeredItems != null && item.TriggeredItems.Count > 0)
1160+
{
1161+
// This item triggers other items
1162+
var triggeredServerIds = new List<uint>();
1163+
foreach (uint triggeredClientHandle in item.TriggeredItems)
1164+
{
1165+
// Find the monitored item by client handle
1166+
if (m_monitoredItems.TryGetValue(triggeredClientHandle, out MonitoredItem? triggeredItem) &&
1167+
triggeredItem.Status.Created)
1168+
{
1169+
triggeredServerIds.Add(triggeredItem.Status.Id);
1170+
}
1171+
}
1172+
1173+
if (triggeredServerIds.Count > 0)
1174+
{
1175+
if (!triggeringGroups.TryGetValue(item.Status.Id, out List<uint>? list))
1176+
{
1177+
list = [];
1178+
triggeringGroups[item.Status.Id] = list;
1179+
}
1180+
list.AddRange(triggeredServerIds);
1181+
}
1182+
}
1183+
}
1184+
}
1185+
1186+
// Call SetTriggering for each triggering item
1187+
foreach (var kvp in triggeringGroups)
1188+
{
1189+
uint triggeringItemId = kvp.Key;
1190+
var linksToAdd = new UInt32Collection(kvp.Value);
1191+
1192+
try
1193+
{
1194+
await Session.SetTriggeringAsync(
1195+
null,
1196+
Id,
1197+
triggeringItemId,
1198+
linksToAdd,
1199+
null,
1200+
ct).ConfigureAwait(false);
1201+
1202+
m_logger.LogInformation(
1203+
"Restored {Count} triggering links for MonitoredItem {TriggeringItemId} in Subscription {SubscriptionId}",
1204+
linksToAdd.Count,
1205+
triggeringItemId,
1206+
Id);
1207+
}
1208+
catch (Exception ex)
1209+
{
1210+
m_logger.LogError(
1211+
ex,
1212+
"Failed to restore triggering links for MonitoredItem {TriggeringItemId} in Subscription {SubscriptionId}",
1213+
triggeringItemId,
1214+
Id);
1215+
}
1216+
}
1217+
}
1218+
11401219
/// <summary>
11411220
/// Set monitoring mode of items.
11421221
/// </summary>
@@ -1198,6 +1277,125 @@ public async Task<IList<MonitoredItem>> DeleteItemsAsync(
11981277
return errors;
11991278
}
12001279

1280+
/// <summary>
1281+
/// Sets the triggering relationships for a monitored item in this subscription
1282+
/// and tracks them for automatic restoration after reconnection.
1283+
/// </summary>
1284+
/// <param name="triggeringItem">The monitored item that will trigger other items.</param>
1285+
/// <param name="linksToAdd">Monitored items to be reported when the triggering item changes.</param>
1286+
/// <param name="linksToRemove">Monitored items to stop reporting when the triggering item changes.</param>
1287+
/// <param name="ct">Cancellation token.</param>
1288+
/// <returns>The response from the server.</returns>
1289+
/// <exception cref="ArgumentNullException">Thrown when triggeringItem is null.</exception>
1290+
/// <exception cref="ServiceResultException">Thrown when the operation fails.</exception>
1291+
public async Task<SetTriggeringResponse> SetTriggeringAsync(
1292+
MonitoredItem triggeringItem,
1293+
IList<MonitoredItem>? linksToAdd,
1294+
IList<MonitoredItem>? linksToRemove,
1295+
CancellationToken ct = default)
1296+
{
1297+
if (triggeringItem == null)
1298+
{
1299+
throw new ArgumentNullException(nameof(triggeringItem));
1300+
}
1301+
1302+
using Activity? activity = m_telemetry.StartActivity();
1303+
VerifySessionAndSubscriptionState(true);
1304+
1305+
if (!triggeringItem.Status.Created)
1306+
{
1307+
throw new ServiceResultException(
1308+
StatusCodes.BadInvalidState,
1309+
"Triggering item has not been created on the server.");
1310+
}
1311+
1312+
// Convert monitored items to server IDs
1313+
var serverIdsToAdd = new UInt32Collection();
1314+
var clientHandlesToAdd = new UInt32Collection();
1315+
if (linksToAdd != null)
1316+
{
1317+
foreach (MonitoredItem item in linksToAdd)
1318+
{
1319+
if (!item.Status.Created)
1320+
{
1321+
throw new ServiceResultException(
1322+
StatusCodes.BadInvalidState,
1323+
$"Monitored item '{item.DisplayName}' has not been created on the server.");
1324+
}
1325+
serverIdsToAdd.Add(item.Status.Id);
1326+
clientHandlesToAdd.Add(item.ClientHandle);
1327+
}
1328+
}
1329+
1330+
var serverIdsToRemove = new UInt32Collection();
1331+
var clientHandlesToRemove = new UInt32Collection();
1332+
if (linksToRemove != null)
1333+
{
1334+
foreach (MonitoredItem item in linksToRemove)
1335+
{
1336+
if (!item.Status.Created)
1337+
{
1338+
throw new ServiceResultException(
1339+
StatusCodes.BadInvalidState,
1340+
$"Monitored item '{item.DisplayName}' has not been created on the server.");
1341+
}
1342+
serverIdsToRemove.Add(item.Status.Id);
1343+
clientHandlesToRemove.Add(item.ClientHandle);
1344+
}
1345+
}
1346+
1347+
// Call the Session SetTriggering method
1348+
SetTriggeringResponse response = await Session.SetTriggeringAsync(
1349+
null,
1350+
Id,
1351+
triggeringItem.Status.Id,
1352+
serverIdsToAdd,
1353+
serverIdsToRemove,
1354+
ct).ConfigureAwait(false);
1355+
1356+
// Update the triggering relationships for automatic restoration
1357+
lock (m_cache)
1358+
{
1359+
// Initialize the triggered items collection if needed
1360+
triggeringItem.TriggeredItems ??= new UInt32Collection();
1361+
1362+
// Add new links
1363+
if (clientHandlesToAdd.Count > 0)
1364+
{
1365+
foreach (uint clientHandle in clientHandlesToAdd)
1366+
{
1367+
if (!triggeringItem.TriggeredItems.Contains(clientHandle))
1368+
{
1369+
triggeringItem.TriggeredItems.Add(clientHandle);
1370+
}
1371+
1372+
// Update the triggered item to remember its triggering item
1373+
if (m_monitoredItems.TryGetValue(clientHandle, out MonitoredItem? triggeredItem))
1374+
{
1375+
triggeredItem.TriggeringItemId = triggeringItem.Status.Id;
1376+
}
1377+
}
1378+
}
1379+
1380+
// Remove links
1381+
if (clientHandlesToRemove.Count > 0)
1382+
{
1383+
foreach (uint clientHandle in clientHandlesToRemove)
1384+
{
1385+
triggeringItem.TriggeredItems.Remove(clientHandle);
1386+
1387+
// Clear the triggering item reference
1388+
if (m_monitoredItems.TryGetValue(clientHandle, out MonitoredItem? triggeredItem))
1389+
{
1390+
triggeredItem.TriggeringItemId = 0;
1391+
}
1392+
}
1393+
}
1394+
}
1395+
1396+
return response;
1397+
}
1398+
12011399
/// <summary>
12021400
/// Tells the server to refresh all conditions being monitored by the subscription.
12031401
/// </summary>
@@ -1350,6 +1548,9 @@ await session.RemoveSubscriptionsAsync(subscriptionsToRemove, ct)
13501548
m_changeMask |= SubscriptionChangeMask.Transferred;
13511549
ChangesCompleted();
13521550

1551+
// Restore triggering relationships after subscription transfer
1552+
await RestoreTriggeringAsync(ct).ConfigureAwait(false);
1553+
13531554
StartKeepAliveTimer();
13541555

13551556
TraceState("TRANSFERRED ASYNC");

Tests/Opc.Ua.Client.Tests/SubscriptionTest.cs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1462,5 +1462,99 @@ private void DeferSubscriptionAcknowledge(
14621462
e.DeferredAcknowledgementsToSend.Clear();
14631463
e.AcknowledgementsToSend.Clear();
14641464
}
1465+
1466+
[Test]
1467+
[Order(900)]
1468+
public async Task SetTriggeringTrackingAsync()
1469+
{
1470+
// Create a subscription
1471+
var subscription = new Subscription(Session.DefaultSubscription)
1472+
{
1473+
PublishingEnabled = true,
1474+
PublishingInterval = 1000,
1475+
KeepAliveCount = 10,
1476+
LifetimeCount = 100,
1477+
MaxNotificationsPerPublish = 1000,
1478+
Priority = 100
1479+
};
1480+
1481+
Session.AddSubscription(subscription);
1482+
await subscription.CreateAsync(CancellationToken.None).ConfigureAwait(false);
1483+
Assert.That(subscription.Created, Is.True);
1484+
1485+
// Create monitored items
1486+
var triggeringItem = new MonitoredItem(subscription.DefaultItem)
1487+
{
1488+
StartNodeId = VariableIds.Server_ServerStatus_CurrentTime,
1489+
AttributeId = Attributes.Value,
1490+
MonitoringMode = MonitoringMode.Reporting,
1491+
SamplingInterval = 0,
1492+
QueueSize = 0,
1493+
DiscardOldest = true
1494+
};
1495+
1496+
var triggeredItem1 = new MonitoredItem(subscription.DefaultItem)
1497+
{
1498+
StartNodeId = VariableIds.Server_ServerStatus_State,
1499+
AttributeId = Attributes.Value,
1500+
MonitoringMode = MonitoringMode.Sampling,
1501+
SamplingInterval = 0,
1502+
QueueSize = 0,
1503+
DiscardOldest = true
1504+
};
1505+
1506+
var triggeredItem2 = new MonitoredItem(subscription.DefaultItem)
1507+
{
1508+
StartNodeId = VariableIds.Server_ServerStatus_BuildInfo,
1509+
AttributeId = Attributes.Value,
1510+
MonitoringMode = MonitoringMode.Sampling,
1511+
SamplingInterval = 0,
1512+
QueueSize = 0,
1513+
DiscardOldest = true
1514+
};
1515+
1516+
subscription.AddItem(triggeringItem);
1517+
subscription.AddItem(triggeredItem1);
1518+
subscription.AddItem(triggeredItem2);
1519+
1520+
// Create the items
1521+
await subscription.ApplyChangesAsync(CancellationToken.None).ConfigureAwait(false);
1522+
1523+
Assert.That(triggeringItem.Created, Is.True);
1524+
Assert.That(triggeredItem1.Created, Is.True);
1525+
Assert.That(triggeredItem2.Created, Is.True);
1526+
1527+
// Set up triggering relationship using the new method
1528+
var linksToAdd = new List<MonitoredItem> { triggeredItem1, triggeredItem2 };
1529+
SetTriggeringResponse response = await subscription.SetTriggeringAsync(
1530+
triggeringItem,
1531+
linksToAdd,
1532+
null,
1533+
CancellationToken.None).ConfigureAwait(false);
1534+
1535+
Assert.That(response, Is.Not.Null);
1536+
1537+
// Verify the triggering relationships are tracked
1538+
Assert.That(triggeringItem.TriggeredItems, Is.Not.Null);
1539+
Assert.That(triggeringItem.TriggeredItems.Count, Is.EqualTo(2));
1540+
Assert.That(triggeringItem.TriggeredItems, Does.Contain(triggeredItem1.ClientHandle));
1541+
Assert.That(triggeringItem.TriggeredItems, Does.Contain(triggeredItem2.ClientHandle));
1542+
1543+
Assert.That(triggeredItem1.TriggeringItemId, Is.EqualTo(triggeringItem.Status.Id));
1544+
Assert.That(triggeredItem2.TriggeringItemId, Is.EqualTo(triggeringItem.Status.Id));
1545+
1546+
// Snapshot the subscription state
1547+
subscription.Snapshot(out SubscriptionState state);
1548+
1549+
// Verify that the triggering relationships are persisted
1550+
MonitoredItemState triggeringItemState = state.MonitoredItems
1551+
.FirstOrDefault(m => m.ClientId == triggeringItem.ClientHandle);
1552+
Assert.That(triggeringItemState, Is.Not.Null);
1553+
Assert.That(triggeringItemState.TriggeredItems, Is.Not.Null);
1554+
Assert.That(triggeringItemState.TriggeredItems.Count, Is.EqualTo(2));
1555+
1556+
// Clean up
1557+
await subscription.DeleteAsync(true, CancellationToken.None).ConfigureAwait(false);
1558+
}
14651559
}
14661560
}

0 commit comments

Comments
 (0)