Skip to content

Commit 4459e70

Browse files
authored
[Server] Make EndpointIncomingRequest a ReadonlyStruct & pool ValueTaks (#3363)
* Make EndpointIncomingRequest a readonly struct * Switch to ValueTask & pool ValueTasks using ValueTaskSource
1 parent c545bca commit 4459e70

File tree

8 files changed

+330
-38
lines changed

8 files changed

+330
-38
lines changed

Stack/Opc.Ua.Core/Stack/Server/EndpointBase.cs

Lines changed: 57 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
1313
using System;
1414
using System.Collections.Generic;
1515
using System.Diagnostics;
16+
using System.Diagnostics.CodeAnalysis;
1617
using System.Security.Cryptography.X509Certificates;
1718
using System.Threading;
1819
using System.Threading.Tasks;
@@ -76,7 +77,7 @@ protected EndpointBase(ServerBase server)
7677
}
7778

7879
/// <inheritdoc/>
79-
public Task<IServiceResponse> ProcessRequestAsync(
80+
public ValueTask<IServiceResponse> ProcessRequestAsync(
8081
SecureChannelContext secureChannelContext,
8182
IServiceRequest request,
8283
CancellationToken cancellationToken = default)
@@ -670,7 +671,7 @@ public IServiceResponse Invoke(IServiceRequest request, SecureChannelContext sec
670671
{
671672
logger.LogWarning(
672673
"Async Service invoced sychronously. Prefer using InvokeAsync for best performance.");
673-
return InvokeAsync(request, null).GetAwaiter().GetResult();
674+
return InvokeAsync(request, secureChannelContext).GetAwaiter().GetResult();
674675
}
675676
return m_invokeService?.Invoke(request, secureChannelContext);
676677
}
@@ -777,7 +778,7 @@ public void CallSynchronously()
777778
/// thread that calls IServerBase.ScheduleIncomingRequest().
778779
/// This method always traps any exceptions and reports them to the client as a fault.
779780
/// </remarks>
780-
public async Task CallAsync(CancellationToken cancellationToken = default)
781+
public async ValueTask CallAsync(CancellationToken cancellationToken = default)
781782
{
782783
await OnProcessRequestAsync(null, cancellationToken).ConfigureAwait(false);
783784
}
@@ -1042,7 +1043,7 @@ .Body is AdditionalParametersType parameters &&
10421043
else
10431044
{
10441045
// call the service even when there is no trace information
1045-
m_response = await m_service.InvokeAsync(Request,SecureChannelContext, cancellationToken)
1046+
m_response = await m_service.InvokeAsync(Request, SecureChannelContext, cancellationToken)
10461047
.ConfigureAwait(false);
10471048
}
10481049
}
@@ -1074,26 +1075,25 @@ .Body is AdditionalParametersType parameters &&
10741075
/// <summary>
10751076
/// An object that handles an incoming request for an endpoint.
10761077
/// </summary>
1077-
protected class EndpointIncomingRequest : IEndpointIncomingRequest
1078+
protected readonly struct EndpointIncomingRequest : IEndpointIncomingRequest, IEquatable<EndpointIncomingRequest>
10781079
{
10791080
/// <summary>
10801081
/// Initialize the Object with a Request
10811082
/// </summary>
10821083
public EndpointIncomingRequest(
10831084
EndpointBase endpoint,
10841085
SecureChannelContext context,
1085-
IServiceRequest request)
1086+
IServiceRequest request,
1087+
CancellationToken cancellationToken = default)
10861088
{
10871089
m_endpoint = endpoint;
10881090
SecureChannelContext = context;
10891091
Request = request;
1090-
m_tcs = new TaskCompletionSource<IServiceResponse>(
1091-
TaskCreationOptions.RunContinuationsAsynchronously);
1092+
m_vts = ServiceResponsePooledValueTaskSource.Create();
1093+
m_service = m_endpoint.FindService(Request.TypeId);
1094+
m_cancellationToken = cancellationToken;
10921095
}
10931096

