Skip to content

Commit 8794ab9

Browse files
committed
client handshake re-connect, clean up, sendasync thread safe
1 parent a84dc3e commit 8794ab9

25 files changed

+77
-228
lines changed

src/NetCoreStack.WebSockets.ProxyClient/ApplicationBuilderExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public static IApplicationBuilder UseProxyWebSockets(this IApplicationBuilder ap
2525
foreach (var connector in connectors)
2626
{
2727
appLifeTime.ApplicationStopping.Register(OnShutdown, connector);
28-
Task.Run(async () => await connector.ConnectAsync(cancellationTokenSource));
28+
Task.Factory.StartNew(async () => await connector.ConnectAsync(cancellationTokenSource), TaskCreationOptions.LongRunning);
2929
}
3030

3131
return app;

src/NetCoreStack.WebSockets.ProxyClient/ClientWebSocketConnector.cs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public ClientWebSocketConnector(IServiceProvider serviceProvider,
4747

4848
protected abstract InvocatorContext CreateInvocatorContext();
4949

50-
private async Task TryConnectAsync(CancellationTokenSource cancellationTokenSource = null)
50+
private async Task<WebSocketReceiver> TryConnectAsync(CancellationTokenSource cancellationTokenSource = null)
5151
{
5252
var invocatorContext = CreateInvocatorContext();
5353
var uri = new Uri($"ws://{invocatorContext.HostAddress}");
@@ -61,7 +61,7 @@ private async Task TryConnectAsync(CancellationTokenSource cancellationTokenSour
6161
catch (Exception ex)
6262
{
6363
ProxyLogHelper.Log(_loggerFactory, invocatorContext, "Error", ex);
64-
return;
64+
return null;
6565
}
6666

6767
var receiverContext = new WebSocketReceiverContext
@@ -71,26 +71,36 @@ private async Task TryConnectAsync(CancellationTokenSource cancellationTokenSour
7171
LoggerFactory = _loggerFactory,
7272
WebSocket = _webSocket
7373
};
74+
7475
var receiver = new WebSocketReceiver(_serviceProvider, receiverContext, Close, (connectionId) => {
7576
_connectionId = connectionId;
7677
});
77-
await receiver.ReceiveAsync();
78+
79+
return receiver;
7880
}
7981

8082
public async Task ConnectAsync(CancellationTokenSource cancellationTokenSource = null)
8183
{
8284
if (cancellationTokenSource == null)
8385
cancellationTokenSource = new CancellationTokenSource();
8486

87+
WebSocketReceiver receiver = null;
8588
while (!cancellationTokenSource.IsCancellationRequested)
8689
{
87-
await TryConnectAsync(cancellationTokenSource);
88-
89-
if (WebSocketState == WebSocketState.Open)
90+
receiver = await TryConnectAsync(cancellationTokenSource);
91+
if (receiver != null && WebSocketState == WebSocketState.Open)
9092
{
9193
break;
9294
}
9395
}
96+
97+
await receiver.ReceiveAsync();
98+
99+
// Handshake down try re-connect
100+
if (_webSocket.CloseStatus.HasValue)
101+
{
102+
await ConnectAsync(cancellationTokenSource);
103+
}
94104
}
95105

96106
private ArraySegment<byte> CreateTextSegment(WebSocketMessageContext context)
@@ -105,10 +115,14 @@ private ArraySegment<byte> CreateTextSegment(WebSocketMessageContext context)
105115
{
106116
var id = connectionId as string;
107117
if (string.IsNullOrEmpty(id))
118+
{
108119
throw new InvalidOperationException(nameof(connectionId));
120+
}
109121
}
110122
else
123+
{
111124
context.Header.Add(SocketsConstants.ConnectionId, ConnectionId);
125+
}
112126

113127
return context.ToSegment();
114128
}
@@ -135,9 +149,7 @@ internal void Close(WebSocketReceiverContext context)
135149

136150
internal void Close(string statusDescription)
137151
{
138-
_webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure,
139-
statusDescription,
140-
CancellationToken.None);
152+
_webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, statusDescription, CancellationToken.None);
141153
}
142154
}
143155
}

