diff --git a/Libraries/Opc.Ua.Client/Subscription/MonitoredItemStatus.cs b/Libraries/Opc.Ua.Client/Subscription/MonitoredItemStatus.cs index 8cbabdaf4..f64f0d047 100644 --- a/Libraries/Opc.Ua.Client/Subscription/MonitoredItemStatus.cs +++ b/Libraries/Opc.Ua.Client/Subscription/MonitoredItemStatus.cs @@ -46,6 +46,12 @@ public sealed record class MonitoredItemStatus /// public bool Created => Id != 0; + /// + /// Whether a create request has been sent to the server but not yet completed. + /// This flag is used to prevent duplicate creation requests in multi-threaded scenarios. + /// + internal bool Creating { get; set; } + /// /// Any error condition associated with the monitored item. /// @@ -170,6 +176,9 @@ internal void SetCreateResult( FilterResult = Utils.Clone(result.FilterResult.Body) as MonitoringFilterResult; } } + + // Clear the Creating flag now that the create operation is complete + Creating = false; } /// diff --git a/Libraries/Opc.Ua.Client/Subscription/Subscription.cs b/Libraries/Opc.Ua.Client/Subscription/Subscription.cs index 5429a6749..9b8ba4993 100644 --- a/Libraries/Opc.Ua.Client/Subscription/Subscription.cs +++ b/Libraries/Opc.Ua.Client/Subscription/Subscription.cs @@ -1014,25 +1014,37 @@ public async Task> CreateItemsAsync(CancellationToken ct = } using Activity? activity = m_telemetry.StartActivity(); - // create monitored items. - CreateMonitoredItemsResponse response = await Session - .CreateMonitoredItemsAsync(null, Id, TimestampsToReturn, requestItems, ct) - .ConfigureAwait(false); + try + { + // create monitored items. + CreateMonitoredItemsResponse response = await Session + .CreateMonitoredItemsAsync(null, Id, TimestampsToReturn, requestItems, ct) + .ConfigureAwait(false); - MonitoredItemCreateResultCollection results = response.Results; - ClientBase.ValidateResponse(results, itemsToCreate); - ClientBase.ValidateDiagnosticInfos(response.DiagnosticInfos, itemsToCreate); + MonitoredItemCreateResultCollection results = response.Results; + ClientBase.ValidateResponse(results, itemsToCreate); + ClientBase.ValidateDiagnosticInfos(response.DiagnosticInfos, itemsToCreate); - // update results. - for (int ii = 0; ii < results.Count; ii++) + // update results. + for (int ii = 0; ii < results.Count; ii++) + { + itemsToCreate[ii] + .SetCreateResult( + requestItems[ii], + results[ii], + ii, + response.DiagnosticInfos, + response.ResponseHeader); + } + } + catch { - itemsToCreate[ii] - .SetCreateResult( - requestItems[ii], - results[ii], - ii, - response.DiagnosticInfos, - response.ResponseHeader); + // Clear the Creating flag on all items if an exception occurs + foreach (MonitoredItem monitoredItem in itemsToCreate) + { + monitoredItem.Status.Creating = false; + } + throw; } m_changeMask |= SubscriptionChangeMask.ItemsCreated; @@ -2795,8 +2807,8 @@ private static bool UpdateMonitoringMode( { foreach (MonitoredItem monitoredItem in m_monitoredItems.Values) { - // ignore items that have been created. - if (monitoredItem.Status.Created) + // ignore items that have been created or are being created. + if (monitoredItem.Status.Created || monitoredItem.Status.Creating) { continue; } @@ -2825,6 +2837,13 @@ private static bool UpdateMonitoringMode( requestItems.Add(request); itemsToCreate.Add(monitoredItem); } + + // Mark all items as being created before releasing the lock + // to prevent duplicate creation requests in multi-threaded scenarios + foreach (MonitoredItem monitoredItem in itemsToCreate) + { + monitoredItem.Status.Creating = true; + } } return (requestItems, itemsToCreate); } diff --git a/Tests/Opc.Ua.Client.Tests/SubscriptionTest.cs b/Tests/Opc.Ua.Client.Tests/SubscriptionTest.cs index 93da845fd..fd95341ae 100644 --- a/Tests/Opc.Ua.Client.Tests/SubscriptionTest.cs +++ b/Tests/Opc.Ua.Client.Tests/SubscriptionTest.cs @@ -1556,5 +1556,78 @@ public async Task SetTriggeringTrackingAsync() // Clean up await subscription.DeleteAsync(true, CancellationToken.None).ConfigureAwait(false); } + + /// + /// Test that concurrent calls to CreateItemsAsync do not create duplicate monitored items. + /// This test verifies the fix for the race condition where multiple threads calling + /// CreateItemsAsync could include the same items in their create requests. + /// + [Test] + [Order(1100)] + public async Task ConcurrentCreateItemsNoDuplicates() + { + var subscription = new TestableSubscription(Session.DefaultSubscription); + Session.AddSubscription(subscription); + await subscription.CreateAsync().ConfigureAwait(false); + + // Create multiple monitored items + var items = new List(); + for (int i = 0; i < 10; i++) + { + items.Add(new TestableMonitoredItem(subscription.DefaultItem) + { + DisplayName = $"Item{i}", + StartNodeId = VariableIds.Server_ServerStatus_CurrentTime, + AttributeId = Attributes.Value + }); + } + + subscription.AddItems(items); + Assert.That(subscription.MonitoredItemCount, Is.EqualTo(10)); + + // Simulate concurrent CreateItemsAsync calls + // Use 3 concurrent tasks to ensure at least 2 will race with each other + const int ConcurrentTasks = 3; + var tasks = new List>>(); + for (int i = 0; i < ConcurrentTasks; i++) + { + tasks.Add(Task.Run(() => + subscription.CreateItemsAsync(CancellationToken.None))); + } + + var results = await Task.WhenAll(tasks).ConfigureAwait(false); + + // Verify that all items were created exactly once + int totalCreated = 0; + foreach (var item in items) + { + if (item.Status.Created) + { + totalCreated++; + Assert.That(item.Status.Id, Is.GreaterThan(0u), + $"Item {item.DisplayName} should have a server-assigned ID"); + } + } + + Assert.That(totalCreated, Is.EqualTo(10), + "All 10 items should be created exactly once"); + + // Verify that each result list contains only the items that were actually created + // by that specific call (should be empty for concurrent calls after the first) + int nonEmptyResults = 0; + foreach (var result in results) + { + if (result.Count > 0) + { + nonEmptyResults++; + } + } + + Assert.That(nonEmptyResults, Is.LessThanOrEqualTo(1), + "Only one CreateItemsAsync call should have created items"); + + // Clean up + await subscription.DeleteAsync(true, CancellationToken.None).ConfigureAwait(false); + } } }