1094-
/// <inheritdoc/>
1095-
public object Calldata { get; set; }
1096-
10971097
/// <inheritdoc/>
10981098
public SecureChannelContext SecureChannelContext { get; }
10991099

@@ -1104,25 +1104,22 @@ public EndpointIncomingRequest(
11041104
/// Process an incoming request
11051105
/// </summary>
11061106
/// <returns></returns>
1107-
public Task<IServiceResponse> ProcessAsync(CancellationToken cancellationToken = default)
1107+
public ValueTask<IServiceResponse> ProcessAsync(CancellationToken cancellationToken = default)
11081108
{
11091109
try
11101110
{
1111-
m_cancellationToken = cancellationToken;
1112-
m_cancellationToken.Register(() => m_tcs.TrySetCanceled());
1113-
m_service = m_endpoint.FindService(Request.TypeId);
1114-
m_endpoint.ServerForContext.ScheduleIncomingRequest(this, m_cancellationToken);
1111+
m_endpoint.ServerForContext.ScheduleIncomingRequest(this, cancellationToken);
11151112
}
11161113
catch (Exception e)
11171114
{
1118-
m_tcs.TrySetResult(m_endpoint.CreateFault(Request, e));
1115+
m_vts.SetResult(m_endpoint.CreateFault(Request, e));
11191116
}
11201117

1121-
return m_tcs.Task;
1118+
return m_vts.Task;
11221119
}
11231120

11241121
/// <inheritdoc/>
1125-
public async Task CallAsync(CancellationToken cancellationToken = default)
1122+
public async ValueTask CallAsync(CancellationToken cancellationToken = default)
11261123
{
11271124
using CancellationTokenSource timeoutHintCts = (int)Request.RequestHeader.TimeoutHint > 0 ?
11281125
new CancellationTokenSource((int)Request.RequestHeader.TimeoutHint) : null;
@@ -1157,7 +1154,7 @@ .Body is AdditionalParametersType parameters &&
11571154
using (activity)
11581155
{
11591156
IServiceResponse response = await m_service.InvokeAsync(Request, SecureChannelContext, linkedCts.Token).ConfigureAwait(false);
1160-
m_tcs.TrySetResult(response);
1157+
m_vts.SetResult(response);
11611158
}
11621159
}
11631160
catch (Exception e)
@@ -1166,8 +1163,7 @@ .Body is AdditionalParametersType parameters &&
11661163
{
11671164
e = new ServiceResultException(StatusCodes.BadTimeout);
11681165
}
1169-
1170-
m_tcs.TrySetResult(m_endpoint.CreateFault(Request, e));
1166+
m_vts.SetResult(m_endpoint.CreateFault(Request, e));
11711167
}
11721168
}
11731169

@@ -1176,18 +1172,52 @@ public void OperationCompleted(IServiceResponse response, ServiceResult error)
11761172
{
11771173
if (ServiceResult.IsBad(error))
11781174
{
1179-
m_tcs.TrySetResult(m_endpoint.CreateFault(Request, new ServiceResultException(error)));
1175+
m_vts.SetResult(m_endpoint.CreateFault(Request, new ServiceResultException(error)));
11801176
}
11811177
else
11821178
{
1183-
m_tcs.TrySetResult(response);
1179+
m_vts.SetResult(response);
11841180
}
11851181
}
11861182