src/NetCoreStack.WebSockets.ProxyClient/IClientWebSocketCommandInvocator.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
using NetCoreStack.WebSockets.Internal;
2-
3-
namespace NetCoreStack.WebSockets.ProxyClient
1+
namespace NetCoreStack.WebSockets.ProxyClient
42
{
53
public interface IClientWebSocketCommandInvocator : IWebSocketCommandInvocator
64
{

src/NetCoreStack.WebSockets/ConnectionManager.cs

Lines changed: 15 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using Microsoft.Extensions.Logging;
2-
using Microsoft.Extensions.Options;
32
using NetCoreStack.WebSockets.Interfaces;
43
using NetCoreStack.WebSockets.Internal;
54
using Newtonsoft.Json;
@@ -13,31 +12,26 @@
1312
using System.Text;
1413
using System.Threading;
1514
using System.Threading.Tasks;
16-
using Microsoft.Extensions.DependencyInjection;
1715
using static NetCoreStack.WebSockets.Internal.SocketsConstants;
1816

1917
namespace NetCoreStack.WebSockets
2018
{
2119
public class ConnectionManager : IConnectionManager
2220
{
23-
private readonly SemaphoreSlim _sendFrameAsyncLock = new SemaphoreSlim(1, 1);
2421
private readonly IServiceProvider _serviceProvider;
2522
private readonly IHandshakeStateTransport _initState;
2623
private readonly ILoggerFactory _loggerFactory;
2724
private readonly IStreamCompressor _compressor;
28-
private readonly TransportLifetimeManager _lifetimeManager;
2925

3026
public ConcurrentDictionary<string, WebSocketTransport> Connections { get; }
3127

3228
public ConnectionManager(IServiceProvider serviceProvider,
3329
IStreamCompressor compressor,
34-
TransportLifetimeManager lifetimeManager,
3530
IHandshakeStateTransport initState,
3631
ILoggerFactory loggerFactory)
3732
{
3833
_serviceProvider = serviceProvider;
3934
_compressor = compressor;
40-
_lifetimeManager = lifetimeManager;
4135
_initState = initState;
4236
_loggerFactory = loggerFactory;
4337
Connections = new ConcurrentDictionary<string, WebSocketTransport>(StringComparer.OrdinalIgnoreCase);
@@ -51,32 +45,31 @@ private async Task<byte[]> PrepareFramesBytesAsync(byte[] body, IDictionary<stri
5145
}
5246

5347
if (properties == null)
48+
{
5449
properties = new Dictionary<string, object>();
50+
}
5551

5652
bool compressed = GZipHelper.IsGZipBody(body);
5753

5854
object key = null;
5955
if (properties.TryGetValue(CompressedKey, out key))
56+
{
6057
properties[CompressedKey] = compressed;
58+
}
6159
else
60+
{
6261
properties.Add(CompressedKey, compressed);
62+
}
6363

6464
string props = JsonConvert.SerializeObject(properties);
6565
byte[] header = Encoding.UTF8.GetBytes($"{props}");
6666

67-
#if DEBUG
68-
if (properties.TryGetValue("Key", out key))
69-
{
70-
int length = body.Length;
71-
Debug.WriteLine($"=====Key: {key?.ToString()}=====Length: {length}=====");
72-
}
73-
#endif
74-
7567
if (!compressed)
68+
{
7669
body = await _compressor.CompressAsync(body);
70+
}
7771

7872
body = header.Concat(Splitter).Concat(body).ToArray();
79-
8073
return body;
8174
}
8275

@@ -97,19 +90,10 @@ private Task SendAsync(WebSocketTransport transport, WebSocketMessageDescriptor
9790
return transport.WebSocket.SendAsync(descriptor.Segments,
9891
descriptor.MessageType,
9992
descriptor.EndOfMessage,
100-
CancellationToken.None);
93+
descriptor.CancellationToken);
10194
}
102-
else
103-
{
104-
// Only text messages
105-
_lifetimeManager.AddQueue(transport.ConnectionId, new MessageHolder
106-
{
107-
Segments = descriptor.Segments,
108-
KeepTime = DateTime.Now.AddMinutes(3)
109-
});
11095

111-
return TaskCache.CompletedTask;
112-
}
96+
return Task.CompletedTask;
11397
}
11498

11599
private Task SendBinaryAsync(WebSocketTransport transport, byte[] chunkedBytes, bool endOfMessage)
@@ -129,7 +113,7 @@ private Task SendBinaryAsync(WebSocketTransport transport, byte[] chunkedBytes,
129113
CancellationToken.None);
130114
}
131115

132-
return TaskCache.CompletedTask;
116+
return Task.CompletedTask;
133117
}
134118

