Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/agents/opcua-v16-migration.agent.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ Also: `ReadOnlyList<T>` → `ArrayOf<T>`, `IList<T>` parameters → `ArrayOf<T>`
- Use `List<T>` when items are added/removed/modified, then convert to `ArrayOf<T>` with `.ToArrayOf()`.
- `ArrayOf<T>` implicitly converts from `List<T>` but not vice versa. Use `.ToList()` to convert back.
- `ArrayOf<T>` supports collection expressions: `ArrayOf<int> arr = [1, 2, 3];`
- To follow best coding practices Do NOT use casts to create the ArrayOf but use `.ToArrayOf()` or create it directly using a collection expression

### ArrayOf<T> Key API

Expand Down
2 changes: 2 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="18.3.0" />
<PackageVersion Include="Microsoft.SourceLink.AzureRepos.Git" Version="10.0.102" />
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="10.0.102" />
<PackageVersion Include="Microsoft.VisualStudio.DiagnosticsHub.BenchmarkDotNetDiagnosers" Version="18.6.37110.2" />
<PackageVersion Include="Mono.Options" Version="6.12.0.148" />
<PackageVersion Include="Moq" Version="4.20.72" />
<PackageVersion Include="MQTTnet" Version="5.1.0.1559" />
Expand Down Expand Up @@ -63,6 +64,7 @@
<PackageVersion Include="System.Private.Uri" Version="4.3.2" />
<PackageVersion Include="System.Runtime.InteropServices.RuntimeInformation" Version="4.3.0" />
<PackageVersion Include="System.Security.Cryptography.Cng" Version="5.0.0" />
<PackageVersion Include="System.Threading.Channels" Version="10.0.5" />
<PackageVersion Include="System.Threading.Tasks.Extensions" Version="4.6.3" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ protected override async ValueTask<NodeState> AddBehaviourToPredefinedNodeAsync(
}
else
{
NodeState serverNode = Server.NodeManager.FindNodeInAddressSpaceAsync(ObjectIds.Server).AsTask().GetAwaiter().GetResult();
NodeState serverNode = await Server.NodeManager.FindNodeInAddressSpaceAsync(ObjectIds.Server).ConfigureAwait(false);
serverNode?.ReplaceChild(context, activeNode);
}
// remove the reference to server node because it is set as parent
Expand Down
89 changes: 39 additions & 50 deletions Libraries/Opc.Ua.Server/Diagnostics/DiagnosticsNodeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* ======================================================================*/

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
Expand Down Expand Up @@ -261,29 +262,26 @@ protected ServiceResult OnGetMonitoredItems(
return StatusCodes.BadInvalidArgument;
}

foreach (ISubscription subscription in Server.SubscriptionManager.GetSubscriptions())
if (!Server.SubscriptionManager.TryGetSubscription(subscriptionId, out ISubscription subscription))
{
if (subscription.Id == subscriptionId)
{
if (context is ISessionSystemContext session &&
subscription.SessionId != session.SessionId)
{
// user tries to access subscription of different session
return StatusCodes.BadUserAccessDenied;
}
return StatusCodes.BadSubscriptionIdInvalid;
}

subscription.GetMonitoredItems(
out ArrayOf<uint> serverHandles,
out ArrayOf<uint> clientHandles);
if (context is ISessionSystemContext session &&
subscription.SessionId != session.SessionId)
{
// user tries to access subscription of different session
return StatusCodes.BadUserAccessDenied;
}

outputArguments[0] = serverHandles;
outputArguments[1] = clientHandles;
subscription.GetMonitoredItems(
out ArrayOf<uint> serverHandles,
out ArrayOf<uint> clientHandles);

return ServiceResult.Good;
}
}
outputArguments[0] = serverHandles;
outputArguments[1] = clientHandles;

return StatusCodes.BadSubscriptionIdInvalid;
return ServiceResult.Good;
}

/// <summary>
Expand All @@ -305,24 +303,21 @@ protected ServiceResult OnResendData(
return StatusCodes.BadInvalidArgument;
}

foreach (ISubscription subscription in Server.SubscriptionManager.GetSubscriptions())
if (!Server.SubscriptionManager.TryGetSubscription(subscriptionId, out ISubscription subscription))
{
if (subscription.Id == subscriptionId)
{
if (context is not ServerSystemContext session ||
subscription.SessionId != session.SessionId)
{
// user tries to access subscription of different session
return StatusCodes.BadUserAccessDenied;
}

subscription.ResendData(session.OperationContext);
return StatusCodes.BadSubscriptionIdInvalid;
}

return ServiceResult.Good;
}
if (context is not ServerSystemContext session ||
subscription.SessionId != session.SessionId)
{
// user tries to access subscription of different session
return StatusCodes.BadUserAccessDenied;
}

return StatusCodes.BadSubscriptionIdInvalid;
subscription.ResendData(session.OperationContext);

return ServiceResult.Good;
}

