Skip to content

Commit bf67f39

Browse files
Copilotromanett
andauthored
Fix race condition causing duplicate MonitoredItems in concurrent CreateItemsAsync calls (#3399)
* Initial plan * Fix race condition in MonitoredItem creation Add Creating flag to prevent duplicate items when CreateItemsAsync is called concurrently. - Add internal Creating flag to MonitoredItemStatus - Check Creating flag in PrepareItemsToCreateAsync - Set Creating flag before releasing lock - Clear Creating flag in SetCreateResult (success or failure) - Add exception handling in CreateItemsAsync to clear Creating flag on errors - Add test for concurrent CreateItemsAsync calls Co-authored-by: romanett <[email protected]> * Address code review feedback - add comment for concurrent tasks constant Co-authored-by: romanett <[email protected]> --------- Co-authored-by: copilot-swe-agent[bot] <[email protected]> Co-authored-by: romanett <[email protected]>
1 parent c9b211e commit bf67f39

File tree

3 files changed

+119
-18
lines changed

3 files changed

+119
-18
lines changed

Libraries/Opc.Ua.Client/Subscription/MonitoredItemStatus.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ public sealed record class MonitoredItemStatus
4646
/// </summary>
4747
public bool Created => Id != 0;
4848

49+
/// <summary>
50+
/// Whether a create request has been sent to the server but not yet completed.
51+
/// This flag is used to prevent duplicate creation requests in multi-threaded scenarios.
52+
/// </summary>
53+
internal bool Creating { get; set; }
54+
4955
/// <summary>
5056
/// Any error condition associated with the monitored item.
5157
/// </summary>
@@ -170,6 +176,9 @@ internal void SetCreateResult(
170176
FilterResult = Utils.Clone(result.FilterResult.Body) as MonitoringFilterResult;
171177
}
172178
}
179+
180+
// Clear the Creating flag now that the create operation is complete
181+
Creating = false;
173182
}
174183

175184
/// <summary>

Libraries/Opc.Ua.Client/Subscription/Subscription.cs

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,25 +1014,37 @@ public async Task<IList<MonitoredItem>> CreateItemsAsync(CancellationToken ct =
10141014
}
10151015

10161016
using Activity? activity = m_telemetry.StartActivity();
1017-
// create monitored items.
1018-
CreateMonitoredItemsResponse response = await Session
1019-
.CreateMonitoredItemsAsync(null, Id, TimestampsToReturn, requestItems, ct)
1020-
.ConfigureAwait(false);
1017+
try
1018+
{
1019+
// create monitored items.
1020+
CreateMonitoredItemsResponse response = await Session
1021+
.CreateMonitoredItemsAsync(null, Id, TimestampsToReturn, requestItems, ct)
1022+
.ConfigureAwait(false);
10211023

1022-
MonitoredItemCreateResultCollection results = response.Results;
1023-
ClientBase.ValidateResponse(results, itemsToCreate);
1024-
ClientBase.ValidateDiagnosticInfos(response.DiagnosticInfos, itemsToCreate);
1024+
MonitoredItemCreateResultCollection results = response.Results;
1025+
ClientBase.ValidateResponse(results, itemsToCreate);
1026+
ClientBase.ValidateDiagnosticInfos(response.DiagnosticInfos, itemsToCreate);
10251027

1026-
// update results.
1027-
for (int ii = 0; ii < results.Count; ii++)
1028+
// update results.
1029+
for (int ii = 0; ii < results.Count; ii++)
1030+
{
1031+
itemsToCreate[ii]
1032+
.SetCreateResult(
1033+
requestItems[ii],
1034+
results[ii],
1035+
ii,
1036+
response.DiagnosticInfos,
1037+
response.ResponseHeader);
1038+
}
1039+
}
1040+
catch
10281041
{
1029-
itemsToCreate[ii]
1030-
.SetCreateResult(
1031-
requestItems[ii],
1032-
results[ii],
1033-
ii,
1034-
response.DiagnosticInfos,
1035-
response.ResponseHeader);
1042+
// Clear the Creating flag on all items if an exception occurs
1043+
foreach (MonitoredItem monitoredItem in itemsToCreate)
1044+
{
1045+
monitoredItem.Status.Creating = false;
1046+
}
1047+
throw;
10361048
}
10371049

