Skip to content

Commit 57bc5f9

Browse files
committed
websocket receiver, extensions
1 parent 1450e1b commit 57bc5f9

File tree

11 files changed

+127
-175
lines changed

11 files changed

+127
-175
lines changed

README.md

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
### Cross-Platform WebSockets Proxy
22

3-
This project is demonstrating bidirectional connection and data transfer via .NET Core WebSockets.
4-
5-
You can use it on your API - Service side to communicate among your trusted Backend API consumer
6-
Clients (for example MVC Web Application Hosting) and at the same time may
7-
client to be a Browser (User-Agent) then you can manage your connection and domain data transfer
8-
operation with same interface.
3+
Minimalist websocket framework for .NET Core. You can use it on your APIs to communicate among your various client types.
94

105
[Latest release on Nuget](https://www.nuget.org/packages/NetCoreStack.WebSockets/)
116

src/NetCoreStack.WebSockets.ProxyClient/ClientWebSocketConnector.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ public abstract class ClientWebSocketConnector : IWebSocketConnector
1717
private readonly IStreamCompressor _compressor;
1818
private readonly ILoggerFactory _loggerFactory;
1919

20+
private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);
21+
2022
public string ConnectionId
2123
{
2224
get
@@ -92,7 +94,7 @@ public async Task ConnectAsync(CancellationTokenSource cancellationTokenSource =
9294
}
9395
}
9496

95-
await receiver.ReceiveAsync();
97+
await Task.WhenAll(receiver.ReceiveAsync());
9698

9799
// Handshake down try re-connect
98100
if (_webSocket.CloseStatus.HasValue)
@@ -128,14 +130,17 @@ private ArraySegment<byte> CreateTextSegment(WebSocketMessageContext context)
128130
public async Task SendAsync(WebSocketMessageContext context)
129131
{
130132
var segments = CreateTextSegment(context);
133+
// _semaphoreSlim.Wait();
131134
await _webSocket.SendAsync(segments, WebSocketMessageType.Text, true, CancellationToken.None);
135+
// _semaphoreSlim.Release();
132136
}
133137

134138
public async Task SendBinaryAsync(byte[] bytes)
135139
{
136-
// TODO Chunked
137140
var segments = new ArraySegment<byte>(bytes, 0, bytes.Count());
141+
// _semaphoreSlim.Wait();
138142
await _webSocket.SendAsync(segments, WebSocketMessageType.Binary, true, CancellationToken.None);
143+
// _semaphoreSlim.Release();
139144
}
140145

141146
internal void Close(ClientWebSocketReceiverContext context)

src/NetCoreStack.WebSockets/ConnectionManager.cs

Lines changed: 77 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -78,51 +78,86 @@ private async Task<byte[]> PrepareFramesBytesAsync(byte[] body, IDictionary<stri
7878
return body;
7979
}
8080

81-
private Task SendAsync(WebSocketTransport transport, WebSocketMessageDescriptor descriptor)
81+
private async Task SendMessageAsync(WebSocket webSocket, byte[] bytes, WebSocketMessageType messageType)
8282
{
83-
if (descriptor == null)
83+
if (bytes == null)
8484
{
85-
throw new ArgumentNullException(nameof(descriptor));
85+
return;
8686
}
8787

88-
if (descriptor.Segments == null)
88+
var length = bytes.Length;
89+
if (length < ChunkSize)
8990
{
90-
throw new ArgumentNullException(nameof(descriptor.Segments));
91+
var segments = new ArraySegment<byte>(bytes, 0, length);
92+
if (!webSocket.CloseStatus.HasValue)
93+
{
94+
await webSocket.SendAsync(segments,
95+
messageType,
96+
true,
97+
CancellationToken.None);
98+
}
99+
100+
return;
91101
}
92102

93-
if (!transport.WebSocket.CloseStatus.HasValue)
103+
using (var ms = new MemoryStream(bytes))
94104
{
95-
return transport.WebSocket.SendAsync(descriptor.Segments,
96-
descriptor.MessageType,
97-
descriptor.EndOfMessage,
98-
descriptor.CancellationToken);
99-
}
105+
using (var br = new BinaryReader(ms))
106+
{
107+
byte[] chunkedBytes = null;
108+
do
109+
{
110+
chunkedBytes = br.ReadBytes(ChunkSize);
111+
var endOfMessage = false;
100112

101-
return Task.CompletedTask;
113+
if (chunkedBytes.Length < ChunkSize)
114+
endOfMessage = true;
115+
116+
var segments = new ArraySegment<byte>(chunkedBytes);
117+
118+
if (!webSocket.CloseStatus.HasValue)
119+
{
120+
await webSocket.SendAsync(segments,
121+
messageType,
122+
endOfMessage,
123+
CancellationToken.None);
124+
}
125+
126+
if (endOfMessage)
127+
break;
128+
129+
} while (chunkedBytes.Length <= ChunkSize);
130+
}
131+
}
102132
}
103133

