Skip to content

Commit 452edaa

Browse files
gedemgedem
authored andcommitted
Lifetimemanager, message holder
1 parent 46ce194 commit 452edaa

File tree

5 files changed

+55
-11
lines changed

5 files changed

+55
-11
lines changed

src/NetCoreStack.WebSockets/ConnectionManager.cs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,22 @@ public class ConnectionManager : IConnectionManager
2424
private readonly IHandshakeStateTransport _initState;
2525
private readonly ILoggerFactory _loggerFactory;
2626
private readonly IStreamCompressor _compressor;
27+
private readonly TransportLifetimeManager _lifetimeManager;
2728
public ConcurrentDictionary<string, WebSocketTransport> Connections { get; }
2829

2930
public ConnectionManager(IStreamCompressor compressor,
31+
TransportLifetimeManager lifetimeManager,
3032
InvocatorRegistry invocatorRegistry,
3133
IOptions<ServerSocketsOptions> options,
3234
IHandshakeStateTransport initState,
3335
ILoggerFactory loggerFactory)
3436
{
37+
_compressor = compressor;
38+
_lifetimeManager = lifetimeManager;
3539
_invocatorRegistry = invocatorRegistry;
3640
_options = options.Value;
3741
_initState = initState;
38-
_loggerFactory = loggerFactory;
39-
_compressor = compressor;
42+
_loggerFactory = loggerFactory;
4043
Connections = new ConcurrentDictionary<string, WebSocketTransport>(StringComparer.OrdinalIgnoreCase);
4144
}
4245

@@ -89,10 +92,18 @@ private async Task SendAsync(WebSocketTransport transport, WebSocketMessageDescr
8992
throw new ArgumentNullException(nameof(descriptor.Segments));
9093
}
9194

92-
await transport.WebSocket.SendAsync(descriptor.Segments,
93-
descriptor.MessageType,
94-
descriptor.EndOfMessage,
95-
CancellationToken.None);
95+
if (!transport.WebSocket.CloseStatus.HasValue)
96+
{
97+
await transport.WebSocket.SendAsync(descriptor.Segments,
98+
descriptor.MessageType,
99+
descriptor.EndOfMessage,
100+
CancellationToken.None);
101+
}
102+
else
103+
{
104+
// Only text messages
105+
_lifetimeManager.AddQueue(new MessageHolder { ConnectionId = transport.ConnectionId, Segments = descriptor.Segments });
106+
}
96107
}
97108

98109
private async Task SendBinaryAsync(WebSocketTransport transport, byte[] chunkedBytes, bool endOfMessage, CancellationToken token)
@@ -104,10 +115,13 @@ private async Task SendBinaryAsync(WebSocketTransport transport, byte[] chunkedB
104115

105116
var segments = new ArraySegment<byte>(chunkedBytes);
106117

107-
await transport.WebSocket.SendAsync(segments,
108-
WebSocketMessageType.Binary,
109-
endOfMessage,
110-
token);
118+
if (!transport.WebSocket.CloseStatus.HasValue)
119+
{
120+
await transport.WebSocket.SendAsync(segments,
121+
WebSocketMessageType.Binary,
122+
endOfMessage,
123+
token);
124+
}
111125
}
112126

113127
public async Task ConnectAsync(WebSocket webSocket, string connectorName = "")

src/NetCoreStack.WebSockets/Extensions/SocketServiceCollectionExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public static void AddNativeWebSockets(this IServiceCollection services, Action<
2121
throw new ArgumentNullException(nameof(setup));
2222
}
2323

24+
services.AddSingleton<TransportLifetimeManager>();
2425
services.TryAdd(ServiceDescriptor.Singleton<ILoggerFactory, LoggerFactory>());
2526
services.TryAdd(ServiceDescriptor.Singleton<IStreamCompressor, GZipStreamCompressor>());
2627
services.TryAdd(ServiceDescriptor.Transient<IHandshakeStateTransport, DefaultHandshakeStateTransport>());
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using System;
2+
3+
namespace NetCoreStack.WebSockets.Internal
4+
{
5+
public class MessageHolder
6+
{
7+
public string ConnectionId { get; set; }
8+
public ArraySegment<byte> Segments { get; set; }
9+
}
10+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using System.Collections.Concurrent;
2+
3+
namespace NetCoreStack.WebSockets.Internal
4+
{
5+
public class TransportLifetimeManager
6+
{
7+
public ConcurrentQueue<MessageHolder> Queue { get; }
8+
9+
public TransportLifetimeManager()
10+
{
11+
Queue = new ConcurrentQueue<MessageHolder>();
12+
}
13+
14+
public void AddQueue(MessageHolder holder)
15+
{
16+
Queue.Enqueue(holder);
17+
}
18+
}
19+
}

src/NetCoreStack.WebSockets/Internal/WebSocketMessageDescriptor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ public WebSocketMessageDescriptor()
1616
CancellationToken = CancellationToken.None;
1717
}
1818
}
19-
}
19+
}

0 commit comments

Comments
 (0)