/// <summary>
Expand Down Expand Up @@ -453,9 +448,10 @@ protected override async ValueTask<NodeState> AddBehaviourToPredefinedNodeAsync(
{
if (passiveNode is ServerObjectState)
{
// add the server object as the root notifier.
await AddRootNotifierAsync(passiveNode, cancellationToken).ConfigureAwait(false);
break;
}

var activeNode = new ServerObjectState(passiveNode.Parent);
activeNode.Create(context, passiveNode);

Expand Down Expand Up @@ -1888,7 +1884,7 @@ handle.Node is BaseVariableState variable &&

if (monitoredItem.MonitoringMode != MonitoringMode.Disabled)
{
m_diagnosticsMonitoringCount++;
Interlocked.Increment(ref m_diagnosticsMonitoringCount);

m_diagnosticsScanTimer ??= new Timer(DoScan, null, 1000, 1000);

Expand All @@ -1914,7 +1910,7 @@ protected override ValueTask OnMonitoredItemDeletedAsync(
if (IsDiagnosticsNode(handle.Node) &&
monitoredItem.MonitoringMode != MonitoringMode.Disabled)
{
m_diagnosticsMonitoringCount--;
Interlocked.Decrement(ref m_diagnosticsMonitoringCount);

if (m_diagnosticsMonitoringCount == 0 && m_diagnosticsScanTimer != null)
{
Expand Down Expand Up @@ -1958,12 +1954,12 @@ protected override async ValueTask OnMonitoringModeChangedAsync(
{
if (previousMode != MonitoringMode.Disabled)
{
m_diagnosticsMonitoringCount--;
Interlocked.Decrement(ref m_diagnosticsMonitoringCount);
}

if (monitoringMode != MonitoringMode.Disabled)
{
m_diagnosticsMonitoringCount++;
Interlocked.Increment(ref m_diagnosticsMonitoringCount);
}

if (m_diagnosticsMonitoringCount == 0 && m_diagnosticsScanTimer != null)
Expand Down Expand Up @@ -2027,7 +2023,7 @@ private void CreateSampledItem(
double samplingInterval,
ISampledDataChangeMonitoredItem monitoredItem)
{
m_sampledItems.Add(monitoredItem);
m_sampledItems.TryAdd(monitoredItem.Id, monitoredItem);

m_samplingTimer ??= new Timer(
DoSample,
Expand All @@ -2041,16 +2037,9 @@ private void CreateSampledItem(
/// </summary>
private void DeleteSampledItem(ISampledDataChangeMonitoredItem monitoredItem)
{
for (int ii = 0; ii < m_sampledItems.Count; ii++)
{
if (ReferenceEquals(monitoredItem, m_sampledItems[ii]))
{
m_sampledItems.RemoveAt(ii);
break;
}
}
m_sampledItems.TryRemove(monitoredItem.Id, out _);

if (m_sampledItems.Count == 0 && m_samplingTimer != null)
if (m_sampledItems.IsEmpty && m_samplingTimer != null)
{
m_samplingTimer.Dispose();
m_samplingTimer = null;
Expand All @@ -2066,9 +2055,9 @@ private void DoSample(object state)
{
lock (m_diagnosticsLock)
{
for (int ii = 0; ii < m_sampledItems.Count; ii++)
foreach (KeyValuePair<uint, ISampledDataChangeMonitoredItem> kvp in m_sampledItems)
{
ISampledDataChangeMonitoredItem monitoredItem = m_sampledItems[ii];
ISampledDataChangeMonitoredItem monitoredItem = kvp.Value;

// get the handle.
if (monitoredItem.ManagerHandle is not NodeHandle handle)
Expand Down Expand Up @@ -2125,7 +2114,7 @@ private void DoSample(object state)
private readonly List<SubscriptionDiagnosticsData> m_subscriptions;
private NodeId m_serverLockHolder;
private Timer m_samplingTimer;
private readonly List<ISampledDataChangeMonitoredItem> m_sampledItems;
private readonly ConcurrentDictionary<uint, ISampledDataChangeMonitoredItem> m_sampledItems;
private readonly double m_minimumSamplingInterval;
private HistoryServerCapabilitiesState m_historyCapabilities;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,7 @@ public void Remove(IDataChangeMonitoredItem2 datachangeItem)
Node.OnStateChanged = null;

// Unsubscribe from namespace default permission changes when the last item is removed.
if (m_server.ConfigurationNodeManager != null)
{
m_server.ConfigurationNodeManager.DefaultPermissionsChanged -= OnDefaultPermissionsChanged;
}
m_server.ConfigurationNodeManager?.DefaultPermissionsChanged -= OnDefaultPermissionsChanged;
}
}

Expand Down
8 changes: 8 additions & 0 deletions Libraries/Opc.Ua.Server/Subscription/ISubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ public interface ISubscriptionManager : IDisposable
/// <returns>A list of the subscriptions.</returns>
IList<ISubscription> GetSubscriptions();

/// <summary>
/// Get the subscription with the specified id
/// </summary>
/// <param name="id">The id of the subscription</param>
/// <param name="subscription">The subscription if found else null</param>
/// <returns>True if found</returns>
bool TryGetSubscription(uint id, out ISubscription subscription);

/// <summary>
/// Set a subscription into durable mode
/// </summary>
Expand Down
50 changes: 28 additions & 22 deletions Libraries/Opc.Ua.Server/Subscription/SessionPublishQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public Task<ISubscription> PublishAsync(string secureChannelId,
}

QueuedSubscription subscriptionToPublish;
lock (m_subscriptionPublishLock)
lock (m_lock)
{
// find the waiting subscription with the highest priority.
subscriptionToPublish = GetSubscriptionToPublish();
Expand All @@ -117,10 +117,7 @@ public Task<ISubscription> PublishAsync(string secureChannelId,
{
return Task.FromResult(subscriptionToPublish.Subscription);
}
}

lock (m_lock)
{
// check if queue is full.
if (m_queuedRequests.Count >= m_maxRequestCount)
{
Expand Down Expand Up @@ -150,6 +147,8 @@ public Task<ISubscription> PublishAsync(string secureChannelId,
/// <returns>The list of subscriptions in the queue.</returns>
public IList<ISubscription> Close()
{
var subscriptions = new List<ISubscription>();

lock (m_lock)
{
// TraceState("SESSION CLOSED");
Expand All @@ -164,19 +163,21 @@ public IList<ISubscription> Close()
}

// tell the subscriptions that the session is closed.
var subscriptions = new List<ISubscription>(m_queuedSubscriptions.Count);

foreach (KeyValuePair<uint, QueuedSubscription> entry in m_queuedSubscriptions)
{
subscriptions.Add(entry.Value.Subscription);
entry.Value.Subscription.SessionClosed();
}

// clear the queue.
m_queuedSubscriptions.Clear();
}

return subscriptions;
foreach (ISubscription subscription in subscriptions)
{
subscription.SessionClosed();
}

return subscriptions;
}

/// <summary>
Expand Down Expand Up @@ -371,19 +372,19 @@ public void PublishCompleted(ISubscription subscription, bool moreNotifications)
if (m_queuedSubscriptions.TryGetValue(subscription.Id,
out QueuedSubscription queuedSubscription))
{
queuedSubscription.Publishing = false;

if (moreNotifications)
lock (m_lock)
{
lock (m_subscriptionPublishLock)
queuedSubscription.Publishing = false;

if (moreNotifications)
{
AssignSubscriptionToRequest(queuedSubscription);
}
}
else
{
queuedSubscription.ReadyToPublish = false;
queuedSubscription.Timestamp = DateTime.UtcNow;
else
{
queuedSubscription.ReadyToPublish = false;
queuedSubscription.Timestamp = DateTime.UtcNow;
}
}
}
}
Expand All @@ -395,8 +396,11 @@ public void Requeue(ISubscription subscription)
{
if (m_queuedSubscriptions.TryGetValue(subscription.Id, out QueuedSubscription queuedSubscription))
{
queuedSubscription.Publishing = false;
queuedSubscription.ReadyToPublish = true;
lock (m_lock)
{
queuedSubscription.Publishing = false;
queuedSubscription.ReadyToPublish = true;
}
}
}

Expand Down Expand Up @@ -439,9 +443,12 @@ public void PublishTimerExpired()
// assign subscription to request if one is available.
if (!subscription.Publishing)
{
lock (m_subscriptionPublishLock)
lock (m_lock)
{
AssignSubscriptionToRequest(subscription);
if (!subscription.Publishing)
{
AssignSubscriptionToRequest(subscription);
}
}
}
}
Expand Down Expand Up @@ -659,7 +666,6 @@ internal void TraceState(string context, params object[] args)
}

private readonly Lock m_lock = new();
private readonly Lock m_subscriptionPublishLock = new();
private readonly ILogger m_logger;
private readonly IServerInternal m_server;
private readonly ISession m_session;
Expand Down
12 changes: 8 additions & 4 deletions Libraries/Opc.Ua.Server/Subscription/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,19 @@ public event SubscriptionEventHandler SubscriptionDeleted
}
}

/// <summary>
/// Returns all of the subscriptions known to the subscription manager.
/// </summary>
/// <returns>A list of the subscriptions.</returns>

/// <inheritdoc/>
public IList<ISubscription> GetSubscriptions()
{
return [.. m_subscriptions.Values];
}

/// <inheritdoc/>
public bool TryGetSubscription(uint id, out ISubscription subscription)
{
return m_subscriptions.TryGetValue(id, out subscription);
}

/// <summary>
/// Raises an event related to a subscription.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ More samples based on the official [Nuget](https://www.nuget.org/packages/OPCFou
* Source generator generated code behind during build
* See [MigrationGuide](Docs/MigrationGuide.md) for details.
* New AsyncCustomNodeManager (successor of CustomNodeManager2) with improved Locking Strategy, see [Server Async (TAP) Support](Docs/AsyncServerSupport.md)
* In our Load Test the Server shows at least 2.5x higher throughput under load with 750 subscriptions totaling 450k Monitored items
and write times for 600 items below 5 seconds were before > 10 seconds were needed. Also for event at least 3x faster event reporting was observed.

#### **New in 1.05.378**

Expand Down
Loading
Loading