Skip to content

Commit a05005e

Browse files
gedemgedem
authored andcommitted
Middleware and Manager improvments, Dependencies, Context Design
1 parent 9998eba commit a05005e

File tree

8 files changed

+204
-170
lines changed

8 files changed

+204
-170
lines changed

src/NetCoreStack.WebSockets.ProxyClient/ClientWebSocketConnector.cs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,16 @@ public async Task ConnectAsync()
3838
var uri = new Uri($"ws://{_options.WebSocketHostAddress}");
3939
_webSocket = new ClientWebSocket();
4040
await _webSocket.ConnectAsync(uri, CancellationToken.None);
41-
await WebSocketReceiver.Receive(_webSocket, _compressor, _invocatorRegistry, (SocketsOptions)_options);
41+
var receiverContext = new WebSocketReceiverContext
42+
{
43+
Compressor = _compressor,
44+
InvocatorRegistry = _invocatorRegistry,
45+
LoggerFactory = _loggerFactory,
46+
Options = _options,
47+
WebSocket = _webSocket
48+
};
49+
var receiver = new WebSocketReceiver(receiverContext, Close);
50+
await receiver.ReceiveAsync();
4251
}
4352
catch (Exception ex)
4453
{
@@ -68,9 +77,18 @@ public async Task SendBinaryAsync(byte[] bytes)
6877
await _webSocket.SendAsync(segments, WebSocketMessageType.Binary, true, CancellationToken.None);
6978
}
7079

80+
internal void Close(WebSocketReceiverContext context)
81+
{
82+
context.WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure,
83+
nameof(WebSocketReceiverContext),
84+
CancellationToken.None);
85+
}
86+
7187
internal void Close(string statusDescription)
7288
{
73-
_webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, statusDescription, CancellationToken.None);
89+
_webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure,
90+
statusDescription,
91+
CancellationToken.None);
7492
}
7593
}
7694
}

src/NetCoreStack.WebSockets/ConnectionManager.cs

Lines changed: 60 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
using NetCoreStack.WebSockets.Interfaces;
1+
using Microsoft.Extensions.Logging;
2+
using Microsoft.Extensions.Options;
3+
using NetCoreStack.WebSockets.Interfaces;
24
using NetCoreStack.WebSockets.Internal;
35
using Newtonsoft.Json;
46
using System;
@@ -14,13 +16,25 @@ namespace NetCoreStack.WebSockets
1416
{
1517
public class ConnectionManager : IConnectionManager
1618
{
17-
protected IStreamCompressor Compressor { get; }
18-
protected ConcurrentDictionary<string, WebSocketTransport> Connections { get; }
19-
20-
public ConnectionManager(IStreamCompressor compressor)
19+
private readonly InvocatorRegistry _invocatorRegistry;
20+
private readonly ServerSocketsOptions _options;
21+
private readonly IHandshakeStateTransport _initState;
22+
private readonly ILoggerFactory _loggerFactory;
23+
private readonly IStreamCompressor _compressor;
24+
private readonly ConcurrentDictionary<string, WebSocketTransport> _connections;
25+
26+
public ConnectionManager(IStreamCompressor compressor,
27+
InvocatorRegistry invocatorRegistry,
28+
IOptions<ServerSocketsOptions> options,
29+
IHandshakeStateTransport initState,
30+
ILoggerFactory loggerFactory)
2131
{
22-
Compressor = compressor;
23-
Connections = new ConcurrentDictionary<string, WebSocketTransport>();
32+
_invocatorRegistry = invocatorRegistry;
33+
_options = options.Value;
34+
_initState = initState;
35+
_loggerFactory = loggerFactory;
36+
_compressor = compressor;
37+
_connections = new ConcurrentDictionary<string, WebSocketTransport>();
2438
}
2539

2640
private async Task<byte[]> PrepareBytesAsync(byte[] input, JsonObject properties)
@@ -36,17 +50,7 @@ private async Task<byte[]> PrepareBytesAsync(byte[] input, JsonObject properties
3650
var bytesCount = input.Length;
3751
input = propsBytes.Concat(input).ToArray();
3852

39-
return await Compressor.CompressAsync(input);
40-
41-
//if (input.Length > SocketsConstants.CompressorThreshold)
42-
//{
43-
// using (MemoryStream ms = new MemoryStream(input))
44-
// {
45-
// return await Compressor.CompressAsync(ms);
46-
// }
47-
//}
48-
//else
49-
// return input;
53+
return await _compressor.CompressAsync(input);
5054
}
5155

5256
private async Task SendAsync(WebSocketTransport transport, WebSocketMessageDescriptor descriptor)
@@ -82,6 +86,31 @@ await transport.WebSocket.SendAsync(segments,
8286
token);
8387
}
8488

89+
public async Task ConnectAsync(WebSocket webSocket)
90+
{
91+
WebSocketTransport transport = new WebSocketTransport(webSocket);
92+
var connectionId = transport.ConnectionId;
93+
var context = new WebSocketMessageContext();
94+
context.Command = WebSocketCommands.Handshake;
95+
context.Value = connectionId;
96+
context.State = await _initState.GetStateAsync();
97+
_connections.TryAdd(connectionId, transport);
98+
99+
await SendAsync(connectionId, context);
100+
101+
var receiverContext = new WebSocketReceiverContext
102+
{
103+
Compressor = _compressor,
104+
ConnectionId = connectionId,
105+
InvocatorRegistry = _invocatorRegistry,
106+
LoggerFactory = _loggerFactory,
107+
Options = _options,
108+
WebSocket = webSocket
109+
};
110+
var receiver = new WebSocketReceiver(receiverContext, CloseConnection);
111+
await receiver.ReceiveAsync();
112+
}
113+
85114
public async Task BroadcastAsync(WebSocketMessageContext context)
86115
{
87116
if (context == null)
@@ -94,7 +123,7 @@ public async Task BroadcastAsync(WebSocketMessageContext context)
94123
throw new ArgumentNullException(nameof(context.Value));
95124
}
96125

97-
if (!Connections.Any())
126+
if (!_connections.Any())
98127
{
99128
return;
100129
}
@@ -107,15 +136,15 @@ public async Task BroadcastAsync(WebSocketMessageContext context)
107136
MessageType = WebSocketMessageType.Text
108137
};
109138

