Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 14 additions & 59 deletions Libraries/Opc.Ua.Server/Subscription/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public SubscriptionManager(IServerInternal server, ApplicationConfiguration conf
m_subscriptionStore = server.SubscriptionStore;

m_subscriptions = [];
m_abandonedSubscriptions = [];
m_publishQueues = [];
m_statusMessages = [];
m_lastSubscriptionId = BitConverter.ToUInt32(
Expand Down Expand Up @@ -528,18 +529,12 @@ public virtual async ValueTask SessionClosingAsync(
// mark the subscriptions as abandoned.
else
{
await m_semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
try
if (m_abandonedSubscriptions.TryAdd(subscription.Id, subscription))
{
(m_abandonedSubscriptions ??= []).Add(subscription);
m_logger.LogWarning(
"Subscription ABANDONED, Id={SubscriptionId}.",
subscription.Id);
}
finally
{
m_semaphoreSlim.Release();
}
}
}
}
Expand Down Expand Up @@ -699,19 +694,11 @@ public async ValueTask<StatusCode> DeleteSubscriptionAsync(OperationContext cont
}

// check for abandoned subscription.
if (m_abandonedSubscriptions != null)
if (m_abandonedSubscriptions.TryRemove(subscriptionId, out _))
{
for (int ii = 0; ii < m_abandonedSubscriptions.Count; ii++)
{
if (m_abandonedSubscriptions[ii].Id == subscriptionId)
{
m_abandonedSubscriptions.RemoveAt(ii);
m_logger.LogWarning(
"Subscription DELETED(ABANDONED), Id={SubscriptionId}.",
subscriptionId);
break;
}
}
m_logger.LogWarning(
"Subscription DELETED(ABANDONED), Id={SubscriptionId}.",
subscriptionId);
}

// remove subscription.
Expand Down Expand Up @@ -2061,34 +2048,10 @@ private async ValueTask PublishSubscriptionsAsync(int sleepCycle, CancellationTo

while (true)
{
DateTime start = HiResClock.UtcNow;

SessionPublishQueue[] queues = null;
ISubscription[] abandonedSubscriptions = null;

await m_semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
// collect active session queues.
queues = new SessionPublishQueue[m_publishQueues.Count];
m_publishQueues.Values.CopyTo(queues, 0);

// collect abandoned subscriptions.
if (m_abandonedSubscriptions != null && m_abandonedSubscriptions.Count > 0)
{
abandonedSubscriptions = new ISubscription[m_abandonedSubscriptions
.Count];

for (int ii = 0; ii < abandonedSubscriptions.Length; ii++)
{
abandonedSubscriptions[ii] = m_abandonedSubscriptions[ii];
}
}
}
finally
{
m_semaphoreSlim.Release();
}
// ConcurrentDictionary enumeration is thread-safe and provides a stable
// snapshot for the current pass without taking the manager semaphore.
SessionPublishQueue[] queues = [.. m_publishQueues.Values];
ISubscription[] abandonedSubscriptions = [.. m_abandonedSubscriptions.Values];

// check the publish timer for each subscription.
for (int ii = 0; ii < queues.Length; ii++)
Expand All @@ -2097,7 +2060,7 @@ private async ValueTask PublishSubscriptionsAsync(int sleepCycle, CancellationTo
}

// check the publish timer for each abandoned subscription.
if (abandonedSubscriptions != null)
if (abandonedSubscriptions.Length > 0)
{
var subscriptionsToDelete = new List<ISubscription>();

Expand All @@ -2120,17 +2083,9 @@ private async ValueTask PublishSubscriptionsAsync(int sleepCycle, CancellationTo
// schedule cleanup on a background thread.
if (subscriptionsToDelete.Count > 0)
{
await m_semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
for (int ii = 0; ii < subscriptionsToDelete.Count; ii++)
{
m_abandonedSubscriptions.Remove(subscriptionsToDelete[ii]);
}
}
finally
for (int ii = 0; ii < subscriptionsToDelete.Count; ii++)
{
m_semaphoreSlim.Release();
m_abandonedSubscriptions.TryRemove(subscriptionsToDelete[ii].Id, out _);
}

CleanupSubscriptions(m_server, subscriptionsToDelete, m_logger);
Expand Down Expand Up @@ -2327,7 +2282,7 @@ public override int GetHashCode()
private readonly int m_maxSubscriptionCount;
private readonly bool m_durableSubscriptionsEnabled;
private readonly ConcurrentDictionary<uint, ISubscription> m_subscriptions;
private List<ISubscription> m_abandonedSubscriptions;
private readonly ConcurrentDictionary<uint, ISubscription> m_abandonedSubscriptions;
private readonly NodeIdDictionary<Queue<StatusMessage>> m_statusMessages;
private readonly NodeIdDictionary<SessionPublishQueue> m_publishQueues;
private readonly ManualResetEvent m_shutdownEvent;
Expand Down
65 changes: 65 additions & 0 deletions Tests/Opc.Ua.Server.Tests/ReferenceServerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* ======================================================================*/

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -616,6 +617,70 @@ public async Task SubscriptionAsync()
await CommonTestWorkers.SubscriptionTestAsync(serverTestServices, m_requestHeader).ConfigureAwait(false);
}

/// <summary>
/// Create multiple sessions, each with a subscription.
/// Close all sessions without deleting subscriptions (abandoning them).
/// Concurrently delete the abandoned subscriptions from the main session.
/// Verifies that the concurrent dictionary backing abandoned subscriptions
/// handles parallel access correctly (fix for issue #3612).
/// </summary>
[Test]
public async Task DeleteAbandonedSubscriptionsConcurrentlyAsync()
{
const int sessionCount = 5;
var subscriptionIds = new UInt32Collection();

NamespaceTable namespaceUris = m_server.CurrentInstance.NamespaceUris;
NodeId[] testSet =
[
.. CommonTestWorkers.NodeIdTestSetStatic
.Select(n => ExpandedNodeId.ToNodeId(n, namespaceUris))
];

// Create multiple sessions, each with a subscription, then close
// the session without deleting subscriptions so they become abandoned.
for (int i = 0; i < sessionCount; i++)
{
(RequestHeader header, SecureChannelContext context) =
await m_server.CreateAndActivateSessionAsync($"AbandonSession_{i}")
.ConfigureAwait(false);

var services = new ServerTestServices(m_server, context);
header.Timestamp = DateTime.UtcNow;
UInt32Collection ids = await CommonTestWorkers.CreateSubscriptionForTransferAsync(
services, header, testSet, kQueueSize, -1).ConfigureAwait(false);
subscriptionIds.AddRange(ids);

// Close session without deleting subscriptions - makes them abandoned
header.Timestamp = DateTime.UtcNow;
await m_server.CloseSessionAsync(context, header, false, CancellationToken.None)
.ConfigureAwait(false);
}

// Concurrently delete all abandoned subscriptions from the main session.
var mainServices = new ServerTestServices(m_server, m_secureChannelContext);
var deleteTasks = new List<Task<DeleteSubscriptionsResponse>>();
foreach (uint id in subscriptionIds)
{
var singleId = new UInt32Collection { id };
m_requestHeader.Timestamp = DateTime.UtcNow;
deleteTasks.Add(
mainServices.DeleteSubscriptionsAsync(m_requestHeader, singleId)
.AsTask());
}

DeleteSubscriptionsResponse[] responses = await Task.WhenAll(deleteTasks)
.ConfigureAwait(false);

// All deletions should succeed.
foreach (DeleteSubscriptionsResponse response in responses)
{
Assert.AreEqual(StatusCodes.Good, response.ResponseHeader.ServiceResult);
Assert.AreEqual(1, response.Results.Count);
Assert.AreEqual(StatusCodes.Good, (uint)response.Results[0]);
}
}

/// <summary>
/// Create a secondary Session.
/// Create a subscription with a monitored item.
Expand Down
Loading