1183+
/// <inheritdoc/>
1184+
public override bool Equals(object obj)
1185+
{
1186+
if (obj is EndpointIncomingRequest other)
1187+
{
1188+
return Request.RequestHeader.Equals(other.Request.RequestHeader);
1189+
}
1190+
return false;
1191+
}
1192+
1193+
/// <inheritdoc/>
1194+
public override int GetHashCode()
1195+
{
1196+
return Request.RequestHeader.GetHashCode();
1197+
}
1198+
1199+
/// <inheritdoc/>
1200+
public static bool operator ==(EndpointIncomingRequest left, EndpointIncomingRequest right)
1201+
{
1202+
return left.Equals(right);
1203+
}
1204+
1205+
/// <inheritdoc/>
1206+
public static bool operator !=(EndpointIncomingRequest left, EndpointIncomingRequest right)
1207+
{
1208+
return !(left == right);
1209+
}
1210+
1211+
/// <inheritdoc/>
1212+
public bool Equals(EndpointIncomingRequest other)
1213+
{
1214+
return Request.RequestHeader.Equals(other.Request.RequestHeader);
1215+
}
1216+
11871217
private readonly EndpointBase m_endpoint;
1188-
private CancellationToken m_cancellationToken;
1189-
private ServiceDefinition m_service;
1190-
private readonly TaskCompletionSource<IServiceResponse> m_tcs;
1218+
private readonly ServiceDefinition m_service;
1219+
private readonly ServiceResponsePooledValueTaskSource m_vts;
1220+
private readonly CancellationToken m_cancellationToken;
11911221
}
11921222

11931223
/// <summary>

Stack/Opc.Ua.Core/Stack/Server/IServerBase.cs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,6 @@ public interface IEndpointIncomingRequest
105105
/// <value>The secure channel context.</value>
106106
SecureChannelContext SecureChannelContext { get; }
107107

108-
/// <summary>
109-
/// Gets or sets the call data associated with the request.
110-
/// </summary>
111-
/// <value>The call data.</value>
112-
object Calldata { get; set; }
113-
114108
/// <summary>
115109
/// Used to call the default asynchronous handler.
116110
/// </summary>
@@ -119,7 +113,7 @@ public interface IEndpointIncomingRequest
119113
/// thread that calls IServerBase.ScheduleIncomingRequest().
120114
/// This method always traps any exceptions and reports them to the client as a fault.
121115
/// </remarks>
122-
Task CallAsync(CancellationToken cancellationToken = default);
116+
ValueTask CallAsync(CancellationToken cancellationToken = default);
123117

124118
/// <summary>
125119
/// Used to indicate that the asynchronous operation has completed.

Stack/Opc.Ua.Core/Stack/Server/ServerBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1611,7 +1611,7 @@ protected virtual void Dispose(bool disposing)
16111611

16121612
foreach (IEndpointIncomingRequest request in m_queue.ToList())
16131613
{
1614-
Utils.SilentDispose(request);
1614+
request.OperationCompleted(null, StatusCodes.BadServerHalted);
16151615
}
16161616
#if NETSTANDARD2_1_OR_GREATER
16171617
m_queue.Clear();

