Skip to content

Commit b8856c9

Browse files
fix(SubscriptionManager): replace List with ConcurrentDictionary for abandoned subscriptions (#3617) (#3618)
* fix(SubscriptionManager): replace List with ConcurrentDictionary for abandoned subscriptions (#3612) * replace the semaphore-protected `List<ISubscription>` used for abandoned subscriptions with a `readonly ConcurrentDictionary<uint, ISubscription>`, eliminating three separate lock acquisitions in `SessionClosed`, `DeleteSubscription`, and the publish loop * add `DeleteAbandonedSubscriptionsConcurrentlyAsync` integration test to cover the concurrent-delete scenario. * Fix build --------- Co-authored-by: Marc Schier <marcschier@hotmail.com>
1 parent 03777ec commit b8856c9

File tree

2 files changed

+78
-59
lines changed

2 files changed

+78
-59
lines changed

Libraries/Opc.Ua.Server/Subscription/SubscriptionManager.cs

Lines changed: 14 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public SubscriptionManager(IServerInternal server, ApplicationConfiguration conf
7575
m_subscriptionStore = server.SubscriptionStore;
7676

7777
m_subscriptions = [];
78+
m_abandonedSubscriptions = [];
7879
m_publishQueues = [];
7980
m_statusMessages = [];
8081
m_lastSubscriptionId = BitConverter.ToUInt32(
@@ -528,18 +529,12 @@ public virtual async ValueTask SessionClosingAsync(
528529
// mark the subscriptions as abandoned.
529530
else
530531
{
531-
await m_semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
532-
try
532+
if (m_abandonedSubscriptions.TryAdd(subscription.Id, subscription))
533533
{
534-
(m_abandonedSubscriptions ??= []).Add(subscription);
535534
m_logger.LogWarning(
536535
"Subscription ABANDONED, Id={SubscriptionId}.",
537536
subscription.Id);
538537
}
539-
finally
540-
{
541-
m_semaphoreSlim.Release();
542-
}
543538
}
544539
}
545540
}
@@ -699,19 +694,11 @@ public async ValueTask<StatusCode> DeleteSubscriptionAsync(OperationContext cont
699694
}
700695

701696
// check for abandoned subscription.
702-
if (m_abandonedSubscriptions != null)
697+
if (m_abandonedSubscriptions.TryRemove(subscriptionId, out _))
703698
{
704-
for (int ii = 0; ii < m_abandonedSubscriptions.Count; ii++)
705-
{
706-
if (m_abandonedSubscriptions[ii].Id == subscriptionId)
707-
{
708-
m_abandonedSubscriptions.RemoveAt(ii);
709-
m_logger.LogWarning(
710-
"Subscription DELETED(ABANDONED), Id={SubscriptionId}.",
711-
subscriptionId);
712-
break;
713-
}
714-
}
699+
m_logger.LogWarning(
700+
"Subscription DELETED(ABANDONED), Id={SubscriptionId}.",
701+
subscriptionId);
715702
}
716703

717704
// remove subscription.
@@ -2063,34 +2050,10 @@ private async ValueTask PublishSubscriptionsAsync(int sleepCycle, CancellationTo
20632050

20642051
while (true)
20652052
{
2066-
DateTime start = HiResClock.UtcNow;
2067-
2068-
SessionPublishQueue[] queues = null;
2069-
ISubscription[] abandonedSubscriptions = null;
2070-
2071-
await m_semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
2072-
try
2073-
{
2074-
// collect active session queues.
2075-
queues = new SessionPublishQueue[m_publishQueues.Count];
2076-
m_publishQueues.Values.CopyTo(queues, 0);
2077-
2078-
// collect abandoned subscriptions.
2079-
if (m_abandonedSubscriptions != null && m_abandonedSubscriptions.Count > 0)
2080-
{
2081-
abandonedSubscriptions = new ISubscription[m_abandonedSubscriptions
2082-
.Count];
2083-
2084-
for (int ii = 0; ii < abandonedSubscriptions.Length; ii++)
2085-
{
2086-
abandonedSubscriptions[ii] = m_abandonedSubscriptions[ii];
2087-
}
2088-
}
2089-
}
2090-
finally
2091-
{
2092-
m_semaphoreSlim.Release();
2093-
}
2053+
// ConcurrentDictionary enumeration is thread-safe and provides a stable
2054+
// snapshot for the current pass without taking the manager semaphore.
2055+
SessionPublishQueue[] queues = [.. m_publishQueues.Values];
2056+
ISubscription[] abandonedSubscriptions = [.. m_abandonedSubscriptions.Values];
20942057

20952058
// check the publish timer for each subscription.
20962059
for (int ii = 0; ii < queues.Length; ii++)
@@ -2099,7 +2062,7 @@ private async ValueTask PublishSubscriptionsAsync(int sleepCycle, CancellationTo
20992062
}
21002063

21012064
// check the publish timer for each abandoned subscription.
2102-
if (abandonedSubscriptions != null)
2065+
if (abandonedSubscriptions.Length > 0)
21032066
{
21042067
var subscriptionsToDelete = new List<ISubscription>();
21052068

@@ -2122,17 +2085,9 @@ private async ValueTask PublishSubscriptionsAsync(int sleepCycle, CancellationTo
21222085
// schedule cleanup on a background thread.
21232086
if (subscriptionsToDelete.Count > 0)
21242087
{
2125-
await m_semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
2126-
try
2127-
{
2128-
for (int ii = 0; ii < subscriptionsToDelete.Count; ii++)
2129-
{
2130-
m_abandonedSubscriptions.Remove(subscriptionsToDelete[ii]);
2131-
}
2132-
}
2133-
finally
2088+
for (int ii = 0; ii < subscriptionsToDelete.Count; ii++)
21342089
{
2135-
m_semaphoreSlim.Release();
2090+
m_abandonedSubscriptions.TryRemove(subscriptionsToDelete[ii].Id, out _);
21362091
}
21372092

21382093
CleanupSubscriptions(m_server, subscriptionsToDelete, m_logger);
@@ -2329,7 +2284,7 @@ public override int GetHashCode()
23292284
private readonly int m_maxSubscriptionCount;
23302285
private readonly bool m_durableSubscriptionsEnabled;
23312286
private readonly ConcurrentDictionary<uint, ISubscription> m_subscriptions;
2332-
private List<ISubscription> m_abandonedSubscriptions;
2287+
private readonly ConcurrentDictionary<uint, ISubscription> m_abandonedSubscriptions;
23332288
private readonly NodeIdDictionary<Queue<StatusMessage>> m_statusMessages;
23342289
private readonly NodeIdDictionary<SessionPublishQueue> m_publishQueues;
23352290
private readonly ManualResetEvent m_shutdownEvent;

Tests/Opc.Ua.Server.Tests/ReferenceServerTest.cs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,70 @@ public async Task SubscriptionAsync()
626626
await CommonTestWorkers.SubscriptionTestAsync(serverTestServices, m_requestHeader).ConfigureAwait(false);
627627
}
628628

629+
/// <summary>
630+
/// Create multiple sessions, each with a subscription.
631+
/// Close all sessions without deleting subscriptions (abandoning them).
632+
/// Concurrently delete the abandoned subscriptions from the main session.
633+
/// Verifies that the concurrent dictionary backing abandoned subscriptions
634+
/// handles parallel access correctly (fix for issue #3612).
635+
/// </summary>
636+
[Test]
637+
public async Task DeleteAbandonedSubscriptionsConcurrentlyAsync()
638+
{
639+
const int sessionCount = 5;
640+
var subscriptionIds = new List<uint>();
641+
642+
NamespaceTable namespaceUris = m_server.CurrentInstance.NamespaceUris;
643+
NodeId[] testSet =
644+
[
645+
.. CommonTestWorkers.NodeIdTestSetStatic
646+
.Select(n => ExpandedNodeId.ToNodeId(n, namespaceUris))
647+
];
648+
649+
// Create multiple sessions, each with a subscription, then close
650+
// the session without deleting subscriptions so they become abandoned.
651+
for (int i = 0; i < sessionCount; i++)
652+
{
653+
(RequestHeader header, SecureChannelContext context) =
654+
await m_server.CreateAndActivateSessionAsync($"AbandonSession_{i}")
655+
.ConfigureAwait(false);
656+
657+
var services = new ServerTestServices(m_server, context);
658+
header.Timestamp = DateTimeUtc.Now;
659+
ArrayOf<uint> ids = await CommonTestWorkers.CreateSubscriptionForTransferAsync(
660+
services, header, testSet, kQueueSize, -1).ConfigureAwait(false);
661+
subscriptionIds.AddRange(ids.ToList());
662+
663+
// Close session without deleting subscriptions - makes them abandoned
664+
header.Timestamp = DateTimeUtc.Now;
665+
await m_server.CloseSessionAsync(context, header, false, CancellationToken.None)
666+
.ConfigureAwait(false);
667+
}
668+
669+
// Concurrently delete all abandoned subscriptions from the main session.
670+
var mainServices = new ServerTestServices(m_server, m_secureChannelContext);
671+
var deleteTasks = new List<Task<DeleteSubscriptionsResponse>>();
672+
foreach (uint id in subscriptionIds)
673+
{
674+
ArrayOf<uint> singleId = [id];
675+
m_requestHeader.Timestamp = DateTimeUtc.Now;
676+
deleteTasks.Add(
677+
mainServices.DeleteSubscriptionsAsync(m_requestHeader, singleId)
678+
.AsTask());
679+
}
680+
681+
DeleteSubscriptionsResponse[] responses = await Task.WhenAll(deleteTasks)
682+
.ConfigureAwait(false);
683+
684+
// All deletions should succeed.
685+
foreach (DeleteSubscriptionsResponse response in responses)
686+
{
687+
Assert.AreEqual(StatusCodes.Good, response.ResponseHeader.ServiceResult);
688+
Assert.AreEqual(1, response.Results.Count);
689+
Assert.AreEqual(StatusCodes.Good, (uint)response.Results[0]);
690+
}
691+
}
692+
629693
/// <summary>
630694
/// Create a secondary Session.
631695
/// Create a subscription with a monitored item.

0 commit comments

Comments
 (0)