Skip to content

Commit 444c99a

Browse files
committed
semaphore slim, concurrent broadcast
1 parent 13a8333 commit 444c99a

20 files changed

+297
-175
lines changed

src/NetCoreStack.WebSockets/ConnectionManager.cs

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@ namespace NetCoreStack.WebSockets
1919
{
2020
public class ConnectionManager : IConnectionManager
2121
{
22+
private readonly SemaphoreSlim _sendFrameAsyncLock = new SemaphoreSlim(1, 1);
2223
private readonly InvocatorRegistry _invocatorRegistry;
2324
private readonly ServerSocketsOptions _options;
2425
private readonly IHandshakeStateTransport _initState;
2526
private readonly ILoggerFactory _loggerFactory;
2627
private readonly IStreamCompressor _compressor;
2728
private readonly TransportLifetimeManager _lifetimeManager;
29+
2830
public ConcurrentDictionary<string, WebSocketTransport> Connections { get; }
2931

3032
public ConnectionManager(IStreamCompressor compressor,
@@ -43,7 +45,7 @@ public ConnectionManager(IStreamCompressor compressor,
4345
Connections = new ConcurrentDictionary<string, WebSocketTransport>(StringComparer.OrdinalIgnoreCase);
4446
}
4547

46-
private async Task<byte[]> PrepareBytesAsync(byte[] body, IDictionary<string, object> properties)
48+
private async Task<byte[]> PrepareFramesBytesAsync(byte[] body, IDictionary<string, object> properties)
4749
{
4850
if (body == null)
4951
{
@@ -80,7 +82,7 @@ private async Task<byte[]> PrepareBytesAsync(byte[] body, IDictionary<string, ob
8082
return body;
8183
}
8284

83-
private async Task SendAsync(WebSocketTransport transport, WebSocketMessageDescriptor descriptor)
85+
private Task SendAsync(WebSocketTransport transport, WebSocketMessageDescriptor descriptor)
8486
{
8587
if (descriptor == null)
8688
{
@@ -94,7 +96,7 @@ private async Task SendAsync(WebSocketTransport transport, WebSocketMessageDescr
9496

9597
if (!transport.WebSocket.CloseStatus.HasValue)
9698
{
97-
await transport.WebSocket.SendAsync(descriptor.Segments,
99+
return transport.WebSocket.SendAsync(descriptor.Segments,
98100
descriptor.MessageType,
99101
descriptor.EndOfMessage,
100102
CancellationToken.None);
@@ -107,10 +109,12 @@ await transport.WebSocket.SendAsync(descriptor.Segments,
107109
Segments = descriptor.Segments,
108110
KeepTime = DateTime.Now.AddMinutes(3)
109111
});
112+
113+
return TaskCache.CompletedTask;
110114
}
111115
}
112116

113-
private async Task SendBinaryAsync(WebSocketTransport transport, byte[] chunkedBytes, bool endOfMessage, CancellationToken token)
117+
private Task SendBinaryAsync(WebSocketTransport transport, byte[] chunkedBytes, bool endOfMessage)
114118
{
115119
if (transport == null)
116120
{
@@ -121,10 +125,40 @@ private async Task SendBinaryAsync(WebSocketTransport transport, byte[] chunkedB
121125

122126
if (!transport.WebSocket.CloseStatus.HasValue)
123127
{
124-
await transport.WebSocket.SendAsync(segments,
128+
return transport.WebSocket.SendAsync(segments,
125129
WebSocketMessageType.Binary,
126130
endOfMessage,
127-
token);
131+
CancellationToken.None);
132+
}
133+
134+
return TaskCache.CompletedTask;
135+
}
136+
137+
private async Task SendConcurrentBinaryAsync(byte[] bytes)
138+
{
139+
using (var ms = new MemoryStream(bytes))
140+
{
141+
using (var br = new BinaryReader(ms))
142+
{
143+
byte[] chunkedBytes = null;
144+
do
145+
{
146+
chunkedBytes = br.ReadBytes(ChunkSize);
147+
var endOfMessage = false;
148+
149+
if (chunkedBytes.Length < ChunkSize)
150+
endOfMessage = true;
151+
152+
foreach (var connection in Connections)
153+
{
154+
await SendBinaryAsync(connection.Value, chunkedBytes, endOfMessage);
155+
}
156+
157+
if (endOfMessage)
158+
break;
159+
160+
} while (chunkedBytes.Length <= ChunkSize);
161+
}
128162
}
129163
}
130164

@@ -197,10 +231,12 @@ public async Task BroadcastAsync(WebSocketMessageContext context)
197231
MessageType = WebSocketMessageType.Text
198232
};
199233

234+
_sendFrameAsyncLock.Wait();
200235
foreach (var connection in Connections)
201236
{
202237
await SendAsync(connection.Value, descriptor);
203238
}
239+
_sendFrameAsyncLock.Release();
204240
}
205241

206242
public async Task BroadcastBinaryAsync(byte[] inputs, IDictionary<string, object> properties)
@@ -210,39 +246,17 @@ public async Task BroadcastBinaryAsync(byte[] inputs, IDictionary<string, object
210246
return;
211247
}
212248

213-
var bytes = await PrepareBytesAsync(inputs, properties);
214-
215-
using (var ms = new MemoryStream(bytes))
216-
{
217-
using (var br = new BinaryReader(ms))
218-
{
219-
byte[] chunkedBytes = null;
220-
do
221-
{
222-
chunkedBytes = br.ReadBytes(ChunkSize);
223-
var endOfMessage = false;
224-
225-
if (chunkedBytes.Length < ChunkSize)
226-
endOfMessage = true;
227-
228-
foreach (var connection in Connections)
229-
{
230-
await SendBinaryAsync(connection.Value, chunkedBytes, endOfMessage, CancellationToken.None);
231-
}
232-
233-
if (endOfMessage)
234-
break;
235-
236-
} while (chunkedBytes.Length <= ChunkSize);
237-
}
238-
}
249+
_sendFrameAsyncLock.Wait();
250+
var bytes = await PrepareFramesBytesAsync(inputs, properties);
251+
await SendConcurrentBinaryAsync(bytes);
252+
_sendFrameAsyncLock.Release();
239253
}
240254

241-
public async Task SendAsync(string connectionId, WebSocketMessageContext context)
255+
public Task SendAsync(string connectionId, WebSocketMessageContext context)
242256
{
243257
if (!Connections.Any())
244258
{
245-
return;
259+
return TaskCache.CompletedTask;
246260
}
247261

248262
WebSocketTransport transport = null;
@@ -259,7 +273,7 @@ public async Task SendAsync(string connectionId, WebSocketMessageContext context
259273
MessageType = WebSocketMessageType.Text
260274
};
261275

262-
await SendAsync(transport, descriptor);
276+
return SendAsync(transport, descriptor);
263277
}
264278

265279
public async Task SendBinaryAsync(string connectionId, byte[] input, IDictionary<string, object> properties)
@@ -275,7 +289,7 @@ public async Task SendBinaryAsync(string connectionId, byte[] input, IDictionary
275289
throw new ArgumentOutOfRangeException(nameof(transport));
276290
}
277291

278-
byte[] bytes = await PrepareBytesAsync(input, properties);
292+
byte[] bytes = await PrepareFramesBytesAsync(input, properties);
279293

280294
using (var ms = new MemoryStream(bytes))
281295
{
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System.Threading.Tasks;
5+
6+
namespace NetCoreStack.WebSockets.Internal
7+
{
8+
public static class TaskCache
9+
{
10+
11+
/// <summary>
12+
/// A <see cref="Task"/> that's already completed successfully.
13+
/// </summary>
14+
/// <remarks>
15+
/// We're caching this in a static readonly field to make it more inlinable and avoid the volatile lookup done
16+
/// by <c>Task.CompletedTask</c>.
17+
/// </remarks>
18+
#if NET451
19+
public static readonly Task CompletedTask = Task.FromResult(0);
20+
#else
21+
public static readonly Task CompletedTask = Task.CompletedTask;
22+
#endif
23+
}
24+
}

src/NetCoreStack.WebSockets/NetCoreStack.WebSockets.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
<ItemGroup>
1616
<PackageReference Include="Microsoft.AspNetCore.WebSockets" Version="1.0.2" />
1717
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.2" />
18-
<PackageReference Include="Newtonsoft.Json" Version="10.0.2" />
18+
<PackageReference Include="Newtonsoft.Json" Version="10.0.3" />
1919
</ItemGroup>
2020

2121
<ItemGroup Condition=" '$(TargetFramework)' == 'net451' ">

test/Common.Libs/CacheHelper.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
5+
namespace Common.Libs
6+
{
7+
public static class CacheHelper
8+
{
9+
public static IDictionary<string, CacheItemDescriptor> CacheKeys
10+
{
11+
get
12+
{
13+
var dict = new Dictionary<string, CacheItemDescriptor>()
14+
{
15+
[nameof(HizmetEnvanteri)] = new CacheItemDescriptor { Type = typeof(HizmetEnvanteri), Weight = CacheItemWeights.MiddleWeight },
16+
[nameof(KurumBirimDto)] = new CacheItemDescriptor { Type = typeof(KurumBirimDto), Weight = CacheItemWeights.HeavyWeight },
17+
[nameof(KurumBirimPasifDto)] = new CacheItemDescriptor { Type = typeof(KurumBirimPasifDto), Weight = CacheItemWeights.MiddleWeight },
18+
};
19+
20+
var keys = dict.OrderBy(x => x.Key).ToDictionary(t => t.Key, t => t.Value);
21+
return keys;
22+
}
23+
}
24+
25+
public static CacheItemDescriptor GetDescriptor(string key)
26+
{
27+
if (CacheKeys.TryGetValue(key, out CacheItemDescriptor descriptor))
28+
{
29+
return descriptor;
30+
}
31+
32+
throw new ArgumentOutOfRangeException($"Key could not be found: \"{key}\"");
33+
}
34+
}
35+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using System;
2+
using System.Collections.Generic;
3+
namespace Common.Libs
4+
{
5+
public class CacheItemDescriptor
6+
{
7+
public Type Type { get; set; }
8+
public CacheItemWeights Weight { get; set; }
9+
10+
public CacheItemDescriptor()
11+
{
12+
Weight = CacheItemWeights.LightWeight;
13+
}
14+
}
15+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
namespace Common.Libs
2+
{
3+
public enum CacheItemWeights
4+
{
5+
LightWeight = 0,
6+
MiddleWeight = 1,
7+
HeavyWeight = 2
8+
}
9+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
5+
namespace Common.Libs
6+
{
7+
public class HizmetEnvanteri
8+
{
9+
}
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using Microsoft.Extensions.Caching.Memory;
2+
using System;
3+
using static Common.Libs.CacheHelper;
4+
5+
namespace Common.Libs
6+
{
7+
public static class InMemoryCacheExtensions
8+
{
9+
}
10+
}

test/Common.Libs/KurumBirimDto.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
5+
namespace Common.Libs
6+
{
7+
public class KurumBirimDto
8+
{
9+
}
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
5+
namespace Common.Libs
6+
{
7+
public class KurumBirimPasifDto
8+
{
9+
}
10+
}

0 commit comments

Comments
 (0)