110-
foreach (var connection in Connections)
139+
foreach (var connection in _connections)
111140
{
112141
await SendAsync(connection.Value, descriptor);
113142
}
114143
}
115144

116145
public async Task BroadcastBinaryAsync(byte[] inputs, JsonObject properties)
117146
{
118-
if (!Connections.Any())
147+
if (!_connections.Any())
119148
{
120149
return;
121150
}
@@ -136,7 +165,7 @@ public async Task BroadcastBinaryAsync(byte[] inputs, JsonObject properties)
136165
if (chunkedBytes.Length < SocketsConstants.ChunkSize)
137166
endOfMessage = true;
138167

139-
foreach (var connection in Connections)
168+
foreach (var connection in _connections)
140169
{
141170
await SendBinaryAsync(connection.Value, chunkedBytes, endOfMessage, CancellationToken.None);
142171
}
@@ -151,13 +180,13 @@ public async Task BroadcastBinaryAsync(byte[] inputs, JsonObject properties)
151180

152181
public async Task SendAsync(string connectionId, WebSocketMessageContext context)
153182
{
154-
if (!Connections.Any())
183+
if (!_connections.Any())
155184
{
156185
return;
157186
}
158187

159188
WebSocketTransport transport = null;
160-
if (!Connections.TryGetValue(connectionId, out transport))
189+
if (!_connections.TryGetValue(connectionId, out transport))
161190
{
162191
throw new ArgumentOutOfRangeException(nameof(transport));
163192
}
@@ -175,13 +204,13 @@ public async Task SendAsync(string connectionId, WebSocketMessageContext context
175204

176205
public async Task SendBinaryAsync(string connectionId, byte[] input, JsonObject properties)
177206
{
178-
if (!Connections.Any())
207+
if (!_connections.Any())
179208
{
180209
return;
181210
}
182211

183212
WebSocketTransport transport = null;
184-
if (!Connections.TryGetValue(connectionId, out transport))
213+
if (!_connections.TryGetValue(connectionId, out transport))
185214
{
186215
throw new ArgumentOutOfRangeException(nameof(transport));
187216
}
@@ -216,39 +245,18 @@ await transport.WebSocket.SendAsync(segments,
216245
}
217246
}
218247

219-
public async Task SendAsync(string connectionId, WebSocketMessageContext context, WebSocket webSocket)
248+
public void CloseConnection(string connectionId)
220249
{
221-
if (string.IsNullOrEmpty(connectionId))
222-
{
223-
throw new ArgumentNullException(nameof(connectionId));
224-
}
225-
226-
if (context == null)
227-
{
228-
throw new ArgumentNullException(nameof(context));
229-
}
230-
231-
if (webSocket == null)
232-
{
233-
throw new ArgumentNullException(nameof(webSocket));
234-
}
235-
236250
WebSocketTransport transport = null;
237-
if (!Connections.TryGetValue(connectionId, out transport))
251+
if (_connections.TryRemove(connectionId, out transport))
238252
{
239-
Connections.TryAdd(connectionId, new WebSocketTransport(webSocket));
253+
transport.Dispose();
240254
}
241-
242-
await SendAsync(connectionId, context);
243255
}
244256

245-
public void CloseConnection(string connectionId)
257+
public void CloseConnection(WebSocketReceiverContext context)
246258
{
247-
WebSocketTransport transport = null;
248-
if (Connections.TryRemove(connectionId, out transport))
249-
{
250-
transport.Dispose();
251-
}
259+
CloseConnection(context.ConnectionId);
252260
}
253261
}
254262
}

src/NetCoreStack.WebSockets/Interfaces/IConnectionManager.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ namespace NetCoreStack.WebSockets
55
{
66
public interface IConnectionManager
77
{
8+
Task ConnectAsync(WebSocket webSocket);
9+
810
Task BroadcastAsync(WebSocketMessageContext context);
911

1012
Task BroadcastBinaryAsync(byte[] input, JsonObject properties);
@@ -13,8 +15,6 @@ public interface IConnectionManager
1315

1416
Task SendBinaryAsync(string connectionId, byte[] input, JsonObject properties);
1517

16-
Task SendAsync(string connectionId, WebSocketMessageContext context, WebSocket webSocket);
17-
1818
void CloseConnection(string connectionId);
1919
}
2020
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
using Microsoft.Extensions.Logging;
2+
using NetCoreStack.WebSockets.Internal;
3+
using System.Net.WebSockets;
4+
5+
namespace NetCoreStack.WebSockets.Interfaces
6+
{
7+
public class WebSocketReceiverContext
8+
{
9+
public string ConnectionId { get; set; }
10+
public WebSocket WebSocket { get; set; }
11+
public InvocatorRegistry InvocatorRegistry { get; set; }
12+
public SocketsOptions Options { get; set; }
13+
public ILoggerFactory LoggerFactory { get; set; }
14+
public IStreamCompressor Compressor { get; set; }
15+
}
16+
}

src/NetCoreStack.WebSockets/Internal/ConnectionManagerExtensions.cs

Lines changed: 0 additions & 44 deletions
This file was deleted.

src/NetCoreStack.WebSockets/Internal/WebSocketMiddleware.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
using Microsoft.AspNetCore.Http;
2+
using Microsoft.Extensions.Logging;
23
using Microsoft.Extensions.Options;
34
using NetCoreStack.WebSockets.Interfaces;
5+
using System;
6+
using System.Net.WebSockets;
47
using System.Threading.Tasks;
58

69
namespace NetCoreStack.WebSockets.Internal
@@ -19,12 +22,12 @@ public async Task Invoke(HttpContext httpContext,
1922
IStreamCompressor compressor,
2023
InvocatorRegistry invocatorRegistry,
2124
IOptions<ServerSocketsOptions> options,
22-
IHandshakeStateTransport initState)
25+
ILoggerFactory loggerFactory)
2326
{
2427
if (httpContext.WebSockets.IsWebSocketRequest)
2528
{
2629
var webSocket = await httpContext.WebSockets.AcceptWebSocketAsync();
27-
await manager.Handshake(webSocket, compressor, invocatorRegistry, options.Value, initState);
30+
await manager.ConnectAsync(webSocket);
2831
}
2932
else
3033
{

0 commit comments

Comments
 (0)