diff --git a/.github/agents/opcua-v16-migration.agent.md b/.github/agents/opcua-v16-migration.agent.md index 2faab7c7e..160016ca1 100644 --- a/.github/agents/opcua-v16-migration.agent.md +++ b/.github/agents/opcua-v16-migration.agent.md @@ -173,6 +173,7 @@ Also: `ReadOnlyList` → `ArrayOf`, `IList` parameters → `ArrayOf` - Use `List` when items are added/removed/modified, then convert to `ArrayOf` with `.ToArrayOf()`. - `ArrayOf` implicitly converts from `List` but not vice versa. Use `.ToList()` to convert back. - `ArrayOf` supports collection expressions: `ArrayOf arr = [1, 2, 3];` +- To follow best coding practices Do NOT use casts to create the ArrayOf but use `.ToArrayOf()` or create it directly using a collection expression ### ArrayOf Key API diff --git a/Directory.Packages.props b/Directory.Packages.props index 01016a176..d08082e25 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -29,6 +29,7 @@ + @@ -63,6 +64,7 @@ + \ No newline at end of file diff --git a/Libraries/Opc.Ua.Server/Configuration/ConfigurationNodeManager.cs b/Libraries/Opc.Ua.Server/Configuration/ConfigurationNodeManager.cs index 19d404933..16a9e6ef0 100644 --- a/Libraries/Opc.Ua.Server/Configuration/ConfigurationNodeManager.cs +++ b/Libraries/Opc.Ua.Server/Configuration/ConfigurationNodeManager.cs @@ -185,7 +185,7 @@ protected override async ValueTask AddBehaviourToPredefinedNodeAsync( } else { - NodeState serverNode = Server.NodeManager.FindNodeInAddressSpaceAsync(ObjectIds.Server).AsTask().GetAwaiter().GetResult(); + NodeState serverNode = await Server.NodeManager.FindNodeInAddressSpaceAsync(ObjectIds.Server).ConfigureAwait(false); serverNode?.ReplaceChild(context, activeNode); } // remove the reference to server node because it is set as parent diff --git a/Libraries/Opc.Ua.Server/Diagnostics/DiagnosticsNodeManager.cs b/Libraries/Opc.Ua.Server/Diagnostics/DiagnosticsNodeManager.cs index cfa79a62a..a72714f3f 100644 --- a/Libraries/Opc.Ua.Server/Diagnostics/DiagnosticsNodeManager.cs +++ b/Libraries/Opc.Ua.Server/Diagnostics/DiagnosticsNodeManager.cs @@ -28,6 +28,7 @@ * ======================================================================*/ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Globalization; using System.Linq; @@ -256,29 +257,26 @@ protected ServiceResult OnGetMonitoredItems( return StatusCodes.BadInvalidArgument; } - foreach (ISubscription subscription in Server.SubscriptionManager.GetSubscriptions()) + if (!Server.SubscriptionManager.TryGetSubscription(subscriptionId, out ISubscription subscription)) { - if (subscription.Id == subscriptionId) - { - if (context is ISessionSystemContext session && - subscription.SessionId != session.SessionId) - { - // user tries to access subscription of different session - return StatusCodes.BadUserAccessDenied; - } + return StatusCodes.BadSubscriptionIdInvalid; + } - subscription.GetMonitoredItems( - out ArrayOf serverHandles, - out ArrayOf clientHandles); + if (context is ISessionSystemContext session && + subscription.SessionId != session.SessionId) + { + // user tries to access subscription of different session + return StatusCodes.BadUserAccessDenied; + } - outputArguments[0] = serverHandles; - outputArguments[1] = clientHandles; + subscription.GetMonitoredItems( + out ArrayOf serverHandles, + out ArrayOf clientHandles); - return ServiceResult.Good; - } - } + outputArguments[0] = serverHandles; + outputArguments[1] = clientHandles; - return StatusCodes.BadSubscriptionIdInvalid; + return ServiceResult.Good; } /// @@ -300,24 +298,21 @@ protected ServiceResult OnResendData( return StatusCodes.BadInvalidArgument; } - foreach (ISubscription subscription in Server.SubscriptionManager.GetSubscriptions()) + if (!Server.SubscriptionManager.TryGetSubscription(subscriptionId, out ISubscription subscription)) { - if (subscription.Id == subscriptionId) - { - if (context is not ServerSystemContext session || - subscription.SessionId != session.SessionId) - { - // user tries to access subscription of different session - return StatusCodes.BadUserAccessDenied; - } - - subscription.ResendData(session.OperationContext); + return StatusCodes.BadSubscriptionIdInvalid; + } - return ServiceResult.Good; - } + if (context is not ServerSystemContext session || + subscription.SessionId != session.SessionId) + { + // user tries to access subscription of different session + return StatusCodes.BadUserAccessDenied; } - return StatusCodes.BadSubscriptionIdInvalid; + subscription.ResendData(session.OperationContext); + + return ServiceResult.Good; } /// @@ -448,9 +443,10 @@ protected override async ValueTask AddBehaviourToPredefinedNodeAsync( { if (passiveNode is ServerObjectState) { + // add the server object as the root notifier. + await AddRootNotifierAsync(passiveNode, cancellationToken).ConfigureAwait(false); break; } - var activeNode = new ServerObjectState(passiveNode.Parent); activeNode.Create(context, passiveNode); @@ -1883,7 +1879,7 @@ handle.Node is BaseVariableState variable && if (monitoredItem.MonitoringMode != MonitoringMode.Disabled) { - m_diagnosticsMonitoringCount++; + Interlocked.Increment(ref m_diagnosticsMonitoringCount); m_diagnosticsScanTimer ??= new Timer(DoScan, null, 1000, 1000); @@ -1909,7 +1905,7 @@ protected override ValueTask OnMonitoredItemDeletedAsync( if (IsDiagnosticsNode(handle.Node) && monitoredItem.MonitoringMode != MonitoringMode.Disabled) { - m_diagnosticsMonitoringCount--; + Interlocked.Decrement(ref m_diagnosticsMonitoringCount); if (m_diagnosticsMonitoringCount == 0 && m_diagnosticsScanTimer != null) { @@ -1953,12 +1949,12 @@ protected override ValueTask OnMonitoringModeChangedAsync( { if (previousMode != MonitoringMode.Disabled) { - m_diagnosticsMonitoringCount--; + Interlocked.Decrement(ref m_diagnosticsMonitoringCount); } if (monitoringMode != MonitoringMode.Disabled) { - m_diagnosticsMonitoringCount++; + Interlocked.Increment(ref m_diagnosticsMonitoringCount); } if (m_diagnosticsMonitoringCount == 0 && m_diagnosticsScanTimer != null) @@ -2023,7 +2019,7 @@ private void CreateSampledItem( double samplingInterval, ISampledDataChangeMonitoredItem monitoredItem) { - m_sampledItems.Add(monitoredItem); + m_sampledItems.TryAdd(monitoredItem.Id, monitoredItem); m_samplingTimer ??= new Timer( DoSample, @@ -2037,16 +2033,9 @@ private void CreateSampledItem( /// private void DeleteSampledItem(ISampledDataChangeMonitoredItem monitoredItem) { - for (int ii = 0; ii < m_sampledItems.Count; ii++) - { - if (ReferenceEquals(monitoredItem, m_sampledItems[ii])) - { - m_sampledItems.RemoveAt(ii); - break; - } - } + m_sampledItems.TryRemove(monitoredItem.Id, out _); - if (m_sampledItems.Count == 0 && m_samplingTimer != null) + if (m_sampledItems.IsEmpty && m_samplingTimer != null) { m_samplingTimer.Dispose(); m_samplingTimer = null; @@ -2062,9 +2051,9 @@ private void DoSample(object state) { lock (m_diagnosticsLock) { - for (int ii = 0; ii < m_sampledItems.Count; ii++) + foreach (KeyValuePair kvp in m_sampledItems) { - ISampledDataChangeMonitoredItem monitoredItem = m_sampledItems[ii]; + ISampledDataChangeMonitoredItem monitoredItem = kvp.Value; // get the handle. if (monitoredItem.ManagerHandle is not NodeHandle handle) @@ -2121,7 +2110,7 @@ private void DoSample(object state) private readonly List m_subscriptions; private NodeId m_serverLockHolder; private Timer m_samplingTimer; - private readonly List m_sampledItems; + private readonly ConcurrentDictionary m_sampledItems; private readonly double m_minimumSamplingInterval; private HistoryServerCapabilitiesState m_historyCapabilities; diff --git a/Libraries/Opc.Ua.Server/NodeManager/MonitoredItem/MonitoredNode.cs b/Libraries/Opc.Ua.Server/NodeManager/MonitoredItem/MonitoredNode.cs index 0fce0d308..602ba23d9 100644 --- a/Libraries/Opc.Ua.Server/NodeManager/MonitoredItem/MonitoredNode.cs +++ b/Libraries/Opc.Ua.Server/NodeManager/MonitoredItem/MonitoredNode.cs @@ -137,10 +137,7 @@ public void Remove(IDataChangeMonitoredItem2 datachangeItem) Node.OnStateChanged = null; // Unsubscribe from namespace default permission changes when the last item is removed. - if (m_server.ConfigurationNodeManager != null) - { - m_server.ConfigurationNodeManager.DefaultPermissionsChanged -= OnDefaultPermissionsChanged; - } + m_server.ConfigurationNodeManager?.DefaultPermissionsChanged -= OnDefaultPermissionsChanged; } } diff --git a/Libraries/Opc.Ua.Server/Subscription/ISubscriptionManager.cs b/Libraries/Opc.Ua.Server/Subscription/ISubscriptionManager.cs index fc27ad4f2..bd5cf4fcc 100644 --- a/Libraries/Opc.Ua.Server/Subscription/ISubscriptionManager.cs +++ b/Libraries/Opc.Ua.Server/Subscription/ISubscriptionManager.cs @@ -58,6 +58,14 @@ public interface ISubscriptionManager : IDisposable /// A list of the subscriptions. IList GetSubscriptions(); + /// + /// Get the subscription with the specified id + /// + /// The id of the subscription + /// The subscription if found else null + /// True if found + bool TryGetSubscription(uint id, out ISubscription subscription); + /// /// Set a subscription into durable mode /// diff --git a/Libraries/Opc.Ua.Server/Subscription/SessionPublishQueue.cs b/Libraries/Opc.Ua.Server/Subscription/SessionPublishQueue.cs index a252db0ee..2d5473899 100644 --- a/Libraries/Opc.Ua.Server/Subscription/SessionPublishQueue.cs +++ b/Libraries/Opc.Ua.Server/Subscription/SessionPublishQueue.cs @@ -108,7 +108,7 @@ public Task PublishAsync(string secureChannelId, } QueuedSubscription subscriptionToPublish; - lock (m_subscriptionPublishLock) + lock (m_lock) { // find the waiting subscription with the highest priority. subscriptionToPublish = GetSubscriptionToPublish(); @@ -117,10 +117,7 @@ public Task PublishAsync(string secureChannelId, { return Task.FromResult(subscriptionToPublish.Subscription); } - } - lock (m_lock) - { // check if queue is full. if (m_queuedRequests.Count >= m_maxRequestCount) { @@ -150,6 +147,8 @@ public Task PublishAsync(string secureChannelId, /// The list of subscriptions in the queue. public IList Close() { + var subscriptions = new List(); + lock (m_lock) { // TraceState("SESSION CLOSED"); @@ -164,19 +163,21 @@ public IList Close() } // tell the subscriptions that the session is closed. - var subscriptions = new List(m_queuedSubscriptions.Count); - foreach (KeyValuePair entry in m_queuedSubscriptions) { subscriptions.Add(entry.Value.Subscription); - entry.Value.Subscription.SessionClosed(); } // clear the queue. m_queuedSubscriptions.Clear(); + } - return subscriptions; + foreach (ISubscription subscription in subscriptions) + { + subscription.SessionClosed(); } + + return subscriptions; } /// @@ -371,19 +372,19 @@ public void PublishCompleted(ISubscription subscription, bool moreNotifications) if (m_queuedSubscriptions.TryGetValue(subscription.Id, out QueuedSubscription queuedSubscription)) { - queuedSubscription.Publishing = false; - - if (moreNotifications) + lock (m_lock) { - lock (m_subscriptionPublishLock) + queuedSubscription.Publishing = false; + + if (moreNotifications) { AssignSubscriptionToRequest(queuedSubscription); } - } - else - { - queuedSubscription.ReadyToPublish = false; - queuedSubscription.Timestamp = DateTime.UtcNow; + else + { + queuedSubscription.ReadyToPublish = false; + queuedSubscription.Timestamp = DateTime.UtcNow; + } } } } @@ -395,8 +396,11 @@ public void Requeue(ISubscription subscription) { if (m_queuedSubscriptions.TryGetValue(subscription.Id, out QueuedSubscription queuedSubscription)) { - queuedSubscription.Publishing = false; - queuedSubscription.ReadyToPublish = true; + lock (m_lock) + { + queuedSubscription.Publishing = false; + queuedSubscription.ReadyToPublish = true; + } } } @@ -439,9 +443,12 @@ public void PublishTimerExpired() // assign subscription to request if one is available. if (!subscription.Publishing) { - lock (m_subscriptionPublishLock) + lock (m_lock) { - AssignSubscriptionToRequest(subscription); + if (!subscription.Publishing) + { + AssignSubscriptionToRequest(subscription); + } } } } @@ -659,7 +666,6 @@ internal void TraceState(string context, params object[] args) } private readonly Lock m_lock = new(); - private readonly Lock m_subscriptionPublishLock = new(); private readonly ILogger m_logger; private readonly IServerInternal m_server; private readonly ISession m_session; diff --git a/Libraries/Opc.Ua.Server/Subscription/SubscriptionManager.cs b/Libraries/Opc.Ua.Server/Subscription/SubscriptionManager.cs index 9f6e1e9cc..0233ff3c6 100644 --- a/Libraries/Opc.Ua.Server/Subscription/SubscriptionManager.cs +++ b/Libraries/Opc.Ua.Server/Subscription/SubscriptionManager.cs @@ -181,15 +181,19 @@ public event SubscriptionEventHandler SubscriptionDeleted } } - /// - /// Returns all of the subscriptions known to the subscription manager. - /// - /// A list of the subscriptions. + + /// public IList GetSubscriptions() { return [.. m_subscriptions.Values]; } + /// + public bool TryGetSubscription(uint id, out ISubscription subscription) + { + return m_subscriptions.TryGetValue(id, out subscription); + } + /// /// Raises an event related to a subscription. /// diff --git a/README.md b/README.md index 63dc6ef59..23985382e 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,8 @@ More samples based on the official [Nuget](https://www.nuget.org/packages/OPCFou * Source generator generated code behind during build * See [MigrationGuide](Docs/MigrationGuide.md) for details. * New AsyncCustomNodeManager (successor of CustomNodeManager2) with improved Locking Strategy, see [Server Async (TAP) Support](Docs/AsyncServerSupport.md) +* In our Load Test the Server shows at least 2.5x higher throughput under load with 750 subscriptions totaling 450k Monitored items + and write times for 600 items below 5 seconds were before > 10 seconds were needed. Also for event at least 3x faster event reporting was observed. #### **New in 1.05.378** diff --git a/Stack/Opc.Ua.Core/Opc.Ua.Core.csproj b/Stack/Opc.Ua.Core/Opc.Ua.Core.csproj index a978a6fb0..e4b2e56e2 100644 --- a/Stack/Opc.Ua.Core/Opc.Ua.Core.csproj +++ b/Stack/Opc.Ua.Core/Opc.Ua.Core.csproj @@ -18,6 +18,7 @@ + diff --git a/Stack/Opc.Ua.Core/Stack/Server/ServerBase.cs b/Stack/Opc.Ua.Core/Stack/Server/ServerBase.cs index 167c58079..55e9fc8a0 100644 --- a/Stack/Opc.Ua.Core/Stack/Server/ServerBase.cs +++ b/Stack/Opc.Ua.Core/Stack/Server/ServerBase.cs @@ -28,7 +28,6 @@ * ======================================================================*/ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Globalization; using System.Linq; @@ -1611,14 +1610,20 @@ public RequestQueue( m_server = server; m_minThreadCount = minThreadCount; m_maxThreadCount = maxThreadCount; - m_maxRequestCount = maxRequestCount; - m_queue = new ConcurrentQueue(); - m_queueSignal = new SemaphoreSlim(0); + + var options = new System.Threading.Channels.BoundedChannelOptions(maxRequestCount) + { + SingleWriter = false, + SingleReader = false, + FullMode = System.Threading.Channels.BoundedChannelFullMode.Wait + }; + + m_queue = System.Threading.Channels.Channel.CreateBounded(options); + m_workers = []; m_cts = new CancellationTokenSource(); m_activeThreadCount = 0; m_totalThreadCount = 0; - m_queuedRequestsCount = 0; m_stopped = false; ThreadPool.GetMinThreads(out minThreadCount, out int minCompletionPortThreads); @@ -1636,9 +1641,10 @@ public RequestQueue( ); // Start worker tasks + var token = m_cts.Token; for (int i = 0; i < m_minThreadCount; i++) { - m_workers.Add(Task.Run(() => WorkerLoopAsync(m_cts.Token))); + m_workers.Add(Task.Run(() => WorkerLoopAsync(token))); } } @@ -1661,19 +1667,30 @@ protected virtual void Dispose(bool disposing) m_stopped = true; m_cts.Cancel(); - if (m_totalThreadCount > 0) + m_queue.Writer.Complete(); + + // drain any remaining requests from the queue + while (m_queue.Reader.TryRead(out IEndpointIncomingRequest request)) { - m_queueSignal.Release(m_totalThreadCount); // Unblock all workers + request.OperationCompleted(null, StatusCodes.BadServerHalted); } - Utils.SilentDispose(m_queueSignal); - foreach (IEndpointIncomingRequest request in m_queue.ToList()) + // Wait for all worker threads to complete + Task[] workerTasks; + lock (m_workers) { - request.OperationCompleted(null, StatusCodes.BadServerHalted); + workerTasks = [.. m_workers]; } -#if NETSTANDARD2_1_OR_GREATER - m_queue.Clear(); -#endif + + try + { + Task.WaitAll(workerTasks, TimeSpan.FromSeconds(5)); + } + catch (AggregateException) + { + // Ignore exceptions during shutdown + } + Utils.SilentDispose(m_cts); } } @@ -1691,14 +1708,15 @@ public void ScheduleIncomingRequest(IEndpointIncomingRequest request) return; } - // check if we can accept more requests - if (m_queuedRequestsCount >= m_maxRequestCount) + // Enqueue requests. Use TryWrite to fail immediately if limit is reached. + if (!m_queue.Writer.TryWrite(request)) { request.OperationCompleted(null, StatusCodes.BadServerTooBusy); // TODO: make a metric m_server.m_logger.LogDebug("Too many operations. Active threads: {Count}", m_activeThreadCount); return; } + // Optionally scale up workers if needed if (m_totalThreadCount < m_maxThreadCount && m_activeThreadCount >= m_totalThreadCount) @@ -1708,10 +1726,6 @@ public void ScheduleIncomingRequest(IEndpointIncomingRequest request) m_workers.Add(Task.Run(() => WorkerLoopAsync(m_cts.Token))); } } - // Enqueue requests - m_queue.Enqueue(request); - Interlocked.Increment(ref m_queuedRequestsCount); - m_queueSignal.Release(); } /// @@ -1723,22 +1737,12 @@ private async Task WorkerLoopAsync(CancellationToken ct) Interlocked.Increment(ref m_totalThreadCount); try { - while (!ct.IsCancellationRequested) + while (await m_queue.Reader.WaitToReadAsync(ct).ConfigureAwait(false)) { - // wait for a request - if ((!await m_queueSignal.WaitAsync(15_000, ct).ConfigureAwait(false)) && - m_totalThreadCount > m_minThreadCount) - { - //end loop if no requests and we have enough threads - return; - } - - //process request from queue - if (m_queue.TryDequeue(out IEndpointIncomingRequest request)) + while (m_queue.Reader.TryRead(out IEndpointIncomingRequest request)) { try { - Interlocked.Decrement(ref m_queuedRequestsCount); Interlocked.Increment(ref m_activeThreadCount); await m_server.ProcessRequestAsync(request, ct) .ConfigureAwait(false); @@ -1759,6 +1763,10 @@ await m_server.ProcessRequestAsync(request, ct) { // Graceful shutdown } + catch (System.Threading.Channels.ChannelClosedException) + { + // Graceful shutdown + } finally { Interlocked.Decrement(ref m_totalThreadCount); @@ -1768,14 +1776,11 @@ await m_server.ProcessRequestAsync(request, ct) private readonly ServerBase m_server; private readonly int m_minThreadCount; private readonly int m_maxThreadCount; - private readonly int m_maxRequestCount; - private readonly ConcurrentQueue m_queue; - private readonly SemaphoreSlim m_queueSignal; + private readonly System.Threading.Channels.Channel m_queue; private readonly List m_workers; private readonly CancellationTokenSource m_cts; private int m_activeThreadCount; private int m_totalThreadCount; - private int m_queuedRequestsCount; private bool m_stopped; } diff --git a/Tests/Opc.Ua.Client.Tests/ClientTest.cs b/Tests/Opc.Ua.Client.Tests/ClientTest.cs index c1f2545df..1c3262440 100644 --- a/Tests/Opc.Ua.Client.Tests/ClientTest.cs +++ b/Tests/Opc.Ua.Client.Tests/ClientTest.cs @@ -985,6 +985,12 @@ public async Task ReconnectSession_ReuseUsertokenPolicyAsync( $"No UserTokenPolicy found for {userIdentity.TokenType}" + $" / {userIdentity.IssuedTokenType}"); } + if (identityPolicy.SecurityPolicyUri != userTokenPolicy) + { + NUnit.Framework.Assert.Fail( + $"UserTokenPolicy SecurityPolicyUri {identityPolicy.SecurityPolicyUri} does not match test expected SecurityPolicyUri {userTokenPolicy}" + + $"Please fix Test parameters or Test server configuration"); + } userIdentity.PolicyId = identityPolicy.PolicyId; // the active channel @@ -994,6 +1000,8 @@ public async Task ReconnectSession_ReuseUsertokenPolicyAsync( try { await session1.ReconnectAsync(null, null).ConfigureAwait(false); + Assert.That(session1.Identity.PolicyId, Is.EqualTo(identityPolicy.PolicyId), + "User Token PolicyId needs to be preserved after reconnect."); } finally { diff --git a/Tests/Opc.Ua.Client.Tests/ClientTestFramework.cs b/Tests/Opc.Ua.Client.Tests/ClientTestFramework.cs index 5fb6beced..a857c131c 100644 --- a/Tests/Opc.Ua.Client.Tests/ClientTestFramework.cs +++ b/Tests/Opc.Ua.Client.Tests/ClientTestFramework.cs @@ -239,7 +239,12 @@ public virtual async Task CreateReferenceServerFixtureAsync( { IssuedTokenType = Profiles.JwtUserToken }; - + ServerFixture.Config.ServerConfiguration.UserTokenPolicies += + new UserTokenPolicy(UserTokenType.UserName) + { + SecurityPolicyUri + = SecurityPolicies.Basic128Rsa15 + }; ServerFixture.Config.ServerConfiguration.UserTokenPolicies += new UserTokenPolicy(UserTokenType.UserName) { diff --git a/Tests/Opc.Ua.Client.Tests/LoadTest.cs b/Tests/Opc.Ua.Client.Tests/LoadTest.cs index 4aa8b26a4..b472be7bb 100644 --- a/Tests/Opc.Ua.Client.Tests/LoadTest.cs +++ b/Tests/Opc.Ua.Client.Tests/LoadTest.cs @@ -35,6 +35,7 @@ using System.Threading; using System.Threading.Tasks; using NUnit.Framework; +using Opc.Ua.Server; namespace Opc.Ua.Client.Tests { @@ -507,5 +508,186 @@ public async Task ServerReadLoadTestAsync() await Task.WhenAll(closeTasks).ConfigureAwait(false); } } + + /// + /// Load test a server with multiple sessions subscribing to events. + /// Verifies that event handling remains responsive under load with many concurrent + /// event subscriptions, addressing the performance issue where server becomes + /// unresponsive as the number of monitored items increases. + /// + [Test] + [Explicit] + [Order(120)] + public async Task ServerEventSubscribeLoadTestAsync() + { + const int sessionCount = 50; + const int subscriptionsPerSession = 15; + const int publishingInterval = 100; + const int testDurationSeconds = 30; + + var sessions = new ConcurrentBag(); + long eventsReceived = 0; + long totalDelayTicks = 0; + + try + { + TestContext.Out.WriteLine($"Creating {sessionCount} sessions with {subscriptionsPerSession} event subscriptions each."); + + // Create sessions with event subscriptions in parallel. + var createSessionTasks = new List(); + for (int i = 0; i < sessionCount; i++) + { + createSessionTasks.Add(Task.Run(async () => + { + ISession session = await ClientFixture.ConnectAsync( + ServerUrl, + SecurityPolicies.Basic256Sha256).ConfigureAwait(false); + sessions.Add(session); + + for (int j = 0; j < subscriptionsPerSession; j++) + { + var subscription = new Subscription(session.DefaultSubscription) + { + PublishingInterval = publishingInterval + }; + + // Build an event filter for the base event type. + var eventFilter = new EventFilter(); + eventFilter.AddSelectClause( + ObjectTypeIds.BaseEventType, + QualifiedName.From(BrowseNames.EventId)); + eventFilter.AddSelectClause( + ObjectTypeIds.BaseEventType, + QualifiedName.From(BrowseNames.EventType)); + eventFilter.AddSelectClause( + ObjectTypeIds.BaseEventType, + QualifiedName.From(BrowseNames.SourceNode)); + eventFilter.AddSelectClause( + ObjectTypeIds.BaseEventType, + QualifiedName.From(BrowseNames.SourceName)); + eventFilter.AddSelectClause( + ObjectTypeIds.BaseEventType, + QualifiedName.From(BrowseNames.Time)); + eventFilter.AddSelectClause( + ObjectTypeIds.BaseEventType, + QualifiedName.From(BrowseNames.Message)); + eventFilter.AddSelectClause( + ObjectTypeIds.BaseEventType, + QualifiedName.From(BrowseNames.Severity)); + + // Monitor the Server node for events. + var monitoredItem = new MonitoredItem(subscription.DefaultItem) + { + StartNodeId = ObjectIds.Server, + AttributeId = Attributes.EventNotifier, + MonitoringMode = MonitoringMode.Reporting, + Filter = eventFilter, + QueueSize = 10000 + }; + + subscription.FastEventCallback = (sub, notification, _) => + { + Interlocked.Add(ref eventsReceived, notification.Events.Count); + foreach (EventFieldList fieldList in notification.Events) + { + if (fieldList.EventFields.Count > 4 && fieldList.EventFields[4].TryGet(out DateTimeUtc eventTime)) + { + TimeSpan delay = DateTime.UtcNow - ((DateTime)eventTime).ToUniversalTime(); + Interlocked.Add(ref totalDelayTicks, delay.Ticks); + } + } + }; + + subscription.AddItem(monitoredItem); + session.AddSubscription(subscription); + await subscription.CreateAsync().ConfigureAwait(false); + } + })); + } + + await Task.WhenAll(createSessionTasks).ConfigureAwait(false); + + const int totalSubscriptions = sessionCount * subscriptionsPerSession; + TestContext.Out.WriteLine( + $"Created {totalSubscriptions} event subscriptions across {sessionCount} sessions."); + TestContext.Out.WriteLine($"Generating events on the server for {testDurationSeconds} seconds..."); + + // Generate events directly on the server to stress-test event delivery. + IServerInternal serverInternal = ReferenceServer.CurrentInstance; + ISystemContext serverContext = serverInternal.DefaultSystemContext; + int eventCount = 0; + + using var testCts = new CancellationTokenSource(TimeSpan.FromSeconds(testDurationSeconds)); + + var sw = System.Diagnostics.Stopwatch.StartNew(); + while (!testCts.IsCancellationRequested) + { + var e = new BaseEventState(null); + e.Initialize( + serverContext, + serverInternal.ServerObject, + EventSeverity.Medium, + new LocalizedText($"LoadTest event {eventCount}")); + serverInternal.ReportEvent(serverContext, e); + eventCount++; + } + + sw.Stop(); + TestContext.Out.WriteLine($"Generated {eventCount} events in {sw.ElapsedMilliseconds} ms " + + $"({eventCount / sw.Elapsed.TotalSeconds:F0} events/sec)."); + + // Wait for subscriptions to deliver the events. + // Allow enough publishing intervals for all notifications to be sent and acknowledged. + const int publishingIntervalsToWait = 20; + await Task.Delay(publishingInterval * publishingIntervalsToWait).ConfigureAwait(false); + + long expectedTotal = (long)eventCount * totalSubscriptions; + long received = Interlocked.Read(ref eventsReceived); + + TestContext.Out.WriteLine($"Expected event notifications : {expectedTotal}"); + TestContext.Out.WriteLine($"Received event notifications : {received}"); + + double receiveRatio = expectedTotal > 0 ? (double)received / expectedTotal : 0; + TestContext.Out.WriteLine($"Receive ratio: {receiveRatio:P2}"); + + if (received > 0) + { + long averageDelayTicks = Interlocked.Read(ref totalDelayTicks) / received; + var averageDelay = TimeSpan.FromTicks(averageDelayTicks); + TestContext.Out.WriteLine($"Average event delivery delay: {averageDelay.TotalMilliseconds:F2} ms"); + } + + NUnit.Framework.Assert.That( + received, + Is.GreaterThan(0), + "No event notifications were received."); + + NUnit.Framework.Assert.That( + receiveRatio, + Is.GreaterThan(0.99), + "The event notification receive ratio is too low."); + } + finally + { + // Cleanup all sessions. + var closeTasks = sessions.Select(session => Task.Run(async () => + { + try + { + if (session.Connected) + { + await session.CloseAsync().ConfigureAwait(false); + } + + session.Dispose(); + } + catch (Exception ex) + { + TestContext.Out.WriteLine($"Failed to close session: {ex.Message}"); + } + })).ToList(); + await Task.WhenAll(closeTasks).ConfigureAwait(false); + } + } } } diff --git a/Tests/Opc.Ua.Client.Tests/SubscriptionUnitTests.cs b/Tests/Opc.Ua.Client.Tests/SubscriptionUnitTests.cs index 6a0c4b551..783558df3 100644 --- a/Tests/Opc.Ua.Client.Tests/SubscriptionUnitTests.cs +++ b/Tests/Opc.Ua.Client.Tests/SubscriptionUnitTests.cs @@ -169,9 +169,8 @@ private static async Task BuildSubscriptionAsync( bool sequentialPublishing, CancellationToken cancellationToken) { - TaskCompletionSource[] messageAwaiters = messagesToProcess - .Select(_ => new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously)) - .ToArray(); + TaskCompletionSource[] messageAwaiters = + [.. messagesToProcess.Select(_ => new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously))]; messageAwaiters[0].SetResult(true); List availableSequenceNumbers = [.. messagesToProcess.Skip(1).Select(x => x.SequenceNumber)]; diff --git a/Tests/Opc.Ua.Core.Tests/Stack/Server/RequestQueueTests.cs b/Tests/Opc.Ua.Core.Tests/Stack/Server/RequestQueueTests.cs new file mode 100644 index 000000000..da3a40476 --- /dev/null +++ b/Tests/Opc.Ua.Core.Tests/Stack/Server/RequestQueueTests.cs @@ -0,0 +1,244 @@ +/* ======================================================================== + * Copyright (c) 2005-2025 The OPC Foundation, Inc. All rights reserved. + * + * OPC Foundation MIT License 1.00 + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + * + * The complete license agreement can be found here: + * http://opcfoundation.org/License/MIT/1.00/ + * ======================================================================*/ + +using System; +using System.Reflection; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; +using Opc.Ua.Tests; + +namespace Opc.Ua.Core.Tests.Stack.Server +{ + [TestFixture] + [Category("Server")] + [SetCulture("en-us")] + [SetUICulture("en-us")] + [Parallelizable] + public class RequestQueueTests + { + private sealed class TestEndpointIncomingRequest : IEndpointIncomingRequest + { + public TaskCompletionSource ProcessingStarted { get; } = new TaskCompletionSource(); + public TaskCompletionSource ProcessingCompleted { get; } = new TaskCompletionSource(); + public StatusCode? CompletedStatusCode { get; private set; } + + public IServiceRequest Request => null; + + public SecureChannelContext SecureChannelContext => null; + + public ValueTask CallAsync(CancellationToken cancellationToken = default) + { + ProcessingStarted.TrySetResult(true); + return new ValueTask(ProcessingCompleted.Task); + } + + public void OperationCompleted(IServiceResponse response, ServiceResult error) + { + CompletedStatusCode = error?.StatusCode; + ProcessingCompleted.TrySetResult(true); + } + } + + private sealed class TestServer : ServerBase + { + public TestServer() + : base(NUnitTelemetryContext.Create(true)) + { + } + + public void ReplaceRequestQueue(int minThreads, int maxThreads, int maxQueue) + { + FieldInfo field = typeof(ServerBase).GetField( + "m_requestQueue", + BindingFlags.NonPublic | BindingFlags.Instance); + + var oldQueue = field.GetValue(this) as IDisposable; + oldQueue?.Dispose(); + + var newQueue = new RequestQueue(this, minThreads, maxThreads, maxQueue); + field.SetValue(this, newQueue); + } + + protected override Task ProcessRequestAsync(IEndpointIncomingRequest request, CancellationToken cancellationToken = default) + { + return request.CallAsync(cancellationToken).AsTask(); + } + } + + [Test] + public async Task TestRequestProcessingAsync() + { + using var server = new TestServer(); + server.ReplaceRequestQueue(2, 4, 10); + + var request1 = new TestEndpointIncomingRequest(); + server.ScheduleIncomingRequest(request1); + + using var cts = new CancellationTokenSource(5000); + Task t1 = request1.ProcessingStarted.Task; + if (await Task.WhenAny(t1, Task.Delay(5000, cts.Token)).ConfigureAwait(false) != t1) + { + Assert.Fail("Timed out waiting for processing to start."); + } + + request1.ProcessingCompleted.SetResult(true); + Assert.That(request1.CompletedStatusCode, Is.Null, "Request should not have an error status."); + } + + [Test] + public async Task TestTooManyRequestsAsync() + { + using var server = new TestServer(); + server.ReplaceRequestQueue(1, 1, 1); + + var req1 = new TestEndpointIncomingRequest(); + var req2 = new TestEndpointIncomingRequest(); + var req3 = new TestEndpointIncomingRequest(); + + server.ScheduleIncomingRequest(req1); + + // Wait for req1 to start processing so it leaves the queue. + using var cts = new CancellationTokenSource(5000); + Task t1 = req1.ProcessingStarted.Task; + if (await Task.WhenAny(t1, Task.Delay(5000, cts.Token)).ConfigureAwait(false) != t1) + { + Assert.Fail("Timed out waiting for processing to start."); + } + + // req1 is active (taking the 1 thread), queue is now empty. Capacity is 1. + server.ScheduleIncomingRequest(req2); // Goes to queue. + server.ScheduleIncomingRequest(req3); // Should fail to enter queue. + + Assert.That(req3.CompletedStatusCode, Is.EqualTo(StatusCodes.BadServerTooBusy)); + + req1.ProcessingCompleted.TrySetResult(true); + + Task t2 = req2.ProcessingStarted.Task; + if (await Task.WhenAny(t2, Task.Delay(5000, cts.Token)).ConfigureAwait(false) != t2) + { + Assert.Fail("Timed out waiting for req2 processing to start."); + } + + req2.ProcessingCompleted.TrySetResult(true); + req3.ProcessingCompleted.TrySetResult(true); + } + + [Test] + public async Task TestHighLoadAsync() + { + using var server = new TestServer(); + server.ReplaceRequestQueue(4, 10, 1000); + + const int count = 500; + var requests = new TestEndpointIncomingRequest[count]; + + for (int i = 0; i < count; i++) + { + requests[i] = new TestEndpointIncomingRequest(); + + // Automatically complete request when it starts to parallelize processing + TestEndpointIncomingRequest req = requests[i]; + _ = req.ProcessingStarted.Task.ContinueWith(_ => req.ProcessingCompleted.TrySetResult(true), TaskScheduler.Default); + } + + var tcsStart = new TaskCompletionSource(); + + var loadTask = Task.Run(async () => + { + await tcsStart.Task.ConfigureAwait(false); + for (int i = 0; i < count; i++) + { + server.ScheduleIncomingRequest(requests[i]); + } + }); + + tcsStart.SetResult(true); + await loadTask.ConfigureAwait(false); + + // Wait for all to finish + using var cts = new CancellationTokenSource(5000); + Task allFinished = Task.WhenAll(Array.ConvertAll(requests, r => r.ProcessingCompleted.Task)); + if (await Task.WhenAny(allFinished, Task.Delay(5000, cts.Token)).ConfigureAwait(false) != allFinished) + { + Assert.Fail("Timed out waiting for all requests to finish."); + } + + for (int i = 0; i < count; i++) + { + Assert.That(requests[i].CompletedStatusCode, Is.Not.EqualTo(StatusCodes.BadServerTooBusy)); + Assert.That(requests[i].CompletedStatusCode, Is.Not.EqualTo(StatusCodes.BadServerHalted)); + } + } + + [Test] + public async Task TestDisposeAsync() + { + using var server = new TestServer(); + server.ReplaceRequestQueue(1, 1, 10); + + var req1 = new TestEndpointIncomingRequest(); + var req2 = new TestEndpointIncomingRequest(); + + server.ScheduleIncomingRequest(req1); + + // Wait for req1 to start processing so it leaves the queue. + using var cts = new CancellationTokenSource(5000); + Task t1 = req1.ProcessingStarted.Task; + if (await Task.WhenAny(t1, Task.Delay(5000, cts.Token)).ConfigureAwait(false) != t1) + { + Assert.Fail("Timed out waiting for processing to start."); + } + + // req1 is active (taking the 1 thread) + // queue is now empty. Capacity is 10. + server.ScheduleIncomingRequest(req2); // Goes to queue. + + // Request queue has req2 pending. + // Dispose the server, which disposes the queue. + var disposeTask = Task.Run(server.Dispose); + + // req2 should be failed with BadServerHalted. + Task t2 = req2.ProcessingCompleted.Task; + if (await Task.WhenAny(t2, Task.Delay(5000, cts.Token)).ConfigureAwait(false) != t2) + { + Assert.Fail("Timed out waiting for req2 processing to be completed via Dispose."); + } + Assert.That(req2.CompletedStatusCode, Is.EqualTo(StatusCodes.BadServerHalted)); + + // Scheduling a new request should immediately fail with BadServerHalted + var req3 = new TestEndpointIncomingRequest(); + server.ScheduleIncomingRequest(req3); + Assert.That(req3.CompletedStatusCode, Is.EqualTo(StatusCodes.BadServerHalted)); + + req1.ProcessingCompleted.TrySetResult(true); + await disposeTask.ConfigureAwait(false); + } + } +} diff --git a/Tests/Opc.Ua.Server.Tests/DiagnosticsNodeManagerTests.cs b/Tests/Opc.Ua.Server.Tests/DiagnosticsNodeManagerTests.cs new file mode 100644 index 000000000..f40d58b51 --- /dev/null +++ b/Tests/Opc.Ua.Server.Tests/DiagnosticsNodeManagerTests.cs @@ -0,0 +1,996 @@ +/* ======================================================================== + * Copyright (c) 2005-2025 The OPC Foundation, Inc. All rights reserved. + * + * OPC Foundation MIT License 1.00 + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + * + * The complete license agreement can be found here: + * http://opcfoundation.org/License/MIT/1.00/ + * ======================================================================*/ + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging.Abstractions; +using Moq; +using NUnit.Framework; +using Opc.Ua.Tests; + +namespace Opc.Ua.Server.Tests +{ + [TestFixture] + [Category("DiagnosticsNodeManager")] + [Parallelizable] + public class DiagnosticsNodeManagerTests + { + private Mock m_serverMock; + private Mock m_coreNodeManagerMock; + private Mock m_subscriptionManagerMock; + + private void SetupServerMock() + { + ITelemetryContext telemetry = NUnitTelemetryContext.Create(); + + var namespaces = new NamespaceTable(); + namespaces.Append(Ua.Namespaces.OpcUa); + var serverUris = new StringTable(); + var typeTree = new TypeTable(namespaces); + var messageContext = new ServiceMessageContext(telemetry) { NamespaceUris = namespaces, ServerUris = serverUris }; + + m_coreNodeManagerMock = new Mock(); + m_coreNodeManagerMock.Setup(m => m.ImportNodesAsync( + It.IsAny(), + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(new ValueTask()); + + m_subscriptionManagerMock = new Mock(); + + var masterNodeManagerMock = new Mock(); + masterNodeManagerMock.Setup(m => m.RemoveReferencesAsync(It.IsAny>(), It.IsAny())) + .Returns(new ValueTask()); + + m_serverMock = new Mock(); + m_serverMock.Setup(s => s.Telemetry).Returns(telemetry); + m_serverMock.Setup(s => s.NamespaceUris).Returns(namespaces); + m_serverMock.Setup(s => s.ServerUris).Returns(serverUris); + m_serverMock.Setup(s => s.TypeTree).Returns(typeTree); + m_serverMock.Setup(s => s.MessageContext).Returns(messageContext); + m_serverMock.Setup(s => s.CoreNodeManager).Returns(m_coreNodeManagerMock.Object); + m_serverMock.Setup(s => s.NodeManager).Returns(masterNodeManagerMock.Object); + m_serverMock.Setup(s => s.SubscriptionManager).Returns(m_subscriptionManagerMock.Object); + m_serverMock.Setup(s => s.Factory).Returns(new Mock().Object); + + var defSysContext = new ServerSystemContext(m_serverMock.Object); + m_serverMock.Setup(s => s.DefaultSystemContext).Returns(defSysContext); + } + + [Test] + public async Task CreateAddressSpace_HooksUpGetMonitoredItemsAndResendDataAsync() + { + var config = new ApplicationConfiguration { ServerConfiguration = new ServerConfiguration() }; + SetupServerMock(); + + using var manager = new DiagnosticsNodeManager(m_serverMock.Object, config, NullLogger.Instance); + var externalRefs = new Dictionary>(); + await manager.CreateAddressSpaceAsync(externalRefs).ConfigureAwait(false); + + GetMonitoredItemsMethodState getMonitoredItems = manager.FindPredefinedNode(MethodIds.Server_GetMonitoredItems); + Assert.That(getMonitoredItems, Is.Not.Null, "GetMonitoredItems should exist."); + Assert.That(getMonitoredItems.OnCallMethod, Is.Not.Null, "GetMonitoredItems OnCallMethod should be wired."); + + PropertyState getMonitoredItemsOutputArgs = manager.FindPredefinedNode(VariableIds.Server_GetMonitoredItems_OutputArguments); + Assert.That(getMonitoredItemsOutputArgs, Is.Not.Null, "GetMonitoredItems output arguments should exist."); + Assert.That(getMonitoredItemsOutputArgs.Value.IsNull, Is.False, "Output arguments value should be initialized."); + + ResendDataMethodState resendData = manager.FindPredefinedNode(MethodIds.Server_ResendData); + Assert.That(resendData, Is.Not.Null, "ResendData should exist."); + Assert.That(resendData.OnCallMethod, Is.Not.Null, "ResendData OnCallMethod should be wired."); + } + + [Test] + public async Task CreateAddressSpace_DurableSubscriptionsEnabled_SetsOnCallAsync() + { + var config = new ApplicationConfiguration + { + ServerConfiguration = new ServerConfiguration + { + DurableSubscriptionsEnabled = true + } + }; + SetupServerMock(); + + using var manager = new DiagnosticsNodeManager(m_serverMock.Object, config, NullLogger.Instance); + var externalRefs = new Dictionary>(); + await manager.CreateAddressSpaceAsync(externalRefs).ConfigureAwait(false); + + SetSubscriptionDurableMethodState setSubscriptionDurable = + manager.FindPredefinedNode(MethodIds.Server_SetSubscriptionDurable); + + Assert.That(setSubscriptionDurable, Is.Not.Null, "SetSubscriptionDurable should exist."); + Assert.That(setSubscriptionDurable.OnCall, Is.Not.Null, "SetSubscriptionDurable OnCall method should be wired."); + } + + [Test] + public async Task CreateAddressSpace_DurableSubscriptionsDisabled_DeletesNodeAsync() + { + var config = new ApplicationConfiguration + { + ServerConfiguration = new ServerConfiguration + { + DurableSubscriptionsEnabled = false + } + }; + SetupServerMock(); + + using var manager = new DiagnosticsNodeManager(m_serverMock.Object, config, NullLogger.Instance); + var externalRefs = new Dictionary>(); + await manager.CreateAddressSpaceAsync(externalRefs).ConfigureAwait(false); + + SetSubscriptionDurableMethodState setSubscriptionDurable = + manager.FindPredefinedNode(MethodIds.Server_SetSubscriptionDurable); + Assert.That(setSubscriptionDurable, Is.Null, "SetSubscriptionDurable should not exist."); + } + + [Test] + public async Task SetSubscriptionDurable_CallsSubscriptionManagerAsync() + { + var config = new ApplicationConfiguration + { + ServerConfiguration = new ServerConfiguration + { + DurableSubscriptionsEnabled = true + } + }; + SetupServerMock(); + + uint mockRevisedLifetime = 50; + m_subscriptionManagerMock.Setup(m => m.SetSubscriptionDurable( + It.IsAny(), + 1234u, + 100u, + out mockRevisedLifetime)) + .Returns(StatusCodes.Good); + + using var manager = new DiagnosticsNodeManager(m_serverMock.Object, config, NullLogger.Instance); + var externalRefs = new Dictionary>(); + await manager.CreateAddressSpaceAsync(externalRefs).ConfigureAwait(false); + + SetSubscriptionDurableMethodState setSubscriptionDurable = + manager.FindPredefinedNode(MethodIds.Server_SetSubscriptionDurable); + + uint actualRevisedLifetime = 0; + ServiceResult result = setSubscriptionDurable.OnCall( + manager.SystemContext, + setSubscriptionDurable, +ObjectIds.Server, + 1234, + 100, + ref actualRevisedLifetime); + + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.Good)); + Assert.That(actualRevisedLifetime, Is.EqualTo(50)); + m_subscriptionManagerMock.Verify(m => m.SetSubscriptionDurable(It.IsAny(), 1234, 100, out It.Ref.IsAny), Times.Once); + } + + [Test] + public async Task GetMonitoredItems_ValidatesSessionAndReturnsItemsAsync() + { + var config = new ApplicationConfiguration { ServerConfiguration = new ServerConfiguration() }; + SetupServerMock(); + + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns(1234); + subMock.Setup(s => s.SessionId).Returns(new NodeId(1, 1)); + ArrayOf serverHandles = [1, 2]; + ArrayOf clientHandles = [3, 4]; + subMock.Setup(s => s.GetMonitoredItems(out serverHandles, out clientHandles)); + + ISubscription outSub = subMock.Object; + m_subscriptionManagerMock.Setup(m => m.TryGetSubscription(It.IsAny(), out outSub)).Returns(true); + + using var manager = new DiagnosticsNodeManager(m_serverMock.Object, config, NullLogger.Instance); + var externalRefs = new Dictionary>(); + await manager.CreateAddressSpaceAsync(externalRefs).ConfigureAwait(false); + + GetMonitoredItemsMethodState getMonitoredItems = manager.FindPredefinedNode(MethodIds.Server_GetMonitoredItems); + + var sysContextMock = new Mock(); + sysContextMock.Setup(c => c.SessionId).Returns(new NodeId(1, 1)); + + ArrayOf inputs = [new Variant(1234u)]; + var outputs = new List { Variant.Null, Variant.Null }; + + ServiceResult result = getMonitoredItems.OnCallMethod( + sysContextMock.Object, + getMonitoredItems, + inputs, + outputs); + + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.Good)); + Assert.That(outputs[0].GetUInt32Array(), Is.EqualTo(serverHandles)); + Assert.That(outputs[1].GetUInt32Array(), Is.EqualTo(clientHandles)); + + // Test access denied (different session ID) + sysContextMock.Setup(c => c.SessionId).Returns(new NodeId(2, 1)); + result = getMonitoredItems.OnCallMethod( + sysContextMock.Object, + getMonitoredItems, + inputs, + outputs); + + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.BadUserAccessDenied)); + } + + [Test] + public async Task GetMonitoredItems_InvalidSubscriptionId_ReturnsBadSubscriptionIdInvalidAsync() + { + var config = new ApplicationConfiguration { ServerConfiguration = new ServerConfiguration() }; + SetupServerMock(); + + ISubscription outSub = null; + m_subscriptionManagerMock.Setup(m => m.TryGetSubscription(It.IsAny(), out outSub)).Returns(false); + + using var manager = new DiagnosticsNodeManager(m_serverMock.Object, config, NullLogger.Instance); + var externalRefs = new Dictionary>(); + await manager.CreateAddressSpaceAsync(externalRefs).ConfigureAwait(false); + + GetMonitoredItemsMethodState getMonitoredItems = manager.FindPredefinedNode(MethodIds.Server_GetMonitoredItems); + + var sysContextMock = new Mock(); + ArrayOf inputs = [new Variant(1234u)]; + var outputs = new List { Variant.Null, Variant.Null }; + + ServiceResult result = getMonitoredItems.OnCallMethod( + sysContextMock.Object, + getMonitoredItems, + inputs, + outputs); + + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.BadSubscriptionIdInvalid)); + } + + [Test] + public async Task ResendData_ValidatesAccessAndCallsResendDataAsync() + { + var config = new ApplicationConfiguration { ServerConfiguration = new ServerConfiguration() }; + SetupServerMock(); + + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns(1234); + subMock.Setup(s => s.SessionId).Returns(new NodeId(1, 1)); + subMock.Setup(s => s.ResendData(It.IsAny())); + + ISubscription outSub = subMock.Object; + m_subscriptionManagerMock.Setup(m => m.TryGetSubscription(It.IsAny(), out outSub)).Returns(true); + + using var manager = new DiagnosticsNodeManager(m_serverMock.Object, config, NullLogger.Instance); + var externalRefs = new Dictionary>(); + await manager.CreateAddressSpaceAsync(externalRefs).ConfigureAwait(false); + + ResendDataMethodState resendData = manager.FindPredefinedNode(MethodIds.Server_ResendData); + + var reqHeader = new RequestHeader(); + var sessionMock = new Mock(); + sessionMock.Setup(s => s.Id).Returns(new NodeId(1, 1)); + sessionMock.Setup(s => s.EffectiveIdentity).Returns(new UserIdentity()); + + var opContext = new OperationContext(reqHeader, null, RequestType.Read, sessionMock.Object); + var sysContext = new ServerSystemContext(m_serverMock.Object, opContext); + + ArrayOf inputs = [new Variant(1234u)]; + var outputs = new List(); + + ServiceResult result = resendData.OnCallMethod( + sysContext, + resendData, + inputs, + outputs); + + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.Good)); + subMock.Verify(s => s.ResendData(opContext), Times.Once); + + // Test bad access denied + sessionMock.Setup(s => s.Id).Returns(new NodeId(2, 1)); + result = resendData.OnCallMethod( + sysContext, + resendData, + inputs, + outputs); + + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.BadUserAccessDenied)); + } + + [Test] + public async Task ResendData_InvalidSubscriptionId_ReturnsBadSubscriptionIdInvalidAsync() + { + var config = new ApplicationConfiguration { ServerConfiguration = new ServerConfiguration() }; + SetupServerMock(); + + ISubscription outSub = null; + m_subscriptionManagerMock.Setup(m => m.TryGetSubscription(It.IsAny(), out outSub)).Returns(false); + + using var manager = new DiagnosticsNodeManager(m_serverMock.Object, config, NullLogger.Instance); + var externalRefs = new Dictionary>(); + await manager.CreateAddressSpaceAsync(externalRefs).ConfigureAwait(false); + + ResendDataMethodState resendData = manager.FindPredefinedNode(MethodIds.Server_ResendData); + + var reqHeader = new RequestHeader(); + var sessionMock = new Mock(); + sessionMock.Setup(s => s.Id).Returns(new NodeId(1, 1)); + sessionMock.Setup(s => s.EffectiveIdentity).Returns(new UserIdentity()); + + var opContext = new OperationContext(reqHeader, null, RequestType.Read, sessionMock.Object); + var sysContext = new ServerSystemContext(m_serverMock.Object, opContext); + + ArrayOf inputs = [new Variant(1234u)]; + var outputs = new List(); + + ServiceResult result = resendData.OnCallMethod( + sysContext, + resendData, + inputs, + outputs); + + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.BadSubscriptionIdInvalid)); + } + + [Test] + public async Task NodesAreCreatedCorrectlyAsync() + { + var config = new ApplicationConfiguration { ServerConfiguration = new ServerConfiguration() }; + SetupServerMock(); + + using var manager = new DiagnosticsNodeManager(m_serverMock.Object, config, NullLogger.Instance); + var externalRefs = new Dictionary>(); + await manager.CreateAddressSpaceAsync(externalRefs).ConfigureAwait(false); + + ServerObjectState serverObj = manager.FindPredefinedNode(ObjectIds.Server); + Assert.That(serverObj, Is.Not.Null, "Server object should be present."); + + ServerDiagnosticsState serverDiagnostics = manager.FindPredefinedNode(ObjectIds.Server_ServerDiagnostics); + Assert.That(serverDiagnostics, Is.Not.Null, "ServerDiagnostics should be present."); + + HistoryServerCapabilitiesState historyCapabilities = + manager.FindPredefinedNode(ObjectIds.HistoryServerCapabilities); + Assert.That(historyCapabilities, Is.Not.Null, "HistoryServerCapabilities should be present."); + } + + [Test] + public async Task SetDiagnosticsEnabledAsync_SameState_DoesNothingAsync() + { + var config = new ApplicationConfiguration { ServerConfiguration = new ServerConfiguration() }; + SetupServerMock(); + + using var manager = new DiagnosticsNodeManager(m_serverMock.Object, config, NullLogger.Instance); + var externalRefs = new Dictionary>(); + await manager.CreateAddressSpaceAsync(externalRefs).ConfigureAwait(false); + + // By default DiagnosticsEnabled is true + Assert.That(manager.DiagnosticsEnabled, Is.True); + + await manager.SetDiagnosticsEnabledAsync(manager.SystemContext, true).ConfigureAwait(false); + + // Should remain true without side effects + Assert.That(manager.DiagnosticsEnabled, Is.True); + } + + [Test] + public async Task SetDiagnosticsEnabledAsync_WhenDisabled_DeletesNodesAndStopsTimerAsync() + { + var config = new ApplicationConfiguration { ServerConfiguration = new ServerConfiguration() }; + SetupServerMock(); + + using var manager = new DiagnosticsNodeManager(m_serverMock.Object, config, NullLogger.Instance); + var externalRefs = new Dictionary>(); + await manager.CreateAddressSpaceAsync(externalRefs).ConfigureAwait(false); + + // Setup a mock update callback + static ServiceResult UpdateCallback(ISystemContext ctx, NodeState node, ref Variant value) => ServiceResult.Good; + + // Add a mock session to verify deletion path + var sessionDiag = new SessionDiagnosticsDataType { SessionName = "TestSession" }; + var sessionSecDiag = new SessionSecurityDiagnosticsDataType(); + + NodeId sessionId = await manager.CreateSessionDiagnosticsAsync( + manager.SystemContext, + sessionDiag, + UpdateCallback, + sessionSecDiag, + UpdateCallback).ConfigureAwait(false); + + Assert.That(sessionId.IsNull, Is.False); + + // Add a mock subscription to verify deletion path + var subDiag = new SubscriptionDiagnosticsDataType { SubscriptionId = 1, SessionId = sessionId }; + NodeId subId = await manager.CreateSubscriptionDiagnosticsAsync( + manager.SystemContext, + subDiag, + UpdateCallback).ConfigureAwait(false); + + Assert.That(subId.IsNull, Is.False); + + // Check node exists before deleting + SessionDiagnosticsObjectState sessionNodeBefore = manager.FindPredefinedNode(sessionId); + Assert.That(sessionNodeBefore, Is.Not.Null); + + SubscriptionDiagnosticsState subNodeBefore = manager.FindPredefinedNode(subId); + Assert.That(subNodeBefore, Is.Not.Null); + + // Act: Disable diagnostics + await manager.SetDiagnosticsEnabledAsync(manager.SystemContext, false).ConfigureAwait(false); + + Assert.That(manager.DiagnosticsEnabled, Is.False); + + // Verifying the nodes have been removed from the PredefinedNodes is a strong indicator DeleteNodeAsync was called + SessionDiagnosticsObjectState sessionNodeAfter = manager.FindPredefinedNode(sessionId); + Assert.That(sessionNodeAfter, Is.Null, "Session node should be deleted."); + + // Also check subscription diagnostics array was updated / removed + SubscriptionDiagnosticsState subNodeAfter = manager.FindPredefinedNode(subId); + Assert.That(subNodeAfter, Is.Null, "Subscription node should be deleted."); + } + + [Test] + public async Task SetDiagnosticsEnabledAsync_WhenEnabled_ClearsArraysAndStartsScanAsync() + { + var config = new ApplicationConfiguration { ServerConfiguration = new ServerConfiguration() }; + SetupServerMock(); + + using var manager = new DiagnosticsNodeManager(m_serverMock.Object, config, NullLogger.Instance); + var externalRefs = new Dictionary>(); + await manager.CreateAddressSpaceAsync(externalRefs).ConfigureAwait(false); + + static ServiceResult UpdateCallback(ISystemContext ctx, NodeState node, ref Variant value) => ServiceResult.Good; + await manager.CreateServerDiagnosticsAsync(manager.SystemContext, new ServerDiagnosticsSummaryDataType(), UpdateCallback, CancellationToken.None) + .ConfigureAwait(false); + + // start disabled + await manager.SetDiagnosticsEnabledAsync(manager.SystemContext, false).ConfigureAwait(false); + Assert.That(manager.DiagnosticsEnabled, Is.False); + + // Act: Enable diagnostics + await manager.SetDiagnosticsEnabledAsync(manager.SystemContext, true).ConfigureAwait(false); + + Assert.That(manager.DiagnosticsEnabled, Is.True); + + // Verify arrays. Following SetDiagnosticsEnabledAsync(true), DoScan(true) is called. + // Empty sessions and subscriptions will result in empty arrays and 'Good' StatusCodes. + ServerDiagnosticsState serverDiagnostics = manager.FindPredefinedNode(ObjectIds.Server_ServerDiagnostics); + Assert.That(serverDiagnostics, Is.Not.Null); + + if (serverDiagnostics.SamplingIntervalDiagnosticsArray != null) + { + // This array is not evaluated heavily on normal DoScan, so it stays BadWaitingForInitialData + Assert.That(serverDiagnostics.SamplingIntervalDiagnosticsArray.StatusCode.Code, + Is.EqualTo(StatusCodes.BadWaitingForInitialData)); + } + + // The rest are populated with empty arrays due to no sessions in test + if (serverDiagnostics.SubscriptionDiagnosticsArray != null) + { + Assert.That(serverDiagnostics.SubscriptionDiagnosticsArray.Value, Is.Empty); + } + if (serverDiagnostics.SessionsDiagnosticsSummary?.SessionDiagnosticsArray != null) + { + Assert.That(serverDiagnostics.SessionsDiagnosticsSummary.SessionDiagnosticsArray.Value, Is.Empty); + } + if (serverDiagnostics.SessionsDiagnosticsSummary?.SessionSecurityDiagnosticsArray != null) + { + Assert.That(serverDiagnostics.SessionsDiagnosticsSummary.SessionSecurityDiagnosticsArray.Value, Is.Empty); + } + + ServerDiagnosticsSummaryState serverDiagSummary = + manager.FindPredefinedNode(VariableIds.Server_ServerDiagnostics_ServerDiagnosticsSummary); + Assert.That(serverDiagSummary, Is.Not.Null); + Assert.That(serverDiagSummary.StatusCode.Code, Is.EqualTo(StatusCodes.Good)); + } + + [Test] + public async Task CreateServerDiagnosticsAsync_WiresUpHandlers_AndPerformsActionsAsync() + { + var config = new ApplicationConfiguration { ServerConfiguration = new ServerConfiguration() }; + SetupServerMock(); + + using var manager = new DiagnosticsNodeManager(m_serverMock.Object, config, NullLogger.Instance); + var externalRefs = new Dictionary>(); + await manager.CreateAddressSpaceAsync(externalRefs).ConfigureAwait(false); + + static ServiceResult UpdateCallback(ISystemContext ctx, NodeState node, ref Variant value) => ServiceResult.Good; + await manager.CreateServerDiagnosticsAsync( + manager.SystemContext, + new ServerDiagnosticsSummaryDataType(), + UpdateCallback, + CancellationToken.None).ConfigureAwait(false); + + // 1. Verify ServerDiagnosticsSummaryState + ServerDiagnosticsSummaryState summaryNode = manager.FindPredefinedNode( +VariableIds.Server_ServerDiagnostics_ServerDiagnosticsSummary); + Assert.That(summaryNode, Is.Not.Null); + Assert.That(summaryNode.OnReadUserRolePermissions, Is.Not.Null, + "OnReadUserRolePermissions should be wired on ServerDiagnosticsSummary."); + + // Test OnReadUserRolePermissions (Non-admin context -> PermissionType.None) + ArrayOf permissions = default; + ServiceResult result = summaryNode.OnReadUserRolePermissions(manager.SystemContext, summaryNode, ref permissions); + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.Good)); + Assert.That(permissions.IsNull, Is.False); + Assert.That(permissions.Count, Is.GreaterThan(0)); + Assert.That(permissions[0].Permissions, Is.EqualTo((uint)PermissionType.None)); + + // 2. Verify SessionDiagnosticsArrayState + SessionDiagnosticsArrayState sessionArrayNode = manager.FindPredefinedNode( +VariableIds.Server_ServerDiagnostics_SessionsDiagnosticsSummary_SessionDiagnosticsArray); + Assert.That(sessionArrayNode, Is.Not.Null); + Assert.That(sessionArrayNode.OnSimpleReadValue, Is.Not.Null, + "OnSimpleReadValue should be wired on SessionDiagnosticsArray."); + Assert.That(sessionArrayNode.OnReadUserRolePermissions, Is.Not.Null, + "OnReadUserRolePermissions should be wired on SessionDiagnosticsArray."); + + manager.ForceDiagnosticsScan(); + Variant arrayValue = default; + result = sessionArrayNode.OnSimpleReadValue(manager.SystemContext, sessionArrayNode, ref arrayValue); + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.Good)); + Assert.That(arrayValue.IsNull, Is.False); // Depends on Variant.FromStructure + + // 3. Verify SessionSecurityDiagnosticsArrayState + SessionSecurityDiagnosticsArrayState sessionSecurityArrayNode = manager.FindPredefinedNode( +VariableIds.Server_ServerDiagnostics_SessionsDiagnosticsSummary_SessionSecurityDiagnosticsArray); + Assert.That(sessionSecurityArrayNode, Is.Not.Null); + Assert.That(sessionSecurityArrayNode.OnSimpleReadValue, Is.Not.Null, + "OnSimpleReadValue should be wired on SessionSecurityDiagnosticsArray."); + Assert.That(sessionSecurityArrayNode.OnReadUserRolePermissions, Is.Not.Null, + "OnReadUserRolePermissions should be wired on SessionSecurityDiagnosticsArray."); + + manager.ForceDiagnosticsScan(); + arrayValue = default; + result = sessionSecurityArrayNode.OnSimpleReadValue( + manager.SystemContext, + sessionSecurityArrayNode, + ref arrayValue); + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.Good)); + Assert.That(arrayValue.IsNull, Is.False); + + // 4. Verify SubscriptionDiagnosticsArrayState + SubscriptionDiagnosticsArrayState subArrayNode = manager.FindPredefinedNode( +VariableIds.Server_ServerDiagnostics_SubscriptionDiagnosticsArray); + Assert.That(subArrayNode, Is.Not.Null); + Assert.That(subArrayNode.OnSimpleReadValue, Is.Not.Null, + "OnSimpleReadValue should be wired on SubscriptionDiagnosticsArray."); + Assert.That(subArrayNode.OnReadUserRolePermissions, Is.Not.Null, + "OnReadUserRolePermissions should be wired on SubscriptionDiagnosticsArray."); + + manager.ForceDiagnosticsScan(); + arrayValue = default; + result = subArrayNode.OnSimpleReadValue(manager.SystemContext, subArrayNode, ref arrayValue); + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.Good)); + Assert.That(arrayValue.IsNull, Is.False); + } + + [Test] + public async Task ReadUserRolePermissions_AdminAndSessionHandlingAsync() + { + var config = new ApplicationConfiguration { ServerConfiguration = new ServerConfiguration() }; + SetupServerMock(); + + using var manager = new DiagnosticsNodeManager(m_serverMock.Object, config, NullLogger.Instance); + var externalRefs = new Dictionary>(); + await manager.CreateAddressSpaceAsync(externalRefs).ConfigureAwait(false); + + static ServiceResult UpdateCallback(ISystemContext ctx, NodeState node, ref Variant value) => ServiceResult.Good; + + // 1. Create server and session diagnostics + await manager.CreateServerDiagnosticsAsync( + manager.SystemContext, + new ServerDiagnosticsSummaryDataType(), + UpdateCallback, + CancellationToken.None).ConfigureAwait(false); + + var initialSessionId = new NodeId(1234, 1); + var sessionDiag = new SessionDiagnosticsDataType { SessionName = "TestSession", SessionId = initialSessionId }; + var sessionSecDiag = new SessionSecurityDiagnosticsDataType { SessionId = initialSessionId }; + + NodeId sessionId = await manager.CreateSessionDiagnosticsAsync( + manager.SystemContext, + sessionDiag, + UpdateCallback, + sessionSecDiag, + UpdateCallback).ConfigureAwait(false); + + // Get nodes for permissions testing + ServerDiagnosticsSummaryState summaryNode = + manager.FindPredefinedNode(VariableIds.Server_ServerDiagnostics_ServerDiagnosticsSummary); + SessionDiagnosticsObjectState sessionObjectNode = manager.FindPredefinedNode(sessionId); + + Assert.That(summaryNode, Is.Not.Null); + Assert.That(sessionObjectNode, Is.Not.Null); + Assert.That(summaryNode.OnReadUserRolePermissions, Is.Not.Null); + Assert.That(sessionObjectNode.OnReadUserRolePermissions, Is.Not.Null); + + // 2. Test Context: Admin + var identity = new UserIdentity("admin", []); + Role[] roles = [Role.SecurityAdmin]; + var namespaces = new NamespaceTable(); + namespaces.Append(Ua.Namespaces.OpcUa); + var roleIdentity = new RoleBasedIdentity(identity, roles, namespaces); + + var endpoint = new EndpointDescription { SecurityMode = MessageSecurityMode.SignAndEncrypt }; + var channelContext = new SecureChannelContext("1", endpoint, RequestEncoding.Binary); + var reqHeader = new RequestHeader(); + + var sessionMock = new Mock(); + sessionMock.Setup(s => s.Id).Returns(new NodeId(9999, 1)); // Different from sessionId to test admin overrides + sessionMock.Setup(s => s.EffectiveIdentity).Returns(roleIdentity); + + var opContextAdmin = new OperationContext(reqHeader, channelContext, RequestType.Read, sessionMock.Object); + var adminContext = new ServerSystemContext(m_serverMock.Object, opContextAdmin); + + ArrayOf permissionsAdmin = default; + summaryNode.OnReadUserRolePermissions(adminContext, summaryNode, ref permissionsAdmin); + Assert.That(permissionsAdmin.IsNull, Is.False); + Assert.That(permissionsAdmin.Count, Is.GreaterThan(0)); + Assert.That(permissionsAdmin[0].Permissions, Is.Not.EqualTo((uint)PermissionType.None), + "Admin should have permissions on summary"); + + permissionsAdmin = default; + sessionObjectNode.OnReadUserRolePermissions(adminContext, sessionObjectNode, ref permissionsAdmin); + Assert.That(permissionsAdmin.IsNull, Is.False); + Assert.That(permissionsAdmin[0].Permissions, Is.Not.EqualTo((uint)PermissionType.None), + "Admin should have permissions on any session"); + + // 3. Test Context: Session Owner (Non-Admin) + var normalIdentity = new UserIdentity("owner", []); + var normalRoleIdentity = new RoleBasedIdentity(normalIdentity, [Role.AuthenticatedUser], namespaces); + + var sessionOwnerMock = new Mock(); + sessionOwnerMock.Setup(s => s.Id).Returns(sessionId); // Same as sessionId + sessionOwnerMock.Setup(s => s.EffectiveIdentity).Returns(normalRoleIdentity); + + var opContextOwner = new OperationContext( + reqHeader, channelContext, RequestType.Read, sessionOwnerMock.Object); + var ownerContext = new ServerSystemContext(m_serverMock.Object, opContextOwner); + + ArrayOf permissionsOwner = default; + summaryNode.OnReadUserRolePermissions(ownerContext, summaryNode, ref permissionsOwner); + Assert.That(permissionsOwner.IsNull, Is.False); + Assert.That(permissionsOwner[0].Permissions, Is.EqualTo((uint)PermissionType.None), + "Non-admin owner should NOT have overall summary permissions"); + + permissionsOwner = default; + sessionObjectNode.OnReadUserRolePermissions(ownerContext, sessionObjectNode, ref permissionsOwner); + Assert.That(permissionsOwner.IsNull, Is.False); + Assert.That(permissionsOwner[0].Permissions, Is.Not.EqualTo((uint)PermissionType.None), + "Owner should have permissions on their own session"); + + // 4. Test Context: Other Session (Non-Admin) + var sessionOtherMock = new Mock(); + sessionOtherMock.Setup(s => s.Id).Returns(new NodeId(9999, 1)); // Different sessionId + sessionOtherMock.Setup(s => s.EffectiveIdentity).Returns(normalRoleIdentity); + + var opContextOther = new OperationContext( + reqHeader, channelContext, RequestType.Read, sessionOtherMock.Object); + var otherContext = new ServerSystemContext(m_serverMock.Object, opContextOther); + + ArrayOf permissionsOther = default; + sessionObjectNode.OnReadUserRolePermissions(otherContext, sessionObjectNode, ref permissionsOther); + Assert.That(permissionsOther.IsNull, Is.False); + Assert.That(permissionsOther[0].Permissions, Is.EqualTo((uint)PermissionType.None), + "Other non-admin session should NOT have permissions on this session"); + } + + [Test] + public async Task OnReadDiagnosticsArray_ExpectedBehaviorForDifferentTypesAsync() + { + var config = new ApplicationConfiguration { ServerConfiguration = new ServerConfiguration() }; + SetupServerMock(); + + using var manager = new DiagnosticsNodeManager(m_serverMock.Object, config, NullLogger.Instance); + var externalRefs = new Dictionary>(); + await manager.CreateAddressSpaceAsync(externalRefs).ConfigureAwait(false); + + var sessionDiag = new SessionDiagnosticsDataType { SessionName = "TestSession", SessionId = new NodeId(1, 1) }; + var sessionSecDiag = new SessionSecurityDiagnosticsDataType { SessionId = new NodeId(1, 1) }; + var subDiag = new SubscriptionDiagnosticsDataType { SubscriptionId = 100, SessionId = new NodeId(1, 1) }; + + var sessionDiag2 = new SessionDiagnosticsDataType { SessionName = "TestSession2", SessionId = new NodeId(2, 1) }; + var sessionSecDiag2 = new SessionSecurityDiagnosticsDataType { SessionId = new NodeId(2, 1) }; + var subDiag2 = new SubscriptionDiagnosticsDataType { SubscriptionId = 200, SessionId = new NodeId(2, 1) }; + + ServiceResult UpdateCallback(ISystemContext ctx, NodeState node, ref Variant value) + { + var instanceState = node as BaseInstanceState; + if (node.BrowseName.Name == BrowseNames.SessionDiagnostics) + { + value = instanceState?.Parent?.BrowseName.Name == "TestSession" ? Variant.FromStructure(sessionDiag) : Variant.FromStructure(sessionDiag2); + } + else if (node.BrowseName.Name == BrowseNames.SessionSecurityDiagnostics) + { + value = instanceState?.Parent?.BrowseName.Name == "TestSession" + ? Variant.FromStructure(sessionSecDiag) + : Variant.FromStructure(sessionSecDiag2); + } + else if (node.BrowseName.Name == "100") + { + value = Variant.FromStructure(subDiag); // Subscription node is named after SubscriptionId + } + else if (node.BrowseName.Name == "200") + { + value = Variant.FromStructure(subDiag2); + } + else + { + value = Variant.FromStructure(new ServerDiagnosticsSummaryDataType()); + } + return ServiceResult.Good; + } + + await manager.CreateServerDiagnosticsAsync(manager.SystemContext, new ServerDiagnosticsSummaryDataType(), UpdateCallback, CancellationToken.None) + .ConfigureAwait(false); + + NodeId sessionId = await manager.CreateSessionDiagnosticsAsync( + manager.SystemContext, + sessionDiag, + UpdateCallback, + sessionSecDiag, + UpdateCallback).ConfigureAwait(false); + + subDiag.SessionId = sessionId; + + NodeId subId = await manager.CreateSubscriptionDiagnosticsAsync( + manager.SystemContext, + subDiag, + UpdateCallback).ConfigureAwait(false); + + subDiag2.SessionId = await manager.CreateSessionDiagnosticsAsync( + manager.SystemContext, + sessionDiag2, + UpdateCallback, + sessionSecDiag2, + UpdateCallback).ConfigureAwait(false); + + NodeId subId2 = await manager.CreateSubscriptionDiagnosticsAsync( + manager.SystemContext, + subDiag2, + UpdateCallback).ConfigureAwait(false); + + manager.ForceDiagnosticsScan(); + + var adminIdentity = new UserIdentity("admin", []); + var namespaces = new NamespaceTable(); + namespaces.Append(Ua.Namespaces.OpcUa); + var roleIdentity = new RoleBasedIdentity(adminIdentity, [Role.SecurityAdmin], namespaces); + var sessionMock = new Mock(); + sessionMock.Setup(s => s.Id).Returns(new NodeId(9999, 1)); + sessionMock.Setup(s => s.EffectiveIdentity).Returns(roleIdentity); + var endpoint = new EndpointDescription { SecurityMode = MessageSecurityMode.SignAndEncrypt }; + var secureChannelContext = new SecureChannelContext("1", endpoint, RequestEncoding.Binary); + var opContext = new OperationContext( + new RequestHeader(), + secureChannelContext, + RequestType.Read, + sessionMock.Object); + var adminContext = new ServerSystemContext(m_serverMock.Object, opContext); + + SessionDiagnosticsArrayState sessionArrayNode = manager.FindPredefinedNode( +VariableIds.Server_ServerDiagnostics_SessionsDiagnosticsSummary_SessionDiagnosticsArray); + Variant arrayValue = default; + ServiceResult result = sessionArrayNode.OnSimpleReadValue(adminContext, sessionArrayNode, ref arrayValue); + + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.Good)); + Assert.That(arrayValue.TryGet(out ArrayOf sessionArray), Is.True); + Assert.That(sessionArray.Count, Is.EqualTo(2)); + sessionArray[0].TryGetEncodeable(out SessionDiagnosticsDataType sessionDiagObj); + Assert.That(sessionDiagObj.SessionName, Is.EqualTo("TestSession")); + + SessionSecurityDiagnosticsArrayState sessionSecurityArrayNode = manager.FindPredefinedNode( +VariableIds.Server_ServerDiagnostics_SessionsDiagnosticsSummary_SessionSecurityDiagnosticsArray); + arrayValue = default; + result = sessionSecurityArrayNode.OnSimpleReadValue(adminContext, sessionSecurityArrayNode, ref arrayValue); + + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.Good)); + Assert.That(arrayValue.TryGet(out ArrayOf sessionSecurityArray), Is.True); + Assert.That(sessionSecurityArray.Count, Is.EqualTo(2)); + sessionSecurityArray[0].TryGetEncodeable(out SessionSecurityDiagnosticsDataType sessionSecDiagObj); + Assert.That(sessionSecDiagObj.SessionId, Is.EqualTo(sessionId)); + + SubscriptionDiagnosticsArrayState subArrayNode = manager.FindPredefinedNode( +VariableIds.Server_ServerDiagnostics_SubscriptionDiagnosticsArray); + + arrayValue = default; + result = subArrayNode.OnSimpleReadValue(adminContext, subArrayNode, ref arrayValue); + + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.Good)); + Assert.That(arrayValue.TryGet(out ArrayOf subArray), Is.True); + Assert.That(subArray.Count, Is.EqualTo(2)); + subArray[0].TryGetEncodeable(out SubscriptionDiagnosticsDataType subDiagObj); + Assert.That(subDiagObj.SubscriptionId, Is.EqualTo(100)); + + // Test unauthorized non-admin user accessing their own session + var normalIdentity = new UserIdentity("user", []); + var normalRoleIdentity = new RoleBasedIdentity(normalIdentity, [Role.AuthenticatedUser], namespaces); + var normalSessionMock = new Mock(); + normalSessionMock.Setup(s => s.Id).Returns(sessionId); // set to first session + normalSessionMock.Setup(s => s.EffectiveIdentity).Returns(normalRoleIdentity); + var normalOpContext = new OperationContext( + new RequestHeader(), + secureChannelContext, + RequestType.Read, + normalSessionMock.Object); + var normalContext = new ServerSystemContext(m_serverMock.Object, normalOpContext); + + manager.ForceDiagnosticsScan(); + + arrayValue = default; + result = sessionArrayNode.OnSimpleReadValue(normalContext, sessionArrayNode, ref arrayValue); + + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.Good)); + Assert.That(arrayValue.TryGet(out ArrayOf normalSessionArray), Is.True); + Assert.That(normalSessionArray.Count, Is.EqualTo(1)); + normalSessionArray[0].TryGetEncodeable(out SessionDiagnosticsDataType normalSessionDiagObj); + Assert.That(normalSessionDiagObj.SessionName, Is.EqualTo("TestSession")); + + manager.ForceDiagnosticsScan(); + + arrayValue = default; + result = sessionSecurityArrayNode.OnSimpleReadValue(normalContext, sessionSecurityArrayNode, ref arrayValue); + + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.Good)); + Assert.That(arrayValue.TryGet(out ArrayOf normalSessionSecArray), Is.True); + Assert.That(normalSessionSecArray.Count, Is.EqualTo(1)); + normalSessionSecArray[0].TryGetEncodeable(out SessionSecurityDiagnosticsDataType normalSessionSecDiagObj); + Assert.That(normalSessionSecDiagObj.SessionId, Is.EqualTo(sessionId)); + + manager.ForceDiagnosticsScan(); + + arrayValue = default; + result = subArrayNode.OnSimpleReadValue(normalContext, subArrayNode, ref arrayValue); + + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.Good)); + Assert.That(arrayValue.TryGet(out ArrayOf normalSubArray), Is.True); + Assert.That(normalSubArray.Count, Is.EqualTo(1)); + normalSubArray[0].TryGetEncodeable(out SubscriptionDiagnosticsDataType normalSubDiagObj); + Assert.That(normalSubDiagObj.SessionId, Is.EqualTo(sessionId)); + + // Test if recently scanned just returns Good + result = sessionArrayNode.OnSimpleReadValue(adminContext, sessionArrayNode, ref arrayValue); + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.Good)); + + // Test behavior when diagnostics are disabled + await manager.SetDiagnosticsEnabledAsync(manager.SystemContext, false).ConfigureAwait(false); + result = sessionArrayNode.OnSimpleReadValue(adminContext, sessionArrayNode, ref arrayValue); + Assert.That(result.StatusCode, Is.EqualTo(StatusCodes.BadOutOfService)); + } + + [Test] + public async Task MonitoredItemLifecycle_ChangesDiagnosticsMonitoring_And_SamplingAsync() + { + var config = new ApplicationConfiguration { ServerConfiguration = new ServerConfiguration() }; + SetupServerMock(); + + // Required for CreateMonitoredItemsAsync + m_serverMock.Setup(s => s.MonitoredItemQueueFactory) + .Returns(new MonitoredItemQueueFactory(m_serverMock.Object.Telemetry)); + + using var manager = new DiagnosticsNodeManager(m_serverMock.Object, config, NullLogger.Instance); + var externalRefs = new Dictionary>(); + await manager.CreateAddressSpaceAsync(externalRefs).ConfigureAwait(false); + + var endpoint = new EndpointDescription { SecurityMode = MessageSecurityMode.SignAndEncrypt }; + var secureChannelContext = new SecureChannelContext("1", endpoint, RequestEncoding.Binary); + + var mockIdentity = new UserIdentity("admin", []); + var mockRoleIdentity = new RoleBasedIdentity(mockIdentity, [Role.SecurityAdmin], m_serverMock.Object.NamespaceUris); + + var sessionMock = new Mock(); + sessionMock.Setup(s => s.Id).Returns(new NodeId(1, 1)); + sessionMock.Setup(s => s.EffectiveIdentity).Returns(mockRoleIdentity); + + var opContext = new OperationContext(new RequestHeader(), secureChannelContext, RequestType.Read, sessionMock.Object); + var sysContext = new ServerSystemContext(m_serverMock.Object, opContext); + + static ServiceResult UpdateCallback(ISystemContext ctx, NodeState node, ref Variant value) + { + value = Variant.FromStructure(new ServerDiagnosticsSummaryDataType { CurrentSessionCount = 42 }); + return ServiceResult.Good; + } + + await manager.CreateServerDiagnosticsAsync( + manager.SystemContext, + new ServerDiagnosticsSummaryDataType(), + UpdateCallback, + CancellationToken.None).ConfigureAwait(false); + + ServerDiagnosticsSummaryState summaryNode = + manager.FindPredefinedNode(VariableIds.Server_ServerDiagnostics_ServerDiagnosticsSummary); + Assert.That(summaryNode, Is.Not.Null); + + var itemToCreate = new MonitoredItemCreateRequest + { + ItemToMonitor = new ReadValueId { NodeId = summaryNode.NodeId, AttributeId = Attributes.Value }, + MonitoringMode = MonitoringMode.Reporting, + RequestedParameters = new MonitoringParameters { ClientHandle = 1, SamplingInterval = 10, QueueSize = 10 } + }; + + var itemsToCreate = new List { itemToCreate }; + var createErrors = new List { null }; + var filterErrors = new List { null }; + var monitoredItems = new List { null }; + + await manager.CreateMonitoredItemsAsync( + opContext, + 1, + 100, +TimestampsToReturn.Both, + itemsToCreate, + createErrors, + filterErrors, + monitoredItems, + false, + new MonitoredItemIdFactory()).ConfigureAwait(false); + + Assert.That(ServiceResult.IsGood(createErrors[0]), Is.True); + var monitoredItem = monitoredItems[0] as IDataChangeMonitoredItem; + Assert.That(monitoredItem, Is.Not.Null); + + // Push an update + manager.ForceDiagnosticsScan(); + + // Wait for internal timer to call DoSample + bool valueQueued = false; + for (int i = 0; i < 10; i++) + { + var notifications = new Queue(); + var diagnostics = new Queue(); + monitoredItem.Publish( + opContext, + notifications, + diagnostics, + 1, + NullLogger.Instance); + + if (notifications.Count > 0) + { + valueQueued = true; + break; + } + + await Task.Delay(250).ConfigureAwait(false); + } + + Assert.That(valueQueued, Is.True, "MonitoredItem did not receive updated value from sampling timer."); + + // Delete nodes + var processedItems = new List { false }; + var deleteErrors = new List { null }; + + await manager.DeleteMonitoredItemsAsync( + opContext, + [monitoredItem], + processedItems, + deleteErrors).ConfigureAwait(false); + + Assert.That(ServiceResult.IsGood(deleteErrors[0]), Is.True); + } + } +} diff --git a/Tests/Opc.Ua.Server.Tests/MonitoredItemBenchmarks.cs b/Tests/Opc.Ua.Server.Tests/MonitoredItemBenchmarks.cs index 58e3fc076..830262a33 100644 --- a/Tests/Opc.Ua.Server.Tests/MonitoredItemBenchmarks.cs +++ b/Tests/Opc.Ua.Server.Tests/MonitoredItemBenchmarks.cs @@ -1,9 +1,12 @@ using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Configs; +using Moq; +using Opc.Ua.Tests; namespace Opc.Ua.Server.Tests { [GroupBenchmarksBy(BenchmarkLogicalGroupRule.ByCategory)] + [MemoryDiagnoser] public class MonitoredItemBenchmarks { private DataValue m_valueDouble; @@ -13,6 +16,9 @@ public class MonitoredItemBenchmarks private DataValue m_valueArrayDouble; private DataValue m_lastValueArrayDouble; private DataChangeFilter m_filter; + private MonitoredItem m_monitoredItem; + private BaseEventState m_event1; + private BaseEventState m_event2; private readonly double m_range = 100.0; private const int kIterations = 10000; private static readonly double[] s_value = [1.0, 2.0, 3.0, 4.0, 5.0]; @@ -31,6 +37,122 @@ public void Setup() { Trigger = DataChangeTrigger.StatusValue }; + + ITelemetryContext telemetry = NUnitTelemetryContext.Create(); + + var eventFilter = new EventFilter + { + SelectClauses = [ + new SimpleAttributeOperand + { + TypeDefinitionId = ObjectTypeIds.BaseEventType, + BrowsePath = [new QualifiedName(BrowseNames.EventId)], + AttributeId = Attributes.Value + }, + new SimpleAttributeOperand + { + TypeDefinitionId = ObjectTypeIds.BaseEventType, + BrowsePath = [new QualifiedName(BrowseNames.EventType)], + AttributeId = Attributes.Value + }, + new SimpleAttributeOperand + { + TypeDefinitionId = ObjectTypeIds.BaseEventType, + BrowsePath = [new QualifiedName(BrowseNames.SourceNode)], + AttributeId = Attributes.Value + }, + new SimpleAttributeOperand + { + TypeDefinitionId = ObjectTypeIds.BaseEventType, + BrowsePath = [new QualifiedName(BrowseNames.SourceName)], + AttributeId = Attributes.Value + }, + new SimpleAttributeOperand + { + TypeDefinitionId = ObjectTypeIds.BaseEventType, + BrowsePath = [new QualifiedName(BrowseNames.Time)], + AttributeId = Attributes.Value + }, + new SimpleAttributeOperand + { + TypeDefinitionId = ObjectTypeIds.BaseEventType, + BrowsePath = [new QualifiedName(BrowseNames.ReceiveTime)], + AttributeId = Attributes.Value + }, + new SimpleAttributeOperand + { + TypeDefinitionId = ObjectTypeIds.BaseEventType, + BrowsePath = [new QualifiedName(BrowseNames.Message)], + AttributeId = Attributes.Value + }, + new SimpleAttributeOperand + { + TypeDefinitionId = ObjectTypeIds.BaseEventType, + BrowsePath = [new QualifiedName(BrowseNames.Severity)], + AttributeId = Attributes.Value + } + ], + + WhereClause = new ContentFilter + { + Elements = [ + new ContentFilterElement + { + FilterOperator = FilterOperator.GreaterThanOrEqual, + FilterOperands = [ + new ExtensionObject(new SimpleAttributeOperand + { + TypeDefinitionId = ObjectTypeIds.BaseEventType, + BrowsePath = [new QualifiedName(BrowseNames.Severity)], + AttributeId = Attributes.Value + }), + new ExtensionObject(new LiteralOperand { Value = Variant.From((ushort)100) }) + ] + } + ] + } + }; + + var serverMock = new Mock(); + serverMock.Setup(s => s.Telemetry).Returns(telemetry); + serverMock.Setup(s => s.NamespaceUris).Returns(new NamespaceTable()); + serverMock.Setup(s => s.TypeTree).Returns(new TypeTable(new NamespaceTable())); + serverMock.Setup(s => s.MonitoredItemQueueFactory).Returns(new MonitoredItemQueueFactory(telemetry)); + + var nodeManagerMock = new Mock(); + + m_monitoredItem = new MonitoredItem( + serverMock.Object, + nodeManagerMock.Object, + null, + 1, + 2, + new ReadValueId { NodeId = ObjectIds.Server }, + DiagnosticsMasks.All, + TimestampsToReturn.Both, + MonitoringMode.Reporting, + 3, + eventFilter, + eventFilter, + null, + 1000.0, + 1, + true, + 1000); + + var systemContext = new SystemContext(telemetry) + { + NamespaceUris = serverMock.Object.NamespaceUris + }; + m_event1 = new BaseEventState(null); + m_event1.Initialize(systemContext, null, EventSeverity.Medium, new LocalizedText("Event 1")); + m_event1.SetChildValue(systemContext, BrowseNames.SourceNode, ObjectIds.Server, false); + m_event1.SetChildValue(systemContext, BrowseNames.SourceName, "Internal 1", false); + + m_event2 = new BaseEventState(null); + m_event2.Initialize(systemContext, null, EventSeverity.High, new LocalizedText("Event 2")); + m_event2.SetChildValue(systemContext, BrowseNames.SourceNode, ObjectIds.Server, false); + m_event2.SetChildValue(systemContext, BrowseNames.SourceName, "Internal 2", false); } [Benchmark] @@ -62,5 +184,15 @@ public void ValueChangedDoubleArraySame() MonitoredItem.ValueChanged(m_valueArrayDouble, null, m_lastValueArrayDouble, null, m_filter, m_range); } } + + [Benchmark] + [BenchmarkCategory("Event")] + public void QueueEvent() + { + for (int i = 0; i < kIterations; i++) + { + m_monitoredItem.QueueEvent(i % 2 == 0 ? m_event1 : m_event2); + } + } } } diff --git a/Tests/Opc.Ua.Server.Tests/Opc.Ua.Server.Tests.csproj b/Tests/Opc.Ua.Server.Tests/Opc.Ua.Server.Tests.csproj index 2f997b456..d5ac947c3 100644 --- a/Tests/Opc.Ua.Server.Tests/Opc.Ua.Server.Tests.csproj +++ b/Tests/Opc.Ua.Server.Tests/Opc.Ua.Server.Tests.csproj @@ -10,6 +10,7 @@ + diff --git a/Tests/Opc.Ua.Server.Tests/ReferenceServerTest.cs b/Tests/Opc.Ua.Server.Tests/ReferenceServerTest.cs index 82e01d8a2..d0121783b 100644 --- a/Tests/Opc.Ua.Server.Tests/ReferenceServerTest.cs +++ b/Tests/Opc.Ua.Server.Tests/ReferenceServerTest.cs @@ -625,6 +625,131 @@ public async Task SubscriptionAsync() await CommonTestWorkers.SubscriptionTestAsync(serverTestServices, m_requestHeader).ConfigureAwait(false); } + /// + /// Create a single session + subscription and raise a single event on the server node. + /// Validates all the event fields are properly received. + /// + [Test] + public async Task ServerEventSubscribeTestAsync() + { + var services = new ServerTestServices(m_server, m_secureChannelContext); + RequestHeader requestHeader = m_requestHeader; + requestHeader.Timestamp = DateTime.UtcNow; + + CreateSubscriptionResponse createSubscriptionResponse = await services.CreateSubscriptionAsync( + requestHeader, + 100, + 100, + 10, + 0, + true, + 0).ConfigureAwait(false); + + uint subscriptionId = createSubscriptionResponse.SubscriptionId; + + // Build an event filter for the base event type. + var eventFilter = new EventFilter(); + eventFilter.AddSelectClause( + ObjectTypeIds.BaseEventType, + QualifiedName.From(BrowseNames.EventId)); + eventFilter.AddSelectClause( + ObjectTypeIds.BaseEventType, + QualifiedName.From(BrowseNames.EventType)); + eventFilter.AddSelectClause( + ObjectTypeIds.BaseEventType, + QualifiedName.From(BrowseNames.SourceNode)); + eventFilter.AddSelectClause( + ObjectTypeIds.BaseEventType, + QualifiedName.From(BrowseNames.SourceName)); + eventFilter.AddSelectClause( + ObjectTypeIds.BaseEventType, + QualifiedName.From(BrowseNames.Time)); + eventFilter.AddSelectClause( + ObjectTypeIds.BaseEventType, + QualifiedName.From(BrowseNames.Message)); + eventFilter.AddSelectClause( + ObjectTypeIds.BaseEventType, + QualifiedName.From(BrowseNames.Severity)); + + ArrayOf monitoredItems = [ + new MonitoredItemCreateRequest + { + ItemToMonitor = new ReadValueId { NodeId = ObjectIds.Server, AttributeId = Attributes.EventNotifier }, + MonitoringMode = MonitoringMode.Reporting, + RequestedParameters = new MonitoringParameters + { + ClientHandle = 1, + SamplingInterval = 0, + QueueSize = 100, + DiscardOldest = true, + Filter = new ExtensionObject(eventFilter) + } + } + ]; + + CreateMonitoredItemsResponse createItemsResponse = await services.CreateMonitoredItemsAsync( + requestHeader, + subscriptionId, + TimestampsToReturn.Both, + monitoredItems).ConfigureAwait(false); + + ServerFixtureUtils.ValidateResponse(createItemsResponse.ResponseHeader, createItemsResponse.Results, monitoredItems); + Assert.AreEqual(1, createItemsResponse.Results.Count); + Assert.IsTrue(StatusCode.IsGood(createItemsResponse.Results[0].StatusCode)); + + // Generate event directly on the server + IServerInternal serverInternal = m_server.CurrentInstance; + ISystemContext serverContext = serverInternal.DefaultSystemContext; + var e = new BaseEventState(null); + const string eventMessage = "Integration Test Event"; + e.Initialize( + serverContext, + serverInternal.ServerObject, + EventSeverity.Medium, + new LocalizedText(eventMessage)); + serverInternal.ReportEvent(serverContext, e); + + // Wait for subscription to deliver the event + await Task.Delay(200).ConfigureAwait(false); + + // Publish request to get the event notification + var acknowledgements = new ArrayOf(); + PublishResponse publishResponse = await services.PublishAsync( + requestHeader, + acknowledgements).ConfigureAwait(false); + + Assert.IsNotNull(publishResponse.NotificationMessage); + Assert.IsNotNull(publishResponse.NotificationMessage.NotificationData); + Assert.IsTrue(publishResponse.NotificationMessage.NotificationData.Count > 0); + + publishResponse.NotificationMessage.NotificationData[0].TryGetEncodeable(out EventNotificationList eventNotification); + Assert.IsNotNull(eventNotification); + Assert.IsTrue(eventNotification.Events.Count > 0); + + EventFieldList targetEvent = eventNotification.Events.ToList().FirstOrDefault( + x => x.EventFields[5].TryGet(out LocalizedText lt) && lt.Text == eventMessage); + Assert.IsNotNull(targetEvent, "Did not receive the target event."); + + ArrayOf eventFields = targetEvent.EventFields; + Assert.AreEqual(7, eventFields.Count); // we requested 7 fields in select clauses + + Assert.IsFalse(eventFields[0].IsNull); // EventId + Assert.IsFalse(eventFields[1].IsNull); // EventType + Assert.IsFalse(eventFields[2].IsNull); // SourceNode + Assert.IsFalse(eventFields[3].IsNull); // SourceName + Assert.IsFalse(eventFields[4].IsNull); // Time + Assert.That(eventFields[5].TryGet(out LocalizedText receivedMessage), Is.True); // Message + Assert.IsFalse(receivedMessage.IsNull); + Assert.AreEqual(eventMessage, receivedMessage.Text); + Assert.That(eventFields[6].TryGet(out ushort receiveSeverity), Is.True); + Assert.AreEqual((ushort)EventSeverity.Medium, receiveSeverity); // Severity + + // Delete subscription + await services.DeleteSubscriptionsAsync( + requestHeader, + [subscriptionId]).ConfigureAwait(false); + } + /// /// Create multiple sessions, each with a subscription. /// Close all sessions without deleting subscriptions (abandoning them). diff --git a/Tests/Opc.Ua.Server.Tests/SessionPublishQueueBenchmarks.cs b/Tests/Opc.Ua.Server.Tests/SessionPublishQueueBenchmarks.cs new file mode 100644 index 000000000..8ec915bf5 --- /dev/null +++ b/Tests/Opc.Ua.Server.Tests/SessionPublishQueueBenchmarks.cs @@ -0,0 +1,123 @@ +/* ======================================================================== + * Copyright (c) 2005-2025 The OPC Foundation, Inc. All rights reserved. + * + * OPC Foundation MIT License 1.00 + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + * + * The complete license agreement can be found here: + * http://opcfoundation.org/License/MIT/1.00/ + * ======================================================================*/ + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Configs; +using Moq; +using Opc.Ua.Tests; + +namespace Opc.Ua.Server.Tests +{ + [GroupBenchmarksBy(BenchmarkLogicalGroupRule.ByCategory)] + [MemoryDiagnoser] + public class SessionPublishQueueBenchmarks + { + private Mock m_serverMock; + private Mock m_sessionMock; + private Mock m_subscriptionManagerMock; + private ITelemetryContext m_telemetry; + + [Params(50, 500)] + public int NumItems { get; set; } + + [GlobalSetup] + public void Setup() + { + m_telemetry = NUnitTelemetryContext.Create(); + m_serverMock = new Mock(); + m_sessionMock = new Mock(); + m_subscriptionManagerMock = new Mock(); + + m_serverMock.Setup(s => s.Telemetry).Returns(m_telemetry); + m_serverMock.Setup(s => s.SubscriptionManager).Returns(m_subscriptionManagerMock.Object); + + m_sessionMock.Setup(s => s.Id).Returns(new NodeId(Guid.NewGuid())); + m_sessionMock.Setup(s => s.IsSecureChannelValid(It.IsAny())).Returns(true); + } + + [Benchmark] + public async Task Concurrency_MultipleRequestsAndSubscriptions() + { + int maxPublishRequests = NumItems * 2; + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, maxPublishRequests); + + var subs = new List>(); + for (int i = 0; i < NumItems; i++) + { + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns((uint)(i + 1)); + subMock.Setup(s => s.Priority).Returns((byte)(i % 5)); + subMock.Setup(s => s.PublishTimerExpired()).Returns(PublishingState.NotificationsAvailable); + subs.Add(subMock); + queue.Add(subMock.Object); + } + + using var startGate = new ManualResetEventSlim(false); + var publishTasks = new List>(NumItems); + var timerTasks = new List(NumItems); + + // Start multiple threads requesting publish + for (int i = 0; i < NumItems; i++) + { + publishTasks.Add(Task.Run(() => + { + startGate.Wait(); + return queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None); + })); + } + + // Start multiple threads mimicking subscriptions being ready or timer expiring + for (int i = 0; i < NumItems; i++) + { + int index = i; + timerTasks.Add(Task.Run(() => + { + startGate.Wait(); + queue.PublishCompleted(subs[index].Object, true); + })); + } + + // Open the gate + startGate.Set(); + + // Wait for publishers to finish producing + await Task.WhenAll(timerTasks).ConfigureAwait(false); + + // Wait for consumers to get their subscriptions + var resultsTask = Task.WhenAll(publishTasks); + await Task.WhenAny(resultsTask, Task.Delay(TimeSpan.FromSeconds(30))).ConfigureAwait(false); + + queue.Close(); + } + } +} diff --git a/Tests/Opc.Ua.Server.Tests/SessionPublishQueueTests.cs b/Tests/Opc.Ua.Server.Tests/SessionPublishQueueTests.cs new file mode 100644 index 000000000..71355d009 --- /dev/null +++ b/Tests/Opc.Ua.Server.Tests/SessionPublishQueueTests.cs @@ -0,0 +1,555 @@ +/* ======================================================================== + * Copyright (c) 2005-2025 The OPC Foundation, Inc. All rights reserved. + * + * OPC Foundation MIT License 1.00 + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + * + * The complete license agreement can be found here: + * http://opcfoundation.org/License/MIT/1.00/ + * ======================================================================*/ + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Moq; +using NUnit.Framework; +using Opc.Ua.Tests; + +namespace Opc.Ua.Server.Tests +{ + [TestFixture] + [Category("Subscription")] + [Parallelizable] + public class SessionPublishQueueTests + { + private Mock m_serverMock; + private Mock m_sessionMock; + private Mock m_subscriptionManagerMock; + private ITelemetryContext m_telemetry; + private const int kMaxPublishRequests = 10; + + [SetUp] + public void SetUp() + { + m_telemetry = NUnitTelemetryContext.Create(); + m_serverMock = new Mock(); + m_sessionMock = new Mock(); + m_subscriptionManagerMock = new Mock(); + + m_serverMock.Setup(s => s.Telemetry).Returns(m_telemetry); + m_serverMock.Setup(s => s.SubscriptionManager).Returns(m_subscriptionManagerMock.Object); + + m_sessionMock.Setup(s => s.Id).Returns(new NodeId(Guid.NewGuid())); + m_sessionMock.Setup(s => s.IsSecureChannelValid(It.IsAny())).Returns(true); + } + + [Test] + public void Constructor_NullArgs_ThrowsArgumentNullException() + { + Assert.Throws(() => new SessionPublishQueue(null, m_sessionMock.Object, kMaxPublishRequests)); + Assert.Throws(() => new SessionPublishQueue(m_serverMock.Object, null, kMaxPublishRequests)); + } + + [Test] + public void PublishAsync_NoSubscriptions_ThrowsBadNoSubscription() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, kMaxPublishRequests); + + ServiceResultException ex = + Assert.CatchAsync(() => queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None)); + Assert.That(ex.StatusCode, Is.EqualTo(StatusCodes.BadNoSubscription)); + } + + [Test] + public void PublishAsync_QueueFull_ThrowsBadTooManyPublishRequests() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, 1); + + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns(1); + queue.Add(subMock.Object); + + // First publish request should be queued + Task task1 = queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None); + Assert.That(task1.IsCompleted, Is.False); + + // Second publish request should fail because max queue size is 1 + ServiceResultException ex = + Assert.CatchAsync(() => queue.PublishAsync("channel2", DateTime.MaxValue, false, CancellationToken.None)); + Assert.That(ex.StatusCode, Is.EqualTo(StatusCodes.BadTooManyPublishRequests)); + } + + [Test] + public async Task PublishAsync_ReturnsSubscriptionIfReadyAsync() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, kMaxPublishRequests); + + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns(1); + queue.Add(subMock.Object); + + queue.Requeue(subMock.Object); // Sets ReadyToPublish to true + + ISubscription result = await queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None).ConfigureAwait(false); + + Assert.That(result, Is.SameAs(subMock.Object)); + } + + [Test] + public async Task Add_And_PublishTimerExpired_AssignsSubscriptionToRequestAsync() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, kMaxPublishRequests); + + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns(1); + subMock.Setup(s => s.PublishTimerExpired()).Returns(PublishingState.NotificationsAvailable); + queue.Add(subMock.Object); + + Task task = queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None); + Assert.That(task.IsCompleted, Is.False); + + queue.PublishTimerExpired(); + + ISubscription result = await task.ConfigureAwait(false); + Assert.That(result, Is.SameAs(subMock.Object)); + } + + [Test] + public void Close_ClearsQueuesAndSignalsSessionClosed() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, kMaxPublishRequests); + + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns(1); + queue.Add(subMock.Object); + + Task task = queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None); + + IList subs = queue.Close(); + + Assert.That(subs, Has.Count.EqualTo(1)); + Assert.That(subs[0], Is.SameAs(subMock.Object)); + subMock.Verify(s => s.SessionClosed(), Times.Once); + + ServiceResultException ex = Assert.CatchAsync(() => task); + Assert.That(ex.StatusCode, Is.EqualTo(StatusCodes.BadSessionClosed)); + } + + [Test] + public void Remove_RemovesSubscription() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, kMaxPublishRequests); + + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns(1); + queue.Add(subMock.Object); + + Task task = queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None); + + queue.Remove(subMock.Object, removeQueuedRequests: true); + + ServiceResultException ex = Assert.CatchAsync(() => task); + Assert.That(ex.StatusCode, Is.EqualTo(StatusCodes.BadNoSubscription)); + } + + [Test] + public async Task TryPublishCustomStatus_CompletesRemainingRequestsAsync() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, kMaxPublishRequests); + + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns(1); + queue.Add(subMock.Object); + + Task task1 = queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None); + + bool published = queue.TryPublishCustomStatus(StatusCodes.Good); + Assert.That(published, Is.True); + + ISubscription result = await task1.ConfigureAwait(false); + Assert.That(result, Is.Null); // Good status completes with null to allow sending custom status + + bool publishedAgain = queue.TryPublishCustomStatus(StatusCodes.Good); + Assert.That(publishedAgain, Is.False); + } + + [Test] + public void Acknowledge_ValidAcks_ReturnsGood() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, kMaxPublishRequests); + + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns(1); + subMock.Setup(s => s.Acknowledge(It.IsAny(), 10)) + .Returns(StatusCodes.Good); + + queue.Add(subMock.Object); + + var acks = (ArrayOf)[ + new SubscriptionAcknowledgement { SubscriptionId = 1, SequenceNumber = 10 } + ]; + + var context = new OperationContext(new RequestHeader(), null, RequestType.Publish, m_sessionMock.Object); + + queue.Acknowledge(context, acks, out ArrayOf results, out ArrayOf diagInfos); + + Assert.That(results.Count, Is.EqualTo(1)); + Assert.That(results[0], Is.EqualTo(StatusCodes.Good)); + Assert.That(diagInfos.Count, Is.Zero); + } + + [Test] + public void Acknowledge_InvalidSubscription_ReturnsBadSubscriptionIdInvalid() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, kMaxPublishRequests); + + var acks = (ArrayOf)[ + new SubscriptionAcknowledgement { SubscriptionId = 99, SequenceNumber = 10 } + ]; + + var context = new OperationContext(new RequestHeader(), null, RequestType.Publish, m_sessionMock.Object); + + queue.Acknowledge(context, acks, out ArrayOf results, out ArrayOf diagInfos); + + Assert.That(results.Count, Is.EqualTo(1)); + Assert.That(results[0], Is.EqualTo(StatusCodes.BadSubscriptionIdInvalid)); + } + + [Test] + public async Task PublishCompleted_MoreNotifications_AssignsNextRequestAsync() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, kMaxPublishRequests); + + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns(1); + queue.Add(subMock.Object); + + Task task = queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None); + + // Mark sub as publishing by manually triggering assignment + subMock.Setup(s => s.PublishTimerExpired()).Returns(PublishingState.NotificationsAvailable); + queue.PublishTimerExpired(); + + Task task2 = queue.PublishAsync("channel2", DateTime.MaxValue, false, CancellationToken.None); + Assert.That(task2.IsCompleted, Is.False); + + // Complete first publish and state there's more notifications + queue.PublishCompleted(subMock.Object, moreNotifications: true); + + ISubscription result = await task2.ConfigureAwait(false); + Assert.That(result, Is.SameAs(subMock.Object)); + } + + [Test] + public void PublishCompleted_NoMoreNotifications_SetsReadyToPublishFalse() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, kMaxPublishRequests); + + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns(1); + queue.Add(subMock.Object); + + Task task = queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None); + + subMock.Setup(s => s.PublishTimerExpired()).Returns(PublishingState.NotificationsAvailable); + queue.PublishTimerExpired(); // Gets assigned, sets ReadyToPublish = true, Publishing = true + + queue.PublishCompleted(subMock.Object, moreNotifications: false); + + Task task2 = queue.PublishAsync("channel2", DateTime.MaxValue, false, CancellationToken.None); + Assert.That(task2.IsCompleted, Is.False); // Still incomplete because it's no longer ready + } + + [Test] + public void PublishTimerExpired_Idle_SetsReadyToPublishFalse() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, kMaxPublishRequests); + + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns(1); + queue.Add(subMock.Object); + + // Make it ready + queue.Requeue(subMock.Object); + + subMock.Setup(s => s.PublishTimerExpired()).Returns(PublishingState.Idle); + queue.PublishTimerExpired(); // Should set ReadyToPublish = false + + Task task = queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None); + Assert.That(task.IsCompleted, Is.False); // Since it's idle, task is queued instead of fulfilled + } + + [Test] + public void TryPublishCustomStatus_BadStatus_CompletesRequestWithException() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, kMaxPublishRequests); + + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns(1); + queue.Add(subMock.Object); + + Task task = queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None); + + bool published = queue.TryPublishCustomStatus(StatusCodes.BadNotConnected); + Assert.That(published, Is.True); + + ServiceResultException ex = Assert.CatchAsync(() => task); + Assert.That(ex.StatusCode, Is.EqualTo(StatusCodes.BadNotConnected)); + } + + [Test] + public void PublishAsync_TimesOut_ThrowsBadTimeout() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, kMaxPublishRequests); + + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns(1); + queue.Add(subMock.Object); + + DateTime timeout = DateTime.UtcNow.AddMilliseconds(-1000); // Already past timeout + Task task = queue.PublishAsync("channel1", timeout, false, CancellationToken.None); + + // Since operationTimeout < DateTime.MaxValue and timeOut <= 0, wait, timeOut calculation: + // timeOut = operationTimeout.AddMilliseconds(500) - DateTime.UtcNow + // If already past, timeout immediately? Let's use a short delay. + DateTime timeout2 = DateTime.UtcNow.AddMilliseconds(1); + Task task2 = queue.PublishAsync("channel2", timeout2, false, CancellationToken.None); + + ServiceResultException ex = Assert.CatchAsync(() => task2); + Assert.That(ex.StatusCode, Is.EqualTo(StatusCodes.BadTimeout)); + } + + [Test] + public void RemoveQueuedRequests_NoSubscriptions_FailsRequests() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, kMaxPublishRequests); + + // Force add a request bypassing normal validations by providing a sub then removing it without auto-removal + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns(1); + queue.Add(subMock.Object); + + Task task = queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None); + + queue.Remove(subMock.Object, removeQueuedRequests: false); + Assert.That(task.IsCompleted, Is.False); + + queue.RemoveQueuedRequests(); + + ServiceResultException ex = Assert.CatchAsync(() => task); + Assert.That(ex.StatusCode, Is.EqualTo(StatusCodes.BadNoSubscription)); + } + + [Test] + public void AssignSubscriptionToRequest_InvalidSecureChannel_ThrowsBadSecureChannelIdInvalid() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, kMaxPublishRequests); + + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns(1); + queue.Add(subMock.Object); + + // Initial publish request with a specific channel ID + Task task = queue.PublishAsync("invalid_channel", DateTime.MaxValue, false, CancellationToken.None); + + // Mock the session to return false for this channel + m_sessionMock.Setup(s => s.IsSecureChannelValid("invalid_channel")).Returns(false); + + subMock.Setup(s => s.PublishTimerExpired()).Returns(PublishingState.NotificationsAvailable); + queue.PublishTimerExpired(); // Triggers assignment + + ServiceResultException ex = Assert.CatchAsync(() => task); + Assert.That(ex.StatusCode, Is.EqualTo(StatusCodes.BadSecureChannelIdInvalid)); + } + + [Test] + public async Task GetSubscriptionToPublish_SelectsHighestPriorityAndOldestAsync() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, kMaxPublishRequests); + + var sub1 = new Mock(); + sub1.Setup(s => s.Id).Returns(1); + sub1.Setup(s => s.Priority).Returns(10); + queue.Add(sub1.Object); + + await Task.Delay(15).ConfigureAwait(false); // Ensure older timestamp + + var sub3 = new Mock(); + sub3.Setup(s => s.Id).Returns(3); + sub3.Setup(s => s.Priority).Returns(20); // Same high priority, older + queue.Add(sub3.Object); + + await Task.Delay(15).ConfigureAwait(false); // Ensure newer timestamp + + var sub2 = new Mock(); + sub2.Setup(s => s.Id).Returns(2); + sub2.Setup(s => s.Priority).Returns(20); // Highest priority, newer + queue.Add(sub2.Object); + + // Make them ready sequentially + queue.Requeue(sub1.Object); + queue.Requeue(sub2.Object); + queue.Requeue(sub3.Object); + + // Publish Request should return sub3 (highest priority, oldest) + ISubscription result1 = await queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None).ConfigureAwait(false); + Assert.That(result1, Is.SameAs(sub3.Object)); + + // Next should be sub2 (highest priority, newer) + ISubscription result2 = await queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None).ConfigureAwait(false); + Assert.That(result2, Is.SameAs(sub2.Object)); + + // Next should be sub1 (lower priority) + ISubscription result3 = await queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None).ConfigureAwait(false); + Assert.That(result3, Is.SameAs(sub1.Object)); + } + + [Test] + public async Task PublishAsync_RequeueTrue_AddsToFrontOfQueueAsync() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, kMaxPublishRequests); + + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns(1); + queue.Add(subMock.Object); + + Task taskA = queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None); + Task taskB = queue.PublishAsync("channel1", DateTime.MaxValue, true, CancellationToken.None); + Task taskC = queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None); + + subMock.Setup(s => s.PublishTimerExpired()).Returns(PublishingState.NotificationsAvailable); + + // First expiration should complete taskB because it was requeued (added to front) + queue.PublishTimerExpired(); + ISubscription resultB = await taskB.ConfigureAwait(false); + Assert.That(resultB, Is.SameAs(subMock.Object)); + Assert.That(taskA.IsCompleted, Is.False); + Assert.That(taskC.IsCompleted, Is.False); + + queue.Remove(subMock.Object, false); + + // Need another subscription to fulfill the next request + var subMock2 = new Mock(); + subMock2.Setup(s => s.Id).Returns(2); + subMock2.Setup(s => s.PublishTimerExpired()).Returns(PublishingState.NotificationsAvailable); + queue.Add(subMock2.Object); + + // Second expiration should complete taskA (it was the first added with requeue=false) + queue.PublishTimerExpired(); + ISubscription resultA = await taskA.ConfigureAwait(false); + Assert.That(resultA, Is.SameAs(subMock2.Object)); + Assert.That(taskC.IsCompleted, Is.False); + + queue.Remove(subMock2.Object, false); + + // Need a third subscription to fulfill the last request + var subMock3 = new Mock(); + subMock3.Setup(s => s.Id).Returns(3); + subMock3.Setup(s => s.PublishTimerExpired()).Returns(PublishingState.NotificationsAvailable); + queue.Add(subMock3.Object); + + // Third expiration should complete taskC + queue.PublishTimerExpired(); + ISubscription resultC = await taskC.ConfigureAwait(false); + Assert.That(resultC, Is.SameAs(subMock3.Object)); + } + + [Test] + public async Task Concurrency_MultipleRequestsAndSubscriptionsAsync() + { + using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, 100); + + const int numItems = 50; + + var subs = new List>(); + for (int i = 0; i < numItems; i++) + { + var subMock = new Mock(); + subMock.Setup(s => s.Id).Returns((uint)(i + 1)); + subMock.Setup(s => s.Priority).Returns((byte)(i % 5)); + subMock.Setup(s => s.PublishTimerExpired()).Returns(PublishingState.NotificationsAvailable); + subs.Add(subMock); + queue.Add(subMock.Object); + } + + using var startGate = new ManualResetEventSlim(false); + var publishTasks = new List>(); + var timerTasks = new List(); + + // Start multiple threads requesting publish + for (int i = 0; i < numItems; i++) + { + publishTasks.Add(Task.Run(() => + { + startGate.Wait(); + return queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None); + })); + } + + // Start multiple threads mimicking subscriptions being ready or timer expiring + for (int i = 0; i < numItems; i++) + { + int index = i; + timerTasks.Add(Task.Run(() => + { + startGate.Wait(); + queue.PublishCompleted(subs[index].Object, true); + })); + } + + // Open the gate + startGate.Set(); + + // Wait for publishers to finish producing + await Task.WhenAll(timerTasks).ConfigureAwait(false); + + // Wait for consumers to get their subscriptions + Task resultsTask = Task.WhenAll(publishTasks); + Task completedTask = await Task.WhenAny(resultsTask, Task.Delay(TimeSpan.FromSeconds(5))).ConfigureAwait(false); + + queue.Close(); + + if (completedTask != resultsTask) + { + Assert.Fail("Timed out waiting for publish tasks to complete."); + } + + ISubscription[] results = await resultsTask.ConfigureAwait(false); + + // Verify results + int validSubscriptions = 0; + var returnedSubIds = new HashSet(); + foreach (ISubscription result in results) + { + if (result != null) + { + validSubscriptions++; + returnedSubIds.Add(result.Id); + } + } + + Assert.That(validSubscriptions, Is.EqualTo(numItems), "All publish requests should have received a subscription."); + Assert.That(returnedSubIds, Has.Count.EqualTo(numItems), "All subscriptions should have been processed."); + } + } +}