104-
private Task SendBinaryAsync(WebSocketTransport transport, byte[] chunkedBytes, bool endOfMessage)
134+
private async Task BroadcastMessageAsync(byte[] bytes, WebSocketMessageType messageType)
105135
{
106-
if (transport == null)
136+
if (bytes == null)
107137
{
108-
throw new ArgumentNullException(nameof(transport));
138+
return;
109139
}
110140

111-
var segments = new ArraySegment<byte>(chunkedBytes);
112-
113-
if (!transport.WebSocket.CloseStatus.HasValue)
141+
var length = bytes.Length;
142+
if (length < ChunkSize)
114143
{
115-
return transport.WebSocket.SendAsync(segments,
116-
WebSocketMessageType.Binary,
117-
endOfMessage,
118-
CancellationToken.None);
119-
}
144+
var segments = new ArraySegment<byte>(bytes, 0, length);
120145

121-
return Task.CompletedTask;
122-
}
146+
foreach (var connection in Connections)
147+
{
148+
var webSocket = connection.Value.WebSocket;
149+
if (!webSocket.CloseStatus.HasValue)
150+
{
151+
await webSocket.SendAsync(segments,
152+
messageType,
153+
true,
154+
CancellationToken.None);
155+
}
156+
}
157+
158+
return;
159+
}
123160

124-
private async Task SendConcurrentBinaryAsync(byte[] bytes)
125-
{
126161
using (var ms = new MemoryStream(bytes))
127162
{
128163
using (var br = new BinaryReader(ms))
@@ -136,9 +171,19 @@ private async Task SendConcurrentBinaryAsync(byte[] bytes)
136171
if (chunkedBytes.Length < ChunkSize)
137172
endOfMessage = true;
138173

174+
var segments = new ArraySegment<byte>(chunkedBytes);
175+
139176
foreach (var connection in Connections)
140177
{
141-
await SendBinaryAsync(connection.Value, chunkedBytes, endOfMessage);
178+
var webSocket = connection.Value.WebSocket;
179+
180+
if (!webSocket.CloseStatus.HasValue)
181+
{
182+
await webSocket.SendAsync(segments,
183+
messageType,
184+
endOfMessage,
185+
CancellationToken.None);
186+
}
142187
}
143188

144189
if (endOfMessage)
@@ -198,18 +243,7 @@ public async Task BroadcastAsync(WebSocketMessageContext context)
198243
return;
199244
}
200245

201-
var segments = context.ToSegment();
202-
var descriptor = new WebSocketMessageDescriptor
203-
{
204-
Segments = segments,
205-
EndOfMessage = true,
206-
MessageType = WebSocketMessageType.Text
207-
};
208-
209-
foreach (var connection in Connections)
210-
{
211-
await SendAsync(connection.Value, descriptor);
212-
}
246+
await BroadcastMessageAsync(context.ToBytes(), WebSocketMessageType.Text);
213247
}
214248

215249
public async Task BroadcastBinaryAsync(byte[] inputs, IDictionary<string, object> properties)
@@ -220,7 +254,7 @@ public async Task BroadcastBinaryAsync(byte[] inputs, IDictionary<string, object
220254
}
221255

222256
var bytes = await PrepareFramesBytesAsync(inputs, properties);
223-
await SendConcurrentBinaryAsync(bytes);
257+
await BroadcastMessageAsync(bytes, WebSocketMessageType.Binary);
224258
}
225259

226260
public Task SendAsync(string connectionId, WebSocketMessageContext context)
@@ -236,15 +270,7 @@ public Task SendAsync(string connectionId, WebSocketMessageContext context)
236270
}
237271

238272
_headerProvider.Invoke(context.Header);
239-
var segments = context.ToSegment();
240-
var descriptor = new WebSocketMessageDescriptor
241-
{
242-
Segments = segments,
243-
EndOfMessage = true,
244-
MessageType = WebSocketMessageType.Text
245-
};
246-
247-
return SendAsync(transport, descriptor);
273+
return SendMessageAsync(transport.WebSocket, context.ToBytes(), WebSocketMessageType.Text);
248274
}
249275

250276
public async Task SendBinaryAsync(string connectionId, byte[] input, IDictionary<string, object> properties)
@@ -261,31 +287,7 @@ public async Task SendBinaryAsync(string connectionId, byte[] input, IDictionary
261287

262288
byte[] bytes = await PrepareFramesBytesAsync(input, properties);
263289

264-
using (var ms = new MemoryStream(bytes))
265-
{
266-
using (BinaryReader br = new BinaryReader(ms))
267-
{
268-
byte[] chunkBytes = null;
269-
do
270-
{
271-
chunkBytes = br.ReadBytes(ChunkSize);
272-
var segments = new ArraySegment<byte>(chunkBytes);
273-
var endOfMessage = false;
274-
275-
if (chunkBytes.Length < ChunkSize)
276-
endOfMessage = true;
277-
278-
await transport.WebSocket.SendAsync(segments,
279-
WebSocketMessageType.Binary,
280-
endOfMessage,
281-
CancellationToken.None);
282-
283-
if (endOfMessage)
284-
break;
285-
286-
} while (chunkBytes.Length <= ChunkSize);
287-
}
288-
}
290+
await SendMessageAsync(transport.WebSocket, bytes, WebSocketMessageType.Binary);
289291
}
290292