135119
private async Task SendConcurrentBinaryAsync(byte[] bytes)
@@ -174,17 +158,6 @@ public async Task ConnectAsync(WebSocket webSocket, string connectionId, string
174158
if (Connections.TryGetValue(connectionId, out transport))
175159
{
176160
transport.ReConnect(webSocket);
177-
List<MessageHolder> messages = _lifetimeManager.TryDequeue(connectionId);
178-
foreach (var message in messages)
179-
{
180-
await SendAsync(transport, new WebSocketMessageDescriptor
181-
{
182-
MessageType = WebSocketMessageType.Text,
183-
Segments = message.Segments,
184-
EndOfMessage = true,
185-
IsQueue = true,
186-
});
187-
}
188161
}
189162
else
190163
{
@@ -226,13 +199,11 @@ public async Task BroadcastAsync(WebSocketMessageContext context)
226199
EndOfMessage = true,
227200
MessageType = WebSocketMessageType.Text
228201
};
229-
230-
_sendFrameAsyncLock.Wait();
202+
231203
foreach (var connection in Connections)
232204
{
233205
await SendAsync(connection.Value, descriptor);
234206
}
235-
_sendFrameAsyncLock.Release();
236207
}
237208

238209
public async Task BroadcastBinaryAsync(byte[] inputs, IDictionary<string, object> properties)
@@ -242,21 +213,18 @@ public async Task BroadcastBinaryAsync(byte[] inputs, IDictionary<string, object
242213
return;
243214
}
244215

245-
_sendFrameAsyncLock.Wait();
246216
var bytes = await PrepareFramesBytesAsync(inputs, properties);
247217
await SendConcurrentBinaryAsync(bytes);
248-
_sendFrameAsyncLock.Release();
249218
}
250219

251220
public Task SendAsync(string connectionId, WebSocketMessageContext context)
252221
{
253222
if (!Connections.Any())
254223
{
255-
return TaskCache.CompletedTask;
224+
return Task.CompletedTask;
256225
}
257226

258-
WebSocketTransport transport = null;
259-
if (!Connections.TryGetValue(connectionId, out transport))
227+
if (!Connections.TryGetValue(connectionId, out WebSocketTransport transport))
260228
{
261229
throw new ArgumentOutOfRangeException(nameof(transport));
262230
}
@@ -279,8 +247,7 @@ public async Task SendBinaryAsync(string connectionId, byte[] input, IDictionary
279247
return;
280248
}
281249

282-
WebSocketTransport transport = null;
283-
if (!Connections.TryGetValue(connectionId, out transport))
250+
if (!Connections.TryGetValue(connectionId, out WebSocketTransport transport))
284251
{
285252
throw new ArgumentOutOfRangeException(nameof(transport));
286253
}

src/NetCoreStack.WebSockets/DefaultHandshakeStateTransport.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using System.Collections;
2-
using System.Collections.Generic;
1+
using System.Collections.Generic;
32
using System.Threading.Tasks;
43

54
namespace NetCoreStack.WebSockets

src/NetCoreStack.WebSockets/Extensions/SocketServiceCollectionExtensions.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ public static void AddNativeWebSockets<TInvocator>(this IServiceCollection servi
1616
{
1717
throw new ArgumentNullException(nameof(services));
1818
}
19-
20-
services.AddSingleton<TransportLifetimeManager>();
19+
2120
services.TryAdd(ServiceDescriptor.Singleton<ILoggerFactory, LoggerFactory>());
2221
services.TryAdd(ServiceDescriptor.Singleton<IStreamCompressor, GZipStreamCompressor>());
2322
services.TryAdd(ServiceDescriptor.Transient<IHandshakeStateTransport, DefaultHandshakeStateTransport>());

src/NetCoreStack.WebSockets/Interfaces/IServerWebSocketCommandInvocator.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
using NetCoreStack.WebSockets.Internal;
2-
3-
namespace NetCoreStack.WebSockets
1+
namespace NetCoreStack.WebSockets
42
{
53
public interface IServerWebSocketCommandInvocator : IWebSocketCommandInvocator
64
{

src/NetCoreStack.WebSockets/Interfaces/IStreamCompressor.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using System.IO;
2-
using System.Threading.Tasks;
1+
using System.Threading.Tasks;
32

43
namespace NetCoreStack.WebSockets.Interfaces
54
{

src/NetCoreStack.WebSockets/Internal/LogHelper.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,14 @@ internal static void Log(WebSocketReceiverContext context, string message, Excep
1717
var logger = context.LoggerFactory.CreateLogger<WebSocketReceiver>();
1818
var content = $"{message}=={ex?.Message}";
1919

20+
object state = new
21+
{
22+
context.ConnectionId
23+
};
24+
2025
logger.Log(logLevel,
2126
new EventId((int)WebSocketState.Aborted, nameof(WebSocketState.Aborted)),
22-
context.InvocatorContext,
27+
state,
2328
ex,
2429
(msg, exception) => {
2530
var values = new Dictionary<string, object>();

src/NetCoreStack.WebSockets/Internal/MessageHolder.cs

Lines changed: 0 additions & 11 deletions
This file was deleted.

0 commit comments

Comments
 (0)