Skip to content

Commit b3b764d

Browse files
committed
Add tests and benchmarks for session publish queue
1 parent 12792d9 commit b3b764d

File tree

2 files changed

+629
-0
lines changed

2 files changed

+629
-0
lines changed
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/* ========================================================================
2+
* Copyright (c) 2005-2025 The OPC Foundation, Inc. All rights reserved.
3+
*
4+
* OPC Foundation MIT License 1.00
5+
*
6+
* Permission is hereby granted, free of charge, to any person
7+
* obtaining a copy of this software and associated documentation
8+
* files (the "Software"), to deal in the Software without
9+
* restriction, including without limitation the rights to use,
10+
* copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
* copies of the Software, and to permit persons to whom the
12+
* Software is furnished to do so, subject to the following
13+
* conditions:
14+
*
15+
* The above copyright notice and this permission notice shall be
16+
* included in all copies or substantial portions of the Software.
17+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18+
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
19+
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20+
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
21+
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
22+
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23+
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
24+
* OTHER DEALINGS IN THE SOFTWARE.
25+
*
26+
* The complete license agreement can be found here:
27+
* http://opcfoundation.org/License/MIT/1.00/
28+
* ======================================================================*/
29+
30+
using System;
31+
using System.Collections.Generic;
32+
using System.Threading;
33+
using System.Threading.Tasks;
34+
using BenchmarkDotNet.Attributes;
35+
using BenchmarkDotNet.Configs;
36+
using Moq;
37+
using Opc.Ua.Tests;
38+
39+
namespace Opc.Ua.Server.Tests
40+
{
41+
[GroupBenchmarksBy(BenchmarkLogicalGroupRule.ByCategory)]
42+
[MemoryDiagnoser]
43+
public class SessionPublishQueueBenchmarks
44+
{
45+
private Mock<IServerInternal> m_serverMock;
46+
private Mock<ISession> m_sessionMock;
47+
private Mock<ISubscriptionManager> m_subscriptionManagerMock;
48+
private ITelemetryContext m_telemetry;
49+
50+
[Params(50, 500)]
51+
public int NumItems { get; set; }
52+
53+
[GlobalSetup]
54+
public void Setup()
55+
{
56+
m_telemetry = NUnitTelemetryContext.Create();
57+
m_serverMock = new Mock<IServerInternal>();
58+
m_sessionMock = new Mock<ISession>();
59+
m_subscriptionManagerMock = new Mock<ISubscriptionManager>();
60+
61+
m_serverMock.Setup(s => s.Telemetry).Returns(m_telemetry);
62+
m_serverMock.Setup(s => s.SubscriptionManager).Returns(m_subscriptionManagerMock.Object);
63+
64+
m_sessionMock.Setup(s => s.Id).Returns(new NodeId(Guid.NewGuid()));
65+
m_sessionMock.Setup(s => s.IsSecureChannelValid(It.IsAny<string>())).Returns(true);
66+
}
67+
68+
[Benchmark]
69+
public async Task Concurrency_MultipleRequestsAndSubscriptions()
70+
{
71+
int maxPublishRequests = NumItems * 2;
72+
using var queue = new SessionPublishQueue(m_serverMock.Object, m_sessionMock.Object, maxPublishRequests);
73+
74+
var subs = new List<Mock<ISubscription>>();
75+
for (int i = 0; i < NumItems; i++)
76+
{
77+
var subMock = new Mock<ISubscription>();
78+
subMock.Setup(s => s.Id).Returns((uint)(i + 1));
79+
subMock.Setup(s => s.Priority).Returns((byte)(i % 5));
80+
subMock.Setup(s => s.PublishTimerExpired()).Returns(PublishingState.NotificationsAvailable);
81+
subs.Add(subMock);
82+
queue.Add(subMock.Object);
83+
}
84+
85+
using var startGate = new ManualResetEventSlim(false);
86+
var publishTasks = new List<Task<ISubscription>>(NumItems);
87+
var timerTasks = new List<Task>(NumItems);
88+
89+
// Start multiple threads requesting publish
90+
for (int i = 0; i < NumItems; i++)
91+
{
92+
publishTasks.Add(Task.Run(() =>
93+
{
94+
startGate.Wait();
95+
return queue.PublishAsync("channel1", DateTime.MaxValue, false, CancellationToken.None);
96+
}));
97+
}
98+
99+
// Start multiple threads mimicking subscriptions being ready or timer expiring
100+
for (int i = 0; i < NumItems; i++)
101+
{
102+
int index = i;
103+
timerTasks.Add(Task.Run(() =>
104+
{
105+
startGate.Wait();
106+
queue.PublishCompleted(subs[index].Object, true);
107+
}));
108+
}
109+
110+
// Open the gate
111+
startGate.Set();
112+
113+
// Wait for publishers to finish producing
114+
await Task.WhenAll(timerTasks).ConfigureAwait(false);
115+
116+
// Wait for consumers to get their subscriptions
117+
var resultsTask = Task.WhenAll(publishTasks);
118+
await Task.WhenAny(resultsTask, Task.Delay(TimeSpan.FromSeconds(30))).ConfigureAwait(false);
119+
120+
queue.Close();
121+
}
122+
}
123+
}

0 commit comments

Comments
 (0)