10381050
m_changeMask |= SubscriptionChangeMask.ItemsCreated;
@@ -2795,8 +2807,8 @@ private static bool UpdateMonitoringMode(
27952807
{
27962808
foreach (MonitoredItem monitoredItem in m_monitoredItems.Values)
27972809
{
2798-
// ignore items that have been created.
2799-
if (monitoredItem.Status.Created)
2810+
// ignore items that have been created or are being created.
2811+
if (monitoredItem.Status.Created || monitoredItem.Status.Creating)
28002812
{
28012813
continue;
28022814
}
@@ -2825,6 +2837,13 @@ private static bool UpdateMonitoringMode(
28252837
requestItems.Add(request);
28262838
itemsToCreate.Add(monitoredItem);
28272839
}
2840+
2841+
// Mark all items as being created before releasing the lock
2842+
// to prevent duplicate creation requests in multi-threaded scenarios
2843+
foreach (MonitoredItem monitoredItem in itemsToCreate)
2844+
{
2845+
monitoredItem.Status.Creating = true;
2846+
}
28282847
}
28292848
return (requestItems, itemsToCreate);
28302849
}

Tests/Opc.Ua.Client.Tests/SubscriptionTest.cs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1556,5 +1556,78 @@ public async Task SetTriggeringTrackingAsync()
15561556
// Clean up
15571557
await subscription.DeleteAsync(true, CancellationToken.None).ConfigureAwait(false);
15581558
}
1559+
1560+
/// <summary>
1561+
/// Test that concurrent calls to CreateItemsAsync do not create duplicate monitored items.
1562+
/// This test verifies the fix for the race condition where multiple threads calling
1563+
/// CreateItemsAsync could include the same items in their create requests.
1564+
/// </summary>
1565+
[Test]
1566+
[Order(1100)]
1567+
public async Task ConcurrentCreateItemsNoDuplicates()
1568+
{
1569+
var subscription = new TestableSubscription(Session.DefaultSubscription);
1570+
Session.AddSubscription(subscription);
1571+
await subscription.CreateAsync().ConfigureAwait(false);
1572+
1573+
// Create multiple monitored items
1574+
var items = new List<MonitoredItem>();
1575+
for (int i = 0; i < 10; i++)
1576+
{
1577+
items.Add(new TestableMonitoredItem(subscription.DefaultItem)
1578+
{
1579+
DisplayName = $"Item{i}",
1580+
StartNodeId = VariableIds.Server_ServerStatus_CurrentTime,
1581+
AttributeId = Attributes.Value
1582+
});
1583+
}
1584+
1585+
subscription.AddItems(items);
1586+
Assert.That(subscription.MonitoredItemCount, Is.EqualTo(10));
1587+
1588+
// Simulate concurrent CreateItemsAsync calls
1589+
// Use 3 concurrent tasks to ensure at least 2 will race with each other
1590+
const int ConcurrentTasks = 3;
1591+
var tasks = new List<Task<IList<MonitoredItem>>>();
1592+
for (int i = 0; i < ConcurrentTasks; i++)
1593+
{
1594+
tasks.Add(Task.Run(() =>
1595+
subscription.CreateItemsAsync(CancellationToken.None)));
1596+
}
1597+
1598+
var results = await Task.WhenAll(tasks).ConfigureAwait(false);
1599+
1600+
// Verify that all items were created exactly once
1601+
int totalCreated = 0;
1602+
foreach (var item in items)
1603+
{
1604+
if (item.Status.Created)
1605+
{
1606+
totalCreated++;
1607+
Assert.That(item.Status.Id, Is.GreaterThan(0u),
1608+
$"Item {item.DisplayName} should have a server-assigned ID");
1609+
}
1610+
}
1611+
1612+
Assert.That(totalCreated, Is.EqualTo(10),
1613+
"All 10 items should be created exactly once");
1614+
1615+
// Verify that each result list contains only the items that were actually created
1616+
// by that specific call (should be empty for concurrent calls after the first)
1617+
int nonEmptyResults = 0;
1618+
foreach (var result in results)
1619+
{
1620+
if (result.Count > 0)
1621+
{
1622+
nonEmptyResults++;
1623+
}
1624+
}
1625+
1626+
Assert.That(nonEmptyResults, Is.LessThanOrEqualTo(1),
1627+
"Only one CreateItemsAsync call should have created items");
1628+
1629+
// Clean up
1630+
await subscription.DeleteAsync(true, CancellationToken.None).ConfigureAwait(false);
1631+
}
15591632
}
15601633
}

0 commit comments

Comments
 (0)