Stack/Opc.Ua.Core/Stack/Transport/ITransportListenerCallback.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public interface ITransportListenerCallback : IAuditEventCallback
2828
/// <param name="request">The incoming request.</param>
2929
/// <param name="cancellationToken">The cancellation token.</param>
3030
/// <returns>The response to return over the secure channel.</returns>
31-
Task<IServiceResponse> ProcessRequestAsync(
31+
ValueTask<IServiceResponse> ProcessRequestAsync(
3232
SecureChannelContext secureChannelContext,
3333
IServiceRequest request,
3434
CancellationToken cancellationToken = default);
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/* Copyright (c) 1996-2022 The OPC Foundation. All rights reserved.
2+
The source code in this file is covered under a dual-license scenario:
3+
- RCL: for OPC Foundation Corporate Members in good-standing
4+
- GPL V2: everybody else
5+
RCL license terms accompanied with this source code. See http://opcfoundation.org/License/RCL/1.00/
6+
GNU General Public License as published by the Free Software Foundation;
7+
version 2 of the License are accompanied with this source code. See http://opcfoundation.org/License/GPLv2
8+
This source code is distributed in the hope that it will be useful,
9+
but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11+
*/
12+
13+
using System;
14+
using System.Threading.Tasks;
15+
using System.Threading.Tasks.Sources;
16+
17+
namespace Opc.Ua
18+
{
19+
/// <summary>
20+
/// A reusable value task source.
21+
/// </summary>
22+
/// <typeparam name="T"></typeparam>
23+
internal class ManualResetValueTaskSource<T> : IValueTaskSource<T>, IValueTaskSource
24+
{
25+
private ManualResetValueTaskSourceCore<T> m_core;
26+
27+
public bool RunContinuationsAsynchronously
28+
{
29+
get => m_core.RunContinuationsAsynchronously;
30+
set => m_core.RunContinuationsAsynchronously = value;
31+
}
32+
33+
public short Version => m_core.Version;
34+
35+
public void Reset()
36+
{
37+
m_core.Reset();
38+
}
39+
40+
public void SetResult(T result)
41+
{
42+
m_core.SetResult(result);
43+
}
44+
45+
public void SetException(Exception error)
46+
{
47+
m_core.SetException(error);
48+
}
49+
50+
public T GetResult(short token)
51+
{
52+
return m_core.GetResult(token);
53+
}
54+
55+
void IValueTaskSource.GetResult(short token)
56+
{
57+
m_core.GetResult(token);
58+
}
59+
60+
public ValueTaskSourceStatus GetStatus(short token)
61+
{
62+
return m_core.GetStatus(token);
63+
}
64+
65+
public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
66+
{
67+
m_core.OnCompleted(continuation, state, token, flags);
68+
}
69+
70+
public ValueTask<T> Task => new(this, m_core.Version);
71+
public ValueTask SourceTask => new(this, m_core.Version);
72+
}
73+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/* Copyright (c) 1996-2022 The OPC Foundation. All rights reserved.
2+
The source code in this file is covered under a dual-license scenario:
3+
- RCL: for OPC Foundation Corporate Members in good-standing
4+
- GPL V2: everybody else
5+
RCL license terms accompanied with this source code. See http://opcfoundation.org/License/RCL/1.00/
6+
GNU General Public License as published by the Free Software Foundation;
7+
version 2 of the License are accompanied with this source code. See http://opcfoundation.org/License/GPLv2
8+
This source code is distributed in the hope that it will be useful,
9+
but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11+
*/
12+
13+
using System;
14+
using System.Collections.Concurrent;
15+
16+
namespace Opc.Ua
17+
{
18+
/// <summary>
19+
/// A simple object pool implementation.
20+
/// </summary>
21+
/// <typeparam name="T">The type of object to pool.</typeparam>
22+
internal class ObjectPool<T> where T : class
23+
{
24+
private readonly ConcurrentBag<T> m_objects;
25+
private readonly Func<T> m_objectGenerator;
26+
private readonly int m_maxSize;
27+
28+
/// <summary>
29+
/// Initializes a new instance of the <see cref="ObjectPool{T}"/> class.
30+
/// </summary>
31+
/// <param name="objectGenerator">The function to generate new objects.</param>
32+
/// <param name="maxSize">The maximum size of the pool.</param>
33+
public ObjectPool(Func<T> objectGenerator, int maxSize)
34+
{
35+
m_objectGenerator = objectGenerator ?? throw new ArgumentNullException(nameof(objectGenerator));
36+
m_maxSize = maxSize > 0 ? maxSize : throw new ArgumentOutOfRangeException(nameof(maxSize));
37+
m_objects = new ConcurrentBag<T>();
38+
}
39+
40+
/// <summary>
41+
/// Gets an object from the pool.
42+
/// </summary>
43+
/// <returns>An object from the pool or a new one if the pool is empty.</returns>
44+
public T Get()
45+
{
46+
if (m_objects.TryTake(out T item))
47+
{
48+
return item;
49+
}
50+
51+
return m_objectGenerator();
52+
}
53+
54+
/// <summary>
55+
/// Returns an object to the pool.
56+
/// </summary>
57+
/// <param name="item">The object to return.</param>
58+
public void Return(T item)
59+
{
60+
if (m_objects.Count < m_maxSize)
61+
{
62+
m_objects.Add(item);
63+
}
64+
}
65+
}
66+
}

0 commit comments

Comments
 (0)