291293
public void CloseConnection(string connectionId, bool keepAlive)

src/NetCoreStack.WebSockets/Extensions/WebSocketExtensions.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,16 @@ public static ArraySegment<byte> ToSegment(this WebSocketMessageContext webSocke
102102
return new ArraySegment<byte>(content, 0, content.Length);
103103
}
104104

105+
public static byte[] ToBytes(this WebSocketMessageContext webSocketContext)
106+
{
107+
if (webSocketContext == null)
108+
{
109+
throw new ArgumentNullException(nameof(webSocketContext));
110+
}
111+
112+
return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(webSocketContext));
113+
}
114+
105115
public static string GetConnectionId(this WebSocketMessageContext context)
106116
{
107117
if (context == null)

src/NetCoreStack.WebSockets/Internal/NCSConstants.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
1-
using System;
2-
3-
namespace NetCoreStack.WebSockets.Internal
1+
namespace NetCoreStack.WebSockets.Internal
42
{
53
public static class NCSConstants
64
{
75
public static byte[] Splitter = new byte[] { 0x1f };
86
public const int ChunkSize = 1024 * 4;
97
public const string WSFQN = "X-NetCoreStack-WSHost";
10-
public const string CompressedKey = "Compressed";
8+
public const string CompressedKey = "GZipCompressed";
119
public const string ConnectorName = "ConnectorName";
1210
public const string ConnectionId = "ConnectionId";
1311

src/NetCoreStack.WebSockets/Internal/WebSocketMessageDescriptor.cs

Lines changed: 0 additions & 19 deletions
This file was deleted.

src/NetCoreStack.WebSockets/Internal/WebSocketReceiver.cs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,14 @@ public class WebSocketReceiver
1111
private readonly IServiceProvider _serviceProvider;
1212
private readonly WebSocketReceiverContext _context;
1313
private readonly Action<WebSocketReceiverContext> _closeCallback;
14-
private readonly Action<string> _handshakeCallback;
1514

1615
public WebSocketReceiver(IServiceProvider serviceProvider,
1716
WebSocketReceiverContext context,
18-
Action<WebSocketReceiverContext> closeCallback,
19-
Action<string> handshakeCallback = null)
17+
Action<WebSocketReceiverContext> closeCallback)
2018
{
2119
_serviceProvider = serviceProvider;
2220
_context = context;
2321
_closeCallback = closeCallback;
24-
_handshakeCallback = handshakeCallback;
2522
}
2623

2724
private async Task InternalReceiveAsync()
@@ -35,12 +32,6 @@ private async Task InternalReceiveAsync()
3532
try
3633
{
3734
var context = result.ToContext(buffer);
38-
if (context.Command == WebSocketCommands.Handshake)
39-
{
40-
_context.ConnectionId = context.Value?.ToString();
41-
_handshakeCallback?.Invoke(_context.ConnectionId);
42-
}
43-
4435
var invocator = _context.GetInvocator(_serviceProvider);
4536
if (invocator != null)
4637
{

test/ServerTestApp/Controllers/DiscoveryController.cs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Microsoft.Extensions.Logging;
55
using NetCoreStack.WebSockets;
66
using NetCoreStack.WebSockets.Internal;
7+
using Newtonsoft.Json;
78
using ServerTestApp.Models;
89
using System;
910
using System.Linq;
@@ -39,22 +40,26 @@ public IActionResult Get()
3940
[HttpPost(nameof(SendAsync))]
4041
public async Task<IActionResult> SendAsync([FromBody]SimpleModel model)
4142
{
42-
var echo = $"Echo from server '{model.Key}' - {DateTime.Now}";
43-
var obj = new { message = echo };
44-
var webSocketContext = new WebSocketMessageContext { Command = WebSocketCommands.DataSend, Value = obj };
45-
await _connectionManager.BroadcastAsync(webSocketContext);
43+
if (model != null)
44+
{
45+
var echo = $"Echo from server '{model.Key}' - {DateTime.Now}";
46+
var obj = new { message = echo };
47+
var webSocketContext = new WebSocketMessageContext { Command = WebSocketCommands.DataSend, Value = obj };
48+
await _connectionManager.BroadcastAsync(webSocketContext);
49+
}
50+
4651
return Ok();
4752
}
4853

4954
[HttpPost(nameof(BroadcastBinaryAsync))]
5055
public async Task<IActionResult> BroadcastBinaryAsync([FromBody]SimpleModel model)
5156
{
52-
var bytes = _distrubutedCache.Get(model.Key);
53-
var routeValueDictionary = new RouteValueDictionary(new { Key = model.Key });
54-
if (bytes != null)
57+
if (model != null)
5558
{
56-
await _connectionManager.BroadcastBinaryAsync(bytes, routeValueDictionary);
59+
var bytes = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(model));
60+
await _connectionManager.BroadcastBinaryAsync(bytes, new RouteValueDictionary(new { Id = 1, SomeProperty = "Some value" }));
5761
}
62+
5863
return Ok();
5964
}
6065

0 commit comments

Comments
 (0)